RocketMQ消息丢失,消息一致性,重复消费解决方案

简介: 大家好,我是Leo。这是开端的第三次循环了。当前正在正处于RocketMQ基础原理。

本章概括


image.png

分布式事务

由何而来

我们在使用MQ在解决实际业务场景中的问题时,往往伴随诸多问题!比如如下图

image.png

上述两种可能都会导致数据不一致,在业务系统中是 致命的问题

这个时候我们就要保证事务消息。要不全部成功,要不全部失败。来达到订单服务,购物车服务的数据一致性!

对于购物车服务收到订单创建成功消息清理购物车这个操作来说,失败的处理比较简单,只要成功执行购物车清理后再提交消费确认即可,如果失败,由于没有提交消费确认,消息队列会自动重试。

解决了购物车服务问题,剩下的就是订单服务这边的创建订单,生产消息这两步了。要么全部成功,要么全部失败,不允许一个成功,一个失败的情况。

一旦订单控制不住,购物车那边也是控制不住的! 这就是事务需要解决的问题了!


什么是分布式事务

事务就是为了保证这些数据的完整性和一致性,我们希望这些更新操作要么全部成功,要么全部失败。这就是我们通过对事务的理解。如果严格来说,MQ的事务和MySQL一样,都具有四种属性 ACID

  1. 原子性:一个事务操作不可分割,要么成功,要么失败,不能有一半成功一半失败的情 况
  2. 一致性:这些数据在事务执行完成这个时间点之前,读到的一定是更新前的数据,之后 读到的一定是更新后的数据,不应该存在一个时刻,让用户读到更新过程中的数据
  3. 隔离性:指一个事务的执行不能被其他事务干扰。即一个事务内部的操作及使用的数据对 正在进行的其他事务是隔离的,并发执行的各个事务之间不能互相干扰
  4. 持久性:指一个事务一旦完成提交,后续的其他操作和故障都不会对事务的结果产生任何 影响

对于单体服务来说,都实现了ACID,但是对于分布式系统来说,实现ACID这几乎是不可能的,或者说代价太大。所有目前大家所说的分布式事务,更多的情况下,是一种分布式事务的不完整实现。不同的应用场景中,有不同的实现,目的都是通过一些妥协来解决实际问题。

比较常见的分布式事务有

  • 2PC(Two-phase Commit,也叫二阶段提 交)
  • TCC(Try-Confirm-Cancel)
  • 和事务消息

事务消息适用的场景主要是那些需要异步更新数据,并且对数据实时性要求不太高的场景。 比如我们在开始时提到的那个例子,在创建订单后,如果出现短暂的几秒,购物车里的商品没有被及时清空,也不是完全不可接受的,只要最终购物车的数据和订单数据保持一致就可以了。

剩下的就不做过多解释了。

MQ是如何实现的

MQ主要借助的是 半消息 实现的,如下图

image.png

  1. 订单服务首先会开启一个事务,就类似于MySQL那样。
  2. 对MQ生产一个半消息
  3. 以上都没有问题之后,就会执行事务,写入数据库
  4. 提交事务或回滚事务

这里的半消息,并不是只有一半的数据。而是有全部的数据,这里的半只是 在事务提交之前,对于消费者来说,这个消息是不可见的

到了这里,订单服务肯定是没有问题的,所以把数据写入到MQ的Broker之后

这里回顾一下生产端的交互流程,可以参考下列图片,理解

  1. 订单服务会向MQ的Broker发送一个ACK包
  2. 如果Broker确认收到了,会给订单服务回一个ACK+SYN包 (如果Broker没有收到,会开始重传)
  3. 如果Broker收到了,一定可以确保订单服务的数据执行完成,以及确保数据已经到Broker了。

到了这里,订单服务,Broker端是没有问题的,把数据写入Broker之后,购物车服务就会开始进行消费这条消息

这里回顾一下消费端的交互流程,可以参考下列图片,理解

  1. 购物车服务在监听收到消息后进行消费
  2. 当购物车服务执行了当前的逻辑之后,会给Broker发送一个 ACK+SYN包确认消费
  3. 如果购物车服务没有给Broker回复,那么Broker就会开始重发

image.png

到了这里,订单服务,Broker端,购物车服务基本实现了 要么成功,要么失败 的一致性要求。

天网恢恢疏而不漏,在第四步的时候提交事务,如果失败了怎么办?

Kafka 的解决方案比较简单粗暴,直接抛出异常,让用户自行处理。我们可以在业务代码中 反复重试提交,直到提交成功,或者删除之前创建的订单进行补偿

RocketMQ是如何实现的

这里RocketMQ也给出了相应的应对策略!在事务实现中,他加了 事务反查的机制 来解决事务的提交失败问题。

如果订单服务,在提交或者回滚事务消息时发生网络异常,RocketMQ 的 Broker 没有收到提交或者回滚的请求,Broker 会定期去订单服务上反查这个事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚这个事务。

为了支撑这个机制,我们需要做一个反查本地事务状态的接口,告知RocketMQ本地事务是否成功。

例如 只需要根据消息中的订单ID,检查这个订单是否创建成功即可

这个反查本地事务的实现,并不依赖订单服务的某个实例节点上的任何数据。这种情况下,即使是发送事务消息的那个订单服务节点宕机了,RocketMQ 依然 可以通过其他订单服务的节点来执行反查,确保事务的完整性

image.png


确保消息不会丢失


聊到消息一致性,可靠性传输,我们可以从问题的根源入手。我先列举一些容易出问题的故障点

  • **生产阶段:**在这个阶段,从消息在 Producer 创建出来,经过网络传输发送到 Broker 端。
  • **存储阶段:**在这个阶段,消息在 Broker 端存储,如果是集群,消息会在这个阶段被复制到其他的副本上。
  • **消费阶段:**在这个阶段,Consumer 从 Broker 上拉取消息,经过网络传输发送到 Consumer 上。

image.png

生产阶段

在生产阶段,消息队列通过最常用的请求确认机制,来保证消息的可靠传递:当你的代码调用发消息方法时,消息队列的客户端会把消息发送到 Broker,Broker 收到消息后,会给客户端返回一个确认响应,表明消息已经收到了。客户端收到响应后,完成了一次正常消息的发送。

只要 Producer 收到了 Broker 的确认响应,就可以保证消息在生产阶段不会丢失。有些消息队列在长时间没收到发送确认响应后,会自动重试,如果重试再失败,就会以返回值或者异常的方式告知用户。

你在编写发送消息代码时,需要注意,正确处理返回值或者捕获异常,就可以保证这个阶段的消息不会丢失

存储阶段

在存储阶段正常情况下,只要 Broker 在正常运行,就不会出现丢失消息的问题,但是如果 Broker 出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。

如果对消息的可靠性要求非常高,可以通过配置 Broker 参数来避免因为宕机丢消息。

对于单个节点的 Broker,需要配置 Broker 参数,在收到消息后,将消息写入磁盘后再给 Producer 返回确认响应,这样即使发生宕机,由于消息已经被写入磁盘,就不会丢失消息,恢复后还可以继续消费。例如,在 RocketMQ 中,需要将刷盘方式 flushDiskType 配置为 SYNC_FLUSH 同步刷盘。

集群我不会,后续再更新。

消费阶段

消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后,才会给 Broker 发送消费确认响应。如果 Broker 没有收到消费确认响应,下次拉消息的时候还会返回同一条消息,确保消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失。

你在编写消费代码时需要注意的是,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。

消息丢失检测

前期代码健壮性不友好的情况,可以在拦截器里编写日志输出,把消费的id号记录下来。

  • 生产者,生产一条就记录一条
  • 消费者,消费一条就记录一条

这样这样两边对照就可以把丢失的id号  定位出来。也可以通过分布式链路追踪系统 扯远了,以后再说吧


确保消息不被重复消费


为什么会有重复消息

在消息传递过程中,如果出现传递失败的情况,发送方会执行重试,重试的过程中就有可能 会产生重复的消息。对使用消息队列的业务系统来说,如果没有对重复消息进行处理,就有可能会导致系统的数据出现错误。

所以重复消费的情况必然存在

在MQTT协议中,大概提供了三种标准

  1. At most once: 至多一次。消息在传递时,最多会被送达一次。换一个说法就是,没什 么消息可靠性保证,允许丢消息。一般都是一些对消息可靠性要求不太高的监控场景使 用,比如每分钟上报一次机房温度数据,可以接受数据少量丢失。
  2. At least once: 至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消 息,但是允许有少量重复消息出现。
  3. Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重 复,这个是最高的等级。

大多数的消息队列,都是采用的 At least once: 至少一次

根据上面介绍,我们可以得知 消息队列很难保证消息不重复

既然消息队列,无法保证重复消费的问题,那我们就要在程序里解决这个问题了。


如何解决重复消费(幂等性)


幂等性是一个数学上的概念,它是这样定义的:如果一个函数 f(x) 满足:f(f(x)) = f(x),则函数 f(x) 满足幂等性。

这里被扩展到计算机领域,被广泛的应用于多次执行产生的影响均与一次执行的影响相同

使用同样的参数,对它进行多次调用和一次调用,对系统产生的影响是一样的。所以,对于幂等的方法,不用担心重复执行会对系统造成任何改变。

(这里可以联想到 用户充值,多次消费充值的话,肯定是有问题的!)

如果说MQ解决不了数据重复消费的问题,那么现在可以转化成 At least once + 幂等性 = Exactly once  这样就可以保证重复消费了。主要有下列三种方法

  • 数据库的唯一约束实现幂等
  • 为更新的数据设置前置条件
  • 记录并检查操作

数据库的唯一约束实现幂等

我先举一个我自己系统的例子:用户在充值账号余额时,会产生一个账单ID。

我们在实现唯一约束的时候就可以重新创建一个表。伪代码如下

create table aaa(
    id  bigint(15) not null comment '约束id',
    user_id bigint(15) not null comment '用户id',
    bill_id bigint(15) not null comment '账单id',
    money decimal(10,2) not null comment '充值金额',
    PRIMARY KEY (`id`) USING BTREE,
    KEY `adasdasdas` (`user_id`,`bill_id`),  -- 唯一约束  用户di和账单id
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='账单约束表';

image.png

这样,我们消费消息的逻辑可以变为:“在转账流水表中增加一条转账记录,然后再根据转账记录,异步操作更新用户余额即可。”在转账流水表增加一条转账记录这个操作中,由于我们在这个表中预先定义了“账户 ID 转账单 ID”的唯一约束,对于同一个转账单同一个账户只能插入一条记录,后续重复的插入操作都会失败,这样就实现了一个幂等的操作。我们只要写一个 SQL,正确地实现它就可以了。

基于这个思路,不光是可以使用关系型数据库,只要是支持类似“INSERT IF NOT EXIST”语义的存储类系统都可以用于实现幂等,比如,你可以用 Redis 的 SETNX 命令来替代数据库中的唯一约束,来实现幂等消费。

参考李玥老师的 消息队列高手课 思想

为更新的数据设置前置条件

在更新数据时,我们可以设置一个更新前的值,如下图。

这里可以加一个充值前金额,这里因为我的体量,并发不大,暂时没加,后面我会根据老板的要求再加的。

image.png

如果有重复订单打过来,那我就可以计算充值前的金额,以及当前的付款金额。来付款来实现幂等性。

也可以通过版本号控制,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号 +1,一样可以实现幂等更新。

在修改数据记录并检查操作

可以采用Token,UUID的方式实现幂等性。这种方式是通用性比较强的。实现的思路特别简单:在执行数据更新操作之前,先检查一下是否执行过这个更新操作。

具体的实现方法是,在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。


结尾


有些不懂的地方或者不对的地方,麻烦各位指出,一定修改优化!


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
97 12
|
2月前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
2月前
|
消息中间件 存储 弹性计算
云消息队列 RabbitMQ 版实践解决方案评测
随着企业业务的增长,对消息队列的需求日益提升。阿里云的云消息队列 RabbitMQ 版通过架构优化,解决了消息积压、内存泄漏等问题,并支持弹性伸缩和按量计费,大幅降低资源和运维成本。本文从使用者角度详细评测这一解决方案,涵盖实践原理、部署体验、实际优势及应用场景。
|
2月前
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
76 4
|
3月前
|
消息中间件 运维 监控
云消息队列RabbitMQ实践解决方案评测报告
本报告旨在对《云消息队列RabbitMQ实践》解决方案进行综合评测。通过对该方案的原理理解、部署体验、设计验证以及实际应用价值等方面进行全面分析,为用户提供详尽的反馈与建议。
93 16
|
3月前
|
消息中间件 弹性计算 运维
阿里云云消息队列RabbitMQ实践解决方案评测报告
阿里云云消息队列RabbitMQ实践解决方案评测报告
82 9
|
3月前
|
消息中间件 监控 数据处理
解决方案 | 云消息队列RabbitMQ实践
解决方案 | 云消息队列RabbitMQ实践
55 1
|
3月前
|
消息中间件 监控 持续交付
《云消息队列RabbitMQ实践》解决方案测评报告
《云消息队列RabbitMQ实践》解决方案通过RabbitMQ实现业务解耦、异步处理和高可用性。其核心优势包括消息持久化、灵活路由及高可靠性。文档详细介绍了部署步骤、配置方法及监控手段,帮助用户快速搭建消息队列系统。方案适用于电商、金融和实时数据处理等高并发场景,通过异步处理提升系统性能。建议增加自动化部署、复杂场景示例及更详尽的日志解析,进一步提升用户体验。
|
2月前
|
消息中间件 运维 监控
《云消息队列RabbitMQ实践》解决方案
《云消息队列RabbitMQ实践》解决方案
|
7月前
|
消息中间件 人工智能 Java
RocketMQ重复消费的症状以及解决方案
RocketMQ重复消费的症状以及解决方案