【Azure 事件中心】适用Mirror Maker生产数据发送到Azure Event Hub出现发送一段时间后Timeout Exception: Expiring 18 record(s) for xxxxxxx: 79823 ms has passed since last append

简介: 【Azure 事件中心】适用Mirror Maker生产数据发送到Azure Event Hub出现发送一段时间后Timeout Exception: Expiring 18 record(s) for xxxxxxx: 79823 ms has passed since last append

问题描述

根据“将 Apache Kafka MirrorMaker 与事件中心配合使用”一文,成功配置了Mirror Maker来发送数据到Event Hub中。为什么只能成功运行一会(10分钟 ~ 2小时左右)就会出现Timeout Exception,然后Kafka MirrorMaker就会中断退出呢?

异常消息为:

[2022-05-25 14:29:21,683] INFO [Producer clientId=mirror_maker_producer] Proceeding to force close the producer since pending requests could not be completed within timeout 0 ms. 
(org.apache.kafka.clients.producer.KafkaProducer)
[2022-05-25 14:29:21,683] DEBUG [Producer clientId=mirror_maker_producer] Kafka producer has been closed (org.apache.kafka.clients.producer.KafkaProducer)
[2022-05-25 14:29:21,683] ERROR Error when sending message to topic xxxxxxxxxxxx with key: 16 bytes, value: 875 bytes with error: 
(org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 18 record(s) for xxxxxxxxxxxx-4: 79823 ms has passed since last append
[2022-05-25 14:29:21,683] INFO Closing producer due to send failure. (kafka.tools.MirrorMaker$)
[2022-05-25 14:29:21,683] INFO [Producer clientId=mirror_maker_producer] Closing the Kafka producer with timeoutMillis = 0 ms. (org.apache.kafka.clients.producer.KafkaProducer)
[2022-05-25 14:29:21,683] INFO [Producer clientId=mirror_maker_producer] Proceeding to force close the producer since pending requests could not be completed within timeout 0 ms. 
(org.apache.kafka.clients.producer.KafkaProducer)
[2022-05-25 14:29:21,683] DEBUG [Producer clientId=mirror_maker_producer] Kafka producer has been closed (org.apache.kafka.clients.producer.KafkaProducer)
[2022-05-25 14:29:21,683] ERROR Error when sending message to topic xxxxxxxxxxxx with key: 16 bytes, value: 875 bytes with error: 
(org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 18 record(s) for xxxxxxxxxxxx-4: 79823 ms has passed since last append
[2022-05-25 14:29:21,683] INFO Closing producer due to send failure. (kafka.tools.MirrorMaker$)
[2022-05-25 14:29:21,683] INFO [Producer clientId=mirror_maker_producer] Closing the Kafka producer with timeoutMillis = 0 ms. (org.apache.kafka.clients.producer.KafkaProducer)

 

问题解析

根据错误消息 " Expiring 18 record(s) for xxxxxxxxxxxx-4: 79823 ms has passed since last append " 在网上进行搜索,对发生问题的解释有:

1) Stack Overflow上的解释:https://stackoverflow.com/questions/56807188/how-to-fix-kafka-common-errors-timeoutexception-expiring-1-records-xxx-ms-has

The error indicates that some records are put into the queue at a faster rate than they can be sent from the client.

错误表示Mirror Maker中消息进入Queue中的速度快于从当前客户端发送到服务端的速度(服务端是 Event Hub)

When your Producer sends messages, they are stored in buffer (before sending them to the target broker) and the records are grouped together into batches in order to increase throughput. When a new record is added to the batch, it must be sent within a -configurable- time window which is controlled by request.timeout.ms (the default is set to 30 seconds). If the batch is in the queue for longer time, a TimeoutException is thrown and the batch records will then be removed from the queue and won't be delivered to the broker.

当Mirror Maker(生产者)发送消息时,它们被存储在缓冲区中(在将它们发送到目标代理之前),并且记录被分组到一起以增加吞吐量。 将新记录添加到批次时,它必须在由 request.timeout.ms 控制的时间窗口内发送(默认设置为 30 秒)。 如果批处理在队列中的时间较长,则会引发 TimeoutException,然后批处理记录将从队列中删除并且不会传递给代理。

Increasing the value of request.timeout.ms should do the trick for you.

增加 request.timeout.ms 的值应该可以解决问题。

 

2) 博客园解释:https://blog.csdn.net/weixin_43432984/article/details/109180842

当每一批消息满了(batch.size)且 requestTimeoutMs < (now - this.lastAppendTime)) 这一批消息就会被标记为过期且不会放到 RecordAccumulator 中(不会再次重试发送)

调大batch.size 参数和request.timeout.ms 参数可解决问题

 

3) 为什么一出现异常就马上停止运行呢?

因为Mirror Maker的配置参数,abort.on.send.failure 默认为true,决定生产者写入失败时的处理机制就是Abort,终止发送。

 

maybeExpire 函数的源码中发现异常消息产生的根源:

 

所以根据以上的信息,只需要修改 batch.size 和 request.timeout.ms 参数即可。

修改前:
    request.timeout.ms = 60000
    batch.size = 16384
修改后:
    request.timeout.ms=180000
    batch.size=50000

使用修改后的参数运行Mirror Maker,发送数据到Azure Event Hub正常,连续运行2天没见Timeout 异常。问题解决!

 

参考资料

记一次Kafka Producer TimeoutException排查:https://blog.csdn.net/weixin_43432984/article/details/109180842

How to fix kafka.common.errors.TimeoutException: Expiring 1 record(s) xxx ms has passed since batch creation plus linger timehttps://stackoverflow.com/questions/56807188/how-to-fix-kafka-common-errors-timeoutexception-expiring-1-records-xxx-ms-has

将 Apache Kafka MirrorMaker 与事件中心配合使用:https://docs.azure.cn/zh-cn/event-hubs/event-hubs-kafka-mirror-maker-tutorial

kafka-mirror-maker.sh脚本: https://blog.csdn.net/qq_41154944/article/details/108282641

 

相关文章
|
25天前
|
Java
【Azure 事件中心】向Event Hub发送数据异常 : partitionId[null]: Sending messages timed out
【Azure 事件中心】向Event Hub发送数据异常 : partitionId[null]: Sending messages timed out
|
25天前
|
Java 网络安全 开发工具
【Azure 事件中心】Event Hub 无法连接,出现 Did not observe any item or terminal signal within 60000ms in 'flatMapMany' 的错误消息
【Azure 事件中心】Event Hub 无法连接,出现 Did not observe any item or terminal signal within 60000ms in 'flatMapMany' 的错误消息
|
25天前
【Azure 事件中心】Azure Event Hub客户端遇见 Expired Heartbeat 错误
【Azure 事件中心】Azure Event Hub客户端遇见 Expired Heartbeat 错误
|
10天前
【Azure Logic App】使用Event Hub 连接器配置 Active Directory OAuth 认证无法成功连接到中国区Event Hub的解决之法
An exception occurred while retrieving properties for Event Hub: logicapp. Error Message: 'ClientSecretCredential authentication failed: AADSTS90002: Tenant 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx' not found. Check to make sure you have the correct tenant ID and are signing into the correct cloud. Che
|
25天前
|
C++
【Azure Logic App】使用Event Hub 连接器配置 Active Directory OAuth 认证无法成功连接到中国区Event Hub
【Azure Logic App】使用Event Hub 连接器配置 Active Directory OAuth 认证无法成功连接到中国区Event Hub
|
28天前
|
Java 开发工具
【事件中心 Azure Event Hub】关于EventHub中出现Error时候的一些问题(偶发错误,EventHub后台升级,用户端错误,Retry机制的重要性)
【事件中心 Azure Event Hub】关于EventHub中出现Error时候的一些问题(偶发错误,EventHub后台升级,用户端错误,Retry机制的重要性)
|
27天前
|
分布式计算 Java Spark
【事件中心 Azure Event Hub】使用Logstash消费EventHub中的event时遇见的几种异常(TimeoutException, ReceiverDisconnectedException)
【事件中心 Azure Event Hub】使用Logstash消费EventHub中的event时遇见的几种异常(TimeoutException, ReceiverDisconnectedException)
|
1月前
|
C++
【Azure Logic App】使用Event Hub 连接器配置 Active Directory OAuth 认证无法成功连接到中国区Event Hub
在尝试使用Azure Logic App创建由Event Hub触发的工作流时,配置了Active Directory OAuth认证但仍遇到认证失败的问题。错误信息提示找不到指定的租户ID。尽管已设置了正确的Azure中国环境Authority,认证请求似乎仍指向全球Azure环境。这可能是Logic App服务本身的局限导致。作为替代方案,可采用Connection String或Managed Identity方式进行认证,两者均可正常工作。此外,通过Azure Function App复现此问题,进一步验证这是服务层面而非配置问题。相关文档和教程可在Azure官方文档中找到。
|
25天前
|
消息中间件 开发工具
【Azure 事件中心】Event Hub 消费端出现 Timeout Exception,errorContext中 LINK_CREDIT为0的解释
【Azure 事件中心】Event Hub 消费端出现 Timeout Exception,errorContext中 LINK_CREDIT为0的解释
|
25天前
|
Java 开发工具
【Azure 事件中心】使用Azure AD认证方式创建Event Hub Consume Client + 自定义Event Position
【Azure 事件中心】使用Azure AD认证方式创建Event Hub Consume Client + 自定义Event Position