面试官:RocketMQ 的推模式和拉模式有什么区别?

简介: 面试官:RocketMQ 的推模式和拉模式有什么区别?

大家好,我是君哥。

RocketMQ 消息消费有两种模式,PULL 和 PUSH,今天我们来看一下这两种模式有什么区别。

PUSH 模式

首先看一段 RocketMQ 推模式的一个官方示例:

public static void main(String[] args) throws InterruptedException, MQClientException {
    Tracer tracer = initTracer();
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
    consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumeMessageOpenTracingHookImpl(tracer));
    consumer.subscribe("TopicTest", "*");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.setConsumeTimestamp("20181109221800");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

消费者会定义一个消息监听器,并且把这个监听器注册到 DefaultMQPushConsumer,同时也会注册到 DefaultMQPushConsumerIm-pl,当拉取到消息时,就会使用这个监听器来处理消息。那这个监听器是什么时候调用呢?看下面的 UML 类图:

微信图片_20221213113453.png

消费者真正拉取请求的类是 DefaultMQPush-ConsumerImpl,这个类的 pullMessage 方法调用了 PullAPIWrapper 的 pullKernelImpl 方法,这个方法有一个参数是回调函数 Pull-Callback,当 PULL 状态是 PullStatus.FOU-ND 时,代表拉取消息成功,处理逻辑如下:

PullCallback pullCallback = new PullCallback() {
    @Override
    public void onSuccess(PullResult pullResult) {
        if (pullResult != null) {
            pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                subscriptionData);
            switch (pullResult.getPullStatus()) {
                case FOUND:
                    //省略部分逻辑
                        DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                            pullResult.getMsgFoundList(),
                            processQueue,
                            pullRequest.getMessageQueue(),
                            dispatchToConsume);
                        //省略部分逻辑
                    break;
                //省略其他case
                default:
                    break;
            }
        }
    }
    @Override
    public void onException(Throwable e) {
        //省略
    }
};

这个处理逻辑调用了 ConsumeMessage-Service 类的 submitConsumeRequest 方法,我们看一下并发消费消息的处理逻辑,代码如下:

public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    if (msgs.size() <= consumeBatchSize) {
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else {
        //分批处理,跟上面逻辑一致
}

ConsumeRequest 类是一个线程类,run 方法里面调用了消费者定义的消息处理方法,代码如下:

public void run() {
    //省略逻辑
    MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
    //省略逻辑
    try {
        //调用消费方法
        status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
    } catch (Throwable e) {
        //省略逻辑
    }
    //省略逻辑
}

下面以并发消费方式下的同步拉取消息为例总结一下消费者消息处理过程:

  1. 在 MessageListenerConcurrently 中定义消费者处理逻辑,消费者启动时注册到 DefaultMQPushConsumer 和 DefaultMQ-PushConsumerImpl;
  2. 消费者启动时,启动消费拉取线程 PullMessageService,里面死循环不停地从 Broker 拉取消息。这里调用了 DefaultMQPushConsumerImpl 类的 pullMessage 方法;
  3. DefaultMQPushConsumerImpl 类的 pullMessage 方法调用 PullAPIWrapper 的 pullKernelImpl 方法真正去发送 PULL 请求,并传入 PullCallback 的 回调函数;
  4. 拉取到消息后,调用 PullCallback 的 onSuccess 方法处理结果,这里调用了 ConsumeMessageConcurrentlyService 的 submitConsumeRequest 方法,里面用 ConsumeRequest 线程来处理拉取到的消息;
  5. ConsumeRequest 处理消息时调用了消费端定义的消费逻辑,也就是 Message-ListenerConcurrently 的 consumeMessage 方法。

PULL 模式

下面是来自官方的一段 PULL 模式拉取消息的代码:

DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
litePullConsumer.subscribe("TopicTest", "*");
litePullConsumer.start();
try {
    while (running) {
        List<MessageExt> messageExts = litePullConsumer.poll();
        System.out.printf("%s%n", messageExts);
    }
} finally {
    litePullConsumer.shutdown();
}

这里我们看到,PULL 模式需要在处理逻辑里不停的去拉取消息,比如上面代码中写了一个死循环。那 PULL 模式中 poll 函数是怎么实现的呢?我们看下面的 UML 类图:

微信图片_20221213113521.png

跟踪源码可以看到,消息拉取最终是从 DefaultLitePullConsumerImpl 类中的一个 LinkedBlockingQueue 上面拉取。那消息是什么时候 put 到 LinkedBlockingQueue 呢?

官方拉取消息的代码中有一个 subscribe 方法订阅了 Topic,这里相关的 UML 类图如下:

微信图片_20221213113546.png

这个 subscribe 方法最终调用了 DefaultLite-PullConsumerImpl 类的 subscribe,代码如下:

public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
    try {
        //省略逻辑
        this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
        assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);
        //省略逻辑
    } catch (Exception e) {
        throw new MQClientException("subscribe exception", e);
    }
}

这里给 DefaultLitePullConsumer 类的 messageQueueListener 这个监听器进行了赋值。当监听器监听到 MessageQueue 发送变化时,就会启动消息拉取消息的线程 Pull-TaskImpl,代码如下:

public void run() {
    //省略部分逻辑
 if (!this.isCancelled()) {
  long pullDelayTimeMills = 0;
  try {
   PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize());
   switch (pullResult.getPullStatus()) {
          case FOUND:
            final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
            synchronized (objLock) {
              if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
                processQueue.putMessage(pullResult.getMsgFoundList());
                submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
              }
            }
            break;
    //省略其他 case
   }
  } 
  //省略 catch
  if (!this.isCancelled()) {
         //启动下一次拉取
      scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS);
  } else {
     log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue);
  }
 }
}

拉取消息成功后,调用 submitConsume-Request 方法把拉取到的消息放到 consumeRequestCache,然后启动下一次拉取。

这样就清除了示例代码中 poll 消息的逻辑,那还有一个问题,监听器是什么时候触发监听事件呢?

在消费者启动时,会启动 RebalanceService 这个线程,这个线程的 run 方法如下:

public void run() {
    while (!this.isStopped()) {
        this.waitForRunning(waitInterval);
        this.mqClientFactory.doRebalance();
    }
}

下面的 UML 类图显示了 doRebalance 方法的调用关系:

微信图片_20221213113624.png

可以看到最终调用了 最终调用了 Rebalance-LitePullImpl 的 messageQueueChanged 方法,代码如下:

public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
    MessageQueueListener messageQueueListener = this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener();
    if (messageQueueListener != null) {
        try {
            messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
        } catch (Throwable e) {
            log.error("messageQueueChanged exception", e);
        }
    }
}

这里最终触发了监听器。

下面以并发消费方式下的同步拉取消息为例总结一下消费者消息处理过程:

  1. 消费者启动,向 DefaultLitePullConsumer 订阅了 Topic,这个订阅过程会向 DefaultLitePullConsumer 注册一个监听器;
  2. 消费者启动过程中,会启动 Message-Queue 重平衡线程 Rebalance-Service,当重平衡过程发现 ProcessQueueTable 发生变化时,启动消息拉取线程;
  3. 消息拉取线程拉取到消息后,把消息放到 consumeRequestCache,然后进行下一次拉取;
  4. 消费者启动后,不停地从 consumeReq-uestCache 拉取消息进行处理。

总结

通过本文的讲解,可以看到 PUSH 模式和 PULL 模式本质上都是客户端主动拉取,RocketMQ并没有真正实现 Broker 推送消息的 PUSH 模式。RocketMQ 中 PULL 模式和 PUSH 模式的区别如下:

  1. PULL 模式是从 Broker 拉取消息后放入缓存,然后消费端不停地从缓存取出消息来执行客户端定义的处理逻辑,而 PUSH 模式是在死循环中不停的从 Broker 拉取消息,拉取到后调用回调函数进行处理,回调函数中调用客户端定义的处理逻辑
  2. PUSH 模式拉取消息依赖死循环来不停唤起业务,而 PULL 模式拉取消息是通过 MessageQueue 监听器来触发消息拉取线程,消息拉取线程会在拉取完一次后接着下一次拉取。
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
打赏
0
0
1
0
114
分享
相关文章
Java 高级面试技巧:yield() 与 sleep() 方法的使用场景和区别
本文详细解析了 Java 中 `Thread` 类的 `yield()` 和 `sleep()` 方法,解释了它们的作用、区别及为什么是静态方法。`yield()` 让当前线程释放 CPU 时间片,给其他同等优先级线程运行机会,但不保证暂停;`sleep()` 则让线程进入休眠状态,指定时间后继续执行。两者都是静态方法,因为它们影响线程调度机制而非单一线程行为。这些知识点在面试中常被提及,掌握它们有助于更好地应对多线程编程问题。
59 9
Android经典面试题之Kotlin中Lambda表达式和匿名函数的区别
Lambda表达式和匿名函数都是Kotlin中强大的特性,帮助开发者编写简洁而高效的代码。理解它们的区别和适用场景,有助于选择最合适的方式来解决问题。希望本文的详细讲解和示例能够帮助你在Kotlin开发中更好地运用这些特性。
31 9
Java面试必问!run() 和 start() 方法到底有啥区别?
在多线程编程中,run和 start方法常常让开发者感到困惑。为什么调用 start 才能启动线程,而直接调用 run只是普通方法调用?这篇文章将通过一个简单的例子,详细解析这两者的区别,帮助你在面试中脱颖而出,理解多线程背后的机制和原理。
63 12
招行面试:10Wqps场景,RocketMQ 顺序消费 的性能 如何提升 ?
45岁资深架构师尼恩在其读者群中分享了关于如何提升RocketMQ顺序消费性能的高并发面试题解析。面对10W QPS的高并发场景,尼恩详细讲解了RocketMQ的调优策略,包括专用方案如增加ConsumeQueue数量、优化Topic设计等,以及通用方案如硬件配置(CPU、内存、磁盘、网络)、操作系统调优、Broker配置调整、客户端配置优化、JVM调优和监控与日志分析等方面。通过系统化的梳理,帮助读者在面试中充分展示技术实力,获得面试官的认可。相关真题及答案将收录于《尼恩Java面试宝典PDF》V175版本中,助力求职者提高架构、设计和开发水平。
招行面试:10Wqps场景,RocketMQ 顺序消费 的性能 如何提升 ?
招行面试:RocketMQ、Kafka、RabbitMQ,如何选型?
45岁资深架构师尼恩针对一线互联网企业面试题,特别是招商银行的高阶Java后端面试题,进行了系统化梳理。本文重点讲解如何根据应用场景选择合适的消息中间件(如RabbitMQ、RocketMQ和Kafka),并对比三者的性能、功能、可靠性和运维复杂度,帮助求职者在面试中充分展示技术实力,实现“offer直提”。此外,尼恩还提供了《尼恩Java面试宝典PDF》等资源,助力求职者提升架构、设计、开发水平,应对高并发、分布式系统的挑战。更多内容及技术圣经系列PDF,请关注【技术自由圈】获取。
RocketMQ Controller 模式 始终更新成本机ip
ontrollerAddr=192.168.24.241:8878 但是日志输出Update controller leader address to 127.0.0.1:8878。导致访问失败
71 3
【JavaEE】——单例模式引起的多线程安全问题:“饿汉/懒汉”模式,及解决思路和方法(面试高频)
单例模式下,“饿汉模式”,“懒汉模式”,单例模式下引起的线程安全问题,解锁思路和解决方法
Java社招面试题:& 和 && 的区别,HR的套路险些让我翻车!
今日分享的主题是如何区分&和&&的区别,提高自身面试的能力。主要分为以下四部分。 1、自我面试经历 2、&amp和&amp&amp的不同之处 3、&对&&的不同用回答逻辑解释 4、彩蛋
京东面试:聊聊Spring事务?Spring事务的10种失效场景?加入型传播和嵌套型传播有什么区别?
45岁老架构师尼恩分享了Spring事务的核心知识点,包括事务的两种管理方式(编程式和声明式)、@Transactional注解的五大属性(transactionManager、propagation、isolation、timeout、readOnly、rollbackFor)、事务的七种传播行为、事务隔离级别及其与数据库隔离级别的关系,以及Spring事务的10种失效场景。尼恩还强调了面试中如何给出高质量答案,推荐阅读《尼恩Java面试宝典PDF》以提升面试表现。更多技术资料可在公众号【技术自由圈】获取。
Java社招面试题:& 和 && 的区别,HR的套路险些让我翻车!
小米,29岁程序员,分享了一次面试经历,详细解析了Java中&和&&的区别及应用场景,展示了扎实的基础知识和良好的应变能力,最终成功获得Offer。
108 14
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等