RocketMQ极简入门-RocketMQ事务消息

简介: 如果业务只涉及到一个数据库的写操作,我们只需要保证这一个事物的提交和回滚,这种事务管理叫传统事物或本地事务,如果业务涉及到多个数据库(多个服务)的写操作,我们需要保证多个数据库同时提交或回滚,这种夸多个数据库的事务操作叫分布式事务。分布式事物的解决方案有很多,如:2PC,TCC,最终一致性,最大努力通知等等。这里要介绍的是基于RocketMQ事务消息的最终一致性方案

概述

如果业务只涉及到一个数据库的写操作,我们只需要保证这一个事物的提交和回滚,这种事务管理叫传统事物或本地事务,如果业务涉及到多个数据库(多个服务)的写操作,我们需要保证多个数据库同时提交或回滚,这种夸多个数据库的事务操作叫分布式事务

分布式事物的解决方案有很多,如:2PC,TCC,最终一致性,最大努力通知等等。这里要介绍的是基于RocketMQ事务消息的最终一致性方案

分布式事务

用户注册成功,向用户数据库保存用户信息,同时通过远程调用积分服务为用户赠送积分,模型如下:

我们需要使用分布式事务管理实现用户数据库和积分数据库的一致性。即:用户保存成功,用户的积分也要保存成功,或者都回滚不做任何存储。这种业务场景可以选择强一致性方案,也可以选择最终一致性。我们选择最终一致性,因为用户注册成功,不要求马上赠送积分,延迟一定时间后再赠送成功也是允许的。所以有了如下模型

我们需要使用分布式事务管理实现用户数据库和积分数据库的一致性。即:用户保存成功,用户的积分也要保存成功,或者都回滚不做任何存储。这种业务场景可以选择强一致性方案,也可以选择最终一致性。我们选择最终一致性,因为用户注册成功,不要求马上赠送积分,延迟一定时间后再赠送成功也是允许的。所以有了如下模型

事务流程

1.用户服务(事务发起方)往MQ中发送一个事务消息,

2.MQ返回结果是否发送成功

3.用户服务受到消息发送成功结果,保存用户数据,提交本地事务

4.积分服务拿到MQ中的事务消息

5.积分服务保存积分到数据库

RocketMQ事务消息原理

事务流程中的最大的难点就是如何保证事务消息发送和本地事务的原子性,即:第一步和第二步要么都成功,要么都失败,不能说消息发送成功了,结果用户保存失败了,那么积分服务可能会增加成功,就导致数据不一致。RocketMQ已经帮我们处理好这个问题。它的工作原理如下:

  1. 事务发起方,即用户服务会先向broker发送一个prepare“半事务消息”(一个并不完整的消息)到RMQ_SYS_TRANS_HALF_TOPIC的queue中, 该消息对消费者不可见。
  2. MQ会返回一个ACK确认消息发送成功或者失败
  3. 消息发送成功,用户服务执行保存用户操作,提交本地事务,并根据本地事务的执行结果来决定半消息的提交状态为提交或者回滚
  4. 本地事务提交成功,事务发起方即用户服务会向broker再次发起“结束半事务”消息请求,commit或者rollback指令
  5. broker端收到请求后,首先从RMQ_SYS_TRANS_HALF_TOPIC的queue中查出该消息,设置为完成状态。如果消息状态为提交,则把半消息从RMQ_SYS_TRANS_HALF_TOPIC队列中复制到这个消息原始topic的queue中去(之后这条消息就能被正常消费了);如果消息状态为回滚,则什么也不做。
  6. Producer发送的半消息结束请求是oneway的,也就是发送后就不管了,只靠这个是无法保证半消息一定被提交的(比如未执行第4步),rocketMq提供了一个兜底方案,这个方案叫消息反查机制,Broker启动时,会启动一个TransactionalMessageCheckService任务,该任务会定时从半消息队列中读出所有超时未完成的半消息,针对每条未完成的消息,Broker会给对应的Producer发送一个消息反查请求,根据反查结果来决定这个半消息是需要提交还是回滚,或者后面继续来反查
  7. consumer(本例中指积分系统)消费消息,执行本地数据变更,提交本地事务

RocketMQ事务消息实战

我们需要做什么

  • 编写本地事务检查监听TransactionListener ,一是执行本地事务逻辑,二是返回本地事务执行状态
  • 发消息时生产者需要设置producer.setTransactionListener 事务监听

事务回调监听

publicclassMyTransactionCheckListenerimplementsTransactionListener {
@OverridepublicLocalTransactionStateexecuteLocalTransaction(Messagemessage, Objecto) {
//执行业务,保存本地事务,比如注册用户保存的数据库的动作可以在这里完成//保存成功returnLocalTransactionState.ROLLBACK_MESSAGE;
    }
@OverridepublicLocalTransactionStatecheckLocalTransaction(MessageExtmessageExt) {
//这里查询本地事务状态 ,查询用户是否注册成功returnLocalTransactionState.COMMIT_MESSAGE;
    }
}

消息发送者

publicclassTransationSender {
publicstaticvoidmain(String[] args) throwsMQClientException {
TransactionMQProducerproducer=newTransactionMQProducer("tran-product-group");
producer.setNamesrvAddr("127.0.0.1:9876");
//线程池ExecutorServiceexcutorService=Executors.newFixedThreadPool(20);
producer.setExecutorService(excutorService);
producer.setTransactionListener(newMyTransactionCheckListener());
//设置事务消息监听producer.start();
for(inti=0 ; i<10 ; i++){
StringorderId=UUID.randomUUID().toString();
Stringtags="Tag";
Messagemessage=newMessage("topic-tran", "tag", orderId, ("下单:"+i).getBytes(CharsetUtil.UTF_8));
TransactionSendResulttransactionSendResult=producer.sendMessageInTransaction(message, null);
System.out.println(transactionSendResult);
        }
producer.shutdown();
    }
}

消费者

消费者和普通消费者一样

publicclassTransationConsumer {
publicstaticvoidmain(String[] args) throwsMQClientException {
//创建消费者DefaultMQPushConsumerdefaultMQPushConsumer=newDefaultMQPushConsumer("trans-consumer-group");
//设置name server 地址defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");
//订阅defaultMQPushConsumer.subscribe("topic-tran", "tag");
defaultMQPushConsumer.registerMessageListener(newMessageListenerConcurrently() {
@OverridepublicConsumeConcurrentlyStatusconsumeMessage(List<MessageExt>list, ConsumeConcurrentlyContextconsumeConcurrentlyContext) {
list.forEach(message->{
System.out.println(message+" ; "+newString(message.getBody(), CharsetUtil.UTF_8));
                });
returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
defaultMQPushConsumer.start();
    }
}


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
消息中间件 Java API
RocketMQ事务消息, 图文、源码学习探究~
介绍 RocketMQ是阿里巴巴开源的分布式消息中间件,它是一个高性能、低延迟、可靠的消息队列系统,用于在分布式系统中进行异步通信。 从4.3.0版本开始正式支持分布式事务消息~ RocketMq事务消息支持最终一致性:在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。 原理、流程 本质上RocketMq的事务能力是基于二阶段提交来实现的 在消息发送上,将二阶段提交与本地事务绑定 本地事务执行成功,则事务消息成功,可以交由Consumer消费 本地事务执行失败,则事务消息失败,Consumer无法消费 但是,RocketMq只能保证本地事务
|
26天前
|
消息中间件 Docker 微服务
RabbitMQ入门指南(十一):延迟消息-延迟消息插件
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了DelayExchange插件、延迟消息插件实现延迟消息等内容。
48 0
|
26天前
|
消息中间件 微服务
RabbitMQ入门指南(十):延迟消息-死信交换机
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了死信交换机、死信交换机实现延迟消息等内容。
44 0
|
26天前
|
消息中间件 供应链 Java
RabbitMQ入门指南(九):消费者可靠性
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了消费者确认机制、失败重试机制、失败处理策略、业务幂等性等内容。
43 0
RabbitMQ入门指南(九):消费者可靠性
|
26天前
|
消息中间件 存储 Java
RabbitMQ入门指南(八):MQ可靠性
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了MQ数据持久化、LazyQueue模式、管理控制台配置Lazy模式、代码配置Lazy模式、更新已有队列为lazy模式等内容。
64 0
|
26天前
|
消息中间件 JSON Java
RabbitMQ入门指南(六):消息转换器及其案例
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了RabbitMQ默认转换器、JSON转换器及其案例等内容。
36 0
|
26天前
|
消息中间件 Java API
RabbitMQ入门指南(五):Java声明队列、交换机以及绑定
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了Java声明队列、交换机以及绑定队列和交换机等内容。
30 0
|
26天前
|
消息中间件 微服务
RabbitMQ入门指南(四):交换机与案例解析
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了交换机在RabbitMQ中的作用与类型、交换机案例(Fanout交换机、Direct交换机、Topic交换机)等内容。
37 0
|
26天前
|
消息中间件 Java API
RabbitMQ入门指南(三):Java入门示例
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了AMQP、Spring AMQP和使用SpringAMQP实现对RabbitMQ的消息收发等内容。
29 0
RabbitMQ入门指南(三):Java入门示例
|
26天前
|
消息中间件 存储 数据库
RabbitMQ入门指南(二):架构和管理控制台的使用
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了RabbitMQ架构和管理控制台的使用等内容。
51 0
RabbitMQ入门指南(二):架构和管理控制台的使用