什么是事务消息
事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
事务消息所对应的场景
在一些对数据一致性有强需求的场景,可以用 Apache RocketMQ 事务消息来解决,从而保证上下游数据的一致性。
以秒杀购物商城的商品下单交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、库存变更、购物车状态清空等多个子系统的变更。
事务性业务的处理分支包括:
- 主分支订单系统状态更新:由未支付变更为支付成功。
- 调用第三方物流系统状态新增:新增待发货物流记录,创建订单物流记录。
- 积分系统状态变更:变更用户积分,更新用户积分表。
- 购物车系统状态变更:清空购物车,更新用户购物车记录。
RocketMQ的事务消息
Apache RocketMQ在4.3.0版的时候已经支持分布式事务消息,这里RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息。
RocketMQ事务消息流程
针对于事务消息的总体运作流程,主要分为两个部分:正常事务消息的发送及提交、事务消息的补偿流程。
事务消息发送及提交基本流程概要(后面会详细分析原理)
事务消息发送步骤如下
- 消息发送者:生产者将半事务消息发送至RocketMQ Broker。
- Broker服务端:RocketMQ Broker 将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息暂不能投递,为半事务消息。
- 业务系统:生产者开始执行本地事务逻辑。
- 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback)。
- 如果本地操作成功,Commit操作生成消息索引,消息对消费者可见
- 如果本地操作失败,此时对应的half消息对业务不可见,本地逻辑不执行,Rollback均进行回滚。
服务端收到确认结果后处理逻辑如下
- 确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
- 确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
消息出现异常情况的补偿流程如下
在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果(Commit/Rollback)或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
注意:服务端仅仅会按照参数尝试指定次数,超过次数后事务会强制回滚,因此未决事务的回查时效性非常关键,需要按照业务的实际风险来设置
事务消息回查步骤如下
- 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
- 生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。
补偿总结
- 对没有Commit/Rollback的事务消息(pending状态的消息),Broker服务端会发起一次“回查”。
- 生产者Producer收到回查消息,检查回查消息对应的本地事务的状态。
- 生产者根据本地事务状态,重新Commit或者Rollback。
补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况
RocketMQ事务消息实现原理
事务消息在一阶段对用户不可见
在RocketMQ事务消息的主要流程中,一阶段的消息如何对用户不可见。
实现技术要点一:事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的。如何做到写入消息但是对用户不可见呢?
RocketMQ事务消息的做法是:如果消息是half消息,将备份原消息的Topic与消息消费队列,然后,改变Topic为RMQ_SYS_TRANS_HALF_TOPIC。
由于消费组未订阅该主题,故消费端无法消费half类型的消息,然后RocketMQ会开启一个定时任务,从Topic为RMQ_SYS_TRANS_HALF_TOPIC中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息。
RocketMQ中,消息在服务端的存储结构如下,每条消息都会有对应的索引信息,Consumer通过ConsumeQueue这个二级索引来读取消息实体内容,其流程如下:
RocketMQ的底层实现原理
- 写入的如果事务消息,对消息的Topic和Queue等属性进行替换,同时将原来的Topic和Queue信息存储到消息的属性中,正因为消息主题被替换,故消息并不会转发到该原主题的消息消费队列。
- 由于没有直接发送到目标的topic的队列里面,故此消费者无法感知消息的存在,不会消费,其实改变消息主题是RocketMQ的常用“套路”,回想一下延时消息的实现机制。
发送一个半事务消息
半事务消息是指暂不能投递的消息,生产者已经成功地将消息发送到了 Broker,但是Broker未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递(pending)”状态,如果发送成功则执行本地事务,并根据本地事务执行成功与否,向Broker半事务消息状态(commit或者rollback),半事务消息只有commit状态才会真正向下游投递。
Commit和Rollback操作以及Op消息的底层实现原理
Rollback的情况,对于Rollback,本身一阶段的消息对用户是不可见的,其实不需要真正撤销消息(实际上RocketMQ也无法去真正的删除一条消息,因为是顺序写文件的)。
但是区别于这条消息没有确定状态(Pending状态,事务悬而未决),需要一个操作来标识这条消息的最终状态。RocketMQ事务消息方案中引入了Op消息的概念,用Op消息标识事务消息已经确定的状态(Commit或者Rollback)。
如果一条事务消息没有对应的Op消息,说明这个事务的状态还无法确定(可能是二阶段失败了)。引入Op消息后,事务消息无论是Commit或者Rollback都会记录一个Op操作。Commit相对于Rollback只是在写入Op消息前创建Half消息的索引。
Op消息的存储和对应关系
Op消息写入到全局特定的Topic中通过源码中的方法
TransactionalMessageUtil.buildOpTopic(); 复制代码
这个Topic是一个内部的Topic(像Half消息的Topic一样),不会被用户消费。Op消息的内容为对应的Half消息的存储的Offset,这样通过Op消息能索引到Half消息进行后续的回查操作。
Half消息的索引构建
执行二阶段Commit操作时,需要构建出Half消息的索引。
- 一阶段的Half消息由于是写到一个特殊的Topic,
- 二阶段构建索引时需要读取出Half消息,并将Topic和Queue替换成真正的目标的Topic和Queue,之后通过一次普通消息的写入操作来生成一条对用户可见的消息。
所以,RocketMQ事务消息二阶段其实是利用了一阶段存储的消息的内容,在二阶段时恢复出一条完整的普通消息,然后走一遍消息写入流程。
补偿控制要点
如果由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,Broker端会通过扫描发现某条消息长期处于"半事务消息"时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback)。
这样最终保证了本地事务执行成功,下游就能收到消息,本地事务执行失败,下游就收不到消息。总而保证了上下游数据的一致性。
注意:事务消息的生产组名称 ProducerGroupName不能随意设置。事务消息有回查机制,回查时Broker端如果发现原始生产者已经崩溃,则会联系同一生产者组的其他生产者实例回查本地事务执行情况以Commit或Rollback半事务消息。
RocketMQ的回查功能实现原理
如果在RocketMQ事务消息的二阶段过程中失败了,例如在做Commit操作时,出现网络问题导致Commit失败,那么需要通过一定的策略使这条消息最终被Commit。RocketMQ采用了一种补偿机制,称为“回查”。
- 回查次数的配置化
- Broker端对未确定状态的消息发起回查,将消息发送到对应的Producer端(同一个Group的Producer),由Producer根据消息来检查本地事务的状态,进而执行Commit或者Rollback。Broker端通过对比Half消息和Op消息进行事务消息的回查并且推进CheckPoint(记录那些事务消息的状态是确定的)。
- 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制
- 如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志,执行回滚Rollback操作。
- 回查行为的定制化d
- 此外用户可以通过重写AbstractTransactionalMessageCheckListener 类来修改这个Rollback的行为,比如改写为Commit,或者其他的记录日志或者发送消息邮件推送给指定人进行人工跟进。
- 回查触发时间定制化
事务消息将在 Broker配置文件中的参数transactionTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionTimeout 参数。
事务性消息可能不止一次被检查或消费。
- 发送给用户的目标topic消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
- 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
消息事务样例
事务消息共有三种状态,提交状态、回滚状态、中间状态。
- TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
- TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
- TransactionStatus.Unknown: 中间状态(Pending),它代表需要检查消息队列来确定状态。
开发实现案例
发送事务消息样例
创建事务性生产者
使用 TransactionMQProducer类创建生产者,并指定唯一的 ProducerGroup,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。
TransactionListener transactionListener = new TransactionListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name"); ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } }); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 10; i++) { try { Message msg = new Message("TopicTest1234", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", sendResult); Thread.sleep(10); } catch (MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); } } for (int i = 0; i < 100000; i++) { Thread.sleep(1000); } producer.shutdown(); } 复制代码
实现事务的监听接口
TransactionListener接口的定义如下:
public interface TransactionListener { /** * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction. * * @param msg Half(prepare) message * @param arg Custom business parameter * @return Transaction state */ LocalTransactionState executeLocalTransaction(final Message msg, final Object arg); /** * When no response to prepare(half) message. broker will send check message to check the transaction status, and this * method will be invoked to get local transaction status. * * @param msg Check message * @return Transaction state */ LocalTransactionState checkLocalTransaction(final MessageExt msg); } 复制代码
当发送半消息成功时,我们使用 executeLocalTransaction 方法来执行本地事务。它返回前一节中提到的三个事务状态之一。checkLocalTransaction 方法用于检查本地事务状态,并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。
public class TransactionListenerImpl implements TransactionListener { private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>(); @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { int value = transactionIndex.getAndIncrement(); int status = value % 3; localTrans.put(msg.getTransactionId(), status); return LocalTransactionState.UNKNOW; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { Integer status = localTrans.get(msg.getTransactionId()); if (null != status) { switch (status) { case 0: return LocalTransactionState.UNKNOW; case 1: return LocalTransactionState.COMMIT_MESSAGE; case 2: return LocalTransactionState.ROLLBACK_MESSAGE; } } return LocalTransactionState.COMMIT_MESSAGE; } } 复制代码
executeLocalTransaction 是半事务消息发送成功后,执行本地事务的方法,具体执行完本地事务后,可以在该方法中返回以下三种状态:
- LocalTransactionState.COMMIT_MESSAGE:提交事务,允许消费者消费该消息
- LocalTransactionState.ROLLBACK_MESSAGE:回滚事务,消息将被丢弃不允许消费。
- LocalTransactionState.UNKNOW:暂时无法判断状态,等待固定时间以后Broker端根据回查规则向生产者进行消息回查。
checkLocalTransaction是由于二次确认消息没有收到,Broker端回查事务状态的方法。回查规则:本地事务执行完成后,若Broker端收到的本地事务返回状态为LocalTransactionState.UNKNOW,或生产者应用退出导致本地事务未提交任何状态。则Broker端会向消息生产者发起事务回查,第一次回查后仍未获取到事务状态,则之后每隔一段时间会再次回查。
事务消息使用上的限制
事务消息不支持延时消息和批量消息。