本章概括
分布式事务
由何而来
我们在使用MQ在解决实际业务场景中的问题时,往往伴随诸多问题!比如如下图
上述两种可能都会导致数据不一致,在业务系统中是 致命的问题!
这个时候我们就要保证事务消息。要不全部成功,要不全部失败。来达到订单服务,购物车服务的数据一致性!
对于购物车服务收到订单创建成功消息清理购物车这个操作来说,失败的处理比较简单,只要成功执行购物车清理后再提交消费确认即可,如果失败,由于没有提交消费确认,消息队列会自动重试。
解决了购物车服务问题,剩下的就是订单服务这边的创建订单,生产消息这两步了。要么全部成功,要么全部失败,不允许一个成功,一个失败的情况。
一旦订单控制不住,购物车那边也是控制不住的! 这就是事务需要解决的问题了!
什么是分布式事务
事务就是为了保证这些数据的完整性和一致性,我们希望这些更新操作要么全部成功,要么全部失败。这就是我们通过对事务的理解。如果严格来说,MQ的事务和MySQL一样,都具有四种属性 ACID
- 原子性:一个事务操作不可分割,要么成功,要么失败,不能有一半成功一半失败的情 况
- 一致性:这些数据在事务执行完成这个时间点之前,读到的一定是更新前的数据,之后 读到的一定是更新后的数据,不应该存在一个时刻,让用户读到更新过程中的数据
- 隔离性:指一个事务的执行不能被其他事务干扰。即一个事务内部的操作及使用的数据对 正在进行的其他事务是隔离的,并发执行的各个事务之间不能互相干扰
- 持久性:指一个事务一旦完成提交,后续的其他操作和故障都不会对事务的结果产生任何 影响
对于单体服务来说,都实现了ACID,但是对于分布式系统来说,实现ACID这几乎是不可能的,或者说代价太大。所有目前大家所说的分布式事务,更多的情况下,是一种分布式事务的不完整实现。不同的应用场景中,有不同的实现,目的都是通过一些妥协来解决实际问题。
比较常见的分布式事务有
- 2PC(Two-phase Commit,也叫二阶段提 交)
- TCC(Try-Confirm-Cancel)
- 和事务消息
事务消息适用的场景主要是那些需要异步更新数据,并且对数据实时性要求不太高的场景。 比如我们在开始时提到的那个例子,在创建订单后,如果出现短暂的几秒,购物车里的商品没有被及时清空,也不是完全不可接受的,只要最终购物车的数据和订单数据保持一致就可以了。
剩下的就不做过多解释了。
MQ是如何实现的
MQ主要借助的是 半消息 实现的,如下图
- 订单服务首先会开启一个事务,就类似于MySQL那样。
- 对MQ生产一个半消息
- 以上都没有问题之后,就会执行事务,写入数据库
- 提交事务或回滚事务
这里的半消息,并不是只有一半的数据。而是有全部的数据,这里的半只是 在事务提交之前,对于消费者来说,这个消息是不可见的
到了这里,订单服务肯定是没有问题的,所以把数据写入到MQ的Broker之后
这里回顾一下生产端的交互流程,可以参考下列图片,理解
- 订单服务会向MQ的Broker发送一个ACK包
- 如果Broker确认收到了,会给订单服务回一个ACK+SYN包 (如果Broker没有收到,会开始重传)
- 如果Broker收到了,一定可以确保订单服务的数据执行完成,以及确保数据已经到Broker了。
到了这里,订单服务,Broker端是没有问题的,把数据写入Broker之后,购物车服务就会开始进行消费这条消息
这里回顾一下消费端的交互流程,可以参考下列图片,理解
- 购物车服务在监听收到消息后进行消费
- 当购物车服务执行了当前的逻辑之后,会给Broker发送一个 ACK+SYN包确认消费
- 如果购物车服务没有给Broker回复,那么Broker就会开始重发
到了这里,订单服务,Broker端,购物车服务基本实现了 要么成功,要么失败 的一致性要求。
天网恢恢疏而不漏,在第四步的时候提交事务,如果失败了怎么办?
Kafka 的解决方案比较简单粗暴,直接抛出异常,让用户自行处理。我们可以在业务代码中 反复重试提交,直到提交成功,或者删除之前创建的订单进行补偿
RocketMQ是如何实现的
这里RocketMQ也给出了相应的应对策略!在事务实现中,他加了 事务反查的机制 来解决事务的提交失败问题。
如果订单服务,在提交或者回滚事务消息时发生网络异常,RocketMQ 的 Broker 没有收到提交或者回滚的请求,Broker 会定期去订单服务上反查这个事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚这个事务。
为了支撑这个机制,我们需要做一个反查本地事务状态的接口,告知RocketMQ本地事务是否成功。
例如 只需要根据消息中的订单ID,检查这个订单是否创建成功即可
这个反查本地事务的实现,并不依赖订单服务的某个实例节点上的任何数据。这种情况下,即使是发送事务消息的那个订单服务节点宕机了,RocketMQ 依然 可以通过其他订单服务的节点来执行反查,确保事务的完整性
确保消息不会丢失
聊到消息一致性,可靠性传输,我们可以从问题的根源入手。我先列举一些容易出问题的故障点
- **生产阶段:**在这个阶段,从消息在 Producer 创建出来,经过网络传输发送到 Broker 端。
- **存储阶段:**在这个阶段,消息在 Broker 端存储,如果是集群,消息会在这个阶段被复制到其他的副本上。
- **消费阶段:**在这个阶段,Consumer 从 Broker 上拉取消息,经过网络传输发送到 Consumer 上。
生产阶段
在生产阶段,消息队列通过最常用的请求确认机制,来保证消息的可靠传递:当你的代码调用发消息方法时,消息队列的客户端会把消息发送到 Broker,Broker 收到消息后,会给客户端返回一个确认响应,表明消息已经收到了。客户端收到响应后,完成了一次正常消息的发送。
只要 Producer 收到了 Broker 的确认响应,就可以保证消息在生产阶段不会丢失。有些消息队列在长时间没收到发送确认响应后,会自动重试,如果重试再失败,就会以返回值或者异常的方式告知用户。
你在编写发送消息代码时,需要注意,正确处理返回值或者捕获异常,就可以保证这个阶段的消息不会丢失
存储阶段
在存储阶段正常情况下,只要 Broker 在正常运行,就不会出现丢失消息的问题,但是如果 Broker 出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。
如果对消息的可靠性要求非常高,可以通过配置 Broker 参数来避免因为宕机丢消息。
对于单个节点的 Broker,需要配置 Broker 参数,在收到消息后,将消息写入磁盘后再给 Producer 返回确认响应,这样即使发生宕机,由于消息已经被写入磁盘,就不会丢失消息,恢复后还可以继续消费。例如,在 RocketMQ 中,需要将刷盘方式 flushDiskType 配置为 SYNC_FLUSH 同步刷盘。
集群我不会,后续再更新。
消费阶段
消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后,才会给 Broker 发送消费确认响应。如果 Broker 没有收到消费确认响应,下次拉消息的时候还会返回同一条消息,确保消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失。
你在编写消费代码时需要注意的是,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。
消息丢失检测
前期代码健壮性不友好的情况,可以在拦截器里编写日志输出,把消费的id号记录下来。
- 生产者,生产一条就记录一条
- 消费者,消费一条就记录一条
这样这样两边对照就可以把丢失的id号 定位出来。也可以通过分布式链路追踪系统 扯远了,以后再说吧
确保消息不被重复消费
为什么会有重复消息
在消息传递过程中,如果出现传递失败的情况,发送方会执行重试,重试的过程中就有可能 会产生重复的消息。对使用消息队列的业务系统来说,如果没有对重复消息进行处理,就有可能会导致系统的数据出现错误。
所以重复消费的情况必然存在
在MQTT协议中,大概提供了三种标准
- At most once: 至多一次。消息在传递时,最多会被送达一次。换一个说法就是,没什 么消息可靠性保证,允许丢消息。一般都是一些对消息可靠性要求不太高的监控场景使 用,比如每分钟上报一次机房温度数据,可以接受数据少量丢失。
- At least once: 至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消 息,但是允许有少量重复消息出现。
- Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重 复,这个是最高的等级。
大多数的消息队列,都是采用的 At least once: 至少一次
根据上面介绍,我们可以得知 消息队列很难保证消息不重复
既然消息队列,无法保证重复消费的问题,那我们就要在程序里解决这个问题了。
如何解决重复消费(幂等性)
幂等性是一个数学上的概念,它是这样定义的:如果一个函数 f(x) 满足:f(f(x)) = f(x),则函数 f(x) 满足幂等性。
这里被扩展到计算机领域,被广泛的应用于多次执行产生的影响均与一次执行的影响相同
使用同样的参数,对它进行多次调用和一次调用,对系统产生的影响是一样的。所以,对于幂等的方法,不用担心重复执行会对系统造成任何改变。
(这里可以联想到 用户充值,多次消费充值的话,肯定是有问题的!)
如果说MQ解决不了数据重复消费的问题,那么现在可以转化成 At least once + 幂等性 = Exactly once 这样就可以保证重复消费了。主要有下列三种方法
- 数据库的唯一约束实现幂等
- 为更新的数据设置前置条件
- 记录并检查操作
数据库的唯一约束实现幂等
我先举一个我自己系统的例子:用户在充值账号余额时,会产生一个账单ID。
我们在实现唯一约束的时候就可以重新创建一个表。伪代码如下
create table aaa( id bigint(15) not null comment '约束id', user_id bigint(15) not null comment '用户id', bill_id bigint(15) not null comment '账单id', money decimal(10,2) not null comment '充值金额', PRIMARY KEY (`id`) USING BTREE, KEY `adasdasdas` (`user_id`,`bill_id`), -- 唯一约束 用户di和账单id ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='账单约束表';
这样,我们消费消息的逻辑可以变为:“在转账流水表中增加一条转账记录,然后再根据转账记录,异步操作更新用户余额即可。”在转账流水表增加一条转账记录这个操作中,由于我们在这个表中预先定义了“账户 ID 转账单 ID”的唯一约束,对于同一个转账单同一个账户只能插入一条记录,后续重复的插入操作都会失败,这样就实现了一个幂等的操作。我们只要写一个 SQL,正确地实现它就可以了。
基于这个思路,不光是可以使用关系型数据库,只要是支持类似“INSERT IF NOT EXIST”语义的存储类系统都可以用于实现幂等,比如,你可以用 Redis 的 SETNX 命令来替代数据库中的唯一约束,来实现幂等消费。
参考李玥老师的 消息队列高手课 思想
为更新的数据设置前置条件
在更新数据时,我们可以设置一个更新前的值,如下图。
这里可以加一个充值前金额,这里因为我的体量,并发不大,暂时没加,后面我会根据老板的要求再加的。
如果有重复订单打过来,那我就可以计算充值前的金额,以及当前的付款金额。来付款来实现幂等性。
也可以通过版本号控制,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号 +1,一样可以实现幂等更新。
在修改数据记录并检查操作
可以采用Token,UUID的方式实现幂等性。这种方式是通用性比较强的。实现的思路特别简单:在执行数据更新操作之前,先检查一下是否执行过这个更新操作。
具体的实现方法是,在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。
结尾
有些不懂的地方或者不对的地方,麻烦各位指出,一定修改优化!