一文带你理解 RocketMQ 广播模式实现机制

简介: 一文带你理解 RocketMQ 广播模式实现机制

大家好,我是君哥。今天聊聊 RocketMQ 的广播消息实现机制。

RocketMQ 有两种消费模式,集群模式和广播模式。

集群模式是指 RocketMQ 中的一条消息只能被同一个消费者组中的一个消费者消费。如下图,Producer 向 TopicTest 这个 Topic 并发写入 3 条新消息,分别被分配到了 MessageQueue1~MessageQueue3 这 3 个队列,然后 Group 中的三个 Consumer 分别消费了一条消息:

微信图片_20221213115831.png

广播模式是  RocketMQ 中的消息会被消费组中的每个消费者都消费一次,如下图:

微信图片_20221213115852.png

使用 RocketMQ 的广播模式时,需要在消费端进行定义,下面是一段官方示例:

public static void main(String[] args) throws InterruptedException, MQClientException {
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");
 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
 consumer.setMessageModel(MessageModel.BROADCASTING);
 consumer.subscribe("TopicTest", "TagA || TagC || TagD");
 consumer.registerMessageListener(new MessageListenerConcurrently() {
  @Override
  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
   ConsumeConcurrentlyContext context) {
   System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
   return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  }
 });
 consumer.start();
 System.out.printf("Broadcast Consumer Started.%n");
}

从代码中可以看到,在定义 Consumer 时,通过 messageModel 这个属性指定消费模式,这里指定为 BROADCASTING,也就启动了广播模式的消费者。

1 消费者启动

以 RocketMQ 推模式为例,看一下消费者调用关系类图:

微信图片_20221213115916.png

DefaultMQPushConsumer 作为启动入口类,它的 start 方法调用了 DefaultMQPushConsumerImpl 类的 start 方法,下面重点看一下这个方法。

1.1 拷贝订阅关系

start 方法中调用了 copySubscription 方法,代码如下:

private void copySubscription() throws MQClientException {
 try {
  //拷贝订阅关系
  switch (this.defaultMQPushConsumer.getMessageModel()) {
   case BROADCASTING:
    break;
   case CLUSTERING:
    final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
    this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
    break;
   default:
    break;
  }
 } catch (Exception e) {
  throw new MQClientException("subscription exception", e);
 }
}

这里的代码有一点需要注意:集群模式会创建一个重试 Topic 的订阅关系,而广播模式是不会创建这个订阅关系的。也就是说广播模式不考虑重试。

1.2 初始化偏移量

下面是初始化 offset 的代码:

if (this.defaultMQPushConsumer.getOffsetStore() != null) {
 this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
 switch (this.defaultMQPushConsumer.getMessageModel()) {
  case BROADCASTING:
   this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
   break;
  case CLUSTERING:
   this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
   break;
  default:
   break;
 }
 this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}

从上面的代码可以看到,广播模式使用了 LocalFileOffsetStore,也就是说偏移量保存在客户端本地,除了在内存中会保存,在本地文件中也会保存。

2 消息拉取

ConsumeMessageService 是真正拉取消息的地方,消费者初始化时会初始化 ConsumeMessageService,并且这里会区分并发消息还是顺序消息。

2.1 顺序消息

在集群模式下,需要获取到 processQueue 的锁才会拉取消息,而在广播模式下,不用获取锁,直接就可以拉取消息。判断逻辑如下:

//ConsumeMessageOrderlyService.ConsumeRequest
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
 if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
      || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
  }
}

这里有个疑问,对于顺序消息,获取锁是必须的,这样才能保证一个 processQueue 只能由一个线程进行处理,从而保证消费的顺序性。那对于广播模式,为什么不用获取 processQueue 的锁呢?难道广播模式不支持顺序消息?

2.2 并发消息

对于并发消息,广播模式不同的是,对消费结果的处理。集群模式消费失败后需要把消息发送回 Broker 等待再次被拉取,而广播模式则不需要重试。代码如下:

//ConsumeMessageConcurrentlyService.rocessConsumeResult
switch (this.defaultMQPushConsumer.getMessageModel()) {
 case BROADCASTING:
  for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
   MessageExt msg = consumeRequest.getMsgs().get(i);
   log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
  }
  break;
 case CLUSTERING:
  List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
  for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
   MessageExt msg = consumeRequest.getMsgs().get(i);
   boolean result = this.sendMessageBack(msg, context);
   if (!result) {
    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
    msgBackFailed.add(msg);
   }
  }
  if (!msgBackFailed.isEmpty()) {
   consumeRequest.getMsgs().removeAll(msgBackFailed);
   this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
  }
  break;
 default:
  break;
}

这再次说明,广播模式是不支持消息重试的。

3 重平衡

在消费者启动过程中,会调用 RebalanceService 的 start 方法,进行重平衡。从重平衡的代码中可以看到,广播模式消费者会消费所有 MessageQueue,而集群模式下会根据负载均衡策略选择其中几个 MessageQueue。代码如下:

private void rebalanceByTopic(final String topic, final boolean isOrder) {
 switch (messageModel) {
  case BROADCASTING: {
   Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
   if (mqSet != null) {
    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
    //省略部分逻辑
   } else {
   }
   break;
  }
  case CLUSTERING: {
   Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
   List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
   //省略部分逻辑
   if (mqSet != null && cidAll != null) {
    //省略部分逻辑
    try {
     allocateResult = strategy.allocate(
      this.consumerGroup,
      this.mQClientFactory.getClientId(),
      mqAll,
      cidAll);
    } catch (Throwable e) {
     return;
    }
    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
    if (allocateResult != null) {
     allocateResultSet.addAll(allocateResult);
    }
    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
    //省略部分逻辑
   }
   break;
  }
  default:
   break;
 }
}

上面 updateProcessQueueTableInRebalance 这个方法调用前,要获取到需要消费的 MessageQueue 集合。广播模式下,直接取了订阅的 Topic 下的所有集合元素,而集群模式下,则需要通过负责均衡获取当前消费者自己要消费的 MessageQueue 集合。

4 总结

本文主要讲解了 RocketMQ 广播消息的实现机制,理解广播消息,要把握下面几点:

1.偏移量保存在消费者本地内存和文件中;

2.广播消息不支持重试;

3.从源码上看,广播模式并不能支持顺序消息;

4.广播模式消费者订阅了 Topic 下的所有 MessageQueue,不会重平衡。

·············· END ··············
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 负载均衡 Java
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
69 0
|
3天前
|
消息中间件
RabbitMQ广播模式
RabbitMQ广播模式
13 1
|
21天前
|
消息中间件 传感器 负载均衡
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
|
21天前
|
消息中间件 JavaScript RocketMQ
消息队列 MQ使用问题之过期删除机制的触发条件是什么
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之过期删除机制的触发条件是什么
|
26天前
|
消息中间件 存储 Kafka
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决
|
4天前
|
消息中间件 RocketMQ
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
12 0
|
4天前
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
10 0
|
4天前
|
消息中间件 存储 缓存
RocketMQ - 消费者启动机制
RocketMQ - 消费者启动机制
9 0
|
3月前
|
消息中间件 存储 安全
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
66 0
|
21天前
|
消息中间件 Java Apache
消息队列 MQ使用问题之如何在内外网环境下使用单组节点单副本模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。