一文吃透企业级elk技术栈:5. logstatsh 安装配置

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 一文吃透企业级elk技术栈:5. logstatsh 安装配置

五、logstatsh 安装配置

# 下载安装包
wget  https://artifacts.elastic.co/downloads/logstash/logstash-7.9.2.rpm
# 安装
yum install  -y logstash-7.9.2.rpm
# 启动
systemctl enable  logstash.service
systemctl start   logstash.service
# 内置正则
cat /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns/grok-patterns

1、mysql慢日志处理

vim  /etc/logstash/conf.d/mysql.conf 
input {
  kafka {
    bootstrap_servers => "10.10.8.164:9092,10.10.8.165:9092,10.10.8.166:9092"
    topics => ["elktest"]
    group_id => "elkgroup"
    codec => "json"
  }
}
filter {
  if [type] == "mysql-slow-logs" {
    grok {
        # 有ID有use
        match => [ "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\s+Id:\s+%{NUMBER:id:int}\n# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\nuse\s(?<dbname>\w+);\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)" ]
 
        # 有ID无use
        match => [ "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\s+Id:\s+%{NUMBER:id:int}\n# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)" ]
 
        # 无ID有use
        match => [ "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\n# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\nuse\s(?<dbname>\w+);\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)" ]
 
        # 无ID无use
        match => [ "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\n# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)" ]
        # mariadb慢日志获取
        match => [ "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\n# Thread_id: %{NUMBER:thread_id:int}\s+Schema: %{DATA:schema}\s+QC_hit: %{DATA:qc_hit}\n# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)" ]
    }
    date {
            match => ["timestamp_mysql","UNIX"]
            target => "@timestamp"
    }
    mutate {
            remove_field => ["@version","message","timestamp_mysql"]
    }
  }
  else if [type] == "mysql-err-logs" {
    grok {
      # mysql 5.7 err
      match => [ "message", "(?m)^%{NUMBER:date} *%{NOTSPACE:time} %{NUMBER:bytes} %{GREEDYDATA:message}" ]
      
      # mysql 5.6 err
      match => [ "message", "(?<timestamp>\d+ \d+:\d+:\d+) \[%{LOGLEVEL:log_level}\] %{DATA:info}\s*$" ]
      # mariadb  err1
      match => [ "message", "(?<timestamp>\d+ \d+:\d+:\d+) \[%{DATA:log_level}\] %{DATA:info}\s*$" ]
      # mariadb err2
      match => [ "message", "(?<timestamp>\d+ \d+:\d+:\d+) %{DATA:info}\s*$" ]
    }
    mutate {
            remove_field => ["@version","message"]
    }
  }
}
output {
  if [type] == "mysql-slow-logs" {
  # 推送到es
    elasticsearch {
      hosts => ["http://10.10.8.164:9200","http://10.10.8.165:9200","http://10.10.8.166:9200"]
      user => "elastic"
      password => "1qaz@WSX"
      index => "mysql-slow-%{+YYYY.MM.dd}"
    }
  }
  else if [type] == "mysql-err-logs" {
    elasticsearch {
      hosts => ["http://10.10.8.164:9200","http://10.10.8.165:9200","http://10.10.8.166:9200"]
      user => "elastic"
      password => "1qaz@WSX"
      index => "mysql-err-%{+YYYY.MM.dd}"
    }
  }
}
# 重启logstash
systemctl restart logstash.service

2、系统日志处理

vim  /etc/logstash/conf.d/system.conf 
input {
  kafka {
    bootstrap_servers => "10.10.8.164:9092,10.10.8.165:9092,10.10.8.166:9092"
    topics => ["elktest"]
    group_id => "elkgroup"
    codec => "json"
  }
}
filter {
  if [type] == "system-messages-logs" {
    grok {
      match => [ "message", "%{SYSLOGBASE} %{GREEDYDATA:syslog_message}" ]
    }
    mutate {
            remove_field => ["@version","message"]
    }
  }
  else if [type] == "system-secure-logs" {
    grok {
      match => [ "message", "%{SYSLOGBASE} %{GREEDYDATA:syslog_message}" ]
    }
    mutate {
            remove_field => ["@version","message"]
    }
  }
}
output {
  if [type] == "system-messages-logs" {
    elasticsearch {
      hosts => ["http://10.10.8.164:9200","http://10.10.8.165:9200","http://10.10.8.166:9200"]
      user => "elastic"
      password => "1qaz@WSX"
      index => "system-messages-%{+YYYY.MM.dd}"
    }
  }
  else if [type] == "system-secure-logs" {
    elasticsearch {
      hosts => ["http://10.10.8.164:9200","http://10.10.8.165:9200","http://10.10.8.166:9200"]
      user => "elastic"
      password => "1qaz@WSX"
      index => "system-secure-%{+YYYY.MM.dd}"
    }
  }
}

3、nginx日志处理

vim  /etc/logstash/conf.d/nginx.conf
input {
  kafka {
    bootstrap_servers => "10.10.8.164:9092,10.10.8.165:9092,10.10.8.166:9092"
    topics => ["elktest"]
    group_id => "elkgroup"
    codec => "json"
  }
}
filter {
  # nginx access 日志处理
  if [type] == "nginx-access-logs" {
    grok {
      # 带返回时间
      match => [ "message", '%{IPORHOST:source_ip} - %{USERNAME:remote_user} \[%{HTTPDATE:timestamp}\] %{QS:request} %{INT:status} %{INT:body_bytes_sent} %{QS:http_referer} %{QS:http_user_agent} "%{DATA:unknown}" "%{NUMBER:return_time}"' ]
      # 默认格式
      match => [ "message", "%{IPORHOST:source_ip} - %{USERNAME:remote_user} \[%{HTTPDATE:timestamp}\] %{QS:request} %{INT:status} %{INT:body_bytes_sent} %{QS:http_referer} %{QS:http_user_agent}" ]
    }
    mutate {
            remove_field => ["@version","message"]
    }
  }
  
  # nginx error 日志处理
  else if [type] == "nginx-error-logs" {
    grok {
      match => [ "message", "(?<timestamp>%{YEAR}[./-]%{MONTHNUM}[./-]%{MONTHDAY}[- ]%{TIME}) \[%{LOGLEVEL:loglevel}\] %{POSINT:pid}#%{NUMBER}: %{GREEDYDATA:errormessage}(?:, client: (?<clientip>%{IP}|%{HOSTNAME}))(?:, server: %{IPORHOST:server}?)(?:, request: %{QS:request})?(?:, upstream: (?<upstream>\"%{URI}\"|%{QS}))?(?:, host: %{QS:request_host})?(?:, referrer: \"%{URI:referrer}\")?" ]
    }
    mutate {
            remove_field => ["@version","message"]
    }
  }
}
output {
  if [type] == "nginx-access-logs" {
    elasticsearch {
      hosts => ["http://10.10.8.164:9200","http://10.10.8.165:9200","http://10.10.8.166:9200"]
      user => "elastic"
      password => "1qaz@WSX"
      index => "nginx-access-%{+YYYY.MM.dd}"
    }
  }
  else if [type] == "nginx-error-logs" {
    elasticsearch {
      hosts => ["http://10.10.8.164:9200","http://10.10.8.165:9200","http://10.10.8.166:9200"]
      user => "elastic"
      password => "1qaz@WSX"
      index => "nginx-error-%{+YYYY.MM.dd}"
    }
  }
}

4、httpd日志处理

vim /etc/logstash/conf.d/httpd.conf 
input {
  kafka {
    bootstrap_servers => "10.10.8.164:9092,10.10.8.165:9092,10.10.8.166:9092"
    topics => ["elktest"]
    group_id => "elkgroup"
    codec => "json"
  }
}
filter {
  if [type] == "httpd-access-logs" {
    grok {
      match => [ "message", "%{COMMONAPACHELOG}" ]
    }
    mutate {
            remove_field => ["@version","message"]
    }
  }
  else if [type] == "httpd-error-logs" {
    grok {
    }
    mutate {
            remove_field => ["@version"]
    }
  }
}
output {
  if [type] == "httpd-access-logs" {
    elasticsearch {
      hosts => ["http://10.10.8.164:9200","http://10.10.8.165:9200","http://10.10.8.166:9200"]
      user => "elastic"
      password => "1qaz@WSX"
      index => "httpd-access-%{+YYYY.MM.dd}"
    }
  }
  else if [type] == "httpd-error-logs" {
    elasticsearch {
      hosts => ["http://10.10.8.164:9200","http://10.10.8.165:9200","http://10.10.8.166:9200"]
      user => "elastic"
      password => "1qaz@WSX"
      index => "httpd-error-%{+YYYY.MM.dd}"
    }
  }
}

5、php日志处理

vim /etc/logstash/conf.d/php.conf 
input {
  kafka {
    bootstrap_servers => "10.10.8.164:9092,10.10.8.165:9092,10.10.8.166:9092"
    topics => ["elktest"]
    group_id => "elkgroup"
    codec => "json"
  }
}
filter {
  if [type] == "php-error-logs" {
    grok {
      match => [ "message", "\[(?<timestamp>(?:%{SYSLOGFACILITY} )?%{SYSLOGHOST} %{TIME})\] %{DATA:loglevel}: %{DATA:info}$" ]
    }
    mutate {
            remove_field => ["@version","message"]
    }
  }
}
output {
  if [type] == "php-error-logs" {
    elasticsearch {
      hosts => ["http://10.10.8.164:9200","http://10.10.8.165:9200","http://10.10.8.166:9200"]
      user => "elastic"
      password => "1qaz@WSX"
      index => "php-error-%{+YYYY.MM.dd}"
    }
  }
}

6、redis日志处理

vim /etc/logstash/conf.d/redis.conf 
input {
  kafka {
    bootstrap_servers => "10.10.8.164:9092,10.10.8.165:9092,10.10.8.166:9092"
    topics => ["elktest"]
    group_id => "elkgroup"
    codec => "json"
  }
}
filter {
  if [type] == "redis-error-logs" {
    grok {
      match => [ "message", "(?<pid>.\d+?):(?<role>\w?)\s+(?<log_time>%{MONTHDAY}\s+%{MONTH}\s+%{HOUR}:%{MINUTE}:%{SECOND}?)\s+(?<log_level>.?)\s%{GREEDYDATA:info}" ]
    }
    if [log_level] == "*" {
      mutate{ update => {"log_level" => "NOTICE"}}
    }
    if [log_level] == "#" {
      mutate{ update => {"log_level" => "WARNING"}}
    }
    if [log_level] == "-" {
      mutate{ update => {"log_level" => "VERBOSE"}}
    }
    if [log_level] == "." {
      mutate{ update => {"log_level" => "DEBUG"}}
    }
    mutate {
            remove_field => ["@version","message"]
    }
  }
}
output {
  if [type] == "redis-error-logs" {
    elasticsearch {
      hosts => ["http://10.10.8.164:9200","http://10.10.8.165:9200","http://10.10.8.166:9200"]
      user => "elastic"
      password => "1qaz@WSX"
      index => "redis-error-%{+YYYY.MM.dd}"
    }
  }
}

7、nginx + zabbix 报警实现

vim  /etc/logstash/conf.d/nginx_zbx.conf 
input {
  kafka {
    bootstrap_servers => "10.10.8.164:9092,10.10.8.165:9092,10.10.8.166:9092"
    topics => ["elktest"]
    group_id => "elkgroup"
    codec => "json"
  }
}
filter {
  # nginx access 日志处理
  if [type] == "nginx-access-logs" {
    grok {
      match => [ "message", "%{IPORHOST:source_ip} - %{USERNAME:remote_user} \[%{HTTPDATE:timestamp}\] %{QS:request} %{INT:status} %{INT:body_bytes_sent} %{QS:http_referer} %{QS:http_user_agent}" ]
    }
    
    mutate {
      add_field => [ "[zabbix_key]", "nginx_status" ]
      add_field => [ "[zabbix_host]", "10.10.8.165" ]
      add_field => [ "nginx_status","状态:%{status} 客户端IP:%{source_ip} 访问路径:%{http_referer}" ]
    }
    mutate {
            remove_field => ["@version","message"]
    }
  }
  
}
output {
  if [type] == "nginx-access-logs" {
    if [status]  =~ /(404|500|502|503|504|505|304)/  {
      zabbix {
      zabbix_host => "[zabbix_host]"
      zabbix_key => "[zabbix_key]"
      zabbix_server_host => "10.10.8.166" 
      zabbix_server_port => "10051"
      zabbix_value => "nginx_status"
      }
    }    
  }
}


相关文章
|
3月前
|
监控
一文吃透企业级elk技术栈:9. zabbix结合logstash告警
一文吃透企业级elk技术栈:9. zabbix结合logstash告警
|
3月前
|
NoSQL 关系型数据库 MySQL
一文吃透企业级elk技术栈:7. 验证结果
一文吃透企业级elk技术栈:7. 验证结果
|
3月前
|
消息中间件 Kafka 网络安全
一文吃透企业级elk技术栈:elk 各组件调试
调试需先理解逻辑与程序调用顺序。本文介绍filebeat、kafka、logstash和es的数据推送流程及调试方法:filebeat传输数据检查包括服务状态、配置与日志;kafka调试涵盖服务状态、端口与日志;logstash调试需检查配置文件、日志与流量;es直接通过kibana查看。还介绍了使用rsyslog接收防火墙/waf/交换机日志的方法。
|
3月前
|
监控 关系型数据库 MySQL
一文吃透企业级elk技术栈:11. zabbix报警实现
一文吃透企业级elk技术栈:11. zabbix报警实现
一文吃透企业级elk技术栈:10. es数据生命周期管理
一文吃透企业级elk技术栈:10. es数据生命周期管理
|
3月前
|
NoSQL 关系型数据库 MySQL
一文吃透企业级elk技术栈:6. filebeat安装配置
一文吃透企业级elk技术栈:6. filebeat安装配置
|
2月前
|
存储 消息中间件 网络协议
日志平台-ELK实操系列(一)
日志平台-ELK实操系列(一)
|
12天前
|
存储 监控 安全
|
3月前
|
消息中间件 Kafka 开发工具
rsyslog+ELK收集Cisco日志
rsyslog+ELK收集Cisco日志
|
3月前
|
运维 监控 Ubuntu
一键启动日志魔法:揭秘ELK自动安装脚本的神秘面纱!
【8月更文挑战第9天】在数据驱动时代,高效处理日志至关重要。ELK Stack(Elasticsearch、Logstash、Kibana)是强大的日志分析工具,但其复杂的安装配置常让初学者望而却步。本文介绍如何编写ELK自动安装脚本,简化部署流程。脚本适用于Ubuntu系统,自动完成ELK下载、安装及基本配置,包括依赖项安装、服务启动及自启设置,极大降低了使用门槛,助力运维人员和开发者轻松构建日志分析平台。
152 6