七.RocketMQ极简入门-RocketMQ事务消息

简介: RocketMQ极简入门-RocketMQ事务消息

概述

如果业务只涉及到一个数据库的写操作,我们只需要保证这一个事物的提交和回滚,这种事务管理叫传统事物或本地事务,如果业务涉及到多个数据库(多个服务)的写操作,我们需要保证多个数据库同时提交或回滚,这种夸多个数据库的事务操作叫分布式事务。

分布式事物的解决方案有很多,如:2PC,TCC,最终一致性,最大努力通知等等。这里要介绍的是基于RocketMQ事务消息的最终一致性方案

分布式事务

用户注册成功,向用户数据库保存用户信息,同时通过远程调用积分服务为用户赠送积分,模型如下:

我们需要使用分布式事务管理实现用户数据库和积分数据库的一致性。即:用户保存成功,用户的积分也要保存成功,或者都回滚不做任何存储。这种业务场景可以选择强一致性方案,也可以选择最终一致性。我们选择最终一致性,因为用户注册成功,不要求马上赠送积分,延迟一定时间后再赠送成功也是允许的。所以有了如下模型
在这里插入图片描述
我们需要使用分布式事务管理实现用户数据库和积分数据库的一致性。即:用户保存成功,用户的积分也要保存成功,或者都回滚不做任何存储。这种业务场景可以选择强一致性方案,也可以选择最终一致性。我们选择最终一致性,因为用户注册成功,不要求马上赠送积分,延迟一定时间后再赠送成功也是允许的。所以有了如下模型
在这里插入图片描述

事务流程

1.用户服务(事务发起方)往MQ中发送一个事务消息,
2.MQ返回结果是否发送成功
3.用户服务受到消息发送成功结果,保存用户数据,提交本地事务
4.积分服务拿到MQ中的事务消息
5.积分服务保存积分到数据库

RocketMQ事务消息原理

事务流程中的最大的难点就是如何保证事务消息发送和本地事务的原子性,即:第一步和第二步要么都成功,要么都失败,不能说消息发送成功了,结果用户保存失败了,那么积分服务可能会增加成功,就导致数据不一致。RocketMQ已经帮我们处理好这个问题。它的工作原理如下:
在这里插入图片描述

  1. 事务发起方,即用户服务会先向broker发送一个prepare“半事务消息”(一个并不完整的消息)到RMQ_SYS_TRANS_HALF_TOPIC的queue中, 该消息对消费者不可见。

  2. MQ会返回一个ACK确认消息发送成功或者失败

  3. 消息发送成功,用户服务执行保存用户操作,提交本地事务,并根据本地事务的执行结果来决定半消息的提交状态为提交或者回滚

  4. 本地事务提交成功,事务发起方即用户服务会向broker再次发起“结束半事务”消息请求,commit或者rollback指令

  5. broker端收到请求后,首先从RMQ_SYS_TRANS_HALF_TOPIC的queue中查出该消息,设置为完成状态。如果消息状态为提交,则把半消息从RMQ_SYS_TRANS_HALF_TOPIC队列中复制到这个消息原始topic的queue中去(之后这条消息就能被正常消费了);如果消息状态为回滚,则什么也不做。

  6. Producer发送的半消息结束请求是oneway的,也就是发送后就不管了,只靠这个是无法保证半消息一定被提交的(比如未执行第4步),rocketMq提供了一个兜底方案,这个方案叫消息反查机制,Broker启动时,会启动一个TransactionalMessageCheckService任务,该任务会定时从半消息队列中读出所有超时未完成的半消息,针对每条未完成的消息,Broker会给对应的Producer发送一个消息反查请求,根据反查结果来决定这个半消息是需要提交还是回滚,或者后面继续来反查

  7. consumer(本例中指积分系统)消费消息,执行本地数据变更,提交本地事务

RocketMQ事务消息实战

我们需要做什么

  • 编写本地事务检查监听TransactionListener ,一是执行本地事务逻辑,二是返回本地事务执行状态
  • 发消息时生产者需要设置producer.setTransactionListener 事务监听

事务回调监听

public class MyTransactionCheckListener implements TransactionListener {
   
   

    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
   
   
        //执行业务,保存本地事务,比如注册用户保存的数据库的动作可以在这里完成

        //保存成功
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
   
   
        //这里查询本地事务状态 ,查询用户是否注册成功
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

消息发送者

public class TransationSender {
   
   

    public static void main(String[] args) throws MQClientException {
   
   

        TransactionMQProducer producer = new TransactionMQProducer("tran-product-group");

        producer.setNamesrvAddr("127.0.0.1:9876");

        //线程池
        ExecutorService excutorService = Executors.newFixedThreadPool(20);

        producer.setExecutorService(excutorService);

        producer.setTransactionListener(new MyTransactionCheckListener());

        //设置事务消息监听
        producer.start();

        for(int i = 0 ; i < 10 ; i++){
   
   
            String orderId = UUID.randomUUID().toString();
            String tags = "Tag";
            Message message = new Message("topic-tran", "tag", orderId, ("下单:"+i).getBytes(CharsetUtil.UTF_8));

            TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(message, null);
            System.out.println(transactionSendResult);

        }
        producer.shutdown();
    }
}

消费者

消费者和普通消费者一样

public class TransationConsumer {
   
   
    public static void main(String[] args) throws MQClientException {
   
   
        //创建消费者
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("trans-consumer-group");
        //设置name server 地址
        defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");

        //订阅
        defaultMQPushConsumer.subscribe("topic-tran", "tag");

        defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
   
   
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
   
   

                list.forEach(message->{
   
   
                    System.out.println(message+" ; "+new String(message.getBody(), CharsetUtil.UTF_8));
                });

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        defaultMQPushConsumer.start();
    }
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 存储 RocketMQ
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
|
12天前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
22 1
|
12天前
|
消息中间件 调度 RocketMQ
【RocketMQ系列六】RocketMQ事务消息
【RocketMQ系列六】RocketMQ事务消息
22 1
|
20天前
|
消息中间件 存储 Kafka
01.RabbitMQ入门
01.RabbitMQ入门
34 0
|
25天前
|
消息中间件 IDE 数据库
RocketMQ事务消息学习及刨坑过程
RocketMQ事务消息学习及刨坑过程
|
1月前
|
消息中间件 存储 前端开发
RabbitMQ在Java中的完美实现:从入门到精通
本文由木头左介绍如何在Java项目中使用RabbitMQ。RabbitMQ是开源的AMQP实现,支持多种客户端,适合分布式系统中的消息传递。首先需安装Erlang和RabbitMQ,接着在Java项目中添加RabbitMQ客户端库依赖。通过创建连接工厂和连接,建立与RabbitMQ的通信,并展示了创建连接和通道的代码示例。
|
1月前
|
消息中间件 网络性能优化 RocketMQ
消息队列 MQ产品使用合集之本地事务还没有执行完就触发了回查是什么导致的
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
2月前
|
消息中间件 Docker 微服务
RabbitMQ入门指南(十一):延迟消息-延迟消息插件
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了DelayExchange插件、延迟消息插件实现延迟消息等内容。
334 0
|
2月前
|
消息中间件 微服务
RabbitMQ入门指南(十):延迟消息-死信交换机
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了死信交换机、死信交换机实现延迟消息等内容。
95 0
|
2月前
|
消息中间件 供应链 Java
RabbitMQ入门指南(九):消费者可靠性
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了消费者确认机制、失败重试机制、失败处理策略、业务幂等性等内容。
102 0
RabbitMQ入门指南(九):消费者可靠性