Elastic Stack初篇-Logstash

简介: Logstash是一个开源数据收集引擎,具有实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地。

一、Logstash简介


    Logstash是一个开源数据收集引擎,具有实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地。

   

1005447-20180825100722017-1014186658.png

二、Logstash处理流程


  Logstash处理流程大致可分为3个阶段,Input---->Filter---->Output,用中文描述一下分别是,数据采集----->数据分析/解析---->数据输出;具体的处理流程可以查看下图,下面的一些函数和一些概念等后面我们在具体讲讲:

  1005447-20180825105835097-1815398791.png

 这里我们解释一下codec的概念,Codec 是 logstash 从 1.3.0 版开始新引入的概念(Codec 来自 Coder/decoder 两个单词的首字母缩写)。在此之前,logstash 只支持纯文本形式输入,然后以过滤器处理它。但现在,我们可以在输入 期处理不同类型的数据,这全是因为有了 codec 设置。所以,这里需要纠正之前的一个概念。Logstash 不只是一个input | filter | output 的数据流,而是一个 input | decode | filter | encode | output 的数据流!codec 就是用来 decode、encode 事件的。

  Logstash Event也需要介绍,类似Java的Object的对象,在这个过程我们可以对其进行一些赋值等操作;


三、Logstash架构介绍


 上面介绍一下Logstash的数据流向,接下来我们介绍下Logstash的架构,当然我介绍的是6.X的架构,看下图

1005447-20180825114504517-1850580592.png

 

 由上图得知,我们可以有多个输入的文本,另外由Queue分发到不同的Pipline中,这里的Pipline中文的意思就是管道,在程序中我们可以理解为线程,可以有多个Pipline,并且每个Pipline之间是互不打扰的,另外每个Batcher,Filter和Output组成,Batcher是批量从Queue获取数据的,这个值可以通过配置进行设置;

 Pipline配置如下:

 pipeline.workers :  8  Pipeline线程数,及filter_output的处理线程数,默认是CPU的核数,命令行参数为-w

 pipeline.batch.size : 125  Batcher一次批量获取的待处理文档数,默认125(建议向es输出的时候10-20Mb之间,可以计算一下文档数),可以根据输出进行调整,越大会占用越多的heap空间,可以通过jvm.options调整。命令行参数-b

 pipeline.batch.delay :5  :Batcher的等待时长,单位为ms。命令行参数-u

 接下来我们在了解下Queue设计思路,使用Queue以后我们首先要关注的是怎么确保数据是正常被消费掉了,这里到达QutPut以后会有发送一个ACK给Queue来告诉Queue这些Logstash Event已经处理完了,这样就能确保一个数据正常被消费掉,这也是在消息队列中我们常用的一种手段,接下来我们再聊聊Queue分类:

 1.In Memory(在内存)

 在内存中,固定大小,无法处理进程crash、机器宕机等情况,会导致数据丢失。

 2.Persistent Queue In Disk (持久化到磁盘)

 可处理进程crash情况,保证数据不丢失。保证数据至少消费一次;充当缓冲区,可代替kafka等消息队列作用。

 接下来我们来了解下持久队列是怎么保证的?这里我们从数据到队列以后的处理开始说起,首先队列将数据备份到disk(磁盘),队列返回响应给Input,最后数据经过OutPut以后返回ACK给队列,当队列接收到消息以后开始删除disk中备份的数据,这样就能保证数据持久性;

 性能方面对比如下图,基本性能是无差别的:

 1005447-20180825142902048-703565607.png

 队列主要配置如下:

 queue.type: persisted 默认是memory

 queue.max_bytes: 4gb 默认是1gb

四、Logstash配置介绍

  我们会使用的配置文件在config目录录下面,logstash.yml和jvm.options,另外6.0以上的版本会有pipelines.yml,这个文件是为了在同一个进程中运行多个管道,具体可以参考下官方,接下来我们主要介绍下参数的配置logstash.yml配置:

  node.name:节点名,默认是主机名;

  path.data:持续化存储数据的文件夹,默认是在Logstash home目录下的data

  path.config:设定Pipeline配置文件的目录

  path.log:设置Pipeline日志目录

  第三小节介绍的Queue和Pipeline都可以在该文件下设置,这里不做过多的介绍,另外更细节的一些参数配置大家可以参考官方

  jvm.options这个里面的配置大家可以自己根据自己机器的情况进行下JVM参数调优;

  另外还有Pipeline配置文件,定义数据处理流程的文件,以.conf结尾


五、Pipeline配置


 布尔类型 Boolean:isFailed => true

 数值类型 Number:age => 33

 字符串类型 String: name => “Hello”

 数组类型:users => [{age=>11, name=>wtz}, {age => 22, name => myt}] 或者 path => [“/var/log/error,log”,”/var/log/warn.log”]

 哈希类型:match => {“field” => “value1”  “field”=>”value2”}

 备注:#

 在配置中可以引用Logstash Event的属性(字段),主要由下面两种方式:

 1.直接引用字段值 – 直接使用[],如果是多层直接嵌套

 1005447-20180825154719809-1573726365.png

 2.在字符串中以 sprintf 方式引用 – 使用%{}来实现

 1005447-20180825154732797-1555763504.png

配置文件支持条件判断语法:

 1005447-20180825154804227-567555230.png

表达式操作如下:

1.比较:== != < > <= >=

2.正则是否匹配:=~ =

3.包含(字符串或者数组):in, not in

4.布尔操作符:and or nand xor !

5.分组操作符(条件非常复杂的时候,其实就相当于括号):()


六、插件详解


 第二部分的时候我们介绍了Logstash的数据处理流程,涉及到一些插件流程,接下来我们介绍下插件的配置,也是分成3部分介绍:

input插件介绍:

1.stdin

最简单的输入,从标准输入读取数据

2.file

从文件读取数据,参数介绍如下:

3.kafka

这个是我们使用的配置,供大家参考下,另外大家可以参考官方文档

input {
    kafka {
        bootstrap_servers => "服务地址"
        group_id => "消费组id"
        topics => ["订阅的主题列表","订阅的主题列表"]
        codec => "json"
        auto_offset_reset => "Kafka偏移量,超出设置为最早偏移量"
        consumer_threads => "线程数"
    }
}

另外大家还可以看下官方文档选择自己合适的使用;

Filter插件介绍

1.grok

解析和构造任意文本,Grok是目前Logstash中解析非结构化日志数据到结构化和可查询数据的最佳方式,使用内置的120种模式;

另外可以阅读下这篇文章你真的理解grok吗

2.mutate

对事件字段执行一般的转换,你可以重命名、删除、替换和修改事件中的字段。

3.drop

完全删除事件,例如调试事件。

4.clone

复制事件,可能添加或删除字段。

5.geoip

添加关于IP地址地理位置的信息

更多的大家可以阅读官方文档

OutPut插件介绍

 最常用的就是输出到es,配置如下

elasticsearch {
      hosts => "地址"
      index => "索引"
      document_type => "文档类型"
      template_overwrite => 是否重写
}

Codec插件介绍

 1.plain 读取原始内容

2.rubydebug将Logstash Event按照ruby格式输出,方便调试

3.line处理带有换行符的内容

4.json处理json格式的内容

5.multiline处理多行数据的内容


七、最后说点什么


参考与学习的来源为慕课网和官方文档,

相关文章
|
存储 安全 数据可视化
带你读《Elastic Stack 实战手册》之3:——3.1.1.从 Elasticsearch 到 Elastic Stack(中)
带你读《Elastic Stack 实战手册》之3:——3.1.1.从 Elasticsearch 到 Elastic Stack(中)
187 0
|
数据采集 数据可视化 搜索推荐
带你读《Elastic Stack 实战手册》之3:——3.1.1.从 Elasticsearch 到 Elastic Stack(上)
带你读《Elastic Stack 实战手册》之3:——3.1.1.从 Elasticsearch 到 Elastic Stack(上)
253 0
|
测试技术 Apache
带你读《Elastic Stack 实战手册》之3:——3.1.1.从 Elasticsearch 到 Elastic Stack(下)
带你读《Elastic Stack 实战手册》之3:——3.1.1.从 Elasticsearch 到 Elastic Stack(下)
182 0
|
存储 监控 数据可视化
|
存储 JSON 自然语言处理
Elastic Stack-Elasticsearch使用介绍(二)
Elastic Stack-Elasticsearch使用介绍
Elastic Stack-Elasticsearch使用介绍(二)
|
存储 缓存 API
Elastic Stack-Elasticsearch使用介绍(三)
Elastic Stack-Elasticsearch使用介绍
Elastic Stack-Elasticsearch使用介绍(三)
|
存储 缓存 自然语言处理
Elastic Stack-Elasticsearch使用介绍(四)
Elastic Stack-Elasticsearch使用介绍
Elastic Stack-Elasticsearch使用介绍(四)
|
存储 缓存 JSON
Elastic Stack-Elasticsearch使用介绍(五)
Elastic Stack-Elasticsearch使用介绍
Elastic Stack-Elasticsearch使用介绍(五)
|
JSON 监控 Java
Elastic Stack-Elasticsearch使用介绍(六)
Elastic Stack-Elasticsearch使用介绍
Elastic Stack-Elasticsearch使用介绍(六)