一、ELFK集群部署(Filebeat+ELK)
ELFK= ES + logstash+filebeat+kibana
实验环境
服务器类型 | 系统和IP地址 | 需要安装的组件 | 硬件方面 |
node1节点 | 192.168.121.10 | JDK、elasticsearch-6.7.2、kibana-6.7.2 | 2核4G |
node2节点 | 192.168.121.20 | JDK、elasticsearch-6.7.2 | 2核4G |
apache节点 | 192.168.121.30 | JDK、apache、logstash-6.7.2、filebeat-6.7.2 | 2核4G |
注意:ELFK各安装包的版本要一致,或相近。
实验步骤:
1.1 部署ELK集群
ELK集群的部署在前一篇文章中有详细介绍,可参考前一篇文章。
注意:ELFK集群环境下,Logstash 组件所在节点的/etc/logstash/conf.d目录下,不需要创建system.conf配置文件,即Logstash不需要收集系统日志,因为系统日志将由filebeat收集后发送给Logstash。(安装filebeat后,Logstash会创建filebeat.conf配置文件获取filebeat传来的数据)
1.2 安装 Filebeat(在apache节点操作)
#上传软件包 filebeat-6.2.4-linux-x86_64.tar.gz 到/opt目录 tar zxvf filebeat-6.2.4-linux-x86_64.tar.gz mv filebeat-6.2.4-linux-x86_64/ /usr/local/filebeat 复制代码
1.3 设置 filebeat 的主配置文件
cd /usr/local/filebeat vim filebeat.yml filebeat.prospectors: - type: log #指定 log 类型,从日志文件中读取消息 enabled: true paths: - /var/log/messages #指定监控的日志文件 - /var/log/*.log tags: ["sys"] #设置索引标签 fields: #可以使用 fields 配置选项设置一些参数字段添加到 output 中 service_name: filebeat log_type: syslog from: 192.168.121.30 --------------Elasticsearch output------------------- (全部注释掉) ----------------Logstash output--------------------- output.logstash: hosts: ["192.168.121.30:5044"] #指定 logstash 的 IP 和端口 #启动 filebeat nohup ./filebeat -e -c filebeat.yml > filebeat.out & #-e:输出到标准输出,禁用syslog/文件输出 #-c:指定配置文件 #nohup:在系统后台不挂断地运行命令,退出终端不会影响程序的运行 复制代码
1.4 在 Logstash 组件所在节点上新建一个 Logstash 配置文件
cd /etc/logstash/conf.d vim filebeat.conf input { beats { port => "5044" } } output { elasticsearch { hosts => ["192.168.121.10:9200","192.168.121.20:9200"] index => "%{[fields][service_name]}-%{+YYYY.MM.dd}" } stdout { codec => rubydebug } } #启动 logstash logstash -f filebeat.conf 复制代码
5、浏览器访问
浏览器访问 http://192.168.121.10:5601 登录 Kibana, 单击“Create Index Pattern”按钮添加索引“filebeat-*”,单击 “create” 按钮创建,单击 “Discover” 按钮可查看图表信息及日志信息。 复制代码
二、Logstash的过滤模块
2.1 Logstash配置文件中的模块
1、input {}
- 指定输入流,通过file、beats、kafka、redis中获取数据
2、filter {}
常用插件:
- grok:对若干个大文本字段进行再分割,分割成一些小字段 (?<字段名>正则表达式) 字段名:正则表示匹配到的内容
- date:对数据中的时间进行统一格式化
- mutate:对一些无用的字段进行剔除,或增加字段
- mutiline:对多行数据进行统一编排,多行合并和拆分
3、ourput {}
- elasticsearch stdout
2.2 Filter(过滤模块)中的插件
而对于 Logstash 的 Filter,这个才是 Logstash 最强大的地方。Filter 插件也非常多,我们常用到的 grok、date、mutate、mutiline 四个插件。
对于 filter 的各个插件执行流程,可以看下面这张图:
网络异常,图片无法展示
|
grok插件(通过grok插件实现对字段的分割,使用通配符)
这里就要用到 logstash 的 filter 中的 grok 插件。filebeat 发送给 logstash 的日志内容会放到message 字段里面,logstash 匹配这个 message 字段就可以了。
格式:
匹配格式:(?<字段名>正则表达式) # 字段名:正则表达式匹配到的内容 复制代码
实例1:
(?<remote_addr>%{IPV6}|%{IPV4} )(?<other_info>.+) #对数据进行分割ip字段名为remote_addr, 其他字段名为other_info 复制代码
实例2:
(?<remote_addr>%{IPV6}|%{IPV4} )[\s-]+[(?<log_time>.+)](?<other_info>.+) #添加匹配时间字段 复制代码
实例3:
#分割多个字段 (?<remote_addr>%{IPV6}|%{IPV4})[\s-]+[(?<log_time>.+)]\s+"(?<http_method>\S+)\s+(?<url-path>.+)"\s+(?<rev_code>\d+)(?<other_info>.+) 复制代码
实例4:
cd /etc/logstash/conf.d/ cp filebeat.conf filter.conf vim filter.conf input { beats { port => "5044" } } filter { grok { match =>["message","(?<remote_addr>%{IPV6}|%{IPV4} )[\s-]+[(?<log_time>.+)]\s+"(?<http_method>\S+)\s+(?<url-path>.+)"\s+(?<rev_code>\d+)(?<other_info>.+)"] } } output { elasticsearch { hosts => ["192.168.121.10:9200","192.168.121.20:9200"] index => "{[filter][service_name]}-%{+YYYY.MM.dd}" } stdout { codec => rubydebug } } logstash -f filter.conf #启动