我擦,RocketMQ的tag还有这个“坑”!

简介: 我擦,RocketMQ的tag还有这个“坑”!

1、消费组订阅关系不一致导致消息丢失


从消息消费的视角来看消费组是一个基本的物理隔离单位,每一个消费组拥有自己的消费位点、消费线程池等。


RocketMQ的初学者容易犯这样一个错误:消费组中的不同消费者,订阅同一个topic的不同的tag,这样会导致消息丢失(部分消息没有消费),在思考这个问题时,我们不妨先来看一张图:

1b326ea815c0b32c8d3f5138fae00c94.png

简单阐述一下其核心关键点:


  1. 例如一个Topic共有4个队列。
  2. 消息发送者连续发送4条tagA的消息后,再连续发送4条tagb的消息,消息发送者端默认采取轮循的负载均衡机制,这样topic的每一个队列中都存在tagA、tabB两个tag的消息。
  3. 消费组dw_tag_test的IP为192.168.3.10的消费者订阅tagA,另外一个IP为192.168.3.11的消费者订阅tagB。
  4. 消费组内的消费者在进行消息消费之前,首先会进行队列负载,默认为平均分配,分配结果:
  • 消费者然后向Broker发起消息拉取请求,192.168.3.10消费者会由于只订阅了tagA,这样存在q0、q1中的tagB的消息会被过滤,但被过滤的tagB并不会投递到另外一个订阅了tagB的消费者,造成这部分消息没有被投递,从而导致消息丢失。
  • 同样192.168.3.11消费者会由于只订阅了tagB,这样存在q2、q3中的tagA的消息会被过滤,但被过滤的tagA并不会投递到另外一个订阅了tagA的消费者,造成这部分消息没有被投递,从而导致消息丢失。
  • 192.168.3.10 分配到q0、q1。
  • 192.168.3.11 分配到q2、q3。


2、如果一个tag的消息数量很少,是否会显示很高的延迟?


开篇有群友会存在这样一个担忧,其场景大概如下图所示:

8a90dc56520a7458fb2d52e25bad46da.png

消费者在消费offset=100的这条tag1消息后,后面连续出现1000W条非tag1的消息,这个消费组的积压会持续增加,直接到1000W吗?


要想明白这个问题,我们至少应该要重点去查看如下几个功能的源码:


  • 消息拉取流程
  • 位点提交机制

本文不准备全流程去分析这块的源码,如果大家对这块代码有兴趣,可以查阅笔者出版的《RocketMQ技术内幕》书籍


本文将从以问题为导向,经过自己的思考,并找到关键源码加以求证,最后进行简单的示例代码进行验证。


遇到问题之前,我们可以先尝试思考一下,如果这个功能要我们实现,我们大概会怎么去思考?


要判断消费组在消费为offset=100的消息后,在接下来1000W条消息都会被过滤的情况下,如果我们希望位点能够提交,我们应该怎么设计?我觉得应该至少有如下几个关键点:


  • 消息消息拉取时连续1000W条消息找不到合适的消息,服务端会如何处理
  • 客户端拉取到消息与未拉取到消息两种情况如何提交位点


2.1 消息拉取流程中的关键设计


客户端向服务端拉取消息,连续1000W条消息都不符合条件,一次过滤查找这么多消息,肯定非常耗时,客户端也不能等待这么久,那服务端必须采取措施,必须触发一个停止查找的条件并向客户端返回NO_MESSAGE,客户端在消息查找时会等待多久呢?


核心关键点一:客户端在向服务端发起消息拉取请求时会设置超时时间,代码如下所示:

ca25dcbc5f7f6e57f5e70e89bdefd8fc.png

其中与超时时间相关的两个变量,其含义分别:


  • long brokerSuspendMaxTimeMillis 在当前没有符合的消息时在Broker端允许挂起的时间,默认为15s,暂时不支持自定义。
  • long timeoutMillis 消息拉取的超时时间,默认为30s,暂时不支持自定义。

即一次消息拉取最大的超时时间为30s。


核心关键点二:Broker端在处理消息拉取时设置了完备的退出条件,具体由DefaultMessageStore的getMessage方法事项,具体代码如下所述:

4e48353105f6fd2033c25b1853f03f2e.png

核心要点


  • 首先客户端在发起时会传入一个本次期望拉取的消息数量,对应上述代码中的maxMsgNums,如果拉取到指定条数到消息(读者朋友们如体代码读者可以查阅isTheBatchFull方法),则正常退出。
  • 另外一个非常关键的过滤条件,即一次消息拉取过程中,服务端最大扫描的索引字节数,即一次拉取扫描ConsumeQueue的字节数量,取16000与期望拉取条数乘以20,因为一个consumequeue条目占20个字节。
  • 服务端还蕴含了一个长轮循机制,即如果扫描了指定的字节数,但一条消息都没查询到,会在broker端挂起一段时间,如果有新消息到来并符合过滤条件,则会唤醒,向客户端返回消息。


回到这个问题,如果服务端连续1000W条非tag1的消息,拉取请求不会一次性筛选,而是会返回,不至于让客户端超时


从这里可以打消第一个顾虑:服务端在没有找到消息时不会傻傻等待不返回,接下来看是否会有积压的关键是看如何提交位点。


2.2 位点提交机制


2.2.1 客户端拉取到合适的消息位点提交机制


Pull线程从服务端拉取到结构后会将消息提交到消费组线程池,主要定义在DefaultMQPushConsumerImpl的PullTask类中,具体代码如下所示:


众所周知,RocketMQ是在消费成功后进行位点提交,代码在ConsumeMessageConcurrentlyService中,如下所示:

a893fc79882b93d58593eb7bf483c163.jpg

这里的核心要点:


  • 消费端成功消息完消费后,会采用最小位点提交机制,确保消费不丢失。
  • 最小位点提交机制,其实就是将拉取到的消息放入一个TreeMap中,然后消费线程成功消费一条消息后,将该消息从TreeMap中移除,再计算位点:
  • 如果当前TreeMap中还有消息在处理,则返回TreeMap中的第一条消息(最小位点)
  • 如果当前TreeMap中已没有消息处理,返回的位点为this.queueOffsetMax,queueOffsetMax的表示的是当前消费队列中拉取到的最大消费位点,因为此时拉取到的消息全部消费了。
  • 最后调用updateoffset方法,更新本地的位点缓存(有定时持久机制)


2.2.2 客户端没有拉取到合适的消息位点提交机制


客户端如果没有拉取到合适的消息,例如全部被tag过滤了,在DefaultMqPushConsumerImpl的PullTask中定义了处理方式,具体如下所示:

d9c8549cca369adcc36f775eb49ba5d6.png

其关键代码在correctTasOffset中,具体代码请看:

a932c2962011657a0b1580263d563f7e.png核心要点:如果此时处理队列中的消息为0时,则会将下一次拉取偏移量当成位点,而这个值在服务端进行消息查找时会向前驱动,代码在DefaultMessageStore的getMessage中:


7a8d8ba34462383113f789fa2d4dfa2f.jpg

故从这里可以看到,就算消息全部过滤掉了,位点还是会向前驱动的,不会造成大量积压。


2.2.3 消息拉取时会附带一次位点提交


其实RocketMQ的位点提交,客户端提交位点时会先存储在本地缓存中,然后定时将位点信息一次性提交到Broker端,其实还存在另外一种较为隐式位点提交机制:

5ed8ed9cc14a2d7ce937d468d5cb4432.png

即在消息拉取时,如果本地缓存中存在位点信息,会设置一个系统标记:FLAG_COMMIT_OFFSET,该标记在服务端会触发一次位点提交,具体代码如下:

cb4c94620c71670b002626237e97a376.png


2.2.4 总结与验证


综上述所述,使用TAG并不会因为对应tag数量比较少,从而造成大量积压的情况。


为了验证这个观点,我也做了一个简单的验证,具体方法是启动一个消息发送者,向指定topic发送tag B的消息,而消费者只订阅tag A,但消费者并不会出现消费积压,测试代码如下图所示:

7b8dbe6492daac4cb07127b7e3fd7cbf.jpg

查看消费组积压情况如下图所示:

aa3db8e3f6c670cd1851e8a21ef8f033.jpg



相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 存储 监控
RocketMQ Tag 详解!
本文详细介绍了 RocketMQ 中 Tag 的原理及其应用场景。Tag 是一种消息过滤机制,允许生产者在发送消息时指定标签,消费者据此选择性消费。文章通过源码分析展示了 Tag 在消息发送、存储及消费阶段的作用,并提供了完整的示例代码。尽管 Tag 功能简单高效,但也存在单一维度过滤等局限性。适合需要高效、低延迟消息传递的场景,如日志监控、电商系统等。
117 2
|
6月前
|
消息中间件 Java 开发工具
消息队列 MQ产品使用合集之topic相同,但是tag不同,这个类不能放入map中,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
SQL 存储 消息中间件
RocketMQ的TAG过滤和SQL过滤机制
写作目的 项目中各个中台都使用同一个DB。而DB下会使用中间件监听binlog转换成MQ消息,而下游的各个中台去MQ去拿自己感兴趣的消息。
387 0
RocketMQ的TAG过滤和SQL过滤机制
|
存储 消息中间件 文件存储
RocketMQ中msg&tag的生命周期
RocketMQ中msg&tag的生命周期
105 0
RocketMQ中msg&tag的生命周期
|
消息中间件 存储 RocketMQ
阿里二面:RocketMQ同一个消费组内的消费者订阅量不同tag,会有问题吗?
阿里二面:RocketMQ同一个消费组内的消费者订阅量不同tag,会有问题吗?
743 0
阿里二面:RocketMQ同一个消费组内的消费者订阅量不同tag,会有问题吗?
|
消息中间件 负载均衡 Java
RocketMQ入门到入土(七 )为什么同一个消费组设置不同tag会出现奇怪现象
RocketMQ入门到入土(七 )为什么同一个消费组设置不同tag会出现奇怪现象
|
消息中间件 负载均衡 Java
RocketMQ,同一个topic下是否可以通过不同的tag来进行订阅吗?
RocketMQ,同一个topic下是否可以通过不同的tag来进行订阅吗?
1327 0
|
消息中间件 RocketMQ 测试技术
rocketMq - tag不一致造成的假象
概述     这篇文章是以同事在实际工作中遇到的问题作为分析的切入点,加深自己对mq的掌握,践行“干中学”的团队理念。     当自己差不多把基本概念都掌握的差不多的时候,必须需要实际的案例或者实践来提深自己的深度,这个时候just do it 变得很重要,所以我喜欢不停的被人挑战,截止目前帮人解答的问题包括:client端消息堆积问题、批量消息拉取问题中遇到的神奇的数字32、以及本篇的tag不一致造成的假象,也就说会有3篇文章输出。
1534 0
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
774 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
|
消息中间件 uml RocketMQ
3 张图带你彻底理解 RocketMQ 事务消息
3 张图带你彻底理解 RocketMQ 事务消息
67780 2
3 张图带你彻底理解 RocketMQ 事务消息