exec
: 定期执行一个 shell 命令,然后捕获其输出。
示例:
input { exec { command => "ls" interval => 30 } }
file
: 从文件中流式读取内容。
示例:
input { file { path => ["/var/log/*.log", "/var/log/message"] start_position => "beginning" } }
generator
: 生成随机数据。
示例:
input { generator { count => 3 lines => [ "line 1", "line 2", "line 3" ] } }
•github
: 从 github webhooks 中读取数据。
•graphite
: 接受 graphite 的 metrics 指标数据。
•heartbeat
: 生成心跳信息。这样做的一般目的是测试 Logstash 的性能和可用性。
•http
: Logstash 接受 http 请求作为数据。
•http_poller
: Logstash 发起 http 请求,读取响应数据。
示例:
input { http_poller { urls => { test1 => "http://localhost:9200" test2 => { method => get user => "AzureDiamond" password => "hunter2" url => "http://localhost:9200/_cluster/health" headers => { Accept => "application/json" } } } request_timeout => 60 schedule => { cron => "* * * * * UTC"} codec => "json" metadata_target => "http_poller_metadata" } }
•imap
: 从 IMAP 服务器读取邮件。
•jdbc
: 通过 JDBC 接口导入数据库中的数据。
示例:
input { jdbc { jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb" jdbc_user => "mysql" parameters => { "favorite_artist" => "Beethoven" } schedule => "* * * * *" statement => "SELECT * from songs where artist = :favorite_artist" } }
•kafka
: 消费 kafka 中的消息。
示例:
input { kafka { bootstrap_servers => "127.0.0.1:9092" group_id => "consumer_group" topics => ["kafka_topic"] enable_auto_commit => true auto_commit_interval_ms => 5000 auto_offset_reset => "latest" decorate_events => true isolation_level => "read_uncommitted" max_poll_records => 1000 } }
•rabbitmq
: 从 RabbitMQ 队列中拉取数据。
•redis
: 从 redis 中读取数据。
•stdin
: 从标准输入读取数据。
•syslog
: 读取 syslog 数据。
•tcp
: 通过 TCP socket 读取数据。
•udp
: 通过 udp 读取数据。
•unix
: 通过 UNIX socket 读取数据。
•websocket
: 通过 websocket 协议 读取数据。
Output plugin
Output
插件定义了数据的输出地,即 logstash 将数据写入何处。
•csv
: 将数据写入 csv 文件。
•elasticsearch
: 写入 Elasticsearch 。
•email
: 发送 email 邮件。
•exec
: 执行命令。
•file
: 写入磁盘文件。
•graphite
: 写入 Graphite 。
•http
: 发送 http 请求。
•influxdb
: 写入 InfluxDB 。
•kafka
: 写入 Kafka 。
•mongodb
: 写入 MongoDB 。
•opentsdb
: 写入 OpenTSDB 。
•rabbitmq
: 写入 RabbitMQ 。
•redis
: 使用 RPUSH 的方式写入到 Redis 队列。
•sink
: 将数据丢弃,不写入任何地方。
•syslog
: 将数据发送到 syslog 服务端。
•tcp
: 发送 TCP socket。
•udp
: 发送 UDP 。
•webhdfs
: 通过 webhdfs REST API 写入 HDFS 。
•websocket
: 推送 websocket 消息 。
Filter plugin
Filter
插件定义对数据进行如何处理。
•aggregate
: 聚合数据。
•alter
: 修改数据
•bytes
: 将存储大小如 "123 MB" 或 "5.6gb" 的字符串表示形式解析为以字节为单位的数值。
•cidr
: 检查 IP 地址是否在指定范围内。
示例:
filter { cidr { add_tag => [ "testnet" ] address => [ "%{src_ip}", "%{dst_ip}" ] network => [ "192.0.2.0/24" ] } }
•cipher
: 对数据进行加密或解密
•clone
: 复制 event 事件
•csv
: 解析 CSV 格式的数据。
•date
: 解析字段中的日期数据。
示例,匹配输入的 timestamp 字段,然后替换 @timestamp :
filter { date { match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss ZZ"] target => "@timestamp" } }
•dissect
: 使用 %{}
的形式拆分字符串并提取出特定内容,比较常用,具体语法见 dissect 文档。•drop
: 丢弃这个 event 。
示例:
filter { if [loglevel] == "debug" { drop { } } }
•elapsed
: 通过记录开始和结束时间跟踪 event 的耗时。
•elasticsearch
: 在 elasticsearch 中进行搜索,并将数据复制到当前 event 中。
•environment
: 将环境变量中的数据存储到 @metadata 字段中。
•extractnumbers
: 提取字符串中找到的所有数字。
•fingerprint
: 根据一个或多个字段的内容创建哈希值,并存储到新的字段中。
•geoip
: 使用绑定的 GeoLite2 数据库添加有关 IP 地址的地理位置的信息,这个插件非常有用,你可以根据 IP 地址得到对应的国家、省份、城市、经纬度等地理位置数据。
示例,通过 clent_ip 字段获取对应的地理位置信息:
filter { geoip { cache_size => 1000 default_database_type => "City" source => "clent_ip" target => "geo" tag_on_failure => ["_geoip_city_fail"] add_field => { "geo_country_name" => "%{[geo][country_name]}" "geo_region_name" => "%{[geo][region_name]}" "geo_city_name" => "%{[geo][city_name]}" "geo_location" => "%{[geo][latitude]},%{[geo][longitude]}" } remove_field => ["geo"] } }
•grok
: 通过正则表达式去处理字符串,比较常用,具体语法见 grok 文档。
•http
: 与外部 web services/REST APIs 集成。
•i18n
: 从字段中删除特殊字符。
•java_uuid
: 生成 UUID 。
•jdbc_static
: 从远程数据库中读取数据,然后丰富 event 。
•jdbc_streaming
: 执行 SQL 查询然后将结果存储到指定字段。
•json
: 解析 json 字符串,生成 field 和 value。
示例:
filter { json { skip_on_invalid_json => true source => "message" } }
如果输入的 message 字段是 json 字符串如 "{"a": 1, "b": 2}"
, 那么解析后就会增加两个字段,字段名分别是 a 和 b 。
•kv
: 解析 key=value 形式的数据。
•memcached
: 与外部 memcached 集成。
•metrics
: logstash 在内存中去聚合指标数据。
•mutate
: 对字段进行一些常规更改。
示例:
filter { mutate { split => ["hostname", "."] add_field => { "shortHostname" => "%{hostname[0]}" } } mutate { rename => ["shortHostname", "hostname"] } }
•prune
: 通过黑白名单的方式删除多余的字段。
示例:
filter { prune { blacklist_names => [ "method", "(referrer|status)", "${some}_field" ] } }
•ruby
: 执行 ruby 代码。
示例,解析 http://example.com/abc?q=haha
形式字符串中的 query 参数 q 的值 :
filter { ruby { code => " require 'cgi' req = event.get('request_uri').split('?') query = '' if req.length > 1 query = req[1] qh = CGI::parse(query) event.set('search_q', qh['q'][0]) end " } }
在 ruby 代码中,字段的获取和设置通过 event.get()
和 event.set()
方法进行操作。
•sleep
: 休眠指定时间。
•split
: 拆分字段。
•throttle
: 限流,限制 event 数量。
•translate
: 根据指定的字典文件将数据进行对应转换。
示例:
filter { translate { field => "[http_status]" destination => "[http_status_description]" dictionary => { "100" => "Continue" "101" => "Switching Protocols" "200" => "OK" "500" => "Server Error" } fallback => "I'm a teapot" } }
•truncate
: 将字段内容超出长度的部分裁剪掉。
•urldecode
: 对 urlencoded 的内容进行解码。
•useragent
: 解析 user-agent 的内容得到诸如设备、操作系统、版本等信息。
示例:
filter { # ua_device : 设备 # ua_name : 浏览器 # ua_os : 操作系统 useragent { lru_cache_size => 1000 source => "user_agent" target => "ua" add_field => { "ua_device" => "%{[ua][device]}" "ua_name" => "%{[ua][name]}" "ua_os" => "%{[ua][os_name]}" } remove_field => ["ua"] } }
•uuid
: 生成 UUID 。
•xml
: 解析 XML 格式的数据。
结语
Logstash 的插件除了本文提到的这些之外还有很多,想要详细的了解每个插件如何使用还是要去查阅官方文档。
得益于 Logstash 的插件体系,你只需要编写一个配置文件,声明使用哪些插件,就可以很轻松的构建数据管道。