【深度挖掘RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)

简介: 【深度挖掘RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)

承接上文

承接上一章节的内容,下面我们看继续看拉取的调度模式,PULL与PUSH模式相比,PULL模式需要应用层不间断地进行拉取消息然后再执行消费处理,提高了应用层的编码复杂度,为了Pull方式的编程复杂度,RocketMQ提供了调度消费服务(MQPullConsumerScheduleService),在topic的订阅发送变化(初次订阅或距上次拉取消息超时)就触发PULL方式拉取消息。

MQPullConsumerScheduleService

MQPullConsumerScheduleService是PULL模式下面的调度服务,当RebalanceImpl.processQueueTable队列有变化时才进行消息的拉取,从而降低Pull方式的编程复杂度。在应用层按照如下方式使用:

使用MQPullConsumerScheduleService开发消费消息

实例化对象MQPullConsumerScheduleService

java

复制代码

final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1");

设置NameServer

java

复制代码

scheduleService.getDefaultMQPullConsumer().setNamesrvAddr("127.0.0.1:9876");

设置消费组为集群模式

java

复制代码

scheduleService.setMessageModel(MessageModel.CLUSTERING);

注册拉取回调函数

java

复制代码

scheduleService.registerPullTaskCallback("TopicTest", new PullTaskCallback() {
            @Override
            public void doPullTask(MessageQueue mq, PullTaskContext context) {
                MQPullConsumer consumer = context.getPullConsumer();
                try {
                    long offset = consumer.fetchConsumeOffset(mq, false);
                    if (offset < 0)
                        offset = 0;
                    PullResult pullResult = consumer.pull(mq, "*", offset, 32);
                    System.out.printf("%s%n", offset + "\t" + mq + "\t" + pullResult);
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                    consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
                    context.setPullNextDelayTimeMillis(100);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

从上下文中获取MQPullConsumer对象,此处其实就是DefaultMQPullConsumer。

java

复制代码

MQPullConsumer consumer = context.getPullConsumer();

获取该消费组的该队列的消费进度

java

复制代码

long offset = consumer.fetchConsumeOffset(mq, false);

拉取消息,pull()方法在DefaultMQPullConsumer有具体介绍

java

复制代码

PullResult pullResult = consumer.pull(mq, "*", offset, 32);

更新消费组该队列消费进度

java

复制代码

consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());

设置下次拉取消息时间间隔,单位毫秒

java

复制代码

context.setPullNextDelayTimeMillis(100);

启动调度组件,调用MQPullConsumerScheduleService.start()方法启动该调度服务。

ini

复制代码

scheduleService.start();
  1. 首先初始化队列监听器MessageQueueListenerImpl类,该类是MQPullConsumerScheduleService的内部类,实现了MessageQueueListener接口的messageQueueChanged方法;
  2. 将该监听器类赋值给DefaultMQPullConsumer.messageQueueListener变量值;
  3. 调用DefaultMQPullConsumer的start方法启动Consumer;

分析核心执行方法及流程

  1. 使用registerPullTaskCallback对Topic进行注册
  2. MQPullConsumerScheduleService 会将Topic的每个队列以及相应的 doPullTask() 实现放入名为 taskTable 的Hash表中。
  3. 线程池 scheduledThreadPoolExecutor 会不断的调用每个队列的 doPullTask() 函数。
  4. 在 doPullTask() 完成自己的拉取消息逻辑,和DefaultMQPullConsumer是一样的。
  5. 用户设置下次调用间隔时间
  6. scheduledThreadPoolExecutor 等待该间隔时间后,再次调用 doPullTask() 方法。

注册拉取任务回调函数

java

复制代码

/**
 * @param topic topic名称
 * @param callback 回调函数
 */
public void registerPullTaskCallback(final String topic, final PullTaskCallback callback) {
    this.callbackTable.put(topic, callback);
    this.defaultMQPullConsumer.registerMessageQueueListener(topic, null);
}

PullTaskCallback回调函数接口

调用MQPullConsumerScheduleService.registerPullTaskCallback (String topic, PullTaskCallback callback)方法,在该方法中以topic为key值将自定义的PullTaskCallback 对象存入MQPullConsumerScheduleService. callbackTable:ConcurrentHashMap<String ,PullTaskCallback>变量中;

java

复制代码

public interface PullTaskCallback {
    /**
     * 
     * @param mq 消息队列
     * @param context 任务上下文
     */
    void doPullTask(final MessageQueue mq, final PullTaskContext context);
}

建立PullTaskCallback接口的实现类,实现该接口的doPullTask(final MessageQueue mq, final PullTaskContext context)方法。

在该方法中可以先调用DefaultMQPullConsumer.fetchConsumeOffset (MessageQueue mq, boolean fromStore)方法获取MessageQueue队列的消费进度

PullTaskContext拉取任务上下文

调用DefaultMQPullConsumer.pull(MessageQueue mq, String subExpression, long offset, int maxNums)方法,

  1. 指定的队列和指定的开始位置读取消息内容;
  2. 获取到的消息进行相关的业务逻辑处理;

java

复制代码

public class PullTaskContext {
    private int pullNextDelayTimeMillis = 200;
    // 使用该接口进行消息拉取,默认实现是DefaultMQPullConsumer
    private MQPullConsumer pullConsumer;
    public int getPullNextDelayTimeMillis() {
        return pullNextDelayTimeMillis;
    }
    /**
     * 设置下次调用doPullTask()的间隔时间,默认毫秒
     */
    public void setPullNextDelayTimeMillis(int pullNextDelayTimeMillis) {
        this.pullNextDelayTimeMillis = pullNextDelayTimeMillis;
    }
    public MQPullConsumer getPullConsumer() {
        return pullConsumer;
    }
    public void setPullConsumer(MQPullConsumer pullConsumer) {
        this.pullConsumer = pullConsumer;
    }
}
  1. 调用DefaultMQPullConsumer.updateConsumeOffset(MessageQueue mq, long offset)方法进行消费进度的更新,其中offset值是在获取消息内容时返回的下一个消费进度值;

MQPullConsumerScheduleService的实现原理

触发拉取消息

RebalanceImpl.rebalanceByTopic()方法执行的过程中,若RebalanceImpl.processQueueTable有变化,则回调DefaultMQPullConsumer. messageQueueListener变量值的MessageQueueListenerImpl. MessageQueueChanged方法,在该方法中调用MQPullConsumerScheduleService. putTask(String topic, Set mqNewSet)方法。

  • 若为广播模式(BROADCASTING),则mqNewSet为该topic下面的所有MessageQueue队列;
  • 若为集群模式,则mqNewSet为给该topic分配的MessageQueue队列,putTask方法的大致逻辑如下:
  1. 遍历MQPullConsumerScheduleService.taskTable: ConcurrentHashMap<MessageQueue, PullTaskImpl> 列表(表示正在拉取消息的任务列表),检查该topic下面的所有MessageQueue对象,若该对象不在入参mqNewSet集合中的,将对应的PullTaskImpl对象的cancelled变量标记为true。
  2. mqNewSet集合中的MessageQueue对象,若不在MQPullConsumerScheduleService.taskTable列表中,则以MessageQueue对象为参数初始化PullTaskImpl对象,然后放入taskTable列表中,将该PullTaskImpl对象放入MQPullConsumerScheduleService.scheduledThreadPoolExecutor线程池中,然后立即执行该线程。

拉取消息的线程(PullTaskImpl)

该PullTaskImpl线程的run方法如下:

  1. 检查cancelled变量是为true,若为false则直接退出该线程;否则继续下面的处理;
  2. 以MessageQueue对象的topic值从MQPullConsumerScheduleService.callbackTable变量中获取PullTaskCallback的实现类(该类是由应用层实现);

3, 调用该PullTaskCallback实现类的doPullTask方法,即实现业务层定义的业务逻辑(通用逻辑是先获取消息内容,然后进行相应的业务处理,最后更新消费进度);

4, 再次检查cancelled变量是为true,若不为true,则将该PullTaskImpl对象再次放入MQPullConsumerScheduleService. scheduledThreadPoolExecutor线程池中,设定在200毫秒之后重新调度执行PullTaskImpl线程类;

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件
RabbitMQ的 RPC 消息模式你会了吗?
【9月更文挑战第11天】RabbitMQ 的 RPC(远程过程调用)消息模式允许客户端向服务器发送请求并接收响应。其基本原理包括:1) 客户端发送请求,创建回调队列并设置关联标识符;2) 服务器接收请求并发送响应至回调队列;3) 客户端根据关联标识符接收并匹配响应。实现步骤涵盖客户端和服务器的连接、信道创建及请求处理。注意事项包括关联标识符唯一性、回调队列管理、错误处理及性能考虑。RPC 模式适用于构建可靠的分布式应用程序,但需根据需求调整优化。
|
25天前
|
消息中间件 存储 监控
深度写作:深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(Message Queue, MQ)扮演着至关重要的角色。MQ不仅实现了应用间的解耦,还提供了异步消息处理、流量削峰等功能。而在MQ的众多特性中,长轮询(Long Polling)机制因其能有效提升消息处理的实时性和效率,备受关注。
59 12
|
25天前
|
消息中间件 存储 Java
深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(MQ)作为一种重要的中间件,广泛应用于解耦、异步处理、流量削峰等场景。其中,延时消息和定时消息作为MQ的高级功能,能够进一步满足复杂的业务需求。为了实现这些功能,MQ系统需要进行一系列优化,长轮询机制便是其中的关键一环。本文将深入探讨MQ如何设计延时消息和定时消息的优化机制,特别是长轮询机制的实现原理及其在Java中的模拟实现。
32 2
|
2月前
|
传感器 数据可视化 网络协议
DIY可视化整合MQTT生成UniApp源码
DIY可视化整合MQTT生成UniApp源码
47 0
|
4月前
|
消息中间件 存储 负载均衡
我服了,RocketMQ消费者负载均衡内核是这样设计的
文章为理解RocketMQ的负载均衡机制提供了深入的技术洞察,并对如何在实际应用中扩展和定制负载均衡策略提供了有价值的见解。
我服了,RocketMQ消费者负载均衡内核是这样设计的
|
4月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
80 2
|
4月前
|
消息中间件
RabbitMQ广播模式
RabbitMQ广播模式
76 1
|
4月前
|
消息中间件 存储 负载均衡
RocketMQ消费者消费消息核心原理(含长轮询机制)
这篇文章深入探讨了Apache RocketMQ消息队列中消费者消费消息的核心原理,特别是长轮询机制。文章从消费者和Broker的交互流程出发,详细分析了Push和Pull两种消费模式的内部实现,以及它们是如何通过长轮询机制来优化消息消费的效率。文章还对RocketMQ的消费者启动流程、消息拉取请求的发起、Broker端处理消息拉取请求的流程进行了深入的源码分析,并总结了RocketMQ在设计上的优点,如单一职责化和线程池的使用等。
RocketMQ消费者消费消息核心原理(含长轮询机制)
|
4月前
|
消息中间件 应用服务中间件 网络安全
rabbitMQ镜像模式搭建
rabbitMQ镜像模式搭建
|
4月前
|
消息中间件 数据安全/隐私保护 RocketMQ
就软件研发问题之RocketMQ ACL 2.0的认证流程的问题如何解决
就软件研发问题之RocketMQ ACL 2.0的认证流程的问题如何解决