分布式事务实现方案:一文详解RocketMQ事务消息

简介: 分布式事务实现方案:一文详解RocketMQ事务消息

常见的分布式事务实现方案有以下几种:两阶段提交(2PC)、两阶段提交(2PC)、补偿事务(Saga)、MQ事务消息等。今天就讲一下 RocketMQ 的事务消息,是一种非常特殊的分布式事务实现方案,基于半消息(Half Message)机制实现的。 看完这篇想一下,RocketMQ事务消息到底能不能保证分布式系统中数据的强一致性?

实现原理

RocketMQ事务消息执行流程如下:

  1. 生产者将消息发送至RocketMQ服务端。
  2. RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息(Half Message)。
  3. 生产者开始执行本地事务逻辑。
  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:

二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。

二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。

  1. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
  2. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  3. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

图片

代码实现

RocketMQ事务消息示例如下:

复制

//演示demo,模拟订单表查询服务,用来确认订单事务是否提交成功。
private static boolean checkOrderById(String orderId) {
    return true;
}
//演示demo,模拟本地事务的执行结果。
private static boolean doLocalTransaction() {
    return true;
}
public static void main(String[] args) throws ClientException {
    ClientServiceProvider provider = new ClientServiceProvider();
    MessageBuilder messageBuilder = new MessageBuilderImpl();
    //构造事务生产者:事务消息需要生产者构建一个事务检查器,用于检查确认异常半事务的中间状态。
    Producer producer = provider.newProducerBuilder()
            .setTransactionChecker(messageView -> {
                /**
                 * 事务检查器一般是根据业务的ID去检查本地事务是否正确提交还是回滚,此处以订单ID属性为例。
                 * 在订单表找到了这个订单,说明本地事务插入订单的操作已经正确提交;如果订单表没有订单,说明本地事务已经回滚。
                 */
                final String orderId = messageView.getProperties().get("OrderId");
                if (Strings.isNullOrEmpty(orderId)) {
                    // 错误的消息,直接返回Rollback。
                    return TransactionResolution.ROLLBACK;
                }
                return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
            })
            .build();
    //开启事务
    final Transaction transaction;
    try {
        transaction = producer.beginTransaction();
    } catch (ClientException e) {
        e.printStackTrace();
        //事务开启失败,直接退出。
        return;
    }
    Message message = messageBuilder.setTopic("topic")
            //设置消息索引键,可根据关键字精确查找某条消息。
            .setKeys("messageKey")
            //设置消息Tag,用于消费端根据指定Tag过滤消息。
            .setTag("messageTag")
            //一般事务消息都会设置一个本地事务关联的唯一ID,用来做本地事务回查的校验。
            .addProperty("OrderId", "xxx")
            //消息体。
            .setBody("messageBody".getBytes())
            .build();
    //发送半事务消息
    final SendReceipt sendReceipt;
    try {
        sendReceipt = producer.send(message, transaction);
    } catch (ClientException e) {
        //半事务消息发送失败,事务可以直接退出并回滚。
        return;
    }
    /**
     * 执行本地事务,并确定本地事务结果。
     * 1. 如果本地事务提交成功,则提交消息事务。
     * 2. 如果本地事务提交失败,则回滚消息事务。
     * 3. 如果本地事务未知异常,则不处理,等待事务消息回查。
     *
     */
    boolean localTransactionOk = doLocalTransaction();
    if (localTransactionOk) {
        try {
            transaction.commit();
        } catch (ClientException e) {
            // 业务可以自身对实时性的要求选择是否重试,如果放弃重试,可以依赖事务消息回查机制进行事务状态的提交。
            e.printStackTrace();
        }
    } else {
        try {
            transaction.rollback();
        } catch (ClientException e) {
            // 建议记录异常信息,回滚异常时可以无需重试,依赖事务消息回查机制进行事务状态的提交。
            e.printStackTrace();
        }
    }
}• 

注意事项

  • 幂等性: 消费者处理消息时需要确保业务逻辑的幂等性,以应对消息可能的重复消费。
  • 超时和监控: 设置合理的超时时间,并监控事务消息的性能

总结

RocketMQ 事务消息是分布式事务中一种常见的实现方案,只是把发送消息和本地事务放在一个事务中,并且只保证最终一致性,无法保证强一致性。 原因有两点:

  1. 执行完成本地事务后,在commit事务消息之前,这段时间内数据是不一致的,所以只是保证了发送消息和本地事务的最终一致性。
  2. 在commit事务消息之后,然后把消息投递给消费者。至于消费者是否消费消息,什么时候消费?也都是不可控的,所以也只能尽量保证数据最终一致性。
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
15天前
|
SQL 关系型数据库 MySQL
乐观锁在分布式数据库中如何与事务隔离级别结合使用
乐观锁在分布式数据库中如何与事务隔离级别结合使用
|
21天前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
37 2
|
29天前
|
消息中间件 存储 缓存
RocketMQ发送消息原理(含事务消息)
本文深入探讨了RocketMQ发送消息的原理,包括生产者端的发送流程、Broker端接收和处理消息的流程,以及事务消息的特殊处理机制,提供了对RocketMQ消息发送机制全面的理解。
RocketMQ发送消息原理(含事务消息)
|
22天前
|
消息中间件 监控 安全
大事务+MQ普通消息线上问题排查过程技术分享
【8月更文挑战第23天】在复杂的企业级系统中,大事务与消息队列(MQ)的结合使用是一种常见的架构设计,用于解耦系统、提升系统响应性和扩展性。然而,这种设计也带来了其特有的挑战,特别是在处理退款业务等涉及金融交易的高敏感场景时。本文将围绕“大事务+MQ普通消息线上问题排查过程”这一主题,分享一次实际工作中的技术排查经验,旨在为大家提供可借鉴的解决思路和方法。
38 0
|
2月前
|
SQL 关系型数据库 MySQL
乐观锁在分布式数据库中与事务隔离级别结合使用
乐观锁在分布式数据库中与事务隔离级别结合使用
|
1月前
|
消息中间件 存储 C#
分布式事务之最终一致性实现方案
分布式事务之最终一致性实现方案
37 0
|
2月前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
139 1
|
2月前
|
消息中间件 调度 RocketMQ
【RocketMQ系列六】RocketMQ事务消息
【RocketMQ系列六】RocketMQ事务消息
582 1
|
3月前
|
存储 关系型数据库 Java
技术经验解读:三种分布式事务LCN、Seata、MQ
技术经验解读:三种分布式事务LCN、Seata、MQ
106 0
|
3月前
|
消息中间件 IDE 数据库
RocketMQ事务消息学习及刨坑过程
RocketMQ事务消息学习及刨坑过程