ELK技术栈 - logstash学习笔记(三)

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: ELK技术栈 - logstash学习笔记(三)


读取 Syslog 数据

syslog 可能是运维领域最流行的数据传输协议了。当你想从设备上收集系统日志的时候,syslog 应该会是你的第一选择。尤其是网络设备,比如思科 —— syslog 几乎是唯一可行的办法。

我们这里不解释如何配置你的 syslog.conf, rsyslog.conf 或者 syslog-ng.conf 来发送数据,而只讲如何把 logstash 配置成一个 syslog 服务器来接收数据。

有关 rsyslog 的用法,稍后的类型项目一节中,会有更详细的介绍。

配置示例

input {
  syslog {
    port => "514"
  }
}

运行结果

作为最简单的测试,我们先暂停一下本机的 syslogd (或 rsyslogd )进程,然后启动 logstash 进程(这样就不会有端口冲突问题)。现在,本机的 syslog 就会默认发送到 logstash 里了。我们可以用自带的 logger 命令行工具发送一条 "Hello World"信息到 syslog 里(即 logstash 里)。看到的 logstash 输出像下面这样:

{
           "message" => "Hello World",
          "@version" => "1",
        "@timestamp" => "2014-08-08T09:01:15.911Z",
              "host" => "127.0.0.1",
          "priority" => 31,
         "timestamp" => "Aug  8 17:01:15",
         "logsource" => "raochenlindeMacBook-Air.local",
           "program" => "com.apple.metadata.mdflagwriter",
               "pid" => "381",
          "severity" => 7,
          "facility" => 3,
    "facility_label" => "system",
    "severity_label" => "Debug"
}

解释

Logstash 是用 UDPSocket, TCPServerLogStash::Filters::Grok 来实现 LogStash::Inputs::Syslog 的。所以你其实可以直接用 logstash 配置实现一样的效果:

input {
  tcp {
    port => "8514"
  }
}
filter {
  grok {
    match => ["message", %{SYSLOGLINE} ]
  }
  syslog_pri { }
}

最佳实践

建议在使用 LogStash::Inputs::Syslog 的时候走 TCP 协议来传输数据。

因为具体实现中,UDP 监听器只用了一个线程,而 TCP 监听器会在接收每个连接的时候都启动新的线程来处理后续步骤。

如果你已经在使用 UDP 监听器收集日志,用下行命令检查你的 UDP 接收队列大小:

# netstat -plnu | awk 'NR==1 || $4~/:514$/{print $2}'
Recv-Q
228096

228096 是 UDP 接收队列的默认最大大小,这时候 linux 内核开始丢弃数据包了!

强烈建议使用LogStash::Inputs::TCP和 LogStash::Filters::Grok 配合实现同样的 syslog 功能!

虽然 LogStash::Inputs::Syslog 在使用 TCPServer 的时候可以采用多线程处理数据的接收,但是在同一个客户端数据的处理中,其 grok 和 date 是一直在该线程中完成的,这会导致总体上的处理性能几何级的下降 —— 经过测试,TCPServer 每秒可以接收 50000 条数据,而在同一线程中启用 grok 后每秒只能处理 5000 条,再加上 date 只能达到 500 条!

才将这两步拆分到 filters 阶段后,logstash 支持对该阶段插件单独设置多线程运行,大大提高了总体处理性能。在相同环境下, logstash -f tcp.conf -w 20 的测试中,总体处理性能可以达到每秒 30000 条数据!

注:测试采用 logstash 作者提供的 yes "<44>May 19 18:30:17 snack jls: foo bar 32" | nc localhost 3000命令。出处见:github.com/jordansisse…

小贴士

如果你实在没法切换到 TCP 协议,你可以自己写程序,或者使用其他基于异步 IO 框架(比如 libev )的项目。下面是一个简单的异步 IO 实现 UDP 监听数据输入 Elasticsearch 的示例:

gist.github.com/chenryn/7c9…

读取 Redis 数据

Redis 服务器是 logstash 官方推荐的 broker 选择。Broker 角色也就意味着会同时存在输入和输出俩个插件。这里我们先学习输入插件。

LogStash::Inputs::Redis 支持三种 data_type(实际上是redis_type),不同的数据类型会导致实际采用不同的 Redis 命令操作:

  • list => BLPOP
  • channel => SUBSCRIBE
  • pattern_channel => PSUBSCRIBE

注意到了么?这里面没有 GET 命令!

Redis 服务器通常都是用作 NoSQL 数据库,不过 logstash 只是用来做消息队列。所以不要担心 logstash 里的 Redis 会撑爆你的内存和磁盘。

配置示例

input {
    redis {
        data_type => "pattern_channel"
        key => "logstash-*"
        host => "192.168.0.2"
        port => 6379
        threads => 5
    }
}

使用方式

基本方法

首先确认你设置的 host 服务器上已经运行了 redis-server 服务,然后打开终端运行 logstash 进程等待输入数据,然后打开另一个终端,输入 redis-cli 命令(先安装好 redis 软件包),在交互式提示符后面输入PUBLISH logstash-demochan "hello world"

# redis-cli
127.0.0.1:6379> PUBLISH logstash-demochan "hello world"

你会在第一个终端里看到 logstash 进程输出类似下面这样的内容:

{
       "message" => "hello world",
      "@version" => "1",
    "@timestamp" => "2014-08-08T16:26:29.399Z"
}

注意:这个事件里没有 host 字段!(或许这算是 bug……)

输入 JSON 数据

如果你想通过 redis 的频道给 logstash 事件添加更多字段,直接向频道发布 JSON 字符串就可以了。 LogStash::Inputs::Redis 会直接把 JSON 转换成事件。

继续在第二个终端的交互式提示符下输入如下内容:

127.0.0.1:6379> PUBLISH logstash-chan '{"message":"hello world","@version":"1","@timestamp":"2014-08-08T16:34:21.865Z","host":"raochenlindeMacBook-Air.local","key1":"value1"}'

你会看到第一个终端里的 logstash 进程随即也返回新的内容,如下所示:

{
       "message" => "hello world",
      "@version" => "1",
    "@timestamp" => "2014-08-09T00:34:21.865+08:00",
          "host" => "raochenlindeMacBook-Air.local",
          "key1" => "value1"
}

看,新的字段出现了!现在,你可以要求开发工程师直接向你的 redis 频道发送信息好了,一切自动搞定。

小贴士

这里我们建议的是使用 pattern_channel 作为输入插件的 data_type 设置值。因为实际使用中,你的 redis 频道可能有很多不同的 keys,一般命名成 logstash-chan-%{type} 这样的形式。这时候 pattern_channel 类型就可以帮助你一次订阅全部 logstash 相关频道!

扩展方式

如上段"小贴士"提到的,之前两个使用场景采用了同样的配置,即数据类型为频道发布订阅方式。这种方式在需要扩展 logstash 成多节点集群的时候,会出现一个问题:通过频道发布的一条信息,会被所有订阅了该频道的 logstash 进程同时接收到,然后输出重复内容!

你可以尝试再做一次上面的实验,这次在两个终端同时启动 logstash -f redis-input.conf 进程,结果会是两个终端都输出消息。

这种时候,就需要用 list 类型。在这种类型下,数据输入到 redis 服务器上暂存,logstash 则连上 redis 服务器取走 (BLPOP 命令,所以只要 logstash 不堵塞,redis 服务器上也不会有数据堆积占用空间)数据。

配置示例

input {
    redis {
        batch_count => 1
        data_type => "list"
        key => "logstash-list"
        host => "192.168.0.2"
        port => 6379
        threads => 5
    }
}

使用方式

这次我们同时在两个终端运行 logstash -f redis-input-list.conf 进程。然后在第三个终端里启动 redis-cli 命令交互:

$ redis-cli 
127.0.0.1:6379> RPUSH logstash-list "hello world"
(integer) 1

这时候你可以看到,只有一个终端输出了结果。

连续 RPUSH 几次,可以看到两个终端近乎各自输出一半条目。

小贴士

RPUSH 支持 batch 方式,修改 logstash 配置中的 batch_count 值,作为示例这里只改到 2,实际运用中可以更大(事实上 LogStash::Outputs::Redis 对应这点的 batch_event 配置默认值就是 50)。

重启 logstash 进程后,redis-cli 命令中改成如下发送:

127.0.0.1:6379> RPUSH logstash-list "hello world" "hello world" "hello world" "hello world" "hello world" "hello world"
(integer) 3

可以看到,两个终端也各自输出一部分结果。而你只用了一次 RPUSH 命令。

推荐阅读

collectd简述

collectd 是一个守护(daemon)进程,用来收集系统性能和提供各种存储方式来存储不同值的机制。它会在系统运行和存储信息时周期性的统计系统的相关统计信息。利用这些信息有助于查找当前系统性能瓶颈(如作为性能分析 performance analysis)和预测系统未来的 load(如能力部署capacity planning)等

下面简单介绍一下: collectd的部署以及与logstash对接的相关配置实例

collectd的安装

解决依赖

rpm -ivh "http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm"
yum -y install libcurl libcurl-devel rrdtool rrdtool-devel perl-rrdtool rrdtool-prel libgcrypt-devel gcc make gcc-c++ liboping liboping-devel perl-CPAN net-snmp net-snmp-devel

源码安装collectd

wget http://collectd.org/files/collectd-5.4.1.tar.gz
tar zxvf collectd-5.4.1.tar.gz
cd collectd-5.4.1
./configure --prefix=/usr/local/software/collectd --sysconfdir=/etc --localstatedir=/var --libdir=/usr/lib --mandir=/usr/share/man --enable-all-plugins
make && make install

安装启动脚本

cp contrib/redhat/init.d-collectd /etc/init.d/collectd
chmod +x /etc/init.d/collectd

启动collectd

service collectd start

collectd的配置

以下配置可以实现对服务器基本的CPU、内存、网卡流量、磁盘 IO 以及磁盘空间占用情况的监控:

Hostname "host.example.com"
LoadPlugin interface
LoadPlugin cpu
LoadPlugin memory
LoadPlugin network
LoadPlugin df
LoadPlugin disk
<Plugin interface>
    Interface "eth0"
    IgnoreSelected false
</Plugin>
<Plugin network>
    <Server "10.0.0.1" "25826"> ## logstash 的 IP 地址和 collectd 的数据接收端口号
    </Server>
</Plugin>

logstash的配置

以下配置实现通过 logstash 监听 25826 端口,接收从 collectd 发送过来的各项检测数据:

示例一:

input {
 collectd {
    port => 25826 ## 端口号与发送端对应
    type => collectd
}

示例二:(推荐)

udp {
    port => 25826
    buffer_size => 1452
    workers => 3          # Default is 2
    queue_size => 30000   # Default is 2000
    codec => collectd { }
    type => "collectd"
}

运行结果

下面是简单的一个输出结果:

{
  "_index": "logstash-2014.12.11",
  "_type": "collectd",
  "_id": "dS6vVz4aRtK5xS86kwjZnw",
  "_score": null,
  "_source": {
    "host": "host.example.com",
    "@timestamp": "2014-12-11T06:28:52.118Z",
    "plugin": "interface",
    "plugin_instance": "eth0",
    "collectd_type": "if_packets",
    "rx": 19147144,
    "tx": 3608629,
    "@version": "1",
    "type": "collectd",
    "tags": [
      "_grokparsefailure"
    ]
  },
  "sort": [
    1418279332118
  ]
}

参考资料

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
3月前
|
消息中间件 存储 运维
一文吃透企业级elk技术栈:1. 基础优化
一文吃透企业级elk技术栈:1. 基础优化
|
3月前
|
监控
一文吃透企业级elk技术栈:9. zabbix结合logstash告警
一文吃透企业级elk技术栈:9. zabbix结合logstash告警
|
3月前
|
NoSQL 关系型数据库 MySQL
一文吃透企业级elk技术栈:7. 验证结果
一文吃透企业级elk技术栈:7. 验证结果
|
18天前
|
存储 监控 安全
|
3月前
|
消息中间件 Kafka 网络安全
一文吃透企业级elk技术栈:elk 各组件调试
调试需先理解逻辑与程序调用顺序。本文介绍filebeat、kafka、logstash和es的数据推送流程及调试方法:filebeat传输数据检查包括服务状态、配置与日志;kafka调试涵盖服务状态、端口与日志;logstash调试需检查配置文件、日志与流量;es直接通过kibana查看。还介绍了使用rsyslog接收防火墙/waf/交换机日志的方法。
|
3月前
|
监控 关系型数据库 MySQL
一文吃透企业级elk技术栈:11. zabbix报警实现
一文吃透企业级elk技术栈:11. zabbix报警实现
一文吃透企业级elk技术栈:10. es数据生命周期管理
一文吃透企业级elk技术栈:10. es数据生命周期管理
|
3月前
|
NoSQL 关系型数据库 MySQL
一文吃透企业级elk技术栈:6. filebeat安装配置
一文吃透企业级elk技术栈:6. filebeat安装配置
|
3月前
|
监控 NoSQL 关系型数据库
一文吃透企业级elk技术栈:5. logstatsh 安装配置
一文吃透企业级elk技术栈:5. logstatsh 安装配置
|
3月前
|
消息中间件 Kafka
一文吃透企业级elk技术栈:4. kafka 集群部署
一文吃透企业级elk技术栈:4. kafka 集群部署