【Event Hub】消费消息时遇到Azure.Messaging.EventHubs.EventHubsException(QuotaExceeded) 错误的解决之法

简介: 【Event Hub】消费消息时遇到Azure.Messaging.EventHubs.EventHubsException(QuotaExceeded) 错误的解决之法

问题描述

使用EventHubConsumerClient消费Event Hub的消息时,遇见Azure.Messaging.EventHubs.EventHubsException(QuotaExceeded): Exceeded the maximum number of allowed receivers per partition in a consumer group which is 5.”异常。

代码片段为:

... 
private static String connectionString = "Event Hub Connection ...";
private static String consumerGroup = "xxxxxx";
private static EventHubConsumerClient consumer =  new EventHubClientBuilder()
            .connectionString(connectionString)
            .consumerGroup(consumerGroup)
            .buildConsumerClient();
...
    while (true) {
        IterableStream<PartitionEvent> events = consumer.receiveFromPartition(pId,100, sPosition, Duration.ofMillis(1000));
        int count = 0;
        for (PartitionEvent partitionEvent : events) {
            EventData event = partitionEvent.getData();
...

错误消息为:

com.azure.core.amqp.exception.AmqpException: Exceeded the maximum number of allowed receivers per partition in a consumer group which is 5.

List of connected receivers - ff7e6a04-43a4-4d50-ae86-e0586297f60c, ff7e6a04-43a4-4d50-ae86-e0586297f60c, ff7e6a04-43a4-4d50-ae86-e0586297f60c, ff7e6a04-43a4-4d50-ae86-e0586297f60c, ff7e6a04-43a4-4d50-ae86-e0586297f60c.

问题解答

这是因为Event Hub对于同一个使用者组的限制:建议在使用者组中一个分区上只有一个活动接收器。 但是,在某些情况下,每个分区最多可以使用 5 个使用者或接收器,其中所有接收器都会获取分区的所有事件。

根据文档中提到的非 Epoch Receiver的限制为5,而如果为每一个Receiver设置上Epoch值,当新的Receiver连接到Event Hub服务器后,服务端会保留最大Epoch值的Receiver,其它旧的则会被逐渐删除。

 

例如下面的代码就可以解决Azure.Messaging.EventHubs.EventHubsException(QuotaExceeded): Exceeded the maximum number of allowed receivers per partition in a consumer group which is 5.”异常。

import com.azure.messaging.eventhubs.models.ReceiveOptions;
... 
private static String connectionString = "Event Hub Connection ...";
private static String consumerGroup = "xxxxxx";
private static EventHubConsumerClient consumer =  new EventHubClientBuilder()
            .connectionString(connectionString)
            .consumerGroup(consumerGroup)
            .buildConsumerClient();
...
    ReceiveOptions receiveOptions ;
    receiveOptions = new ReceiveOptions();
    receiveOptions.setOwnerLevel((new Date()).getTime()); // set epoch value
    while (true) {
        IterableStream<PartitionEvent> events = consumer.receiveFromPartition(pId,100, sPosition, Duration.ofMillis(1000),receiveOptions );
        int count = 0;
        for (PartitionEvent partitionEvent : events) {
            EventData event = partitionEvent.getData();
...

 

 

参考资料

  1. https://docs.azure.cn/zh-cn/event-hubs/event-hubs-features#consumer-groups
  2. https://docs.azure.cn/zh-cn/event-hubs/event-hubs-quotas

 

相关文章
|
3月前
|
Java
【Azure 事件中心】向Event Hub发送数据异常 : partitionId[null]: Sending messages timed out
【Azure 事件中心】向Event Hub发送数据异常 : partitionId[null]: Sending messages timed out
|
3月前
【Azure 事件中心】Azure Event Hub客户端遇见 Expired Heartbeat 错误
【Azure 事件中心】Azure Event Hub客户端遇见 Expired Heartbeat 错误
|
3月前
|
监控 Java 开发工具
【事件中心 Azure Event Hub】Event Hub Java SDK的消费端出现不消费某一个分区中数据的情况,出现IdleTimerExpired错误消息记录
【事件中心 Azure Event Hub】Event Hub Java SDK的消费端出现不消费某一个分区中数据的情况,出现IdleTimerExpired错误消息记录
|
3月前
|
消息中间件 Kafka 测试技术
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能
|
3月前
|
分布式计算 Java Spark
【事件中心 Azure Event Hub】使用Logstash消费EventHub中的event时遇见的几种异常(TimeoutException, ReceiverDisconnectedException)
【事件中心 Azure Event Hub】使用Logstash消费EventHub中的event时遇见的几种异常(TimeoutException, ReceiverDisconnectedException)
|
3月前
|
存储 开发工具 C#
【Azure 事件中心】从Azure Event Hub中消费数据,如何查看当前消费客户端消费数据的Offset和SequenceNumber呢(消息偏移量和序列号)?
【Azure 事件中心】从Azure Event Hub中消费数据,如何查看当前消费客户端消费数据的Offset和SequenceNumber呢(消息偏移量和序列号)?
|
3月前
|
Java 开发工具
【事件中心 Azure Event Hub】关于EventHub中出现Error时候的一些问题(偶发错误,EventHub后台升级,用户端错误,Retry机制的重要性)
【事件中心 Azure Event Hub】关于EventHub中出现Error时候的一些问题(偶发错误,EventHub后台升级,用户端错误,Retry机制的重要性)
|
3月前
|
开发工具
【Azure 服务总线】Azure.Messaging.ServiceBus 多次发送消息报超时错误,是否可以配置重新发送?是否有内置重试机制?
【Azure 服务总线】Azure.Messaging.ServiceBus 多次发送消息报超时错误,是否可以配置重新发送?是否有内置重试机制?
|
3月前
|
消息中间件 Java 开发工具
【Azure 事件中心】Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially 异常
【Azure 事件中心】Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially 异常
|
3月前
|
运维
【Azure Event Hub】自定义告警(Alert Rule)用来提示Event Hub的消息incoming(生产)与outgoing(消费)的异常情况
【Azure Event Hub】自定义告警(Alert Rule)用来提示Event Hub的消息incoming(生产)与outgoing(消费)的异常情况