ELK + Filebeat + Kafka Log Collection

Concepts

ELK stands for Elasticsearch, Logstash, and Kibana.

Elasticsearch is an open-source distributed search engine that provides three core capabilities: collecting, analyzing, and storing data. Its characteristics include a distributed architecture, zero configuration, automatic discovery, automatic index sharding, an index replica mechanism, a RESTful interface, multiple data sources, and automatic search load balancing.

Logstash is a tool mainly used to collect, analyze, and filter logs, and it supports a wide range of data ingestion methods. It usually works in a client/server architecture: the client side is installed on the hosts whose logs need to be collected, and the server side filters and transforms the logs received from each node before forwarding them on to Elasticsearch.

Kibana is a free and open-source tool that provides a friendly web interface for analyzing the logs served up by Logstash and Elasticsearch, helping you aggregate, analyze, and search important log data.

ELK on its own can cover log collection, but for performance and efficiency it works better in combination with Filebeat and Kafka.

Filebeat is a log data shipper written in Go. Its configuration is simple and it consumes very few system resources; because it is much more lightweight than Logstash, Filebeat is a good choice when all you need is an agent that collects log data. It is, however, not as powerful as Logstash when it comes to filtering and transforming data, so if you want to filter or process the data before it is pushed into Elasticsearch, you still need Logstash. Logstash, in turn, consumes a lot of resources and its performance drops when it has to process large volumes of data in a short time, so the usual practice is to put a Kafka message queue between Filebeat and Logstash to act as a buffer and to decouple the two: Filebeat pushes the collected log data into Kafka and Logstash consumes it, so neither side interferes with the other.

The overall log collection flow is as follows:
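
Nginx access log → Filebeat → Kafka (topic web_log) → Logstash → Elasticsearch → Kibana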

Environment

I did not build Elasticsearch, Logstash, Kibana, and Kafka from source on my own servers; instead I used the managed products on Alibaba Cloud. They are paid products, but new members get a big discount on a one-month Kafka purchase and the other services come with a one-month free trial, so I seized the chance to try them out.

If you build the environment yourself it just takes a little more time; once it is set up, the overall workflow is exactly the same.

Hands-on

Kafka

For Kafka I used the Alibaba Cloud product directly. You can also set it up yourself, in which case you need to install both Kafka and Zookeeper.
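
If you do self-host, a minimal single-node sketch might look like the following (paths and the Zookeeper-based topic command assume a Kafka 2.x-or-earlier distribution unpacked locally; adjust for your install):

# start Zookeeper and a single Kafka broker
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties

# create the topic that Filebeat will publish to
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic web_log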

Filebeat

1. Download (Download Filebeat)
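
For example, the 7.6.0 Linux tarball can be fetched from Elastic's artifacts site (URL assumed to follow Elastic's standard download path):

curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.6.0-linux-x86_64.tar.gz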

tar zxf filebeat-7.6.0-linux-x86_64.tar.gz
cd filebeat-7.6.0-linux-x86_64

2. Modify the configuration (filebeat.yml)

The configuration file is mainly about the input and output sections: input defines where the data to be collected comes from (the Nginx log), and output defines where the collected data should be sent (Kafka).

Opening filebeat.yml, you will find that it contains no Kafka-related options, but filebeat.reference.yml does, so first run the following command to append the Kafka output section to filebeat.yml (the line range 1847,2000 matches the author's 7.6.0 reference file; check and adjust it for your version):

sed -n '1847,2000p' filebeat.reference.yml  >> filebeat.yml
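
After appending, a quick (optional) check that the section actually landed in filebeat.yml:

grep -n "output.kafka" filebeat.yml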

The full configuration file is as follows:

###################### Filebeat Configuration Example #########################

# This file is an example configuration file highlighting only the most common
# options. The filebeat.reference.yml file from the same directory contains all the
# supported options with more comments. You can use it as a reference.
#
# You can find the full configuration reference here:
# https://www.elastic.co/guide/en/beats/filebeat/index.html

# For more available modules and options, please see the filebeat.reference.yml sample
# configuration file.

#=========================== Filebeat inputs =============================

filebeat.inputs:

# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.

- type: log

  # Change to true to enable this input configuration.
  enabled: true

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /var/log/nginx/access.log
    #- c:\programdata\elasticsearch\logs\*

# Exclude lines. A list of regular expressions to match. It drops the lines that are
# matching any regular expression from the list.
#exclude_lines: ['^DBG']

# Include lines. A list of regular expressions to match. It exports the lines that are
# matching any regular expression from the list.
#include_lines: ['^ERR', '^WARN']

# Exclude files. A list of regular expressions to match. Filebeat drops the files that
# are matching any regular expression from the list. By default, no files are dropped.
#exclude_files: ['.gz$']

# Optional additional fields. These fields can be freely picked
# to add additional information to the crawled log files for filtering
#fields:
# level: debug
# review: 1

### Multiline options

# Multiline can be used for log messages spanning multiple lines. This is common
# for Java Stack Traces or C-Line Continuation

# The regexp Pattern that has to be matched. The example pattern matches all lines starting with [
#multiline.pattern: ^\[

# Defines if the pattern set under pattern should be negated or not. Default is false.
#multiline.negate: false

# Match can be set to "after" or "before". It is used to define if lines should be append to a pattern
# that was (not) matched before or after or as long as a pattern is not matched based on negate.
# Note: After is the equivalent to previous and before is the equivalent to to next in Logstash
#multiline.match: after


#============================= Filebeat modules ===============================

filebeat.config.modules:
  # Glob pattern for configuration loading
  path: ${path.config}/modules.d/*.yml

  # Set to true to enable config reloading
  reload.enabled: false

  # Period on which files under path should be checked for changes
  #reload.period: 10s

#==================== Elasticsearch template setting ==========================

setup.template.settings:
  index.number_of_shards: 1
  #index.codec: best_compression
  #_source.enabled: false

#================================ General =====================================

# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
#name:

# The tags of the shipper are included in their own field with each
# transaction published.
#tags: ["service-X", "web-tier"]

# Optional fields that you can specify to add additional information to the
# output.
#fields:
# env: staging


#============================== Dashboards =====================================
# These settings control loading the sample dashboards to the Kibana index. Loading
# the dashboards is disabled by default and can be enabled either by setting the
# options here or by using the `setup` command.
#setup.dashboards.enabled: false

# The URL from where to download the dashboards archive. By default this URL
# has a value which is computed based on the Beat name and version. For released
# versions, this URL points to the dashboard archive on the artifacts.elastic.co
# website.
#setup.dashboards.url:

#============================== Kibana =====================================

# Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API.
# This requires a Kibana endpoint configuration.
setup.kibana:

# Kibana Host
# Scheme and port can be left out and will be set to the default (http and 5601)
# In case you specify and additional path, the scheme is required: http://localhost:5601/path
# IPv6 addresses should always be defined as: https://[2001:db8::1]:5601
#host: "localhost:5601"

# Kibana Space ID
# ID of the Kibana Space into which the dashboards should be loaded. By default,
# the Default Space will be used.
#space.id:

#============================= Elastic Cloud ==================================

# These settings simplify using Filebeat with the Elastic Cloud (https://cloud.elastic.co/).

# The cloud.id setting overwrites the `output.elasticsearch.hosts` and
# `setup.kibana.host` options.
# You can find the `cloud.id` in the Elastic Cloud web UI.
#cloud.id:

# The cloud.auth setting overwrites the `output.elasticsearch.username` and
# `output.elasticsearch.password` settings. The format is `<user>:<pass>`.
#cloud.auth:

#================================ Outputs =====================================

# Configure what output to use when sending the data collected by the beat.

#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:
# Array of hosts to connect to.
# hosts: ["localhost:9200"]

# Protocol - either `http` (default) or `https`.
#protocol: "https"

# Authentication credentials - either API key or username/password.
#api_key: "id:api_key"
#username: "elastic"
#password: "changeme"

#----------------------------- Logstash output --------------------------------
#output.logstash:
# The Logstash hosts
#hosts: ["localhost:5044"]

# Optional SSL. By default is off.
# List of root certificates for HTTPS server verifications
#ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

# Certificate for SSL client authentication
#ssl.certificate: "/etc/pki/client/cert.pem"

# Client Certificate Key
#ssl.key: "/etc/pki/client/cert.key"

#================================ Processors =====================================

# Configure processors to enhance or manipulate events generated by the beat.

processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~

#================================ Logging =====================================

# Sets log level. The default log level is info.
# Available log levels are: error, warning, info, debug
#logging.level: debug

# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
# "publish", "service".
#logging.selectors: ["*"]

#============================== X-Pack Monitoring ===============================
# filebeat can export internal metrics to a central Elasticsearch monitoring
# cluster. This requires xpack monitoring to be enabled in Elasticsearch. The
# reporting is disabled by default.

# Set to true to enable the monitoring reporter.
#monitoring.enabled: false

# Sets the UUID of the Elasticsearch cluster under which monitoring data for this
# Filebeat instance will appear in the Stack Monitoring UI. If output.elasticsearch
# is enabled, the UUID is derived from the Elasticsearch cluster referenced by output.elasticsearch.
#monitoring.cluster_uuid:

# Uncomment to send the metrics to Elasticsearch. Most settings from the
# Elasticsearch output are accepted here as well.
# Note that the settings should point to your Elasticsearch *monitoring* cluster.
# Any setting that is not set is automatically inherited from the Elasticsearch
# output configuration, so if you have the Elasticsearch output configured such
# that it is pointing to your Elasticsearch monitoring cluster, you can simply
# uncomment the following line.
#monitoring.elasticsearch:

#================================= Migration ==================================

# This allows to enable 6.7 migration aliases
#migration.6_to_7.enabled: true
#------------------------------- Kafka output ----------------------------------
output.kafka:
  # Boolean flag to enable or disable the output module.
  enabled: true

  # The list of Kafka broker addresses from which to fetch the cluster metadata.
  # The cluster metadata contain the actual Kafka brokers events are published
  # to.
  hosts: ["xxx.xx.xx.xxx:xxxx"]

  # The Kafka topic used for produced events. The setting can be a format string
  # using any event field. To set the topic from document type use `%{[type]}`.
  topic: web_log

# The Kafka event key setting. Use format string to create a unique event key.
# By default no event key will be generated.
#key: ''

# The Kafka event partitioning strategy. Default hashing strategy is `hash`
# using the `output.kafka.key` setting or randomly distributes events if
# `output.kafka.key` is not configured.
#partition.hash:
# If enabled, events will only be published to partitions with reachable
# leaders. Default is false.
#reachable_only: false

# Configure alternative event field names used to compute the hash value.
# If empty `output.kafka.key` setting will be used.
# Default value is empty list.
#hash: []

# Authentication details. Password is required if username is set.
#username: ''
#password: ''

  # Kafka version Filebeat is assumed to run against. Defaults to the "1.0.0".
  version: '0.10.2'

# Configure JSON encoding
#codec.json:
# Pretty-print JSON event
#pretty: false

# Configure escaping HTML symbols in strings.
#escape_html: false

# Metadata update configuration. Metadata contains leader information
# used to decide which broker to use when publishing.
#metadata:
# Max metadata request retry attempts when cluster is in middle of leader
# election. Defaults to 3 retries.
#retry.max: 3

# Wait time between retries during leader elections. Default is 250ms.
#retry.backoff: 250ms

# Refresh metadata interval. Defaults to every 10 minutes.
#refresh_frequency: 10m

# Strategy for fetching the topics metadata from the broker. Default is false.
#full: false

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

# The number of times to retry publishing an event after a publishing failure.
# After the specified number of retries, events are typically dropped.
# Some Beats, such as Filebeat, ignore the max_retries setting and retry until
# all events are published. Set max_retries to a value less than 0 to retry
# until all events are published. The default is 3.
#max_retries: 3

# The maximum number of events to bulk in a single Kafka request. The default
# is 2048.
#bulk_max_size: 2048

# Duration to wait before sending bulk Kafka request. 0 is no delay. The default
# is 0.
#bulk_flush_frequency: 0s

# The number of seconds to wait for responses from the Kafka brokers before
# timing out. The default is 30s.
#timeout: 30s

# The maximum duration a broker will wait for number of required ACKs. The
# default is 10s.
#broker_timeout: 10s

# The number of messages buffered for each Kafka broker. The default is 256.
#channel_buffer_size: 256

# The keep-alive period for an active network connection. If 0s, keep-alives
# are disabled. The default is 0 seconds.
#keep_alive: 0

# Sets the output compression codec. Must be one of none, snappy and gzip. The
# default is gzip.
#compression: gzip

# Set the compression level. Currently only gzip provides a compression level
# between 0 and 9. The default value is chosen by the compression algorithm.
#compression_level: 4

# The maximum permitted size of JSON-encoded messages. Bigger messages will be
# dropped. The default value is 1000000 (bytes). This value should be equal to
# or less than the broker's message.max.bytes.
#max_message_bytes: 1000000

# The ACK reliability level required from broker. 0=no response, 1=wait for
# local commit, -1=wait for all replicas to commit. The default is 1. Note:
# If set to 0, no ACKs are returned by Kafka. Messages might be lost silently
# on error.
#required_acks: 1

# The configurable ClientID used for logging, debugging, and auditing
# purposes. The default is "beats".
#client_id: beats

# Enable SSL support. SSL is automatically enabled if any SSL setting is set.
#ssl.enabled: true

# Optional SSL configuration options. SSL is off by default.
# List of root certificates for HTTPS server verifications
#ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

# Configure SSL verification mode. If `none` is configured, all server hosts
# and certificates will be accepted. In this mode, SSL based connections are
# susceptible to man-in-the-middle attacks. Use only for testing. Default is
# `full`.
#ssl.verification_mode: full

# List of supported/valid TLS versions. By default all TLS versions from 1.1
# up to 1.3 are enabled.
#ssl.supported_protocols: [TLSv1.1, TLSv1.2, TLSv1.3]

# Certificate for SSL client authentication
#ssl.certificate: "/etc/pki/client/cert.pem"

# Client Certificate Key
#ssl.key: "/etc/pki/client/cert.key"

# Optional passphrase for decrypting the Certificate Key.
#ssl.key_passphrase: ''

# Configure cipher suites to be used for SSL connections
#ssl.cipher_suites: []

# Configure curve types for ECDHE-based cipher suites
#ssl.curve_types: []

# Configure what types of renegotiation are supported. Valid options are
# never, once, and freely. Default is never.
#ssl.renegotiation: never

Note:

  • Fill in the hosts field of the Kafka output section with the brokers of your actual environment.
  • Keep output.elasticsearch commented out; otherwise Filebeat will exit on startup with: Exiting: error unpacking config data: more than one namespace configured accessing 'output' ….
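
Before starting, the merged configuration can be sanity-checked with Filebeat's built-in config test (a quick optional step):

./filebeat test config -c filebeat.yml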

3. Start

(1) Start in the foreground (debug mode)

./filebeat -e -c filebeat.yml

(2) Start as a background daemon

nohup ./filebeat -e -c filebeat.yml &

After starting it, check with ps; if the process exists, the startup was successful:

[root@kai www]# ps aux | grep filebeat
root 28664 0.0 3.0 508724 30628 pts/0 Sl 10:37 0:00 ./filebeat -e -c filebeat.yml

4. Verify

What to verify: whether data collection works
Filebeat startup mode: foreground (debug) mode
Goal: collect the Nginx access log and push it into the specified Kafka

(1) Visit some pages of the site so that Nginx produces access log entries.
(2) In foreground mode, watch the terminal output; output like the following means Filebeat is collecting log data and pushing it to Kafka.

(3) Check whether the messages have been pushed into Kafka.
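
The author checked this in the Aliyun Kafka console; with a self-hosted Kafka, one way to do the same is the bundled console consumer (broker address is a placeholder):

bin/kafka-console-consumer.sh --bootstrap-server <broker-host>:9092 --topic web_log --from-beginning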

The pushed messages are visible, which proves that the Filebeat link of the chain is working.

Logstash

Logstash consumes the log data from Kafka and writes it into Elasticsearch. Its configuration file is as follows:

input {
  kafka {
    bootstrap_servers => [xxxxxxx]
    client_id => "web_log_consumer"
    auto_offset_reset => "latest"
    consumer_threads => 5
    decorate_events => true
    topics => ["web_log"]
    group_id => "web_log_consumer"
  }
}
filter {

}
output {
  elasticsearch {
    hosts => [xxxxxxx]
    user => "elastic"
    password => "xxxxx"
    index => "nginx-access-log-%{+YYYY.MM.dd}"
  }
}
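
The filter block is left empty here. If you want Logstash to parse the individual Nginx fields, one possible sketch (not part of the original setup) is to add codec => "json" to the kafka input so the Filebeat event is decoded, and then grok the raw line:

filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
}

Logstash is then started with this configuration file (path is illustrative):

bin/logstash -f config/kafka-to-es.conf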

Elasticsearch


Automatic index creation is disabled by default on the Elasticsearch instance used here. In a test environment you can let indices be created automatically, but in production it is recommended to create them manually; during my tests I chose automatic creation.
Automatic index creation can be enabled in the configuration file with the following option:

action.auto_create_index: true
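
If you go the manual route in production, an index template helps keep the daily nginx-access-log-* indices consistent; a hedged sketch using the legacy _template API of Elasticsearch 7.x (host and password are placeholders):

curl -u elastic:<password> -X PUT "http://<es-host>:9200/_template/nginx-access-log" -H 'Content-Type: application/json' -d '
{
  "index_patterns": ["nginx-access-log-*"],
  "settings": { "index.number_of_shards": 1 }
}'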

Kibana

Define an index pattern (for example nginx-access-log-*, matching the index names written by Logstash)


View the log data


The site's access logs are now visible on the Kibana page, and with that the whole log collection pipeline is complete.