七.RocketMQ极简入门-RocketMQ事务消息

简介: RocketMQ极简入门-RocketMQ事务消息

概述

如果业务只涉及到一个数据库的写操作,我们只需要保证这一个事物的提交和回滚,这种事务管理叫传统事物或本地事务,如果业务涉及到多个数据库(多个服务)的写操作,我们需要保证多个数据库同时提交或回滚,这种夸多个数据库的事务操作叫分布式事务。

分布式事物的解决方案有很多,如:2PC,TCC,最终一致性,最大努力通知等等。这里要介绍的是基于RocketMQ事务消息的最终一致性方案

分布式事务

用户注册成功,向用户数据库保存用户信息,同时通过远程调用积分服务为用户赠送积分,模型如下:

我们需要使用分布式事务管理实现用户数据库和积分数据库的一致性。即:用户保存成功,用户的积分也要保存成功,或者都回滚不做任何存储。这种业务场景可以选择强一致性方案,也可以选择最终一致性。我们选择最终一致性,因为用户注册成功,不要求马上赠送积分,延迟一定时间后再赠送成功也是允许的。所以有了如下模型
在这里插入图片描述
我们需要使用分布式事务管理实现用户数据库和积分数据库的一致性。即:用户保存成功,用户的积分也要保存成功,或者都回滚不做任何存储。这种业务场景可以选择强一致性方案,也可以选择最终一致性。我们选择最终一致性,因为用户注册成功,不要求马上赠送积分,延迟一定时间后再赠送成功也是允许的。所以有了如下模型
在这里插入图片描述

事务流程

1.用户服务(事务发起方)往MQ中发送一个事务消息,
2.MQ返回结果是否发送成功
3.用户服务受到消息发送成功结果,保存用户数据,提交本地事务
4.积分服务拿到MQ中的事务消息
5.积分服务保存积分到数据库

RocketMQ事务消息原理

事务流程中的最大的难点就是如何保证事务消息发送和本地事务的原子性,即:第一步和第二步要么都成功,要么都失败,不能说消息发送成功了,结果用户保存失败了,那么积分服务可能会增加成功,就导致数据不一致。RocketMQ已经帮我们处理好这个问题。它的工作原理如下:
在这里插入图片描述

  1. 事务发起方,即用户服务会先向broker发送一个prepare“半事务消息”(一个并不完整的消息)到RMQ_SYS_TRANS_HALF_TOPIC的queue中, 该消息对消费者不可见。

  2. MQ会返回一个ACK确认消息发送成功或者失败

  3. 消息发送成功,用户服务执行保存用户操作,提交本地事务,并根据本地事务的执行结果来决定半消息的提交状态为提交或者回滚

  4. 本地事务提交成功,事务发起方即用户服务会向broker再次发起“结束半事务”消息请求,commit或者rollback指令

  5. broker端收到请求后,首先从RMQ_SYS_TRANS_HALF_TOPIC的queue中查出该消息,设置为完成状态。如果消息状态为提交,则把半消息从RMQ_SYS_TRANS_HALF_TOPIC队列中复制到这个消息原始topic的queue中去(之后这条消息就能被正常消费了);如果消息状态为回滚,则什么也不做。

  6. Producer发送的半消息结束请求是oneway的,也就是发送后就不管了,只靠这个是无法保证半消息一定被提交的(比如未执行第4步),rocketMq提供了一个兜底方案,这个方案叫消息反查机制,Broker启动时,会启动一个TransactionalMessageCheckService任务,该任务会定时从半消息队列中读出所有超时未完成的半消息,针对每条未完成的消息,Broker会给对应的Producer发送一个消息反查请求,根据反查结果来决定这个半消息是需要提交还是回滚,或者后面继续来反查

  7. consumer(本例中指积分系统)消费消息,执行本地数据变更,提交本地事务

RocketMQ事务消息实战

我们需要做什么

  • 编写本地事务检查监听TransactionListener ,一是执行本地事务逻辑,二是返回本地事务执行状态
  • 发消息时生产者需要设置producer.setTransactionListener 事务监听

事务回调监听

public class MyTransactionCheckListener implements TransactionListener {
   
   

    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
   
   
        //执行业务,保存本地事务,比如注册用户保存的数据库的动作可以在这里完成

        //保存成功
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
   
   
        //这里查询本地事务状态 ,查询用户是否注册成功
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
AI 代码解读

消息发送者

public class TransationSender {
   
   

    public static void main(String[] args) throws MQClientException {
   
   

        TransactionMQProducer producer = new TransactionMQProducer("tran-product-group");

        producer.setNamesrvAddr("127.0.0.1:9876");

        //线程池
        ExecutorService excutorService = Executors.newFixedThreadPool(20);

        producer.setExecutorService(excutorService);

        producer.setTransactionListener(new MyTransactionCheckListener());

        //设置事务消息监听
        producer.start();

        for(int i = 0 ; i < 10 ; i++){
   
   
            String orderId = UUID.randomUUID().toString();
            String tags = "Tag";
            Message message = new Message("topic-tran", "tag", orderId, ("下单:"+i).getBytes(CharsetUtil.UTF_8));

            TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(message, null);
            System.out.println(transactionSendResult);

        }
        producer.shutdown();
    }
}
AI 代码解读

消费者

消费者和普通消费者一样

public class TransationConsumer {
   
   
    public static void main(String[] args) throws MQClientException {
   
   
        //创建消费者
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("trans-consumer-group");
        //设置name server 地址
        defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");

        //订阅
        defaultMQPushConsumer.subscribe("topic-tran", "tag");

        defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
   
   
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
   
   

                list.forEach(message->{
   
   
                    System.out.println(message+" ; "+new String(message.getBody(), CharsetUtil.UTF_8));
                });

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        defaultMQPushConsumer.start();
    }
}
AI 代码解读
相关实践学习
5分钟轻松打造应对流量洪峰的稳定商城交易系统
本实验通过SAE极速部署一个微服务电商商城,同时结合RocketMQ异步解耦、削峰填谷的能力,带大家体验面对流量洪峰仍旧稳定可靠的商城交易系统!
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
打赏
0
0
0
0
4
分享
相关文章
一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
本文详细介绍了分布式消息中间件RocketMQ的核心概念、部署方式及使用方法。RocketMQ由阿里研发并开源,具有高性能、高可靠性和分布式特性,广泛应用于金融、互联网等领域。文章从环境搭建到消息类型的实战(普通消息、延迟消息、顺序消息和事务消息)进行了全面解析,并对比了三种消费者类型(PushConsumer、SimpleConsumer和PullConsumer)的特点与适用场景。最后总结了使用RocketMQ时的关键注意事项,如Topic和Tag的设计、监控告警的重要性以及性能与可靠性的平衡。通过学习本文,读者可掌握RocketMQ的使用精髓并灵活应用于实际项目中。
688 7
 一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
深度剖析 RocketMQ 事务消息!
本文深入探讨了 RocketMQ 的事务消息原理及其应用场景。通过详细的源码分析,阐述了事务消息的基本流程,包括准备阶段、提交阶段及补偿机制。文章还提供了示例代码,帮助读者更好地理解整个过程。此外,还讨论了事务消息的优缺点、适用场景及注意事项,如确保本地事务的幂等性、合理设置超时时间等。尽管事务消息增加了系统复杂性,但在需要保证消息一致性的场景中,它仍是一种高效的解决方案。
439 2
分享一下rocketmq入门小知识
分享一下rocketmq入门小知识
116 0
分享一下rocketmq入门小知识
RocketMQ发送消息原理(含事务消息)
本文深入探讨了RocketMQ发送消息的原理,包括生产者端的发送流程、Broker端接收和处理消息的流程,以及事务消息的特殊处理机制,提供了对RocketMQ消息发送机制全面的理解。
RocketMQ发送消息原理(含事务消息)
App Inventor 2 MQTT拓展入门(保姆级教程)
本文演示的是App和一个测试客户端进行消息交互的案例,实际应用中,我们的测试客户端可以看着是任意的、支持MQTT协议的硬件,通过订阅及发布消息,联网硬件与我们的App进行双向数据通信,以实现万物互联的智能控制效果。
607 2
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
799 1
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等