【事件中心 Azure Event Hub】使用Logstash消费EventHub中的event时遇见的几种异常(TimeoutException, ReceiverDisconnectedException)

简介: 【事件中心 Azure Event Hub】使用Logstash消费EventHub中的event时遇见的几种异常(TimeoutException, ReceiverDisconnectedException)

问题描述

使用EFK(Elasticsearch, Fluentd and Kibana)在收集日志的解决方案中, 可以先把日志发送到EventHub中,然后通过Logstash消费EventHub中的事件并分发出去。但是在使用Logstash的过程中,遇见了连接不上EventHub的错误,和Receiver一直不停关闭的问题。 错误的信息分别为:

Exception while initializing stores, not starting partition manager com.microsoft.azure.eventhubs.IllegalEntityException: Failure getting partition ids for event hub

        ... ...

Caused by: com.microsoft.azure.eventhubs.TimeoutException: Opening MessagingFactory timed out.

[WARN ][com.microsoft.azure.eventprocessorhost.PartitionPump][main][cbc2dac224225cd02511820a8ee314e73f1c0800809c9c534154188acb14fbac] host logstash-fe4f6e2e-e260-4522-a3f8-f292a8902dad: 3: Receiver disconnected on create, bad epoch?

com.microsoft.azure.eventhubs.ReceiverDisconnectedException: Receiver 'nil' with a higher epoch '637360547769896558' already exists. Receiver 'nil' with epoch 0 cannot be created. Make sure you are creating receiver with increasing epoch value to ensure connectivity, or ensure all old epoch receivers are closed or disconnected. 

问题原因

  • 对于TimeoutException,需要判断是否是当前环境连接不上EventHub服务器,所以可以通过排查网络连接的方式来解决。(Link)

  • 对于ReceiverDisconnectedException,错误是Receiver在不停通过同一个消费组,同一个分区建立连接,当新连接建立时,会导致旧的连接关闭。所以需要检查客户端是不是又多个进程在建立连接或者时多个客户端在消费同一个分区数据

具体的解释可以参考:https://github.com/Azure/azure-event-hubs-spark/blob/master/FAQ.md

Why am I getting a ReceiverDisconnectedException?

In version 2.3.2 and above, the connector uses epoch receivers from the Event Hubs Java client. This only allows one receiver to be open per consumer group-partition combo. To be crystal clear, let's say we have receiverA with an epoch of 0 which is open within consumer group foo on partition 0. Now, if we open a new receiver, receiverB, for the same consumer group and partition with an epoch of 0 (or higher), then receiverA will be disconnected and get the ReceiverDisconnectedException.

In order to avoid this issue, please have one consumer group per Spark application being run. In general, you should have a unique consumer group for each consuming application being run.

Note that this error could happen if the same structured stream is accessed by multiple queries (writers).

Spark will read from the input source and process the dataframe separately for each defined sink. This results in having multiple readers on the same consumer group-partition combo. In order to prevent this, you can create a separate reader for each writer using a separate consumer group or use an intermediate delta table if you are using Databricks.

解决方案

对于TimeoutException问题,只要解决另外客户端环境问题后,问题会得到解决。但是对于ReceiverDisconnectedException则如何解决呢? 由于都是在Logstash中配置,并没有代码可以修改。所以解决这个问题就是要设置Logstash的工作进程,不能让进程数大于分区数。 并且为Logstash在EventHub中单独建立一个消费组。以下是为一个成功通过Logstash消费EventHub的配置

input {
   azure_event_hubs {
      event_hub_connections => ["Endpoint=sb://xxxx.servicebus.chinacloudapi.cn/;SharedAccessKeyName=test;SharedAccessKey=xxxxxxxx=;EntityPath=logstest"]
      threads => 8
      decorate_events => true
     consumer_group => "logs"
     storage_connection => "DefaultEndpointsProtocol=https;AccountName=xxx;AccountKey=xxxxxxx=;EndpointSuffix=core.chinacloudapi.cn"
   }
  } output { stdout {
        }
 }

启动命令为:

./bin/logstash -f config/ehtest.conf -w 1

启动后成功结果如:(成功捕获到EventHub中进入的事件)

在获取连接字符串的过程中,可以参考Logstash中关于EventHub插件的说明文档:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-azure_event_hubs.html(如要了解全面的EventHub中的配置参数,也可以参考该文档说明)

Event Hub connection string

The plugin uses the connection string to access Azure Events Hubs. Find the connection string here: Azure Portal-> Event Hub -> Shared access polices. The event_hub_connections option passes the Event Hub connection strings for the basic configuration.

注:不要使用默认的消费组或者是与其他应用公用消费组,这样会导致Logstash连接不上。

相关文章
|
SQL 消息中间件 存储
Hue和Kibana有什么区别?Logstash和kafka区别?
本文分享几款hive可视化工具,然后区分一下Hue和Kibana,区分一下Logstash和kafka
372 1
王道408计组汇编语言部分学习总结
用于实现分支结构、循环结构的指令: cmp、 test、 jmp、 jxxx 用于实现函数调用的指令: push、pop、call、 ret 用于实现数据转移的指令: mov
652 0
|
存储 调度 块存储
阿里云连续两年斩获全球存储顶会FAST最佳论文
阿里云连续两年斩获全球存储顶会FAST最佳论文
1047 0
|
JSON 编解码 网络协议
记一次Logstash日志丢失问题
大量的json解析错误,根据日志情况分析,原因是日志数据传输到logstash之后被截断成了多条数据,于是有的数据就解析异常了,自然无法正常到归集到es的索引文档中。
|
缓存
数据结构之 - 链表数据结构详解: 从基础到实现
数据结构之 - 链表数据结构详解: 从基础到实现
528 6
|
消息中间件 存储 供应链
进程间通信方式-----消息队列通信
【10月更文挑战第29天】消息队列通信是一种强大而灵活的进程间通信机制,它通过异步通信、解耦和缓冲等特性,为分布式系统和多进程应用提供了高效的通信方式。在实际应用中,需要根据具体的需求和场景,合理地选择和使用消息队列,以充分发挥其优势,同时注意其可能带来的复杂性和性能开销等问题。
|
机器学习/深度学习 自然语言处理 算法
2024年4月计算机视觉论文推荐
四月的计算机视觉研究涵盖多个子领域,包括扩散模型和视觉语言模型。在扩散模型中,Tango 2通过直接偏好优化改进了文本到音频生成,而Ctrl-Adapter提出了一种有效且通用的框架,用于在图像和视频扩散模型中添加多样控制。视觉语言模型的论文分析了CLIP模型在有限资源下的优化,并探讨了语言引导对低级视觉任务的鲁棒性。图像生成与编辑领域关注3D感知和高质量图像编辑,而视频理解与生成则涉及实时视频转游戏环境和文本引导的剪贴画动画。
313 0
|
Docker 容器
解决docker启动logstash失败的问题(可能原因)
解决docker启动logstash失败的问题(可能原因)
511 0
|
数据可视化 应用服务中间件 Apache
优化集中式日志记录的方法:添加 Logstash 过滤器
优化集中式日志记录的方法:添加 Logstash 过滤器
206 1