【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)

简介: 【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)

消息消费者(Consumer)

负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。我们接下来主要介绍Pull模式

Pull模式的处理机制

Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程

在RocketMQ中有两种Pull方式,一种是比较原始Pull Consumer,它不提供相关的订阅方法,需要调用pull方法时指定队列进行拉取,并需要自己更新位点。另一种是Lite Pull Consumer,它提供了Subscribe和Assign两种方式,使用起来更加方便。

Pull模式的使用特点

  • 开发者自己维护OffsetStore。
  • 自己保存消费组的offset,比如存入Redis,或调用MQ接口将其保存在Broker端。自主选择Message Queue和offset进行消息拉取。
  • 用户拉去消息时,需要用户自己来决定拉去哪个队列从哪个offset开始,拉去多少消息。

相比Push的运行特点

与PUSH模式相比,PULL模式需要应用层不间断地进行拉取消息然后再执行消费处理,提高了应用层的编码复杂度,为了Pull方式的编程复杂度,RocketMQ提供了调度消费服务(MQPullConsumerScheduleService),在topic的订阅发送变化(初次订阅或距上次拉取消息超时)就触发PULL方式拉取消息。

DefaultMQPullConsumer

针对于DefaultMQPullConsumer源码流程进行相关的分析,对于Push模式而言,Pull 模式比较适应于客户端拉去的速度由自己进行控制处理。而且实现的原理和复杂程度也简单了很多,我们从实现出发,进行分析对应的实现流程。

DefaultMQPullConsumer的Pull拉取模式的开发案例

指定队列模式消费对应队列的消息

java

复制代码

DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.start();
    try {
      MessageQueue mq = new MessageQueue();
      mq.setQueueId(0);
      mq.setTopic("lob");
      mq.setBrokerName("brokerName");
      long offset = 26;
      PullResult pullResult = consumer.pull(mq, "*", offset, 32);
      if (pullResult.getPullStatus().equals(PullStatus.FOUND)) {
        System.out.printf("%s%n", pullResult.getMsgFoundList());
        consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
    consumer.shutdown();
  }

消费所有队列数据

从所有队列进行选择队列模式,并且存储offset在被本地。

java

复制代码

DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("groupName");
consumer.setNamesrvAddr("name-serverl-ip:9876;name-server2-ip:9876");
consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order-topic");
for(MessageQueue mq:mqs){
    try {
   // 获取消息的offset,指定从store中获取
   long offset = consumer.fetchConsumeOffset(mq,true);
    while(true){
       PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
       putMessageQueueOffset(mq,pullResult.getNextBeginOffset());
      switch(pullResult.getPullStatus()){
         case FOUND:
                  List<MessageExt> messageExtList = pullResult.getMsgFoundList();
                  for (MessageExt m : messageExtList) {
                      System.out.println(new String(m.getBody()));
                   }
                break;
        case NO_MATCHED_MSG:
                break;
        case NO_NEW_MSG:
                break;
        case OFFSET_ILLEGAL:
                break;
      }
  }
} catch (Exception e) {
     e.printStackTrace();
 }
}
consumer.shutdown();
// 保存上次消费的消息下标
private static void putMessageQueueOffset(MessageQueue mq,
      long nextBeginOffset) {
      OFFSE_TABLE.put(mq, nextBeginOffset);
}
// 获取上次消费的消息的下标
private static Long getMessageQueueOffset(MessageQueue mq) {
      Long offset = OFFSE_TABLE.get(mq);
      if(offset != null){
         return offset;
       }
   return 0l;
}
fetchSubscribeMessageQueues(从指定topic中拉取所有消息队列)

根据Topic获取该Topic的所有消息队列,用于遍历消息队列,从每个消息队列中获取消息,

调用DefaultMQPullConsumer.fetchSubscribeMessageQueues(String topic)方法,根据topic获取对应的MessageQueue(即可被订阅的队列),在该方法中最终通过调用MQAdminImpl.fetchSubscribeMessageQueues(String topic)方法从NameServer获取该topic的MessageQueue。

java

复制代码

/**
 * @param topic Topic名称
 * @return 该Topic所有的消息队列
 */
@Override
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
    return this.defaultMQPullConsumerImpl.fetchSubscribeMessageQueues(withNamespace(topic));
}
fetchSubscribeMessageQueues底层调用

调用MQClientAPIImpl.getTopicRouteInfoFromNameServer(String topic, long timeoutMillis)方法,其中timeoutMillis=3000,该方法向NameServer发送GET_ROUTEINTO_BY_TOPIC请求码获取topic参数对应的Broker信息和topic配置信息,即TopicRouteData对象;.

JAVA

复制代码

public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
        try {
            TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
            if (topicRouteData != null) {
                // 2、遍历topicRouteData
                Set<MessageQueue> mqList = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                if (!mqList.isEmpty()) {
                    return mqList;
                } else {
                    throw new MQClientException("Can not find Message Queue for this topic, " + topic + " Namesrv return empty", null);
                }
            }
        } catch (Exception e) {
            throw new MQClientException(
                "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST),
                e);
        }
 
        throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);
    }
fetchSubscribeMessageQueues底层调用

遍历TopicRouteData对象的QueueData列表中每个QueueData对象,首先判断该QueueData对象是否具有读权限,若有则根据该QueueData对象的readQueueNums值,创建readQueueNums个MessageQueue对象,并构成MessageQueue集合;最后返回给MessageQueue集合

JAVA

复制代码

public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {
        Set<MessageQueue> mqList = new HashSet<MessageQueue>();
        List<QueueData> qds = route.getQueueDatas();
        for (QueueData qd : qds) {
            if (PermName.isReadable(qd.getPerm())) {
                for (int i = 0; i < qd.getReadQueueNums(); i++) {
                    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                    mqList.add(mq);
                }
            }
        }
        return mqList;
    }
消息的三种拉取模式
同步拉取消息

java

复制代码

/**
 * @param mq            消息队列
 * @param subExpression 消息tag过滤表达式
 * @param offset        消费组offset(从哪里开始拉去)
 * @param maxNums       一次最大拉去消息数量
 * @param timeout       超时时间
 * @return 存储了拉取状态以及消息
 */
@Override
public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), subExpression, offset, maxNums, timeout);
}
异步拉取消息

java

复制代码

/**
 * @param mq            消息队列
 * @param subExpression 消息tag过滤表达式
 * @param offset        消费组offset(从哪里开始拉去)
 * @param maxNums       一次最大拉去消息数量
 * @param timeout       超时时间
 * @param pullCallback  异步回调函数
 * @param timeout       
 * @throws MQClientException
 * @throws RemotingException
 * @throws InterruptedException
 */
@Override
public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback,
                 long timeout)
        throws MQClientException, RemotingException, InterruptedException {
    this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), subExpression, offset, maxNums, pullCallback, timeout);
}
同步阻塞拉取消息

拉取消息,若没有找到消息,则阻塞一段时间。通过该方法获取该MessageQueue队列下面从offset位置开始的消息内容。

  • maxNums=32即表示获取的最大消息个数。
  • offset为该MessageQueue对象的开始消费位置,可以调用DefaultMQPullConsumer.fetchConsumeOffset(MessageQueue mq, boolean fromStore)方法获取该MessageQueue队列的消费进度来设定参数offset值该方法最终调用DefaultMQPullConsumerImpl.pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block)方法

java

复制代码

/**
 * @param mq            消息队列
 * @param subExpression tag过滤
 * @param offset        消费组offset
 * @param maxNums       一次最大拉取数量
 * @return
 */
@Override
public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.defaultMQPullConsumerImpl.pullBlockIfNotFound(queueWithNamespace(mq), subExpression, offset, maxNums);
}

pullBlockIfNotFound 和 pull区别是: 前者在没有找到消息的时候会阻塞一段时间以便等待后续消息进入,后者则会直接返回 NOT_FOUND 。

维护消息队列的Offset
获取队列的消费Offset

java

复制代码

/**
 * @param mq 队列
 * @param fromStore 是否从存储获取,true: 从当前服务器存储中获取,false:从远程broker获取
 * @return 消费offset
 */
@Override
public long fetchConsumeOffset(MessageQueue mq, boolean fromStore) throws MQClientException {
    return this.defaultMQPullConsumerImpl.fetchConsumeOffset(queueWithNamespace(mq), fromStore);
}

调用DefaultMQPullConsumer.fetchConsumeOffset(MessageQueue mq, boolean fromStore)方法获取MessageQueue队列的消费进度,其中fromStore为false表示从存储端(即Broker端)获取消费进度;若fromStore为true表示从本地内存获取消费进度;

  1. 对于从存储端获取消费进度(即fromStore=true)的情况:
  • 对于LocalFileOffsetStore对象,从本地加载offsets.json文件,然后获取该MessageQueue对象的offset值;

(即fromStore=false)对于RemoteBrokerOffsetStore对象,获取逻辑如下:

  1. 以MessageQueue对象的brokername从MQClientInstance. brokerAddrTable中获取Broker的地址;若没有获取到则立即调用updateTopicRouteInfoFromNameServer方法然后再次获取;
  2. 构造QueryConsumerOffsetRequestHeader对象,其中包括topic、consumerGroup、queueId;然后调用MQClientAPIImpl.queryConsumerOffset (String addr, QueryConsumerOffsetRequestHeader requestHeader, long timeoutMillis)方法向Broker发送QUERY_CONSUMER_OFFSET请求码,获取消费进度Offset;
  3. 用上一步从Broker获取的offset更新本地内存的消费进度列表数据RemoteBrokerOffsetStore.offsetTable:ConcurrentHashMap<MessageQueue, AtomicLong>变量值;
更新消费组Offset

更新消费组的Offset,注意:只会在本地内存中更新,并不会同步到远程Broker.

java

复制代码

/**
 * @param mq 消息队列
 * @param offset 消费进度
 */
@Override
public void updateConsumeOffset(MessageQueue mq, long offset) throws MQClientException {
    this.defaultMQPullConsumerImpl.updateConsumeOffset(queueWithNamespace(mq), offset);
}


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
13天前
|
消息中间件 数据可视化 Go
Rabbitmq 搭建使用案例 [附源码]
Rabbitmq 搭建使用案例 [附源码]
26 0
|
1月前
|
消息中间件 Java RocketMQ
MQ产品使用合集之在同一个 Java 进程内建立三个消费对象并设置三个消费者组订阅同一主题和标签的情况下,是否会发生其中一个消费者组无法接收到消息的现象
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
1月前
|
消息中间件 小程序 网络性能优化
蓝易云 - 直播小程序源码有用的协议知识:MQTT协
在直播小程序源码中,MQTT协议可以用于实现实时消息推送,如弹幕、聊天消息、礼物信息等。通过使用MQTT协议,可以确保消息的实时性和可靠性,从而提高用户体验。
59 0
|
1月前
|
监控 安全 物联网
阿里云mqtt简介和使用流程
本文介绍了阿里云MQTT的准备工作、简介和使用流程。首先,用户需要注册阿里云账号并完成实名认证。接着,通过阿里云物联网平台创建产品和设备,获取连接所需的Broker Address、Port、Username和Password。然后,使用MQTT客户端(如MQTTX)配置这些信息进行连接,并激活设备。最后,创建并订阅/发布自定义Topic,实现设备间的通信。阿里云MQTT是一个适用于物联网设备的轻量级通信协议,提供高并发、高可靠性的服务,广泛应用于各种物联网场景。
阿里云mqtt简介和使用流程
|
1月前
|
消息中间件 供应链 Java
RabbitMQ入门指南(九):消费者可靠性
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了消费者确认机制、失败重试机制、失败处理策略、业务幂等性等内容。
78 0
RabbitMQ入门指南(九):消费者可靠性
|
1月前
|
消息中间件 Java 调度
【深度挖掘RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)
【深度挖掘RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)
27 1
|
1月前
|
消息中间件 RocketMQ
RocketMq消费者/生产者配置
RocketMq消费者/生产者配置
|
1月前
|
消息中间件 传感器 网络协议
阿里云MQTT简介和使用流程
以下是内容的摘要: 该文主要介绍了在阿里云上搭建 MQTT 服务器的步骤。首先,需要注册阿里云账号并进行实名认证。然后,购买阿里云 MQTT 实例,选择合适的类型、地域、连接和消息限制。接着,创建产品和设备,命名并上线,获取 MQTT 连接的相关信息,包括 ProductKey、DeviceName 和 DeviceSecret。通过提供的 MQTT.fx 工具,设置 MQTT 客户端连接参数,包括 Broker 地址、端口、用户名和密码。最后,使用 MQTT.fx 测试连接,实现数据的上报和接收,验证 MQTT 服务器的配置是否成功。
|
10天前
|
消息中间件 Java 双11
RocketMQ:揭秘电商巨头背后的消息队列秘密
**RocketMQ概览:**高性能分布式消息队列,适用于有序消息、事务处理、流计算、消息推送、日志处理及Binlog分发。在双11等高流量场景下证明了其性能、稳定性和低延迟。Java开发,利于扩展,性能超RabbitMQ,支持死信队列,但可能有集成兼容性问题。适合Java开发者,为电商等场景优化,每秒处理大量消息。
32 3
RocketMQ:揭秘电商巨头背后的消息队列秘密
|
17天前
|
消息中间件 监控 应用服务中间件
消息队列 MQ操作报错合集之重启Broker后,积压数出现为负数是什么导致的
在使用消息队列MQ时,可能会遇到各种报错情况。以下是一些常见的错误场景、可能的原因以及解决建议的汇总:1.连接错误、2.消息发送失败、3.消息消费报错、4.消息重试与死信处理、5.资源与权限问题、6.配置错误、7.系统资源限制、8.版本兼容性问题。
消息队列 MQ操作报错合集之重启Broker后,积压数出现为负数是什么导致的

热门文章

最新文章