4、使用FileBeat采集MQ日志到Elasticsearch
4.1、需求分析
在资料中有一个mq_server.log.tar.gz压缩包,里面包含了很多的MQ服务器日志,现在我们为了通过在Elasticsearch中快速查询这些日志,定位问题。我们需要用FileBeats将日志数据上传到Elasticsearch中。
问题:
首先,我们要指定FileBeat采集哪些MQ日志,因为FileBeats中必须知道采集存放
在哪儿的日志,才能进行采集。
其次,采集到这些数据后,还需要指定FileBeats将采集到的日志输出到Elasticsearch,那么Elasticsearch的地址也必须指定。
4.2、配置FileBeats
FileBeats配置文件主要分为两个部分。
1. inputs
2. output
从名字就能看出来,一个是用来输入数据的,一个是用来输出数据的。
4.2.1、input配置
filebeat.inputs: ‐ type: log enabled: true paths: ‐ /var/log/*.log #‐ c:\programdata\elasticsearch\logs\*
在FileBeats中,可以读取一个或多个数据源。
FileBeats配置文件 - input4.2.2、output配置
FileBeat配置文件 - output
默认FileBeat会将日志数据放入到名称为:filebeat-%filebeat版本号%- yyyy.MM.dd 的索引中。
PS:
FileBeats中的filebeat.reference.yml包含了FileBeats所有支持的配置选项。
4.3、配置文件
- 创建配置文件
cd /usr/local/es/filebeat‐7.6.1‐linux‐x86_64 touch filebeat_mq_log.yml vim filebeat_mq_log.yml
- 复制以下到配置文件中
filebeat.inputs: ‐ type: log enabled: true4 paths: ‐ /var/mq/log/server.log.* output.elasticsearch: hosts: ["192.168.21.130:9200", "192.168.21.131:9200", "192.168.21.132: 9200"]
4.4、运行FileBeat
- 启动Elasticsearch
在每个节点上执行以下命令,启动Elasticsearch集群:
nohup /usr/local/es/elasticsearch‐7.6.1/bin/elasticsearch 2>&1 &
- 运行FileBeat
./filebeat ‐c filebeat_mq_log.yml ‐e
- 将日志数据上传到/var/mq/log,并解压
mkdir ‐p /var/mq/log cd /var/mq/log tar ‐zxvf mq_server.log.tar.gz
4.5、查询数据
通过head插件,我们可以看到filebeat采集了日志消息,并写入到Elasticsearch 集群中。
五、FileBeat是如何工作的
FileBeat主要由input和harvesters(收割机)组成。这两个组件协同工作,并将
数据发送到指定的输出。
1、input和harvester1.1、inputs(输入)
input是负责管理Harvesters和查找所有要读取的文件的组件,如果输入类型是 log,input组件会查找磁盘上与路径描述的所有文件,并为每个文件启动一个Harvester,每个输入都独立地运行1.2、Harvesters(收割机)。
Harvesters负责读取单个文件的内容,它负责打开/关闭文件,并逐行读取每个
文件的内容,将读取到的内容发送给输出
每个文件都会启动一个Harvester ,Harvester运行时,文件将处于打开状态。如果文件在读取时,被移除或者重命名,FileBeat将继续读取该文件
2、FileBeats如何保持文件状态
FileBeat保存每个文件的状态,并定时将状态信息保存在磁盘的「注册表」文件
中
该状态记录Harvester读取的最后一次偏移量,并确保发送所有的日志数据,如果输出(Elasticsearch或者Logstash)无法访问,FileBeat会记录成功发送的最后一行,并在输出(Elasticsearch或者Logstash)可用时,继续读取文件发送数据。在运行FileBeat时,每个input的状态信息也会保存在内存中,重新启动FileBeat时,会从「注册表」文件中读取数据来重新构建状态。
在/usr/local/es/filebeat-7.6.1-linux-x86_64/data目录中有一个Registry文件夹,里面有一个data.json,该文件中记录了Harvester读取日志的offset。
六. Logstash
1、简介
Logstash是一个开源的数据采集引擎。它可以动态地将不同来源的数据统一采集,并按照指定的数据格式进行处理后,将数据加载到其他的目的地。最开始,Logstash主要是针对日志采集,但后来Logstash开发了大量丰富的插件,所以,它可以做更多的海量数据的采集。
它可以处理各种类型的日志数据,例如:Apache的web log、Java的log4j日志数据,或者是系统、网络、防火墙的日志等等。它也可以很容易的和Elastic Stack的Beats组件整合,也可以很方便的和关系型数据库、NoSQL数据库、MQ等整合。
1.1 经典架构1.2 对比FileBeat
logstash是jvm跑的,资源消耗比较大,而FileBeat是基于golang编写的,功能较少但资源消耗也比较小,更轻量级。
logstash 和filebeat都具有日志收集功能,Filebeat更轻量,占用资源更少,logstash 具有filter功能,能过滤分析日志。
一般结构都是filebeat采集日志,然后发送到消息队列,redis,MQ中然后logstash去获取,利用filter功能过滤分析,然后存储到elasticsearch中,FileBeat和Logstash配合,实现背压机制2 安装Logstash和Kibana
2.1 安装Logstash
- 下载Logstash
此处:我们可以选择资料中的logstash-7.6.1.zip安装包。
2. 解压Logstash到指定目录
unzip logstash‐7.6.1 ‐d /usr/local/es/
- 运行测试
cd /usr/local/es/logstash‐7.6.1/ bin/logstash ‐e 'input { stdin { } } output { stdout {} }'
等待一会,让Logstash启动完毕。
Sending Logstash logs to /usr/local/es/logstash‐7.6.1/logs which is now co nfigured via log4j2.properties [2021‐02‐28T16:31:44,159][WARN ][logstash.config.source.multilocal] Ignori ng the 'pipelines.yml' file because modules or command line options are speci fied [2021‐02‐28T16:31:44,264][INFO ][logstash.runner ] Starting Logst ash {"logstash.version"=>"7.6.1"} [2021‐02‐28T16:31:45,631][INFO ][org.reflections.Reflections] Reflections took 37 ms to scan 1 urls, producing 20 keys and 40 values [2021‐02‐28T16:31:46,532][WARN ][org.logstash.instrument.metrics.gauge.Laz yDelegatingGauge][main] A gauge metric of an unknown type (org.jruby.RubyArra y) has been create for key: cluster_uuids. This may result in invalid seriali zation. It is recommended to log an issue to the responsible developer/devel opment team. [2021‐02‐28T16:31:46,560][INFO ][logstash.javapipeline ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"= >125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>250, "pipeline.sou rces"=>["config string"], :thread=>"#<Thread:0x3ccbc15b run>"} [2021‐02‐28T16:31:47,268][INFO ][logstash.javapipeline ][main] Pipeline started {"pipeline.id"=>"main"} The stdin plugin is now waiting for input: [2021‐02‐28T16:31:47,348][INFO ][logstash.agent ] Pipelines runn ing {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]} [2021‐02‐28T16:31:47,550][INFO ][logstash.agent ] Successfully s tarted Logstash API endpoint {:port=>9600}
然后,随便在控制台中输入内容,等待Logstash的输出。
{ "host" => "127.0.0.1", "message" => "hello logstash", "@version" => "1", "@timestamp" => 2021‐02‐28:01:01.007Z }
ps:-e选项表示,直接把配置放在命令中,这样可以有效快速进行测试文档