【Alibaba中间件技术系列】「RocketMQ技术专题」让我们一起探索一下DefaultMQPullConsumer的实现原理及源码分析

本文涉及的产品
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
性能测试 PTS,5000VUM额度
简介: 【Alibaba中间件技术系列】「RocketMQ技术专题」让我们一起探索一下DefaultMQPullConsumer的实现原理及源码分析

前提介绍


在RocketMQ中一般有两种获取消息的方式,一个是拉(pull,消费者主动去broker拉取),一个是推(push,主动推送给消费者),在上一章节中已经介绍到了相关的Push操作,接下来的章节会介绍Pull操作方式的消费机制体系。

image.png




DefaultMQPullConsumer


DefaultMQPullConsumer与DefaultMQPushConsumer相比最大的区别是,消费哪些队列的消息,从哪个位移开始消费,以及何时提交消费位移都是由程序自己的控制的。下面来介绍一下DefaultMQPullConsumer的内部原理。

image.png

总体流程执行

image.png



DefaultMQPullConsumer使用例子


public class MQPullConsumer {
  private static final Map<MessageQueue,Long> OFFSE_TABLE = new HashMap<MessageQueue,Long>();
  public static void main(String[] args) throws MQClientException {
    DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("groupName");
    consumer.setNamesrvAddr("name-serverl-ip:9876;name-server2-ip:9876");
    consumer.start();
    // 从指定topic中拉取所有消息队列
    Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order-topic");
    for(MessageQueue mq:mqs){
      try {
          // 获取消息的offset,指定从store中获取
         long offset = consumer.fetchConsumeOffset(mq,true);
          while(true){
         PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                                 putMessageQueueOffset(mq,pullResult.getNextBeginOffset());
          switch(pullResult.getPullStatus()){
          case FOUND:
            List<MessageExt> messageExtList = pullResult.getMsgFoundList();
                        for (MessageExt m : messageExtList) {
                            System.out.println(new String(m.getBody()));
                        }
            break;
          case NO_MATCHED_MSG:
            break;
          case NO_NEW_MSG:
            break;
          case OFFSET_ILLEGAL:
            break;
          }
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
    consumer.shutdown();
  }
  // 保存上次消费的消息下标
  private static void putMessageQueueOffset(MessageQueue mq,
      long nextBeginOffset) {
      OFFSE_TABLE.put(mq, nextBeginOffset);
  }
  // 获取上次消费的消息的下标
  private static Long getMessageQueueOffset(MessageQueue mq) {
    Long offset = OFFSE_TABLE.get(mq);
    if(offset != null){
      return offset;
    }
    return 0l;
  }
}
复制代码
  • 消费者启动:consumer.start();
  • 获取主题下所有的消息队列:这里是根据topic从nameserver获取的这里我们可以修改为从其他位置获取队列信息
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("topicTest");
  //遍历队列
  for(MessageQueue mq:mqs){
    try {
        //获取当前队列的消费位移,第二个参数表示位移是从本地内存获取,还是从broker获取,true表示从broker获取
      long offset = consumer.fetchConsumeOffset(mq,true);
      while(true){
        //第二个参数表示可以消费哪些tag的消息
        //第三个参数表示从哪个位移开始消费消息
        //第四个参数表示一次最大拉多少个消息
        PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
}
复制代码



DefaultMQPullConsumer的总体流程

启动DefaultMQPullConsumer是通过调用start()方法完成的



DefaultMQPullConsumer拉取源码分析


分析下DefaultMQPullConsumer拉取消息的流程

consumer.fetchSubscribeMessageQueues("order-topic")
复制代码



从指定topic中拉取所有消息队列

Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order-topic");
复制代码



核心源码分析


fetchSubscribeMessageQueues()


  • 通过调用fetchSubscribeMessageQueues()方法可以获取指定topic(GET_ROUTEINTO_BY_TOPIC)的读队列信息。它通过向nameserver发送GetRouteInfoRequest请求,请求内容为GET_ROUTEINTO_BY_TOPIC,nameserver将主题下的读队列个数发送给消费者,然后消费者使用如下代码创建出与读队列个数相同的MessageQueue对象。
  • 每个MessageQueue对象里面记录了topic、broker名和读队列号。最后fetchSubscribeMessageQueues()将MessageQueue对象集合返回给调用者。
  • 向NameServer发送请求获取topic参数对应的Broker信息和topic配置信息,即TopicRouteData对象。
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
        try {
            TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
            if (topicRouteData != null) {
                // 2、遍历topicRouteData
                Set<MessageQueue> mqList = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                if (!mqList.isEmpty()) {
                    return mqList;
                } else {
                    throw new MQClientException("Can not find Message Queue for this topic, " + topic + " Namesrv return empty", null);
                }
            }
        } catch (Exception e) {
            throw new MQClientException(
                "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST),
                e);
        }
        throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);
    }
复制代码



遍历过程TopicRouteData


遍历TopicRouteData对象的QueueData列表中每个QueueData对象,首先判断该QueueData对象是否具有读权限, 若有则根据该QueueData对象的readQueueNums值,创建readQueueNums个MessageQueue对象,并构成MessageQueue集合; 最后返回给MessageQueue集合

public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {
        Set<MessageQueue> mqList = new HashSet<MessageQueue>();
        List<QueueData> qds = route.getQueueDatas();
        for (QueueData qd : qds) {
            if (PermName.isReadable(qd.getPerm())) {
                for (int i = 0; i < qd.getReadQueueNums(); i++) {
                    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                    mqList.add(mq);
                }
            }
        }
        return mqList;
    }
复制代码



consumer.fetchConsumeOffset


通过该方法获取该MessageQueue队列下面从offset位置开始的消息内容,其中maxNums=32即表示获取的最大消息个数,offset为该MessageQueue对象的开始消费位置。


DefaultMQPullConsumer.fetchConsumeOffset(MessageQueue mq, boolean fromStore)
复制代码


fetchConsumeOffset()有两个入参,第一个参数表示队列,第二个参数表示是否从broker获取该队列的消费位移,true表示从broker获取,false表示从本地记录获取,如果本地获取不到再从broker获取。 这里说的从本地获取是指从RemoteBrokerOffsetStore.offsetTable属性中获取,该属性记录了每个队列的消费位移。当从broker获取位移后会更新offsetTable。



pullBlockIfNotFound拉取信息


rocketmq提供了多个拉取方法,可以使用pullBlockIfNotFound()方法也可以使用pull()方法。两者的区别是如果队列中没有消息,两个方法的超时时间是不同的,pullBlockIfNotFound会等待30s返回一个空结果,pull是等待10s返回空结果。


不过pull方法的入参可以调整超时时间,而pullBlockIfNotFound则需要修改DefaultMQPullConsumer.consumerPullTimeoutMillis参数。不过两个方法调用的底层逻辑都是一样的,都是调用DefaultMQPullConsumerImpl.pullSyncImpl()方法获取消息。下面分析一下pullSyncImpl()方法。

public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
    }
复制代码

获取该MessageQueue队列的消费进度来设定参数offset值该方法最终调用pullSyncImpl,可以获取相关的结果数据。


  • 参数1:消息队列(通过调用消费者的fetchSubscibeMessageQueue(topic)可以得到相应topic的所需要消息队列) ;
  • 参数2:需要过滤用的表达式 ;
  • 参数3:偏移量即消费队列的进度 ;
  • 参数4:一次取消息的最大值 ;


DefaultMQPullConsumerImpl.pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block)
复制代码



DefaultMQPullConsumerImpl.pullSyncImpl的实现过程


private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block,
        long timeout)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.isRunning();
        //检查入参是否合法
        if (null == mq) {
            throw new MQClientException("mq is null", null);
        }
        if (offset < 0) {
            throw new MQClientException("offset < 0", null);
        }
        if (maxNums <= 0) {
            throw new MQClientException("maxNums <= 0", null);
        }
        //更新再平衡服务的数据,因为再平衡服务不起作用,所以更新数据没有效果
        this.subscriptionAutomatically(mq.getTopic());
        int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
        //计算超时时间,如果调用的是pullBlockIfNotFound方法,block参数就是true,否则就是false
        long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
        boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());
        //调用PullAPIWrapper从broker拉取消息,
        //pullKernelImpl方法里面构建PullMessageRequest请求对象
        PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
            mq,//队列
            subscriptionData.getSubString(),//消息的过滤规则
            subscriptionData.getExpressionType(),
            isTagType ? 0L : subscriptionData.getSubVersion(),
            offset,//拉取消息的位移
            maxNums,//建议broker一次性返回最大消息个数,默认是32个
            sysFlag,
            0,//设置的提交位移,可以看到永远都是0,所以broker无法记录有效位移,需要程序自己记录控制提交位移
            this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
            timeoutMillis,//超时时间
            CommunicationMode.SYNC,
            null//回调逻辑为null
        );
        this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
        //If namespace is not null , reset Topic without namespace.
        this.resetTopic(pullResult.getMsgFoundList());
        if (!this.consumeMessageHookList.isEmpty()) {
            ConsumeMessageContext consumeMessageContext = null;
            consumeMessageContext = new ConsumeMessageContext();
            consumeMessageContext.setNamespace(defaultMQPullConsumer.getNamespace());
            consumeMessageContext.setConsumerGroup(this.groupName());
            consumeMessageContext.setMq(mq);
            consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
            consumeMessageContext.setSuccess(false);
            this.executeHookBefore(consumeMessageContext);
            consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
            consumeMessageContext.setSuccess(true);
            this.executeHookAfter(consumeMessageContext);
        }
        return pullResult;
    }
复制代码


检查MessageQueue对象的topic是否在RebalanceImpl.subscriptionInner:ConcurrentHashMap<String,SubscriptionData>变量中,若不在则以consumerGroup、topic、subExpression为参数调用FilterAPI.buildSubscriptionData(String consumerGroup, String topic, String subExpression)方法构造SubscriptionData对象保存到RebalanceImpl.subscriptionInner变量中,其中 subExpression="*" this.subscriptionAutomatically(mq.getTopic()); // 构建标志位,逻辑或运算|= int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);

SubscriptionData subscriptionData;
    try {
        //以请求参数subExpression以及consumerGroup、topic为参数调用FilterAPI.buildSubscriptionData(String consumerGroup,Stringtopic, String subExpression)方法构造SubscriptionData对象并返回
        subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
            mq.getTopic(), subExpression);
    } catch (Exception e) {
        throw new MQClientException("parse subscription error", e);
    }
    long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
    // 从broker中拉取消息
    PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
        mq,
        subscriptionData.getSubString(),
        0L,
        offset,
        maxNums,
        sysFlag,
        0,
        this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
        timeoutMillis,
        CommunicationMode.SYNC,
        null
    );
    // 对拉取到的消息进行解码,过滤并执行回调,并把解析的message列表放到MsgFoundList中
    this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
    if (!this.consumeMessageHookList.isEmpty()) {
        ConsumeMessageContext consumeMessageContext = null;
        consumeMessageContext = new ConsumeMessageContext();
        consumeMessageContext.setConsumerGroup(this.groupName());
        consumeMessageContext.setMq(mq);
        consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
        consumeMessageContext.setSuccess(false);
        this.executeHookBefore(consumeMessageContext);
        consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
        consumeMessageContext.setSuccess(true);
        this.executeHookAfter(consumeMessageContext);
    }
    return pullResult;
}
复制代码




Push和Pull的操作对比


  • push-优点:及时性、服务端统一处理实现方便
  • push-缺点:容易造成堆积、负载性能不可控
  • pull-优点:获得消息状态方便、负载均衡性能可控
  • pull-缺点:及时性差


使用DefaultMQPullConsumer拉取消息,发送到broker的提交位移永远都是0,所以broker无法记录有效位移,需要程序自己记录和控制提交位移。



资料参考





相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
24天前
|
消息中间件 存储 Java
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
【10月更文挑战第15天】本文介绍了消息中间件的基本概念和特点,重点解析了RocketMQ的整体架构和核心组件。消息中间件如RocketMQ、RabbitMQ、Kafka等,具备异步通信、持久化、削峰填谷、系统解耦等特点,适用于分布式系统。RocketMQ的架构包括NameServer、Broker、Producer、Consumer等组件,通过这些组件实现消息的生产、存储和消费。文章还提供了Spring Boot快速上手RocketMQ的示例代码,帮助读者快速入门。
|
2月前
|
消息中间件 存储 RocketMQ
消息中间件-RocketMQ技术(二)
消息中间件-RocketMQ技术(二)
|
2月前
|
消息中间件 存储 中间件
消息中间件-RocketMQ技术(一)
消息中间件-RocketMQ技术(一)
|
3月前
|
消息中间件 弹性计算 Kubernetes
RabbitMQ与容器化技术的集成实践
【8月更文第28天】RabbitMQ 是一个开源消息代理和队列服务器,用于在分布式系统中存储、转发消息。随着微服务架构的普及,容器化技术(如 Docker 和 Kubernetes)成为了部署和管理应用程序的标准方式。本文将探讨如何使用 Docker 和 Kubernetes 在生产环境中部署和管理 RabbitMQ 服务,同时保证高可用性和弹性伸缩能力。
64 3
|
3月前
|
消息中间件 Cloud Native Serverless
RabbitMQ 与云原生技术的融合
【8月更文第28天】随着微服务架构和容器化的普及,云原生技术已成为构建现代应用的标准方式。云原生应用程序利用了诸如容器化、微服务、声明式API等技术,以提高可伸缩性、可靠性和可维护性。消息队列作为服务间通信的关键组件,在云原生环境中扮演着重要角色。本文将探讨如何将RabbitMQ与云原生技术(如Service Mesh和Serverless平台)相结合,并通过具体的代码示例来展示其集成方法。
35 2
|
18天前
|
消息中间件 存储 Java
吃透 RocketMQ 消息中间件,看这篇就够了!
本文详细介绍 RocketMQ 的五大要点、核心特性及应用场景,涵盖高并发业务场景下的消息中间件关键知识点。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
吃透 RocketMQ 消息中间件,看这篇就够了!
|
4月前
|
消息中间件 缓存 IDE
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决
|
6月前
|
消息中间件 存储 负载均衡
消息中间件的选择:RabbitMQ是一个明智的选择
消息中间件的选择:RabbitMQ是一个明智的选择
110 0
|
5月前
|
消息中间件 存储 中间件
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
1367 0
|
4月前
|
消息中间件 编解码 Docker
Docker部署RabbitMQ消息中间件
【7月更文挑战第4天】Docker部署RabbitMQ消息中间件
275 3

相关产品

  • 云消息队列 MQ