深度剖析 RocketMQ 事务消息!

简介: 本文深入探讨了 RocketMQ 的事务消息原理及其应用场景。通过详细的源码分析,阐述了事务消息的基本流程,包括准备阶段、提交阶段及补偿机制。文章还提供了示例代码,帮助读者更好地理解整个过程。此外,还讨论了事务消息的优缺点、适用场景及注意事项,如确保本地事务的幂等性、合理设置超时时间等。尽管事务消息增加了系统复杂性,但在需要保证消息一致性的场景中,它仍是一种高效的解决方案。

你好,我是猿java。

这篇文章,我们将深入探讨 RocketMQ 的事务消息原理,并从源码角度进行分析,以及事务消息适合什么场景,使用事务消息需要注意哪些事项。

什么是事务消息

事务消息是为了保证分布式系统中消息的一致性而引入的一种消息类型。事务消息允许消息发送方在发送消息后,进行本地事务操作,并根据本地事务的执行结果来决定消息的最终状态(提交或回滚)。

RocketMQ 事务消息的基本流程

RocketMQ 采用了 2PC(两段式协议) + 补偿机制(事务回查)的方式实现分布式事务功能,保证分布式事务的最终一致性。

1. 准备阶段(Prepare Phase):

  • 生产者发送一个half消息(Prepare Message)到 RocketMQ broker,这个消息会被存储但不会被消费者消费。
  • RocketMQ broker 接收到half消息后会进行持久化,并返回消息的状态给生产者。

2. 提交阶段(Commit Phase):

  • 生产者根据本地事务的执行结果(成功或失败)决定向 RocketMQ broker 发送提交(Commit)或回滚(Rollback)操作。
  • 如果本地事务执行成功,生产者发送提交消息,RocketMQ broker 会将half消息标记为可消费,消费者可以进行消费。
  • 如果本地事务执行失败,生产者发送回滚消息,RocketMQ broker 会删除half消息,消费者不会消费这条消息。

补偿机制

RocketMQ 还提供了一个事务回查机制,如果生产者在发送half事务消息后由于网络或其他原因未能及时通知 RocketMQ 提交或回滚消息,RocketMQ 会定期(默认 60s)回查生产者的事务状态,以决定如何处理这条half事务消息,以确保消息的一致性。

整个过程如下图:

rocketmq-transaction-message-process.png

为了更好的理解整个过程,我们通过一个完整的示例代码来展示:

public class TransactionProducer {
   
    public static void main(String[] args) throws Exception {
   
        // 创建事务消息生产者
        TransactionMQProducer producer = new TransactionMQProducer("TransactionProducerGroup");
        producer.setNamesrvAddr("localhost:9876");

        // 设置事务状态回查监听器
        producer.setTransactionCheckListener(new TransactionCheckListener() {
   
            @Override
            public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
   
                // 处理事务状态回查逻辑
                System.out.println("Checking transaction state for message: " + new String(msg.getBody()));
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        // 启动生产者
        producer.start();

        // 发送事务消息
        Message msg = new Message("TransactionTopic", "TagA", "Transaction Message".getBytes());
        SendResult sendResult = producer.sendMessageInTransaction(msg, new LocalTransactionExecuter() {
   
            @Override
            public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
   
                // 执行本地事务逻辑
                System.out.println("Executing local transaction for message: " + new String(msg.getBody()));
                // 假设本地事务执行成功,返回 COMMIT_MESSAGE
                // 如果本地事务失败,返回 ROLLBACK_MESSAGE
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        }, null);

        System.out.println("Send result: " + sendResult);

        // 阻塞主线程,防止退出
        System.in.read();

        // 关闭生产者
        producer.shutdown();
    }
}

RocketMQ 事务消息的源码分析

发送half消息

发送half消息的核心代码在 TransactionMQProducer 类中,通过 sendMessageInTransaction 方法实现:

public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, Object arg) {
   
    // 1. 发送`half`消息
    SendResult sendResult = this.defaultMQProducerImpl.send(msg);

    // 2. 执行本地事务
    LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);

    // 3. 根据本地事务状态提交或回滚消息
    this.endTransaction(msg, localTransactionState);

    return new TransactionSendResult(sendResult, localTransactionState);
}

在 sendMessageInTransaction 方法中,首先调用 send 方法发送half消息,然后执行本地事务,并根据本地事务的结果调用 endTransaction 方法提交或回滚消息。

执行本地事务

本地事务的执行由 LocalTransactionExecuter 接口的实现类来完成。在实际使用中,用户需要实现该接口,并在 executeLocalTransactionBranch 方法中定义具体的本地事务逻辑。

public interface LocalTransactionExecuter {
   
    LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg);
}

提交或回滚事务消息

提交或回滚事务消息的实现也在 TransactionMQProducer 类中,通过 endTransaction 方法完成:

private void endTransaction(Message msg, LocalTransactionState localTransactionState) {
   
    // 构建事务结束请求
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setCommitOrRollback(localTransactionState == LocalTransactionState.COMMIT_MESSAGE ? 0 : 1);
    requestHeader.setTranStateTableOffset(msg.getQueueOffset());
    requestHeader.setCommitLogOffset(msg.getCommitLogOffset());

    // 发送事务结束请求到 Broker
    this.defaultMQProducerImpl.endTransaction(msg, requestHeader);
}

在 endTransaction 方法中,根据本地事务的执行结果构建事务结束请求,并调用 endTransaction 方法将请求发送到 Broker。

事务状态回查

事务状态回查是由 Broker 发起的。当 Broker 在规定时间内没有收到提交或回滚请求时,会主动向消息发送方发起事务状态回查。回查的实现主要在 TransactionCheckListener 接口中:

public interface TransactionCheckListener {
   
    LocalTransactionState checkLocalTransactionState(final MessageExt msg);
}

消息发送方需要实现 TransactionCheckListener 接口,并在 checkLocalTransactionState 方法中定义如何检查本地事务的状态。

Broker 端的事务消息处理

Broker 端的事务消息处理主要在 TransactionalMessageServiceImpl 类中实现。Broker 负责接收half消息、提交或回滚请求,并在必要时发起事务状态回查。

接收half消息

Broker 接收half消息的逻辑在 TransactionalMessageServiceImpl 类的 prepareMessage 方法中:

public PutMessageResult prepareMessage(MessageExtBrokerInner msgInner) {
   
    // 存储`half`消息
    return this.store.putMessage(msgInner);
}

提交或回滚消息

Broker 处理提交或回滚请求的逻辑在 TransactionalMessageServiceImpl 类的 commitMessage 和 rollbackMessage 方法中:

public boolean commitMessage(MessageExt msgExt) {
   
    // 提交消息
    return this.store.commitTransaction(msgExt);
}

public boolean rollbackMessage(MessageExt msgExt) {
   
    // 回滚消息
    return this.store.rollbackTransaction(msgExt);
}

事务状态回查

Broker 发起事务状态回查的逻辑在 TransactionalMessageServiceImpl 类的 check 方法中:

public void check(long transactionTimeout, int transactionCheckMax, String topic) {
   
    // 遍历`half`消息队列,发起事务状态回查
    List<MessageExt> halfMessages = this.store.getHalfMessages(topic);
    for (MessageExt msg : halfMessages) {
   
        // 发起回查请求
        this.brokerController.getBroker2Client().checkProducerTransactionState(msg);
    }
}

RocketMQ 事务消息的优缺点

优点

  • 保证消息一致性:通过事务消息,RocketMQ 能够保证分布式系统中消息的一致性,避免数据不一致问题。
  • 高性能:RocketMQ 的事务消息性能较高,能够满足高并发场景的需求。
  • 易用性:RocketMQ 提供了简单易用的 API,使得开发者能够方便地使用事务消息。

缺点

  • 复杂性:事务消息的引入增加了系统的复杂性,开发者需要处理事务状态回查等问题。
  • 时延:事务消息的处理涉及half消息、回查等操作,可能会增加消息的时延。

事务消息适用场景

资金转账

在金融系统中,资金转账需要确保资金的一致性和安全性。例如,从账户 A 转账到账户 B,必须确保 A 的金额减少和 B 的金额增加是一个原子操作。使用事务消息可以保证在转账过程中,如果任何一个步骤失败,整个操作都会回滚,确保数据一致性。

订单处理

在电子商务系统中,订单处理通常涉及多个步骤,例如创建订单、扣减库存、生成支付记录等。这些步骤需要保证一致性。使用事务消息可以确保如果某一步操作失败,整个订单处理过程可以回滚,避免数据不一致。

分布式事务

在微服务架构中,分布式事务是一个常见的挑战。多个微服务之间的操作需要协调一致,事务消息可以作为一种分布式事务解决方案,确保各个微服务之间的数据一致性。

库存管理

在库存管理系统中,库存的增减操作需要保证一致性。例如,用户下单后需要扣减库存,使用事务消息可以确保在扣减库存失败时,订单状态不会被错误更新。

事务消息注意事项

确保本地事务的幂等性

在分布式系统中,本地事务操作可能会被多次执行。例如,在事务状态回查时,Broker 可能会多次检查本地事务状态。因此,确保本地事务操作的幂等性非常重要。幂等性可以确保多次执行相同的操作不会产生副作用。

设置合理的超时时间

事务消息的处理涉及half消息、提交或回滚请求以及事务状态回查。设置合理的超时时间可以避免长时间等待,影响系统性能。超时时间应根据实际业务需求和系统性能进行调整。

处理事务状态回查

事务状态回查是事务消息的重要机制。当 Broker 在规定时间内没有收到提交或回滚请求时,会主动发起事务状态回查。开发者需要实现 TransactionCheckListener 接口,并在 checkLocalTransactionState 方法中处理回查逻辑,确保能够正确返回事务状态。

监控和日志

监控和日志是确保事务消息系统稳定运行的重要手段。通过监控,可以及时发现系统中的异常情况,例如事务状态回查失败、消息发送失败等。日志记录可以帮助开发者排查问题,分析系统性能。

资源隔离

在使用事务消息时,确保事务消息与其他普通消息的资源隔离,以避免相互影响。例如,可以为事务消息单独配置 Topic 和队列,确保事务消息的处理不受其他消息影响。

事务消息的重试机制

在某些情况下,事务消息的提交或回滚请求可能会失败。开发者需要考虑实现重试机制,以确保最终能够正确提交或回滚事务消息。重试机制可以通过定时任务或消息队列实现。

性能影响

事务消息的处理涉及多次网络通信和状态检查,可能会对系统性能产生一定影响。在高并发场景中,需要评估事务消息对系统性能的影响,并进行相应的优化。例如,可以通过批量处理、异步处理等方式提高性能。

总结

本文详细介绍了 RocketMQ 事务消息的基本流程,并通过源码分析揭示了其内部实现原理,尽管事务消息增加了系统的复杂性,但在需要保证消息一致性的场景中,它仍然是一种非常有效的解决方案,比如资金转账、订单处理、分布式事务、库存管理等场景。

参考资料

rocketmq官网

学习交流

如果你觉得文章有帮助,请帮忙转发给更多的好友,或关注:猿java,持续输出硬核文章。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
6月前
|
消息中间件 Java API
RocketMQ事务消息, 图文、源码学习探究~
介绍 RocketMQ是阿里巴巴开源的分布式消息中间件,它是一个高性能、低延迟、可靠的消息队列系统,用于在分布式系统中进行异步通信。 从4.3.0版本开始正式支持分布式事务消息~ RocketMq事务消息支持最终一致性:在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。 原理、流程 本质上RocketMq的事务能力是基于二阶段提交来实现的 在消息发送上,将二阶段提交与本地事务绑定 本地事务执行成功,则事务消息成功,可以交由Consumer消费 本地事务执行失败,则事务消息失败,Consumer无法消费 但是,RocketMq只能保证本地事务
|
29天前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
6月前
|
消息中间件 存储 RocketMQ
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
|
3月前
|
消息中间件 存储 缓存
RocketMQ发送消息原理(含事务消息)
本文深入探讨了RocketMQ发送消息的原理,包括生产者端的发送流程、Broker端接收和处理消息的流程,以及事务消息的特殊处理机制,提供了对RocketMQ消息发送机制全面的理解。
RocketMQ发送消息原理(含事务消息)
|
3月前
|
消息中间件 监控 安全
大事务+MQ普通消息线上问题排查过程技术分享
【8月更文挑战第23天】在复杂的企业级系统中,大事务与消息队列(MQ)的结合使用是一种常见的架构设计,用于解耦系统、提升系统响应性和扩展性。然而,这种设计也带来了其特有的挑战,特别是在处理退款业务等涉及金融交易的高敏感场景时。本文将围绕“大事务+MQ普通消息线上问题排查过程”这一主题,分享一次实际工作中的技术排查经验,旨在为大家提供可借鉴的解决思路和方法。
55 0
|
4月前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
317 1
|
4月前
|
消息中间件 调度 RocketMQ
【RocketMQ系列六】RocketMQ事务消息
【RocketMQ系列六】RocketMQ事务消息
925 1
|
5月前
|
消息中间件 IDE 数据库
RocketMQ事务消息学习及刨坑过程
RocketMQ事务消息学习及刨坑过程
|
5月前
|
消息中间件 网络性能优化 RocketMQ
消息队列 MQ产品使用合集之本地事务还没有执行完就触发了回查是什么导致的
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
消息中间件 Kafka 测试技术
微服务轮子项目(33) -RocketMQ特点、安装部署、异常处理、事务消息原理
微服务轮子项目(33) -RocketMQ特点、安装部署、异常处理、事务消息原理
153 0