视频地址
B站视频地址:Logstash如何成为镇得住场面的数据管道
公众号视频地址:Logstash如何成为镇得住场面的数据管道
知乎视频地址:Logstash如何成为镇得住场面的数据管道
内容
首先我们延续上一期视频中日志采集架构的案例,Filebeat采集日志并推送Kafka消息队列进行分发,再由Logstash消费日志消息,并将日志数据最终落地在Elasticsearch集群索引当中,Kafka作为消息队列分发服务需要将收集到的日志消息继续分发下去,最终数据落地在Elasticsearch集群索引当中。
那么连接整个过程的主角Logstash是如何工作的,就是我们今天讲解的重点。
Logstash工作过程分为三个部分:Input输入、Filter过滤、Output输出,它们共同协作形成了完整的Logstash数据管道传输机制
我们先从一个最简单的例子演示开始,看看Logstash是怎么输入和输出的,这一次先跳过filter过滤环节。
下面查看已经预置好的一个配置文件01-kafka-elastic-nginx.conf
首先是input输入配置点,从Kafka订阅消息,Kafka集群地址与filebeat中都指向了一个地址,其他配置我们先略过,后续Kafka专题再说
下来看到要订阅的Topic主题TestT3,我们先不用json格式解码消息,默认就是纯文本的方式
一样的,这一步先略过过滤环节,直接看看output输出配置点,目标是给Elasticsearch输出数据,并指定了elasticsearch集群的三个节点
输出环节创建需要写入的elasticsearch日志索引,我们先按照默认的filebeat采集时间,进行日期格式化,按照每个小时建立一个索引,这块会有时间问题,一会儿再说。
让数据输出到终端,方便我们调试结果。
通过演示中最简单的配置方式,这时候的Logstash已经成为连接Kafka和Elastisearch之间的数据管道了!
好,接下来我们将所有系统运行起来,并生成一条nginx请求日志,看看管道各个阶段的数据变化。
首先nginx日志数据被filebeat采集,是一条典型的无结构的文本日志数据,大家注意红色标注的时间是2021年2月21日13时
接着这条日志数据通过Kafka进入到了Logstash管道的输入阶段,
Logstash为这条日志生成了更为非常庞大的Json数据,里面包括了所有被采集主机的信息,以及nginx日志,实际上这些原始信息并没有被良好的进行数据清洗与结构化
最后数据被写入到Elastisearch一个按小时划分的索引当中,对应时间为2021年2月21日5时
我们发现Logstash对原始数据在没有任何处理的情况下,会很不方便将来数据的使用;
这次我们利用Logstash json解码器让管道重新再来一次,
接下来我们进入Logstash中对应的配置文件,并找到input输入点的codec配置,删掉注释,打开Logstash对输入数据的json解码方式·。
我们看看再次进入管道中的日志数据,Logstash首先对原始日志数据进行Json解析
这时候我们再看Json解析后的数据,是不是就清晰多了,filebeat采集到的本地机器数据、以及红色框中Nginx HTTP日志数据、以及其他标签数据都进行了字段分离
做到这一步其实还是不够好,为什么呢?一方面因为我们依然希望将Nginx HTTP的日志数据也进行结构化处理,
另一个方面,Filebeat传递给Logstash的系统时间是慢了8个小时的UTC时间标准,反而Nginx日志中的时间是我们本地的北京时间标准,因此我们希望用Nginx日志时间作为创建Elasticsearch日志索引的唯一依据
这时候我们就要使用Logstash的过滤机制了,我们继续进入Logstash对应的配置中,删掉过滤配置中的注释,让Logstash过滤最常用插件grok、date、ruby、mutate起作用
grok插件是专业处理非结构化数据的能手,通过自定义的Nginx日志正则表达式,就能实现Nginx日志的结构化解析
date插件用于处理时间问题,我们通过date插件将nginx日志中的时间转换成Logstash时间对象,并赋给一个新的临时时间字段indextime
ruby就是在过滤过程中可以插入ruby脚本语言来进行程序级处理,我们通过ruby语言对indextime时间格式化,生成一个精确到小时的字符串字段index.date,用于elasticsearch索引名称
mutate是最常用的可以对管道中数据字段进行操作的插件了,我们的目的是删除临时时间字段indextime
最后我们还需要将output输出中的索引生成方式修改一下,注释掉原来用filebeat生默认时间生成的索引,改成nginx日志时间生成的索引。
我们重新运行Logstash,数据经过了Input解码、日志grok结构化处理、本地时间对象创建,并进行日期格式化,为了生成新的Elasticsearch索引字段,并对临时字段进行删除,最终经过Output输出阶段,创建Elasticsearch索引或写入日志数据
让我们看看Elasticsearch最终保存的数据效果,index索引对应的时间来自过滤器创建的index.date字段,index.date字段又来自nginx日志中分离出的本地时间。这样我们就不用再去修改Logstash的系统时间了
我们看到菱形标注的字段数据就是由过滤器对nginx http日志进行结构化抽取的结果,
同样elasticsearch依然保存着nginx日志的原始数据以备不时之需