自顶向下学习 RocketMQ(七):事务消息

简介: RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ 的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。

定义


“RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ 的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。”


Demo


下面的例子,还是以 spring cloud stream 编程模型为基础,结合 spring cloud alibaba RocketMQ 的实现,演示了如何使用事务消息。


流程


事务消息交互流程如下图所示:


18.jpg


事务消息发送步骤如下:


  1. 生产者将半事务消息发送至消息队列 RocketMQ 版服务端。


  1. 消息队列 RocketMQ 版服务端将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息为半事务消息。


  1. 生产者开始执行本地事务逻辑。


  1. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit 或是 Rollback),服务端收到确认结果后处理逻辑如下:
  • 二次确认结果为 Commit:服务端将半事务消息标记为可投递,并投递给消费者。
  • 二次确认结果为 Rollback:服务端不会将该消息投递给消费者,并按照如下逻辑进行回查处理。


事务消息回查步骤如下:


  1. 在断网或者是生产者应用重启的特殊情况下,上述步骤 4 提交的二次确认最终未到达服务端,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。


  1. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。


  1. 生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半事务消息进行处理。


配置


和之前一样,我将生产者消息者配置在一起了,先来看一下配置文件:


spring:
  application:
    name: mq-example
  cloud:
    stream:
      bindings:
        input-transaction:
          content-type: application/json
          destination: TransactionTopic
          group: transaction-consumer-group
        output-transaction:
          content-type: application/json
          destination: TransactionTopic
      rocketmq:
        # RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
        binder:
          # 配置 rocketmq 的 nameserver 地址
          name-server: 127.0.0.1:9876
          group: rocketmq-group
        bindings:
          output-transaction:
            #  对应 RocketMQProducerProperties 类
            producer:
              producerType: Trans
              group: transaction-producer-group # 生产者分组
              transactionListener: myTransactionListener


这里需要注意生产者类型为:Trans 即,事务消息。


还配置了相应的生产者组和消费者组,这里回顾一下这两个概念


消费者组:同一类 Producer 的集合,这类 Producer 发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则 Broker 服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。


生产者组:同一类 Consumer 的集合,这类 Consumer 通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的 Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。


实现


transactionListener 为我们自定义的事务监听器,具体代码见下文:


@Component("myTransactionListener")
public class TransactionListenerImpl implements TransactionListener {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        Object num = msg.getProperty("test");
        if ("1".equals(num)) {
            System.out.println("executer: " + new String(msg.getBody()) + " unknown");
            return LocalTransactionState.UNKNOW;
        } else if ("2".equals(num)) {
            System.out.println("executer: " + new String(msg.getBody()) + " rollback");
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        System.out.println("executer: " + new String(msg.getBody()) + " commit");
        return LocalTransactionState.COMMIT_MESSAGE;
    }
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.out.println("check: " + new String(msg.getBody()));
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}


以上代码是参考官方的 Demo, 可以看到根据 num 的不同,返回不同的事务状态


  • 如果 num1,则返回 UNKNOW,表示本地事务状态未知,需要定期回查事务状态,则会执行 checkLocalTransaction 方法。
  • 如果 num2,则返回 ROLLBACK_MESSAGE,表示本地事务状态为回滚,则 broker 会回滚之前的提交的事务消息,即不投递消息。
  • 如果 num3,则返回 COMMIT_MESSAGE,表示本地事务状态为提交,则 broker 会投递消息。


发送消息跟之前的代码类似:


@GetMapping("/send_transaction")
    public void sendTransaction() {
        String msg = "这是一条事务消息";
        Integer num = 2;
        MessageBuilder builder = MessageBuilder.withPayload(msg)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON);
        builder.setHeader("test", String.valueOf(num));
        builder.setHeader(RocketMQConst.USER_TRANSACTIONAL_ARGS, "binder");
        Message message = builder.build();
        mySource.outputTransaction().send(message);
    }


为了保证可靠消息最终一致性,需要有一个数据库表记录事务状态,


19.jpg


事务开始的时候先将 UNKNOW 状态存起来,当事务异常时,返回 ROLLBACK_MESSAGE 状态,并且在数据库表中记录此状态。当事务提交成功时,将状态修改为 COMMIT_MESSAGE。


有了事务消息表,checkLocalTransaction 方法就可以依据此表进行事务状态的查询。

当然,如果按上图所示,一个完整的分布式事务跨越 A、B 两个系统,如果 B 系统事务失败回滚时,考虑 A 系统的事务是否需要回滚,如需要,还需要 A 系统提供回滚接口,供 B 系统调用。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件 存储 数据库
深入学习RocketMQ的底层存储设计原理
文章深入探讨了RocketMQ的底层存储设计原理,分析了其如何通过将数据和索引映射到内存、异步刷新磁盘以及消息内容的混合存储来实现高性能的读写操作,从而保证了RocketMQ作为一款低延迟消息队列的读写性能。
|
2月前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
2月前
|
消息中间件 监控 供应链
深度剖析 RocketMQ 事务消息!
本文深入探讨了 RocketMQ 的事务消息原理及其应用场景。通过详细的源码分析,阐述了事务消息的基本流程,包括准备阶段、提交阶段及补偿机制。文章还提供了示例代码,帮助读者更好地理解整个过程。此外,还讨论了事务消息的优缺点、适用场景及注意事项,如确保本地事务的幂等性、合理设置超时时间等。尽管事务消息增加了系统复杂性,但在需要保证消息一致性的场景中,它仍是一种高效的解决方案。
145 2
|
7月前
|
消息中间件 存储 RocketMQ
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
|
4月前
|
消息中间件 存储 缓存
RocketMQ发送消息原理(含事务消息)
本文深入探讨了RocketMQ发送消息的原理,包括生产者端的发送流程、Broker端接收和处理消息的流程,以及事务消息的特殊处理机制,提供了对RocketMQ消息发送机制全面的理解。
RocketMQ发送消息原理(含事务消息)
|
5月前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
385 1
|
5月前
|
消息中间件 调度 RocketMQ
【RocketMQ系列六】RocketMQ事务消息
【RocketMQ系列六】RocketMQ事务消息
995 1
|
6月前
|
消息中间件 IDE 数据库
RocketMQ事务消息学习及刨坑过程
RocketMQ事务消息学习及刨坑过程
|
6月前
|
消息中间件 网络性能优化 RocketMQ
消息队列 MQ产品使用合集之本地事务还没有执行完就触发了回查是什么导致的
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
7月前
|
消息中间件 存储 负载均衡
消息队列学习之RabbitMQ
【4月更文挑战第3天】消息队列学习之RabbitMQ,一种基于erlang语言开发的流行的开源消息中间件。
62 0