【事件中心 Azure Event Hub】Event Hub Java SDK的消费端出现不消费某一个分区中数据的情况,出现IdleTimerExpired错误消息记录

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 【事件中心 Azure Event Hub】Event Hub Java SDK的消费端出现不消费某一个分区中数据的情况,出现IdleTimerExpired错误消息记录

问题情形

使用Java SDK编写的Event Hub消费端应用,随机性遇见了某个分区没有消费消息的情况,在检查日志时候,有发现IdelTimeExpired的错误记录。在重启应用后,连接EventHub正常,并又能正常消费数据。比较怀疑的方面,在又开启Retry机制的情况下,为什么分区(Partition)连接断掉后没有重连呢?

错误消息:

{"time":"2020-09-21 05:11:19.578", "level":"ERROR", "thread":"bounded-71", "appName":"events-service", "traceId":"", "spanId":"", "url":"", "clientIp":"", "method":"", "elapse":"", "code":"", "message":"", "class":"c.h.socialhub.eventhub.EventHub", "line":"EventHub.java:150", "msg":"Error occurred while processing events The connection was inactive for more than the allowed 240000 milliseconds and is closed by container 'cd8a74181e68151dde4_G28'., errorContext[NAMESPACE: shprod-member.servicebus.chinacloudapi.cn, PATH: xxxx/ConsumerGroups/$default/Partitions/1, REFERENCE_ID: 2_xxxxxxxx LINK_CREDIT: 253]"}

消费端代码:

eventProcessorClient = new EventProcessorClientBuilder()
                .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
                .connectionString(currentEventHubProperty.getConnectionString(), this.topic)
                .retry(retryOptions)
                .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))
                .processEvent(eventContext -> {
                    String currentData = "";
                    try {
                            EventData event = eventContext.getEventData();
                            PartitionContext partitionContext = eventContext.getPartitionContext();

                            EventMessage eventMessage = new EventMessage();
                            currentData = new String(event.getBody(), Charset.defaultCharset());
                            eventMessage.setContent(currentData);
                            eventMessage.setPartitionId(partitionContext.getPartitionId());
                            eventMessage.setSequenceNumber(event.getSequenceNumber());
                            log.info("Topic: {} - Partition: {} - Sequence: {} - EnqueuedTime: {}", this.topic,  partitionContext.getPartitionId(), event.getSequenceNumber(),event.getEnqueuedTime());

                            eventContext.updateCheckpoint();
                        } catch (Exception e) {
                            String msg = e.getMessage();
                            if (StringUtils.isBlank(msg)) {
                                msg = e.getStackTrace().toString();
                            }
                            log.error("Error occurred while do works with events[{}] : {}, data: {} ", this.topic, msg, currentData);
                        }
                })
                .processError(errorContext -> log.error("Error occurred while processing events " + errorContext.getThrowable().getMessage()))
                .buildEventProcessorClient();

分析原因

第一步,需要根据日志来判断当前分区是否在问题时间点闲置了240秒,在此期间没有数据进入该分区中,如日志中有关于每一天消息进入Queue的时间(enqueued time),则可以通过日志分析,如果没有,这可以在代码日志中添加:(这是为了下一次发生问题时候,可以直接在日志中分析)

log.info("Topic: {} - Partition: {} - Sequence: {} - EnqueuedTime: {}", this.topic,  partitionContext.getPartitionId(), event.getSe

而对于已经发生的问题,根据EventHub数据保留的设置,如果Event等信息还在保留时间期内,则可以通过SDK的receiveFromPartition方法来指定需要获取的数据范围,来查看其进入Queue的时间。(注:需要建一个不同的consumer group,不要用$Default,免得连不上),示例代码:https://azuresdkdocs.blob.core.windows.net/$web/java/azure-messaging-eventhubs/5.2.0/index.html

Consume events from an Event Hub partition

To consume events, create an EventHubConsumerAsyncClient or EventHubConsumerClient for a specific consumer group. In addition, a consumer needs to specify where in the event stream to begin receiving events.

Consume events with EventHubConsumerAsyncClient

In the snippet below, we create an asynchronous consumer that receives events from partitionId and only listens to newest events that get pushed to the partition. Developers can begin receiving events from multiple partitions using the same EventHubConsumerAsyncClient by calling receiveFromPartition(String, EventPosition) with another partition id.

EventHubConsumerAsyncClient consumer = new EventHubClientBuilder()
    .connectionString("<< CONNECTION STRING FOR SPECIFIC EVENT HUB INSTANCE >>")
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .buildAsyncConsumerClient();
// Receive newly added events from partition with id "0". EventPosition specifies the position
// within the Event Hub partition to begin consuming events.
consumer.receiveFromPartition("0", EventPosition.latest()).subscribe(event -> {
    // Process each event as it arrives.
});
// add sleep or System.in.read() to receive events before exiting the process.

Consume events with EventHubConsumerClient

Developers can create a synchronous consumer that returns events in batches using an EventHubConsumerClient. In the snippet below, a consumer is created that starts reading events from the beginning of the partition's event stream.

EventHubConsumerClient consumer = new EventHubClientBuilder()
    .connectionString("<< CONNECTION STRING FOR SPECIFIC EVENT HUB INSTANCE >>")
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .buildConsumerClient();
String partitionId = "<< EVENT HUB PARTITION ID >>";
// Get the first 15 events in the stream, or as many events as can be received within 40 seconds.
IterableStream<PartitionEvent> events = consumer.receiveFromPartition(partitionId, 15,
    EventPosition.earliest(), Duration.ofSeconds(40));
for (PartitionEvent event : events) {
    System.out.println("Event: " + event.getData().getBodyAsString());
}

以上。 并没有发现问题是否是应用端逻辑问题还是是SDK端问题,在借鉴了GitHub上的很多相类似的情况后,大部分倾向于Java SDK问题。需要等待Github中的进一步更新:

AmqpEventHubConsumer.IdleTimerExpired in Java EventHubConsumer SDK:https://github.com/Azure/azure-sdk-for-java/issues/11233

 

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
18天前
|
Java 程序员 容器
Java中的变量和常量:数据的‘小盒子’和‘铁盒子’有啥不一样?
在Java中,变量是一个可以随时改变的数据容器,类似于一个可以反复打开的小盒子。定义变量时需指定数据类型和名称。例如:`int age = 25;` 表示定义一个整数类型的变量 `age`,初始值为25。 常量则是不可改变的数据容器,类似于一个锁死的铁盒子,定义时使用 `final` 关键字。例如:`final int MAX_SPEED = 120;` 表示定义一个名为 `MAX_SPEED` 的常量,值为120,且不能修改。 变量和常量的主要区别在于变量的数据可以随时修改,而常量的数据一旦确定就不能改变。常量主要用于防止意外修改、提高代码可读性和便于维护。
|
2月前
|
算法 Java 数据处理
从HashSet到TreeSet,Java集合框架中的Set接口及其实现类以其“不重复性”要求,彻底改变了处理唯一性数据的方式。
从HashSet到TreeSet,Java集合框架中的Set接口及其实现类以其“不重复性”要求,彻底改变了处理唯一性数据的方式。HashSet基于哈希表实现,提供高效的元素操作;TreeSet则通过红黑树实现元素的自然排序,适合需要有序访问的场景。本文通过示例代码详细介绍了两者的特性和应用场景。
42 6
|
2月前
|
存储 Java API
深入剖析Java Map:不只是存储数据,更是设计艺术的体现!
【10月更文挑战第17天】在Java编程中,Map是一种重要的数据结构,用于存储键值对,并展现了设计艺术的精髓。本文深入剖析了Map的设计原理和使用技巧,包括基本概念、设计艺术(如哈希表与红黑树的空间时间权衡)、以及使用技巧(如选择合适的实现类、避免空指针异常等),帮助读者更好地理解和应用Map。
98 3
|
18天前
|
存储 缓存 安全
在 Java 编程中,创建临时文件用于存储临时数据或进行临时操作非常常见
在 Java 编程中,创建临时文件用于存储临时数据或进行临时操作非常常见。本文介绍了使用 `File.createTempFile` 方法和自定义创建临时文件的两种方式,详细探讨了它们的使用场景和注意事项,包括数据缓存、文件上传下载和日志记录等。强调了清理临时文件、确保文件名唯一性和合理设置文件权限的重要性。
42 2
|
18天前
|
Java
Java 8 引入的 Streams 功能强大,提供了一种简洁高效的处理数据集合的方式
Java 8 引入的 Streams 功能强大,提供了一种简洁高效的处理数据集合的方式。本文介绍了 Streams 的基本概念和使用方法,包括创建 Streams、中间操作和终端操作,并通过多个案例详细解析了过滤、映射、归并、排序、分组和并行处理等操作,帮助读者更好地理解和掌握这一重要特性。
25 2
|
23天前
|
存储 分布式计算 Java
存算分离与计算向数据移动:深度解析与Java实现
【11月更文挑战第10天】随着大数据时代的到来,数据量的激增给传统的数据处理架构带来了巨大的挑战。传统的“存算一体”架构,即计算资源与存储资源紧密耦合,在处理海量数据时逐渐显露出其局限性。为了应对这些挑战,存算分离(Disaggregated Storage and Compute Architecture)和计算向数据移动(Compute Moves to Data)两种架构应运而生,成为大数据处理领域的热门技术。
40 2
|
29天前
|
SQL Java OLAP
java实现“数据平滑升级”
java实现“数据平滑升级”
42 2
|
1月前
|
Java Maven Android开发
【Azure Developer】VS Code打包Java maven Project 遇见 BUILD FAILURE
Unknown lifecycle phase "lean". You must specify a valid lifecycle phase or a goal in the format <plugin-prefix>:<goal> or <plugin-group-id>:<plugin-artifact-id>[:<plugin-version>]:<goal>
|
2月前
|
SQL Java 关系型数据库
java连接mysql查询数据(基础版,无框架)
【10月更文挑战第12天】该示例展示了如何使用Java通过JDBC连接MySQL数据库并查询数据。首先在项目中引入`mysql-connector-java`依赖,然后通过`JdbcUtil`类中的`main`方法实现数据库连接、执行SQL查询及结果处理,最后关闭相关资源。
|
2月前
|
Java
Java Set以其“不重复”的特性,为我们提供了一个高效、简洁的处理唯一性约束数据的方式。
【10月更文挑战第16天】在Java编程中,Set接口确保集合中没有重复元素,每个元素都是独一无二的。HashSet基于哈希表实现,提供高效的添加、删除和查找操作;TreeSet则基于红黑树实现,不仅去重还能自动排序。通过这两个实现类,我们可以轻松处理需要唯一性约束的数据,提升代码质量和效率。
39 2