Logstash收集Rsyslog/syslog日志
syslog默认是通过514端口去发送日志,可以使用logstash安装对应的插件,监听514端口来收集日志。如果只是监听系统日志可以不用安装logstash的agent,只需要监听对应的514端口,收集系统数据即可。
logstash在INPUT插件中提供了syslog的模块,可以用于不安装本地日志收集agent的设备(如硬件防火墙等),当然支持此类协议的普通服务器也适用。
注意:此INPUT插件会同时监听TCP和UDP端口。
可以在安装logstash服务器node2(172.16.10.21)上编写logstash配置文件,指定端口(默认为514)同时指定写入的ES服务器:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
[root@node2 ~]
input {
syslog{
type => "system-syslog"
port => 514
}
}
output {
elasticsearch {
hosts => [ "172.16.10.21:9200" ]
index => "system-syslog-%{+YYYY.MM}"
}
}
|
在node1(172.16.10.20)使syslog自动发送日志信息到logstash,需要修改/etc/rsyslog.conf (CentOS7)的配置文件:
在最后添加一行,指定远程logstash服务器地址和端口:
重启rsyslog服务:
1
|
systemctl restart rsyslog
|
在logstash服务器上指定配置文件启动:
1
|
/opt/logstash/bin/logstash -f /etc/logstash/conf .d /syslog .conf
|
在node1使用logger 命令可以生成系统日志,使Elasticsearch上产生索引:
登录Elasticsearch上就可查询到syslog的日志了,可以在kibana上添加此项。
TCP、UDP日志收集
在有时要对一些日志进行补充或者发送一些文件内容到es上时,可以通过使用TCP模块的方式。
编辑tcp.conf的logstash文件:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
[root@node2 ~]
input {
tcp {
type => "tcp"
port => "6666"
mode => "server"
}
}
output{
stdout {
codec => rubydebug
}
}
|
启动服务,在其他任何主机上可以通过nc 命令或者伪设备输出的方式,将需要发送的内容发送给node2:
1
2
3
|
[root@node1 ~]
[root@node1 ~]
[root@node1 ~]
|
在node2上,显示接受的信息:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
[root@node2 ~]
Settings: Default pipeline workers: 1
Pipeline main started
{
"message" => "trying" ,
"@version" => "1" ,
"@timestamp" => "2017-01-03T08:12:10.630Z" ,
"host" => "172.16.10.20" ,
"port" => 57054,
"type" => "tcp"
}
{
"message" => "# Generated by NetworkManager" ,
"@version" => "1" ,
"@timestamp" => "2017-01-03T08:13:11.081Z" ,
"host" => "172.16.10.20" ,
"port" => 57248,
"type" => "tcp"
}
{
"message" => "nameserver 114.114.114.114" ,
"@version" => "1" ,
"@timestamp" => "2017-01-03T08:13:11.081Z" ,
"host" => "172.16.10.20" ,
"port" => 57248,
"type" => "tcp"
}
{
"message" => "stuff" ,
"@version" => "1" ,
"@timestamp" => "2017-01-03T08:15:14.224Z" ,
"host" => "172.16.10.20" ,
"port" => 57634,
"type" => "tcp"
}
|
使用Filter模块对Apache日志进行格式化
在logstash中可以对一些输入的格式进行格式化,如记录Apache日志时,由于自身不具有像nginx提供的JSON格式化的输出,所以在进行记录时,需要使用filter插件的grok来进行正则匹配。
在logstash中,已经默认提供了方便使用的正则表示方式,在/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.5/patterns保存了相关的配置。
编写logstash的配置文件:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
[root@node2 patterns]
input{
stdin {}
}
filter {
grok {
match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
}
}
output {
stdout{
codec => rubydebug
}
}
|
官方文档提供了示例,可以直接使用https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html
启动服务,输入测试示例,返回格式化的输出:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
[root@node2 ~]
Settings: Default pipeline workers: 1
Pipeline main started
55.3.244.1 GET /index .html 15824 0.043
{
"message" => "55.3.244.1 GET /index.html 15824 0.043" ,
"@version" => "1" ,
"@timestamp" => "2017-01-03T08:59:44.264Z" ,
"host" => "node2" ,
"client" => "55.3.244.1" ,
"method" => "GET" ,
"request" => "/index.html" ,
"bytes" => "15824" ,
"duration" => "0.043"
}
|
使用grok收集Apache日志到Elasticsearch.
在默认的Apache日志格式下,logstash提供了一个已经编写好的日志格式正则filter:
1
2
3
4
|
[root@node2 ~]
| grep "COMBINEDAPACHELOG"
COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}
|
编写logstash配置文件,引用此参数COMBINEDAPACHELOG:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
[root@node2 ~]
input{
file {
path => "/var/log/httpd/access_log"
start_position => "beginning"
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
}
output {
elasticsearch {
hosts => [ "172.16.10.20:9200" ]
index => "apache-access-log-%{+YYYY.MM.dd}"
}
}
|
这样,默认的日志文件就可以被格式化输出到Elasticsearch上了。
使用grok会有一些问题,比如会非常影响性能,而且很不灵活,除非对ruby掌控很好。
在实际的生产环境中,为了实现松耦合和性能问题,一般将日志写入redis,再使用python对其进行格式化处理。
使用消息队列扩展ELK
数据 >> lostash >> MQ/Redis >> logstash >> ES
在实现低耦合和高可靠性的ELK部署架构中,使用消息队列的方式来作为一个中间转存的数据交换中心,将数据文件需要写入到消息队列,而对消息队列中数据的处理可有其它任意的应用去实现,整个过程写入和输出系统没有任何依赖关系。
这里使用最简单的redis来实现这一功能。
在node2上安装redis,将收集的数据发送到redis中,不做任何处理,这里需要先修改redis的配置文件:
1
2
3
|
daemonize yes
bind 172.16.10.21
|
通过使用logstash将数据写入redis,创建logstash的配置文件,用于将数据写入redis:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
[root@node2 ~]
input{
stdin {
}
}
output {
redis {
host => "172.16.10.21"
port => "6379"
db => "6"
data_type => "list"
key => "demo"
}
}
|
启动redis,指定配置文件启动logstash:
1
2
3
4
|
[root@node2 ~]
Settings: Default pipeline workers: 1
Pipeline main started
try
|
在redis上可以查看到输入的数据:
1
2
3
4
5
6
7
8
9
10
11
|
[root@node2 ~]
172.16.10.21:6379> info
db6:keys=2,expires=0,avg_ttl=0
172.16.10.21:6379> select 6
OK
172.16.10.21:6379[6]> keys *
1) "apache-accesslog"
2) "demo"
172.16.10.21:6379[6]> lindex demo -1
"{\"message\":\"try\",\"@version\":\"1\",\"@timestamp\":\"2017-01-04T03:29:28.337Z\",\"host\":\"node2\"}"
|
使用另一个logstash将数据从redis中取出:
在node1上配置logstash的配置文件,指定文件启动,此时会自动将redis上的数据读出:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
[root@node1 ~]
input {
redis {
host => "172.16.10.21"
port => "6379"
db => "6"
data_type => "list"
key => "demo"
}
}
filter {}
output {
stdout{
codec => rubydebug
}
}
|
1
2
3
4
5
6
7
8
9
|
[root@node1 conf.d]
Settings: Default pipeline workers: 1
Pipeline main started
{
"message" => "try" ,
"@version" => "1" ,
"@timestamp" => "2017-01-03T12:10:20.635Z" ,
"host" => "node2"
}
|
上面使用的是手动输入的方式,在实际收集日志时,可以在输入时指定文件,输出时指定到Elasticsearch。
使用Redis收集Apache访问日志
在上面的示例中,我们使用了直接通过grok来收集Apache的日志,但是在实际的生产环境,由于这种方案会使性能下降,而且在整个架构上过度依赖于logstash,无法做到解耦,并不是一个好的解决方案。这里引入消息队列,作为一个解耦,上日志的存取分开,方便后期扩展,提高了稳定性。
在node2上配置logstash 配置文件,指定Apache的access文件,并启动:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
[root@node2 ~]
input{
file {
path => "/var/log/httpd/access_log"
start_position => "beginning"
}
}
output {
redis {
host => "172.16.10.21"
port => "6379"
db => "6"
data_type => "list"
key => "apache-log"
}
}
|
此处直接完整输入内容到redis,不做任何处理。
在node1上从redis中取出日志,并通过grok进行日志格式化处理,之后输出到Elasticsearch中。
node1上logstash的配置:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
[root@node1 ~]
input {
redis {
host => "172.16.10.21"
port => "6379"
db => "6"
data_type => "list"
key => "apache-log"
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
}
output {
elasticsearch{
hosts => [ "172.16.10.20:9200" ]
index => "apache-log-%{+YYYY.MM.dd}"
}
}
|
这样,启动node1和node2上的logstash,在指定的Elasticsearch上就可以看到被格式化后的日志输出了。
和redis类似,可以使用Kafka作为存储数据的中间节点:https://www.unixhot.com/article/61
ELK项目生产规划
在实际的生产环境中要对日志进行收集,首先需要对日志分类:
访问日志: apache,nginx, tomcat 等web访问日志 通过file插件收集,apache通过filter插件处理格式
错误日志:error log,java 日志 等可以直接收取,对于一条多行的日志(java异常)使用多行处理插件
系统日志: /var/log syslog 等 直接使用 syslog插件收取
网络日志: 防火墙,交换机,路由器等 使用 syslog日志
运行日志:应用程序定义 使用file 插件, 最好为json格式
标准化: 统一存放路径,命名规则,格式(json),日志切割(crontab). 需要保存的日志先在本地读写,提高性能,通过rsync推送到指定存储,防止挂载故障而使服务终止。删除本地日志(确认保留时间)
工具化:可以使用logstash进行日志收集方案。
如果使用redis作为消息队列,那么要对所有的list key的长度进行监控,防止溢出和数据丢失
llen key_name
根据实际情况,例如超过 10万 就报警。
如将之前的所有配置合并到一处,在使用redis作为存储时,可以这样编辑logstash配置文件:
从redis服务器接受日志,处理后存入Elasticsearch:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
input {
syslog {
type => "system-syslog"
port => 514
}
redis {
type => "apache-accesslog"
host => "192.168.56.12"
port => "6379"
db => "6"
data_type => "list"
key => "apache-accesslog"
}
redis {
type => "es-log"
host => "192.168.56.12"
port => "6379"
db => "6"
data_type => "list"
key => "es-log"
}
}
filter {
if [ type ] == "apache-accesslog" {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
}
}
output {
if [ type ] == "apache-accesslog" {
elasticsearch {
hosts => [ "192.168.56.11:9200" ]
index => "apache-accesslog-%{+YYYY.MM.dd}"
}
}
if [ type ] == "es-log" {
elasticsearch {
hosts => [ "192.168.56.11:9200" ]
index => "es-log-%{+YYYY.MM}"
}
}
if [ type ] == "system-syslog" {
elasticsearch {
hosts => [ "192.168.56.11:9200" ]
index => "system-syslog-%{+YYYY.MM}"
}
}
}
|
从服务器接收日志,将日志存入redis/MQ:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
input {
file {
path => "/var/log/httpd/access_log"
start_position => "beginning"
type => "apache-accesslog"
}
file {
path => "/var/log/elasticsearch/elasticsearch.log"
type => "es-log"
start_position => "beginning"
codec => multiline{
pattern => "^\["
negate => true
what => "previous"
}
}
}
output {
if [ type ] == "apache-accesslog" {
redis {
host => "192.168.56.12"
port => "6379"
db => "6"
data_type => "list"
key => "apache-accesslog"
}
}
if [ type ] == "es-log" {
redis {
host => "192.168.56.12"
port => "6379"
db => "6"
data_type => "list"
key => "es-log"
}
}
}
|
这里使用了if做为了条件的判断,来对不同的日志进行选择处理,在一些系统日志如syslog等,可以单独进行处理。
提示: 在使用系统自带的脚本进行启动时,会出现无法收集日志的情况,脚本中默认的是使用logstash用户,在收集一些日志时可能存在没有权限的情况。可以修改脚本中的默认用户,或者给logstash添加其他用户组的权限。
本文转自 酥心糖 51CTO博客,原文链接:http://blog.51cto.com/tryingstuff/1888978