RocketMQ入门到入土(七 )为什么同一个消费组设置不同tag会出现奇怪现象

简介: RocketMQ入门到入土(七 )为什么同一个消费组设置不同tag会出现奇怪现象

一、问题复现


1、描述


两个一样的Consumer Group的Consumer订阅同一个Topic,但是是不同的tag,Consumer1订阅Topic的tag1,Consumer2订阅Topic的tag2,然后分别启动。这时候往Topic的tag1里发送10条数据,Topic的tag2里发送10条。目测应该是Consumer1和Consumer2分别收到对应的10条消息。结果却是只有Consumer2收到了消息,而且只收到了4-6条消息,不固定。


2、代码


2.1、Consumer


public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-consumer");
        consumer.setNamesrvAddr("124.57.180.156:9876");
        consumer.subscribe("TopicTest2","tag1");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                MessageExt msg = msgs.get(0);
                System.out.println(msg.getTags());
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("ConsumerStarted.");
    }
}


启动这个订阅了TopicTest2的tag1标签的Consumer,然后将tag1改为tag2再次启动Consumer。这就相当于启动了两个Consumer进程,一个订阅了TopicTest2的tag1标签,另一个订阅了TopicTest2的tag2标签。


2.2、Producer


public class Producer {
    public static void main(String[] args) throws MQClientException {
        final DefaultMQProducer producer = new DefaultMQProducer("test-producer");
        producer.setNamesrvAddr("124.57.180.156:9876");
        producer.start();
        for (int i = 0; i < 10; i++){
            try {
                Message msg = new Message("TopicTest2", "tag1", ("Hello tag1 - "+i).getBytes());
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            }catch(Exception e) {
                e.printStackTrace();
            }
        }
    }
}


启动Producer,往TopicTest2的tag1里发10条消息。再次将tag1改为tag2,然后再次启动Producer进行发送,这样就是TopicTest2的tag1下有10条消息,TopicTest2的tag2下也有10条消息。


3、结果


Consumer和Producer都启动后,发现如下:


  • Producer发送了20条消息正常。
  • Consumer1没有消费到tag1下的数据
  • Consumer2消费了一半(不一定是几条,有时候5条,有时候6条的)消息。


二、问题答案


  • 首先这是Broker决定的,而不是Consumer端决定的


之前看过一篇文章写的有理有据,写的是Consumer端,还贴出了debug的源码,说后者覆盖了前者,但是我想说:你启动了两个独立的Consumer,那是两个独立的进程,根本不存在覆盖不覆盖的问题,那就是独立的。JVM就一个。又不是共享的JVM,何来覆盖?


  • Consumer端发心跳给Broker,Broker收到后存到consumerTable里(就是个Map),key是GroupName,value是ConsumerGroupInfo。


  • ConsumerGroupInfo里面是包含topic等信息的,但是问题就出在上一步骤,key是groupName,你同GroupName的话Broker心跳最后收到的Consumer会覆盖前者的。相当于如下代码:


map.put(groupName, ConsumerGroupInfo);


这样同key,肯定产生了覆盖。所以Consumer1不会收到任何消息,但是Consumer2为什么只收到了一半(不固定)消息呢?


那是因为:你是集群模式消费,它会负载均衡分配到各个节点去消费,所以一半消息(不固定个数)跑到了Consumer1上,结果Consumer1订阅的是tag1,所以不会任何输出。

如果换成BROADCASTING,那绝逼后者会收到全部消息,而不是一半,因为广播是广播全部Consumer。


三、源码验证


1、调用链


# 核心在于如下这个方法
org.apache.rocketmq.broker.client.ConsumerManager#registerConsumer()
# 关键调用链如下
# 入口是Broker启动的时候
org.apache.rocketmq.broker.BrokerStartup#start()
org.apache.rocketmq.broker.BrokerController#start()
org.apache.rocketmq.remoting.netty.NettyRemotingServer#start() 
org.apache.rocketmq.remoting.netty.NettyRemotingServer#prepareSharableHandlers()
org.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandler#channelRead0()
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processMessageReceived()
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand()
org.apache.rocketmq.broker.processor.ClientManageProcessor#processRequest()
org.apache.rocketmq.broker.processor.ClientManageProcessor#heartBeat()
org.apache.rocketmq.broker.client.ConsumerManager#registerConsumer()


2、源码


2.1、registerConsumer


/**
 * Consumer信息
 */
public class ConsumerGroupInfo {
    // 组名
    private final String groupName;
    // topic信息,比如topic、tag等
    private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
        new ConcurrentHashMap<String, SubscriptionData>();
    // 客户端信息,比如clientId等
    private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
        new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
    // PULL/PUSH
    private volatile ConsumeType consumeType;
    // 消费模式:BROADCASTING/CLUSTERING
    private volatile MessageModel messageModel;
    // 消费到哪了
    private volatile ConsumeFromWhere consumeFromWhere;
}
/**
 * 通过心跳将Consumer信息注册到Broker端。
 */
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
        ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
        final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
    // consumerTable:维护所有的Consumer
    ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
    // 如果没有Consumer,则put到map里
    if (null == consumerGroupInfo) {
        ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
        // put到map里
        ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
        consumerGroupInfo = prev != null ? prev : tmp;
    }
    // 更新Consumer信息,客户端信息
    boolean r1 =
        consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
                                        consumeFromWhere);
    // 更新订阅Topic信息
    boolean r2 = consumerGroupInfo.updateSubscription(subList);
    if (r1 || r2) {
        if (isNotifyConsumerIdsChangedEnable) {
            this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
        }
    }
    this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
    return r1 || r2;
}



从这一步可以看出消费者信息是以groupName为key,ConsumerGroupInfo为value存到map(consumerTable)里的,那很明显了,后者肯定会覆盖前者的,因为key是一样的。而后者的tag是tag2,那肯定覆盖了前者的tag1,这部分是存到ConsumerGroupInfo的subscriptionTable里面的


private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
    new ConcurrentHashMap<String, SubscriptionData>();


SubscriptionData包含了topic等信息


public class SubscriptionData implements Comparable<SubscriptionData> {
    // topic
    private String topic;
    private String subString;
    // tags
    private Set<String> tagsSet = new HashSet<String>();
    private Set<Integer> codeSet = new HashSet<Integer>();
}


2.2、两个问题


1.topic、tag等信息是怎么覆盖的?


boolean r1 = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,consumeFromWhere);


/**
 * 其实很简单,就是以topic为key,SubscriptionData为value。而SubscriptionData里包含了tags信息,所以直接覆盖掉
 */
public boolean updateSubscription(final Set<SubscriptionData> subList) {
    for (SubscriptionData sub : subList) {
        SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
        if (old == null) {
            SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
        } else if (sub.getSubVersion() > old.getSubVersion()) {
            this.subscriptionTable.put(sub.getTopic(), sub);
        }
    }
}


等等,这里好像有新发现ConsumerGroupInfo#subscriptionTable


// {@link org.apache.rocketmq.broker.client.ConsumerGroupInfo#subscriptionTable}
private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
        new ConcurrentHashMap<String, SubscriptionData>();


可以有意外收获就是topic作为map的key,那岂不是一个Consumer可以订阅多个Topic?是的,通过这段源码可以发现是没毛病的,我也测试过。


2.这么看的话Consumer端只会存在一个进程,因为同组,注册进去就覆盖了呀?


大哥,注意ConsumerGroupInfo里的channelInfoTable


// 客户端信息,比如clientId等
private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
    new ConcurrentHashMap<Channel, ClientChannelInfo>(16);


ClientChannelInfo是包含clientId等信息的,代表一个Consumer。注册方法是:


boolean r2 = consumerGroupInfo.updateSubscription(subList);


/**
 * 下面是删减后的代码,其实就是以Channel作为key,每个Consumer的Channel是不一样的。所以能存多个Consumer客户端
 */
public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType,
        MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {
    ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel());
    if (null == infoOld) {
        ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew);
    }
}


END

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 存储 监控
RocketMQ Tag 详解!
本文详细介绍了 RocketMQ 中 Tag 的原理及其应用场景。Tag 是一种消息过滤机制,允许生产者在发送消息时指定标签,消费者据此选择性消费。文章通过源码分析展示了 Tag 在消息发送、存储及消费阶段的作用,并提供了完整的示例代码。尽管 Tag 功能简单高效,但也存在单一维度过滤等局限性。适合需要高效、低延迟消息传递的场景,如日志监控、电商系统等。
260 2
|
3月前
|
消息中间件 Java Kafka
RabbitMQ 入门
RabbitMQ 入门
|
6月前
|
消息中间件 新零售 弹性计算
云消息队列 RabbitMQ 版入门训练营,解锁对比开源优势与零基础实战
欢迎加入「云消息队列 RabbitMQ 版入门训练营」。
183 19
|
5月前
|
消息中间件 存储 Java
分享一下rocketmq入门小知识
分享一下rocketmq入门小知识
63 0
分享一下rocketmq入门小知识
|
5月前
|
网络协议 物联网 测试技术
App Inventor 2 MQTT拓展入门(保姆级教程)
本文演示的是App和一个测试客户端进行消息交互的案例,实际应用中,我们的测试客户端可以看着是任意的、支持MQTT协议的硬件,通过订阅及发布消息,联网硬件与我们的App进行双向数据通信,以实现万物互联的智能控制效果。
281 2
EMQ
|
8月前
|
安全 网络性能优化
MQTT 5.0 报文(Packets)入门指南
MQTT 控制报文是 MQTT 数据传输的最小单元。MQTT 客户端和服务端通过交换控制报文来完成它们的工作,比如订阅主题和发布消息。
EMQ
742 13
MQTT 5.0 报文(Packets)入门指南
|
7月前
|
消息中间件 RocketMQ
消息队列 MQ产品使用合集之在开源延时消息插件方案中和原生延时消息方案中,同时设置参数是否会出现错乱
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 存储 负载均衡
消息队列 MQ使用问题之如何在grpc客户端中设置负载均衡器
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 数据安全/隐私保护 RocketMQ
消息队列 MQ使用问题之如何设置nameserver监听的IP
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
7月前
|
消息中间件 Java 测试技术
消息队列 MQ操作报错合集之设置了setKeepAliveInterval(1)但仍然出现客户端未连接,该怎么解决
在使用消息队列MQ时,可能会遇到各种报错情况。以下是一些常见的错误场景、可能的原因以及解决建议的汇总:1.连接错误、2.消息发送失败、3.消息消费报错、4.消息重试与死信处理、5.资源与权限问题、6.配置错误、7.系统资源限制、8.版本兼容性问题。
110 2