[Azure Event Hub] Exceptions encountered when consuming Event Hub events with Logstash (TimeoutException, ReceiverDisconnectedException)


Problem description

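In an EFK (Elasticsearch, Fluentd and Kibana) log-collection solution, logs can first be sent to Event Hub, and Logstash then consumes the events from Event Hub and forwards them. While using Logstash, however, we ran into two problems: Logstash could not connect to Event Hub, and its receivers were being closed over and over. The error messages were, respectively: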

Exception while initializing stores, not starting partition manager com.microsoft.azure.eventhubs.IllegalEntityException: Failure getting partition ids for event hub

        ... ...

Caused by: com.microsoft.azure.eventhubs.TimeoutException: Opening MessagingFactory timed out.

[WARN ][com.microsoft.azure.eventprocessorhost.PartitionPump][main][cbc2dac224225cd02511820a8ee314e73f1c0800809c9c534154188acb14fbac] host logstash-fe4f6e2e-e260-4522-a3f8-f292a8902dad: 3: Receiver disconnected on create, bad epoch?

com.microsoft.azure.eventhubs.ReceiverDisconnectedException: Receiver 'nil' with a higher epoch '637360547769896558' already exists. Receiver 'nil' with epoch 0 cannot be created. Make sure you are creating receiver with increasing epoch value to ensure connectivity, or ensure all old epoch receivers are closed or disconnected. 

Cause

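  • For the TimeoutException, first determine whether the current environment can reach the Event Hub server at all. This is a network connectivity problem and is resolved by troubleshooting the network connection.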

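  • For the ReceiverDisconnectedException, receivers are repeatedly opening connections on the same consumer group and the same partition, and every new connection causes the previous one to be closed. Check whether the client has multiple processes opening connections, or whether multiple clients are consuming the same partition.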
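For the TimeoutException, a quick first check from the Logstash host is a plain TCP connect to the Event Hubs namespace. The sketch below is a hypothetical helper (not part of Logstash or any Azure SDK). It assumes the client uses the default AMQP transport, so it tries port 5671 (AMQP over TLS) and port 443 (AMQP over WebSockets/HTTPS), taking the host from the Endpoint=sb://... segment of the connection string.

```python
import socket
from typing import Dict, Iterable

# Assumed ports: 5671 = AMQP over TLS (default transport), 443 = AMQP over WebSockets/HTTPS.
EVENT_HUBS_PORTS = (5671, 443)


def endpoint_host(connection_string: str) -> str:
    """Return the namespace host from the Endpoint=sb://<host>/ segment."""
    for segment in connection_string.split(";"):
        # Split on the first '=' only; values such as Base64 keys may contain '='.
        key, sep, value = segment.partition("=")
        if sep and key.strip().lower() == "endpoint":
            host = value.strip()
            if host.startswith("sb://"):
                host = host[len("sb://"):]
            return host.rstrip("/")
    raise ValueError("connection string has no Endpoint segment")


def check_ports(host: str, ports: Iterable[int] = EVENT_HUBS_PORTS,
                timeout: float = 5.0) -> Dict[int, bool]:
    """Attempt a TCP connect to each port; True means a TCP connection was established."""
    results: Dict[int, bool] = {}
    for port in ports:
        try:
            # Only opens and closes a TCP connection; no TLS or AMQP traffic is sent.
            with socket.create_connection((host, port), timeout=timeout):
                results[port] = True
        except OSError:  # covers DNS failures, refused connections and timeouts
            results[port] = False
    return results
```

Usage: `check_ports(endpoint_host(conn))` with your own connection string. If 5671 is unreachable while 443 is open, an outbound firewall rule on the AMQP port is a likely suspect. A successful TCP connect does not prove that authentication will succeed; it only rules out basic network reachability.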

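For a detailed explanation, see the following FAQ. It was written for the Spark connector, but the epoch-receiver behavior it describes also applies here, since the Logstash plugin's stack trace shows it runs on the Java client's EventProcessorHost: https://github.com/Azure/azure-event-hubs-spark/blob/master/FAQ.md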

Why am I getting a ReceiverDisconnectedException?

In version 2.3.2 and above, the connector uses epoch receivers from the Event Hubs Java client. This only allows one receiver to be open per consumer group-partition combo. To be crystal clear, let's say we have receiverA with an epoch of 0 which is open within consumer group foo on partition 0. Now, if we open a new receiver, receiverB, for the same consumer group and partition with an epoch of 0 (or higher), then receiverA will be disconnected and get the ReceiverDisconnectedException.

In order to avoid this issue, please have one consumer group per Spark application being run. In general, you should have a unique consumer group for each consuming application being run.

Note that this error could happen if the same structured stream is accessed by multiple queries (writers).

Spark will read from the input source and process the dataframe separately for each defined sink. This results in having multiple readers on the same consumer group-partition combo. In order to prevent this, you can create a separate reader for each writer using a separate consumer group or use an intermediate delta table if you are using Databricks.
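To make the epoch rule concrete, here is a small in-memory model of it. This is a toy simulation with hypothetical names (PartitionOwnership, Receiver, ReceiverDisconnected), not the Event Hubs client API. A receiver opened with the same or a higher epoch on an occupied (consumer group, partition) pair disconnects the current one; a lower epoch is rejected, matching the "higher epoch already exists" message in the log above; and a different consumer group never conflicts.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple


class ReceiverDisconnected(Exception):
    """Stand-in for com.microsoft.azure.eventhubs.ReceiverDisconnectedException."""


@dataclass
class Receiver:
    name: str
    epoch: int
    connected: bool = True
    reason: Optional[str] = None


@dataclass
class PartitionOwnership:
    """Toy model: at most one open epoch receiver per (consumer group, partition)."""
    owners: Dict[Tuple[str, int], Receiver] = field(default_factory=dict)

    def open(self, consumer_group: str, partition: int, name: str, epoch: int) -> Receiver:
        key = (consumer_group, partition)
        current = self.owners.get(key)
        if current is not None and current.connected:
            if epoch < current.epoch:
                # Newcomer rejected: a receiver with a higher epoch already exists.
                raise ReceiverDisconnected(
                    f"Receiver '{name}' with epoch {epoch} cannot be created; "
                    f"a receiver with epoch {current.epoch} already exists")
            # Same or higher epoch: the existing receiver is disconnected.
            current.connected = False
            current.reason = f"disconnected by '{name}' (epoch {epoch})"
        receiver = Receiver(name=name, epoch=epoch)
        self.owners[key] = receiver
        return receiver


if __name__ == "__main__":
    hub = PartitionOwnership()
    a = hub.open("foo", 0, "receiverA", epoch=0)
    b = hub.open("foo", 0, "receiverB", epoch=0)     # same group and partition
    other = hub.open("bar", 0, "otherApp", epoch=0)  # separate consumer group
    print(a.connected, b.connected, other.connected)  # prints "False True True"
```

The last case in the demo is why giving each consuming application its own consumer group avoids the exception: receivers only compete within one (consumer group, partition) pair.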

Solution

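The TimeoutException goes away once the network problem in the client environment is fixed. But how do we fix the ReceiverDisconnectedException? Everything is configured in Logstash, so there is no code to change. The fix is to limit the Logstash worker processes so that the number of processes does not exceed the number of partitions, and to create a dedicated consumer group for Logstash in Event Hub. Below is a configuration that successfully consumed Event Hub events through Logstash: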
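The two rules can be written down as a pre-deployment check. This is a hypothetical helper for illustration only; the article does not state the partition count of its Event Hub, so any numbers passed in are assumptions to replace with your own.

```python
from typing import Iterable, List

DEFAULT_CONSUMER_GROUP = "$Default"  # name of the built-in consumer group


def check_consumer_plan(partition_count: int,
                        consumer_processes: int,
                        consumer_group: str,
                        groups_used_by_other_apps: Iterable[str] = ()) -> List[str]:
    """Return human-readable warnings; an empty list means neither rule is broken."""
    if partition_count < 1 or consumer_processes < 1:
        raise ValueError("partition_count and consumer_processes must both be >= 1")
    warnings: List[str] = []
    # Rule 1: no more consumer processes than partitions.
    if consumer_processes > partition_count:
        warnings.append(
            f"{consumer_processes} processes for {partition_count} partitions: "
            "some processes cannot own a partition of their own")
    # Rule 2: a consumer group used only by Logstash.
    if consumer_group == DEFAULT_CONSUMER_GROUP:
        warnings.append("consumer group is $Default; create a dedicated one for Logstash")
    elif consumer_group in set(groups_used_by_other_apps):
        warnings.append(f"consumer group '{consumer_group}' is shared with another application")
    return warnings
```

For example, `check_consumer_plan(4, 1, "logs")` (one worker, the dedicated "logs" group, and an assumed 4 partitions) returns an empty list, while reusing "$Default" or running more processes than partitions produces a warning.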

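input {
  azure_event_hubs {
    # Event Hub-level connection string (includes EntityPath)
    event_hub_connections => ["Endpoint=sb://xxxx.servicebus.chinacloudapi.cn/;SharedAccessKeyName=test;SharedAccessKey=xxxxxxxx=;EntityPath=logstest"]
    threads => 8
    # add Event Hub metadata (partition, offset, consumer group, ...) to each event
    decorate_events => true
    # dedicated consumer group created for Logstash, not $Default
    consumer_group => "logs"
    # Blob storage account that persists offsets between restarts
    storage_connection => "DefaultEndpointsProtocol=https;AccountName=xxx;AccountKey=xxxxxxx=;EndpointSuffix=core.chinacloudapi.cn"
  }
}
output {
  stdout {}
}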

Startup command:

./bin/logstash -f config/ehtest.conf -w 1

After startup the run succeeded: events arriving in the Event Hub were captured and printed by the stdout output.

For how to obtain the connection string, see the Logstash documentation for the Event Hubs input plugin: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-azure_event_hubs.html (it also documents the full set of Event Hub configuration options):

Event Hub connection string

The plugin uses the connection string to access Azure Events Hubs. Find the connection string here: Azure Portal-> Event Hub -> Shared access polices. The event_hub_connections option passes the Event Hub connection strings for the basic configuration.
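In the plugin's basic configuration the Event Hub is identified by the EntityPath inside the connection string, so pasting a namespace-level connection string (which has no EntityPath) is an easy mistake. The sketch below is a hypothetical validator, assuming the usual Key=Value;Key=Value format and assuming these four segments are the ones needed for shared-access-key authentication. It splits each segment on its first '=' only, so Base64 keys ending in '=' stay intact.

```python
from typing import Dict, List

# Assumed required segments for a SAS-based event_hub_connections entry.
REQUIRED_SEGMENTS = ("Endpoint", "SharedAccessKeyName", "SharedAccessKey", "EntityPath")


def parse_connection_string(conn: str) -> Dict[str, str]:
    """Split 'Key=Value;Key=Value' into a dict.

    Each segment is split on its first '=' only, so Base64 keys that end
    in '=' (for example SharedAccessKey=xxxxxxxx=) are kept intact.
    """
    fields: Dict[str, str] = {}
    for segment in conn.strip().split(";"):
        if not segment:
            continue  # tolerate a trailing ';'
        key, sep, value = segment.partition("=")
        if not sep:
            raise ValueError(f"malformed segment: {segment!r}")
        fields[key.strip()] = value.strip()
    return fields


def missing_segments(conn: str) -> List[str]:
    """Names of required segments that are absent or empty."""
    fields = parse_connection_string(conn)
    return [name for name in REQUIRED_SEGMENTS if not fields.get(name)]
```

Running `missing_segments` on the connection string from the configuration above returns an empty list; a string copied from the namespace's shared access policy instead of the Event Hub's would report `EntityPath` as missing.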

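Note: Do not use the default consumer group ($Default) or a consumer group shared with other applications. Otherwise other receivers on the same partitions will keep disconnecting Logstash's receivers, and Logstash will fail to stay connected.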
