RocketMQ中msg&tag的生命周期

简介: RocketMQ中msg&tag的生命周期

1 写作目的


最近发现项目内部和外部沟通频繁使用MQ,并通过tag进行消息过滤和隔离,因此想搞清楚tag在源码中使用的地方,毕竟消息中间件这块还是有很多该学习的地方。


2 版本及说明


RocketMQ-4.9.1


3 初识ConsumeQueue及tag


首先要RocketMQ的文件存储设计,本文主要关注CommitLog文件和ConsumeQueue文件,如下图所示(图片引自该处)。当消息生产者生产消息时,所有topic的消息都会顺序的保存在CommitLog文件里,如果只从CommitLog一个文件看,是没有办法快速定位到某个topic的消息,那么此时就需要ConsumeQueue登场了。


5.png


ConsumeQueue在不同的文件夹下,根据不同的文件夹可以区分不同的队列,而ConsumeQueue文件存储的是消息的索引信息。


6.png


如上图所示消息生产者每生产一条消息就对应这下图的一条索引记录。其中消息的真实内容存储在commitLog中。


●CommitLog Offset:指向commitLog中文件的偏移量。

●Size:该条消息的大小。

●Message Tag Hashcode:生产消息时指定的 tag 的hash 值。


4 tag跟踪及定位


整个流程为:


  1. 1.producer生产消息
  2. 2.broker存储消息
  3. 3.conusmer启动流程
  4. 4.broker给consumer消息(过滤tag)
  5. 5.consumer消费消息(过滤&消费)


其中

topic = TopicTest

tag = TagA


4.1 producer生产消息


一般producer生产消息时候会使用如下代码,其中消息要包含topic、tag和msg消息体。


 DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
Message msg =
            new Message(
                "TopicTest", // topic
                "TagA", // tag
                "OrderID188", // key
                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // msg body
SendResult sendResult = producer.send(msg);


其中上面的tag是存在哪呢?跟Message的构造方法可以看到tag其实是放在msg的properties里,MessageConst.PROPERTY_TAGS = TAGS


    public void setTags(String tags) {
        this.putProperty(MessageConst.PROPERTY_TAGS, tags);
    }


跟上面的send方法中间会跟到

MQClientAPIImpl#sendMessage方法,方法中的一行代码如下图所示,创建Request,因为本次发送为单条消息,所以代码中的三元表达式中选择RequestCode.SEND_MESSAGE_V2(310)。


request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);


在往下跟其实就是通过Netty给borker发送消息了(非本次内容关注重点,忽略)。


总结:


tag放在msg的properties里

发送请求的code = RequestCode.SEND_MESSAGE_V2(310)


4.2 broker存储消息


本文关注的有两个文件,一个是存储消息的CommitLog文件和存储topic索引的ConsumeQueue文件。


CommitLog是对外暴露的是一个逻辑日志(而真正对应的物理日志是多个MappedFile文件组成的)。该逻辑日志有一个最大偏移量maxOffset(DefaultMessageStore.this.commitLog.getMaxOffset())。当有新消息发到broker时消息会写到CommitLog里并且maxOffset就会增加。

而ConsumeQueue的构成是由另一个类ReputMessageService异步线程进行处理,异步构建Consumequeue。

ReputMessageService是Runnable实现类,run方法会每隔1秒执行doReput方法,如下面代码所示。


        public void run() {
            DefaultMessageStore.log.info(this.getServiceName() + " service started");
            while (!this.isStopped()) {
                try {
                    Thread.sleep(1);
                    this.doReput();
                } catch (Exception e) {
                    DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }
            DefaultMessageStore.log.info(this.getServiceName() + " service end");
        }


ReputMessageService里有一个属性是reputFromOffset,该属性表示同步CommLog到Consumequeue的进度。

如果


this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset()


则说明有新的消息要从CommLog构建索引到Consumequeue。


而Consumequeue中的三个属性(commitlog offset、size、tag hashcode)是怎么来的?


本身我们是有一个CommitLog的偏移量(reputFromOffset),从这个偏移量开始往后解析我们是可以解析出整条消息的,消息格式如下图所示。


7.png

解析出整条消息后可以获取到


●commitlog offset :从消息中解析到

●size:解析消息后计算的

●tag hashcode :从消息中解析到msg的properties并获取到tags(字符串)然后获取hashcode。


那么就可以构建一条Consumequeue索引了。


总结:


broker收到消息后同步放在CommitLog中(本文没讲)

ReputMessageService通过异步不断扫描reputFromOffset和commitLog.getMaxOffset关系从而获取需要构建的通知。

解析消息获取Consumequeue参数并构建。


4.3 consumer启动流程


1、获取订阅的topic和Queue信息


2、通过Reblace获取被分配的Queue,开始拉取消息


4.3.1 consumer获取topic和Queue信息


消费者启动会调用

MQClientInstance#start()方法,start()方法里有会调用

MQClientInstance#startScheduledTask()方法,里面的一段代码如下,会每隔一段时间更新一下topic路由信息


//MQClientInstance###startScheduledTask()
 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);


会把路由信息保存到本地的一个HashMap里,这样消费者就拿到了topic的信息并且会把broker的信息保存下来


//MQClientInstance###updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer)
//根据主题从nameserver获取topic信息
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);


//MQClientInstance###updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer)
//把主题和主题队列相关的broker保存下来
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }


总结:


消费者拿到主题的队列列表和broker信息


4.3.2 consumer拉取消息


consumer怎么开始拉取消息?这里其实是一个reblance的过程

MQClientInstance的start的方法里会开启一个rebalance的线程,如下面代码所示


//MQClientInstance###start()
public void start() throws MQClientException {
 //省略
 // Start rebalance service
 this.rebalanceService.start();
 //省略
}


跟RebalanceService的run()方法一直跟下去最后跟到RebalanceImpl的rebalanceByTopic方法。如下面代码所示。根据主题队列列表和消费者组集合去做一个Rebalance,最后的返回结果是当前消费者需要消费的主题队列。


//RebalanceImpl##rebalanceByTopic
private void rebalanceByTopic(final String topic, final boolean isOrder) {
                //获取订阅的主题的队列
                //获取订阅的主题的队列
                //获取订阅的主题的队列
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                //获取同消费者组的ClientID集合
                //获取同消费者组的ClientID集合
                //获取同消费者组的ClientID集合
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);
                    //排序
                    //排序
                    //排序
                    Collections.sort(mqAll);
                    Collections.sort(cidAll);
                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
                    List<MessageQueue> allocateResult = null;
                    try {
                        //rebalance算法核心实现,最后的结果是返回应该消费的队列
                        //rebalance算法核心实现,最后的结果是返回应该消费的队列
                        //rebalance算法核心实现,最后的结果是返回应该消费的队列
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
                    }
                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        //rebalance算法核心实现,最后的结果是返回应该消费的队列
                        //rebalance算法核心实现,最后的结果是返回应该消费的队列
                        //rebalance算法核心实现,最后的结果是返回应该消费的队列
                        allocateResultSet.addAll(allocateResult);
                    }
                    //此处看下面的消费者怎么去拉消息
                    //此处看下面的消费者怎么去拉消息
                    //此处看下面的消费者怎么去拉消息
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
        }
    }


上面代码中allocateResultSet就是该consumerGroup被分配的Queue。后面会把每一个Queue包装成一个Task去对应的Broker中拉取消息。


8.png


总结:


如下图所示,RebalanceService线程会根据情况把请求放在PullMessageService的pullRequestQueue阻塞队列队列里,队列的每一个节点就是拉请求;PullMessageService线程就是不断去pullRequestQueue里拿任务然后去看一下broker中有没有数据,如果有数据就消费。


4.4 broker响应consumer请求(过滤tag)


首先Consumer给broker发送消息,请求code是 RequestCode.PULL_MESSAGE ,因此我们可以跟borker里对这个请求码的处理的processor,最后定位到

PullMessageProcessor#processRequest方法,方法里有如下的代码


final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);


跟DefaultMessageStore#getMessage方法


 public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
        final int maxMsgNums,
        final MessageFilter messageFilter) {
        //省略        
                        for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            //获取消息的偏移量
                            long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
                             //获取消息的大小
                            int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
                            //获取消息的tag的hashcode
                            long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
                            maxPhyOffsetPulling = offsetPy;
                            if (nextPhyFileStartOffset != Long.MIN_VALUE) {
                                if (offsetPy < nextPhyFileStartOffset)
                                    continue;
                            }
                            //省略
                            //省略
                            //省略
                             //查看消息tag是否匹配,此时在broker实现过滤
                             //查看消息tag是否匹配,此时在broker实现过滤
                             //查看消息tag是否匹配,此时在broker实现过滤
                            if (messageFilter != null
                                && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                }
                                continue;
                            }
                          //省略
                          //省略
                          //省略
        return getResult;
    }


跟进匹配方法,此时能发现过滤方法是看subscriptionData里是否有包含tagsCode


//ExpressionMessageFilter#isMatchedByConsumeQueue
 @Override
    public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
            //省略
            //省略
            //省略
            //订阅主题里是否包含这个hashcode
            return subscriptionData.getCodeSet().contains(tagsCode.intValue());
        } else {
           //省略
    }


总结:broker是根据subscriptionData里的tag的hashcode列表去过滤消息,判断从ConsumeQueue中读取的tag的hashcode是否在subscriptionData里的tag的hashcode列表中。


4.5consumer消费消息(过滤tag&消费)


Consumer端在DefaultMQPushConsumerImpl#pullMessage方法里有一个PullCallback,此方法是一个给broker发送拉取消息后的一个回调方法


PullCallback pullCallback = new PullCallback() {
@Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);
         //省略
}


跟一下PullAPIWrapper#processPullResult方法


public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
        final SubscriptionData subscriptionData) {
              //省略
                for (MessageExt msg : msgList) {
                    if (msg.getTags() != null) {
                    //Consumer端过滤消息
                    //Consumer端过滤消息
                    //Consumer端过滤消息
                        if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                            msgListFilterAgain.add(msg);
                        }
                    }
                }
            }
           //省略
        return pullResult;
    }


总结:broker端的消息过滤是通过看subscriptionData里的tag列表是否含有当前消息的tag

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
SQL 存储 消息中间件
RocketMQ的TAG过滤和SQL过滤机制
写作目的 项目中各个中台都使用同一个DB。而DB下会使用中间件监听binlog转换成MQ消息,而下游的各个中台去MQ去拿自己感兴趣的消息。
296 0
RocketMQ的TAG过滤和SQL过滤机制
|
消息中间件 存储 RocketMQ
阿里二面:RocketMQ同一个消费组内的消费者订阅量不同tag,会有问题吗?
阿里二面:RocketMQ同一个消费组内的消费者订阅量不同tag,会有问题吗?
661 0
阿里二面:RocketMQ同一个消费组内的消费者订阅量不同tag,会有问题吗?
|
消息中间件 存储 缓存
我擦,RocketMQ的tag还有这个“坑”!
我擦,RocketMQ的tag还有这个“坑”!
我擦,RocketMQ的tag还有这个“坑”!
|
消息中间件 负载均衡 Java
RocketMQ入门到入土(七 )为什么同一个消费组设置不同tag会出现奇怪现象
RocketMQ入门到入土(七 )为什么同一个消费组设置不同tag会出现奇怪现象
|
消息中间件 负载均衡 Java
RocketMQ,同一个topic下是否可以通过不同的tag来进行订阅吗?
RocketMQ,同一个topic下是否可以通过不同的tag来进行订阅吗?
1262 0
|
消息中间件 RocketMQ 测试技术
rocketMq - tag不一致造成的假象
概述     这篇文章是以同事在实际工作中遇到的问题作为分析的切入点,加深自己对mq的掌握,践行“干中学”的团队理念。     当自己差不多把基本概念都掌握的差不多的时候,必须需要实际的案例或者实践来提深自己的深度,这个时候just do it 变得很重要,所以我喜欢不停的被人挑战,截止目前帮人解答的问题包括:client端消息堆积问题、批量消息拉取问题中遇到的神奇的数字32、以及本篇的tag不一致造成的假象,也就说会有3篇文章输出。
1443 0
|
6天前
|
消息中间件 网络协议 JavaScript
MQTT常见问题之微消息队列mqtt支持ipv6失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
6天前
|
消息中间件 物联网 Java
MQTT常见问题之微消息队列配置失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
6天前
|
消息中间件 分布式计算 监控
Python面试:消息队列(RabbitMQ、Kafka)基础知识与应用
【4月更文挑战第18天】本文探讨了Python面试中RabbitMQ与Kafka的常见问题和易错点,包括两者的基础概念、特性对比、Python客户端使用、消息队列应用场景及消息可靠性保证。重点讲解了消息丢失与重复的避免策略,并提供了实战代码示例,帮助读者提升在分布式系统中使用消息队列的能力。
42 2
|
6天前
|
消息中间件 Java
springboot整合消息队列——RabbitMQ
springboot整合消息队列——RabbitMQ
82 0