ELK技术栈 - logstash学习笔记(九)

简介: 插件已经正式合并进官方仓库,以下使用介绍基于logstash 1.4相关版本,1.5及以后版本的使用后续依照官方文档持续更新。

Kafka


github.com/joekiller/l…


插件已经正式合并进官方仓库,以下使用介绍基于logstash 1.4相关版本,1.5及以后版本的使用后续依照官方文档持续更新。


插件本身内容非常简单,其主要依赖同一作者写的 jruby-kafka 模块。需要注意的是:该模块仅支持 Kafka-0.8 版本。如果是使用 0.7 版本 kafka 的,将无法直接使 jruby-kafka 该模块和 logstash-kafka 插件。


安装


  • 安装按照官方文档完全自动化的安装.或是可以通过以下方式手动自己安装插件,不过重点注意的是 kafka 的版本,上面已经指出了。


01.下载 logstash 并解压重命名为 ./logstash-1.4.0 文件目录。

02.下载 kafka 相关组件,以下示例选的为 kafka_2.8.0-0.8.1.1-src,并解压重命名为 ./kafka_2.8.0-0.8.1.1

03.下载 logstash-kafka v0.4.2 从 releases,并解压重命名为 ./logstash-kafka-0.4.2

04.从 ./kafka_2.8.0-0.8.1.1/libs 目录下复制所有的 jar 文件拷贝到 ./logstash-1.4.0/vendor/jar/kafka_2.8.0-0.8.1.1/libs 下,其中你需要创建 kafka_2.8.0-0.8.1.1/libs 相关文件夹及目录。

05.分别复制 ./logstash-kafka-0.4.2/logstash 里的 inputsoutputs 下的 kafka.rb,拷贝到对应的 ./logstash-1.4.0/lib/logstash 里的 inputsoutputs 对应目录下。

06.切换到 ./logstash-1.4.0 目录下,现在需要运行 logstash-kafka 的 gembag.rb 脚本去安装 jruby-kafka 库,执行以下命令: GEM_HOME=vendor/bundle/jruby/1.9 GEM_PATH= java -jar vendor/jar/jruby-complete-1.7.11.jar --1.9 ../logstash-kafka-0.4.2/gembag.rb ../logstash-kafka-0.4.2/logstash-kafka.gemspec

07.现在可以使用 logstash-kafka 插件运行 logstash 了。例如:bin/logstash agent -f logstash.conf


Input 配置示例


以下配置可以实现对 kafka 读取端(consumer)的基本使用。


消费端更多详细的配置请查看 kafka.apache.org/documentati… kafka 官方文档的消费者部分配置文档。


input {
    kafka {
        zk_connect => "localhost:2181"
        group_id => "logstash"
        topic_id => "test"
        reset_beginning => false # boolean (optional), default: false
        consumer_threads => 5  # number (optional), default: 1
        decorate_events => true # boolean (optional), default: false
        }
    }

Input 解释


消费端的一些比较有用的配置项:


  • group_id


消费者分组,可以通过组 ID 去指定,不同的组之间消费是相互不受影响的,相互隔离。


  • topic_id


指定消费话题,也是必填项目,指定消费某个 topic ,这个其实就是订阅某个主题,然后去消费。


  • reset_beginning


logstash 启动后从什么位置开始读取数据,默认是结束位置,也就是说 logstash 进程会以从上次读取结束时的偏移量开始继续读取,如果之前没有消费过,那么就开始从头读取.如果你是要导入原有数据,把这个设定改成 "true", logstash 进程就从头开始读取.有点类似 cat ,但是读到最后一行不会终止,而是变成 tail -F ,继续监听相应数据。


  • decorate_events


在输出消息的时候会输出自身的信息包括:消费消息的大小, topic 来源以及 consumer 的 group 信息。


  • rebalance_max_retries


当有新的 consumer(logstash) 加入到同一 group 时,将会 reblance ,此后将会有 partitions 的消费端迁移到新的 consumer 上,如果一个 consumer 获得了某个 partition 的消费权限,那么它将会向 zookeeper注册, Partition Owner registry 节点信息,但是有可能此时旧的 consumer 尚没有释放此节点,此值用于控制,注册节点的重试次数。


  • consumer_timeout_ms


指定时间内没有消息到达就抛出异常,一般不需要改。


以上是相对重要参数的使用示例,更多参数可以选项可以跟据 github.com/joekiller/l… 查看 input 默认参数。


注意


1.想要使用多个 logstash 端协同消费同一个 topic 的话,那么需要把两个或是多个 logstash 消费端配置成相同的 group_idtopic_id, 但是前提是要把相应的 topic 分多个 partitions (区),多个消费者消费是无法保证消息的消费顺序性的。


这里解释下,为什么要分多个 partitions(区), kafka 的消息模型是对 topic 分区以达到分布式效果。每个 topic 下的不同的 partitions (区)只能有一个 Owner 去消费。所以只有多个分区后才能启动多个消费者,对应不同的区去消费。其中协调消费部分是由 server 端协调而成。不必使用者考虑太多。只是消息的消费则是无序的


总结:保证消息的顺序,那就用一个 partitionkafka 的每个 partition 只能同时被同一个 group 中的一个 consumer 消费


Output 配置


以下配置可以实现对 kafka 写入端 (producer) 的基本使用。


生产端更多详细的配置请查看 kafka.apache.org/documentati… kafka 官方文档的生产者部分配置文档。


 output {
    kafka {
        broker_list => "localhost:9092"
        topic_id => "test"
        compression_codec => "snappy" # string (optional), one of ["none", "gzip", "snappy"], default: "none"
    }
}

Output 解释


生产的可设置性还是很多的,设置其实更多,以下是更多的设置:


  • compression_codec


消息的压缩模式,默认是 none,可以有 gzip 和 snappy (暂时还未测试开启压缩与不开启的性能,数据传输大小等对比)。


  • compressed_topics


可以针对特定的 topic 进行压缩,设置这个参数为 topic ,表示此 topic 进行压缩。


  • request_required_acks


消息的确认模式:


可以设置为 0: 生产者不等待 broker 的回应,只管发送.会有最低能的延迟和最差的保证性(在服务器失败后会导致信息丢失)


可以设置为 1: 生产者会收到 leader 的回应在 leader 写入之后.(在当前 leader 服务器为复制前失败可能会导致信息丢失)


可以设置为 -1: 生产者会收到 leader 的回应在全部拷贝完成之后。


  • partitioner_class


分区的策略,默认是 hash 取模


  • send_buffer_bytes


socket 的缓存大小设置,其实就是缓冲区的大小


消息模式相关


  • serializer_class


消息体的系列化处理类,转化为字节流进行传输,请注意 encoder 必须和下面的 key_serializer_class 使用相同的类型


  • key_serializer_class


默认的是与 serializer_class 相同


  • producer_type


生产者的类型 async 异步执行消息的发送 sync 同步执行消息的发送


  • queue_buffering_max_ms


异步模式下,那么就会在设置的时间缓存消息,并一次性发送


  • queue_buffering_max_messages


异步的模式下,最长等待的消息数


  • queue_enqueue_timeout_ms


异步模式下,进入队列的等待时间,若是设置为0,那么要么进入队列,要么直接抛弃


  • batch_num_messages


异步模式下,每次发送的最大消息数,前提是触发了 queue_buffering_max_messages 或是 queue_enqueue_timeout_ms 的限制


以上是相对重要参数的使用示例,更多参数可以选项可以跟据 github.com/joekiller/l… 查看 output 默认参数。


小贴士


默认情况下,插件是使用 json 编码来输入和输出相应的消息,消息传递过程中 logstash 默认会为消息编码内加入相应的时间戳和 hostname 等信息。如果不想要以上信息(一般做消息转发的情况下),可以使用以下配置,例如:


 output {
    kafka {
        codec => plain {
            format => "%{message}"
        }
    }
}

HDFS



This plugin based on WebHDFS api of Hadoop, it just POST data to WebHDFS port. So, it's a native Ruby code.

ini


output {
    hadoop_webhdfs {
        workers => 2
        server => "your.nameno.de:14000"
        user => "flume"
        path => "/user/flume/logstash/dt=%{+Y}-%{+M}-%{+d}/logstash-%{+H}.log"
        flush_size => 500
        compress => "snappy"
        idle_flush_time => 10
        retry_interval => 0.5
    }
}


This plugin based on HDFS api of Hadoop, it import java classes like org.apache.hadoop.fs.FileSystemetc.


Configuration


input {
    hdfs {
        path => "/path/to/output_file.log"
        enable_append => true
    }
}

Howto run


CLASSPATH=$(find /path/to/hadoop -name '*.jar' | tr '\n' ':'):/etc/hadoop/conf:/path/to/logstash-1.1.7-mon

scribe


github.com/EverythingM…


input {
        scribe {
                host => "localhost"
                port => 8000
        }
}
java -Xmx400M -server \
   -cp scribe_server.jar:logstash-1.2.1-flatjar.jar \
   logstash.runner agent \
   -p /where/did/i/put/this/downloaded/plugin \
   -f logstash.conf

自己写一个插件


前面已经提过在运行 logstash 的时候,可以通过 --pluginpath 参数来加载自己写的插件。那么,插件又该怎么写呢?


插件格式


一个标准的 logstash 输入插件格式如下:


require 'logstash/namespace'
require 'logstash/inputs/base'
class LogStash::Inputs::MyPlugin < LogStash::Inputs::Base
  config_name 'myplugin'
  milestone 1
  config :myoption_key, :validate => :string, :default => 'myoption_value'
  public def register
  end
  public def run(queue)
  end
end

其中大多数语句在过滤器和输出阶段是共有的。


  • config_name 用来定义该插件写在 logstash 配置文件里的名字;
  • milestone 标记该插件的开发里程碑,一般为1,2,3,如果不再维护的,标记为 0;
  • config 可以定义很多个,即该插件在 logstash 配置文件中的可配置参数。logstash 很温馨的提供了验证方法,确保接收的数据是你期望的数据类型;
  • register logstash 在启动的时候运行的函数,一些需要常驻内存的数据,可以在这一步先完成。比如对象初始化,filters/ruby 插件中的 init 语句等。


小贴士


milestone 级别在 3 以下的,logstash 默认为不足够稳定,会在启动阶段,读取到该插件的时候,输出类似下面这样的一行提示信息,日志级别是 warn。这不代表运行出错!只是提示如果用户碰到 bug,欢迎提供线索。


{:timestamp=>"2015-02-06T10:37:26.312000+0800", :message=>"Using milestone 2 input plugin 'file'. This plugin should be stable, but if you see strange behavior, please let us know! For more information on plugin milestones, see logstash.net/docs/1.4.2-…", :level=>:warn}


插件的关键方法


输入插件独有的是 run 方法。在 run 方法中,必须实现一个长期运行的程序(最简单的就是 loop 指令)。然后在每次收到数据并处理成 event 之后,一定要调用 queue << event 语句。一个输入流程就算是完成了。


而如果是过滤器插件,对应修改成:


require 'logstash/filters/base'
class LogStash::Filters::MyPlugin < LogStash::Filters::Base
  public def filter(event)
  end
end

输出插件则是:


require 'logstash/outputs/base'
class LogStash::Outputs::MyPlugin < LogStash::Outputs::Base
  public def receive(event)
  end
end

另外,为了在终止进程的时候不遗失数据,建议都实现如下这个方法,只要实现了,logstash 在 shutdown 的时候就会自动调用:


public def teardown
end

推荐阅读



为什么用 JRuby?能用 MRI 运行么?


对日志处理框架有一些了解的都知道,大多数框架都是用 Java 写的,毕竟做大规模系统 Java 有天生优势。而另一个新生代 fluentd 则是标准的 Ruby 产品(即 Matz's Ruby Interpreter)。logstash 选用 JRuby 来实现,似乎有点两头不讨好啊?


乔丹西塞曾经多次著文聊过这个问题。为了避凑字数的嫌,这里罗列他的 gist 地址:


  • Time sucks 一文是关于 Time 对象的性能测试,最快的生成方法是 sprintf 方法,MRI 性能为 82600 call/sec,JRuby1.6.7 为 131000 call/sec,而 JRuby1.7.0 为 215000 call/sec。
  • Comparing egexp patterns speeds 一文是关于正则表达式的性能测试,使用的正则统一为 (?-mix:('(?:[^\\']+|(?:\\.)+)*')),结果 MRI1.9.2 为 530000 matches/sec,而 JRuby1.6.5 为 690000 matches/sec。
  • Logstash performance under ruby一文是关于 logstash 本身数据流转性能的测试,使用 inputs/generator 插件生成数据,outputs/stdout 到 pv 工具记点统计。结果 MRI1.9.3 为 4000 events/sec,而 JRuby1.7.0 为 25000 events/sec。


可能你已经运行着 logstash 并发现自己的线上数据远超过这个测试——这是因为乔丹西塞在2013年之前,一直是业余时间开发 logstash,而且从未用在自己线上过。所以当时的很多测试是在他自己电脑上完成的。


在 logstash 得到大家强烈关注后,作者发表了《logstash needs full time love》,表明了这点并求一份可以让自己全职开发 logstash 的工作,同时列出了1.1.0 版本以后的 roadmap。(不过事实证明当时作者列出来的这些需求其实不紧急,因为大多数,或者说除了 kibana 以外,至今依然没有==!)


时间轴继续向前推,到 2011 年,你会发现 logstash 原先其实也是用 MRI1.8.7 写的!在 grok 模块从 C 扩展改写成 FFI 扩展后,才正式改用 JRuby。


切换语言的当时,乔丹西塞发表了《logstash, why jruby?》大家可以一读。


事实上,时至今日,多种 Ruby 实现的痕迹(到处都有 RUBY_ENGINE 变量判断)依然遍布 logstash 代码各处,作者也力图保证尽可能多的代码能在 MRI 上运行。


作为简单的指示,在和插件无关的核心代码中,只有 LogStash::Event 里生成 @timestamp字段时用了 Java 的 joda 库为 JRuby 仅有的。稍微修改成 Ruby 自带的 Time 库,即可在 MRI 上运行起来。而主要插件中,也只有 filters/date 和 outputs/elasticsearch 是 Java 相关的。

相关文章
|
5月前
|
消息中间件 存储 运维
一文吃透企业级elk技术栈:1. 基础优化
一文吃透企业级elk技术栈:1. 基础优化
|
5月前
|
监控
一文吃透企业级elk技术栈:9. zabbix结合logstash告警
一文吃透企业级elk技术栈:9. zabbix结合logstash告警
|
5月前
|
NoSQL 关系型数据库 MySQL
一文吃透企业级elk技术栈:7. 验证结果
一文吃透企业级elk技术栈:7. 验证结果
|
2月前
|
存储 JSON Java
ELK 圣经:Elasticsearch、Logstash、Kibana 从入门到精通
ELK是一套强大的日志管理和分析工具,广泛应用于日志监控、故障排查、业务分析等场景。本文档将详细介绍ELK的各个组件及其配置方法,帮助读者从零开始掌握ELK的使用。
|
2月前
|
存储 监控 安全
|
5月前
|
消息中间件 Kafka 网络安全
一文吃透企业级elk技术栈:elk 各组件调试
调试需先理解逻辑与程序调用顺序。本文介绍filebeat、kafka、logstash和es的数据推送流程及调试方法:filebeat传输数据检查包括服务状态、配置与日志;kafka调试涵盖服务状态、端口与日志;logstash调试需检查配置文件、日志与流量;es直接通过kibana查看。还介绍了使用rsyslog接收防火墙/waf/交换机日志的方法。
一文吃透企业级elk技术栈:2. ES集群搭建
一文吃透企业级elk技术栈:2. ES集群搭建
|
5月前
|
监控 关系型数据库 MySQL
一文吃透企业级elk技术栈:11. zabbix报警实现
一文吃透企业级elk技术栈:11. zabbix报警实现
一文吃透企业级elk技术栈:10. es数据生命周期管理
一文吃透企业级elk技术栈:10. es数据生命周期管理
|
5月前
|
NoSQL 关系型数据库 MySQL
一文吃透企业级elk技术栈:6. filebeat安装配置
一文吃透企业级elk技术栈:6. filebeat安装配置