数据管道 Logstash 入门(下)

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: 数据管道 Logstash 入门

exec : 定期执行一个 shell 命令,然后捕获其输出。

示例:

input {
  exec {
    command => "ls"
    interval => 30
  }
}

file : 从文件中流式读取内容。

示例:

input {
  file {
    path => ["/var/log/*.log", "/var/log/message"]
    start_position => "beginning"
  }
}

generator : 生成随机数据。

示例:

input {
  generator {
    count => 3
    lines => [
      "line 1",
      "line 2",
      "line 3"
    ]
  }
}

github : 从 github webhooks 中读取数据。

graphite : 接受 graphite 的 metrics 指标数据。

heartbeat : 生成心跳信息。这样做的一般目的是测试 Logstash 的性能和可用性。

http : Logstash 接受 http 请求作为数据。

http_poller : Logstash 发起 http 请求,读取响应数据。

示例:

input {
  http_poller {
    urls => {
      test1 => "http://localhost:9200"
      test2 => {
        method => get
        user => "AzureDiamond"
        password => "hunter2"
        url => "http://localhost:9200/_cluster/health"
        headers => {
          Accept => "application/json"
        }
     }
    }
    request_timeout => 60
    schedule => { cron => "* * * * * UTC"}
    codec => "json"
    metadata_target => "http_poller_metadata"
  }
}

imap : 从 IMAP 服务器读取邮件。

jdbc : 通过 JDBC 接口导入数据库中的数据。

示例:

input {
  jdbc {
    jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    jdbc_user => "mysql"
    parameters => { "favorite_artist" => "Beethoven" }
    schedule => "* * * * *"
    statement => "SELECT * from songs where artist = :favorite_artist"
  }
}

kafka : 消费 kafka 中的消息。

示例:

input {
  kafka {
    bootstrap_servers => "127.0.0.1:9092"
    group_id => "consumer_group"
    topics => ["kafka_topic"]
    enable_auto_commit => true
    auto_commit_interval_ms => 5000
    auto_offset_reset => "latest"
    decorate_events => true
    isolation_level => "read_uncommitted"
    max_poll_records => 1000
  }
}

rabbitmq : 从 RabbitMQ 队列中拉取数据。

redis : 从 redis 中读取数据。

stdin : 从标准输入读取数据。

syslog : 读取 syslog 数据。

tcp : 通过 TCP socket 读取数据。

udp : 通过 udp 读取数据。

unix : 通过 UNIX socket 读取数据。

websocket : 通过 websocket 协议 读取数据。

Output plugin

Output 插件定义了数据的输出地,即 logstash 将数据写入何处。

csv : 将数据写入 csv 文件。

elasticsearch : 写入 Elasticsearch 。

email : 发送 email 邮件。

exec : 执行命令。

file : 写入磁盘文件。

graphite : 写入 Graphite 。

http : 发送 http 请求。

influxdb : 写入 InfluxDB 。

kafka : 写入 Kafka 。

mongodb : 写入 MongoDB 。

opentsdb : 写入 OpenTSDB 。

rabbitmq : 写入 RabbitMQ 。

redis : 使用 RPUSH 的方式写入到 Redis 队列。

sink : 将数据丢弃,不写入任何地方。

syslog : 将数据发送到 syslog 服务端。

tcp : 发送 TCP socket。

udp : 发送 UDP 。

webhdfs : 通过 webhdfs REST API 写入 HDFS 。

websocket : 推送 websocket 消息 。

Filter plugin

Filter 插件定义对数据进行如何处理。

aggregate : 聚合数据。

alter : 修改数据

bytes : 将存储大小如 "123 MB" 或 "5.6gb" 的字符串表示形式解析为以字节为单位的数值。

cidr : 检查 IP 地址是否在指定范围内。

示例:

filter {
  cidr {
    add_tag => [ "testnet" ]
    address => [ "%{src_ip}", "%{dst_ip}" ]
    network => [ "192.0.2.0/24" ]
  }
}

cipher : 对数据进行加密或解密

clone : 复制 event 事件

csv : 解析 CSV 格式的数据。

date : 解析字段中的日期数据。

示例,匹配输入的 timestamp 字段,然后替换 @timestamp :

filter {
  date {
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss ZZ"]
    target => "@timestamp"
  }
}

dissect : 使用 %{} 的形式拆分字符串并提取出特定内容,比较常用,具体语法见 dissect 文档。drop : 丢弃这个 event 。

示例:

filter {
  if [loglevel] == "debug" {
    drop { }
  }
}

elapsed : 通过记录开始和结束时间跟踪 event 的耗时。

elasticsearch : 在 elasticsearch 中进行搜索,并将数据复制到当前 event 中。

environment : 将环境变量中的数据存储到 @metadata 字段中。

extractnumbers : 提取字符串中找到的所有数字。

fingerprint : 根据一个或多个字段的内容创建哈希值,并存储到新的字段中。

geoip : 使用绑定的 GeoLite2 数据库添加有关 IP 地址的地理位置的信息,这个插件非常有用,你可以根据 IP 地址得到对应的国家、省份、城市、经纬度等地理位置数据。

示例,通过 clent_ip 字段获取对应的地理位置信息:

filter {
  geoip {
    cache_size => 1000
    default_database_type => "City"
    source => "clent_ip"
    target => "geo"
    tag_on_failure => ["_geoip_city_fail"]
    add_field => {
      "geo_country_name" => "%{[geo][country_name]}"
      "geo_region_name" => "%{[geo][region_name]}"
      "geo_city_name" => "%{[geo][city_name]}"
      "geo_location" => "%{[geo][latitude]},%{[geo][longitude]}"
    }
    remove_field => ["geo"]
  }
}

grok : 通过正则表达式去处理字符串,比较常用,具体语法见 grok 文档。

http : 与外部 web services/REST APIs 集成。

i18n : 从字段中删除特殊字符。

java_uuid : 生成 UUID 。

jdbc_static : 从远程数据库中读取数据,然后丰富 event 。

jdbc_streaming : 执行 SQL 查询然后将结果存储到指定字段。

json : 解析 json 字符串,生成 field 和 value。

示例:

filter {
  json {
    skip_on_invalid_json => true
    source => "message"
  }
}

如果输入的 message 字段是 json 字符串如 "{"a": 1, "b": 2}", 那么解析后就会增加两个字段,字段名分别是 a 和 b 。

kv : 解析 key=value 形式的数据。

memcached : 与外部 memcached 集成。

metrics : logstash 在内存中去聚合指标数据。

mutate : 对字段进行一些常规更改。

示例:

filter {
  mutate {
    split => ["hostname", "."]
    add_field => { "shortHostname" => "%{hostname[0]}" }
  }
  mutate {
    rename => ["shortHostname", "hostname"]
  }
}

prune : 通过黑白名单的方式删除多余的字段。

示例:

filter {
  prune {
    blacklist_names => [ "method", "(referrer|status)", "${some}_field" ]
  }
}

ruby : 执行 ruby 代码。

示例,解析 http://example.com/abc?q=haha 形式字符串中的 query 参数 q 的值 :

filter {
  ruby {
    code => "
      require 'cgi'
      req = event.get('request_uri').split('?')
      query = ''
      if req.length > 1
        query = req[1]
        qh = CGI::parse(query)
        event.set('search_q', qh['q'][0])
      end
    "
  }
}

在 ruby 代码中,字段的获取和设置通过 event.get()event.set() 方法进行操作。

sleep : 休眠指定时间。

split : 拆分字段。

throttle : 限流,限制 event 数量。

translate : 根据指定的字典文件将数据进行对应转换。

示例:

filter {
  translate {
    field => "[http_status]"
    destination => "[http_status_description]"
    dictionary => {
      "100" => "Continue"
      "101" => "Switching Protocols"
      "200" => "OK"
      "500" => "Server Error"
    }
    fallback => "I'm a teapot"
  }
}

truncate : 将字段内容超出长度的部分裁剪掉。

urldecode : 对 urlencoded 的内容进行解码。

useragent : 解析 user-agent 的内容得到诸如设备、操作系统、版本等信息。

示例:

filter {
  # ua_device : 设备
  # ua_name : 浏览器
  # ua_os : 操作系统
  useragent {
    lru_cache_size => 1000
    source => "user_agent"
    target => "ua"
    add_field => {
      "ua_device" => "%{[ua][device]}"
      "ua_name" => "%{[ua][name]}"
      "ua_os" => "%{[ua][os_name]}"
    }
    remove_field => ["ua"]
  }
}

uuid : 生成 UUID 。

xml : 解析 XML 格式的数据。

结语

Logstash 的插件除了本文提到的这些之外还有很多,想要详细的了解每个插件如何使用还是要去查阅官方文档。

得益于 Logstash 的插件体系,你只需要编写一个配置文件,声明使用哪些插件,就可以很轻松的构建数据管道。

目录
相关文章
|
消息中间件 Kafka API
数据管道 Logstash 入门(上)
数据管道 Logstash 入门
117 0
|
NoSQL Redis 索引
Filebeat收集日志数据传输到Redis,通过Logstash来根据日志字段创建不同的ES索引
Filebeat收集日志数据传输到Redis,通过Logstash来根据日志字段创建不同的ES索引
185 0
|
1月前
|
NoSQL 网络协议 Java
【赵渝强老师】Redis的管道Pipeline
Redis采用客户端-服务器模型和请求/响应协议,通常一个请求包括客户端发送查询请求并等待服务端响应。为了提高性能,Redis引入了管道PipeLine技术,可以一次性发送多条命令并一次性返回结果,减少客户端与服务器间的通信次数,从而降低往返延迟。示例代码展示了普通命令和管道命令在插入1万条数据时的性能差异,后者执行时间显著缩短。视频讲解提供了更详细的解释。
|
7月前
|
监控 安全
管道的三种使用方案中,唯一正确而安全的使用方法
管道的三种使用方案中,唯一正确而安全的使用方法
25 0
|
存储 消息中间件 编解码
Logstash收集多数据源数据神器
Logstash收集多数据源数据神器
370 0
Logstash收集多数据源数据神器
|
SQL NoSQL JavaScript
mongo 进阶之——聚合管道
上面这句话的意思是,先用pumber来进行分组,会有两个字段,一个是"_id"和"count",在后一个管道中用1表示显示,0表示不显示
mongo 进阶之——聚合管道
|
数据采集 Dubbo 应用服务中间件
使用 Logstash 导入流式数据|学习笔记
快速学习使用 Logstash 导入流式数据
141 0
使用 Logstash 导入流式数据|学习笔记
|
开发框架 JSON 监控
如何利用NLog输出结构化日志,并在Kibana优雅分析日志?
今天我们来看下如何向ES输出结构化日志、在Kibana中分析日志。
如何利用NLog输出结构化日志,并在Kibana优雅分析日志?
|
存储 安全 Apache
为什么我们需要Logstash,Fluentd等日志摄取器?
Fluent-Bit旨在成为日志收集和加工的通用瑞士军刀, 同时Fluent Bit在设计时考虑了性能和低资源消耗。
为什么我们需要Logstash,Fluentd等日志摄取器?
|
存储 数据处理 iOS开发
干货 | Logstash自定义正则表达式ETL实战
本文建立在干货 | Logstash Grok数据结构化ETL实战上,并专注于在Grok中使用自定义正则表达式。 有时Logstash没有我们需要的模式。 幸运的是,我们有正则表达式库:Oniguruma。 Oniguruma是一个灵活的正则表达式库。 它包含多种语言的不同正则表达式实现的特性。 Github地址:https://github.com/kkos/oniguruma
720 0
干货 | Logstash自定义正则表达式ETL实战