ELK日志管理深度实战

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
日志服务 SLS,月写入数据量 50GB 1个月
简介:

Logstash收集Rsyslog/syslog日志

syslog默认是通过514端口去发送日志,可以使用logstash安装对应的插件,监听514端口来收集日志。如果只是监听系统日志可以不用安装logstash的agent,只需要监听对应的514端口,收集系统数据即可。

logstash在INPUT插件中提供了syslog的模块,可以用于不安装本地日志收集agent的设备(如硬件防火墙等),当然支持此类协议的普通服务器也适用。

注意:此INPUT插件会同时监听TCP和UDP端口。


可以在安装logstash服务器node2(172.16.10.21)上编写logstash配置文件,指定端口(默认为514)同时指定写入的ES服务器:

1
2
3
4
5
6
7
8
9
10
11
12
13
[root@node2 ~] # cat /etc/logstash/conf.d/syslog.conf 
input {
      syslog{
      type  =>  "system-syslog"
      port => 514
}
}
output {
     elasticsearch {
         hosts => [ "172.16.10.21:9200" ]          # 指定写入的ES服务器
         index =>  "system-syslog-%{+YYYY.MM}"
     }
}

在node1(172.16.10.20)使syslog自动发送日志信息到logstash,需要修改/etc/rsyslog.conf (CentOS7)的配置文件:

在最后添加一行,指定远程logstash服务器地址和端口:

1
*.* @@172.16.10.21:514

重启rsyslog服务:

1
systemctl restart rsyslog

在logstash服务器上指定配置文件启动:

1
/opt/logstash/bin/logstash  -f  /etc/logstash/conf .d /syslog .conf

在node1使用logger 命令可以生成系统日志,使Elasticsearch上产生索引:

1
logger trying

登录Elasticsearch上就可查询到syslog的日志了,可以在kibana上添加此项。


TCP、UDP日志收集

在有时要对一些日志进行补充或者发送一些文件内容到es上时,可以通过使用TCP模块的方式。

编辑tcp.conf的logstash文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
[root@node2 ~] # cat /etc/logstash/conf.d/tcp.conf 
input {
     tcp {
         type  =>  "tcp"
         port =>  "6666"
         mode =>  "server"
    }
}
output{
     stdout {
         codec => rubydebug
}
}

启动服务,在其他任何主机上可以通过nc 命令或者伪设备输出的方式,将需要发送的内容发送给node2:

1
2
3
[root@node1 ~] # echo "trying"|nc 172.16.10.21 6666
[root@node1 ~] # nc 172.16.10.21 6666 < /etc/resolv.conf 
[root@node1 ~] # echo "stuff" > /dev/tcp/172.16.10.21/6666

在node2上,显示接受的信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
[root@node2 ~] # /opt/logstash/bin/logstash -f /etc/logstash/conf.d/tcp.conf 
Settings: Default pipeline workers: 1
Pipeline main started
{
        "message"  =>  "trying" ,
       "@version"  =>  "1" ,
     "@timestamp"  =>  "2017-01-03T08:12:10.630Z" ,
           "host"  =>  "172.16.10.20" ,
           "port"  => 57054,
           "type"  =>  "tcp"
}
{
        "message"  =>  "# Generated by NetworkManager" ,
       "@version"  =>  "1" ,
     "@timestamp"  =>  "2017-01-03T08:13:11.081Z" ,
           "host"  =>  "172.16.10.20" ,
           "port"  => 57248,
           "type"  =>  "tcp"
}
{
        "message"  =>  "nameserver 114.114.114.114" ,
       "@version"  =>  "1" ,
     "@timestamp"  =>  "2017-01-03T08:13:11.081Z" ,
           "host"  =>  "172.16.10.20" ,
           "port"  => 57248,
           "type"  =>  "tcp"
}
{
        "message"  =>  "stuff" ,
       "@version"  =>  "1" ,
     "@timestamp"  =>  "2017-01-03T08:15:14.224Z" ,
           "host"  =>  "172.16.10.20" ,
           "port"  => 57634,
           "type"  =>  "tcp"
}


使用Filter模块对Apache日志进行格式化

在logstash中可以对一些输入的格式进行格式化,如记录Apache日志时,由于自身不具有像nginx提供的JSON格式化的输出,所以在进行记录时,需要使用filter插件的grok来进行正则匹配。

在logstash中,已经默认提供了方便使用的正则表示方式,在/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.5/patterns保存了相关的配置。

编写logstash的配置文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[root@node2 patterns] # cat /etc/logstash/conf.d/grok.conf 
input{
     stdin {}
}
filter { 
      grok {
     match => {  "message"  =>  "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"  }
   }
}
output {
     stdout{
         codec => rubydebug
}
}

官方文档提供了示例,可以直接使用https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html 

启动服务,输入测试示例,返回格式化的输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[root@node2 ~] # /opt/logstash/bin/logstash -f /etc/logstash/conf.d/grok.conf 
Settings: Default pipeline workers: 1
Pipeline main started
55.3.244.1 GET  /index .html 15824 0.043   #输入的示例
{
        "message"  =>  "55.3.244.1 GET /index.html 15824 0.043" ,
       "@version"  =>  "1" ,
     "@timestamp"  =>  "2017-01-03T08:59:44.264Z" ,
           "host"  =>  "node2" ,
         "client"  =>  "55.3.244.1" ,
         "method"  =>  "GET" ,
        "request"  =>  "/index.html" ,
          "bytes"  =>  "15824" ,
       "duration"  =>  "0.043"
}

使用grok收集Apache日志到Elasticsearch.


在默认的Apache日志格式下,logstash提供了一个已经编写好的日志格式正则filter:

1
2
3
4
[root@node2 ~] # cat /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.5/patterns/grok-patterns \
| grep  "COMBINEDAPACHELOG"
 
COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}


编写logstash配置文件,引用此参数COMBINEDAPACHELOG:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
[root@node2 ~] # cat /etc/logstash/conf.d/grok.conf                              
input{
     file  {
         path =>  "/var/log/httpd/access_log"
         start_position =>  "beginning"
}
}
filter { 
      grok {
     match => {  "message"  =>  "%{COMBINEDAPACHELOG}"  }   #调用此参数
   }
}
output {
     elasticsearch {
         hosts => [ "172.16.10.20:9200" ]
         index =>  "apache-access-log-%{+YYYY.MM.dd}"
}
#   stdout{ codec => rubydebug  }
}

这样,默认的日志文件就可以被格式化输出到Elasticsearch上了。


使用grok会有一些问题,比如会非常影响性能,而且很不灵活,除非对ruby掌控很好。

在实际的生产环境中,为了实现松耦合和性能问题,一般将日志写入redis,再使用python对其进行格式化处理。


使用消息队列扩展ELK


数据 >> lostash >> MQ/Redis >> logstash >> ES

在实现低耦合和高可靠性的ELK部署架构中,使用消息队列的方式来作为一个中间转存的数据交换中心,将数据文件需要写入到消息队列,而对消息队列中数据的处理可有其它任意的应用去实现,整个过程写入和输出系统没有任何依赖关系。

这里使用最简单的redis来实现这一功能。

在node2上安装redis,将收集的数据发送到redis中,不做任何处理,这里需要先修改redis的配置文件:

1
2
3
# vim /etc/redis.conf 
daemonize  yes           # 使用台运行模式
bind 172.16.10.21       # 绑定监听的IP

通过使用logstash将数据写入redis,创建logstash的配置文件,用于将数据写入redis:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[root@node2 ~] # cat /etc/logstash/conf.d/redis.conf 
input{
     stdin {
}
}
output {
         redis {
           host =>  "172.16.10.21"
           port =>  "6379"
           db =>  "6"
           data_type =>  "list"
           key =>  "demo"
}
}

启动redis,指定配置文件启动logstash:

1
systemctl start redis
1
2
3
4
[root@node2 ~] # /opt/logstash/bin/logstash -f /etc/logstash/conf.d/redis.conf 
Settings: Default pipeline workers: 1
Pipeline main started
try

在redis上可以查看到输入的数据:

1
2
3
4
5
6
7
8
9
10
11
[root@node2 ~] # redis-cli  -h 172.16.10.21 -p 6379
172.16.10.21:6379> info
# Keyspace
db6:keys=2,expires=0,avg_ttl=0
172.16.10.21:6379>  select  6
OK
172.16.10.21:6379[6]> keys *
1)  "apache-accesslog"
2)  "demo"
172.16.10.21:6379[6]> lindex demo -1
"{\"message\":\"try\",\"@version\":\"1\",\"@timestamp\":\"2017-01-04T03:29:28.337Z\",\"host\":\"node2\"}"

使用另一个logstash将数据从redis中取出:

在node1上配置logstash的配置文件,指定文件启动,此时会自动将redis上的数据读出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[root@node1 ~] # cat /etc/logstash/conf.d/redis-get.conf 
input {
        redis {
           host =>  "172.16.10.21"
           port =>  "6379"
           db =>  "6"
           data_type =>  "list"
           key =>  "demo"
}
}
filter {}
output {
     stdout{
         codec => rubydebug
}
}
1
2
3
4
5
6
7
8
9
[root@node1 conf.d] # /opt/logstash/bin/logstash -f /etc/logstash/conf.d/redis-get.conf 
Settings: Default pipeline workers: 1
Pipeline main started
{
        "message"  =>  "try" ,
       "@version"  =>  "1" ,
     "@timestamp"  =>  "2017-01-03T12:10:20.635Z" ,
           "host"  =>  "node2"
}

上面使用的是手动输入的方式,在实际收集日志时,可以在输入时指定文件,输出时指定到Elasticsearch。


使用Redis收集Apache访问日志

在上面的示例中,我们使用了直接通过grok来收集Apache的日志,但是在实际的生产环境,由于这种方案会使性能下降,而且在整个架构上过度依赖于logstash,无法做到解耦,并不是一个好的解决方案。这里引入消息队列,作为一个解耦,上日志的存取分开,方便后期扩展,提高了稳定性。

在node2上配置logstash 配置文件,指定Apache的access文件,并启动:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[root@node2 ~] # cat /etc/logstash/conf.d/apache-redis.conf 
input{
     file  {
         path =>  "/var/log/httpd/access_log"
         start_position =>  "beginning"
}
}
output {
         redis {
           host =>  "172.16.10.21"
           port =>  "6379"
           db =>  "6"
           data_type =>  "list"
           key =>  "apache-log"
}
}

此处直接完整输入内容到redis,不做任何处理。

在node1上从redis中取出日志,并通过grok进行日志格式化处理,之后输出到Elasticsearch中。

node1上logstash的配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[root@node1 ~] # cat /etc/logstash/conf.d/redis-get.conf 
input {
        redis {
           host =>  "172.16.10.21"
           port =>  "6379"
           db =>  "6"
           data_type =>  "list"
           key =>  "apache-log"
}
}
filter {
     grok {
     match => {  "message"  =>  "%{COMBINEDAPACHELOG}"  }
   }
}
output {
     elasticsearch{
         hosts => [ "172.16.10.20:9200" ]
         index =>  "apache-log-%{+YYYY.MM.dd}"
}
}

这样,启动node1和node2上的logstash,在指定的Elasticsearch上就可以看到被格式化后的日志输出了。

和redis类似,可以使用Kafka作为存储数据的中间节点:https://www.unixhot.com/article/61 



ELK项目生产规划

在实际的生产环境中要对日志进行收集,首先需要对日志分类:

访问日志: apache,nginx, tomcat 等web访问日志   通过file插件收集,apache通过filter插件处理格式

错误日志:error log,java 日志 等可以直接收取,对于一条多行的日志(java异常)使用多行处理插件

系统日志: /var/log  syslog 等 直接使用 syslog插件收取

网络日志: 防火墙,交换机,路由器等   使用 syslog日志

运行日志:应用程序定义    使用file 插件, 最好为json格式


标准化: 统一存放路径,命名规则,格式(json),日志切割(crontab). 需要保存的日志先在本地读写,提高性能,通过rsync推送到指定存储,防止挂载故障而使服务终止。删除本地日志(确认保留时间)

工具化:可以使用logstash进行日志收集方案。

如果使用redis作为消息队列,那么要对所有的list key的长度进行监控,防止溢出和数据丢失

llen key_name    

根据实际情况,例如超过 10万 就报警。


如将之前的所有配置合并到一处,在使用redis作为存储时,可以这样编辑logstash配置文件:

从redis服务器接受日志,处理后存入Elasticsearch:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
input {
    syslog {
      type  =>  "system-syslog"
      port => 514
    }
    redis {
         type  =>  "apache-accesslog"
         host =>  "192.168.56.12"
         port =>  "6379"
         db =>  "6"
         data_type =>  "list"
         key =>  "apache-accesslog"
     }
     redis {
         type  =>  "es-log"
         host =>  "192.168.56.12"
         port =>  "6379"
         db =>  "6"
         data_type =>  "list"
         key =>  "es-log"
     }
}
filter {
     if  [ type ] ==  "apache-accesslog"  {
     grok {
         match => {  "message"  =>  "%{COMBINEDAPACHELOG}"  }
     }
     }
}
output {
     if  [ type ] ==  "apache-accesslog"  {
     elasticsearch {
        hosts => [ "192.168.56.11:9200" ]
         index =>  "apache-accesslog-%{+YYYY.MM.dd}"
     }
     }
     if  [ type ] ==  "es-log"  {
         elasticsearch {
                 hosts => [ "192.168.56.11:9200" ]
                 index =>  "es-log-%{+YYYY.MM}"
         }
     }
     if  [ type ] ==  "system-syslog"  {
         elasticsearch {
                 hosts => [ "192.168.56.11:9200" ]
                 index =>  "system-syslog-%{+YYYY.MM}"
         }
     }
}


从服务器接收日志,将日志存入redis/MQ:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
input {
     file  {
         path =>  "/var/log/httpd/access_log"
         start_position =>  "beginning"
         type  =>  "apache-accesslog"
     }
     file  {
         path =>  "/var/log/elasticsearch/elasticsearch.log"
         type  =>  "es-log"
         start_position =>  "beginning"
         codec => multiline{
           pattern =>  "^\["
           negate =>  true
           what =>  "previous"
         }
     }
}
output {
     if  [ type ] ==  "apache-accesslog"  {
         redis {
                 host =>  "192.168.56.12"
                 port =>  "6379"
                 db =>  "6"
                 data_type =>  "list"
                 key =>  "apache-accesslog"
         }
     }
     if  [ type ] ==  "es-log"  {
     redis {
         host =>  "192.168.56.12"
         port =>  "6379"
         db =>  "6"
         data_type =>  "list"
         key =>  "es-log"
     }
     }
}

这里使用了if做为了条件的判断,来对不同的日志进行选择处理,在一些系统日志如syslog等,可以单独进行处理。


提示: 在使用系统自带的脚本进行启动时,会出现无法收集日志的情况,脚本中默认的是使用logstash用户,在收集一些日志时可能存在没有权限的情况。可以修改脚本中的默认用户,或者给logstash添加其他用户组的权限。



 本文转自 酥心糖 51CTO博客,原文链接:http://blog.51cto.com/tryingstuff/1888978


相关实践学习
通过日志服务实现云资源OSS的安全审计
本实验介绍如何通过日志服务实现云资源OSS的安全审计。
目录
打赏
0
0
0
0
347
分享
相关文章
日志审查安排工具实战攻略:中小团队如何通过日志审查安排工具建立可控、安全的审查机制?
在审计敏感时代,日志审查安排工具成为安全运维与合规管理的关键利器。它实现审查任务的流程化、周期化与可视化,支持多系统协作、责任到人,确保“可控、可查、可追”的日志治理。工具如板栗看板、Asana、Monday 等提供任务调度、问题闭环与合规对接能力,助力企业构建高效、透明的日志审查体系,提升安全与合规水平。
Aipy实战:分析apache2日志中的网站攻击痕迹
Apache2日志系统灵活且信息全面,但安全分析、实时分析和合规性审计存在较高技术门槛。为降低难度,可借助AI工具如aipy高效分析日志,快速发现攻击痕迹并提供反制措施。通过结合AI与学习技术知识,新手运维人员能更轻松掌握复杂日志分析任务,提升工作效率与技能水平。
|
10月前
|
超越传统模型:从零开始构建高效的日志分析平台——基于Elasticsearch的实战指南
【10月更文挑战第8天】随着互联网应用和微服务架构的普及,系统产生的日志数据量日益增长。有效地收集、存储、检索和分析这些日志对于监控系统健康状态、快速定位问题以及优化性能至关重要。Elasticsearch 作为一种分布式的搜索和分析引擎,以其强大的全文检索能力和实时数据分析能力成为日志处理的理想选择。
637 6
ELK实现nginx、mysql、http的日志可视化实验
通过本文的步骤,你可以成功配置ELK(Elasticsearch, Logstash, Kibana)来实现nginx、mysql和http日志的可视化。通过Kibana,你可以直观地查看和分析日志数据,从而更好地监控和管理系统。希望这些步骤能帮助你在实际项目中有效地利用ELK来处理日志数据。
423 90
超实用的SpringAOP实战之日志记录
【11月更文挑战第11天】本文介绍了如何使用 Spring AOP 实现日志记录功能。首先概述了日志记录的重要性及 Spring AOP 的优势,然后详细讲解了搭建 Spring AOP 环境、定义日志切面、优化日志内容和格式的方法,最后通过测试验证日志记录功能的准确性和完整性。通过这些步骤,可以有效提升系统的可维护性和可追踪性。
176 1
「测试线排查的一些经验-中篇」&& 调试日志实战
「测试线排查的一些经验-中篇」&& 调试日志实战
114 1
「测试线排查的一些经验-中篇」&& 调试日志实战
|
11月前
|
基于SQL Server事务日志的数据库恢复技术及实战代码详解
基于事务日志的数据库恢复技术是SQL Server中一个非常强大的功能,它能够帮助数据库管理员在数据丢失或损坏的情况下,有效地恢复数据。通过定期备份数据库和事务日志,并在需要时按照正确的步骤恢复,可以最大限度地减少数据丢失的风险。需要注意的是,恢复数据是一个需要谨慎操作的过程,建议在执行恢复操作之前,详细了解相关的操作步骤和注意事项,以确保数据的安全和完整。
511 0
Struts 2 日志管理化身神秘魔法师,洞察应用运行乾坤,演绎奇幻篇章!
【8月更文挑战第31天】在软件开发中,了解应用运行状况至关重要。日志管理作为 Struts 2 应用的关键组件,记录着每个动作和决策,如同监控摄像头,帮助我们迅速定位问题、分析性能和使用情况,为优化提供依据。Struts 2 支持多种日志框架(如 Log4j、Logback),便于配置日志级别、格式和输出位置。通过在 Action 类中添加日志记录,我们能在开发过程中获取详细信息,及时发现并解决问题。合理配置日志不仅有助于调试,还能分析用户行为,提升应用性能和稳定性。
139 0

热门文章

最新文章

AI助理
登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问

你好,我是AI助理

可以解答问题、推荐解决方案等