rocketMq - tag不一致造成的假象

简介: 概述    这篇文章是以同事在实际工作中遇到的问题作为分析的切入点,加深自己对mq的掌握,践行“干中学”的团队理念。    当自己差不多把基本概念都掌握的差不多的时候,必须需要实际的案例或者实践来提深自己的深度,这个时候just do it 变得很重要,所以我喜欢不停的被人挑战,截止目前帮人解答的问题包括:client端消息堆积问题、批量消息拉取问题中遇到的神奇的数字32、以及本篇的tag不一致造成的假象,也就说会有3篇文章输出。

概述

    这篇文章是以同事在实际工作中遇到的问题作为分析的切入点,加深自己对mq的掌握,践行“干中学”的团队理念。

    当自己差不多把基本概念都掌握的差不多的时候,必须需要实际的案例或者实践来提深自己的深度,这个时候just do it 变得很重要,所以我喜欢不停的被人挑战,截止目前帮人解答的问题包括:client端消息堆积问题、批量消息拉取问题中遇到的神奇的数字32、以及本篇的tag不一致造成的假象,也就说会有3篇文章输出。

    整个mq的问题解决文章会收录在mq的另外一个《rocketMq干中学》专题当中,欢迎订阅,欢迎挑战。


背景

    某次线上发布升级mq的消费端修改订阅的topic对应的tags,为了保证稳定性,采取了灰度发布策略,也就说发布一台服务后观察一段时间看是否正常再全量发布。

    升级的内容为consumer订阅的tags信息,灰度一台之后存在同一个consumeGroup下有多个consumer,且其中一个consumer的topic的tags信息和其他consumer不一致。

    如我们在consumeGroupA下有3个consumer,一开始3个consumer订阅了topicA + tagA||tagB,然后我们升级一个consumer订阅topicA+tagC,这个时候在同一个consumeGroup下针对同一个topic会有两个不同的订阅信息。

    升级以后的现象是什么呢,重要的事情说3遍,说3遍,说3遍。

    升级后我们发现所有的consumer都没有消费数据的记录!

     升级后我们发现所有的consumer都没有消费数据的记录! 

    升级后我们发现所有的consumer都没有消费数据的记录!


复现

    请按照以下顺序进行复现操作

        1、启动consumeA,负责订阅orderTopic,tags为A||D;

        2、等待一段时间,待consumeA启动完成;

        3、producer发送消息,发现consumeA正常消费消息;

        4、启动consumeB,负责订阅orderTopic,tags为F;

        5、producet发送消息,发现consumeA和consumerB都没有消费记录;

        6、理论上这个时候consumeA应该能够消费(tags一致),但是事实上却没有。


img_5713011cba26f8dfcbd706adc0887874.png
consumerA


img_6303ed1fd2f2364505d7171dada76b99.png
consumerB


img_d7542b22176531b03a299cfae2ff78e6.png
producer


原因分析

    在复现问题以后,基本上你知道离定位问题就不远了,其实对于经常出现的问题你只要静下心去排查问题就不大了,

我的问题排查理念:

    1、对mq在订阅topic的过程和消息拉取的过程在心里要有一个宏观的理解,说白了在心里面要清楚整个交互过程,看整个交互过程中是不是可能本身就存在这个逻辑缺陷。

    2、在原来日志不能够帮助定位你的问题的时候,在可以获取源码的时候增加日志,增加在怀疑的执行路径上。


我的问题排查过程:

    1、排查rocketMq订阅消息的逻辑

    2、排查rocketMq订阅关系同步的逻辑


订阅过程-client端:


img_e04736554220bd3a09704c3c22c97115.png
消息订阅及心跳发送

说明:

    1、在consumer端订阅的时候我们会在本地保存一个订阅数据,在这个订阅数据里面有一个字段非常重要,就是用时间戳来代表的订阅消息版本信息。


img_b27cbd7ec672df85f411ea7917fdce41.png

说明:

    1、定时通过心跳信息发送订阅数据到broker,也就是说我们会把订阅信息多次发送。

    2、定时同步broker的订阅信息到client端,也就是最终都会拷贝到一份最新的订阅信息。


img_71a0ac73b72800403945c0894f067c8d.png
订阅信息数据结构

说明:

    1、在我们创建SubscriptionData的时候我们其实用时间戳代表了版本号,这个东西非常重要,因为在broker端我们会通过版本号来区分最新数据。


订阅过程-broker端

img_40d032de7abf236656c99653fb316834.png
broker端处理过程-1

说明

    broker端处理的入口函数,相当于接收consumer的心跳数据的处理函数。


img_882b2883e58e18737fe804031b9dba22.png
broker端处理过程-2

说明:

    核心关键点,我们每次只会用最新版本号的订阅数据。


消息拉取-server端

img_9c97984183da83cf2f5e6eb8b17eb30c.png
broker端处理数据拉取

说明

    在broker端进行消费的时候我们会根据subscriptionData来判断这个消息是否属于tag内的消息,如果不是指定tag的消息,就返回false直接过滤消息。


消息拉取-client端

img_9583d8cfa22d282801ecff76494bf478.png
client端处理数据拉取

说明

    client端也做了类似的过滤,不知道是处于什么考虑,但是broker端已经对消息进行了过滤。


结论

    1、同一个consumeGroup下面的多个client定时向broker发送心跳信息,汇报自己最新的subscription信息,broker端在收到消息后以最新版本的订阅消息为准。  

    2、broker端在收到client拉取消息的请求后,会从broker的store中获取消息数据并以subscription信息去进行过滤,这个是关键的地方,broker在获取数据的时候会用最新的subscription去进行过滤。

    3、我们这个现象原因就是旧的subscription(tag为A||D)信息和新的subscription(tag为F)信息不一致,我们以最新的subscription(tag为F)为准,这个时候即便你发送的消息tag为A||D,在消息消费的会因为最新的subscription(tag为F)被过滤掉。


其他辅助信息

在消费数据的时候会不停的打印错误日志:NO_MATCHED_MSG

img_be4ded30d62709ec99ba820ccdbbdc73.png
心跳信息
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
消息中间件 存储 监控
RocketMQ Tag 详解!
本文详细介绍了 RocketMQ 中 Tag 的原理及其应用场景。Tag 是一种消息过滤机制,允许生产者在发送消息时指定标签,消费者据此选择性消费。文章通过源码分析展示了 Tag 在消息发送、存储及消费阶段的作用,并提供了完整的示例代码。尽管 Tag 功能简单高效,但也存在单一维度过滤等局限性。适合需要高效、低延迟消息传递的场景,如日志监控、电商系统等。
260 2
|
7月前
|
消息中间件 Java 开发工具
消息队列 MQ产品使用合集之topic相同,但是tag不同,这个类不能放入map中,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
SQL 存储 消息中间件
RocketMQ的TAG过滤和SQL过滤机制
写作目的 项目中各个中台都使用同一个DB。而DB下会使用中间件监听binlog转换成MQ消息,而下游的各个中台去MQ去拿自己感兴趣的消息。
410 0
RocketMQ的TAG过滤和SQL过滤机制
|
存储 消息中间件 文件存储
RocketMQ中msg&tag的生命周期
RocketMQ中msg&tag的生命周期
113 0
RocketMQ中msg&tag的生命周期
|
消息中间件 存储 RocketMQ
阿里二面:RocketMQ同一个消费组内的消费者订阅量不同tag,会有问题吗?
阿里二面:RocketMQ同一个消费组内的消费者订阅量不同tag,会有问题吗?
760 0
阿里二面:RocketMQ同一个消费组内的消费者订阅量不同tag,会有问题吗?
|
消息中间件 存储 缓存
我擦,RocketMQ的tag还有这个“坑”!
我擦,RocketMQ的tag还有这个“坑”!
我擦,RocketMQ的tag还有这个“坑”!
|
消息中间件 负载均衡 Java
RocketMQ入门到入土(七 )为什么同一个消费组设置不同tag会出现奇怪现象
RocketMQ入门到入土(七 )为什么同一个消费组设置不同tag会出现奇怪现象
|
消息中间件 负载均衡 Java
RocketMQ,同一个topic下是否可以通过不同的tag来进行订阅吗?
RocketMQ,同一个topic下是否可以通过不同的tag来进行订阅吗?
1342 0
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
797 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
|
消息中间件 uml RocketMQ
3 张图带你彻底理解 RocketMQ 事务消息
3 张图带你彻底理解 RocketMQ 事务消息
67805 2
3 张图带你彻底理解 RocketMQ 事务消息