前言
大家好,我是小郭,前面几篇文章整体上完成了对RocketMQ发送和消费端的一个流程,今天我主要想根据 RocketMQ 官方提供的基本最佳实践第一条,分享自己的一点心得,有问题欢迎大家指出~。
基本最佳实践
发送消息注意事项 - Tags的使用
一个应用尽可能用一个Topic,而消息子类型则可以用tags来标识。tags可以由应用自由设置,只有生产者在发送消息设置了tags,消费方在订阅消息时才可以利用tags通过broker做消息过滤:message.setTags("TagA")。
疑问
官网在这里说到了一个应用尽可能用一个Topic,而消息子类型则可以用tags来标识。
但是没有提到他具体的场景,是整个应用下只用一个Topic还是某个业务功能下只用一个Topic呢?
如果没有正确的使用,可能会造成意想不到的意外后果,针对可能出现的情况,我做了一个小测试。
结论先行
先说一下我测试的结果吧
准备
RocketMQ版本:4.X
配置信息
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: my-product-group
send-message-timeout: 5000
retry-times-when-send-async-failed: 5
consumer:
group: my-consume-group
sync:
string-topic: string-topic
user-topic: user-topic
string-ext-topic: string-ext-topic
string-tag: tagA
string-ext-tag: tagB
测试一:相同消费者组 - 不同topic相同tags情况
发送两条消息,对应不同的Topic和相同tags
userSendReqDTO.setNickName("MsgTopic");
messageProduct.syncSend(rocketMqConfig.getSyncStringTopic() + StrUtil.COLON + rocketMqConfig.getSyncStringTag(), JSON.toJSONString(userSendReqDTO));
userSendReqDTO.setNickName("MsgExtTopic");
messageProduct.syncSend(rocketMqConfig.getMsgExtTopic() + StrUtil.COLON + rocketMqConfig.getSyncStringTag(), JSON.toJSONString(userSendReqDTO));
启动消费者 consumer1 和 consumer2 按顺序去启动
逻辑:按顺序 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟Broker建立连接通道,不管生产者还是消费者都会以默认30秒的间隔频率去上报心跳,然后开始消费消息。
// 发送心跳
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
consumer1
@Service
@RocketMQMessageListener(topic = "${rocketmq.sync.string-topic}", consumerGroup = "string_consumer", selectorExpression = "${rocketmq.sync.string-tag}")
@Slf4j
public class syncStringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("不同topic相同tag情况 message : {}", s);
}
}
consumer2
@Service
@RocketMQMessageListener(topic = "${rocketmq.sync.string-ext-topic}", consumerGroup = "string_consumer", selectorExpression = "${rocketmq.sync.string-tag}")
@Slf4j
public class syncMsgExtConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("不同topic相同tag情况 message ext: {}", s);
}
}
执行结果
2022-12-08 16:47:51.184 INFO 34090 --- [nio-8080-exec-3] cn.patrick4j.user.mq.MessageProduct : sendResult : {"messageQueue":{"brokerName":"broker-a","queueId":1,"topic":"string-topic"},"msgId":"7F000001852A18B4AAC227A73B670004","offsetMsgId":"2765CCB400002A9F0000000001BDB92C","queueOffset":33,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}, topic : string-topic:tagA
2022-12-08 16:47:51.184 INFO 34090 --- [nio-8080-exec-3] cn.patrick4j.user.mq.MessageProduct : sendResult : {"messageQueue":{"brokerName":"broker-a","queueId":1,"topic":"string-topic"},"msgId":"7F000001852A18B4AAC227A73B670004","offsetMsgId":"2765CCB400002A9F0000000001BDB92C","queueOffset":33,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}, topic : string-topic:tagA
2022-12-08 16:48:14.425 INFO 34090 --- [ring_consumer_1] c.p.user.mq.consumer.syncStringConsumer : 不同topic相同tag情况 message ext: {"nickName":"MsgTopic"}
是不是发现,刚刚发送了两条消息,但是只显示消费了一条,那另外一条是什么情况呢?
通过管理平台可以看到是没有消费的,我们继续打开Broker的日志来看一下
subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
if (null == subscriptionData) {
log.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
return response;
}
原因很明显了,因为 Topic 信息不存在,但是前面几行的信息显示了更新了 subscription 信息
难道他们发生了相互覆盖?
根据日志的信息,我们发现了他们其实是在一直不断的切换
- subscription changed, add new topic -> subscription changed, remove topic 的过程
- 然后一直不断的进行重平衡
消费者端启动后,进行心跳发送,调用 Broker 的心跳函数
org.apache.rocketmq.client.impl.factory.MQClientInstance#sendHeartbeatToAllBroker -->
org.apache.rocketmq.broker.processor.ClientManageProcessor#heartBeat -->
// 通过消费者组找到对应配置
SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig( data.getGroupName());
// 注册消费者信息到broker
org.apache.rocketmq.broker.client.ConsumerManager#registerConsumer -->
// 更新发布信息
org.apache.rocketmq.broker.client.ConsumerGroupInfo#updateSubscription -->
// 通过 Topic 找到对应的订阅的关系
private final ConcurrentMap subscriptionTable = new ConcurrentHashMap();
所以因为不断的切换原因,那条消息需要等到下次轮到他的时候,他才会被拉取成功,
这里得出的结论就和我们上面的一致,如果订阅的 Topic 不同 tags 相同,会出现消费者之间来回切换的消费消息,会一直不断的进行重平衡。
测试二:相同消费者组 - 相同Topic不同tags情况
发送两条消息,对应相同的 Topic 和不同 tags
userSendReqDTO.setNickName("MsgTopic");
messageProduct.syncSend(rocketMqConfig.getSyncStringTopic() + StrUtil.COLON + rocketMqConfig.getSyncStringTag(), JSON.toJSONString(userSendReqDTO));
userSendReqDTO.setNickName("MsgExtTopic");
messageProduct.syncSend(rocketMqConfig.getSyncStringTopic() + StrUtil.COLON + rocketMqConfig.getMsgExtTopic(), JSON.toJSONString(userSendReqDTO));
启动消费者 consumer1 和 consumer2 按顺序去启动
consumer1
@Service
@RocketMQMessageListener(topic = "${rocketmq.sync.string-topic}", consumerGroup = "string_consumer", selectorExpression = "${rocketmq.sync.string-tag}")
@Slf4j
public class syncStringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("不同topic相同tag情况 message : {}", s);
}
}
consumer2
@Service
@RocketMQMessageListener(topic = "${rocketmq.sync.string-topic}", consumerGroup = "string_consumer", selectorExpression = "${rocketmq.sync.string-ext-tag}")
@Slf4j
public class syncMsgExtConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("不同topic相同tag情况 message ext: {}", s);
}
}
测试结果
2022-12-08 17:47:20.979 INFO 34896 --- [nio-8080-exec-2] cn.patrick4j.user.mq.MessageProduct : sendResult : {"messageQueue":{"brokerName":"broker-a","queueId":0,"topic":"string-topic"},"msgId":"7F000001885018B4AAC227DDB3EB0002","offsetMsgId":"2765CCB400002A9F0000000001BE45A1","queueOffset":53,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}, topic : string-topic:tagA
2022-12-08 17:47:21.019 INFO 34896 --- [nio-8080-exec-2] cn.patrick4j.user.mq.MessageProduct : sendResult : {"messageQueue":{"brokerName":"broker-a","queueId":1,"topic":"string-topic"},"msgId":"7F000001885018B4AAC227DDB4140003","offsetMsgId":"2765CCB400002A9F0000000001BE46C1","queueOffset":60,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}, topic : string-topic:tagB
2022-12-08 17:47:20.980 INFO 34896 --- [ring_consumer_2] c.p.user.mq.consumer.syncStringConsumer : 相同topic不相同tag情况 message : {"nickName":"MsgTopic"}
如我们所料,他确实只处理了一条数据,话不多说,我们继续打开admin管理工具来看一看是什么情况,
没有想到消息竟然被过滤了,消费者已经处理了消息,但是不是他的Tags,所以被过滤了
老套路,我们在此打开 Broker 的日志来看看,他是怎么处理的
按顺序启动消费者,第二个启动的消费者覆盖第一个启动的消费者配置,所以之后都是以第二个消费者的配置为准
public boolean updateSubscription(final Set<SubscriptionData> subList) {
boolean updated = false;
for (SubscriptionData sub : subList) {
SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
if (old == null) {
SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
if (null == prev) {
updated = true;
log.info("subscription changed, add new topic, group: {} {}",
this.groupName,
sub.toString());
}
} else if (sub.getSubVersion() > old.getSubVersion()) { //比较时间戳
if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
log.info("subscription changed, group: {} OLD: {} NEW: {}",
this.groupName,
old.toString(),
sub.toString()
);
}
// 覆盖之前的信息
this.subscriptionTable.put(sub.getTopic(), sub);
}
}
...
return updated;
}
逻辑:在同一个消费者下,从 subscriptionTable 中通过Topic查询对应的内容,通过比较时间戳来更新内容
总结
- 在消费者组相同的情况下,使用不同Topic和相同的Tag,会造成消息消费不够及时,因为不断的进行重平衡,只有轮到对应的Topic,才会拉取消息成功。
- 在消费者组相同的情况下,使用同一个Topic和不同的Tag,会造成先启动消费者无法拉取消息,只有最后一个启动的消费者能消费部分消息。
如果如果在整个应用下,只使用同一个Topic,必须区分消费者组。
如果是某个业务功能,如订单,那他们订阅关系一定是相同的,那么Tag一定是相同,则使用同一个Topic和消费者组是没有问题。
- 在不同消费组下,则这两种情况都不是问题
这是我的理解,欢迎大家纠正 🛫🛫🛫