一文吃透企业级elk技术栈:11. zabbix报警实现

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: 一文吃透企业级elk技术栈:11. zabbix报警实现

十一、zabbix报警实现

mysql多次登录失败告警 // 存在问题 日志信息无法打印到zabbix

[root@elk-02 ~]# cat /etc/logstash/conf.d/mysql.conf
input {
  kafka {
    bootstrap_servers => "10.10.8.10:9092,10.10.8.11:9092,10.10.8.12:9092"
    topics => ["elktest"]
    group_id => "elkgroup"
    codec => "json"
  }
}
filter {
  if [type] == "mysql-slow-logs" {
    grok {
        # 有ID有use
        match => [ "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\s+Id:\s+%{NUMBER:id:int}\n# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\nuse\s(?<dbname>\w+);\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)" ]
        # 有ID无use
        match => [ "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\s+Id:\s+%{NUMBER:id:int}\n# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)" ]
        # 无ID有use
        match => [ "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\n# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\nuse\s(?<dbname>\w+);\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)" ]
        # 无ID无use
        match => [ "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\n# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)" ]
        # mariadb慢日志获取
        match => [ "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\n# Thread_id:%{NUMBER:thread_id:int}\s+Schema: %{DATA:schema}\s+QC_hit: %{DATA:qc_hit}\n# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)" ]
    }
    date {
            match => ["timestamp_mysql","UNIX"]
            target => "@timestamp"
    }
    mutate {
            remove_field => ["@version","message","timestamp_mysql"]
    }
  }
  else if [type] == "mysql-err-logs" {
    grok {
     # mysql 5.7 err
      match => [ "message", "(?m)^%{NOTSPACE:date} %{NUMBER:bytes} \[%{GREEDYDATA:log_level}\] %{GREEDYDATA:messages}" ]
      # mysql 5.6 err
      match => [ "message", "(?<timestamp>\d+ \d+:\d+:\d+) \[%{LOGLEVEL:log_level}\] %{GREEDYDATA:messages}\s*$" ]
      # mariadb  err1
      match => [ "message", "(?<timestamp>\d+ \d+:\d+:\d+) \[%{DATA:log_level}\] %{GREEDYDATA:messages}\s*$" ]
      # mariadb err2
      match => [ "message", "(?<timestamp>\d+ \d+:\d+:\d+) %{GREEDYDATA:messages}\s*$" ]
    }
    mutate {
      add_field => ["[zabbix_host]","10.10.8.152"]
      add_field => ["[zabbix_key]","mysql_passwd_verification_failed"]
    }
    mutate {
      add_field => ["count","%{[agent][name]}_%{messages}"]
    }
    mutate {
     remove_field => [ "@version" ]
    }
  }
}
output {
  if [type] == "mysql-slow-logs" {
  # 推送到es
    elasticsearch {
      hosts => ["http://10.10.8.10:9200","http://10.10.8.11:9200","http://10.10.8.12:9200"]
      user => "elastic"
      password => "123456"
      index => "mysql-slow-%{+YYYY.MM.dd}"
    }
  }
  else if [type] == "mysql-err-logs" {
    elasticsearch {
      hosts => ["http://10.10.8.10:9200","http://10.10.8.11:9200","http://10.10.8.12:9200"]
      user => "elastic"
      password => "123456"
      index => "mysql-err-%{+YYYY.MM.dd}"
    }
    if [count]  =~ /(Access denied)/ {
      zabbix {
        zabbix_host => "[zabbix_host]"
        zabbix_key => "[zabbix_key]"
        zabbix_server_host => "10.10.8.166"
        zabbix_server_port => "10051"
        zabbix_value => "count"
      }
    }
  }
}

客户端配置

centos 7安装

systemctl status filebeat
cd /root && \
wget  https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.9.2-x86_64.rpm && \
rpm -ivh filebeat-7.9.2-x86_64.rpm && \
systemctl enable filebeat.service && \
systemctl start filebeat.service && \
mkdir /etc/filebeat/conf.d

centos 6安装

systemctl status filebeat
cd /root && \
wget  https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.9.2-x86_64.rpm && \
rpm -ivh filebeat-7.9.2-x86_64.rpm && \
service filebeat start && \
chkconfig  --add  filebeat && \
mkdir /etc/filebeat/conf.d

配置文件配置

cat >/etc/filebeat/filebeat.yml<<'EOF'
filebeat.config.inputs:
  enabled: true
  # 配置多配置文件路径
  path: /etc/filebeat/conf.d/*.yml
processors:
  - drop_fields:
      fields: ["source","input","beat","prospector","offset"]
# 区分日志来自哪台主机 // 自行修改当前主机IP
name: 103.29.16.83
output:
  # 输出到kafka
  kafka:
    hosts: ["10.10.8.164:9092", "10.10.8.165:9092", "10.10.8.166:9092"]
    topic: elktest
logging.level: info
logging.to_files: true
logging.files:
  path: /var/log/filebeat
  name: filebeat
  keepfiles: 7
  permissions: 0644
#output.file:
#  path: "/tmp/filebeat"
#  filename: filebeat
EOF

mysql日志配置

netstat -lntp|grep mysqld
tcp6       0      0 :::3306                 :::*                    LISTEN      166237/mysqld
tcp        0      0 0.0.0.0:3307            0.0.0.0:*               LISTEN      1853/mysqld
tcp        0      0 0.0.0.0:3309            0.0.0.0:*               LISTEN      181145/mysqld
tcp        0      0 0.0.0.0:3310            0.0.0.0:*               LISTEN      261336/mysqld
tcp6       0      0 :::3311                 :::*                    LISTEN      85491/mysqld
tcp        0      0 0.0.0.0:3316            0.0.0.0:*               LISTEN      127655/mysqld
tcp        0      0 0.0.0.0:3326            0.0.0.0:*               LISTEN      27277/mysqld
cat >/etc/filebeat/conf.d/mysql.yml<<'EOF'
# mysql 慢日志推送
- type: log
  tail_files: true
  backoff: "1s"
  paths:
  # 多实例推送 // 自行修改慢日志路径
  - /data/mysql3309/log/mysql-slow.log
  - /data/mysql3310/log/mysql-slow.log  
  - /data/mysql3316/log/mysql-slow.log
  - /data/mysql3326/log/slow.log
  tags: ["mysql-slow-logs"]
  # 排除列
  exclude_lines: ['^\# Time']
  fields:
    # 配置logstash区分
    type: "mysql-slow-logs"
  fields_under_root: true
  multiline:
    # 多行匹配
    pattern: '^\# Time|^\# User'
    negate: true
    match: after
# mysql 错误日志推送
- type: log
  tail_files: true
  backoff: "1s"
  paths:
  # 多实例推送 // 自行修改错误日志路径
  - /var/log/mysqld.log
  - /data/mysql3307/log/mysql-error.log
  - /data/mysql3309/log/mysql-error.log
  - /data/mysql3310/log/mysql-error.log
  - /data/mysql3311/error.log
  - /data/mysql3316/log/mysql-error.log
  - /data/mysql3326/log/mysql-error.log
  tags: ["mysql-err-logs"]
  fields:
    # 配置logstash区分
    type: "mysql-err-logs"
  fields_under_root: true
EOF

系统日志

cat >/etc/filebeat/conf.d/system.yml<<'EOF'
# 系统messages日志推送
- type: log
  tail_files: true
  backoff: "1s"
  paths:
   - /var/log/messages
  tags: ["system-messages-logs"]
  fields:
    # 配置logstash区分
    type: "system-messages-logs"
  fields_under_root: true
# 系统secure日志推送
- type: log
  tail_files: true
  backoff: "1s"
  paths:
   - /var/log/secure
  tags: ["system-secure-logs"]
  fields:
    # 配置logstash区分
    type: "system-secure-logs"
  fields_under_root: true
EOF

重启

systemctl restart filebeat.service

防火墙规则

telnet  10.10.8.164 9092
ip a

问题解决

1. “error”=>{“type”=>“validation_exception”, “reason”=>“Validation Failed: 1: this action would add [2] shards, but this cluster currently has [999]/[1000] maximum normal shards open;”}

PUT _cluster/settings
{
  "persistent": {
    "cluster": {
      "max_shards_per_node":20000
    }
  }
}

es的每台机器的默认最大分片数位1000,如果集群有3台机器,那就有3000当满了之后就无法创建索引,所以需要设置更大的参数

vim /etc/elasticsearch/elasticsearch.yml
cluster.max_shards_per_node: 20000


相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
23天前
|
监控
一文吃透企业级elk技术栈:9. zabbix结合logstash告警
一文吃透企业级elk技术栈:9. zabbix结合logstash告警
|
12天前
|
消息中间件 Kafka 网络安全
一文吃透企业级elk技术栈:elk 各组件调试
调试需先理解逻辑与程序调用顺序。本文介绍filebeat、kafka、logstash和es的数据推送流程及调试方法:filebeat传输数据检查包括服务状态、配置与日志;kafka调试涵盖服务状态、端口与日志;logstash调试需检查配置文件、日志与流量;es直接通过kibana查看。还介绍了使用rsyslog接收防火墙/waf/交换机日志的方法。
一文吃透企业级elk技术栈:10. es数据生命周期管理
一文吃透企业级elk技术栈:10. es数据生命周期管理
|
4月前
|
存储 SQL 监控
修改Zabbix源码实现监控数据双写,满足业务需求!
虽然对接Elasticsearch后有诸多好处,但是它不往数据库写历史数据了,同时还不再计算趋势数据了。有这么一个场景...
修改Zabbix源码实现监控数据双写,满足业务需求!
|
4月前
|
数据采集 监控 数据库
OceanBase社区版可以通过Zabbix监控
OceanBase社区版可以通过Zabbix监控
215 4
|
9月前
|
监控 关系型数据库 机器人
小白带你学习linux的监控平台zabbix
小白带你学习linux的监控平台zabbix
237 0
|
19天前
|
监控 安全 Linux
在Linux中,zabbix如何监控脑裂?
在Linux中,zabbix如何监控脑裂?
|
9天前
|
监控 关系型数据库 MySQL
zabbix agent集成percona监控MySQL的插件实战案例
这篇文章是关于如何使用Percona监控插件集成Zabbix agent来监控MySQL的实战案例。
23 2
zabbix agent集成percona监控MySQL的插件实战案例
|
1月前
|
SQL 监控 分布式数据库
【解锁数据库监控的神秘力量!】OceanBase社区版与Zabbix的完美邂逅 —— 揭秘分布式数据库监控的终极奥秘!
【8月更文挑战第7天】随着OceanBase社区版的普及,企业广泛采用这一高性能、高可用的分布式数据库。为保障系统稳定,使用成熟的Zabbix监控工具对其进行全方位监控至关重要。本文通过实例介绍如何在Zabbix中配置监控OceanBase的方法,包括创建监控模板、添加监控项(如TPS)、设置触发器及图形展示,并提供示例脚本帮助快速上手。通过这些步骤,可以有效监控OceanBase状态,确保业务连续性。
53 0
|
3月前
|
监控 数据库 Docker
Zabbix监控神通数据库教程
**摘要:** 本文介绍了如何使用Docker安装和配置神舟通用数据库,并利用Zabbix进行监控。首先,通过Docker安装数据库镜像,启动容器并映射端口。接着,使用默认凭证连接数据库并验证安装。然后,将数据库的Python模块和库文件复制到主机,并安装Python3.5及相应模块,创建外部检查脚本以实现Zabbix的监控功能。示例展示了查询数据库版本的监控指标配置。最后,提到了监控结果的界面展示,并邀请读者探索更多Zabbix监控技巧。
48 0
Zabbix监控神通数据库教程

推荐镜像

更多