【Event Hub】消费消息时遇到Azure.Messaging.EventHubs.EventHubsException(QuotaExceeded) 错误的解决之法

简介: 【Event Hub】消费消息时遇到Azure.Messaging.EventHubs.EventHubsException(QuotaExceeded) 错误的解决之法

问题描述

使用EventHubConsumerClient消费Event Hub的消息时,遇见Azure.Messaging.EventHubs.EventHubsException(QuotaExceeded): Exceeded the maximum number of allowed receivers per partition in a consumer group which is 5.”异常。

代码片段为:

... 
private static String connectionString = "Event Hub Connection ...";
private static String consumerGroup = "xxxxxx";
private static EventHubConsumerClient consumer =  new EventHubClientBuilder()
            .connectionString(connectionString)
            .consumerGroup(consumerGroup)
            .buildConsumerClient();
...
    while (true) {
        IterableStream<PartitionEvent> events = consumer.receiveFromPartition(pId,100, sPosition, Duration.ofMillis(1000));
        int count = 0;
        for (PartitionEvent partitionEvent : events) {
            EventData event = partitionEvent.getData();
...

错误消息为:

com.azure.core.amqp.exception.AmqpException: Exceeded the maximum number of allowed receivers per partition in a consumer group which is 5.

List of connected receivers - ff7e6a04-43a4-4d50-ae86-e0586297f60c, ff7e6a04-43a4-4d50-ae86-e0586297f60c, ff7e6a04-43a4-4d50-ae86-e0586297f60c, ff7e6a04-43a4-4d50-ae86-e0586297f60c, ff7e6a04-43a4-4d50-ae86-e0586297f60c.

问题解答

这是因为Event Hub对于同一个使用者组的限制:建议在使用者组中一个分区上只有一个活动接收器。 但是,在某些情况下,每个分区最多可以使用 5 个使用者或接收器,其中所有接收器都会获取分区的所有事件。

根据文档中提到的非 Epoch Receiver的限制为5,而如果为每一个Receiver设置上Epoch值,当新的Receiver连接到Event Hub服务器后,服务端会保留最大Epoch值的Receiver,其它旧的则会被逐渐删除。

 

例如下面的代码就可以解决Azure.Messaging.EventHubs.EventHubsException(QuotaExceeded): Exceeded the maximum number of allowed receivers per partition in a consumer group which is 5.”异常。

import com.azure.messaging.eventhubs.models.ReceiveOptions;
... 
private static String connectionString = "Event Hub Connection ...";
private static String consumerGroup = "xxxxxx";
private static EventHubConsumerClient consumer =  new EventHubClientBuilder()
            .connectionString(connectionString)
            .consumerGroup(consumerGroup)
            .buildConsumerClient();
...
    ReceiveOptions receiveOptions ;
    receiveOptions = new ReceiveOptions();
    receiveOptions.setOwnerLevel((new Date()).getTime()); // set epoch value
    while (true) {
        IterableStream<PartitionEvent> events = consumer.receiveFromPartition(pId,100, sPosition, Duration.ofMillis(1000),receiveOptions );
        int count = 0;
        for (PartitionEvent partitionEvent : events) {
            EventData event = partitionEvent.getData();
...

 

 

参考资料

  1. https://docs.azure.cn/zh-cn/event-hubs/event-hubs-features#consumer-groups
  2. https://docs.azure.cn/zh-cn/event-hubs/event-hubs-quotas

 

相关文章
|
4月前
|
Java
【Azure 事件中心】向Event Hub发送数据异常 : partitionId[null]: Sending messages timed out
【Azure 事件中心】向Event Hub发送数据异常 : partitionId[null]: Sending messages timed out
|
4月前
|
Java 网络安全 开发工具
【Azure 事件中心】Event Hub 无法连接,出现 Did not observe any item or terminal signal within 60000ms in 'flatMapMany' 的错误消息
【Azure 事件中心】Event Hub 无法连接,出现 Did not observe any item or terminal signal within 60000ms in 'flatMapMany' 的错误消息
102 1
|
4月前
【Azure 事件中心】Azure Event Hub客户端遇见 Expired Heartbeat 错误
【Azure 事件中心】Azure Event Hub客户端遇见 Expired Heartbeat 错误
|
4月前
|
监控 Java 开发工具
【事件中心 Azure Event Hub】Event Hub Java SDK的消费端出现不消费某一个分区中数据的情况,出现IdleTimerExpired错误消息记录
【事件中心 Azure Event Hub】Event Hub Java SDK的消费端出现不消费某一个分区中数据的情况,出现IdleTimerExpired错误消息记录
|
4月前
|
存储 开发工具 C#
【Azure 事件中心】从Azure Event Hub中消费数据,如何查看当前消费客户端消费数据的Offset和SequenceNumber呢(消息偏移量和序列号)?
【Azure 事件中心】从Azure Event Hub中消费数据,如何查看当前消费客户端消费数据的Offset和SequenceNumber呢(消息偏移量和序列号)?
|
4月前
|
分布式计算 Java Spark
【事件中心 Azure Event Hub】使用Logstash消费EventHub中的event时遇见的几种异常(TimeoutException, ReceiverDisconnectedException)
【事件中心 Azure Event Hub】使用Logstash消费EventHub中的event时遇见的几种异常(TimeoutException, ReceiverDisconnectedException)
|
4月前
|
消息中间件 Kafka 测试技术
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能
|
4月前
|
消息中间件 存储 Kafka
【Azure 事件中心】Flink消费Event Hub中事件, 使用Azure默认示例代码,始终获取新产生的事件,如何消费旧事件呢?
【Azure 事件中心】Flink消费Event Hub中事件, 使用Azure默认示例代码,始终获取新产生的事件,如何消费旧事件呢?
|
4月前
|
消息中间件 Java Kafka
【Azure 事件中心】在微软云中国区 (Mooncake) 上实验以Apache Kafka协议方式发送/接受Event Hubs消息 (Java版)
【Azure 事件中心】在微软云中国区 (Mooncake) 上实验以Apache Kafka协议方式发送/接受Event Hubs消息 (Java版)
|
4月前
|
网络安全
【Azure 事件中心】如何查看Event Hub的生产者或者是消费者端的IP地址呢?
【Azure 事件中心】如何查看Event Hub的生产者或者是消费者端的IP地址呢?