consumer.subscribe("TopicTest", "TagA");
那么在消息过滤是在broker还是Conumser呢?按照常理是在broker,因为在broker可以减少流量,实际情况是在broker过滤大部分,Consumer过滤一小部分
ConsumeQueue的结构
ConsumeQueue存的是主题的逻辑信息,如下图所示,代表一条记录。其中记录的信息存储在commitLog中,位置是CommitLog Offset。
Offset用于标记消息在CommitLog中的位置
Size用于标记消息的大小
HashCode用于过滤消息
源码跟踪
SubscriptionData的构建(Consumer启动)
Consumer一般会有订阅的主题和tag
consumer.subscribe("TopicTest", "TagA");
跟进去会跟到FilterAPI的buildSubscriptionData方法
public static SubscriptionData buildSubscriptionData(String topic, String subString) throws Exception { SubscriptionData subscriptionData = new SubscriptionData(); subscriptionData.setTopic(topic); subscriptionData.setSubString(subString); if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) { subscriptionData.setSubString(SubscriptionData.SUB_ALL); } else { String[] tags = subString.split("\\|\\|"); if (tags.length > 0) { for (String tag : tags) { if (tag.length() > 0) { String trimString = tag.trim(); if (trimString.length() > 0) { //添加tag的set //添加tag的set //添加tag的set subscriptionData.getTagsSet().add(trimString); //添加tag的hashcode的set //添加tag的hashcode的set //添加tag的hashcode的set subscriptionData.getCodeSet().add(trimString.hashCode()); } } } } else { throw new Exception("subString split error"); } } return subscriptionData; }
总结:SubscriptionData包含了tag列表和tag的hashcode列表
broker过滤消息
首先Consumer给broker发送消息,请求code是 RequestCode.PULL_MESSAGE ,因此我们可以跟borker里对这个请求码的处理的processor,最后定位到
PullMessageProcessor###processRequest方法,方法里有如下的代码
final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
跟DefaultMessageStore###getMessage方法
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums, final MessageFilter messageFilter) { //省略 for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { //获取消息的偏移量 long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); //获取消息的大小 int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); //获取消息的tag的hashcode long tagsCode = bufferConsumeQueue.getByteBuffer().getLong(); maxPhyOffsetPulling = offsetPy; if (nextPhyFileStartOffset != Long.MIN_VALUE) { if (offsetPy < nextPhyFileStartOffset) continue; } //省略 //省略 //省略 //查看消息tag是否匹配,此时在broker实现过滤 //查看消息tag是否匹配,此时在broker实现过滤 //查看消息tag是否匹配,此时在broker实现过滤 if (messageFilter != null && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.NO_MATCHED_MESSAGE; } continue; } //省略 //省略 //省略 return getResult; }
跟进匹配方法,此时能发现过滤方法是看subscriptionData里是否有包含tagsCode
//ExpressionMessageFilter### ```java @Override public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) { //省略 //省略 //省略 //订阅主题里是否包含这个hashcode return subscriptionData.getCodeSet().contains(tagsCode.intValue()); } else { //省略 }
总结:broker是根据subscriptionData里的tag的hashcode列表去过滤消息,判断从ConsumeQueue中读取的tag的hashcode是否在subscriptionData里的tag的hashcode列表中。
Consumer过滤消息
Consumer开始跟的地方在DefaultMQPushConsumerImpl###pullMessage方法里有一个PullCallback,此方法是一个给broker发送拉取消息后的一个回调方法
PullCallback pullCallback = new PullCallback() { @Override public void onSuccess(PullResult pullResult) { if (pullResult != null) { pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData); //省略 }
跟一下PullAPIWrapper###processPullResult方法
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult, final SubscriptionData subscriptionData) { //省略 for (MessageExt msg : msgList) { if (msg.getTags() != null) { //Consumer端过滤消息 //Consumer端过滤消息 //Consumer端过滤消息 if (subscriptionData.getTagsSet().contains(msg.getTags())) { msgListFilterAgain.add(msg); } } } } //省略 return pullResult; }
总结:broker端的消息过滤是通过看subscriptionData里的tag列表是否含有当前消息的tag
总结:broker和Consuemr都会过滤
(1)在Consumer启动的时候会传入topic和tag,然后把tag的string和hashcode封装到SubscriptionData中。
(2)当Conumser去broker拉消息的时候,查看SubscriptionData中的hashcode列表和Consumequeue中读取到的tag.hashcode是否一致,这个地方可以过滤大部分的消息。这是第一次过滤。
(3)当(2)通过过滤的消息会发送到Consumer,Consumer则会SubscriptionData中的tag列表中查看是否和当前tag匹配,这是第二次过滤。