深度剖析 RocketMQ 事务消息!

简介: 本文深入探讨了 RocketMQ 的事务消息原理及其应用场景。通过详细的源码分析,阐述了事务消息的基本流程,包括准备阶段、提交阶段及补偿机制。文章还提供了示例代码,帮助读者更好地理解整个过程。此外,还讨论了事务消息的优缺点、适用场景及注意事项,如确保本地事务的幂等性、合理设置超时时间等。尽管事务消息增加了系统复杂性,但在需要保证消息一致性的场景中,它仍是一种高效的解决方案。

你好,我是猿java。

这篇文章,我们将深入探讨 RocketMQ 的事务消息原理,并从源码角度进行分析,以及事务消息适合什么场景,使用事务消息需要注意哪些事项。

什么是事务消息

事务消息是为了保证分布式系统中消息的一致性而引入的一种消息类型。事务消息允许消息发送方在发送消息后,进行本地事务操作,并根据本地事务的执行结果来决定消息的最终状态(提交或回滚)。

RocketMQ 事务消息的基本流程

RocketMQ 采用了 2PC(两段式协议) + 补偿机制(事务回查)的方式实现分布式事务功能,保证分布式事务的最终一致性。

1. 准备阶段(Prepare Phase):

  • 生产者发送一个half消息(Prepare Message)到 RocketMQ broker,这个消息会被存储但不会被消费者消费。
  • RocketMQ broker 接收到half消息后会进行持久化,并返回消息的状态给生产者。

2. 提交阶段(Commit Phase):

  • 生产者根据本地事务的执行结果(成功或失败)决定向 RocketMQ broker 发送提交(Commit)或回滚(Rollback)操作。
  • 如果本地事务执行成功,生产者发送提交消息,RocketMQ broker 会将half消息标记为可消费,消费者可以进行消费。
  • 如果本地事务执行失败,生产者发送回滚消息,RocketMQ broker 会删除half消息,消费者不会消费这条消息。

补偿机制

RocketMQ 还提供了一个事务回查机制,如果生产者在发送half事务消息后由于网络或其他原因未能及时通知 RocketMQ 提交或回滚消息,RocketMQ 会定期(默认 60s)回查生产者的事务状态,以决定如何处理这条half事务消息,以确保消息的一致性。

整个过程如下图:

rocketmq-transaction-message-process.png

为了更好的理解整个过程,我们通过一个完整的示例代码来展示:

public class TransactionProducer {
   
    public static void main(String[] args) throws Exception {
   
        // 创建事务消息生产者
        TransactionMQProducer producer = new TransactionMQProducer("TransactionProducerGroup");
        producer.setNamesrvAddr("localhost:9876");

        // 设置事务状态回查监听器
        producer.setTransactionCheckListener(new TransactionCheckListener() {
   
            @Override
            public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
   
                // 处理事务状态回查逻辑
                System.out.println("Checking transaction state for message: " + new String(msg.getBody()));
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        // 启动生产者
        producer.start();

        // 发送事务消息
        Message msg = new Message("TransactionTopic", "TagA", "Transaction Message".getBytes());
        SendResult sendResult = producer.sendMessageInTransaction(msg, new LocalTransactionExecuter() {
   
            @Override
            public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
   
                // 执行本地事务逻辑
                System.out.println("Executing local transaction for message: " + new String(msg.getBody()));
                // 假设本地事务执行成功,返回 COMMIT_MESSAGE
                // 如果本地事务失败,返回 ROLLBACK_MESSAGE
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        }, null);

        System.out.println("Send result: " + sendResult);

        // 阻塞主线程,防止退出
        System.in.read();

        // 关闭生产者
        producer.shutdown();
    }
}

RocketMQ 事务消息的源码分析

发送half消息

发送half消息的核心代码在 TransactionMQProducer 类中,通过 sendMessageInTransaction 方法实现:

public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, Object arg) {
   
    // 1. 发送`half`消息
    SendResult sendResult = this.defaultMQProducerImpl.send(msg);

    // 2. 执行本地事务
    LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);

    // 3. 根据本地事务状态提交或回滚消息
    this.endTransaction(msg, localTransactionState);

    return new TransactionSendResult(sendResult, localTransactionState);
}

在 sendMessageInTransaction 方法中,首先调用 send 方法发送half消息,然后执行本地事务,并根据本地事务的结果调用 endTransaction 方法提交或回滚消息。

执行本地事务

本地事务的执行由 LocalTransactionExecuter 接口的实现类来完成。在实际使用中,用户需要实现该接口,并在 executeLocalTransactionBranch 方法中定义具体的本地事务逻辑。

public interface LocalTransactionExecuter {
   
    LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg);
}

提交或回滚事务消息

提交或回滚事务消息的实现也在 TransactionMQProducer 类中,通过 endTransaction 方法完成:

private void endTransaction(Message msg, LocalTransactionState localTransactionState) {
   
    // 构建事务结束请求
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setCommitOrRollback(localTransactionState == LocalTransactionState.COMMIT_MESSAGE ? 0 : 1);
    requestHeader.setTranStateTableOffset(msg.getQueueOffset());
    requestHeader.setCommitLogOffset(msg.getCommitLogOffset());

    // 发送事务结束请求到 Broker
    this.defaultMQProducerImpl.endTransaction(msg, requestHeader);
}

在 endTransaction 方法中,根据本地事务的执行结果构建事务结束请求,并调用 endTransaction 方法将请求发送到 Broker。

事务状态回查

事务状态回查是由 Broker 发起的。当 Broker 在规定时间内没有收到提交或回滚请求时,会主动向消息发送方发起事务状态回查。回查的实现主要在 TransactionCheckListener 接口中:

public interface TransactionCheckListener {
   
    LocalTransactionState checkLocalTransactionState(final MessageExt msg);
}

消息发送方需要实现 TransactionCheckListener 接口,并在 checkLocalTransactionState 方法中定义如何检查本地事务的状态。

Broker 端的事务消息处理

Broker 端的事务消息处理主要在 TransactionalMessageServiceImpl 类中实现。Broker 负责接收half消息、提交或回滚请求,并在必要时发起事务状态回查。

接收half消息

Broker 接收half消息的逻辑在 TransactionalMessageServiceImpl 类的 prepareMessage 方法中:

public PutMessageResult prepareMessage(MessageExtBrokerInner msgInner) {
   
    // 存储`half`消息
    return this.store.putMessage(msgInner);
}

提交或回滚消息

Broker 处理提交或回滚请求的逻辑在 TransactionalMessageServiceImpl 类的 commitMessage 和 rollbackMessage 方法中:

public boolean commitMessage(MessageExt msgExt) {
   
    // 提交消息
    return this.store.commitTransaction(msgExt);
}

public boolean rollbackMessage(MessageExt msgExt) {
   
    // 回滚消息
    return this.store.rollbackTransaction(msgExt);
}

事务状态回查

Broker 发起事务状态回查的逻辑在 TransactionalMessageServiceImpl 类的 check 方法中:

public void check(long transactionTimeout, int transactionCheckMax, String topic) {
   
    // 遍历`half`消息队列,发起事务状态回查
    List<MessageExt> halfMessages = this.store.getHalfMessages(topic);
    for (MessageExt msg : halfMessages) {
   
        // 发起回查请求
        this.brokerController.getBroker2Client().checkProducerTransactionState(msg);
    }
}

RocketMQ 事务消息的优缺点

优点

  • 保证消息一致性:通过事务消息,RocketMQ 能够保证分布式系统中消息的一致性,避免数据不一致问题。
  • 高性能:RocketMQ 的事务消息性能较高,能够满足高并发场景的需求。
  • 易用性:RocketMQ 提供了简单易用的 API,使得开发者能够方便地使用事务消息。

缺点

  • 复杂性:事务消息的引入增加了系统的复杂性,开发者需要处理事务状态回查等问题。
  • 时延:事务消息的处理涉及half消息、回查等操作,可能会增加消息的时延。

事务消息适用场景

资金转账

在金融系统中,资金转账需要确保资金的一致性和安全性。例如,从账户 A 转账到账户 B,必须确保 A 的金额减少和 B 的金额增加是一个原子操作。使用事务消息可以保证在转账过程中,如果任何一个步骤失败,整个操作都会回滚,确保数据一致性。

订单处理

在电子商务系统中,订单处理通常涉及多个步骤,例如创建订单、扣减库存、生成支付记录等。这些步骤需要保证一致性。使用事务消息可以确保如果某一步操作失败,整个订单处理过程可以回滚,避免数据不一致。

分布式事务

在微服务架构中,分布式事务是一个常见的挑战。多个微服务之间的操作需要协调一致,事务消息可以作为一种分布式事务解决方案,确保各个微服务之间的数据一致性。

库存管理

在库存管理系统中,库存的增减操作需要保证一致性。例如,用户下单后需要扣减库存,使用事务消息可以确保在扣减库存失败时,订单状态不会被错误更新。

事务消息注意事项

确保本地事务的幂等性

在分布式系统中,本地事务操作可能会被多次执行。例如,在事务状态回查时,Broker 可能会多次检查本地事务状态。因此,确保本地事务操作的幂等性非常重要。幂等性可以确保多次执行相同的操作不会产生副作用。

设置合理的超时时间

事务消息的处理涉及half消息、提交或回滚请求以及事务状态回查。设置合理的超时时间可以避免长时间等待,影响系统性能。超时时间应根据实际业务需求和系统性能进行调整。

处理事务状态回查

事务状态回查是事务消息的重要机制。当 Broker 在规定时间内没有收到提交或回滚请求时,会主动发起事务状态回查。开发者需要实现 TransactionCheckListener 接口,并在 checkLocalTransactionState 方法中处理回查逻辑,确保能够正确返回事务状态。

监控和日志

监控和日志是确保事务消息系统稳定运行的重要手段。通过监控,可以及时发现系统中的异常情况,例如事务状态回查失败、消息发送失败等。日志记录可以帮助开发者排查问题,分析系统性能。

资源隔离

在使用事务消息时,确保事务消息与其他普通消息的资源隔离,以避免相互影响。例如,可以为事务消息单独配置 Topic 和队列,确保事务消息的处理不受其他消息影响。

事务消息的重试机制

在某些情况下,事务消息的提交或回滚请求可能会失败。开发者需要考虑实现重试机制,以确保最终能够正确提交或回滚事务消息。重试机制可以通过定时任务或消息队列实现。

性能影响

事务消息的处理涉及多次网络通信和状态检查,可能会对系统性能产生一定影响。在高并发场景中,需要评估事务消息对系统性能的影响,并进行相应的优化。例如,可以通过批量处理、异步处理等方式提高性能。

总结

本文详细介绍了 RocketMQ 事务消息的基本流程,并通过源码分析揭示了其内部实现原理,尽管事务消息增加了系统的复杂性,但在需要保证消息一致性的场景中,它仍然是一种非常有效的解决方案,比如资金转账、订单处理、分布式事务、库存管理等场景。

参考资料

rocketmq官网

学习交流

如果你觉得文章有帮助,请帮忙转发给更多的好友,或关注:猿java,持续输出硬核文章。

目录
相关文章
|
23天前
|
弹性计算 人工智能 架构师
阿里云携手Altair共拓云上工业仿真新机遇
2024年9月12日,「2024 Altair 技术大会杭州站」成功召开,阿里云弹性计算产品运营与生态负责人何川,与Altair中国技术总监赵阳在会上联合发布了最新的“云上CAE一体机”。
阿里云携手Altair共拓云上工业仿真新机遇
|
15天前
|
存储 关系型数据库 分布式数据库
GraphRAG:基于PolarDB+通义千问+LangChain的知识图谱+大模型最佳实践
本文介绍了如何使用PolarDB、通义千问和LangChain搭建GraphRAG系统,结合知识图谱和向量检索提升问答质量。通过实例展示了单独使用向量检索和图检索的局限性,并通过图+向量联合搜索增强了问答准确性。PolarDB支持AGE图引擎和pgvector插件,实现图数据和向量数据的统一存储与检索,提升了RAG系统的性能和效果。
|
20天前
|
机器学习/深度学习 算法 大数据
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
2024“华为杯”数学建模竞赛,对ABCDEF每个题进行详细的分析,涵盖风电场功率优化、WLAN网络吞吐量、磁性元件损耗建模、地理环境问题、高速公路应急车道启用和X射线脉冲星建模等多领域问题,解析了问题类型、专业和技能的需要。
2574 22
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
|
18天前
|
人工智能 IDE 程序员
期盼已久!通义灵码 AI 程序员开启邀测,全流程开发仅用几分钟
在云栖大会上,阿里云云原生应用平台负责人丁宇宣布,「通义灵码」完成全面升级,并正式发布 AI 程序员。
|
3天前
|
JSON 自然语言处理 数据管理
阿里云百炼产品月刊【2024年9月】
阿里云百炼产品月刊【2024年9月】,涵盖本月产品和功能发布、活动,应用实践等内容,帮助您快速了解阿里云百炼产品的最新动态。
阿里云百炼产品月刊【2024年9月】
|
2天前
|
存储 人工智能 搜索推荐
数据治理,是时候打破刻板印象了
瓴羊智能数据建设与治理产品Datapin全面升级,可演进扩展的数据架构体系为企业数据治理预留发展空间,推出敏捷版用以解决企业数据量不大但需构建数据的场景问题,基于大模型打造的DataAgent更是为企业用好数据资产提供了便利。
159 2
|
19天前
|
机器学习/深度学习 算法 数据可视化
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码
2024年中国研究生数学建模竞赛C题聚焦磁性元件磁芯损耗建模。题目背景介绍了电能变换技术的发展与应用,强调磁性元件在功率变换器中的重要性。磁芯损耗受多种因素影响,现有模型难以精确预测。题目要求通过数据分析建立高精度磁芯损耗模型。具体任务包括励磁波形分类、修正斯坦麦茨方程、分析影响因素、构建预测模型及优化设计条件。涉及数据预处理、特征提取、机器学习及优化算法等技术。适合电气、材料、计算机等多个专业学生参与。
1575 16
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码
|
22天前
|
编解码 JSON 自然语言处理
通义千问重磅开源Qwen2.5,性能超越Llama
击败Meta,阿里Qwen2.5再登全球开源大模型王座
957 14
|
3天前
|
Linux 虚拟化 开发者
一键将CentOs的yum源更换为国内阿里yum源
一键将CentOs的yum源更换为国内阿里yum源
198 2
|
17天前
|
人工智能 开发框架 Java
重磅发布!AI 驱动的 Java 开发框架:Spring AI Alibaba
随着生成式 AI 的快速发展,基于 AI 开发框架构建 AI 应用的诉求迅速增长,涌现出了包括 LangChain、LlamaIndex 等开发框架,但大部分框架只提供了 Python 语言的实现。但这些开发框架对于国内习惯了 Spring 开发范式的 Java 开发者而言,并非十分友好和丝滑。因此,我们基于 Spring AI 发布并快速演进 Spring AI Alibaba,通过提供一种方便的 API 抽象,帮助 Java 开发者简化 AI 应用的开发。同时,提供了完整的开源配套,包括可观测、网关、消息队列、配置中心等。
726 10