Zabbix与ELK整合实现对安全日志数据的实时监控告警

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Zabbix与ELK整合实现对安全日志数据的实时监控告警

1 ELK与ZABBIX有什么关系?


ELK大家应该比较熟悉了,zabbix应该也不陌生,那么将ELK和zabbix放到一起的话,可能大家就有疑问了?这两个放到一起是什么目的呢,听我细细道来


ELK是一套日志收集套件,它其实由Elasticsearch、Logstash和Kibana三个软件组成,通过ELK可以收集系统日志、网站日志、应用系统日志等各种日志数据,并且还可以对日志进行过滤、清洗,然后进行集中存放并可用于实时检索、分析。这是ELK的基础功能。


但是有些时候,我们希望在收集日志的时候,能够将日志中的异常信息(警告、错误、失败等信息)及时的提取出来,因为日志中的异常信息意味着操作系统、应用程序可能存在故障,如果能将日志中的故障信息及时的告知运维人员,那么运维就可以第一时间去进行故障排查和处理,进而也就可以避免很多故障的发生。


那么如何才能做到将ELK收集的日志数据中出现的异常信息及时的告知运维人员呢,这就需要用到zabbix了,ELK(更确切的说应该是logstash)可以实时的读取日志的内容,并且还可以过滤日志信息,通过ELK的读取和过滤功能,就可以将日志中的一些异常关键字(error、failed、OutOff、Warning)过滤出来,然后通过logstash的zabbix插件将这个错误日志信息发送给zabbix,那么zabbix在接收到这个数据后,结合自身的机制,然后发起告警动作,这样就实现了日志异常zabbix实时告警的功能了。


2 Logstash与zabbix插件的使用


Logstash支持多种输出介质,比如syslog、HTTP、TCP、elasticsearch、kafka等,而有时候我们想将收集到的日志中一些错误信息输出,并告警时,就用到了logstash-output-zabbix这个插件,此插件可以将Logstash与zabbix进行整合,也就是将Logstash收集到的数据进行过滤,将有错误标识的日志输出到zabbix中,最后通过zabbix的告警机制进行触发、告警。


logstash-output-zabbix是一个社区维护的插件,它默认没有在Logstash中安装,但是安装起来也很容易,直接在logstash中运行如下命令即可:


[root@elk-master bin]# /usr/share/logstash/bin/logstash-plugin install logstash-output-zabbix


其中,/usr/share/logstash/bin/是Logstash的yum默认安装目录,如果是源码安装的需要自己修改


此外,logstash-plugin命令还有多种用法,我们来看一下:


2.1 列出目前已经安装的插件


列出所有已安装的插件


[root@elk-master ~]# /usr/share/logstash/bin/logstash-plugin list


列出已安装的插件及版本信息


[root@elk-master ~]# /usr/share/logstash/bin/logstash-plugin list --verbose


列出包含namefragment的所有已安装插件


[root@elk-master ~]# /usr/share/logstash/bin/logstash-plugin list "*namefragment*"


列出特定组的所有已安装插件( input,filter,codec,output)


[root@elk-master ~]# /usr/share/logstash/bin/logstash-plugin --group output


2.2 安装插件


要安装某个插件,例如安装kafka插件,可执行如下命令:


[root@elk-master ~]# /usr/share/logstash/bin/logstash-plugin install logstash-output-kafka


要使用此命令安装插件,需要你的电脑可以访问互联网。此插件安装方法,会检索托管在公共存储库(RubyGems.org)上的插件,然后下载到本地机器并在Logstash安装之上进行自动安装


2.3 更新插件


每个插件有自己的发布周期和版本更新,这些更新通常是独立于Logstash的发布周期的。因此,有时候需要单独更新插件,可以使用update子命令获得最新版本的插件

更新所有已安装的插件


[root@elk-master ~]# /usr/share/logstash/bin/logstash-plugin update


仅更新指定的插件


[root@elk-master ~]# /usr/share/logstash/bin/logstash-plugin update logstash-output-kafka


2.4 删除插件


如果需要从Logstash插件中删除插件,可执行如下命令:


[root@elk-master ~]# /usr/share/logstash/bin/logstash-plugin remove logstash-output-kafka


这样就删除了logstash-output-kafka插件


3 logstash-output-zabbix插件的使用


logstash-output-zabbix安装好之后,就可以在logstash配置文件中使用了,下面是一个logstash-output-zabbix使用的例子:


zabbix {
        zabbix_host =>"[@metadata][zabbix_host]"
        zabbix_key =>"[@metadata][zabbix_key]"
        zabbix_server_host =>"x.x.x.x"
        zabbix_server_port =>"xxxx"
        zabbix_value ="xxxx"
        }


其中:


  • zabbix_host:表示Zabbix主机名字段名称, 可以是单独的一个字段, 也可以是 @metadata 字段的子字段, 是必需的设置,没有默认值。
  • zabbix_key:表示Zabbix项目键的值,也就是zabbix中的item,此字段可以是单独的一个字段, 也可以是 @metadata 字段的子字段,没有默认值。
  • zabbix_server_host:表示Zabbix服务器的IP或可解析主机名,默认值是 “localhost”,需要设置为zabbix server服务器所在的地址。
  • zabbix_server_port:表示Zabbix服务器开启的监听端口,默认值是10051。
  • zabbix_value:表示要发送给zabbix item监控项的值对应的字段名称,默认值是 “message”,也就是将”message”字段的内容发送给上面zabbix_key定义的zabbix item监控项,当然也可以指定一个具体的字段内容发送给zabbix item监控项。


4 将logstash与zabbix进行整合


这里我们以logstash收集日志,然后对日志进行读取,最后选择关键字进行过滤并调用zabbix告警的流程,来看看如何配置logstash实现zabbix告警。


先说明一下我们的应用需求:通过对系统日志文件的监控,然后去过滤日志信息中的一些关键字,例如ERROR、Failed、WARNING等,将日志中这些信息过滤出来,然后发送到zabbix上,最后借助zabbix的报警功能实现对系统日志中有上述关键字的告警。


对于过滤关键字,进行告警,不同的业务系统,可能关键字不尽相同,例如对http系统,可能需要过滤500、403、503等这些错误码,对于java相关的系统,可能需要过滤OutOfMemoryError、PermGen、Java heap等关键字。在某些业务系统的日志输出中,可能还有一些自定义的错误信息,那么这些也需要作为过滤关键字来使用。


4.1 配置logstash事件配置文件


接下来就是创建一个logstash事件配置文件,这里将配置文件分成三个部分来介绍,首先是input部分,内容如下:


input {
        file {
        path ="/var/log/secure"
        type ="system"
        start_position ="beginning"
}
}


input部分是从/var/log/secure文件中读取数据,start_position 表示从secure文件开头读取内容。接着是filter部分,内容如下:


filter {
    grok {
          match => { "message" => "%{SYSLOGTIMESTAMP:message_timestamp} %{SYSLOGHOST:hostname} %{DATA:message_program}(?:\[%{POSINT:message_pid}\])?: %{GREEDYDATA:message_content}" }#这里通过grok对message字段的数据进行字段划分,这里将message字段划分了5个子字段。其中,message_content字段会在output中用到
         }
    mutate {
           add_field => ["[zabbix_key]","oslogs"] #新增的字段,字段名是zabbix_key,值为oslogs
           add_field => ["[zabbix_host]","Zabbix server"] #新增的字段,字段名是zabbix_host,值可以在这里直接定义,也可以引用字段变量来获取。这里的%{host}获取的就是日志数据的主机名,这个主机名与zabbix web中“主机名称”需要保持一致
           remove_field => ["@version","message"] #这里是删除不需要的字段
        }
    date { #这里是对日志输出中的日期字段进行转换,其中message_timestamp字段是默认输出的时间日期字段,将这个字段的值传给 @timestamp字段
           match => [ "message_timestamp","MMM  d HH:mm:ss","MMM dd HH:mm:ss","ISO8601"]
        }
}


filter部分是个重点,在这个部分中,重点关注的是message_timestamp字段、message_content字段


最后是output部分,内容如下:


output {
        elasticsearch{  #这部分是为把日志次发送到es中便于kibana中进行展示,如果没有部署的可以去掉
                index => "oslogs-%{+YYYY.MM.dd}"
                hosts => ["192.168.73.133:9200"]
                user => "elastic"
                password => "Goldwind@2019"
                sniffing => false
         }
        if [message_content]  =~ /(ERR|error|ERROR|Failed)/ { #定义在message_content字段中,需要过滤的关键字信息,也就是在message_content字段中出现给出的这些关键字,那么就将这些信息发送给zabbix
                   zabbix {
                            zabbix_host =>"[zabbix_host]" #这个zabbix_host将获取上面filter部分定义的字段变量%{host}的值
                            zabbix_key =>"[zabbix_key]" #这个zabbix_key将获取上面filter部分中给出的值
                            zabbix_server_host =>"192.168.73.133" #这是指定zabbix server的IP地址
                            zabbix_server_port =>"10051" #这是指定zabbix server的监听端口
                            zabbix_value =>"message_content" #这个很重要,指定要传给zabbix监控项item(oslogs)的值, zabbix_value默认的值是"message"字段,因为上面我们已经删除了"message"字段,因此,这里需要重新指定,根据上面filter部分对"message"字段的内容划分,这里指定为"message_content"字段,其实,"message_content"字段输出的就是服务器上具体的日志内容
                        }
       }
        #stdout{ codec => rubydebug }
} #这里是开启调试模式,当第一次配置的时候,建议开启,这样过滤后的日志信息直接输出的屏幕,方便进行调试,调试成功后,即可关闭


将上面三部分内容合并到一个文件file_to_zabbix.conf中,然后启动logstash服务:


[root@logstashserver ~]#cd /usr/local/logstash
[root@logstashserver logstash]#nohup bin/logstash -f config/file_to_zabbix.conf --path.data /tmp/ &


这里的—path.data是指定此logstash进程的数据存储目录,用于在一个服务器上启动多个logstash进程的环境中


4.2 zabbix平台配置日志告警


登录zabbix web平台,选择配置—->模板—->创建模板,名称定为logstash-output-zabbix,如下图所示:


640.png


接着,在此模块下创建一个应用集,点击应用集——->创建应用集,如下图所示:


640.png


然后,在此模块下创建一个监控项,点击监控项——->创建监控项,如下图所示:


640.png


到此为止,zabbix监控logstash的日志数据配置完成


这里我们以客户端192.168.73.135主机为例,也就是监控192.168.73.135主机上的系统日志数据,当发现日志异常就进行告警


在上面创建了一个模板和监控项,接着还需要将此模板链接到192.168.73.135主机上,选择“配置”—-“主机”,然后选中192.168.73.135主机,选择“模板”标签,将上面创建的模板加入到192.168.73.135主机上,如下图所示:


640.png


这样,上面创建的监控项在192.168.73.135主机上就自动生效了


下面我们模拟一个故障,在任意主机通过ssh登录192.168.73.135主机,然后输入一个错误密码,让系统的/var/log/secure文件产生错误日志,然后看看logstash是否能够过滤到,是否能够发送到zabbix中 首先让系统文件/var/log/secure产生类似如下内容:


[root@k8s-node03 ~]# tail /var/log/secure
Dec 25 22:57:44 k8s-node03 sshd[49803]: Failed password for root from 192.168.73.1 port 60256 ssh2
Dec 25 22:57:48 k8s-node03 sshd[49803]: error: Received disconnect from 192.168.73.1 port 60256:13: The user canceled authentication.  [preauth]
Dec 25 22:57:48 k8s-node03 sshd[49803]: Disconnected from 192.168.73.1 port 60256 [preauth]
Dec 25 23:02:40 k8s-node03 sshd[49908]: reverse mapping checking getaddrinfo for bogon [192.168.73.1] failed - POSSIBLE BREAK-IN ATTEMPT!
Dec 25 23:02:43 k8s-node03 unix_chkpwd[49911]: password check failed for user (root)
Dec 25 23:02:43 k8s-node03 sshd[49908]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=192.168.73.1  user=root
Dec 25 23:02:43 k8s-node03 sshd[49908]: pam_succeed_if(sshd:auth): requirement "uid >= 1000" not met by user "root"
Dec 25 23:02:44 k8s-node03 sshd[49908]: Failed password for root from 192.168.73.1 port 60757 ssh2
Dec 25 23:02:46 k8s-node03 sshd[49908]: error: Received disconnect from 192.168.73.1 port 60757:13: The user canceled authentication.  [preauth]
Dec 25 23:02:46 k8s-node03 sshd[49908]: Disconnected from 192.168.73.1 port 60757 [preauth]


这里面有我们要过滤的关键字Failed,因此logstash会将此内容过滤出来,发送到zabbix上 接着,登录zabbix web平台,点击监测中——->最新数据,如果zabbix能够接收到日志,就可以看到下图的最新数据:


640.png


点击历史记录,可以查看详细内容,如下图所示:


640.png


可以看到,红框中的内容就是在logstash中定义的message_content字段的内容,到这里为止,zabbix已经可以收到logstash的发送过来的数据了,但是要实现报警,还需要在zabbix中创建一个触发器,进入配置——->模板,选择logstash-output-zabbix这个模板,然后点击上面的触发器,继续点击右上角的创建触发器,如下图所示:


640.png


这里注意触发器创建中,表达式的写法,这里触发器的触发条件是:


如果接收到logstash发送过来的数据,就进行告警,也就是说接收到的数据,如果长度大于0就告警 触发器配置完成后,如果配置正常,就会进行告警了,使用的是钉钉进行图文告警,告警内容如下图所示:


640.png


Kibana上查看安全日志


640.png


总结


首先我们来捋一下思路:


我们的架构基本不变,仍然是filebat收集日志推送到kibana消息队列,然后由Logstash前去拉取日志数据,经过处理最后中转出去;只不过是中转输出到zabbix上面而已;能够实现这个功能的,最核心的功臣就是Logsatsh的插件(logstash-output-zabbix);

在这里需要注意的是:filebeat收集端的IP一定要与zabbix监控主机的IP相对应,否则日志是过不来的~


分享一个小技巧:通过该命令可以测试定义在zabbix上的键值;出现以下输出变为正常~,如果failed非零值表示失败


#在server端使用zabbix_sender向zabbix发送测试
[root@localhost zabbix_sender]# /usr/local/zabbix/bin/zabbix_sender -s 192.168.73.135 -z 192.168.73.133 -k "oslogs" -o 1
info from server: "processed: 1; failed: 0; total: 1; seconds spent: 0.000081"
sent: 1; skipped: 0; total: 1


详解:-s:指定本地agent端


-z:指定zabbix服务端


-k:指定键值

相关实践学习
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
相关文章
|
2月前
|
消息中间件 Java Kafka
搭建ELK日志收集,保姆级教程
本文介绍了分布式日志采集的背景及ELK与Kafka的整合应用。传统多服务器环境下,日志查询效率低下,因此需要集中化日志管理。ELK(Elasticsearch、Logstash、Kibana)应运而生,但单独使用ELK在性能上存在瓶颈,故结合Kafka实现高效的日志采集与处理。文章还详细讲解了基于Docker Compose构建ELK+Kafka环境的方法、验证步骤,以及如何在Spring Boot项目中整合ELK+Kafka,并通过Logback配置实现日志的采集与展示。
717 64
搭建ELK日志收集,保姆级教程
|
4月前
|
运维 安全 数据可视化
日志审查安排工具实战攻略:中小团队如何通过日志审查安排工具建立可控、安全的审查机制?
在审计敏感时代,日志审查安排工具成为安全运维与合规管理的关键利器。它实现审查任务的流程化、周期化与可视化,支持多系统协作、责任到人,确保“可控、可查、可追”的日志治理。工具如板栗看板、Asana、Monday 等提供任务调度、问题闭环与合规对接能力,助力企业构建高效、透明的日志审查体系,提升安全与合规水平。
|
8月前
|
数据可视化 关系型数据库 MySQL
ELK实现nginx、mysql、http的日志可视化实验
通过本文的步骤,你可以成功配置ELK(Elasticsearch, Logstash, Kibana)来实现nginx、mysql和http日志的可视化。通过Kibana,你可以直观地查看和分析日志数据,从而更好地监控和管理系统。希望这些步骤能帮助你在实际项目中有效地利用ELK来处理日志数据。
646 90
|
8月前
|
数据采集 运维 监控
数据采集监控与告警:错误重试、日志分析与自动化运维
本文探讨了数据采集技术从“简单采集”到自动化运维的演进。传统方式因反爬策略和网络波动常导致数据丢失,而引入错误重试、日志分析与自动化告警机制可显著提升系统稳定性与时效性。正方强调健全监控体系的重要性,反方则担忧复杂化带来的成本与安全风险。未来,结合AI与大数据技术,数据采集将向智能化、全自动方向发展,实现动态调整与智能识别反爬策略,降低人工干预需求。附带的Python示例展示了如何通过代理IP、重试策略及日志记录实现高效的数据采集程序。
408 7
数据采集监控与告警:错误重试、日志分析与自动化运维
|
存储 消息中间件 网络协议
日志平台-ELK实操系列(一)
日志平台-ELK实操系列(一)
|
9月前
|
存储 监控 安全
云日志管理|从云端进行安全日志管理
Log360 Cloud 是一款基于云的SIEM解决方案,旨在为企业提供灵活、安全的日志管理。它从本地和云环境中收集日志并存储于云端,通过实时图形仪表板提供网络安全的全面视图。用户可随时随地访问日志,减少存储成本,轻松扩展,并确保符合IT合规性要求。该平台支持代理和无代理日志收集,具备强大的搜索、分析、审计和实时警报功能,帮助企业及时检测和解决潜在威胁,保障信息安全。
157 1
|
消息中间件 测试技术
通过轻量消息队列(原MNS)主题HTTP订阅+ARMS实现自定义数据多渠道告警
轻量消息队列(原MNS)以其简单队列模型、轻量化协议及按量后付费模式,成为阿里云产品间消息传输首选。本文通过创建主题、订阅、配置告警集成等步骤,展示了该产品在实际应用中的部分功能,确保消息的可靠传输。
249 2
|
存储 监控 安全
|
Oracle 关系型数据库 数据库
【赵渝强老师】Oracle的参数文件与告警日志文件
本文介绍了Oracle数据库的参数文件和告警日志文件。参数文件分为初始化参数文件(PFile)和服务器端参数文件(SPFile),在数据库启动时读取并分配资源。告警日志文件记录了数据库的重要活动、错误和警告信息,帮助诊断问题。文中还提供了相关视频讲解和示例代码。
267 1
|
数据采集 弹性计算 监控
为什么云监控、云产品流量监控中的流量数据和DDoS防护的流量监控数据有差异?
为什么云监控、云产品流量监控中的流量数据和DDoS防护的流量监控数据有差异?
为什么云监控、云产品流量监控中的流量数据和DDoS防护的流量监控数据有差异?

热门文章

最新文章

推荐镜像

更多