[Azure Event Hub] A consumer built with the Event Hub Java SDK stops consuming data from one partition and logs an IdleTimerExpired error

Problem

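An Event Hub consumer application written with the Java SDK intermittently stopped consuming messages from one partition. The logs contained an IdleTimerExpired error. After the application was restarted, it connected to Event Hub normally and consumed data again. The open question: with the retry mechanism enabled, why was the partition connection not re-established after it was dropped?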

Error message:

{"time":"2020-09-21 05:11:19.578", "level":"ERROR", "thread":"bounded-71", "appName":"events-service", "traceId":"", "spanId":"", "url":"", "clientIp":"", "method":"", "elapse":"", "code":"", "message":"", "class":"c.h.socialhub.eventhub.EventHub", "line":"EventHub.java:150", "msg":"Error occurred while processing events The connection was inactive for more than the allowed 240000 milliseconds and is closed by container 'cd8a74181e68151dde4_G28'., errorContext[NAMESPACE: shprod-member.servicebus.chinacloudapi.cn, PATH: xxxx/ConsumerGroups/$default/Partitions/1, REFERENCE_ID: 2_xxxxxxxx LINK_CREDIT: 253]"}

Consumer code:

eventProcessorClient = new EventProcessorClientBuilder()
                .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
                .connectionString(currentEventHubProperty.getConnectionString(), this.topic)
                .retry(retryOptions)
                .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))
                .processEvent(eventContext -> {
                    String currentData = "";
                    try {
                        EventData event = eventContext.getEventData();
                        PartitionContext partitionContext = eventContext.getPartitionContext();

                        EventMessage eventMessage = new EventMessage();
                        currentData = new String(event.getBody(), Charset.defaultCharset());
                        eventMessage.setContent(currentData);
                        eventMessage.setPartitionId(partitionContext.getPartitionId());
                        eventMessage.setSequenceNumber(event.getSequenceNumber());
                        // Log the enqueued time so idle periods can be reconstructed from the logs later.
                        log.info("Topic: {} - Partition: {} - Sequence: {} - EnqueuedTime: {}", this.topic, partitionContext.getPartitionId(), event.getSequenceNumber(), event.getEnqueuedTime());

                        // Record progress for this partition once the event has been handled.
                        eventContext.updateCheckpoint();
                    } catch (Exception e) {
                        String msg = e.getMessage();
                        if (StringUtils.isBlank(msg)) {
                            // getStackTrace().toString() only prints the array's identity hash;
                            // fall back to the exception's own description instead.
                            msg = e.toString();
                        }
                        log.error("Error occurred while do works with events[{}] : {}, data: {} ", this.topic, msg, currentData);
                    }
                })
                .processError(errorContext -> log.error("Error occurred while processing events " + errorContext.getThrowable().getMessage()))
                .buildEventProcessorClient();

Analysis

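First, use the logs to determine whether the partition was idle for 240 seconds around the time of the problem, that is, whether no data entered the partition during that period. If the logs already record the time at which each message entered the queue (the enqueued time), they can be analyzed directly. If not, add the following statement to the code so that the next occurrence can be analyzed straight from the logs: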

log.info("Topic: {} - Partition: {} - Sequence: {} - EnqueuedTime: {}", this.topic,  partitionContext.getPartitionId(), event.getSe
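
Once the enqueued time is in the logs, the idle check can be automated. The following is a minimal sketch, not part of the original application: the class name IdleGapFinder and its Gap type are hypothetical, and it assumes that the enqueued time is printed in ISO-8601 form ending in Z (as Instant.toString() does) and that the lines for each partition appear in the order they were consumed. It reports every pair of consecutive events in the same partition whose enqueued times are further apart than a threshold such as the 240000 milliseconds from the error message.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: scans consumer log lines for idle gaps per partition. */
final class IdleGapFinder {

    /** A gap between two consecutive events of the same partition. */
    static final class Gap {
        final String partitionId;
        final Instant from;
        final Instant to;

        Gap(String partitionId, Instant from, Instant to) {
            this.partitionId = partitionId;
            this.from = from;
            this.to = to;
        }

        Duration length() {
            return Duration.between(from, to);
        }
    }

    // Matches the text produced by the log.info(...) statement in the consumer.
    // Assumption: the enqueued time is an ISO-8601 instant ending in 'Z'.
    private static final Pattern LINE = Pattern.compile(
            "Partition: (\\S+) - Sequence: (\\d+) - EnqueuedTime: ([0-9T:.-]+Z)");

    /** Returns every gap longer than the threshold, in the order encountered. */
    static List<Gap> findGaps(List<String> logLines, Duration threshold) {
        Map<String, Instant> lastSeen = new HashMap<>();
        List<Gap> gaps = new ArrayList<>();
        for (String line : logLines) {
            Matcher m = LINE.matcher(line);
            if (!m.find()) {
                continue; // not an event log line
            }
            String partitionId = m.group(1);
            Instant enqueued = Instant.parse(m.group(3));
            // put() returns the previous enqueued time for this partition, if any.
            Instant previous = lastSeen.put(partitionId, enqueued);
            if (previous != null
                    && Duration.between(previous, enqueued).compareTo(threshold) > 0) {
                gaps.add(new Gap(partitionId, previous, enqueued));
            }
        }
        return gaps;
    }
}
```

Calling IdleGapFinder.findGaps(lines, Duration.ofMillis(240000)) on the log lines around the incident shows whether partition 1 really received nothing for longer than the idle timeout. If no gap is reported, the partition was receiving data and an idle partition alone does not explain the closed connection.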

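For an incident that has already happened, and provided the events are still within the retention period configured on the Event Hub, the SDK's receiveFromPartition method can be used to read a chosen range of events and inspect their enqueued times. (Note: create a separate consumer group for this instead of using $Default, so that the diagnostic reader does not fail to connect.) Sample code: https://azuresdkdocs.blob.core.windows.net/$web/java/azure-messaging-eventhubs/5.2.0/index.html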

Consume events from an Event Hub partition

To consume events, create an EventHubConsumerAsyncClient or EventHubConsumerClient for a specific consumer group. In addition, a consumer needs to specify where in the event stream to begin receiving events.

Consume events with EventHubConsumerAsyncClient

In the snippet below, we create an asynchronous consumer that receives events from partitionId and only listens to newest events that get pushed to the partition. Developers can begin receiving events from multiple partitions using the same EventHubConsumerAsyncClient by calling receiveFromPartition(String, EventPosition) with another partition id.

EventHubConsumerAsyncClient consumer = new EventHubClientBuilder()
    .connectionString("<< CONNECTION STRING FOR SPECIFIC EVENT HUB INSTANCE >>")
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .buildAsyncConsumerClient();
// Receive newly added events from partition with id "0". EventPosition specifies the position
// within the Event Hub partition to begin consuming events.
consumer.receiveFromPartition("0", EventPosition.latest()).subscribe(event -> {
    // Process each event as it arrives.
});
// add sleep or System.in.read() to receive events before exiting the process.

Consume events with EventHubConsumerClient

Developers can create a synchronous consumer that returns events in batches using an EventHubConsumerClient. In the snippet below, a consumer is created that starts reading events from the beginning of the partition's event stream.

EventHubConsumerClient consumer = new EventHubClientBuilder()
    .connectionString("<< CONNECTION STRING FOR SPECIFIC EVENT HUB INSTANCE >>")
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .buildConsumerClient();
String partitionId = "<< EVENT HUB PARTITION ID >>";
// Get the first 15 events in the stream, or as many events as can be received within 40 seconds.
IterableStream<PartitionEvent> events = consumer.receiveFromPartition(partitionId, 15,
    EventPosition.earliest(), Duration.ofSeconds(40));
for (PartitionEvent event : events) {
    System.out.println("Event: " + event.getData().getBodyAsString());
}

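The analysis above did not establish whether the problem lies in the application logic or in the SDK. Many similar reports on GitHub point toward the Java SDK, so further updates on the following GitHub issue are needed: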

AmqpEventHubConsumer.IdleTimerExpired in Java EventHubConsumer SDK:https://github.com/Azure/azure-sdk-for-java/issues/11233

 
