一个应用尽可能用一个Topic是最佳实践吗?没理解就用保证出错

简介: RocketMQ 官方提供的基本最佳实践第一条,分享自己的一点心得,有问题欢迎大家指出~> 一个应用尽可能用一个Topic,而消息子类型则可以用tags来标识。tags可以由应用自由设置,只有生产者在发送消息设置了tags,消费方在订阅消息时才可以利用tags通过broker做消息过滤:message.setTags("TagA")。

前言

大家好,我是小郭,前面几篇文章整体上完成了对RocketMQ发送和消费端的一个流程,今天我主要想根据 RocketMQ 官方提供的基本最佳实践第一条,分享自己的一点心得,有问题欢迎大家指出~。

基本最佳实践

发送消息注意事项 - Tags的使用

一个应用尽可能用一个Topic,而消息子类型则可以用tags来标识。tags可以由应用自由设置,只有生产者在发送消息设置了tags,消费方在订阅消息时才可以利用tags通过broker做消息过滤:message.setTags("TagA")。

疑问

官网在这里说到了一个应用尽可能用一个Topic,而消息子类型则可以用tags来标识。

但是没有提到他具体的场景,是整个应用下只用一个Topic还是某个业务功能下只用一个Topic呢?

如果没有正确的使用,可能会造成意想不到的意外后果,针对可能出现的情况,我做了一个小测试。

结论先行

先说一下我测试的结果吧

消费者组.png

准备

RocketMQ版本:4.X

配置信息

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: my-product-group
    send-message-timeout: 5000
    retry-times-when-send-async-failed: 5
  consumer:
    group: my-consume-group
  sync:
    string-topic: string-topic
    user-topic: user-topic
    string-ext-topic: string-ext-topic
    string-tag: tagA
    string-ext-tag: tagB

测试一:相同消费者组 - 不同topic相同tags情况

发送两条消息,对应不同的Topic和相同tags

userSendReqDTO.setNickName("MsgTopic");
messageProduct.syncSend(rocketMqConfig.getSyncStringTopic() + StrUtil.COLON + rocketMqConfig.getSyncStringTag(), JSON.toJSONString(userSendReqDTO));

userSendReqDTO.setNickName("MsgExtTopic");

messageProduct.syncSend(rocketMqConfig.getMsgExtTopic() + StrUtil.COLON +  rocketMqConfig.getSyncStringTag(), JSON.toJSONString(userSendReqDTO));

启动消费者 consumer1consumer2 按顺序去启动

逻辑:按顺序 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟Broker建立连接通道,不管生产者还是消费者都会以默认30秒的间隔频率去上报心跳,然后开始消费消息。

// 发送心跳
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

consumer1

@Service
@RocketMQMessageListener(topic = "${rocketmq.sync.string-topic}", consumerGroup = "string_consumer", selectorExpression = "${rocketmq.sync.string-tag}")
@Slf4j
public class syncStringConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        log.info("不同topic相同tag情况 message : {}", s);
    }
}

consumer2

@Service
@RocketMQMessageListener(topic = "${rocketmq.sync.string-ext-topic}", consumerGroup = "string_consumer", selectorExpression = "${rocketmq.sync.string-tag}")
@Slf4j
public class syncMsgExtConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        log.info("不同topic相同tag情况 message ext: {}", s);
    }
}

执行结果

2022-12-08 16:47:51.184 INFO 34090 --- [nio-8080-exec-3] cn.patrick4j.user.mq.MessageProduct : sendResult : {"messageQueue":{"brokerName":"broker-a","queueId":1,"topic":"string-topic"},"msgId":"7F000001852A18B4AAC227A73B670004","offsetMsgId":"2765CCB400002A9F0000000001BDB92C","queueOffset":33,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}, topic : string-topic:tagA

2022-12-08 16:47:51.184 INFO 34090 --- [nio-8080-exec-3] cn.patrick4j.user.mq.MessageProduct : sendResult : {"messageQueue":{"brokerName":"broker-a","queueId":1,"topic":"string-topic"},"msgId":"7F000001852A18B4AAC227A73B670004","offsetMsgId":"2765CCB400002A9F0000000001BDB92C","queueOffset":33,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}, topic : string-topic:tagA

2022-12-08 16:48:14.425 INFO 34090 --- [ring_consumer_1] c.p.user.mq.consumer.syncStringConsumer : 不同topic相同tag情况 message ext: {"nickName":"MsgTopic"}

是不是发现,刚刚发送了两条消息,但是只显示消费了一条,那另外一条是什么情况呢?

image.png

通过管理平台可以看到是没有消费的,我们继续打开Broker的日志来看一下

image.png

subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
if (null == subscriptionData) {
    log.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
    response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
    response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
    return response;
}

原因很明显了,因为 Topic 信息不存在,但是前面几行的信息显示了更新了 subscription 信息

难道他们发生了相互覆盖?

根据日志的信息,我们发现了他们其实是在一直不断的切换

  1. subscription changed, add new topic -> subscription changed, remove topic 的过程
  2. 然后一直不断的进行重平衡

消费者端启动后,进行心跳发送,调用 Broker 的心跳函数

org.apache.rocketmq.client.impl.factory.MQClientInstance#sendHeartbeatToAllBroker -->

org.apache.rocketmq.broker.processor.ClientManageProcessor#heartBeat -->

// 通过消费者组找到对应配置 
SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig( data.getGroupName());

// 注册消费者信息到broker 
org.apache.rocketmq.broker.client.ConsumerManager#registerConsumer -->

// 更新发布信息 
org.apache.rocketmq.broker.client.ConsumerGroupInfo#updateSubscription -->

// 通过 Topic 找到对应的订阅的关系
private final ConcurrentMap subscriptionTable = new ConcurrentHashMap(); 

所以因为不断的切换原因,那条消息需要等到下次轮到他的时候,他才会被拉取成功,

这里得出的结论就和我们上面的一致,如果订阅的 Topic 不同 tags 相同,会出现消费者之间来回切换的消费消息,会一直不断的进行重平衡。

测试二:相同消费者组 - 相同Topic不同tags情况

发送两条消息,对应相同的 Topic 和不同 tags

userSendReqDTO.setNickName("MsgTopic");

messageProduct.syncSend(rocketMqConfig.getSyncStringTopic() + StrUtil.COLON + rocketMqConfig.getSyncStringTag(), JSON.toJSONString(userSendReqDTO));

userSendReqDTO.setNickName("MsgExtTopic");

messageProduct.syncSend(rocketMqConfig.getSyncStringTopic() + StrUtil.COLON +  rocketMqConfig.getMsgExtTopic(), JSON.toJSONString(userSendReqDTO));

启动消费者 consumer1consumer2 按顺序去启动

consumer1

@Service
@RocketMQMessageListener(topic = "${rocketmq.sync.string-topic}", consumerGroup = "string_consumer", selectorExpression = "${rocketmq.sync.string-tag}")
@Slf4j
public class syncStringConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        log.info("不同topic相同tag情况 message : {}", s);
    }
}

consumer2

@Service
@RocketMQMessageListener(topic = "${rocketmq.sync.string-topic}", consumerGroup = "string_consumer", selectorExpression = "${rocketmq.sync.string-ext-tag}")
@Slf4j
public class syncMsgExtConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        log.info("不同topic相同tag情况 message ext: {}", s);
    }
}

测试结果

2022-12-08 17:47:20.979 INFO 34896 --- [nio-8080-exec-2] cn.patrick4j.user.mq.MessageProduct : sendResult : {"messageQueue":{"brokerName":"broker-a","queueId":0,"topic":"string-topic"},"msgId":"7F000001885018B4AAC227DDB3EB0002","offsetMsgId":"2765CCB400002A9F0000000001BE45A1","queueOffset":53,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}, topic : string-topic:tagA

2022-12-08 17:47:21.019 INFO 34896 --- [nio-8080-exec-2] cn.patrick4j.user.mq.MessageProduct : sendResult : {"messageQueue":{"brokerName":"broker-a","queueId":1,"topic":"string-topic"},"msgId":"7F000001885018B4AAC227DDB4140003","offsetMsgId":"2765CCB400002A9F0000000001BE46C1","queueOffset":60,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}, topic : string-topic:tagB

2022-12-08 17:47:20.980 INFO 34896 --- [ring_consumer_2] c.p.user.mq.consumer.syncStringConsumer : 相同topic不相同tag情况 message : {"nickName":"MsgTopic"}

如我们所料,他确实只处理了一条数据,话不多说,我们继续打开admin管理工具来看一看是什么情况,

image.png

没有想到消息竟然被过滤了,消费者已经处理了消息,但是不是他的Tags,所以被过滤了

老套路,我们在此打开 Broker 的日志来看看,他是怎么处理的

image.png

按顺序启动消费者,第二个启动的消费者覆盖第一个启动的消费者配置,所以之后都是以第二个消费者的配置为准

public boolean updateSubscription(final Set<SubscriptionData> subList) {
    boolean updated = false;

    for (SubscriptionData sub : subList) {

        SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
        if (old == null) {
            SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
            if (null == prev) {
                updated = true;
                log.info("subscription changed, add new topic, group: {} {}",
                    this.groupName,
                    sub.toString());
            }
        } else if (sub.getSubVersion() > old.getSubVersion()) { //比较时间戳
            if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
                log.info("subscription changed, group: {} OLD: {} NEW: {}",
                    this.groupName,
                    old.toString(),
                    sub.toString()
                );
            }
            // 覆盖之前的信息
            this.subscriptionTable.put(sub.getTopic(), sub);
        }
    }
    ... 
    return updated;
}

逻辑:在同一个消费者下,从 subscriptionTable 中通过Topic查询对应的内容,通过比较时间戳来更新内容

总结

  1. 在消费者组相同的情况下,使用不同Topic和相同的Tag,会造成消息消费不够及时,因为不断的进行重平衡,只有轮到对应的Topic,才会拉取消息成功。
  2. 在消费者组相同的情况下,使用同一个Topic和不同的Tag,会造成先启动消费者无法拉取消息,只有最后一个启动的消费者能消费部分消息。

    如果如果在整个应用下,只使用同一个Topic,必须区分消费者组。

    如果是某个业务功能,如订单,那他们订阅关系一定是相同的,那么Tag一定是相同,则使用同一个Topic和消费者组是没有问题。

  3. 在不同消费组下,则这两种情况都不是问题

这是我的理解,欢迎大家纠正 🛫🛫🛫

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 NoSQL 关系型数据库
6年高级开发就因这道题少了5K,Kafka如何避免消息重复消费?
一个6年工作经验的小伙伴,被问到这样一个问题,说Kafka是如何避免消息重复消费的?面试完之后,这位小伙伴来找到我,希望我能给一个思路。今天,我给大家分享一下我的思路。
157 1
|
6月前
|
消息中间件 安全 Kafka
Kafka保证消息不丢失不重复
Kafka保证消息不丢失不重复
100 6
|
1月前
|
消息中间件 存储 Java
Kafka 如何避免重复消费?
在Apache Kafka中,避免消息的重复消费是确保数据准确处理的关键。本文详细介绍了七种避免重复消费的方法:使用消费者组、幂等生产者、事务性生产者与消费者、手动提交偏移量、外部存储管理偏移量、去重逻辑及幂等消息处理逻辑。每种方法均有其优缺点,可根据实际需求选择合适方案。结合消费者组、手动提交偏移量和幂等处理逻辑通常是有效策略,而对于高一致性要求,则可考虑使用事务性消息。
78 0
|
3月前
|
SQL 存储 数据库连接
【Azure Stream Analystics】流分析服务执行遇见警告错误消息,导致上游数据堆积,下游无任何输出
【Azure Stream Analystics】流分析服务执行遇见警告错误消息,导致上游数据堆积,下游无任何输出
【Azure Stream Analystics】流分析服务执行遇见警告错误消息,导致上游数据堆积,下游无任何输出
|
4月前
|
消息中间件 Java 物联网
消息队列 MQ操作报错合集之建立连接时发生了超时错误,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ操作报错合集之建立连接时发生了超时错误,该如何解决
|
5月前
|
消息中间件 Java 测试技术
消息队列 MQ操作报错合集之设置了setKeepAliveInterval(1)但仍然出现客户端未连接,该怎么解决
在使用消息队列MQ时,可能会遇到各种报错情况。以下是一些常见的错误场景、可能的原因以及解决建议的汇总:1.连接错误、2.消息发送失败、3.消息消费报错、4.消息重试与死信处理、5.资源与权限问题、6.配置错误、7.系统资源限制、8.版本兼容性问题。
|
4月前
|
消息中间件 存储 监控
Kafka 消息保留策略及其影响详解
Kafka 消息保留策略及其影响详解
173 0
|
5月前
|
消息中间件 监控 安全
探究Kafka主题删除失败的根本原因
探究Kafka主题删除失败的根本原因
49 0
|
6月前
|
消息中间件 JavaScript 物联网
MQTT常见问题之用rocketmq mqttdemo的MqttConsumer始终无法接收到消息如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
6月前
|
消息中间件 物联网 关系型数据库
MQTT常见问题之消息对列mqtt的历史数据查看失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总: