4.1 FileBeat简介
FileBeat专门用于转发和收集日志数据的轻量级采集工具。它可以为作为代理安装在服务器上,FileBeat监视指定路径的日志文件,收集日志数据,并将收集到的日志转发到Elasticsearch或者Logstash。
4.2 FileBeat的工作原理
启动FileBeat时,会启动一个或者多个输入(Input),这些Input监控指定的日志数据位置。FileBeat会针对每一个文件启动一个Harvester(收割机)。Harvester读取每一个文件的日志,将新的日志发送到libbeat,libbeat将数据收集到一起,并将数据发送给输出(Output)。
4.3 安装FileBeat
安装FileBeat只需要将FileBeat Linux安装包上传到Linux系统,并将压缩包解压到系统就可以了。
FileBeat官方下载地址:
https://www.elastic.co/cn/downloads/past-releases/filebeat-7-6-1
上传FileBeat安装到Linux,并解压
tar -xvzf filebeat-7.6.1-linux-x86_64.tar.gz -C /usr/local/es/
4.4 使用FileBeat采集MQ日志到Elasticsearch
4.4.1 需求分析
在资料中有一个mq_server.log.tar.gz压缩包,里面包含了很多的MQ服务器日志,现在我们为了通过在Elasticsearch中快速查询这些日志,定位问题。我们需要用FileBeats将日志数据上传到Elasticsearch中。
问题:
首先,我们要指定FileBeat采集哪些MQ日志,因为FileBeats中必须知道采集存放在哪儿的日志,才能进行采集。
其次,采集到这些数据后,还需要指定FileBeats将采集到的日志输出到Elasticsearch,那么Elasticsearch的地址也必须指定。
4.4.2 配置FileBeats
FileBeats配置文件主要分为两个部分。
- inputs
- output
从名字就能看出来,一个是用来输入数据的,一个是用来输出数据的。
input配置
filebeat.inputs: - type: log enabled: true paths: - /var/log/*.log #- c:programdataelasticsearchlogs*
在FileBeats中,可以读取一个或多个数据源。
FileBeats配置文件 - input
output配置
FileBeat配置文件 - output
默认FileBeat会将日志数据放入到名称为:filebeat-%filebeat版本号%-yyyy.MM.dd 的索引中。
PS:
FileBeats中的filebeat.reference.yml包含了FileBeats所有支持的配置选项。
4.4.3 配置文件
- 创建配置文件
cd /usr/local/es/filebeat-7.6.1-linux-x86_64
touch filebeat_mq_log.yml
vim filebeat_mq_log.yml - 复制以下到配置文件中
filebeat.inputs:
- type: logenabled: truepaths:
- /var/mq/log/server.log.*
- output.elasticsearch:
hosts: [“192.168.21.130:9200”, “192.168.21.131:9200”, “192.168.21.132:9200”]
4.4.4 运行FileBeat
- 启动Elasticsearch
在每个节点上执行以下命令,启动Elasticsearch集群:
nohup /usr/local/es/elasticsearch-7.6.1/bin/elasticsearch 2>&1 & - 运行FileBeat
- filebeat -c filebeat_mq_log.yml -e
- 将日志数据上传到/var/mq/log,并解压
mkdir -p /var/mq/log
cd /var/mq/log
tar -zxvf mq_server.log.tar.gz
4.5 查询数据
通过head插件,我们可以看到filebeat采集了日志消息,并写入到Elasticsearch集群中。
五 FileBeat是如何工作的
FileBeat主要由input和harvesters(收割机)组成。这两个组件协同工作,并将数据发送到指定的输出。
5.1 input和harvester
5.1.1 inputs(输入)
input是负责管理Harvesters和查找所有要读取的文件的组件
如果输入类型是 log,input组件会查找磁盘上与路径描述的所有文件,并为每个文件启动一个Harvester,每个输入都独立地运行
5.1.2 Harvesters(收割机)
Harvesters负责读取单个文件的内容,它负责打开/关闭文件,并逐行读取每个文件的内容,将读取到的内容发送给输出
每个文件都会启动一个Harvester
Harvester运行时,文件将处于打开状态。如果文件在读取时,被移除或者重命名,FileBeat将继续读取该文件
5.2 FileBeats如何保持文件状态
FileBeat保存每个文件的状态,并定时将状态信息保存在磁盘的「注册表」文件中
该状态记录Harvester读取的最后一次偏移量,并确保发送所有的日志数据
如果输出(Elasticsearch或者Logstash)无法访问,FileBeat会记录成功发送的最后一行,并在输出(Elasticsearch或者Logstash)可用时,继续读取文件发送数据
在运行FileBeat时,每个input的状态信息也会保存在内存中,重新启动FileBeat时,会从「注册表」文件中读取数据来重新构建状态。
’
在/usr/local/es/filebeat-7.6.1-linux-x86_64/data目录中有一个Registry文件夹,里面有一个data.json,该文件中记录了Harvester读取日志的offset。
六 Logstash
6.1 简介
Logstash是一个开源的数据采集引擎。它可以动态地将不同来源的数据统一采集,并按照指定的数据格式进行处理后,将数据加载到其他的目的地。最开始,Logstash主要是针对日志采集,但后来Logstash开发了大量丰富的插件,所以,它可以做更多的海量数据的采集。
它可以处理各种类型的日志数据,例如:Apache的web log、Java的log4j日志数据,或者是系统、网络、防火墙的日志等等。它也可以很容易的和Elastic Stack的Beats组件整合,也可以很方便的和关系型数据库、NoSQL数据库、MQ等整合。
1.1.1 经典架构
1.1.2 对比FileBeat
logstash是jvm跑的,资源消耗比较大
而FileBeat是基于golang编写的,功能较少但资源消耗也比较小,更轻量级
logstash 和filebeat都具有日志收集功能,Filebeat更轻量,占用资源更少
logstash 具有filter功能,能过滤分析日志
一般结构都是filebeat采集日志,然后发送到消息队列,redis,MQ中然后logstash去获取,利用filter功能过滤分析,然后存储到elasticsearch中
FileBeat和Logstash配合,实现背压机制
6.2 安装Logstash和Kibana
Kibana第一节就讲过如何安装了,这里不再赘述
安装Logstash
下载Logstash:https://www.elastic.co/cn/downloads/past-releases/logstash-7-6-1
解压Logstash到指定目录
unzip logstash-7.6.1 -d /usr/local/es/
运行测试
cd /usr/local/es/logstash-7.6.1/ bin/logstash -e 'input { stdin { } } output { stdout {} }'
等待一会,让Logstash启动完毕
Sending Logstash logs to /usr/local/es/logstash-7.6.1/logs which is now configured via log4j2.properties [2022-02-28T16:31:44,159][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified [2022-02-28T16:31:44,264][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"7.6.1"} [2022-02-28T16:31:45,631][INFO ][org.reflections.Reflections] Reflections took 37 ms to scan 1 urls, producing 20 keys and 40 values [2022-02-28T16:31:46,532][WARN ][org.logstash.instrument.metrics.gauge.LazyDelegatingGauge][main] A gauge metric of an unknown type (org.jruby.RubyArray) has been create for key: cluster_uuids. This may result in invalid serialization. It is recommended to log an issue to the responsible developer/development team. [2022-02-28T16:31:46,560][INFO ][logstash.javapipeline ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>250, "pipeline.sources"=>["config string"], :thread=>"#<Thread:0x3ccbc15b run>"} [2022-02-28T16:31:47,268][INFO ][logstash.javapipeline ][main] Pipeline started {"pipeline.id"=>"main"} The stdin plugin is now waiting for input: [2022-02-28T16:31:47,348][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]} [2022-02-28T16:31:47,550][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
然后,随便在控制台中输入内容,等待Logstash的输出。
{ "host" => "127.0.0.1", "message" => "hello logstash", "@version" => "1", "@timestamp" => 2021-02-28:01:01.007Z }
ps:
-e选项表示,直接把配置放在命令中,这样可以有效快速进行测试
cd /logstash-7.6.2/config
复制一份 logstash-sample.conf 文件 并重命名logstash.conf
cp logstash-sample.conf logstash.conf
vi logstash.conf
# Sample Logstash configuration for creating a simple # Beats -> Logstash -> Elasticsearch pipeline. input { tcp { host =>"192.168.25.128" port => 4560 codec => "json" } } output { elasticsearch { action => "index" hosts => ["192.168.25.128:9200"] index => "winy-portal" }
启动:进入bin目录
./logstash -f /home/app/logstash-7.6.2/config/logstash.conf
后台启动:
nohup ./logstash -f /home/app/logstash-7.6.2/config/logstash.conf &