面试官:RocketMQ 的推模式和拉模式有什么区别?

简介: 面试官:RocketMQ 的推模式和拉模式有什么区别?

大家好,我是君哥。

RocketMQ 消息消费有两种模式,PULL 和 PUSH,今天我们来看一下这两种模式有什么区别。

PUSH 模式

首先看一段 RocketMQ 推模式的一个官方示例:

public static void main(String[] args) throws InterruptedException, MQClientException {
    Tracer tracer = initTracer();
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
    consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumeMessageOpenTracingHookImpl(tracer));
    consumer.subscribe("TopicTest", "*");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.setConsumeTimestamp("20181109221800");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

消费者会定义一个消息监听器,并且把这个监听器注册到 DefaultMQPushConsumer,同时也会注册到 DefaultMQPushConsumerIm-pl,当拉取到消息时,就会使用这个监听器来处理消息。那这个监听器是什么时候调用呢?看下面的 UML 类图:

微信图片_20221213113453.png

消费者真正拉取请求的类是 DefaultMQPush-ConsumerImpl,这个类的 pullMessage 方法调用了 PullAPIWrapper 的 pullKernelImpl 方法,这个方法有一个参数是回调函数 Pull-Callback,当 PULL 状态是 PullStatus.FOU-ND 时,代表拉取消息成功,处理逻辑如下:

PullCallback pullCallback = new PullCallback() {
    @Override
    public void onSuccess(PullResult pullResult) {
        if (pullResult != null) {
            pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                subscriptionData);
            switch (pullResult.getPullStatus()) {
                case FOUND:
                    //省略部分逻辑
                        DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                            pullResult.getMsgFoundList(),
                            processQueue,
                            pullRequest.getMessageQueue(),
                            dispatchToConsume);
                        //省略部分逻辑
                    break;
                //省略其他case
                default:
                    break;
            }
        }
    }
    @Override
    public void onException(Throwable e) {
        //省略
    }
};

这个处理逻辑调用了 ConsumeMessage-Service 类的 submitConsumeRequest 方法,我们看一下并发消费消息的处理逻辑,代码如下:

public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    if (msgs.size() <= consumeBatchSize) {
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else {
        //分批处理,跟上面逻辑一致
}

ConsumeRequest 类是一个线程类,run 方法里面调用了消费者定义的消息处理方法,代码如下:

public void run() {
    //省略逻辑
    MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
    //省略逻辑
    try {
        //调用消费方法
        status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
    } catch (Throwable e) {
        //省略逻辑
    }
    //省略逻辑
}

下面以并发消费方式下的同步拉取消息为例总结一下消费者消息处理过程:

  1. 在 MessageListenerConcurrently 中定义消费者处理逻辑,消费者启动时注册到 DefaultMQPushConsumer 和 DefaultMQ-PushConsumerImpl;
  2. 消费者启动时,启动消费拉取线程 PullMessageService,里面死循环不停地从 Broker 拉取消息。这里调用了 DefaultMQPushConsumerImpl 类的 pullMessage 方法;
  3. DefaultMQPushConsumerImpl 类的 pullMessage 方法调用 PullAPIWrapper 的 pullKernelImpl 方法真正去发送 PULL 请求,并传入 PullCallback 的 回调函数;
  4. 拉取到消息后,调用 PullCallback 的 onSuccess 方法处理结果,这里调用了 ConsumeMessageConcurrentlyService 的 submitConsumeRequest 方法,里面用 ConsumeRequest 线程来处理拉取到的消息;
  5. ConsumeRequest 处理消息时调用了消费端定义的消费逻辑,也就是 Message-ListenerConcurrently 的 consumeMessage 方法。

PULL 模式

下面是来自官方的一段 PULL 模式拉取消息的代码:

DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
litePullConsumer.subscribe("TopicTest", "*");
litePullConsumer.start();
try {
    while (running) {
        List<MessageExt> messageExts = litePullConsumer.poll();
        System.out.printf("%s%n", messageExts);
    }
} finally {
    litePullConsumer.shutdown();
}

这里我们看到,PULL 模式需要在处理逻辑里不停的去拉取消息,比如上面代码中写了一个死循环。那 PULL 模式中 poll 函数是怎么实现的呢?我们看下面的 UML 类图:

微信图片_20221213113521.png

跟踪源码可以看到,消息拉取最终是从 DefaultLitePullConsumerImpl 类中的一个 LinkedBlockingQueue 上面拉取。那消息是什么时候 put 到 LinkedBlockingQueue 呢?

官方拉取消息的代码中有一个 subscribe 方法订阅了 Topic,这里相关的 UML 类图如下:

微信图片_20221213113546.png

这个 subscribe 方法最终调用了 DefaultLite-PullConsumerImpl 类的 subscribe,代码如下:

public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
    try {
        //省略逻辑
        this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
        assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);
        //省略逻辑
    } catch (Exception e) {
        throw new MQClientException("subscribe exception", e);
    }
}

这里给 DefaultLitePullConsumer 类的 messageQueueListener 这个监听器进行了赋值。当监听器监听到 MessageQueue 发送变化时,就会启动消息拉取消息的线程 Pull-TaskImpl,代码如下:

public void run() {
    //省略部分逻辑
 if (!this.isCancelled()) {
  long pullDelayTimeMills = 0;
  try {
   PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize());
   switch (pullResult.getPullStatus()) {
          case FOUND:
            final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
            synchronized (objLock) {
              if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
                processQueue.putMessage(pullResult.getMsgFoundList());
                submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
              }
            }
            break;
    //省略其他 case
   }
  } 
  //省略 catch
  if (!this.isCancelled()) {
         //启动下一次拉取
      scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS);
  } else {
     log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue);
  }
 }
}

拉取消息成功后,调用 submitConsume-Request 方法把拉取到的消息放到 consumeRequestCache,然后启动下一次拉取。

这样就清除了示例代码中 poll 消息的逻辑,那还有一个问题,监听器是什么时候触发监听事件呢?

在消费者启动时,会启动 RebalanceService 这个线程,这个线程的 run 方法如下:

public void run() {
    while (!this.isStopped()) {
        this.waitForRunning(waitInterval);
        this.mqClientFactory.doRebalance();
    }
}

下面的 UML 类图显示了 doRebalance 方法的调用关系:

微信图片_20221213113624.png

可以看到最终调用了 最终调用了 Rebalance-LitePullImpl 的 messageQueueChanged 方法,代码如下:

public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
    MessageQueueListener messageQueueListener = this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener();
    if (messageQueueListener != null) {
        try {
            messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
        } catch (Throwable e) {
            log.error("messageQueueChanged exception", e);
        }
    }
}

这里最终触发了监听器。

下面以并发消费方式下的同步拉取消息为例总结一下消费者消息处理过程:

  1. 消费者启动,向 DefaultLitePullConsumer 订阅了 Topic,这个订阅过程会向 DefaultLitePullConsumer 注册一个监听器;
  2. 消费者启动过程中,会启动 Message-Queue 重平衡线程 Rebalance-Service,当重平衡过程发现 ProcessQueueTable 发生变化时,启动消息拉取线程;
  3. 消息拉取线程拉取到消息后,把消息放到 consumeRequestCache,然后进行下一次拉取;
  4. 消费者启动后,不停地从 consumeReq-uestCache 拉取消息进行处理。

总结

通过本文的讲解,可以看到 PUSH 模式和 PULL 模式本质上都是客户端主动拉取,RocketMQ并没有真正实现 Broker 推送消息的 PUSH 模式。RocketMQ 中 PULL 模式和 PUSH 模式的区别如下:

  1. PULL 模式是从 Broker 拉取消息后放入缓存,然后消费端不停地从缓存取出消息来执行客户端定义的处理逻辑,而 PUSH 模式是在死循环中不停的从 Broker 拉取消息,拉取到后调用回调函数进行处理,回调函数中调用客户端定义的处理逻辑
  2. PUSH 模式拉取消息依赖死循环来不停唤起业务,而 PULL 模式拉取消息是通过 MessageQueue 监听器来触发消息拉取线程,消息拉取线程会在拉取完一次后接着下一次拉取。
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1天前
|
消息中间件 网络协议 RocketMQ
RocketMQ Controller 模式 始终更新成本机ip
ontrollerAddr=192.168.24.241:8878 但是日志输出Update controller leader address to 127.0.0.1:8878。导致访问失败
24 3
|
27天前
|
Java 程序员
Java社招面试题:& 和 && 的区别,HR的套路险些让我翻车!
小米,29岁程序员,分享了一次面试经历,详细解析了Java中&和&&的区别及应用场景,展示了扎实的基础知识和良好的应变能力,最终成功获得Offer。
67 14
|
1月前
|
消息中间件 大数据 Kafka
大厂面试高频:Kafka、RocketMQ、RabbitMQ 的优劣势比较
本文深入探讨了消息队列的核心概念、应用场景及Kafka、RocketMQ、RabbitMQ的优劣势比较,大厂面试高频,必知必会,建议收藏。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:Kafka、RocketMQ、RabbitMQ 的优劣势比较
|
25天前
|
Java 关系型数据库 数据库
京东面试:聊聊Spring事务?Spring事务的10种失效场景?加入型传播和嵌套型传播有什么区别?
45岁老架构师尼恩分享了Spring事务的核心知识点,包括事务的两种管理方式(编程式和声明式)、@Transactional注解的五大属性(transactionManager、propagation、isolation、timeout、readOnly、rollbackFor)、事务的七种传播行为、事务隔离级别及其与数据库隔离级别的关系,以及Spring事务的10种失效场景。尼恩还强调了面试中如何给出高质量答案,推荐阅读《尼恩Java面试宝典PDF》以提升面试表现。更多技术资料可在公众号【技术自由圈】获取。
|
1天前
|
缓存 安全 Java
【JavaEE】——单例模式引起的多线程安全问题:“饿汉/懒汉”模式,及解决思路和方法(面试高频)
单例模式下,“饿汉模式”,“懒汉模式”,单例模式下引起的线程安全问题,解锁思路和解决方法
|
2月前
|
消息中间件 存储 canal
阿里面试:canal+MQ,会有乱序的问题吗?
本文详细探讨了在阿里面试中常见的问题——“canal+MQ,会有乱序的问题吗?”以及如何保证RocketMQ消息有序。文章首先介绍了消息有序的基本概念,包括全局有序和局部有序,并分析了RocketMQ中实现消息有序的方法。接着,针对canal+MQ的场景,讨论了如何通过配置`canal.mq.partitionsNum`和`canal.mq.partitionHash`来保证数据同步的有序性。最后,提供了多个与MQ相关的面试题及解决方案,帮助读者更好地准备面试,提升技术水平。
阿里面试:canal+MQ,会有乱序的问题吗?
|
1月前
|
存储 缓存 网络协议
计算机网络常见面试题(二):浏览器中输入URL返回页面过程、HTTP协议特点,GET、POST的区别,Cookie与Session
计算机网络常见面试题(二):浏览器中输入URL返回页面过程、HTTP协议特点、状态码、报文格式,GET、POST的区别,DNS的解析过程、数字证书、Cookie与Session,对称加密和非对称加密
|
1月前
|
消息中间件 存储 监控
ActiveMQ、RocketMQ、RabbitMQ、Kafka 的区别
【10月更文挑战第24天】ActiveMQ、RocketMQ、RabbitMQ 和 Kafka 都有各自的特点和优势,在不同的应用场景中发挥着重要作用。在选择消息队列时,需要根据具体的需求、性能要求、扩展性要求等因素进行综合考虑,选择最适合的消息队列技术。同时,随着技术的不断发展和演进,这些消息队列也在不断地更新和完善,以适应不断变化的应用需求。
111 1
|
2月前
|
消息中间件 存储 监控
说说如何解决RocketMq消息积压?为什么Kafka性能比RocketMq高?它们区别是什么?
【10月更文挑战第8天】在分布式系统中,消息队列扮演着至关重要的角色,它不仅能够解耦系统组件,还能提供异步处理、流量削峰和消息持久化等功能。在众多的消息队列产品中,RocketMQ和Kafka无疑是其中的佼佼者。本文将围绕如何解决RocketMQ消息积压、为什么Kafka性能比RocketMQ高以及它们之间的区别进行深入探讨。
113 1
|
2月前
|
编译器
经典面试题:变量的声明和定义有什么区别
在编程领域,变量的“声明”与“定义”是经典面试题之一。声明告诉编译器一个变量的存在,但不分配内存,通常包含变量类型和名称;而定义则为变量分配内存空间,一个变量必须至少被定义一次。简而言之,声明是告知变量形式,定义则是实际创建变量并准备使用。