RocketMQ事务实现原理

简介: RocketMQ事务实现原理

1需求


       RocketMQ一个优点是有事务特性,可以保证事务的最终一致性。举一个简单的例子,以电商为例,创建订单ID9527被创建后要保存到数据库,通过该订单通过MQ投递给其他系统进行消费。如果要保证订单数据入库与消息投递状态要保证最终一致性,要怎么做?


第一种情况,先保存数据库,在发送MQ


public void register(UserInfo userInfo){
    //保存用户信息
  userDao.insert(userInfo);
  //发送MQ消息
  SendResult res = mqProvider.sendMessage(userInfo.getId);
  if(res.getCode().equals("SEND_OK")){
     connection.commit();
  }else{
     connection.rollback();    
  }
}


如果生产者发送消息时,因为网络原因导致10秒消息才返回SendResult结果,这就意味着10秒内数据库事务无法提交,高并发情况下,数据库连接池资源会被打满,后果是比较严重的。


第二种情况,先发送MQ,在保存数据库


public void register(UserInfo userInfo){
  userDao.insert(userInfo);
  //发送MQ消息
  SendResult res = mqProvider.sendMessage(userInfo.getId);
  if(res.getCode().equals("SEND_OK")){
     //保存用户信息
     userDao.insert(userInfo);
     //此时系统宕机
     connection.commit();
  }else{
     connection.rollback();    
  }
}


如果消息发送成功,但是执行保存的时候数据库出现宕机,则MQ中却有了消息,但是数据库却没有该条订单信息,则MQ的下游消费方需要从业务逻辑上做额外处理。


问题很清晰了,我就是想要数据库保存和MQ发送同时成功或者同时失败,保证事务的最终一致性。下面分析RocketMQ是怎么实现的


2代码


  首先我会发送消息内容为9527到broker,此时消息的状态是半消息状态(properties中一个标志位,即对消费不可见),并且存储到Commitlog中,把偏移量返回给producer,producer收到存储消息成功后,会调用TransactionExecuterimpl的executeLocalTransactionBranch方法,在该方法中执行保存数据库的操作,如果保存数据库成功,返回一个LocalTransactionState.COMMIT_MESSAGE,反之返回LocalTransactionState.ROLLBACK_MESSAGE,并且把返回值发送给broker,broker根据LocalTransactionState的状态确定是消息是否标记丢弃和标记成功,从而实现对用户是否可见。


       如果在TransactionExecuterimpl的executeLocalTransactionBranch中数据库执行commit后系统宕机,此时数据库是有数据而borker的消息为半消息状态,这种情况下数据还是不一致,那么这时就用到事务回查器,broker会通过线程池扫描半消息的任务队列去发送请求给broker到TransactionCheckListenerImpl方法,比如查看数据库是否存在返回LocalTransactionState状态。


2.1生产者


package transaction;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
 * @author chaird
 * @create 2021-10-25 11:53
 */
public class SyncProducer {
  public static void main(String[] args) throws Exception {
    // 事务回查器
    TransactionCheckListenerImpl checkListener = new TransactionCheckListenerImpl();
    // 事务消息生产者
    TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
    // NameServer地址
    producer.setNamesrvAddr("localhost:9876");
    producer.setSendMsgTimeout(999999999);
    // 注册事务回查监听
    producer.setTransactionCheckListener(checkListener);
    // 本地事务执行器
    TransactionExecuterimpl executerimpl = new TransactionExecuterimpl();
    producer.start();
      // Create a message instance, specifying topic, tag and message body.
      Message msg =
          new Message(
              "TopicTest" /* Topic */,
              "TagA" /* Tag */,
              ("" + 9527).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);
      // Call send message to deliver message to one of brokers.
      SendResult sendResult = producer.sendMessageInTransaction(msg, executerimpl, null);
      System.out.printf("%s%n", sendResult);
    // Shut down once the producer instance is not longer in use.
    producer.shutdown();
  }
}


2.2事务回查器


broker会查看服务器中某个半消息主题中的消息,通过定时任务去看这个消息是是否发送成功,后面会介绍。


package transaction;
/**
 * @author chaird
 * @create 2022-01-08 20:24
 */
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.common.message.MessageExt;
public class TransactionCheckListenerImpl implements TransactionCheckListener {
    @Override
    public LocalTransactionState checkLocalTransactionState(MessageExt messageExt) {
        System.out.println("服务器端回查事务消息: "+messageExt.toString());
        //由于RocketMQ迟迟没有收到消息的确认消息,因此主动询问这条prepare消息,是否正常?
        //可以查询数据库看这条数据是否已经处理
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}


2.3注册事务回查监听器


package transaction;
/**
 * @author chaird
 * @create 2022-01-08 20:24
 */
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.message.Message;
import java.util.Date;
public class TransactionExecuterimpl implements LocalTransactionExecuter {
  @Override
  public LocalTransactionState executeLocalTransactionBranch(
      final Message message, final Object o) {
    try {
      String msg = new String(message.getBody(), "UTF-8");
      System.out.println("TransactionExecuterimpl-------" + msg);
      // db.save()
      // db.save()
      // db.save()
      // connect.commit()
      System.out.println(new Date() + "===> 本地事务执行成功,发送确认消息");
    } catch (Exception e) {
      System.out.println(new Date() + "===> 本地事务执行失败,回滚!!!");
      // connect.rollback()
      return LocalTransactionState.ROLLBACK_MESSAGE;
    }
    return LocalTransactionState.COMMIT_MESSAGE;
  }
}


3部分源码分析


3.1客户端发送消息源码分析


客户端的生产者类型为TransactionMQProducer类型,设置了一个执行器和回查器,如下面代码所示


// 事务回查器
    TransactionCheckListenerImpl checkListener = new TransactionCheckListenerImpl();
    // 事务消息生产者
    TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
// 注册事务回查监听
producer.setTransactionCheckListener(checkListener);
 // 本地事务执行器
    TransactionExecuterimpl executerimpl = new TransactionExecuterimpl();
//创建消息
 Message msg =
          new Message(
              "TopicTest" /* Topic */,
              "TagA" /* Tag */,
              ("" + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);
      // 发送消息.
      SendResult sendResult = producer.sendMessageInTransaction(msg, executerimpl, null);


调用producer的sendMessageInTransaction方法,在该方法里会给消息添加一个如下代码的属性,标记该消息为事务消息


MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");


此时消息会发送到broker。


总结:消息在producer会被拦截并且添加属性


MessageConst.PROPERTY_TRANSACTION_PREPARED标记消息为事务消息


3.2broker收到消息源码分析


直接从SendMessageProcessor的asyncSendMessage方法看吧,该方法里会根据MessageConst.PROPERTY_TRANSACTION_PREPARED确定走哪个分支,如下图的transFlag(此时为“true”)属性


String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (transFlag != null && Boolean.parseBoolean(transFlag)) {
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
               //省略
            }
            putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
        } else {
            putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        }


然后就跳转到TransactionalMessageBridge的asyncPutHalfMessage方法,如下面代码所示,


public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
        return store.asyncPutMessage(parseHalfMessageInner(messageInner));
    }


很明显会parseHalfMessageInner方法对messageInner进行修改配置,这里重点是把主题改为一个默认的半消息主题


//public static final String RMQ_SYS_TRANS_HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());


构建完毕后调用DefaultMessageStore的asyncPutMessage方法把数据方法CommitLog里,此时把偏移量返回给producer


总结:消息在broker会被换主题,发到一个特定的主题里,并且把消息在CommitLog的offset发送给proudcer


3.3客户端发送消息后收到broker消息处理本地事务并且反馈结果给broker


DefaultMQProducerImpl调用sendMessageInTransaction后发送消息并且拿到响应结果,如图是放入CommitLog成功,则会执行LocalTransactionExecuter(看上面的代码,逻辑为运行MySQL,并且跟进成功与否返回一个状态localTransactionState),然后执行DefaultMQProducerImpl的endTransaction方法,把localTransactionState状态发送给broker


//执行自定义的LocalTransactionExecuter,即上面的TransactionExecuterimpl
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
this.endTransaction(msg, sendResult, localTransactionState, localException);


总结:客户端确定消息(半消息状态)已经被放在CommitLog中,执行自定义的LocalTransactionExecuter(逻辑为根据数据库是否成功提交返回localTransactionState状态,即确定消息是否为丢弃或者修改为提交状态)


3.4broker收到producer的本地事务状态修改消息的状态


直接从EndTransactionProcessor的processRequest方法开始说吧


如果producer中返回的MessageSysFlag.TRANSACTION_COMMIT_TYPE,即本地事务执行成功。


首先会根据客户端发送的偏移量去CommitLog中取到消息,然后创建一个新的消息对象放入到CommitLog中


//EndTransactionProcessor###processRequest
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
            //根据prdducer中传过来的偏移量获取消息内容
//根据prdducer中传过来的偏移量获取消息内容
//根据prdducer中传过来的偏移量获取消息内容
            result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                if (res.getCode() == ResponseCode.SUCCESS) {
                    //把取出来的消息内容重新copy一个新对象
//把取出来的消息内容重新copy一个新对象
//把取出来的消息内容重新copy一个新对象
                    MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                    msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                    msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                    msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                    msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                    MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
                   // 把新的数据在重新放在CommitLog中,生成ConsumeQueue
// 把新的数据在重新放在CommitLog中,生成ConsumeQueue
// 把新的数据在重新放在CommitLog中,生成ConsumeQueue
                    RemotingCommand sendResult = sendFinalMessage(msgInner);
                    if (sendResult.getCode() == ResponseCode.SUCCESS) {
                        this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                    }
                    return sendResult;
                }
                return res;
            }


总结:把原来的消息(半消息主题)重新复制一份(更正为正确的主题)插入到CommitLog,构建CounsumeQueue。

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
6天前
|
消息中间件 Java API
RocketMQ事务消息, 图文、源码学习探究~
介绍 RocketMQ是阿里巴巴开源的分布式消息中间件,它是一个高性能、低延迟、可靠的消息队列系统,用于在分布式系统中进行异步通信。 从4.3.0版本开始正式支持分布式事务消息~ RocketMq事务消息支持最终一致性:在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。 原理、流程 本质上RocketMq的事务能力是基于二阶段提交来实现的 在消息发送上,将二阶段提交与本地事务绑定 本地事务执行成功,则事务消息成功,可以交由Consumer消费 本地事务执行失败,则事务消息失败,Consumer无法消费 但是,RocketMq只能保证本地事务
|
5天前
|
消息中间件 存储 RocketMQ
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
|
6天前
|
消息中间件 RocketMQ
MQ与本地事务一致性问题---RocketMQ事务型消息
MQ与本地事务一致性问题---RocketMQ事务型消息
25 2
|
6月前
|
消息中间件 Kafka 测试技术
微服务轮子项目(33) -RocketMQ特点、安装部署、异常处理、事务消息原理
微服务轮子项目(33) -RocketMQ特点、安装部署、异常处理、事务消息原理
99 0
|
6天前
|
消息中间件 存储 Apache
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景
事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
402 2
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景
|
6天前
|
消息中间件 RocketMQ Docker
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
58 0
|
6天前
|
消息中间件 数据库 RocketMQ
Springboot+RocketMQ通过事务消息优雅的实现订单支付功能
RocketMQ的事务消息,是指发送消息事件和其他事件需要同时成功或同时失败。比如银行转账, A银行的某账户要转一万元到B银行的某账户。A银行发送“B银行账户增加一万元”这个消息,要和“从A银 行账户扣除一万元”这个操作同时成功或者同时失败。RocketMQ采用两阶段提交的方式实现事务消息。
143 0
|
6天前
|
消息中间件 RocketMQ Docker
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)(下)
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
31 0
|
6天前
|
消息中间件 RocketMQ Docker
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)(上)
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
76 0
|
6天前
|
存储 消息中间件 关系型数据库
解密分布式事务:CAP理论、BASE理论、两阶段提交(2PC)、三阶段提交(3PC)、补偿事务(TCC)、MQ事务消息、最大努力通知
解密分布式事务:CAP理论、BASE理论、两阶段提交(2PC)、三阶段提交(3PC)、补偿事务(TCC)、MQ事务消息、最大努力通知