概述
如果业务只涉及到一个数据库的写操作,我们只需要保证这一个事物的提交和回滚,这种事务管理叫传统事物或本地事务,如果业务涉及到多个数据库(多个服务)的写操作,我们需要保证多个数据库同时提交或回滚,这种夸多个数据库的事务操作叫分布式事务。
分布式事物的解决方案有很多,如:2PC,TCC,最终一致性,最大努力通知等等。这里要介绍的是基于RocketMQ事务消息的最终一致性方案
分布式事务
用户注册成功,向用户数据库保存用户信息,同时通过远程调用积分服务为用户赠送积分,模型如下:
我们需要使用分布式事务管理实现用户数据库和积分数据库的一致性。即:用户保存成功,用户的积分也要保存成功,或者都回滚不做任何存储。这种业务场景可以选择强一致性方案,也可以选择最终一致性。我们选择最终一致性,因为用户注册成功,不要求马上赠送积分,延迟一定时间后再赠送成功也是允许的。所以有了如下模型
我们需要使用分布式事务管理实现用户数据库和积分数据库的一致性。即:用户保存成功,用户的积分也要保存成功,或者都回滚不做任何存储。这种业务场景可以选择强一致性方案,也可以选择最终一致性。我们选择最终一致性,因为用户注册成功,不要求马上赠送积分,延迟一定时间后再赠送成功也是允许的。所以有了如下模型
事务流程
1.用户服务(事务发起方)往MQ中发送一个事务消息,
2.MQ返回结果是否发送成功
3.用户服务受到消息发送成功结果,保存用户数据,提交本地事务
4.积分服务拿到MQ中的事务消息
5.积分服务保存积分到数据库
RocketMQ事务消息原理
事务流程中的最大的难点就是如何保证事务消息发送和本地事务的原子性,即:第一步和第二步要么都成功,要么都失败,不能说消息发送成功了,结果用户保存失败了,那么积分服务可能会增加成功,就导致数据不一致。RocketMQ已经帮我们处理好这个问题。它的工作原理如下:
事务发起方,即用户服务会先向broker发送一个prepare“半事务消息”(一个并不完整的消息)到RMQ_SYS_TRANS_HALF_TOPIC的queue中, 该消息对消费者不可见。
MQ会返回一个ACK确认消息发送成功或者失败
消息发送成功,用户服务执行保存用户操作,提交本地事务,并根据本地事务的执行结果来决定半消息的提交状态为提交或者回滚
本地事务提交成功,事务发起方即用户服务会向broker再次发起“结束半事务”消息请求,commit或者rollback指令
broker端收到请求后,首先从RMQ_SYS_TRANS_HALF_TOPIC的queue中查出该消息,设置为完成状态。如果消息状态为提交,则把半消息从RMQ_SYS_TRANS_HALF_TOPIC队列中复制到这个消息原始topic的queue中去(之后这条消息就能被正常消费了);如果消息状态为回滚,则什么也不做。
Producer发送的半消息结束请求是oneway的,也就是发送后就不管了,只靠这个是无法保证半消息一定被提交的(比如未执行第4步),rocketMq提供了一个兜底方案,这个方案叫消息反查机制,Broker启动时,会启动一个TransactionalMessageCheckService任务,该任务会定时从半消息队列中读出所有超时未完成的半消息,针对每条未完成的消息,Broker会给对应的Producer发送一个消息反查请求,根据反查结果来决定这个半消息是需要提交还是回滚,或者后面继续来反查
- consumer(本例中指积分系统)消费消息,执行本地数据变更,提交本地事务
RocketMQ事务消息实战
我们需要做什么
- 编写本地事务检查监听TransactionListener ,一是执行本地事务逻辑,二是返回本地事务执行状态
- 发消息时生产者需要设置producer.setTransactionListener 事务监听
事务回调监听
public class MyTransactionCheckListener implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
//执行业务,保存本地事务,比如注册用户保存的数据库的动作可以在这里完成
//保存成功
return LocalTransactionState.ROLLBACK_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
//这里查询本地事务状态 ,查询用户是否注册成功
return LocalTransactionState.COMMIT_MESSAGE;
}
}
消息发送者
public class TransationSender {
public static void main(String[] args) throws MQClientException {
TransactionMQProducer producer = new TransactionMQProducer("tran-product-group");
producer.setNamesrvAddr("127.0.0.1:9876");
//线程池
ExecutorService excutorService = Executors.newFixedThreadPool(20);
producer.setExecutorService(excutorService);
producer.setTransactionListener(new MyTransactionCheckListener());
//设置事务消息监听
producer.start();
for(int i = 0 ; i < 10 ; i++){
String orderId = UUID.randomUUID().toString();
String tags = "Tag";
Message message = new Message("topic-tran", "tag", orderId, ("下单:"+i).getBytes(CharsetUtil.UTF_8));
TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(message, null);
System.out.println(transactionSendResult);
}
producer.shutdown();
}
}
消费者
消费者和普通消费者一样
public class TransationConsumer {
public static void main(String[] args) throws MQClientException {
//创建消费者
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("trans-consumer-group");
//设置name server 地址
defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");
//订阅
defaultMQPushConsumer.subscribe("topic-tran", "tag");
defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
list.forEach(message->{
System.out.println(message+" ; "+new String(message.getBody(), CharsetUtil.UTF_8));
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
defaultMQPushConsumer.start();
}
}