使用Logstash通过Rabbitmq接收Serilog日志到ES
首先,要部署logstash
为了与前面的ElasticSearch版本保持一致,此处Logstash下载的版本也是7.13.1,下载地址:
https://artifacts.elastic.co/downloads/logstash/logstash-7.13.1-windows-x86_64.zip
解压以后,修改一些配置:
在config目录下,修改jvm.options文件,设置内存占用最小值和最大值:如果配置比较低,建议配置成512MB即可,如果电脑或服务器配置比较好,那就请随意。如果只是普通用途,比如记录普通日志啥的,配个4G内基本足够了。
在config里面,有一个logstash-sample.conf文件,可以当做参考配置,随后咱们新建一个用于接收RabbitMQ的配置文件。先来写代码~~
在package包项目下,新增引用 Serilog.Sinks.RabbitMQ组件:
然后,在Program文件下面,添加serilog日志写入到RabbitMQ的一些配置:
以上代码如下:
logger.WriteTo.RabbitMQ((clientConfiguration, sinkConfig) => { clientConfiguration.Username = "wesky"; clientConfiguration.Password = "wesky123"; clientConfiguration.Exchange = "WeskyExchange"; clientConfiguration.ExchangeType = "direct"; clientConfiguration.DeliveryMode = RabbitMQDeliveryMode.Durable; clientConfiguration.RouteKey = "WeskyLog"; clientConfiguration.Port = 5672; clientConfiguration.Hostnames.Add("127.0.0.1"); sinkConfig.TextFormatter = new JsonFormatter(); });
以上为了方便,所以写死了,大佬们可以写到配置文件里面去进行读取,这样好一点。
然后,程序启动时候,进行主动创建一个Exchange为WeskyExchange的,RouteKey是WeskyLogs的消息队列,包括生产者和消费者。之前有做过简单的RabbitMQ创建的案例,所以直接在原来的基础上做一些改动:
设置了两个RouteKey:WeskyLog和WeskyLog2,以及两个队列 Log1和Log2。咱们主要使用WeskyLog和 Log1。
在消费者监听上面,做个过滤,对于队列是Log1的消息,直接返回不做处理,这样做到目的是消息不被消费,让logstash来消费消息:
现在开始配置logstash,上面有一个logstash-sample.conf文件,拷贝一分,重命名为 rabbitmq.conf 然后往里面更改一些配置信息,如下:
logstash部分配置代码:
input { rabbitmq { host => "127.0.0.1" port => 5672 user => "wesky" password => "wesky123" queue => "Log1" key => "WeskyLog" exchange => "WeskyExchange" durable => true } } filter { grok { match => {"Timestamp" => "%{TIMESTAMP_ISO8601:ctime}"} add_field => ["create_time","%{@timestamp}"] } date { match => ["ctime","yyyy-MM-dd HH:mm:ss.SSS","ISO8601"] target => "@timestamp" } mutate { remove_field => ["@version","Properties","Timestamp","ctime"] rename => {"MessageTemplate" => "message"} rename => {"Level" => "level"} } ruby { code => "event.set('create_time',event.get('@timestamp').time.localtime)" } } output { elasticsearch { hosts => ["http://localhost:9200"] index => "log-%{+YYYYMMdd}" } }
注意,配置不能使用Tab,必须只能用空格,每个缩进俩空格。
现在写一个测试的webapi,来看看效果。创建一个webapi,记录两条日志,一条是Infomaton,一条是Error:
现在启动Wsk.Core程序,试着跑一下看看效果:
哦吼,才发现有其他的日志,所以直接打开RabbitMQ,可以看到日志被写入到了MQ里面,而且因为没有消费,所以队列一直在增加。咱们现在启动一下logstash。
启动方式如下图,具体地址那些,需要根据自己具体的目录而定:
可以看见,左边的消息,一下子被消费完毕,说明logstash应该是获取到MQ消息了。
现在我们看一下ElasticSearch上面,是否有消息:
查询log-20210629,可以看到对应的日志信息,说明写入ES成功。
在kibana上面,选择Discover,然后创建一个log的筛选,用于查询所有以log开头到索引:
刚添加会有点乱,咱们选择只查看create_time、level和message字段信息:
显示内容有点乱,debug信息也都记录了,咱们把这部分过滤掉再启动。配置文件里面,修改最小日志级别为Information:
再启动程序,查看效果,瞬间清爽~~~
现在通过上面的webapi,写两个日志看看效果:
控制台有信息了,现在去ES上面看下日志信息:
可以看见日志也有了。现在试一下自带搜索引擎的查询的效果:
说明查询也是OK的。
另外需要注意一点:
我这边索引还是log-20210629,但是实际上已经是2021年6月30日的0点24分,这个是因为ES默认是0区,咱们中国是东八区,所以会自动少8个小时进行存储。Kibana上面查询的日志时间却正常的,这个是因为Kibana默认会读取浏览器的时区,自动帮我们转换进行显示了。
如果搜索日志时候,发现搜索的是单个字,没有词组那些,那可能是因为没有添加中文分词的原因。添加中文分词以及中文分词插件,可以加群索取哦~~