RocketMQ进阶-事务消息

简介: RocketMQ进阶-事务消息

前言


分布式消息选型的时候是否支持事务消息是一个很重要的考量点,而目前只有RocketMQ对事务消息支持的最好。今天我们来唠唠如何实现RocketMQ的事务消息!

Apache RocketMQ在4.3.0版中已经支持分布式事务消息,这里RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,如下图所示。


RocketMQ事务流程概要

RocketMQ实现事务消息主要分为两个阶段:正常事务的发送及提交、事务信息的补偿流程 整体流程为:

  • 正常事务发送与提交阶段
    1、生产者发送一个半消息给MQServer(半消息是指消费者暂时不能消费的消息)
    2、服务端响应消息写入结果,半消息发送成功
    3、开始执行本地事务
    4、根据本地事务的执行状态执行Commit或者Rollback操作
  • 事务信息的补偿流程
    1、如果MQServer长时间没收到本地事务的执行状态会向生产者发起一个确认回查的操作请求
    2、生产者收到确认回查请求后,检查本地事务的执行状态
    3、根据检查后的结果执行Commit或者Rollback操作
    补偿阶段主要是用于解决生产者在发送Commit或者Rollback操作时发生超时或失败的情况。

RocketMQ事务流程关键

1、事务消息在一阶段对用户不可见

事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的,也就是说消费者不能直接消费。这里RocketMQ的实现方法是原消息的主题与消息消费队列,然后把主题改成RMQ_SYS_TRANS_HALF_TOPIC ,这样由于消费者没有订阅这个主题,所以不会被消费。

2、如何处理第二阶段的失败消息?

在本地事务执行完成后会向MQServer发送Commit或Rollback操作,此时如果在发送消息的时候生产者出故障了,那么要保证这条消息最终被消费,MQServer会像服务端发送回查请求,确认本地事务的执行状态。

当然了rocketmq并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,RocketMQ默认回滚该消息。

3、消息状态 事务消息有三种状态:

  • TransactionStatus.CommitTransaction:提交事务消息,消费者可以消费此消息
  • TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。
  • TransactionStatus.Unknown :中间状态,它代表需要检查消息队列来确定状态。

实现


我们构建这样一个需求:用户请求订单微服务 order-service 接口删除订单(退货),删除订单后需要发送消息给用户服务account-service,用户微服务收到消息后会给用户账户增加余额。这个需求跟钱相关,肯定要保证消息的事务性,接下来我们根据上面的原理实现整个流程。


基础配置

生产者order-servcie和account-service都要引入RocketMQ相关依赖,增加RocketMQ的相关配置

  • 引入组件
<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
  • 添加配置
# within rocketmq
rocketmq:
  name-server:xxx.xx.x.xx:9876;xxx.xx.x.xx:9876
  producer:
    group:cloud-group


发送半消息

order-service在执行删除订单操作时发送一条半消息给MQServer,发送半消息主要是使用rocketMQTemplate.sendMessageInTransaction() 方法,发送事务消息。

@Override
public void delete(String orderNo) {
  Order order = orderMapper.selectByNo(orderNo);
  //如果订单存在且状态为有效,进行业务处理
  if (order != null && CloudConstant.VALID_STATUS.equals(order.getStatus())) {
    String transactionId = UUID.randomUUID().toString();
    //如果可以删除订单则发送消息给rocketmq,让用户中心消费消息
    rocketMQTemplate.sendMessageInTransaction("add-amount",
        MessageBuilder.withPayload(
            UserAddMoneyDTO.builder()
                .userCode(order.getAccountCode())
                .amount(order.getAmount())
                .build()
        )
        .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
        .setHeader("order_id",order.getId())
        .build()
        ,order
    );
  }
}

首先先校验一下订单状态,然后发送消息给MQServer,这个逻辑大家都看得懂,主要是关注sendMessageInTransaction() 方法,源码如下:

public TransactionSendResult sendMessageInTransaction(String destination, Message<?> message, Object arg) throws MessagingException {
  try {
    if (((TransactionMQProducer)this.producer).getTransactionListener() == null) {
      thrownew IllegalStateException("The rocketMQTemplate does not exist TransactionListener");
    } else {
      org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
      returnthis.producer.sendMessageInTransaction(rocketMsg, arg);
    }
  } catch (MQClientException var5) {
    throw RocketMQUtil.convert(var5);
  }
}

该方法有三个参数:

  • destination:目的地(主题),这里发送给add-amount 这个主题
  • message:发送给消费者的消息体,需要使用 MessageBuilder.withPayload() 来构建消息
  • arg:参数

注意,这里我们生成了一个transactionId,并放在header中跟消息一起发送(这里实际也可以构造成一个对象,放在arg里进行发送),作用后面再讲!


执行本地事务与回查

MQServer收到半消息后会告诉生产者order-service确认收到半消息,这时候order-service需要执行本地事务,执行完本地事务后再告诉MQServer本地事务的执行状态,确认消息究竟是Commit还是Rollback。如果在告诉MQServer本地执行状态的时候出异常了还需要让MQServer能够回查到,怎么实现这一些列操作呢?

RocketMQ提供了 RocketMQLocalTransactionListener 接口,本地事务监听器,这个接口类的实现如下:

第一个方法executeLocalTransaction 为执行本地事务;

第二个方法checkLocalTransaction 为检查本地事务的执行状态,也就是回查动作。

有了这个接口类我们的执行逻辑清楚了,但是还有个问题:本地事务已经执行完成了,怎么去回查本地事务的执行结果呢?


我们可以在执行本地事务的时候同时生成一个事务日志,让本地事务与日志事务在同一个方法中,同时添加@Transactional 注解,保证两个操作事务是一个原子操作。这样如果事务日志表中有这个本地事务的信息,那就代表本地事务执行成功,需要Commit,相反如果没有对应的事务日志,则表示没执行成功,需要Rollback

思路既然理顺了,咱们就开撸。


  • 首先创建一个日志表很简单的三个字段,主要是这个事务id,需要根据这个事务id回查事务,还记得我们在发送半消息时生成的事务id吗,就是干这个用的!
  • 在生产者编写方法实现RocketMQLocalTransactionListener
@Slf4j
@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
publicclass AddUserAmountListener implements RocketMQLocalTransactionListener {
    privatefinal OrderService orderService;
    privatefinal RocketMqTransactionLogMapper rocketMqTransactionLogMapper;
    /**
     * 执行本地事务
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        log.info("执行本地事务");
        MessageHeaders headers = message.getHeaders();
        //获取事务ID
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        Integer orderId = Integer.valueOf((String)headers.get("order_id"));
        log.info("transactionId is {}, orderId is {}",transactionId,orderId);
        try{
            //执行本地事务,并记录日志
            orderService.changeStatuswithRocketMqLog(orderId, CloudConstant.INVALID_STATUS,transactionId);
            //执行成功,可以提交事务
            return RocketMQLocalTransactionState.COMMIT;
        }catch (Exception e){
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    /**
     * 本地事务的检查,检查本地事务是否成功
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        MessageHeaders headers = message.getHeaders();
        //获取事务ID
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        log.info("检查本地事务,事务ID:{}",transactionId);
        //根据事务id从日志表检索
        QueryWrapper<RocketmqTransactionLog> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("transaction_id",transactionId);
        RocketmqTransactionLog rocketmqTransactionLog = rocketMqTransactionLogMapper.selectOne(queryWrapper);
        if(null != rocketmqTransactionLog){
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}
  • 执行本地事务的方法
@Transactional(rollbackFor = RuntimeException.class)
@Override
public void changeStatuswithRocketMqLog(Integer id,String status,String transactionId){
    //将订单状态置位无效
  orderMapper.changeStatus(id,status);
    //插入事务表
  rocketMqTransactionLogMapper.insert(
      RocketmqTransactionLog.builder()
          .transactionId(transactionId)
          .log("执行删除订单操作")
      .build()
  );
}

这一块的代码逻辑都是在生产端,即Order-Server,大家不要搞错了


消费消息

Rollback的消息MQServer会给我们处理,我们只要关注Commit状态时消费端可以正常消费即可。在account-service监听消息,如果收到消息则给用户账户增加余额。

@Slf4j
@Service
@RocketMQMessageListener(topic = "add-amount",consumerGroup = "cloud-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired) )
publicclass AddUserAmountListener implements RocketMQListener<UserAddMoneyDTO> {
    privatefinal AccountMapper accountMapper;
    /**
     * 收到消息的业务逻辑
     */
    @Override
    public void onMessage(UserAddMoneyDTO userAddMoneyDTO) {
        log.info("received message: {}",userAddMoneyDTO);
        accountMapper.increaseAmount(userAddMoneyDTO.getUserCode(),userAddMoneyDTO.getAmount());
        log.info("add money success");
    }
}

测试

订单表有这样一条记录,用户为jianzh5,amount为200

用户表的记录,执行完成后jianzh5的账户应该变成250

  • 调用删除订单接口,删除订单
  • 发送半消息
  • 执行本地事务,并生成事务日志
  • 模拟异常情况 在发送Commit消息的时候我们用命令杀掉进程taskkill /pid 19748 -t -f,模拟异常!
  • 重新启动order-service,查看是否会执行回查动作
  • MQServer进行回查,检查事务日志,判断是否可以提交事务
  • 消费者消费事务消息,保证事务的一致性

总结


使用RocketMQ实现事务消息的过程还是很复杂的,需要好好理解开头的那张图,只有理解了事务消息的交互过程才能编写相应的代码!

好了,各位朋友们,本期的内容到此就全部结束啦,能看到这里的同学都是优秀的同学,下一个升职加薪的就是你了!

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
消息中间件 Java API
RocketMQ事务消息, 图文、源码学习探究~
介绍 RocketMQ是阿里巴巴开源的分布式消息中间件,它是一个高性能、低延迟、可靠的消息队列系统,用于在分布式系统中进行异步通信。 从4.3.0版本开始正式支持分布式事务消息~ RocketMq事务消息支持最终一致性:在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。 原理、流程 本质上RocketMq的事务能力是基于二阶段提交来实现的 在消息发送上,将二阶段提交与本地事务绑定 本地事务执行成功,则事务消息成功,可以交由Consumer消费 本地事务执行失败,则事务消息失败,Consumer无法消费 但是,RocketMq只能保证本地事务
|
2月前
|
消息中间件 RocketMQ
MQ与本地事务一致性问题---RocketMQ事务型消息
MQ与本地事务一致性问题---RocketMQ事务型消息
25 2
|
6月前
|
消息中间件 Kafka 测试技术
微服务轮子项目(33) -RocketMQ特点、安装部署、异常处理、事务消息原理
微服务轮子项目(33) -RocketMQ特点、安装部署、异常处理、事务消息原理
96 0
|
2月前
|
消息中间件 SQL 容灾
深度剖析 RocketMQ 5.0,消息进阶:如何支撑复杂业务消息场景?
本文主要学习 RocketMQ 的一致性特性,一致性对于交易、金融都是刚需。从大规模复杂业务出发,学习 RocketMQ 的 SQL 订阅、定时消息等特性。再从高可用的角度来看,这里更多的是大型公司对于高阶可用性的要求,如同城容灾、异地多活等。
108198 287
|
3月前
|
消息中间件 存储 Apache
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景
事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
383 2
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景
|
3月前
|
消息中间件 RocketMQ Docker
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
56 0
|
4月前
|
消息中间件 数据库 RocketMQ
Springboot+RocketMQ通过事务消息优雅的实现订单支付功能
RocketMQ的事务消息,是指发送消息事件和其他事件需要同时成功或同时失败。比如银行转账, A银行的某账户要转一万元到B银行的某账户。A银行发送“B银行账户增加一万元”这个消息,要和“从A银 行账户扣除一万元”这个操作同时成功或者同时失败。RocketMQ采用两阶段提交的方式实现事务消息。
132 0
|
4月前
|
消息中间件 RocketMQ Docker
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)(下)
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
30 0
|
消息中间件 RocketMQ Docker
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)(上)
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
73 0
|
4月前
|
存储 消息中间件 关系型数据库
解密分布式事务:CAP理论、BASE理论、两阶段提交(2PC)、三阶段提交(3PC)、补偿事务(TCC)、MQ事务消息、最大努力通知
解密分布式事务:CAP理论、BASE理论、两阶段提交(2PC)、三阶段提交(3PC)、补偿事务(TCC)、MQ事务消息、最大努力通知