十八、.net core(.NET 6)搭建ElasticSearch(ES)系列之使用Logstash通过Rabbitmq接收Serilog日志到ES

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
日志服务 SLS,月写入数据量 50GB 1个月
简介: 使用Logstash通过Rabbitmq接收Serilog日志到ES首先,要部署logstash 为了与前面的ElasticSearch版本保持一致,此处Logstash下载的版本也是7.13.1, 下载地址:https://artifacts.elastic.co/downloads/logstash/logstash-7.13.1-windows-x86_64.zip


使用Logstash通过Rabbitmq接收Serilog日志到ES


首先,要部署logstash


为了与前面的ElasticSearch版本保持一致,此处Logstash下载的版本也是7.13.1,下载地址:

https://artifacts.elastic.co/downloads/logstash/logstash-7.13.1-windows-x86_64.zip

 

解压以后,修改一些配置:


config目录下,修改jvm.options文件,设置内存占用最小值和最大值:如果配置比较低,建议配置成512MB即可,如果电脑或服务器配置比较好,那就请随意。如果只是普通用途,比如记录普通日志啥的,配个4G内基本足够了。

1995789-20210630003408358-1101832066.png


config里面,有一个logstash-sample.conf文件,可以当做参考配置,随后咱们新建一个用于接收RabbitMQ的配置文件。先来写代码~~


package包项目下,新增引用 Serilog.Sinks.RabbitMQ组件:

1995789-20210630003452798-656624434.png

 

然后,在Program文件下面,添加serilog日志写入到RabbitMQ的一些配置:


1995789-20210630003517614-1298019453.png


以上代码如下:


logger.WriteTo.RabbitMQ((clientConfiguration, sinkConfig) =>
                         {
                             clientConfiguration.Username = "wesky";
                             clientConfiguration.Password = "wesky123";
                             clientConfiguration.Exchange = "WeskyExchange";
                             clientConfiguration.ExchangeType = "direct";
                             clientConfiguration.DeliveryMode = RabbitMQDeliveryMode.Durable;
                             clientConfiguration.RouteKey = "WeskyLog";
                             clientConfiguration.Port = 5672;
                             clientConfiguration.Hostnames.Add("127.0.0.1");
                             sinkConfig.TextFormatter = new JsonFormatter();
                         });



以上为了方便,所以写死了,大佬们可以写到配置文件里面去进行读取,这样好一点。

然后,程序启动时候,进行主动创建一个ExchangeWeskyExchange的,RouteKeyWeskyLogs的消息队列,包括生产者和消费者。之前有做过简单的RabbitMQ创建的案例,所以直接在原来的基础上做一些改动:


1995789-20210630003620058-1528936121.png


设置了两个RouteKey:WeskyLogWeskyLog2,以及两个队列 Log1Log2。咱们主要使用WeskyLogLog1

在消费者监听上面,做个过滤,对于队列是Log1的消息,直接返回不做处理,这样做到目的是消息不被消费,让logstash来消费消息:


1995789-20210630003644748-1542967816.png


现在开始配置logstash,上面有一个logstash-sample.conf文件,拷贝一分,重命名为 rabbitmq.conf  然后往里面更改一些配置信息,如下:

1995789-20210630003706332-578748508.png

 

logstash部分配置代码:


input {
  rabbitmq {
    host => "127.0.0.1"
    port => 5672
    user => "wesky"
    password => "wesky123"
    queue => "Log1"
    key => "WeskyLog"
    exchange => "WeskyExchange"
    durable => true
  }
}
filter {
  grok {
    match => {"Timestamp" => "%{TIMESTAMP_ISO8601:ctime}"}
    add_field => ["create_time","%{@timestamp}"]
  }
  date {
    match => ["ctime","yyyy-MM-dd HH:mm:ss.SSS","ISO8601"]
    target => "@timestamp"
  }
  mutate {
    remove_field => ["@version","Properties","Timestamp","ctime"]
    rename => {"MessageTemplate" => "message"}
    rename => {"Level" => "level"}
  }
  ruby {
    code => "event.set('create_time',event.get('@timestamp').time.localtime)"
  }
}
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "log-%{+YYYYMMdd}"
  }
}

 

注意,配置不能使用Tab,必须只能用空格,每个缩进俩空格。


现在写一个测试的webapi,来看看效果。创建一个webapi,记录两条日志,一条是Infomaton,一条是Error

1995789-20210630003905578-428225220.png


现在启动Wsk.Core程序,试着跑一下看看效果:

 1995789-20210630003921511-32132430.png


哦吼,才发现有其他的日志,所以直接打开RabbitMQ,可以看到日志被写入到了MQ里面,而且因为没有消费,所以队列一直在增加。咱们现在启动一下logstash

启动方式如下图,具体地址那些,需要根据自己具体的目录而定:


1995789-20210630003957028-2097553021.png

可以看见,左边的消息,一下子被消费完毕,说明logstash应该是获取到MQ消息了。

现在我们看一下ElasticSearch上面,是否有消息:


1995789-20210630004027168-1096580294.png

 

查询log-20210629,可以看到对应的日志信息,说明写入ES成功。


kibana上面,选择Discover,然后创建一个log的筛选,用于查询所有以log开头到索引:

 1995789-20210630004132744-1532138510.png


刚添加会有点乱,咱们选择只查看create_timelevelmessage字段信息:


1995789-20210630004157905-1057678579.png

显示内容有点乱,debug信息也都记录了,咱们把这部分过滤掉再启动。配置文件里面,修改最小日志级别为Information:

1995789-20210630004220203-2064396968.png

 

再启动程序,查看效果,瞬间清爽~~~

1995789-20210630004234480-1419018422.png


现在通过上面的webapi,写两个日志看看效果:


1995789-20210630004250344-1127013437.png


控制台有信息了,现在去ES上面看下日志信息:


1995789-20210630004309522-684136523.png

 

可以看见日志也有了。现在试一下自带搜索引擎的查询的效果:


1995789-20210630004334780-332635573.png


1995789-20210630004349114-1611215746.png

 

说明查询也是OK的。

 

另外需要注意一点:


我这边索引还是log-20210629,但是实际上已经是2021630日的024分,这个是因为ES默认是0区,咱们中国是东八区,所以会自动少8个小时进行存储。Kibana上面查询的日志时间却正常的,这个是因为Kibana默认会读取浏览器的时区,自动帮我们转换进行显示了。


如果搜索日志时候,发现搜索的是单个字,没有词组那些,那可能是因为没有添加中文分词的原因。添加中文分词以及中文分词插件,可以加群索取哦~~


相关实践学习
利用Elasticsearch实现地理位置查询
本实验将分别介绍如何使用Elasticsearch7.10版本进行全文检索、多语言检索和地理位置查询三个Elasticsearch基础检索子场景的实现。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
相关文章
|
2月前
|
Dubbo Java 应用服务中间件
微服务框架(三十)Logstash Kong 日志上报
此系列文章将会描述Java框架Spring Boot、服务治理框架Dubbo、应用容器引擎Docker,及使用Spring Boot集成Dubbo、Mybatis等开源框架,其中穿插着Spring Boot中日志切面等技术的实现,然后通过gitlab-CI以持续集成为Docker镜像。 本文为Logstash Kong 日志上报 本系列文章中所使用的框架版本为Spring Boot 2.0.3-...
|
4天前
|
消息中间件 开发工具 RocketMQ
消息队列 MQ产品使用合集之如何关闭客户端的日志记录
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
Oracle 关系型数据库 API
实时计算 Flink版产品使用合集之当sink到elasticsearch时,可以指定es的指定字段吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
实时计算 Flink版产品使用合集之当sink到elasticsearch时,可以指定es的指定字段吗
|
2月前
Elasticsearch【问题记录 02】【不能以root运行es + max virtual memory areas vm.max_map_count [65530] is too low处理】
【4月更文挑战第12天】Elasticsearch【问题记录 02】【不能以root运行es + max virtual memory areas vm.max_map_count [65530] is too low处理】
29 3
|
3天前
|
SQL JSON 数据处理
5% 消耗,6 倍性能:揭秘新一代 iLogtail SPL 日志处理引擎与 Logstash 的 PK
在本文中,我们将深入探讨为何选择 iLogtail,以及它在 SPL 数据处理方面相较于 Logstash 有何独特优势。通过对比这两款工具的架构、性能以及功能,我们希望能够揭示 iLogtail 如何在日益复杂的日志处理需求中脱颖而出,帮助您做出明智的技术选择。
|
25天前
|
JSON 搜索推荐 大数据
Elasticsearch:从 ES|QL 到 PHP 对象
【6月更文挑战第9天】Elasticsearch 是一款强大的开源搜索引擎,适用于大数据处理和分析。在 PHP 开发中,使用 ES|QL 构建复杂查询后,通常需将查询结果转换为 PHP 对象。通过 `json_decode()` 函数解析 JSON 数据,可以实现这一目标。示例代码展示了如何将 Elasticsearch 响应转换为 PHP 对象并遍历数据。这样,我们可以进一步处理和操作数据,适应不同项目需求。随着技术和方法的更新,不断学习和适应将提升我们在开发中的效率和创新力。
44 10
|
2天前
|
存储 关系型数据库 MySQL
【Elasticsearch】在es中实现mysql中的FIND_IN_SET查询条件
【Elasticsearch】在es中实现mysql中的FIND_IN_SET查询条件
5 0
|
2月前
|
监控 应用服务中间件 nginx
使用 Docker Compose V2 快速搭建日志分析平台 ELK (Elasticsearch、Logstash 和 Kibana)
ELK的架构有多种,本篇分享使用的架构如图所示: Beats(Filebeat) -> -> Elasticsearch -> Kibana,目前生产环境一天几千万的日志,内存占用大概 10G
84 4
|
2月前
|
Prometheus 监控 Cloud Native
实时计算 Flink版操作报错之在使用ES时遇到“java.lang.IllegalStateException: The elasticsearch emitter must be serializable”,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
2月前
|
SQL 监控 API
实时计算 Flink版产品使用合集之可以用来同步数据到 Elasticsearch(ES)吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
实时计算 Flink版产品使用合集之可以用来同步数据到 Elasticsearch(ES)吗