事务消息应用场景、实现原理与项目实战(附全部源码)

简介: 事务消息应用场景、实现原理与项目实战(附全部源码)

1、活动中心场景介绍


在电商系统上线初期,往往会进行一些“拉新”活动,例如活动部门提出新用户注册送积分、送优惠券活动


基于分布式、微服务的设计理念,通常的架构设计(子系统交互)如下图所示:

d70819911876740916c6834f4140cd9f.png

其核心系统介绍如下:


  • 账户中心
    提供用户登录、用户注册等服务,一个新用户注册时,向MQ服务器中的USER_REGISTER主题发送一条消息,主流程结束,与送积分,送优惠券等过程解耦。
  • 优惠券(券系统)
    提供发放优惠券、使用优惠券等与券相关的基础服务。
  • 积分中心
    提供积分相关的服务,例如积分赠送、积分消费、积分查询等基础服务。
  • 送积分服务(消费者)
    订阅MQ,按照规则决定是否需要赠送积分,如果需要则调用积分相关的基础接口,完成积分的发放。
  • 送优惠券(消费者)
    订阅MQ,按照规则决定是否需要赠送优惠券,如果需要则调用券系统相关的基础接口,完成优惠券的发放。


上面的架构设计非常优雅,但并不是无懈可击,读者们肯定会想到如果新用户注册成功,但消息发送到MQ失败,或者消息成功发送到MQ,但发送完MQ后系统出现异常导致用户注册失败又该如何呢?


上面的问题其实就是典型的分布式事务问题:即如何保证用户注册(数据库操作)与MQ消息发送这两个分布式操作的一致性。


RocketMQ事务消息闪亮登场


2、事务消息实现原理


一言以蔽之:RocketMQ事务消息要解决的问题是消息发送与业务的一致性,其解决思路:二阶段提交与事务状态回查,其具体实现流程如下图所示:

fac24f1d2ced7944db3bc461b919ad03.png

其核心设计理念:


  • 应用程序开启一个数据库事务,进行数据库操作,并且在事务中发送一条PREPARE消息,PREPARE消息发送成功后通知应用程序记录本地事务状态,然后提交本地事务。
  • RocketMQ在收到类型为PREPARE的消息时,首先备份消息的原主题与原消息消费队列,然后将消息存储在主题为RMQ_SYS_TRANS_HALF_TOPIC的消息队列中,故PREPARE的消息是不会被客户端消费的。
  • Broker消息服务器开启一个定时任务处理RMQ_SYS_TRANS_HALF_TOPIC中的消息,会每隔指定时间向消息发送者发起事务状态查询请求 ,询问消息发送者客户端本地事务是否成功,然后根据回查状态决定是提交还是回滚,即对处于PREPARE状态进行提交或回滚操作。
  • 发送者如果明确得知事务成功,则可以返回COMMIT,服务端会提交该条消息,具体操作是恢复原消息的主题与队列,重新发送到Broker,消费端感知后消费。
  • 发送者如果无法明确得知事务状态,则返回UNOWN,此时服务端会等待一定时间后再次向发送者询问,默认询问15次。
  • 发送者如果非常明确得知事务失败,则可以返回ROLLBACK。


在具体实践中,消息发送者在无法获取事务状态时不要武断的返回ROLLBACK,而是要返回UNOWN,让服务端定时重试回查,说明如下:


e8df6e0a62071448bdec68acd375e7d6.png

在将PREPARE消息发送到Broker后,服务端发起事务查询时本地事务可能还未提交,为了避免无效的事务回查机制,RocketMQ通常至少在收到PREPARE消息6s后才会发起第一次事务回查,可通过 transactionTimeOut 配置。故客户端在实现事务回查时无法证明事务状态时不应该返回ROLLBACK,而是返回UNOWN。


3、事务消息实战


光说不练假把式,接下来以一个新用户注册送优惠券的场景来详细介绍如何使用事务消息。


项目模块职责说明如下:

507395fd95e4adc38653db42c12e73ee.png

事务消息的核心代码组装在transaction-service,其核心类图如下:


3faa944804222edd096bd7c40809c9db.png

其中核心要点如下:


  • UserServiceImpl
    Dubbo接口业务实现类,类似MVC的控制层,在这里做一些参数验证,但不执行具体的业务逻辑,只是发送一条事务消息到MQ。
  • UserRegTransactionListener
    事务监听器,在 executeLocalTransaction 方法中执行业务逻辑,数据库本地事务加在该方法。

温馨提示:之所以不在UserServicveImpl中执行本地事务,是因为 executeLocalTransaction 中抛出的异常会被RocketMQ框架捕捉,及异常无法被UserServiceImpl感知,即无法实现其事务的一致性。


接下来展示其核心代码,全部源码已上传到github仓库。


仓库地址:https://github.com/dingwpmz/rocketmq-learning


3.1 UserServiceImpl 核心实现


7ac7070c414b464b6951cdb11a56c40c.png


UserServiceImpl 的核心要点如下:


  • 首先应该对参数进行校验、业务逻辑进行校验,如果不满足业务条件,会发送一些无效消息到MQ,虽然不会造成业务异常,但会消耗性能
  • 发送事务消息,建议对消息设置Key,Key的值可以用业务处理流水号(可唯一表示该业务操作)或者核心业务字段(例如订单编号)
  • 业务入口类可通过事务消息发送状态来判断业务是否失败


3.2 UserRegTransactionListener 核心实现


事务监听器需要实现执行本地事务与事务回查两个接口。


3.2.1 实现 executeLocalTransaction


首先需要实现 executeLocalTransaction 方法,执行本地事务,其代码如下图所示:


37a1a0d2144f3e9defc348c945d7b938.png

其中几个关键点说明如下:


  • 在该方法上添加数据库事务标签。
  • 执行业务逻辑,示例Demo只是将用户数据存储到数据库。
  • 如果业务执行失败,可明确告知需要回滚,上层调用方也可根据ROLLBACK_MESSAGE进行相应的处理。
  • 如果业务成功,不建议直接返回COMMIT,而是建议返回UNKNOW,因为该方法尽管在方法最后一行,但可能发生断电等异常情况,数据库并没有成功。


3.2.2 实现 checkLocalTransaction


其次需要实现事务状态回查,用来RocketMQ服务端感知事务是否成功,其实现原理如下图所示:

ac5854b0e02fb272d43180be0d0f77c8.png

其实现关键点如下:


  • 如果能明确得知本地事务成功,则返回COMMIT_MESSAGE
  • 如该不能明确得知本地事务成功,不能返回ROLLBACK_MESSAGE,而是返回UNKNOW,等待服务端下一次事务回查(不会立即触发),服务端默认回查15次,如果15次都得到UNKNOW,则会回滚该消息


3.3 代码获取


上文只是将事务消息的核心代码加以解读,并重点阐述每个步骤的实现关键点,笔者基于SpringBoot,尝试结合场景学习RocketMQ的使用技巧,其代码上传到了github仓库。

7cca698dee09fde6ab4a814cfc8cec3b.png


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
8月前
|
消息中间件 Java API
RocketMQ事务消息, 图文、源码学习探究~
介绍 RocketMQ是阿里巴巴开源的分布式消息中间件,它是一个高性能、低延迟、可靠的消息队列系统,用于在分布式系统中进行异步通信。 从4.3.0版本开始正式支持分布式事务消息~ RocketMq事务消息支持最终一致性:在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。 原理、流程 本质上RocketMq的事务能力是基于二阶段提交来实现的 在消息发送上,将二阶段提交与本地事务绑定 本地事务执行成功,则事务消息成功,可以交由Consumer消费 本地事务执行失败,则事务消息失败,Consumer无法消费 但是,RocketMq只能保证本地事务
|
8月前
|
消息中间件 负载均衡 Java
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
112 0
|
2月前
|
NoSQL Java API
分布式锁的实现原理与应用场景,5 分钟彻底搞懂!
本文详细解析了分布式锁的实现原理与应用场景,包括线程锁、进程锁和分布式锁的区别,以及分布式锁的四种要求和三种实现方式(数据库乐观锁、ZooKeeper、Redis)。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
分布式锁的实现原理与应用场景,5 分钟彻底搞懂!
|
3月前
|
存储 Java 数据库
事务的四大特性及其实现原理:深入剖析与实战示例
【10月更文挑战第17天】在数据库管理和分布式系统设计中,事务(Transaction)扮演着至关重要的角色。事务的四大特性——原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)和持久性(Durability),通常简称为ACID特性。
90 1
|
8月前
|
消息中间件 存储 安全
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
109 0
|
8月前
|
消息中间件 存储 Apache
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景
事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
656 2
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景
|
8月前
|
消息中间件 存储 负载均衡
精华推荐 | 【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(上篇)
精华推荐 | 【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(上篇)
83 1
|
8月前
|
消息中间件 存储 Kafka
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
142 1
|
存储 设计模式 消息中间件
消费架构实战-原理
消费架构实战-原理
100 0
|
PHP 计算机视觉
PHPGrafika 如何实现圆角图片
PHPGrafika 如何实现圆角图片 在网站开发中,圆角图片是非常常见的一种设计元素。使用 PHPGrafika 库可以很方便的实现圆角图片的制作。本文将介绍如何使用 PHPGrafika 库制作圆角图片的方法。
119 0