「视频小课堂」Logstash如何成为镇得住场面的数据管道(文字版)

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
日志服务 SLS,月写入数据量 50GB 1个月
简介: 「视频小课堂」Logstash如何成为镇得住场面的数据管道(文字版)

视频地址


B站视频地址:Logstash如何成为镇得住场面的数据管道


公众号视频地址:Logstash如何成为镇得住场面的数据管道


知乎视频地址:Logstash如何成为镇得住场面的数据管道


内容


首先我们延续上一期视频中日志采集架构的案例,Filebeat采集日志并推送Kafka消息队列进行分发,再由Logstash消费日志消息,并将日志数据最终落地在Elasticsearch集群索引当中,Kafka作为消息队列分发服务需要将收集到的日志消息继续分发下去,最终数据落地在Elasticsearch集群索引当中。


那么连接整个过程的主角Logstash是如何工作的,就是我们今天讲解的重点。


Logstash工作过程分为三个部分:Input输入、Filter过滤、Output输出,它们共同协作形成了完整的Logstash数据管道传输机制


我们先从一个最简单的例子演示开始,看看Logstash是怎么输入和输出的,这一次先跳过filter过滤环节。


下面查看已经预置好的一个配置文件01-kafka-elastic-nginx.conf


首先是input输入配置点,从Kafka订阅消息,Kafka集群地址与filebeat中都指向了一个地址,其他配置我们先略过,后续Kafka专题再说


下来看到要订阅的Topic主题TestT3,我们先不用json格式解码消息,默认就是纯文本的方式


一样的,这一步先略过过滤环节,直接看看output输出配置点,目标是给Elasticsearch输出数据,并指定了elasticsearch集群的三个节点


输出环节创建需要写入的elasticsearch日志索引,我们先按照默认的filebeat采集时间,进行日期格式化,按照每个小时建立一个索引,这块会有时间问题,一会儿再说。


让数据输出到终端,方便我们调试结果。


通过演示中最简单的配置方式,这时候的Logstash已经成为连接Kafka和Elastisearch之间的数据管道了!


好,接下来我们将所有系统运行起来,并生成一条nginx请求日志,看看管道各个阶段的数据变化。


首先nginx日志数据被filebeat采集,是一条典型的无结构的文本日志数据,大家注意红色标注的时间是2021年2月21日13时


接着这条日志数据通过Kafka进入到了Logstash管道的输入阶段,


Logstash为这条日志生成了更为非常庞大的Json数据,里面包括了所有被采集主机的信息,以及nginx日志,实际上这些原始信息并没有被良好的进行数据清洗与结构化


最后数据被写入到Elastisearch一个按小时划分的索引当中,对应时间为2021年2月21日5时


我们发现Logstash对原始数据在没有任何处理的情况下,会很不方便将来数据的使用;


这次我们利用Logstash json解码器让管道重新再来一次,


接下来我们进入Logstash中对应的配置文件,并找到input输入点的codec配置,删掉注释,打开Logstash对输入数据的json解码方式·。


我们看看再次进入管道中的日志数据,Logstash首先对原始日志数据进行Json解析


这时候我们再看Json解析后的数据,是不是就清晰多了,filebeat采集到的本地机器数据、以及红色框中Nginx HTTP日志数据、以及其他标签数据都进行了字段分离


做到这一步其实还是不够好,为什么呢?一方面因为我们依然希望将Nginx HTTP的日志数据也进行结构化处理,


另一个方面,Filebeat传递给Logstash的系统时间是慢了8个小时的UTC时间标准,反而Nginx日志中的时间是我们本地的北京时间标准,因此我们希望用Nginx日志时间作为创建Elasticsearch日志索引的唯一依据


这时候我们就要使用Logstash的过滤机制了,我们继续进入Logstash对应的配置中,删掉过滤配置中的注释,让Logstash过滤最常用插件grok、date、ruby、mutate起作用


grok插件是专业处理非结构化数据的能手,通过自定义的Nginx日志正则表达式,就能实现Nginx日志的结构化解析


date插件用于处理时间问题,我们通过date插件将nginx日志中的时间转换成Logstash时间对象,并赋给一个新的临时时间字段indextime


ruby就是在过滤过程中可以插入ruby脚本语言来进行程序级处理,我们通过ruby语言对indextime时间格式化,生成一个精确到小时的字符串字段index.date,用于elasticsearch索引名称


mutate是最常用的可以对管道中数据字段进行操作的插件了,我们的目的是删除临时时间字段indextime


最后我们还需要将output输出中的索引生成方式修改一下,注释掉原来用filebeat生默认时间生成的索引,改成nginx日志时间生成的索引。


我们重新运行Logstash,数据经过了Input解码、日志grok结构化处理、本地时间对象创建,并进行日期格式化,为了生成新的Elasticsearch索引字段,并对临时字段进行删除,最终经过Output输出阶段,创建Elasticsearch索引或写入日志数据


让我们看看Elasticsearch最终保存的数据效果,index索引对应的时间来自过滤器创建的index.date字段,index.date字段又来自nginx日志中分离出的本地时间。这样我们就不用再去修改Logstash的系统时间了


我们看到菱形标注的字段数据就是由过滤器对nginx http日志进行结构化抽取的结果,


同样elasticsearch依然保存着nginx日志的原始数据以备不时之需


相关文章
|
3月前
|
消息中间件 Java Linux
RocketMQ的下载与安装(全网最细保姆级别教学)
RocketMQ的下载与安装(全网最细保姆级别教学)
1279 1
|
13天前
|
分布式计算 资源调度 测试技术
“Spark Streaming异常处理秘籍:揭秘如何驯服实时数据流的猛兽,守护你的应用稳如泰山,不容错过!”
【8月更文挑战第7天】Spark Streaming 是 Apache Spark 中的关键组件,用于实时数据流处理。部署时可能遭遇数据问题、资源限制或逻辑错误等异常。合理处理这些异常对于保持应用稳定性至关重要。基础在于理解其异常处理机制,通过 DSC 将数据流切分为 RDD。对于数据异常,可采用 try-catch 结构捕获并处理;资源层面异常需优化 Spark 配置,如调整内存分配;逻辑异常则需加强单元测试及集成测试。结合监控工具,可全面提升应用的健壮性和可靠性。
22 3
|
前端开发
前端学习笔记202305学习笔记第二十四天-文件写入
前端学习笔记202305学习笔记第二十四天-文件写入
58 0
|
存储 Java 关系型数据库
【Elasticsearch 技术分享】—— 十张图带大家看懂 ES 原理 !明白为什么说:ES 是准实时的!
说到 Elasticsearch ,其中最明显的一个特点就是 near real-time 准实时 —— 当文档存储在Elasticsearch中时,将在1秒内以几乎实时的方式对其进行索引和完全搜索。那为什么说 ES 是准实时的呢?
1135 0
|
存储 数据采集 监控
OushuDB 小课堂丨数据管道测试自动化的最佳实践
OushuDB 小课堂丨数据管道测试自动化的最佳实践
107 0
|
监控
大屏监控系统实战(16)-项目拾遗
大屏监控系统实战(16)-项目拾遗
147 0
大屏监控系统实战(16)-项目拾遗
|
数据采集 监控 Kubernetes
您有一份来自iLogtail社区的礼物待查收
从2021年11月开源以来,高性能轻量级可观测性采集器iLogtail受到了大量开发者的关注和建议。累计收藏800+次,收到建议100+条,PR 180+次,文章阅读量破万。在开源之初我们就坚信开源才是iLogtail最优的发展策略,也是释放其最大价值的方法,时隔9个月开源社区的积极反响让我们亲身体会到了社区在推动开源软件发展的价值,也让我们决定推出更多激励计划来感谢一直支持iLogtail的贡献者们。
300 0
您有一份来自iLogtail社区的礼物待查收
|
消息中间件 存储 监控
|
存储 缓存 自然语言处理
白日梦的ES笔记三:万字长文 Elasticsearch基础概念统一扫盲(一)
白日梦的ES笔记三:万字长文 Elasticsearch基础概念统一扫盲(一)
252 0
|
存储 负载均衡 算法
白日梦的ES笔记三:万字长文 Elasticsearch基础概念统一扫盲(三)
白日梦的ES笔记三:万字长文 Elasticsearch基础概念统一扫盲(三)
181 0