CousumeQueue中tag的作用

简介: 问题的提出存在就是有意义的,那么ConsumeQueue中存消息tag的hashcode是什么目的呢?
consumer.subscribe("TopicTest", "TagA");


那么在消息过滤是在broker还是Conumser呢?按照常理是在broker,因为在broker可以减少流量,实际情况是在broker过滤大部分,Consumer过滤一小部分


ConsumeQueue的结构


ConsumeQueue存的是主题的逻辑信息,如下图所示,代表一条记录。其中记录的信息存储在commitLog中,位置是CommitLog Offset。


5.png


Offset用于标记消息在CommitLog中的位置

Size用于标记消息的大小

HashCode用于过滤消息


源码跟踪


SubscriptionData的构建(Consumer启动)


Consumer一般会有订阅的主题和tag


consumer.subscribe("TopicTest", "TagA");


跟进去会跟到FilterAPI的buildSubscriptionData方法


public static SubscriptionData buildSubscriptionData(String topic, String subString) throws Exception {
        SubscriptionData subscriptionData = new SubscriptionData();
        subscriptionData.setTopic(topic);
        subscriptionData.setSubString(subString);
        if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
            subscriptionData.setSubString(SubscriptionData.SUB_ALL);
        } else {
            String[] tags = subString.split("\\|\\|");
            if (tags.length > 0) {
                for (String tag : tags) {
                    if (tag.length() > 0) {
                        String trimString = tag.trim();
                        if (trimString.length() > 0) {
                            //添加tag的set
                            //添加tag的set
                            //添加tag的set
                            subscriptionData.getTagsSet().add(trimString);
                            //添加tag的hashcode的set
                            //添加tag的hashcode的set
                            //添加tag的hashcode的set
                            subscriptionData.getCodeSet().add(trimString.hashCode());
                        }
                    }
                }
            } else {
                throw new Exception("subString split error");
            }
        }
        return subscriptionData;
    }


总结:SubscriptionData包含了tag列表和tag的hashcode列表


broker过滤消息


首先Consumer给broker发送消息,请求code是 RequestCode.PULL_MESSAGE ,因此我们可以跟borker里对这个请求码的处理的processor,最后定位到

PullMessageProcessor###processRequest方法,方法里有如下的代码


final GetMessageResult getMessageResult =
            this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);


跟DefaultMessageStore###getMessage方法


 public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
        final int maxMsgNums,
        final MessageFilter messageFilter) {
        //省略        
                        for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            //获取消息的偏移量
                            long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
                             //获取消息的大小
                            int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
                            //获取消息的tag的hashcode
                            long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
                            maxPhyOffsetPulling = offsetPy;
                            if (nextPhyFileStartOffset != Long.MIN_VALUE) {
                                if (offsetPy < nextPhyFileStartOffset)
                                    continue;
                            }
                            //省略
                            //省略
                            //省略
                             //查看消息tag是否匹配,此时在broker实现过滤
                             //查看消息tag是否匹配,此时在broker实现过滤
                             //查看消息tag是否匹配,此时在broker实现过滤
                            if (messageFilter != null
                                && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                }
                                continue;
                            }
                          //省略
                          //省略
                          //省略
        return getResult;
    }


跟进匹配方法,此时能发现过滤方法是看subscriptionData里是否有包含tagsCode


//ExpressionMessageFilter###
```java
 @Override
    public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
            //省略
            //省略
            //省略
            //订阅主题里是否包含这个hashcode
            return subscriptionData.getCodeSet().contains(tagsCode.intValue());
        } else {
           //省略
    }


总结:broker是根据subscriptionData里的tag的hashcode列表去过滤消息,判断从ConsumeQueue中读取的tag的hashcode是否在subscriptionData里的tag的hashcode列表中。


Consumer过滤消息


Consumer开始跟的地方在DefaultMQPushConsumerImpl###pullMessage方法里有一个PullCallback,此方法是一个给broker发送拉取消息后的一个回调方法


PullCallback pullCallback = new PullCallback() {
@Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);
         //省略
}


跟一下PullAPIWrapper###processPullResult方法


public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
        final SubscriptionData subscriptionData) {
              //省略
                for (MessageExt msg : msgList) {
                    if (msg.getTags() != null) {
                    //Consumer端过滤消息
                    //Consumer端过滤消息
                    //Consumer端过滤消息
                        if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                            msgListFilterAgain.add(msg);
                        }
                    }
                }
            }
           //省略
        return pullResult;
    }


总结:broker端的消息过滤是通过看subscriptionData里的tag列表是否含有当前消息的tag


总结:broker和Consuemr都会过滤


(1)在Consumer启动的时候会传入topic和tag,然后把tag的string和hashcode封装到SubscriptionData中。

(2)当Conumser去broker拉消息的时候,查看SubscriptionData中的hashcode列表和Consumequeue中读取到的tag.hashcode是否一致,这个地方可以过滤大部分的消息。这是第一次过滤。

(3)当(2)通过过滤的消息会发送到Consumer,Consumer则会SubscriptionData中的tag列表中查看是否和当前tag匹配,这是第二次过滤。


目录
相关文章
|
2月前
|
Oracle Java 关系型数据库
@Id、@GeneratedValue的作用,以及@GeneratedValue的使用
@Id、@GeneratedValue的作用,以及@GeneratedValue的使用
|
5月前
4. 解决uni-app开发过程中view、image等标签出现诸如“出现错误:类型“{ class: string; }”的参数不能赋给类型“.......”
4. 解决uni-app开发过程中view、image等标签出现诸如“出现错误:类型“{ class: string; }”的参数不能赋给类型“.......”
506 0
|
6月前
|
Java 应用服务中间件 Android开发
完成你的自定义JSP Tag标签-Basic Coustom Tag
完成你的自定义JSP Tag标签-Basic Coustom Tag
34 0
|
存储 数据挖掘 数据库
data的含义与作用及使用方法
data的含义与作用及使用方法
6435 0
|
C++
c++中ref的作用
c++中ref的作用
157 0
|
移动开发 前端开发 数据安全/隐私保护
标签 tag
学习tag标签
98 0
|
JavaScript 开发者
动画-transition-group 中 appear 和 tag 属性的作用|学习笔记
快速学习动画- transition-group 中 appear 和 tag 属性的作用
280 0
动画-transition-group 中 appear 和 tag 属性的作用|学习笔记
|
开发工具 git
tag
tag
180 0
|
消息中间件 Java RocketMQ
Tag过滤|学习笔记
快速学习Tag过滤
vue94-auto close tag自动合并标签
vue94-auto close tag自动合并标签
127 0
vue94-auto close tag自动合并标签