RocketMQ系列 | 全网最全的导致RocketMQ消息“丢失”的几个场景都在这了,肯定有你不知道!

简介: 发送时会丢失消息、消息存储场景丢失消息、消费时会丢失消息

RocketMQ 简介

RocketMQ 5.0:
云原生“消息、事件、流”实时数据处理平台,覆盖云边端一体化数据处理场景。


RocketMQ领域模型


如上图所示,Apache RocketMQ 中消息的生命周期主要分为消息生产消息存储消息消费这三部分。

生产者生产消息并发送至 Apache RocketMQ 服务端,消息被存储在服务端的主题[Topic]中,消费者通过订阅主题[Topic]消费消息。

消息生产

生产者(Producer):Apache RocketMQ 领域中用于产生消息的运行实体,一般集成于业务调用链路的上游。

消息存储

  • 主题(Topic):Apache RocketMQ 消息传输和存储的分组容器,主题内部由多个队列组成,消息的存储和水平扩展实际是通过主题内的队列实现的。
  • 队列(MessageQueue):Apache RocketMQ 消息传输和存储的实际单元容器,类比于其他消息队列中的分区。Apache RocketMQ 通过流式特性的无限队列结构来存储消息,消息在队列内具备顺序性存储特征。
  • 消息(Message):Apache RocketMQ 的最小传输单元。消息具备不可变性,在初始化发送和完成存储后即不可变。

消息消费

  • 消费者分组(ConsumerGroup):Apache RocketMQ 发布订阅模型中定义的独立的消费身份分组,用于统一管理底层运行的多个消费者(Consumer)。同一个消费组的多个消费者必须保持消费逻辑和配置一致,共同分担该消费组订阅的消息,实现消费能力的水平扩展。
  • 消费者(Consumer):Apache RocketMQ 消费消息的运行实体,一般集成在业务调用链路的下游。消费者必须被指定到某一个消费组中。
  • 订阅关系(Subscription):Apache RocketMQ 发布订阅模型中消息过滤、重试、消费进度的规则配置。订阅关系以消费组粒度进行管理,消费组通过定义订阅关系控制指定消费组下的消费者如何实现消息过滤、消费重试及消费进度恢复等。Apache RocketMQ 的订阅关系除过滤表达式之外都是持久化的,即服务端重启或请求断开,订阅关系依然保留。


如何让“消息丢失”?

在“如何让消息丢失”之前,让我们梳理一下消息的生命周期,先对齐下整体的概念。

一条消息的历程

1、发送场景丢失消息

1.1 单向发送

/**     * 发送消息,Oneway形式,服务器不应答,     * 无法保证消息是否成功到达服务器     *     * @param message 要发送的消息     */voidsendOneway(finalMessagemessage);com.aliyun.openservices.ons.api.Producer#sendOneway


RocketMQ 提供三种方式来发送普通消息:同步(Sync)发送、异步(Async)发送和单向(Oneway)发送。

同步发送同步发送是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式。

此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。



异步发送异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。

一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如,您视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

单向发送发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。

此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。


1.2 发送失败时未重试或补偿

importcom.aliyun.openservices.ons.api.Message;importcom.aliyun.openservices.ons.api.Producer;importcom.aliyun.openservices.ons.api.SendResult;importlombok.extern.slf4j.Slf4j;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;
importjava.nio.charset.StandardCharsets;
@Service@Slf4jpublicclassProductSender {
@AutowiredprivateProducerproducer;
publicvoidsendMsg(Stringcontent) {        try {            byte[] body=content.getBytes(StandardCharsets.UTF_8);            Messagemessage=newMessage("topicName", "tagName", "msgKey", body);            SendResultsendResult=producer.send(message);        } catch (Exceptionignored) {            // TODO: 2023/9/29 发送失败时无处理,网络抖动或服务不稳定时会造成消息丢失        }    }}


2、消息存储场景丢失消息

2.1 、Broker宕机或者磁盘损坏,Broker Server内存中的消息没有落盘

2.2 、过期清理机制引发消息丢失
Apache RocketMQ 中队列的定义,消息按照到服务器的先后顺序被存储到队列中,理论上每个队列都支持无限存储。但是在实际部署场景中,服务端节点的物理存储空间有限,消息无法做到永久存储。因此,在实际使用中需要考虑以下问题,消息在服务端中的存储以什么维度为判定条件?消息存储以什么粒度进行管理?消息存储超过限制后如何处理?这些问题都是由消息存储和过期清理机制来定义的。

Apache RocketMQ 使用存储时长作为消息存储的依据,即每个节点对外承诺消息的存储时长。在存储时长范围内的消息都会被保留,无论消息是否被消费;超过时长限制的消息则会被清理掉。删除旧的没有使用过的消息是由后台定时任务完成的。

消息存储文件结构说明

3、消费场景丢失消息

3.1 消费失败,但消费消息的返回结果为成功

importcom.aliyun.openservices.ons.api.Action;importcom.aliyun.openservices.ons.api.ConsumeContext;importcom.aliyun.openservices.ons.api.Message;importcom.aliyun.openservices.ons.api.MessageListener;importlombok.extern.slf4j.Slf4j;importorg.springframework.stereotype.Service;
importjava.nio.charset.StandardCharsets;
@Service@Slf4jpublicclassMissingMsgWhenConsumeFailimplementsMessageListener {
@OverridepublicActionconsume(Messagemessage, ConsumeContextcontext) {        
try {            Stringmsg=newString(message.getBody(), StandardCharsets.UTF_8);            returnAction.CommitMessage;        } catch (Exceptione) {            //丢失消息:消费失败了,但消费消息的返回结果为成功。            return Action.CommitMessage;        }    }}

RocketMQ消费场景引发的系统故障

3.2 订阅关系不一致导致消息丢失您可在云消息队列 RocketMQ 版控制台Group 详情页面查看指定Group的订阅关系是否一致。出现订阅关系不一致时,控制台中也会有告警:

同一个消费者Group ID下所有Consumer实例所订阅的Topic、Tag必须完全一致。如果订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。
如下图所示,一个队列中分发不同类型的消息。

如果一个消费者Group ID订阅了tagA和tagB,那么这个消费组下消费者绑定的队列中会被borker投递所订阅所有Tag的信息。

消息丢失的根因是,一个队列在同一时间只会被分配给一个消费者,这样队列上不符合消息过滤规则的消息消费会被忽略,并且消息消费的进度会向前移动,从而造成消息丢失。

经典实践
一个GroupId[消费组]只在一个JVM中使用

正确订阅关系一:相同Group ID的N个消费者订阅一个Topic且订阅一个Tag

正确订阅关系二:相同Group ID的N个消费者订阅一个Topic且订阅多个Tag

正确订阅关系三:相同Group ID的N个消费者订阅多个Topic且订阅多个Tag

小结

在RocketMQ领域中,一条消息从生产、存储、消费整个链路中都可以让消息“丢失”。
业务逻辑复杂,历史久远的接口出现数据错误怎么办?
干货|如何快速问题出在哪了?
从全链路视角看,让消息丢失的漏洞百出。
那么,你“学会”让消息丢失的"技巧"了吗?

参考

https://rocketmq.apache.org/zh/docs/发送普通消息(单向发送):https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-5-x-series/developer-reference/sample-code-2发送普通消息(三种方式):https://www.alibabacloud.com/help/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-4-x-series/developer-reference/send-normal-messages-in-one-of-three-modes消息存储机制:https://rocketmq.apache.org/zh/docs/featureBehavior/11messagestorepolicy消息在云消息队列 RocketMQ 版中能保存多久?https://www.alibabacloud.com/help/zh/apsaramq-for-rocketmq/faq-about-features#section-r2b-stc-pz6
常见订阅关系不一致问题 https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-4-x-series/use-cases/subscription-consistency


MQ相关阅读
RabbitMQ消息为什么变成了数字呢?
微服务+RabbitMQ之从零到yi


RocketMQ系列 | 如何让消息“丢失”?

https://mp.weixin.qq.com/s/RnS675dt-wErnEuolK6Zeg


RocketMQ系列 | 容量削峰填谷后,发送的消息“少”了怎么办!!??

https://mp.weixin.qq.com/s/kejgc_u8GHdXrI4uW9TWNw

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 存储 数据库
RocketMQ 流存储解析:面向流场景的关键特性与典型案例
RocketMQ 流存储解析:面向流场景的关键特性与典型案例
88460 0
|
8月前
|
消息中间件 弹性计算 Java
使用阿里云性能测试工具 JMeter 场景压测 RocketMQ 最佳实践
使用阿里云性能测试工具 JMeter 场景压测 RocketMQ 最佳实践
|
2月前
|
消息中间件 SQL 容灾
深度剖析 RocketMQ 5.0,消息进阶:如何支撑复杂业务消息场景?
本文主要学习 RocketMQ 的一致性特性,一致性对于交易、金融都是刚需。从大规模复杂业务出发,学习 RocketMQ 的 SQL 订阅、定时消息等特性。再从高可用的角度来看,这里更多的是大型公司对于高阶可用性的要求,如同城容灾、异地多活等。
108324 287
|
19天前
|
消息中间件 存储 运维
RocketMQ与Kafka深度对比:特性与适用场景解析
RocketMQ与Kafka深度对比:特性与适用场景解析
|
1月前
|
消息中间件 Serverless Windows
消息队列 MQ产品使用合集之MQTT协议是否可以应用于社交软件的系统通知场景
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
2月前
|
消息中间件 Cloud Native 物联网
深度剖析 RocketMQ 5.0,消息基础:RocketMQ 在业务消息场景的基础优势是什么?
本文主要介绍业务消息的应用解耦场景,具体解耦什么? RocketMQ 在业务消息场景的基础特性。业界那么多消息队列能实现应用解耦,RocketMQ 在基础特性上有哪些增强?
125395 2
深度剖析 RocketMQ 5.0,消息基础:RocketMQ 在业务消息场景的基础优势是什么?
|
2月前
|
消息中间件 存储 Cloud Native
深度剖析 RocketMQ 5.0,架构解析:云原生架构如何支撑多元化场景?
了解 RocketMQ 5.0 的核心概念和架构概览;然后我们会从集群角度出发,从宏观视角学习 RocketMQ 的管控链路、数据链路、客户端和服务端如何交互;学习 RocketMQ 如何实现数据的存储,数据的高可用,如何利用云原生存储进一步提升竞争力。
140492 3
|
2月前
|
消息中间件 存储 Apache
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景
事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
517 2
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景
|
2月前
|
消息中间件 存储 数据库
深度剖析 RocketMQ 5.0,流存储:流场景的诉求是什么?
本文将从使用的角度出发,来更详细的展示一下流存储的场景,看看它和业务消息的场景有哪些区别。 RocketMQ 5.0 面向流存储的场景,提供了哪些特性。再结合两个数据集成的案例,来帮助大家了解流存储的用法。
3438 2
|
2月前
|
消息中间件 存储 程序员
RabbitMQ消息丢失的场景,如何保证消息不丢失?(详细讲解,一文看懂)
RabbitMQ消息丢失的场景,如何保证消息不丢失?(详细讲解,一文看懂)
234 0
RabbitMQ消息丢失的场景,如何保证消息不丢失?(详细讲解,一文看懂)