五、logstash 安装配置
# Download the Logstash RPM package
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.9.2.rpm
# Install it
yum install -y logstash-7.9.2.rpm
# Enable at boot and start the service
systemctl enable logstash.service
systemctl start logstash.service
# Inspect the grok patterns that ship with Logstash (useful when writing filters below)
cat /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns/grok-patterns
1、mysql慢日志处理
vim /etc/logstash/conf.d/mysql.conf
# Pipeline: consume MySQL/MariaDB slow-query and error logs from Kafka,
# parse them with grok, and index into Elasticsearch by day.
input {
  kafka {
    bootstrap_servers => "10.10.8.164:9092,10.10.8.165:9092,10.10.8.166:9092"
    topics => ["elktest"]
    group_id => "elkgroup"
    codec => "json"
  }
}
filter {
  if [type] == "mysql-slow-logs" {
    grok {
      # Slow-log entry with an Id line and a "use <db>;" statement
      match => [ "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\s+Id:\s+%{NUMBER:id:int}\n# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\nuse\s(?<dbname>\w+);\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)" ]
      # Id line, no "use" statement
      match => [ "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\s+Id:\s+%{NUMBER:id:int}\n# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)" ]
      # No Id line, with "use" statement
      match => [ "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\n# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\nuse\s(?<dbname>\w+);\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)" ]
      # No Id line, no "use" statement
      match => [ "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\n# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)" ]
      # MariaDB slow-log format (Thread_id / Schema / QC_hit header)
      match => [ "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\n# Thread_id: %{NUMBER:thread_id:int}\s+Schema: %{DATA:schema}\s+QC_hit: %{DATA:qc_hit}\n# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)" ]
    }
    # Use the slow log's own SET timestamp (UNIX epoch) as the event time
    date {
      match => ["timestamp_mysql","UNIX"]
      target => "@timestamp"
    }
    mutate {
      remove_field => ["@version","message","timestamp_mysql"]
    }
  } else if [type] == "mysql-err-logs" {
    grok {
      # MySQL 5.7 error log
      match => [ "message", "(?m)^%{NUMBER:date} *%{NOTSPACE:time} %{NUMBER:bytes} %{GREEDYDATA:message}" ]
      # MySQL 5.6 error log
      match => [ "message", "(?<timestamp>\d+ \d+:\d+:\d+) \[%{LOGLEVEL:log_level}\] %{DATA:info}\s*$" ]
      # MariaDB error log, variant 1 (bracketed level)
      match => [ "message", "(?<timestamp>\d+ \d+:\d+:\d+) \[%{DATA:log_level}\] %{DATA:info}\s*$" ]
      # MariaDB error log, variant 2 (no level)
      match => [ "message", "(?<timestamp>\d+ \d+:\d+:\d+) %{DATA:info}\s*$" ]
    }
    mutate {
      remove_field => ["@version","message"]
    }
  }
}
output {
  # NOTE(review): credentials are hardcoded in every pipeline file; consider the
  # Logstash keystore or environment variables instead of plaintext passwords.
  if [type] == "mysql-slow-logs" {
    elasticsearch {
      hosts => ["http://10.10.8.164:9200","http://10.10.8.165:9200","http://10.10.8.166:9200"]
      user => "elastic"
      password => "1qaz@WSX"
      index => "mysql-slow-%{+YYYY.MM.dd}"
    }
  } else if [type] == "mysql-err-logs" {
    elasticsearch {
      hosts => ["http://10.10.8.164:9200","http://10.10.8.165:9200","http://10.10.8.166:9200"]
      user => "elastic"
      password => "1qaz@WSX"
      index => "mysql-err-%{+YYYY.MM.dd}"
    }
  }
}
# Restart Logstash to pick up the new pipeline
systemctl restart logstash.service
2、系统日志处理
vim /etc/logstash/conf.d/system.conf
# Pipeline: parse /var/log/messages and /var/log/secure events from Kafka
# with the standard SYSLOGBASE pattern and index them into Elasticsearch.
input {
  kafka {
    bootstrap_servers => "10.10.8.164:9092,10.10.8.165:9092,10.10.8.166:9092"
    topics => ["elktest"]
    group_id => "elkgroup"
    codec => "json"
  }
}
filter {
  # Both branches use the same syslog grok; they are kept separate so each
  # type can evolve its own parsing later.
  if [type] == "system-messages-logs" {
    grok {
      match => [ "message", "%{SYSLOGBASE} %{GREEDYDATA:syslog_message}" ]
    }
    mutate {
      remove_field => ["@version","message"]
    }
  } else if [type] == "system-secure-logs" {
    grok {
      match => [ "message", "%{SYSLOGBASE} %{GREEDYDATA:syslog_message}" ]
    }
    mutate {
      remove_field => ["@version","message"]
    }
  }
}
output {
  if [type] == "system-messages-logs" {
    elasticsearch {
      hosts => ["http://10.10.8.164:9200","http://10.10.8.165:9200","http://10.10.8.166:9200"]
      user => "elastic"
      password => "1qaz@WSX"
      index => "system-messages-%{+YYYY.MM.dd}"
    }
  } else if [type] == "system-secure-logs" {
    elasticsearch {
      hosts => ["http://10.10.8.164:9200","http://10.10.8.165:9200","http://10.10.8.166:9200"]
      user => "elastic"
      password => "1qaz@WSX"
      index => "system-secure-%{+YYYY.MM.dd}"
    }
  }
}
3、nginx日志处理
vim /etc/logstash/conf.d/nginx.conf
# Pipeline: parse nginx access and error logs from Kafka into Elasticsearch.
input {
  kafka {
    bootstrap_servers => "10.10.8.164:9092,10.10.8.165:9092,10.10.8.166:9092"
    topics => ["elktest"]
    group_id => "elkgroup"
    codec => "json"
  }
}
filter {
  # nginx access log
  if [type] == "nginx-access-logs" {
    grok {
      # Extended log format with trailing upstream/response time fields
      match => [ "message", '%{IPORHOST:source_ip} - %{USERNAME:remote_user} \[%{HTTPDATE:timestamp}\] %{QS:request} %{INT:status} %{INT:body_bytes_sent} %{QS:http_referer} %{QS:http_user_agent} "%{DATA:unknown}" "%{NUMBER:return_time}"' ]
      # Default combined log format (fallback when the line has no timing fields)
      match => [ "message", "%{IPORHOST:source_ip} - %{USERNAME:remote_user} \[%{HTTPDATE:timestamp}\] %{QS:request} %{INT:status} %{INT:body_bytes_sent} %{QS:http_referer} %{QS:http_user_agent}" ]
    }
    mutate {
      remove_field => ["@version","message"]
    }
  }
  # nginx error log
  else if [type] == "nginx-error-logs" {
    grok {
      match => [ "message", "(?<timestamp>%{YEAR}[./-]%{MONTHNUM}[./-]%{MONTHDAY}[- ]%{TIME}) \[%{LOGLEVEL:loglevel}\] %{POSINT:pid}#%{NUMBER}: %{GREEDYDATA:errormessage}(?:, client: (?<clientip>%{IP}|%{HOSTNAME}))(?:, server: %{IPORHOST:server}?)(?:, request: %{QS:request})?(?:, upstream: (?<upstream>\"%{URI}\"|%{QS}))?(?:, host: %{QS:request_host})?(?:, referrer: \"%{URI:referrer}\")?" ]
    }
    mutate {
      remove_field => ["@version","message"]
    }
  }
}
output {
  if [type] == "nginx-access-logs" {
    elasticsearch {
      hosts => ["http://10.10.8.164:9200","http://10.10.8.165:9200","http://10.10.8.166:9200"]
      user => "elastic"
      password => "1qaz@WSX"
      index => "nginx-access-%{+YYYY.MM.dd}"
    }
  } else if [type] == "nginx-error-logs" {
    elasticsearch {
      hosts => ["http://10.10.8.164:9200","http://10.10.8.165:9200","http://10.10.8.166:9200"]
      user => "elastic"
      password => "1qaz@WSX"
      index => "nginx-error-%{+YYYY.MM.dd}"
    }
  }
}
4、httpd日志处理
vim /etc/logstash/conf.d/httpd.conf
# Pipeline: parse Apache httpd access logs (common log format) from Kafka
# into Elasticsearch; error logs are currently forwarded unparsed.
input {
  kafka {
    bootstrap_servers => "10.10.8.164:9092,10.10.8.165:9092,10.10.8.166:9092"
    topics => ["elktest"]
    group_id => "elkgroup"
    codec => "json"
  }
}
filter {
  if [type] == "httpd-access-logs" {
    grok {
      match => [ "message", "%{COMMONAPACHELOG}" ]
    }
    mutate {
      remove_field => ["@version","message"]
    }
  } else if [type] == "httpd-error-logs" {
    # NOTE(review): this grok has no match pattern, so error-log lines are not
    # parsed into fields (only "@version" is dropped) — TODO add an httpd
    # error-log pattern if structured fields are needed.
    grok {
    }
    mutate {
      remove_field => ["@version"]
    }
  }
}
output {
  if [type] == "httpd-access-logs" {
    elasticsearch {
      hosts => ["http://10.10.8.164:9200","http://10.10.8.165:9200","http://10.10.8.166:9200"]
      user => "elastic"
      password => "1qaz@WSX"
      index => "httpd-access-%{+YYYY.MM.dd}"
    }
  } else if [type] == "httpd-error-logs" {
    elasticsearch {
      hosts => ["http://10.10.8.164:9200","http://10.10.8.165:9200","http://10.10.8.166:9200"]
      user => "elastic"
      password => "1qaz@WSX"
      index => "httpd-error-%{+YYYY.MM.dd}"
    }
  }
}
5、php日志处理
vim /etc/logstash/conf.d/php.conf
# Pipeline: parse PHP error logs from Kafka into Elasticsearch.
input {
  kafka {
    bootstrap_servers => "10.10.8.164:9092,10.10.8.165:9092,10.10.8.166:9092"
    topics => ["elktest"]
    group_id => "elkgroup"
    codec => "json"
  }
}
filter {
  if [type] == "php-error-logs" {
    grok {
      # NOTE(review): this pattern reuses syslog sub-patterns for the bracketed
      # timestamp; verify it against the actual php error_log timestamp format
      # (e.g. "[01-Jan-2020 12:00:00 UTC]") before relying on the fields.
      match => [ "message", "\[(?<timestamp>(?:%{SYSLOGFACILITY} )?%{SYSLOGHOST} %{TIME})\] %{DATA:loglevel}: %{DATA:info}$" ]
    }
    mutate {
      remove_field => ["@version","message"]
    }
  }
}
output {
  if [type] == "php-error-logs" {
    elasticsearch {
      hosts => ["http://10.10.8.164:9200","http://10.10.8.165:9200","http://10.10.8.166:9200"]
      user => "elastic"
      password => "1qaz@WSX"
      index => "php-error-%{+YYYY.MM.dd}"
    }
  }
}
6、redis日志处理
vim /etc/logstash/conf.d/redis.conf
# Pipeline: parse Redis logs from Kafka, map Redis's one-character log-level
# markers to readable names, and index into Elasticsearch.
input {
  kafka {
    bootstrap_servers => "10.10.8.164:9092,10.10.8.165:9092,10.10.8.166:9092"
    topics => ["elktest"]
    group_id => "elkgroup"
    codec => "json"
  }
}
filter {
  if [type] == "redis-error-logs" {
    grok {
      match => [ "message", "(?<pid>.\d+?):(?<role>\w?)\s+(?<log_time>%{MONTHDAY}\s+%{MONTH}\s+%{HOUR}:%{MINUTE}:%{SECOND}?)\s+(?<log_level>.?)\s%{GREEDYDATA:info}" ]
    }
    # Redis encodes the level as a single symbol; rewrite it to a name.
    if [log_level] == "*" {
      mutate { update => {"log_level" => "NOTICE"} }
    }
    if [log_level] == "#" {
      mutate { update => {"log_level" => "WARNING"} }
    }
    if [log_level] == "-" {
      mutate { update => {"log_level" => "VERBOSE"} }
    }
    if [log_level] == "." {
      mutate { update => {"log_level" => "DEBUG"} }
    }
    mutate {
      remove_field => ["@version","message"]
    }
  }
}
output {
  if [type] == "redis-error-logs" {
    elasticsearch {
      hosts => ["http://10.10.8.164:9200","http://10.10.8.165:9200","http://10.10.8.166:9200"]
      user => "elastic"
      password => "1qaz@WSX"
      index => "redis-error-%{+YYYY.MM.dd}"
    }
  }
}
7、nginx + zabbix 报警实现
vim /etc/logstash/conf.d/nginx_zbx.conf
# Pipeline: parse nginx access logs and forward alert-worthy statuses to
# Zabbix via the logstash-output-zabbix plugin.
input {
  kafka {
    bootstrap_servers => "10.10.8.164:9092,10.10.8.165:9092,10.10.8.166:9092"
    topics => ["elktest"]
    group_id => "elkgroup"
    codec => "json"
  }
}
filter {
  # nginx access log
  if [type] == "nginx-access-logs" {
    grok {
      match => [ "message", "%{IPORHOST:source_ip} - %{USERNAME:remote_user} \[%{HTTPDATE:timestamp}\] %{QS:request} %{INT:status} %{INT:body_bytes_sent} %{QS:http_referer} %{QS:http_user_agent}" ]
    }
    # Fields consumed by the zabbix output: zabbix_host/zabbix_key name the
    # Zabbix host and item key; nginx_status carries the alert message body.
    mutate {
      add_field => [ "[zabbix_key]", "nginx_status" ]
      add_field => [ "[zabbix_host]", "10.10.8.165" ]
      add_field => [ "nginx_status","状态:%{status} 客户端IP:%{source_ip} 访问路径:%{http_referer}" ]
    }
    mutate {
      remove_field => ["@version","message"]
    }
  }
}
output {
  if [type] == "nginx-access-logs" {
    # NOTE(review): 304 (Not Modified) is a normal response; confirm it should
    # really trigger an alert alongside the 4xx/5xx codes.
    if [status] =~ /(404|500|502|503|504|505|304)/ {
      zabbix {
        # NOTE(review): zabbix_host/zabbix_key take the *name* of the event
        # field holding the value; verify the "[...]" bracket form against the
        # plugin's documentation for the installed version.
        zabbix_host => "[zabbix_host]"
        zabbix_key => "[zabbix_key]"
        zabbix_server_host => "10.10.8.166"
        zabbix_server_port => "10051"
        zabbix_value => "nginx_status"
      }
    }
  }
}