可靠消息最终一致性分布式事务

简介: 推荐一个零声教育C/C++后台开发的免费公开课程,个人觉得老师讲得不错,分享给大家:C/C++后台开发高级架构师,内容包括Linux,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK等技术内容,立即学习

一、前言
可靠消息最终一致性方案主要适用于消息数据能够独立存储:

能够降低系统之间耦合度
业务对数据一致性的时间敏感度高
此方案需要实现的服务模式:

可查询操作:提供查询自身事务状态的接口。
幂等操作:只要参数相同,无论调用多少次接口,都应该和第一次调用产生的结果相同。
那么什么时候回查?
事务发送端执行本地事务时(已经发送了 Half 消息了),这时候发送端宕机了或者超时了,就需要回查

(1)实现方案
实现方案有两种:

1.基于本地消息
优点:在业务应用中实现了消息的可靠性,减少了对消息中间件的依赖。
缺点:
绑定了具体的业务场景,耦合性太高,不可公用和扩展。
消息数据与业务数据在同一数据库,占用了业务系统的扩展。
消息数据可能会受到数据库并发性的影响。
2.基于消息队列中间件
优点:
消息数据能够独立存储,与具体的业务数据库解耦。
消息的并发性和吞吐量优于本地消息表方案。
缺点:
发送一次消息需要完成两次网络交互:1.消息的发送 ; 2. 消息的提交或回滚。
需要实现消息的回查接口,增加了开发成本。
(2)注意的问题
1、事务发送方本地事务与消息发送的原子性问题:

原因:执行本地事务和发送消息,要么都成功,要么都失败。
解决方案:通过消息确认服务本地事务执行成功。
// 原子性:事务 + 消息确认(回滚)
@Override
@Transactional(rollbackFor = Exception.class)
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object obj) {
try{

TxMessage txMessage = this.getTxMessage(msg);
// 1. 执行本地事务
orderService.submitOrderAndSaveTxNo(txMessage);

    // 2. 提交事务
    return RocketMQLocalTransactionState.COMMIT;

} catch (Exception e){


// 异常回滚事务
return RocketMQLocalTransactionState.ROLLBACK;
}

}
2、事务参与方接收消息的可靠性问题:

原因:由于服务器宕机、服务崩溃或网络异常等原因,导致事务参与方不能正常接收消息; 或者接收消息后处理事务的过程中发生异常,无法将结果正确回传到消息库中。
解决方案:通过消息恢复服务保证事务参与方的可靠性。
3、事务参与方接收消息的幂等性问题:

原因:可靠消息服务可能会多次向事务参与方发送消息
解决方案:需要具有幂等性,只要参数相同,无论调用多少次接口或方法,结果都相同。
C/C++Linux服务器开发高级架构师/C++后台开发架构师​免费学习地址
另外还整理一些C++后台开发架构师 相关学习资料,面试题,教学视频,以及学习路线图,免费分享有需要的可以自行添加:Q群:720209036 点击加入~ 群文件共享

(3)实战
通过 RocketMQ 消息中间件实现可靠消息最终一致性分布式事务,模拟电商业务中的下单扣减库存场景。

涉及服务有:

订单服务
库存服务

整体流程如下:

第一步:订单服务向 RocketMQ 发送 Half 消息。
第二步:RocketMQ 向订单服务响应 Half 消息发送成功。
第三步:订单服务执行本地事务,向本地数据库中插入、更新、删除数据。
第四步:订单服务向 RocketMQ 发送提交事务或者回滚事务的消息。
第五步:如果库存服务未收到消息,或者执行事务失败,且 RocketMQ 未删除保存的消息数据,RocketMQ 会回查订单服务的接口,查询事务状态,以此确认是再次提交事务还是回滚事务。
第六步:订单服务查询本地数据库,确认事务是否执行成功。
第七步:订单服务根据查询出的事务状态,向 RocketMQ 发送提交事务或者回滚事务的消息。
第八步:如果第七步中订单服务向 RocketMQ 发送的是提交事务的消息,则 RocketMQ 会向库存服务投递消息。
第九步:如果第七步中订单服务向 RocketMQ 发送的是回滚事务的消息,则 RocketMQ 不会向库存微服务投递消息,并且会删除内部存储的消息数据。
第十步:如果 RocketMQ 向库存服务投递的是执行本地事务的消息,则库存服务会执行本地事务,向本地数据库中插入、更新、删除数据。
第十一步:如果 RocketMQ 向库存服务投递的是查询本地事务状态的消息,则库存服务会查询本地数据库中事务的执行状态。
二、实战实验
涉及服务有:

订单服务:项目地址
库存服务:项目地址
实验准备:

MySQL:8.0.20
RocketMQ 消息中间件:rocketmq-all-4.5.0-bin-release
RocketMQ 客户端:rocketmq-spring-boot-starter 2.0.2
Spring Boot 版本:2.2.6.RELEASE
订单服务重点相关代码:

1.发送 Half 消息:

@Slf4j
@Service
public class OrderServiceImpl implements OrderService {
@Override
public void submitOrder(Long productId, Integer payCount) {
// 1. 生成全局分布式序列号
String txNo = UUID.randomUUID().toString();
。。。 。。。
// 2. 封装消息
Message message =
MessageBuilder.withPayload(jsonObject.toJSONString()).build();
// 3. 发送一条事务消息
rocketMQTemplate.sendMessageInTransaction("tx_order_group", "topic_txmsg",
message, null);
}
}
2.处理本地事务

@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "tx_order_group")
public class OrderTxMessageListener implements RocketMQLocalTransactionListener {
@Autowired
private OrderService orderService;
@Autowired
private OrderMapper orderMapper;

@Override
@Transactional(rollbackFor = Exception.class)
public RocketMQLocalTransactionState executeLocalTransaction(Message msg,
Object obj) {
try{
// 1. 获取消息并解析消息
TxMessage txMessage = this.getTxMessage(msg);
// 2. 提交订单 并且 保存事务日志
orderService.submitOrderAndSaveTxNo(txMessage);
// 3. 事务状态为提交
return RocketMQLocalTransactionState.COMMIT;
}catch (Exception e){
// 发生异常
// 事务状态为回滚
return RocketMQLocalTransactionState.ROLLBACK;
}
}

@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 1. 获取消息并解析消息
TxMessage txMessage = this.getTxMessage(msg);
// 2. 查询订单是否存在
Integer exists = orderMapper.isExistsTx(txMessage.getTxNo());
if(exists != null){
// 订单存在:事务状态为提交
return RocketMQLocalTransactionState.COMMIT;
}
// 订单不存在:事务状态为未知
// 这里需要再次调用:处理本地事务嘛?
return RocketMQLocalTransactionState.UNKNOWN;
}
}
库存服务重点相关代码:

@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "tx_stock_group", topic = "topic_txmsg")
public class StockTxMessageConsumer implements RocketMQListener {
@Autowired
private StockService stockService;

@Override
public void onMessage(String message) {
// 监听到对应消息
// 获取消息并解析
TxMessage txMessage = this.getTxMessage(message);
stockService.decreaseStock(txMessage);
}
}
服务:

订单服务端口:8080
库存服务端口:8081
RocketMQ: 9876
数据准备:

USE tx_msg_stock;

INSERT INTO stock (id, product_id, total_count) VALUES (1, 1001, 10000);

SELECT * FROM stock;
+----|------------|-------------+
| id | product_id | total_count |
+----|------------|-------------+
| 1 | 1001 | 10000 |
+----|------------|-------------+
(1)正常流程
1.请求下单接口:调用 订单服务

$ curl "http://localhost:8080/order/submit_order?productId=1&payCount=1"
下单成功
订单服务日志:

2022-05-09 14:19:05.197 c.d.t.message.OrderTxMessageListener : 订单微服务执行本地事务
2022-05-09 14:19:05.233 c.d.t.message.OrderTxMessageListener : 订单微服务提交事务
2022-05-09 14:21:05.090 INFO 19423 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[172.17.0.3:10909] result: true
2.库存服务日志:

2022-05-09 14:22:59.956 [MessageThread_5] c.d.t.message.StockTxMessageConsumer : 库存微服务开始消费事务消息:{"txMessage":{"payCount":1,"productId":1,"txNo":"3fcd4e8d-1f5b-448d-ad3d-693c335f994e"}}
2022-05-09 14:22:59.956 [MessageThread_5] c.d.t.service.impl.StockServiceImpl : 库存微服务执行本地事务,商品id:1, 购买数量:1
3.查看对应数据库:

-- order 库下
SELECT FROM order;
+---------------|---------------------|---------------|------------|-----------+
| id | create_time | order_no | product_id | pay_count |
+---------------|---------------------|---------------|------------|-----------+
| 1652077145222 | 2022-05-09 14:19:05 | 1652077145224 | 1 | 1 |
+---------------|---------------------|---------------|------------|-----------+

SELECT
FROM tx_log;
+--------------------------------------|---------------------+
| tx_no | create_time |
+--------------------------------------|---------------------+
| 3fcd4e8d-1f5b-448d-ad3d-693c335f994e | 2022-05-09 06:19:05 |
+--------------------------------------|---------------------+



-- stock 库下

SELECT FROM tx_log;
+--------------------------------------|---------------------+
| tx_no | create_time |
+--------------------------------------|---------------------+
| 3fcd4e8d-1f5b-448d-ad3d-693c335f994e | 2022-05-09 06:23:00 |
+--------------------------------------|---------------------+


SELECT
FROM stock;
+----|------------|-------------+
| id | product_id | total_count |
+----|------------|-------------+
| 1 | 1 | 9999 |
+----|------------|-------------+
(2)异常流程:消息中间件宕机

  1. 步骤一出现异常

即:还没开始下单,消息中间件宕机了,那么立刻下单失败。
如图:

  1. 步骤四出现异常

即:此步骤还在下单事务中,当提交中间请求失败,本地事务不会回滚。

这时候,RocketMQ 客户端会不断去重试。
当 RocketMQ 恢复后,RocketMQ 会去查询一次

实验步骤:

在执行本地事务之后,睡眠 30s
此期间,消息中间件宕机:broker 关闭。
日志如下:

2022-05-09 15:27:14.980 c.d.t.message.OrderTxMessageListener : 订单微服务执行本地事务
2022-05-09 15:27:15.015 c.d.t.message.OrderTxMessageListener : 订单微服务提交事务
2022-05-09 15:27:15.015 c.d.t.message.OrderTxMessageListener : 尝试睡 30s
2022-05-09 15:27:39.025 INFO 22815 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[] result: true
2022-05-09 15:27:39.025 INFO 22815 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[] result: true
2022-05-09 15:27:45.015 c.d.t.message.OrderTxMessageListener : 睡醒,起来干活了
2022-05-09 15:27:48.041 INFO 22815 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[] result: true
2022-05-09 15:28:09.017 INFO 22815 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[] result: true
2022-05-09 15:28:09.019 INFO 22815 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[] result: true

。。。。。。
2022-05-09 15:35:47.481 INFO 22815 --- [pool-1-thread-1] c.d.t.message.OrderTxMessageListener : 订单微服务查询本地事务
(3)异常流程:事务发送方执行本地事务失败
即:这时候捕捉到异常:

订单服务给 RocketMQ 发送回滚消息:RocketMQLocalTransactionState.ROLLBACK
RocketMQ 接收到消息后,会回查
回查,发现不存在这个订单,订单服务向 RocketMQ 发送 未知消息:RocketMQLocalTransactionState.UNKNOWN
UNKNOWN 未知状态:可能是事务正在执行中出异常等,这种情况下消息系统不知道该如何处理,当前的逻辑是会直接丢弃掉,等待后续检查逻辑来处理。

日志如下:

2022-05-09 15:45:11.829 c.d.t.message.OrderTxMessageListener : 订单微服务执行本地事务
2022-05-09 15:45:11.861 c.d.t.message.OrderTxMessageListener : 订单微服务回滚事务

2022-05-09 15:45:47.490 c.d.t.message.OrderTxMessageListener : 订单微服务查询本地事务
2022-05-09 15:46:47.484 c.d.t.message.OrderTxMessageListener : 订单微服务查询本地事务
2022-05-09 15:47:47.489 c.d.t.message.OrderTxMessageListener : 订单微服务查询本地事务
2022-05-09 15:48:47.491 c.d.t.message.OrderTxMessageListener : 订单微服务查询本地事务
2022-05-09 15:49:47.492 c.d.t.message.OrderTxMessageListener : 订单微服务查询本地事务
2022-05-09 15:50:47.494 c.d.t.message.OrderTxMessageListener : 订单微服务查询本地事务
2022-05-09 15:51:47.496 c.d.t.message.OrderTxMessageListener : 订单微服务查询本地事务
2022-05-09 15:52:47.497 c.d.t.message.OrderTxMessageListener : 订单微服务查询本地事务
2022-05-09 15:53:47.498 c.d.t.message.OrderTxMessageListener : 订单微服务查询本地事务
2022-05-09 15:54:47.501 c.d.t.message.OrderTxMessageListener : 订单微服务查询本地事务
2022-05-09 15:55:47.501 c.d.t.message.OrderTxMessageListener : 订单微服务查询本地事务
2022-05-09 15:56:47.503 c.d.t.message.OrderTxMessageListener : 订单微服务查询本地事务
2022-05-09 15:57:47.504 c.d.t.message.OrderTxMessageListener : 订单微服务查询本地事务
2022-05-09 15:58:47.506 c.d.t.message.OrderTxMessageListener : 订单微服务查询本地事务
2022-05-09 15:59:47.510 c.d.t.message.OrderTxMessageListener : 订单微服务查询本地事务
可看到 RocketMQ 回查了 15 次。

(4)异常流程:事务接收方宕机
即:RocketMQ 无法推送消息给消息接收方。

此时,订单服务还是会下单成功
库存服务无法处理
当库存服务再次上线后:会接收到消息。

订单服务日志:

2022-05-09 16:47:53.142 c.d.t.message.OrderTxMessageListener : 订单微服务执行本地事务
2022-05-09 16:47:53.174 c.d.t.message.OrderTxMessageListener : 订单微服务提交事务
库存服务日志:

2022-05-09 16:48:10.283 c.d.t.message.StockTxMessageConsumer : 库存微服务开始消费事务消息:{"txMessage":{"payCount":1,"productId":1,"txNo":"d5d86dae-76c2-4b56-9597-a12dae14325a"}}
2022-05-09 16:48:10.284 c.d.t.service.impl.StockServiceImpl : 库存微服务执行本地事务,商品id:1, 购买数量:1
(5)异常流程:事务接收方执行本地事务失败
即:当接收到消息后,执行本地事务失败,RocketMQ 会不断发送消费消息。

事务接收方执行本地事务失败,措施有:

记录日志,人工介入处理。
重试,再出错,则人工介入。
实验步骤:

下单错误商品,订单下单成功。
库存中没有此商品数据,向上抛错。
日志:

2022-05-09 14:19:59.930 WARN 19542 --- [MessageThread_3] a.r.s.s.DefaultRocketMQListenerContainer : consume message failed. messageExt:MessageExt [queueId=0, storeSize=480, queueOffset=1, sysFlag=8, bornTimestamp=1652077145061, bornHost=/172.17.0.1:36848, storeTimestamp=1652077199916, storeHost=/172.17.0.3:10911, msgId=AC11000300002A9F0000000000000D1B, commitLogOffset=3355, bodyCR

java.lang.NullPointerException: null
at com.donald.txmsgstock.service.impl.StockServiceImpl.decreaseStock(StockServiceImpl.java:35) ~[classes/:na]
at com.donald.txmsgstock.message.StockTxMessageConsumer.onMessage(StockTxMessageConsumer.java:27) ~[classes/:na]
at com.donald.txmsgstock.message.StockTxMessageConsumer.onMessage(StockTxMessageConsumer.java:16) ~[classes/:na]
at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer$DefaultMessageListenerConcurrently.consumeMessage(DefaultRocketMQListenerContainer.java:308) ~[rocketmq-spring-boot-2.0.2.jar:2.0.2]
at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:417) [rocketmq-client-4.4.0.jar:4.4.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_162]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_162]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_162]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_162]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]
参考资料

推荐一个零声教育C/C++后台开发的免费公开课程,个人觉得老师讲得不错,分享给大家:C/C++后台开发高级架构师,内容包括Linux,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK等技术内容,立即学习

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1月前
|
消息中间件 算法 分布式数据库
Raft算法:分布式一致性领域的璀璨明珠
【4月更文挑战第21天】Raft算法是分布式一致性领域的明星,通过领导者选举、日志复制和安全性解决一致性问题。它将复杂问题简化,角色包括领导者、跟随者和候选者。领导者负责日志复制,确保多数节点同步。实现细节涉及超时机制、日志压缩和网络分区处理。广泛应用于分布式数据库、存储系统和消息队列,如Etcd、TiKV。其简洁高效的特点使其在分布式系统中备受青睐。
|
1月前
|
算法 分布式数据库
Paxos算法:分布式一致性的基石
【4月更文挑战第21天】Paxos算法是分布式一致性基础,由Leslie Lamport提出,包含准备和提交阶段,保证安全性和活性。通过提案编号、接受者和学习者实现,广泛应用于分布式数据库、锁和配置管理。其简单、高效、容错性强,影响了后续如Raft等算法,是理解分布式系统一致性关键。
|
1月前
|
存储 缓存 负载均衡
分布式系统Session一致性问题
分布式系统Session一致性问题
41 0
|
1月前
|
消息中间件 Dubbo 应用服务中间件
分布式事物【Hmily实现TCC分布式事务、Hmily实现TCC事务、最终一致性分布式事务解决方案】(七)-全面详解(学习总结---从入门到深化)
分布式事物【Hmily实现TCC分布式事务、Hmily实现TCC事务、最终一致性分布式事务解决方案】(七)-全面详解(学习总结---从入门到深化)
103 0
|
14天前
|
消息中间件 中间件 程序员
分布式事务大揭秘:使用MQ实现最终一致性
本文由小米分享,介绍分布式事务中的MQ最终一致性实现,以RocketMQ为例。RocketMQ的事务消息机制包括准备消息、本地事务执行、确认/回滚消息及事务状态检查四个步骤。这种机制通过消息队列协调多系统操作,确保数据最终一致。MQ最终一致性具有系统解耦、提高可用性和灵活事务管理等优点,广泛应用于分布式系统中。文章还讨论了RocketMQ的事务消息处理流程和失败情况下的处理策略,帮助读者理解如何在实际应用中解决分布式事务问题。
21 6
|
19天前
|
运维 程序员 数据库
如何用TCC方案轻松实现分布式事务一致性
TCC(Try-Confirm-Cancel)是一种分布式事务解决方案,将事务拆分为尝试、确认和取消三步,确保在分布式系统中实现操作的原子性。它旨在处理分布式环境中的数据一致性问题,通过预检查和资源预留来降低失败风险。TCC方案具有高可靠性和灵活性,但也增加了系统复杂性并可能导致性能影响。它需要为每个服务实现Try、Confirm和Cancel接口,并在回滚时确保资源正确释放。虽然有挑战,TCC在复杂的分布式系统中仍被广泛应用。
30 5
|
30天前
|
算法 程序员
破解Paxos活性难题:分布式一致性的终极指南
Paxos算法是解决分布式系统一致性问题的关键,由Leslie Lamport提出。它涉及提议者、接受者和学习者三个角色,通过准备和接受两个阶段达成共识。然而,确保算法的活性,即在面对网络分区、竞争冲突和节点故障时仍能及时决策,是一个挑战。解决方法包括领导者选举、优化提案编号管理、使用超时机制和Fast Paxos等。实际案例中,通过领导者选举和超时机制,可以提高Paxos在应对网络延迟和冲突时的活性。
41 1
|
1月前
|
Java 数据库连接 API
分布式事物【XA强一致性分布式事务实战、Seata提供XA模式实现分布式事务】(五)-全面详解(学习总结---从入门到深化)
分布式事物【XA强一致性分布式事务实战、Seata提供XA模式实现分布式事务】(五)-全面详解(学习总结---从入门到深化)
77 0
|
1月前
|
算法
基于一致性理论的微电网分布式控制策略仿真模型【自适应虚拟阻抗】【simulink仿真】
基于一致性理论的微电网分布式控制策略仿真模型【自适应虚拟阻抗】【simulink仿真】
|
27天前
|
算法 程序员 分布式数据库
分布式一致性必备:一文读懂Raft算法
Raft算法是一种用于分布式系统中复制日志一致性管理的算法。它通过选举领导者来协调日志复制,确保所有节点数据一致。算法包括心跳机制、选举过程、日志复制和一致性保证。当领导者失效时,节点会重新选举,保证高可用性。Raft易于理解和实现,提供强一致性,常用于分布式数据库和协调服务。作者小米分享了相关知识,鼓励对分布式系统感兴趣的读者进一步探索。
147 0

热门文章

最新文章