Elastic Stack供应了多种类型的Beats组件。
Filebeat是本地文件的日志数据采集器。
作为做事器上的代理安装,Filebeat监视日志目录或特定日志文件,tail file,并将它们转发给Elasticsearch或Logstash进行索引、kafka 等。

Beats可以直接将数据发送到Elasticsearch或者发送到Logstash,基于Logstash可以进一步地对数据进行处理,然后将处理后的数据存入到Elasticsearch,末了利用Kibana进行数据可视化
1、FileBeat简介FileBeat专门用于转发和网络日志数据的轻量级采集工具。它可以为作为代理安装在做事器上,FileBeat监视指定路径的日志文件,网络日志数据,并将网络到的日志转发到Elasticsearch或者Logstash。
2、FileBeat的事情事理启动FileBeat时,会启动一个或者多个输入(Input),这些Input监控指定的日志数据位置。FileBeat会针对每一个文件启动一个Harvester(收割机)。Harvester读取每一个文件的日志,将新的日志发送到libbeat,libbeat将数据网络到一起,并将数据发送给输出(Output)。
3、安装FileBeat
安装FileBeat只须要将FileBeat Linux安装包上传到Linux系统,并将压缩包解压到系统就可以了。FileBeat官方下载地址:
tar -xvzf filebeat-7.6.1-linux-x86_64.tar.gz -C /usr/local/bigdata
1)、配置FileBeatsFileBeats配置文件紧张分为两个部分。
inputsoutput1、 input配置filebeat.inputs:
- type: log
enabled: true
paths:
-/var/log/.log #- c:\programdata\elasticsearch\logs\
在FileBeats中,可以读取一个或多个数据源
2、output配置
默认FileBeat会将日志数据放入到名称为:filebeat-%filebeat版本号%-yyyy.MM.dd 的索引中。FileBeats中的filebeat.reference.yml包含了FileBeats所有支持的配置选项。
4、利用FileBeat采集Kafka日志到Elasticsearch采集Kafka做事器日志,在Elasticsearch中快速查询这些日志,定位问题。
须要用FileBeats将日志数据上传到Elasticsearch中
要指定FileBeat采集哪些Kafka日志,由于FileBeats中必须知道采集存放在哪儿的日志,才能进行采集。
采集到这些数据后,还须要指定FileBeats将采集到的日志输出到Elasticsearch,那么Elasticsearch的地址也必须指定。
1)、配置
cd /usr/local/bigdata/filebeat-7.6.1-linux-x86_64
把稳yml文件的格式,即参数后面的空格
touch filebeat_kafka_log.yml
vim filebeat_kafka_log.yml
添加内容
filebeat.inputs:
- type: log
enabled: true
paths:
- /usr/local/bigdata/kafka_2.12-3.0.0/logs/server.log.
output.elasticsearch:
hosts: ["server1:9200","server2:9200", "server3:9200"]
2)、运行FileBeat1、启动Elasticsearch在每个节点上实行以下命令,启动Elasticsearch集群
nohup /usr/local/bigdata/elasticsearch-7.6.1/bin/elasticsearch 2>&1 &
2、运行FileBeatcd /usr/local/bigdata/filebeat-7.6.1-linux-x86_64
./filebeat -c filebeat_kafka_log.yml -e
3、准备kafka测试数据由于本机上已经有kafka,直吸网络其日志即可
[root@server1 logs]# pwd
/usr/local/bigdata/kafka_2.12-3.0.0/logs
[root@server1 logs]# ll
total 4360
-rw-r--r-- 1 alanchan root 24488 Jan 16 08:51 controller.log
-rw-r--r-- 1 alanchan root 25312 Jan 16 07:56 controller.log.2023-01-16-07
-rw-r--r-- 1 alanchan root 0 Dec 30 10:19 kafka-authorizer.log
-rw-r--r-- 1 alanchan root 0 Dec 30 10:19 kafka-request.log
-rw-r--r-- 1 alanchan root 4352360 Feb 22 05:10 kafkaServer-gc.log.0.current
-rw-r--r-- 1 alanchan root 2049 Jan 26 01:01 log-cleaner.log
-rw-r--r-- 1 alanchan root 2011 Jan 25 06:41 log-cleaner.log.2023-01-25-06
-rw-r--r-- 1 alanchan root 1592 Feb 22 03:08 server.log
-rw-r--r-- 1 alanchan root 9940 Feb 17 01:52 server.log.2023-02-17-01
-rw-r--r-- 1 alanchan root 3207 Feb 17 02:05 server.log.2023-02-17-02
-rw-r--r-- 1 alanchan root 3207 Feb 22 03:10 server.log.2023-02-22-01
-rw-r--r-- 1 alanchan root 219 Feb 17 01:36 state-change.log
-rw-r--r-- 1 alanchan root 219 Jan 18 06:16 state-change.log.2023-01-18-06
-rw-r--r-- 1 alanchan root 7524 Jan 18 07:38 state-change.log.2023-01-18-07
3)、验证
ILM:索引生命周期管理所需的索引filebeat-7.6.1:在ES中,可以创建索引的别名,可以利用别名来指向一个或多个索引。由于Elasticsearch中的索引创建后是不许可修正的,很多的业务场景下单一索引无法知足需求。别名也有利于ILM以是索引生命周期管理。索引数据
1、查看索引信息
GET /_cat/indices?v
2、查询索引库中的数据
GET /filebeat-7.6.1-2023.02.22-000001/_search
FileBeat自动给我们添加了一些关于日志、采集类型、Host各种字段
3、一个日志涉及到多行问题
在日平日记的处理中,常常会碰到日志中涌现非常的情形。类似下面的情形:
[2020-04-3014:00:05,725] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Errorwhen sending leader epoch request for Map(test_10m-2 ->(currentLeaderEpoch=Optional[161], leaderEpoch=158))(kafka.server.ReplicaFetcherThread)
java.io.IOException:Connection to node2.itcast.cn:9092 (id: 1 rack: null) failed.
atorg.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
atkafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:102)
at kafka.server.ReplicaFetcherThread.fetchEpochEndOffsets(ReplicaFetcherThread.scala:310)
atkafka.server.AbstractFetcherThread.truncateToEpochEndOffsets(AbstractFetcherThread.scala:208)
atkafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:173)
atkafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
atkafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2020-04-3014:00:05,725] INFO [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] RetryingleaderEpoch request for partition test_10m-2 as the leader reported an error:UNKNOWN_SERVER_ERROR (kafka.server.ReplicaFetcherThread)[2020-04-30
14:00:08,731] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0]
Connection to node 1 (node2.itcast.cn/192.168.88.101:9092) could not be
established. Broker may not be available.
(org.apache.kafka.clients.NetworkClient)
在FileBeat中,Harvest是逐行读取日志文件的。但上述的日志会涌现一条日志,跨多行的情形。有非常信息时,肯定会涌现多行。
每条日志都是有统一格式的开头的,拿Kafka的日志来说,[2020-04-30 14:00:05,725]这是一个统一的格式,如果不因此这样的形式开头,解释这一行肯定是属于某一条日志,而不是独立的一条日志。以是,我们可以通过日志的开头来判断某一行是否为新的一条日志。
1)、准备测试数据在原来的日志根本上增加下面的内容
[2023-02-17 02:05:27,335] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Errorwhen sending leader epoch request for Map(test_10m-2 ->(currentLeaderEpoch=Optional[161], leaderEpoch=158))(kafka.server.ReplicaFetcherThread)
java.io.IOException:Connection to node2.itcast.cn:9092 (id: 1 rack: null) failed.
atorg.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
atkafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:102)
at kafka.server.ReplicaFetcherThread.fetchEpochEndOffsets(ReplicaFetcherThread.scala:310)
atkafka.server.AbstractFetcherThread.truncateToEpochEndOffsets(AbstractFetcherThread.scala:208)
atkafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:173)
atkafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
atkafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2023-02-17 02:05:27,335] INFO [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] RetryingleaderEpoch request for partition test_10m-2 as the leader reported an error:UNKNOWN_SERVER_ERROR (kafka.server.ReplicaFetcherThread)[2020-04-30
14:00:08,731] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0]
Connection to node 1 (node2.itcast.cn/192.168.88.101:9092) could not be
established. Broker may not be available.
(org.apache.kafka.clients.NetworkClient)
2)、FileBeat多行配置选项在FileBeat的配置中,专门有一个办理一条日志跨多行问题的配置。紧张为以下三个配置
multiline.pattern: ^\[
multiline.negate: false
multiline.match: after
#multiline.pattern表示能够匹配一条日志的模式,默认配置的因此[开头的才认为是一条新的日志。^\[表示匹配以[开头的
#multiline.negate:
# 配置为false,正常匹配(默认),表示不须要取反
# 配置为true,表示取反
#multiline.match:表示是否将未匹配到的行追加到上一日志,还是追加到下一个日志。
# The regexp Pattern that has to be matched. Theexample pattern matches all lines starting with [
#multiline.pattern: ^\[
# Definesif the pattern set under pattern should be negated or not. Default is false.
#multiline.negate:false
# Matchcan be set to "after" or "before". It is used to define iflines should be append to a pattern
# that was(not) matched before or after or as long as a pattern is not matched based onnegate.
# Note:After is the equivalent to previous and before is the equivalent to to next inLogstash #multiline.match: after
3)、修正FileBeat的配置文件filebeat.inputs:
- type: log
enabled: true
paths:
- /usr/local/bigdata/kafka_2.12-3.0.0/logs/server.log.
multiline.pattern: '^\['
multiline.negate: true
multiline.match: after
output.elasticsearch:
hosts: ["server1:9200","server2:9200", "server3:9200"]
4)、删除「注册表」/data.jsonrm –rf /usr/local/bigdata/filebeat-7.6.1-linux-x86_64/data/registry/filebeat/data.json
5)、删除之前创建的索引delete /filebeat-7.6.1-2023.02.22-000001
6)、重启filebeat
./filebeat -c filebeat_kafka_log.yml -e
7)、验证数据{
"_index": "filebeat-7.6.1-2023.02.22-000001",
"_type": "_doc",
"_id": "MBTDd4YBH2rQ2w9rLCpT",
"_version": 1,
"_score": 1,
"_source": {
"@timestamp": "2023-02-22T06:15:26.120Z",
"input": {
"type": "log"
},
"ecs": {
"version": "1.4.0"
},
"host": {
"name": "server1"
},
"agent": {
"id": "72d36c50-e35e-4ac9-b143-83d68cc2b915",
"version": "7.6.1",
"type": "filebeat",
"ephemeral_id": "a95ff5b6-26d6-496d-a74e-11a067dd129e",
"hostname": "server1"
},
"log": {
"offset": 3207,
"file": {
"path": "/usr/local/bigdata/kafka_2.12-3.0.0/logs/server.log.2023-02-22-01"
},
"flags": [
"multiline"
]
},
"message": "[2023-02-17 02:05:27,335] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Errorwhen sending leader epoch request for Map(test_10m-2 ->(currentLeaderEpoch=Optional[161], leaderEpoch=158))(kafka.server.ReplicaFetcherThread) java.io.IOException:Connection to node2.itcast.cn:9092 (id: 1 rack: null) failed. atorg.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71) atkafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:102) at kafka.server.ReplicaFetcherThread.fetchEpochEndOffsets(ReplicaFetcherThread.scala:310) atkafka.server.AbstractFetcherThread.truncateToEpochEndOffsets(AbstractFetcherThread.scala:208) atkafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:173) atkafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) atkafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)"
}
}
5、FileBeat是如何事情的FileBeat紧张由input和harvesters(收割机)组成。这两个组件协同事情,并将数据发送到指定的输出。
1)、input和harvester1、inputs(输入)
input是卖力管理Harvesters和查找所有要读取的文件的组件
如果输入类型是log,input组件会查找磁盘上与路径描述的所有文件,并为每个文件启动一个Harvester,每个输入都独立地运行
2、Harvesters(收割机)Harvesters卖力读取单个文件的内容,它卖力打开/关闭文件,并逐行读取每个文件的内容,将读取到的内容发送给输出
每个文件都会启动一个Harvester
Harvester运行时,文件将处于打开状态。如果文件在读取时,被移除或者重命名,FileBeat将连续读取该文件
2)、FileBeats如何保持文件状态FileBeat保存每个文件的状态,并定时将状态信息保存在磁盘的「注册表」文件中
该状态记录Harvester读取的末了一次偏移量,并确保发送所有的日志数据
如果输出(Elasticsearch或者Logstash)无法访问,FileBeat会记录成功发送的末了一行,并在输出(Elasticsearch或者Logstash)可用时,连续读取文件发送数据
在运行FileBeat时,每个input的状态信息也会保存在内存中,重新启动FileBeat时,会从「注册表」文件中读取数据来重新构建状态。
在/usr/local/bigdata/filebeat-7.6.1-linux-x86_64/data目录中有一个Registry文件夹,里面有一个data.json,该文件中记录了Harvester读取日志的offset。
[{
"source": "/usr/local/bigdata/kafka_2.12-3.0.0/logs/server.log.2023-02-22-01",
"offset": 4593,
"timestamp": "2023-02-22T06:25:46.135669494Z",
"ttl": -1,
"type": "log",
"meta": null,
"FileStateOS": {
"inode": 10880886,
"device": 64768
}
}]