【Alibaba中间件技术系列】「RocketMQ技术专题」让我们一起探索一下DefaultMQPullConsumer的实现原理及源码分析

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 【Alibaba中间件技术系列】「RocketMQ技术专题」让我们一起探索一下DefaultMQPullConsumer的实现原理及源码分析

前提介绍


在RocketMQ中一般有两种获取消息的方式,一个是拉(pull,消费者主动去broker拉取),一个是推(push,主动推送给消费者),在上一章节中已经介绍到了相关的Push操作,接下来的章节会介绍Pull操作方式的消费机制体系。

image.png




DefaultMQPullConsumer


DefaultMQPullConsumer与DefaultMQPushConsumer相比最大的区别是,消费哪些队列的消息,从哪个位移开始消费,以及何时提交消费位移都是由程序自己的控制的。下面来介绍一下DefaultMQPullConsumer的内部原理。

image.png

总体流程执行

image.png



DefaultMQPullConsumer使用例子


public class MQPullConsumer {
  private static final Map<MessageQueue,Long> OFFSE_TABLE = new HashMap<MessageQueue,Long>();
  public static void main(String[] args) throws MQClientException {
    DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("groupName");
    consumer.setNamesrvAddr("name-serverl-ip:9876;name-server2-ip:9876");
    consumer.start();
    // 从指定topic中拉取所有消息队列
    Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order-topic");
    for(MessageQueue mq:mqs){
      try {
          // 获取消息的offset,指定从store中获取
         long offset = consumer.fetchConsumeOffset(mq,true);
          while(true){
         PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                                 putMessageQueueOffset(mq,pullResult.getNextBeginOffset());
          switch(pullResult.getPullStatus()){
          case FOUND:
            List<MessageExt> messageExtList = pullResult.getMsgFoundList();
                        for (MessageExt m : messageExtList) {
                            System.out.println(new String(m.getBody()));
                        }
            break;
          case NO_MATCHED_MSG:
            break;
          case NO_NEW_MSG:
            break;
          case OFFSET_ILLEGAL:
            break;
          }
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
    consumer.shutdown();
  }
  // 保存上次消费的消息下标
  private static void putMessageQueueOffset(MessageQueue mq,
      long nextBeginOffset) {
      OFFSE_TABLE.put(mq, nextBeginOffset);
  }
  // 获取上次消费的消息的下标
  private static Long getMessageQueueOffset(MessageQueue mq) {
    Long offset = OFFSE_TABLE.get(mq);
    if(offset != null){
      return offset;
    }
    return 0l;
  }
}
复制代码
  • 消费者启动:consumer.start();
  • 获取主题下所有的消息队列:这里是根据topic从nameserver获取的这里我们可以修改为从其他位置获取队列信息
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("topicTest");
  //遍历队列
  for(MessageQueue mq:mqs){
    try {
        //获取当前队列的消费位移,第二个参数表示位移是从本地内存获取,还是从broker获取,true表示从broker获取
      long offset = consumer.fetchConsumeOffset(mq,true);
      while(true){
        //第二个参数表示可以消费哪些tag的消息
        //第三个参数表示从哪个位移开始消费消息
        //第四个参数表示一次最大拉多少个消息
        PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
}
复制代码



DefaultMQPullConsumer的总体流程

启动DefaultMQPullConsumer是通过调用start()方法完成的



DefaultMQPullConsumer拉取源码分析


分析下DefaultMQPullConsumer拉取消息的流程

consumer.fetchSubscribeMessageQueues("order-topic")
复制代码



从指定topic中拉取所有消息队列

Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order-topic");
复制代码



核心源码分析


fetchSubscribeMessageQueues()


  • 通过调用fetchSubscribeMessageQueues()方法可以获取指定topic(GET_ROUTEINTO_BY_TOPIC)的读队列信息。它通过向nameserver发送GetRouteInfoRequest请求,请求内容为GET_ROUTEINTO_BY_TOPIC,nameserver将主题下的读队列个数发送给消费者,然后消费者使用如下代码创建出与读队列个数相同的MessageQueue对象。
  • 每个MessageQueue对象里面记录了topic、broker名和读队列号。最后fetchSubscribeMessageQueues()将MessageQueue对象集合返回给调用者。
  • 向NameServer发送请求获取topic参数对应的Broker信息和topic配置信息,即TopicRouteData对象。
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
        try {
            TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
            if (topicRouteData != null) {
                // 2、遍历topicRouteData
                Set<MessageQueue> mqList = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                if (!mqList.isEmpty()) {
                    return mqList;
                } else {
                    throw new MQClientException("Can not find Message Queue for this topic, " + topic + " Namesrv return empty", null);
                }
            }
        } catch (Exception e) {
            throw new MQClientException(
                "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST),
                e);
        }
        throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);
    }
复制代码



遍历过程TopicRouteData


遍历TopicRouteData对象的QueueData列表中每个QueueData对象,首先判断该QueueData对象是否具有读权限, 若有则根据该QueueData对象的readQueueNums值,创建readQueueNums个MessageQueue对象,并构成MessageQueue集合; 最后返回给MessageQueue集合

public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {
        Set<MessageQueue> mqList = new HashSet<MessageQueue>();
        List<QueueData> qds = route.getQueueDatas();
        for (QueueData qd : qds) {
            if (PermName.isReadable(qd.getPerm())) {
                for (int i = 0; i < qd.getReadQueueNums(); i++) {
                    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                    mqList.add(mq);
                }
            }
        }
        return mqList;
    }
复制代码



consumer.fetchConsumeOffset


通过该方法获取该MessageQueue队列下面从offset位置开始的消息内容,其中maxNums=32即表示获取的最大消息个数,offset为该MessageQueue对象的开始消费位置。


DefaultMQPullConsumer.fetchConsumeOffset(MessageQueue mq, boolean fromStore)
复制代码


fetchConsumeOffset()有两个入参,第一个参数表示队列,第二个参数表示是否从broker获取该队列的消费位移,true表示从broker获取,false表示从本地记录获取,如果本地获取不到再从broker获取。 这里说的从本地获取是指从RemoteBrokerOffsetStore.offsetTable属性中获取,该属性记录了每个队列的消费位移。当从broker获取位移后会更新offsetTable。



pullBlockIfNotFound拉取信息


rocketmq提供了多个拉取方法,可以使用pullBlockIfNotFound()方法也可以使用pull()方法。两者的区别是如果队列中没有消息,两个方法的超时时间是不同的,pullBlockIfNotFound会等待30s返回一个空结果,pull是等待10s返回空结果。


不过pull方法的入参可以调整超时时间,而pullBlockIfNotFound则需要修改DefaultMQPullConsumer.consumerPullTimeoutMillis参数。不过两个方法调用的底层逻辑都是一样的,都是调用DefaultMQPullConsumerImpl.pullSyncImpl()方法获取消息。下面分析一下pullSyncImpl()方法。

public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
    }
复制代码

获取该MessageQueue队列的消费进度来设定参数offset值该方法最终调用pullSyncImpl,可以获取相关的结果数据。


  • 参数1:消息队列(通过调用消费者的fetchSubscibeMessageQueue(topic)可以得到相应topic的所需要消息队列) ;
  • 参数2:需要过滤用的表达式 ;
  • 参数3:偏移量即消费队列的进度 ;
  • 参数4:一次取消息的最大值 ;


DefaultMQPullConsumerImpl.pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block)
复制代码



DefaultMQPullConsumerImpl.pullSyncImpl的实现过程


private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block,
        long timeout)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.isRunning();
        //检查入参是否合法
        if (null == mq) {
            throw new MQClientException("mq is null", null);
        }
        if (offset < 0) {
            throw new MQClientException("offset < 0", null);
        }
        if (maxNums <= 0) {
            throw new MQClientException("maxNums <= 0", null);
        }
        //更新再平衡服务的数据,因为再平衡服务不起作用,所以更新数据没有效果
        this.subscriptionAutomatically(mq.getTopic());
        int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
        //计算超时时间,如果调用的是pullBlockIfNotFound方法,block参数就是true,否则就是false
        long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
        boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());
        //调用PullAPIWrapper从broker拉取消息,
        //pullKernelImpl方法里面构建PullMessageRequest请求对象
        PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
            mq,//队列
            subscriptionData.getSubString(),//消息的过滤规则
            subscriptionData.getExpressionType(),
            isTagType ? 0L : subscriptionData.getSubVersion(),
            offset,//拉取消息的位移
            maxNums,//建议broker一次性返回最大消息个数,默认是32个
            sysFlag,
            0,//设置的提交位移,可以看到永远都是0,所以broker无法记录有效位移,需要程序自己记录控制提交位移
            this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
            timeoutMillis,//超时时间
            CommunicationMode.SYNC,
            null//回调逻辑为null
        );
        this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
        //If namespace is not null , reset Topic without namespace.
        this.resetTopic(pullResult.getMsgFoundList());
        if (!this.consumeMessageHookList.isEmpty()) {
            ConsumeMessageContext consumeMessageContext = null;
            consumeMessageContext = new ConsumeMessageContext();
            consumeMessageContext.setNamespace(defaultMQPullConsumer.getNamespace());
            consumeMessageContext.setConsumerGroup(this.groupName());
            consumeMessageContext.setMq(mq);
            consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
            consumeMessageContext.setSuccess(false);
            this.executeHookBefore(consumeMessageContext);
            consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
            consumeMessageContext.setSuccess(true);
            this.executeHookAfter(consumeMessageContext);
        }
        return pullResult;
    }
复制代码


检查MessageQueue对象的topic是否在RebalanceImpl.subscriptionInner:ConcurrentHashMap<String,SubscriptionData>变量中,若不在则以consumerGroup、topic、subExpression为参数调用FilterAPI.buildSubscriptionData(String consumerGroup, String topic, String subExpression)方法构造SubscriptionData对象保存到RebalanceImpl.subscriptionInner变量中,其中 subExpression="*" this.subscriptionAutomatically(mq.getTopic()); // 构建标志位,逻辑或运算|= int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);

SubscriptionData subscriptionData;
    try {
        //以请求参数subExpression以及consumerGroup、topic为参数调用FilterAPI.buildSubscriptionData(String consumerGroup,Stringtopic, String subExpression)方法构造SubscriptionData对象并返回
        subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
            mq.getTopic(), subExpression);
    } catch (Exception e) {
        throw new MQClientException("parse subscription error", e);
    }
    long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
    // 从broker中拉取消息
    PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
        mq,
        subscriptionData.getSubString(),
        0L,
        offset,
        maxNums,
        sysFlag,
        0,
        this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
        timeoutMillis,
        CommunicationMode.SYNC,
        null
    );
    // 对拉取到的消息进行解码,过滤并执行回调,并把解析的message列表放到MsgFoundList中
    this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
    if (!this.consumeMessageHookList.isEmpty()) {
        ConsumeMessageContext consumeMessageContext = null;
        consumeMessageContext = new ConsumeMessageContext();
        consumeMessageContext.setConsumerGroup(this.groupName());
        consumeMessageContext.setMq(mq);
        consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
        consumeMessageContext.setSuccess(false);
        this.executeHookBefore(consumeMessageContext);
        consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
        consumeMessageContext.setSuccess(true);
        this.executeHookAfter(consumeMessageContext);
    }
    return pullResult;
}
复制代码




Push和Pull的操作对比


  • push-优点:及时性、服务端统一处理实现方便
  • push-缺点:容易造成堆积、负载性能不可控
  • pull-优点:获得消息状态方便、负载均衡性能可控
  • pull-缺点:及时性差


使用DefaultMQPullConsumer拉取消息,发送到broker的提交位移永远都是0,所以broker无法记录有效位移,需要程序自己记录和控制提交位移。



资料参考





相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 存储 缓存
分布式中间件核心原理与RocketMQ最佳实践
随着互联网业务的不断扩展和复杂化,分布式系统的需求也越来越迫切。为了满足这一需求,分布式中间件应运而生。在分布式系统中,中间件的角色是协调和管理各个节点之间的通信和数据交换,它起到了桥梁的作用。本文将介绍分布式中间件的核心原理和RocketMQ最佳实践,帮助读者更好地理解和应用分布式中间件。
335 1
|
6月前
|
消息中间件 中间件 Kafka
RocketMQ源码(一)RocketMQ消息生产及消费通信链路源码分析
**RocketMQ**的核心架构主要分为Broker、Producer、Consumer,通过阅读源码看到他们之间是通过Netty来通信的 ,具体来说Broker端是**Netty服务器**用来负责与客户端的连接请求处理,而Producer/Consumer端是**Netty客户端**用来负责与Netty服务器的通信及请求响应处理。
158 1
|
7月前
|
消息中间件 存储 负载均衡
RocketMQ 源码分析——NameServer
- 编写优雅、高效的代码。RocketMQ作为阿里双十一交易核心链路产品,支撑千万级并发、万亿级数据洪峰。读源码可以积累编写高效、优雅代码的经验。 - 提升微观的架构设计能力,重点在思维和理念。Apache RocketMQ作为Apache顶级项目,它的架构设计是值得大家借鉴的。 - 解决工作中、学习中的各种疑难杂症。在使用RocketMQ过程中遇到消费卡死、卡顿等问题可以通过阅读源码的方式找到问题并给予解决。 - 在BATJ一线互联网公司面试中展现优秀的自己。大厂面试中,尤其是阿里系的公司,你有RocketMQ源码体系化知识,必定是一个很大的加分项。
128 0
|
7月前
|
消息中间件 存储 Kafka
RocketMQ 源码分析——Broker
1. Broker启动流程分析 2. 消息存储设计 3. 消息写入流程 4. 亮点分析:NRS与NRC的功能号设计 5. 亮点分析:同步双写数倍性能提升的CompletableFuture 6. 亮点分析:Commitlog写入时使用可重入锁还是自旋锁? 7. 亮点分析:零拷贝技术之MMAP提升文件读写性能 8. 亮点分析:堆外内存机制
116 0
|
11月前
|
消息中间件 Cloud Native 中间件
带你读《企业级云原生白皮书项目实战》——4.1.1 消息队列RocketMQ版概述
带你读《企业级云原生白皮书项目实战》——4.1.1 消息队列RocketMQ版概述
183 0
|
11月前
|
消息中间件 存储 运维
带你读《企业级云原生白皮书项目实战》——4.1.2 消息队列RocketMQ版的优势
带你读《企业级云原生白皮书项目实战》——4.1.2 消息队列RocketMQ版的优势
512 0
|
11月前
|
消息中间件 Cloud Native 中间件
带你读《企业级云原生白皮书项目实战》——4.1.3 消息队列RocketMQ版最佳实践(上)
带你读《企业级云原生白皮书项目实战》——4.1.3 消息队列RocketMQ版最佳实践(上)
360 0
|
11月前
|
消息中间件 存储 数据采集
带你读《企业级云原生白皮书项目实战》——4.1.3 消息队列RocketMQ版最佳实践(下)
带你读《企业级云原生白皮书项目实战》——4.1.3 消息队列RocketMQ版最佳实践(下)
337 0
|
11月前
|
消息中间件 监控 Cloud Native
带你读《企业级云原生白皮书项目实战》——4.1.4 消息队列Kafka版概述
带你读《企业级云原生白皮书项目实战》——4.1.4 消息队列Kafka版概述
299 1
|
11月前
|
消息中间件 存储 运维
带你读《企业级云原生白皮书项目实战》——4.1.5 消息队列Kafka版的优势
带你读《企业级云原生白皮书项目实战》——4.1.5 消息队列Kafka版的优势
566 0

相关产品

  • 云消息队列 MQ