Logstash:从grok到5.X版本的dissect

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: 打开微信扫一扫,关注微信公众号【数据与算法联盟】 转载请注明出处:http://blog.csdn.net/gamer_gyt 博主微博:http://weibo.com/234654758 Github:https://github.com/thinkgamer写在前边的话grok 作为 Logstash 最广为人知的插件,在性能和资源损耗方面同样也广为诟病。


这里写图片描述
打开微信扫一扫,关注微信公众号【数据与算法联盟】

转载请注明出处:http://blog.csdn.net/gamer_gyt
博主微博:http://weibo.com/234654758
Github:https://github.com/thinkgamer


写在前边的话

grok 作为 Logstash 最广为人知的插件,在性能和资源损耗方面同样也广为诟病。为了应对这个情况,同时也考虑到大多数时候,日志格式并没有那么复杂,Logstash 开发团队在 5.0 版新添加了另一个解析字段的插件:dissect。当日志格式有比较简明的分隔标志位,而且重复性较大的时候,我们可以使用 dissect 插件更快的完成解析工作。


安装和example

安装

使用之前首先检查自己的logstash plugin中是否包含,如果未包含,请先安装
检查所有插件

/your/logstash/path/bin/logstash-plugin list

安装logstash-filter-dissect

/your/logstash/path/bin/logstash-plugin install logstash-filter-dissect

example

filter {
    dissect {
        mapping => {
            "message" => "%{ts} %{+ts} %{+ts} %{src} %{} %{prog}[%{pid}]: %{msg}"
        }
        convert_datatype => {
            pid => "int"
        }
    }
}

语法解释:

我们看到上面使用了和 Grok 很类似的 %{} 语法来表示字段,这显然是基于习惯延续的考虑。不过示例中 %{+ts} 的加号就不一般了。dissect 除了字段外面的字符串定位功能以外,还通过几个特殊符号来处理字段提取的规则:
● %{+key} 这个 + 表示,前面已经捕获到一个 key 字段了,而这次捕获的内容,自动添补到之前 key 字段内容的后面。
● %{+key/2} 这个 /2 表示,在有多次捕获内容都填到 key 字段里的时候,拼接字符串的顺序谁前谁后。/2 表示排第 2 位。
● %{?string} 这个 ? 表示,这块只是一个占位,并不会实际生成捕获字段存到 Event 里面。
● %{?string} %{&string} 当同样捕获名称都是 string,但是一个 ? 一个 & 的时候,表示这是一个键值对。

比如对 http://rizhiyi.com/index.do?id=123 写这么一段配置:

http://%{domain}/%{?url}?%{?arg1}=%{&arg1}

则最终生成的 Event 内容是这样的:

{
  domain => "rizhiyi.com",
  id => "123"
}

该插件的一些配置

该插件支持下边这几种配置,所有的配置都包括在 dissect{ }中。

Setting Input Type Required Default Value
add_field hash No {}
add_tag array No []
convert_datatype hash No {}
enable_metric boolean No true
id string No
mapping hash No {}
periodic_flush boolean No false
remove_field array No []
remove_tag array No []
tag_on_failure arrray No [“_dissectfailure”]

1:add_field

如果此过滤器解析成功,便可以为该事件添加任何字段,字段名称可以是动态的,并使用%{field}包括事件部分
eg:

input{
  stdin{
    type => "dissect"

  }
}
filter {
  dissect {
    add_field => {
      "from_%{host}" => "Hello world, now is %{@timestamp}"
      "new_field" => "new_static_value"
    }
  }
}
output{
  stdout{
    codec => rubydebug
  }
  elasticsearch{
    hosts => ["http://localhost:9200"]
    index => "logstash-%{type}"
  }
}

执行

sudo /usr/share/logstash/bin/logstash -f test.conf

输入hello test
显示:

17:08:26.925 [Api Webserver] INFO  logstash.agent - Successfully started Logstash API endpoint {:port=>9600}
hello test
{
         "@timestamp" => 2017-03-28T09:08:29.814Z,
          "new_field" => "new_static_value",
           "@version" => "1",
               "host" => "thinkgamer",
            "message" => "hello test",
               "type" => "dissect",
    "from_thinkgamer" => "Hello world, now is 2017-03-28T09:08:29.814Z"
}

所以说这里的%{somefield} 就是已经存在的字段名称

2:add_tag

替换上边程序的filter为:

filter {
  dissect {
    add_tag => ["foo_%{host}","taggedy_tag" ]
  }
}

运行输入 hello test
显示:

17:12:42.307 [Api Webserver] INFO  logstash.agent - Successfully started Logstash API endpoint {:port=>9600}
hello test
{
    "@timestamp" => 2017-03-28T09:13:05.728Z,
      "@version" => "1",
          "host" => "thinkgamer",
       "message" => "hello test",
          "type" => "dissect",
          "tags" => [
        [0] "foo_thinkgamer",
        [1] "taggedy_tag"
    ]
}

3:convert_datatype

使用此设置,可以指定int和float数据类型转换,如果在elasticsearch中没有使用mapping的话,这将是很有用的

filter {
  dissect {
    convert_datatype{
      cpu => "float"
      code => "int"
    }
  }
}

4:enable_metric

默认情况下我们可以记录所有的指标,但是你可以通过该项配置来禁止或者启动是否记录这些指标

5:id

向插件实例添加唯一ID,此ID用于跟踪插件特定配置的信息。

6:mapping

field=>value 可以对当前解析的值进行以后的解析
eg:

filter {
  dissect {
    mapping => {
      "message" => "%{field1} %{field2} %{description}"
      "description" => "%{field3} %{field4} %{field5}"
    }
  }
}

7:periodic_flush

定期调用过滤器刷新方法,可选

8:remove_field

eg:

filter {
  dissect {
    add_field => {
      "from_%{host}" => "Hello world, now is %{@timestamp}"
      "new_field" => "new_static_value"
    }
    remove_field => ["from_%{host}"]
  }
}

输入hello test
显示:

18:04:09.028 [Api Webserver] INFO  logstash.agent - Successfully started Logstash API endpoint {:port=>9600}
hello test
{
    "@timestamp" => 2017-03-28T10:04:36.910Z,
     "new_field" => "new_static_value",
      "@version" => "1",
          "host" => "thinkgamer",
       "message" => "hello test",
          "type" => "dissect"
}

9:remove_tag

删除标签,可以是动态的
eg:

filter {
  dissect {
    add_tag => ["foo_%{host}","taggedy_tag" ]
    remove_tag => ["foo_%{host}"]
  }
}

输入hello test
显示:

18:06:10.097 [Api Webserver] INFO  logstash.agent - Successfully started Logstash API endpoint {:port=>9600}
hello test
{
    "@timestamp" => 2017-03-28T10:06:35.259Z,
      "@version" => "1",
          "host" => "thinkgamer",
       "message" => "hello test",
          "type" => "dissect",
          "tags" => [
        [0] "taggedy_tag"
    ]
}

10:tag_on_failure

当解析失败时,将值附加到标签字段


Over !

相关文章
Logstash 配置 Grok 语法
Logstash 配置 Grok 语法
533 0
Logstash 配置 Grok 语法
|
6月前
|
数据采集 监控 数据可视化
日志解析神器——Logstash中的Grok过滤器使用详解
日志解析神器——Logstash中的Grok过滤器使用详解
515 4
|
监控 容器
Filebeat配置顶级字段Logstash在output输出到Elasticsearch中的使用
Filebeat配置顶级字段Logstash在output输出到Elasticsearch中的使用
234 0
|
Java Ruby
Error:在安装elasticsearch和logstash程序出现的报错
Error:在安装elasticsearch和logstash程序出现的报错
275 0
|
Java 关系型数据库
Logstash 安装
https://www.elastic.co/cn/downloads/logstash官网一、下载logstash[root@jiaxin-ceshi ~]# cd /usr/local/src/[root@jiaxin-ceshi src]# wget https://artifacts.
9008 0
|
Web App开发 监控 Java
ELK日志处理之使用Grok解析日志
一、简介 Grok是迄今为止使蹩脚的、无结构的日志结构化和可查询的最好方式。Grok在解析 syslog logs、apache and other webserver logs、mysql logs等任意格式的文件上表现完美。
3153 0
|
编解码
collectd 与 logstash配置
节点 node1: 配置logstash node2: 配置collectd, collectd收集本地的信息, 通过配置将信息发送到node1节点 node1安装配置logstash rpm -ivh logstash.
1229 0
|
关系型数据库 MySQL Apache