1 MySql数据到Elasticsearch
1.1 下载logstash
官网
https://www.elastic.co/cn/logstash/
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.6.0.tar.gz
1.2 解压logstash
tar -zxvf logstash-6.6.0.tar.gz
1.3 在logstash 目录创建 mysql 文件夹
[root@bigdata01 logstash-6.6.0]# mkdir mysql
1.4 将 mysql 驱动文件和数据库查询文件 放进mysql中
1.5 在config 目录下创建 mysqltoes.conf 文件
1.6 mysqltoes.conf 配置
input { # 多张表的同步只需要设置多个jdbc的模块就行了 jdbc { # mysql 数据库链接,shop为数据库名 jdbc_connection_string => "jdbc:mysql://ip:3306/mall?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true" # 用户名和密码 jdbc_user => "root" jdbc_password => "root" # 驱动 jdbc_driver_library => "/usr/local/logstash-6.6.0/mysql/mysql-connector-java-8.0.16.jar" # 驱动类名 jdbc_driver_class => "com.mysql.jdbc.Driver" #是否分页 jdbc_paging_enabled => "true" jdbc_page_size => "50000" #直接执行sql语句 # statement =>"select * from t_item" # 执行的sql 文件路径+名称 statement_filepath => "/usr/local/logstash-6.6.0/mysql/item.sql" # 默认列名转换为小写 lowercase_column_names => "false" #设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新 schedule => "* * * * *" # 索引类型 #type => "jdbc" } } output { elasticsearch { #es的ip和端口 hosts => ["http://ip:9200"] #ES索引名称(自己定义的) index => "mall" #文档类型 document_type => "mall_item" #设置数据的id为数据库中的字段 document_id => "%{iteId}" } stdout { codec => json_lines } }
1.7 启动 logstash
前台启动:
[root@bigdata01 bin]# ./logstash -f ../config/mysqltoes.conf
后台启动:
[root@bigdata01 bin]# nohup ./logstash -f ../config/mysqltoes.conf >logstash.log &
2 配置语法讲解
logstash使用{ }来定义配置区域,区域内又可以包含其插件的区域配置。
# 最基本的配置文件定义,必须包含input 和 output。 input{ stdin{ } } output{ stdout{ codec=>rubydebug } } # 如果需要对数据进操作,则需要加上filter段 input{ stdin{ } } filter{ # 里面可以包含各种数据处理的插件,如文本格式处理 grok、键值定义 kv、字段添加、 # geoip 获取地理位置信息等等... } output{ stdout{ codec=>rubydebug } } # 可以定义多个输入源与多个输出位置 input{ stdin{ } file{ path => ["/var/log/message"] type => "system" start_position => "beginning" } } output{ stdout{ codec=>rubydebug } file { path => "/var/datalog/mysystem.log.gz" gzip => true } }
3 启动方式
# 通过手动指定配置文件启动 /bin/logstash -f /etc/logstash/conf.d/nginx_logstash.conf # 以daemon方式运行,则在指令后面加一个 & 符号 /bin/logstash -f /etc/logstash/conf.d/nginx_logstash.conf & # 如果是通过rpm包安装的logstash则可以使用自带的脚本启动 /etc/init.d/logstash start # 通过这种方式启动,logstash会自动加载 /etc/logstash/conf.d/ 下的配置文件
4 filebeat基本讲解
filebeat是基于原先 logstash-forwarder 的源码开发而来,无需JAVA环境,运行起来更轻便,无疑是业务服务器端的日志收集工具。
配 置
# 配置文件路径 "/etc/filebeat/filebeat.yml" # 一个配置文件可以包含多个prospectors,一个prospectors可以包含多个path。 filebeat: # List of prospectors to fetch data. prospectors: # Each - is a prospector. Below are the prospector specific configurations - paths: - /var/log/messages input_type: log document_type: messages - paths: - /alidata/log/nginx/access/access.log.json input_type: log document_type: nginxacclog - paths: - /alidata/www/storage/logs/laravel.log input_type: log document_type: larlog - paths: - /alidata/www/500_error/500_error.log input_type: log document_type: error500 - paths: - /alidata/www/deposit/deposit.log input_type: log document_type: deposit - paths: - /alidata/www/call_error.log input_type: log document_type: call_error - paths: - /alidata/www/weixin_deposit.log input_type: log document_type: weixin_deposit - paths: - /alidata/log/php/php-fpm.log.slow input_type: log document_type: phpslowlog # 多行处理 multiline: pattern: '^[[:space:]]' negate: true match: after # Additional prospector registry_file: /var/lib/filebeat/registry ############################# Libbeat Config ################################## # Base config file used by all other beats for using libbeat features ############################# Output ########################################## # 输出数据到 redis output: redis: host: "10.122.52.129" port: 6379 password: "123456" # 输出数据到 logstash ,一般两者选用其一 logstash: hosts: ["10.160.8.221:5044"] ############################# Shipper ######################################### shipper: # 打上服务器tag name: "host_2" ############################# Logging ######################################### logging: files: rotateeverybytes: 10485760 # = 10MB
filebeat主要配置就是这个配置文件了,设定好之后启动服务就会自动从源拉取数据发送到指定位置,当数据为普通行数据时,filebeat会自动为其添加字段信息,其中一项字段 @timestamp 为filebeat读取到这条数据的时间,默认格式为UTC时间,比中国大陆时间早8小时。
如果数据为json格式,而数据中已包含@timestamp字段,filebeat处理时会把@timestamp字段值替换为filebeat读取到该行数据的当前UTC时间。
5 实战运用
5.1 业务到redis到es之间迁移
nginx 日志格式配置
log_format json '{"@timestamp":"$time_iso8601",' '"slbip":"$remote_addr",' '"clientip":"$http_x_forwarded_for",' '"serverip":"$server_addr",' '"size":$body_bytes_sent,' '"responsetime":$request_time,' '"domain":"$host",' '"method":"$request_method",' '"requesturi":"$request_uri",' '"url":"$uri",' '"appversion":"$HTTP_APP_VERSION",' '"referer":"$http_referer",' '"agent":"$http_user_agent",' '"status":"$status"}';
filebeat 配置
filebeat: # List of prospectors to fetch data. prospectors: # Each - is a prospector. Below are the prospector specific configurations - paths: - /alidata/log/nginx/access/access.log.json input_type: log document_type: nginxacclog ############################# Output ########################################## output: logstash: hosts: ["10.160.8.221:5044"] # 其他部分配置省略。
logstash 配置 (此处logstash用于接收filebeat的数据,然后转存redis)
input { beats { port => 5044 codec => "json" } } filter { if [type] == "nginxacclog" { geoip { source => "clientip" target => "geoip" database => "/u01/elk/logstash/GeoLiteCity.dat" add_field => [ "[geoip][coordinates]","%{[geoip][longitude]}" ] add_field => [ "[geoip][coordinates]","%{[geoip][latitude]}" ] } mutate { convert => [ "[geoip][coordinates]","float" ] } } } output{ if [type] == "nginxacclog" { redis { data_type => "list" key => "nginxacclog" host => "127.0.0.1" port => "26379" password => "123456" db => "0" } } if [type] == "messages" { redis { data_type => "list" key => "messages" host => "127.0.0.1" port => "26379" password => "123456" db => "0" } } }
logstash 配置 (此处logstash用于读取redis list中的数据,然后转存elasticsearch)
input{ redis { host => "10.10.1.2" port => "26379" db => "0" key => "nginxacclog" threads => 300 password => "123456" data_type => "list" codec => "json" } redis { host => "10.10.1.2" port => "26379" db => "0" key => "messages" password => "123456" threads => 50 data_type => "list" codec => "json" } } output { if [type] == "nginxacclog" { elasticsearch { hosts => ["127.0.0.1:9200"] index => "logstash-nginxacclog-%{+YYYY.MM.dd}" manage_template => true flush_size => 50000 idle_flush_time => 10 workers => 2 } } if [type] == "messages" { elasticsearch { hosts => ["127.0.0.1:9200"] index => "logstash-messages-%{+YYYY.MM.dd}" manage_template => true flush_size => 50000 idle_flush_time => 30 workers => 1 } } }
关键指令解释:
threads 开启多少个线程读取redis数据,也就是从redis输入到logstash的速度,线程越多读取速度越快,但是根据接收节点的接收速度来设置,如果输入过快,接收速度不够,则会出现丢数据的情况,设置一个最佳的threads值需要和接收节点做反复测试才能得出。
flush_size 控制logstash向Elasticsearch批量发送数据,上面的配置表示,logstash会努力赞到50000条数据一次发送给Elasticsearch。
idle_flush_time 控制logstash多长时间向Elasticsearch发送一次数据,默认为1秒,根据以上配置,logstash积攒数据未到flush_size 10秒后也会向Elasticsearch发送一次数据。
workers 建议设置为1或2,如果机器性能不错可以设置为2. 不建议设置的更高。
5.2 业务到redis到mongo
filebeat 配置(从日志文件读取到的数据直接缓存至redis队列)
filebeat: # List of prospectors to fetch data. prospectors: # Each - is a prospector. Below are the prospector specific configurations - paths: - /alidata/log/nginx/access/access.log.json input_type: log document_type: nginxacclog ############################# Output ########################################## output: redis: host: "10.160.8.221" port: 26379 password: "123456"
document_type 自定义日志类型,在logstash中可通过type判断做不同的处理。
logstash 配置 (此处logstash用于读取redis list中的数据,然后转存mongodb)
input { redis { host => "10.160.8.221" port => "26379" key => "filebeat" data_type => "list" password => "123456" threads => 50 } redis { host => "10.160.8.221" port => "26379" key => "mycat" data_type => "list" password => "123456" threads => 50 type => "mycat" } } output { if [type] == "mycat" { mongodb{ collection => "mycat%{+yyyyMMdd}" isodate => true database => "logdb" uri => "mongodb://log_user:123456@10.10.1.102:27017/logdb" } } if [type_xi09wnk] == "nginxacclog" { mongodb{ collection => "nginx_accress%{years_dik3k}%{months_dik3k}%{days_dik3k}" isodate => true database => "logdb" uri => "mongodb://log_user:123456@10.10.1.102:27017/logdb" } } }