logstash的关键配置.这个配置文件的目的是这样的:
1.使用udp端口9998 接受来自数据源的信息;
2.通过filter插件,先把message转成json格式,这一步很重要,很多文章里都不提这个;
3.判断接受到的数据里,是否包含app_name这个key,如果包含,就把app_name里的值提出来,和当前日期组合到一起,作为传入elasticsearch的index名字。如果不包含,就生成一个字符串non_index_log作为elasticsearch的index名字;
4.把日志转发到elasticsearch里,同时打印在控制台里
input { udp { port => 9998 } } filter { json { source => "message" } if [app_name] { mutate { add_field => { "index_name" => "%{app_name}-%{+YYYY.MM.dd}" } } } else { mutate { add_field => { "index_name" => "non_index_log" } } } } output { elasticsearch { hosts => ["http://elasticsearch-master:9200"] index => "logstash-roc-%{index_name}" } stdout { codec => rubydebug } }
为了测试这样配置之后的logstash效果如何,我们可以使用一个python代码
# All rights reserved. # # filename : example2.py # description : # # created by zhenwei.Li at 2020/9/2 15:52 import logging import logstash import sys host = '192.168.0.12' test_logger = logging.getLogger('python-logstash-logger') test_logger.setLevel(logging.INFO) test_logger.addHandler(logstash.LogstashHandler(host, 9998, version=1)) test_logger.error('python-logstash: test logstash error message.') # add extra field to logstash message extra = { 'app_name': 'temp-lizhenwei-python2', 'test_string': 'python version: ' + repr(sys.version_info), 'test_boolean': True, 'test_dict': {'a': 1, 'b': 'c'}, 'test_float': 1.23, 'test_integer': 123, 'type': 'temp-lishuai-python3' } test_logger.info('python-logstash: test extra fields', extra=extra)
通过运行上述代码,我们可以去检查logstash的控制台是否产生了index_name,理论上来说,会产生一个
"index_name" => "temp-lizhenwei-python2-2020.09.03"和"index_name" => "non_index_log"
如下图所示
最后,我们去kibana里面检查一下elasticsearch的index,确实产生了我们需要的两个index