Spring Cloud异步场景分布式事务怎样做?试试RocketMQ

简介: 在微服务架构中,我们常常使用异步化的手段来提升系统的吞吐量和解耦上下游,而构建异步架构最常用的手段就是使用消息队列(MQ),那异步架构怎样才能实现数据一致性呢?本文主要介绍如何使用RocketMQ的事务消息来解决一致性问题。

封面.png

一、背景

在微服务架构中,我们常常使用异步化的手段来提升系统的 吞吐量解耦 上下游,而构建异步架构最常用的手段就是使用 消息队列(MQ),那异步架构怎样才能实现数据一致性呢?本文主要介绍如何使用RocketMQ事务消息来解决一致性问题。

RocketMQ 是阿里巴巴开源的分布式消息中间件,目前已成为 Apache 的顶级项目。历经多次天猫双十一海量消息考验,具有高性能、低延时和高可靠等特性

PS:同步场景怎样保证一致性?请看文章《Spring Cloud同步场景分布式事务怎样做?试试Seata

 

二、MQ选型

可以看到在 业务处理 方面来说 RocketMQ 优于其他对手,而且原生支持 事务消息

mq对比.jpg

PS:业务系统用的是其他 MQ 产品但是又需要 事务消息 怎么办?学习原理自己开发实现!

 

三、什么是事务消息

例如下图的场景:生成订单记录 -> MQ -> 增加积分

案例.jpg

我们是应该先 创建订单记录,还是先 发送MQ消息 呢?

  1. 先发送MQ消息:这个明显是不行的,因为如果消息发送成功,而订单创建失败的话是没办法把消息收回来的
  2. 先创建订单记录:如果订单创建成功后MQ消息发送失败 抛出异常,因为两个操作都在本地事务中所以订单数据是可以 回滚

上面的 方式二 看似没问题,但是 网络是不可靠的!如果 MQ 的响应因为网络原因没有收到,所以在面对不确定的结果只好进行回滚;但是 MQ 端又确实是收到了这条消息的,只是回给客户端的 响应丢失 了!

所以 事务消息 就是用来保证 本地事务MQ消息发送 的原子性!

 

四、RocketMQ事务消息原理

RocketMQ事务消息原理.png

主要的逻辑分为两个流程:

  • 事务消息发送及提交

    1. 发送 half消息
    2. MQ服务端 响应消息写入结果
    3. 根据发送结果执行 本地事务(如果写入失败,此时half消息对业务 不可见,本地逻辑不执行)
    4. 根据本地事务状态执行 Commit 或者 Rollback(Commit操作生成消息索引,消息对消费者 可见
  • 回查流程

    1. 对于长时间没有 Commit/Rollback 的事务消息(pending 状态的消息),从服务端发起一次 回查
    2. Producer 收到回查消息,检查回查消息对应的 本地事务状态
    3. 根据本地事务状态,重新 Commit 或者 Rollback

逻辑时序图
RocketMQ事务消息时序图.png

 

五、异步架构一致性实现思路

从上面的原理可以发现 事务消息 仅仅只是保证本地事务和MQ消息发送形成整体的 原子性,而投递到MQ服务器后,并无法保证消费者一定能消费成功!

如果 消费端消费失败 后的处理方式,建议是记录异常信息然后 人工处理,并不建议回滚上游服务的数据(因为两者是 解耦 的,而且 复杂度 太高)

我们可以利用 MQ 的两个特性 重试死信队列 来协助消费端处理:

  1. 消费失败后进行一定次数的 重试
  2. 重试后也失败的话该消息丢进 死信队列
  3. 另外起一个线程监听消费 死信队列 里的消息,记录日志并且预警!

因为有 重试 所以消费者需要实现 幂等性

 

六、分布式事务场景样例

下面就用刚刚提到的场景:生成订单记录 -> MQ -> 增加积分;来简单讲一下 Spring Cloud 中应该怎么做,详细代码请 下载demo 查看。
PS:怎样安装部署RocketMQ可以参考《Apache RocketMQ 消息队列部署与可视化界面安装

6.1. 引入依赖

使用 spring-cloud-stream 框架来访问 RocketMQ
引入依赖.png

Spring Cloud Stream 是一个构建消息驱动的框架,通过抽象的定义实现应用与MQ消息队列之间的解耦,目前支持 RabbitMQkafkaRocketMQ

srping-cloud-stream.png

6.2. 开启事务消息

消息生产者需要添加 transactional: true 开启 事务消息
rocketMQ配置.png

6.3. 订单服务发送half消息

订单服务发送half消息.png

因为开启了 事务消息 所以这里发送的是 half消息 对于消费端是 不可见

6.4. 订单服务监听half消息

使用 @RocketMQTransactionListener 注解监听 半消息,并实现 RocketMQLocalTransactionListener 接口,该接口有两个方法

  • executeLocalTransaction:用于提交本地事务
  • checkLocalTransaction:用于事务回查

订单服务监听half消息.png

如果提交事务消息失败,需等待约1分钟左右 事务回查 方法才会被调用

6.5. 积分服务消费消息

积分服务消费消息.png

注意:因为有 重试,这里如果是真实的业务需要自行实现 幂等性

6.6. 消费死信队列预警

消费死信队列预警.png

监听并消费死信队列中的消息,用于记录错误日志,并且预警通知运维人员等

6.7. 测试用例

demo中提供了3个接口分别测试不同的场景:

  • 事务成功
    http://localhost:11002/success
    流程如下:

    1. 订单创建 成功
    2. 提交事务消息 成功
    3. 消费消息增加积分 成功
  • 订单创建成功但提交事务消息失败
    http://localhost:11002/produceError
    流程如下:

    1. 订单创建 成功
    2. 提交事务消息 失败
    3. 事务回查(等待1分钟左右) 成功
    4. 提交事务消息 成功
    5. 消费消息增加积分 成功
  • 消费消息失败
    http://localhost:11002/consumeError
    流程如下:

    1. 订单创建 成功
    2. 提交事务消息 成功
    3. 消费消息增加积分 失败
    4. 重试消费消息 失败
    5. 进入死信队列 成功
    6. 消费死信队列的消息 成功
    7. 记录日志并发出预警 成功

 

七、demo下载地址

https://gitee.com/zlt2000/microservices-platform/tree/master/zlt-demo/rocketmq-demo/rocketmq-transactional

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
21天前
|
资源调度 Java 调度
Spring Cloud Alibaba 集成分布式定时任务调度功能
定时任务在企业应用中至关重要,常用于异步数据处理、自动化运维等场景。在单体应用中,利用Java的`java.util.Timer`或Spring的`@Scheduled`即可轻松实现。然而,进入微服务架构后,任务可能因多节点并发执行而重复。Spring Cloud Alibaba为此发布了Scheduling模块,提供轻量级、高可用的分布式定时任务解决方案,支持防重复执行、分片运行等功能,并可通过`spring-cloud-starter-alibaba-schedulerx`快速集成。用户可选择基于阿里云SchedulerX托管服务或采用本地开源方案(如ShedLock)
|
4天前
|
消息中间件 Kafka 数据安全/隐私保护
RabbitMQ异步通信详解
RabbitMQ异步通信详解
44 14
|
19天前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
37 2
|
24天前
|
Java 微服务 Spring
SpringBoot+Vue+Spring Cloud Alibaba 实现大型电商系统【分布式微服务实现】
文章介绍了如何利用Spring Cloud Alibaba快速构建大型电商系统的分布式微服务,包括服务限流降级等主要功能的实现,并通过注解和配置简化了Spring Cloud应用的接入和搭建过程。
SpringBoot+Vue+Spring Cloud Alibaba 实现大型电商系统【分布式微服务实现】
|
2月前
|
负载均衡 Java Spring
Spring cloud gateway 如何在路由时进行负载均衡
Spring cloud gateway 如何在路由时进行负载均衡
268 15
|
19天前
|
消息中间件 监控 RocketMQ
分布式事务实现方案:一文详解RocketMQ事务消息
分布式事务实现方案:一文详解RocketMQ事务消息
|
2月前
|
Java Spring
spring cloud gateway在使用 zookeeper 注册中心时,配置https 进行服务转发
spring cloud gateway在使用 zookeeper 注册中心时,配置https 进行服务转发
56 3
|
23天前
|
Dubbo Java 调度
揭秘!Spring Cloud Alibaba的超级力量——如何轻松驾驭分布式定时任务调度?
【8月更文挑战第20天】在现代微服务架构中,Spring Cloud Alibaba通过集成分布式定时任务调度功能解决了一致性和可靠性挑战。它利用TimerX实现任务的分布式编排与调度,并通过`@SchedulerLock`确保任务不被重复执行。示例代码展示了如何配置定时任务及其分布式锁,以实现每5秒仅由一个节点执行任务,适合构建高可用的微服务系统。
44 0
|
2月前
|
消息中间件 Java Nacos
通用快照方案问题之通过Spring Cloud实现配置的自动更新如何解决
通用快照方案问题之通过Spring Cloud实现配置的自动更新如何解决
57 0
|
2月前
|
缓存 监控 Java
通用快照方案问题之Spring Boot Admin的定义如何解决
通用快照方案问题之Spring Boot Admin的定义如何解决
44 0