RocketMQ 两大核心特性深度拆解:事务消息与延时消息,从原理到实战全打通

简介: RocketMQ作为阿里开源的金融级消息中间件,以高可靠、高吞吐、低延迟著称。其事务消息通过两阶段提交+回查机制,解决本地事务与消息发送的原子性问题;延时消息在5.x中升级为毫秒级任意时间定时消息,基于TimerStore与时间轮实现高性能调度,二者共同支撑分布式系统核心一致性与定时场景。

在分布式微服务架构中,消息中间件是解耦系统、削峰填谷、异步通信的核心组件。RocketMQ作为阿里开源的金融级分布式消息中间件,凭借高可靠、高吞吐、低延迟的特性,成为国内互联网企业的主流选型。其中事务消息与延时消息是RocketMQ最具差异化的两大核心能力,分别解决了分布式场景下的原子性事务与定时调度两大核心痛点。

一、RocketMQ事务消息:分布式事务的优雅解决方案

1.1 事务消息解决的核心痛点

在微服务架构中,我们经常会遇到「本地事务执行」与「消息发送」的原子性问题,典型场景如电商下单流程:用户下单时,需要在本地数据库创建订单、扣减库存,同时发送消息通知物流系统初始化发货流程。传统的处理方式存在两个致命问题:

  1. 先发消息,后执行本地事务:若消息发送成功,但本地事务执行失败回滚,会导致物流系统收到无效消息,产生脏数据。
  2. 先执行本地事务,后发消息:若本地事务执行成功,但消息发送失败,会导致物流系统无法收到消息,订单发货流程停滞,数据不一致。

传统的分布式事务方案如2PC、TCC、SAGA等,实现复杂、侵入性强、性能损耗大。而RocketMQ的事务消息,基于两阶段提交+事务回查机制,以极低的侵入性,完美解决了「本地事务与消息发送的原子性」问题,实现了二者的最终一致性。

1.2 事务消息的底层实现原理

RocketMQ事务消息的核心设计思想是两阶段提交+反向事务回查补偿,核心概念如下:

  • 半消息(Half Message):暂不能被消费者消费的消息。消息发送到Broker后,会被存储到对消费者不可见的特殊主题中,只有当本地事务执行成功并提交后,才会被投递给消费者。
  • 事务状态:包含COMMIT(提交)、ROLLBACK(回滚)、UNKNOWN(未知)三种状态,分别对应消息投递、消息删除、触发事务回查。
  • 事务回查:若Broker长时间未收到生产者的二次确认,会主动反向调用生产者的接口,查询本地事务的执行状态,确保消息最终能被正确提交或回滚。

底层存储实现上,RocketMQ为半消息专门设计了系统级主题RMQ_SYS_TRANS_HALF_TOPIC,该主题对普通消费者完全不可见,所有半消息都会被持久化到该主题中。当收到COMMIT指令时,Broker会将消息从半消息主题转移到业务指定的真实主题,此时消息才能被消费者消费;当收到ROLLBACK指令时,Broker会直接将半消息标记为删除,不会进行任何投递。

事务回查机制由Broker的定时任务驱动,默认每隔60秒扫描一次半消息主题中超过阈值未收到二次确认的消息,主动向生产者发起回查,默认最大回查次数为15次,超过次数后Broker会默认回滚该消息,避免消息无限期占用存储资源。

1.3 事务消息的核心流程

RocketMQ事务消息的完整执行流程如下,通过流程图可直观理解全链路逻辑:

全流程的核心逻辑可拆解为8个核心步骤:

  1. 生产者向Broker发送半消息,消息体包含完整的业务数据与全局唯一的事务ID。
  2. Broker收到半消息后,将其持久化到RMQ_SYS_TRANS_HALF_TOPIC主题,持久化成功后向生产者返回ACK确认。
  3. 生产者收到ACK确认后,执行本地事务逻辑。
  4. 生产者根据本地事务的执行结果,向Broker发送二次确认指令:事务执行成功发送COMMIT,执行失败发送ROLLBACK。
  5. Broker收到COMMIT指令后,将半消息从半消息主题转移到真实业务主题,对消费者可见并完成投递;收到ROLLBACK指令后,直接删除半消息,不进行任何投递。
  6. 若网络波动、服务宕机等原因导致Broker未收到二次确认指令,等待超时后,Broker会主动向生产者发起事务回查请求。
  7. 生产者收到回查请求后,根据事务ID查询本地事务的执行状态。
  8. 生产者根据查询结果,再次向Broker发送COMMIT/ROLLBACK指令,Broker重复步骤5的处理逻辑。

1.4 事务消息的生产级实战

本次实战基于经典的电商下单场景,实现「创建订单+扣减库存」与「发送物流消息」的原子性,确保二者最终一致。

1.4.1 环境依赖与项目配置

项目基于JDK17、Spring Boot 3.2.4、RocketMQ 5.2.0构建,Maven核心依赖如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

   <modelVersion>4.0.0</modelVersion>
   <parent>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-parent</artifactId>
       <version>3.2.4</version>
       <relativePath/>
   </parent>
   <groupId>com.jam.demo</groupId>
   <artifactId>rocketmq-demo</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <name>rocketmq-demo</name>
   <description>RocketMQ事务消息与延时消息实战</description>
   <properties>
       <java.version>17</java.version>
       <rocketmq.version>2.2.5</rocketmq.version>
       <mybatis-plus.version>3.5.6</mybatis-plus.version>
       <mysql.version>8.0.36</mysql.version>
       <fastjson2.version>2.0.52</fastjson2.version>
       <guava.version>33.1.0-jre</guava.version>
       <lombok.version>1.18.32</lombok.version>
       <springdoc.version>2.5.0</springdoc.version>
   </properties>
   <dependencies>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-web</artifactId>
       </dependency>
       <dependency>
           <groupId>org.apache.rocketmq</groupId>
           <artifactId>rocketmq-spring-boot-starter</artifactId>
           <version>${rocketmq.version}</version>
       </dependency>
       <dependency>
           <groupId>org.apache.rocketmq</groupId>
           <artifactId>rocketmq-client</artifactId>
           <version>5.2.0</version>
       </dependency>
       <dependency>
           <groupId>com.baomidou</groupId>
           <artifactId>mybatis-plus-boot-starter</artifactId>
           <version>${mybatis-plus.version}</version>
       </dependency>
       <dependency>
           <groupId>com.mysql</groupId>
           <artifactId>mysql-connector-j</artifactId>
           <version>${mysql.version}</version>
       </dependency>
       <dependency>
           <groupId>com.alibaba.fastjson2</groupId>
           <artifactId>fastjson2</artifactId>
           <version>${fastjson2.version}</version>
       </dependency>
       <dependency>
           <groupId>com.google.guava</groupId>
           <artifactId>guava</artifactId>
           <version>${guava.version}</version>
       </dependency>
       <dependency>
           <groupId>org.projectlombok</groupId>
           <artifactId>lombok</artifactId>
           <version>${lombok.version}</version>
           <scope>provided</scope>
       </dependency>
       <dependency>
           <groupId>org.springdoc</groupId>
           <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
           <version>${springdoc.version}</version>
       </dependency>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-test</artifactId>
           <scope>test</scope>
       </dependency>
   </dependencies>
   <build>
       <plugins>
           <plugin>
               <groupId>org.springframework.boot</groupId>
               <artifactId>spring-boot-maven-plugin</artifactId>
               <configuration>
                   <excludes>
                       <exclude>
                           <groupId>org.projectlombok</groupId>
                           <artifactId>lombok</artifactId>
                       </exclude>
                   </excludes>
               </configuration>
           </plugin>
       </plugins>
   </build>
</project>

application.yml配置文件如下:

spring:
 application:
   name: rocketmq-demo
 datasource:
   driver-class-name: com.mysql.cj.jdbc.Driver
   url: jdbc:mysql://127.0.0.1:3306/rocketmq_demo?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
   username: root
   password: root
 jackson:
   default-property-inclusion: non_null
server:
 port: 8080
rocketmq:
 name-server: 127.0.0.1:9876
 producer:
   group: order-producer-group
   send-message-timeout: 3000
   retry-times-when-send-failed: 2
mybatis-plus:
 mapper-locations: classpath*:/mapper/**/*.xml
 configuration:
   map-underscore-to-camel-case: true
   log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
springdoc:
 swagger-ui:
   path: /swagger-ui.html
   enabled: true
 api-docs:
   enabled: true
   path: /v3/api-docs

1.4.2 数据库表设计

基于MySQL 8.0设计3张核心表,分别为订单表、库存表、本地事务状态表,SQL脚本如下:

CREATE DATABASE IF NOT EXISTS rocketmq_demo DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
USE rocketmq_demo;

-- 订单表
DROP TABLE IF EXISTS t_order;
CREATE TABLE t_order (
   id BIGINT NOT NULL AUTO_INCREMENT COMMENT '主键ID',
   order_no VARCHAR(64) NOT NULL COMMENT '订单编号',
   user_id BIGINT NOT NULL COMMENT '用户ID',
   product_id BIGINT NOT NULL COMMENT '商品ID',
   quantity INT NOT NULL COMMENT '购买数量',
   total_amount DECIMAL(10,2) NOT NULL COMMENT '订单总金额',
   order_status TINYINT NOT NULL DEFAULT 0 COMMENT '订单状态:0-待支付,1-已支付,2-已取消,3-已完成',
   create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
   update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
   PRIMARY KEY (id),
   UNIQUE KEY uk_order_no (order_no),
   KEY idx_user_id (user_id),
   KEY idx_product_id (product_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='订单表';

-- 库存表
DROP TABLE IF EXISTS t_stock;
CREATE TABLE t_stock (
   id BIGINT NOT NULL AUTO_INCREMENT COMMENT '主键ID',
   product_id BIGINT NOT NULL COMMENT '商品ID',
   stock_num INT NOT NULL DEFAULT 0 COMMENT '库存数量',
   create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
   update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
   PRIMARY KEY (id),
   UNIQUE KEY uk_product_id (product_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='库存表';

-- 本地事务状态表,用于事务回查
DROP TABLE IF EXISTS t_transaction_log;
CREATE TABLE t_transaction_log (
   id BIGINT NOT NULL AUTO_INCREMENT COMMENT '主键ID',
   transaction_id VARCHAR(64) NOT NULL COMMENT '全局事务ID',
   business_type VARCHAR(32) NOT NULL COMMENT '业务类型',
   transaction_status TINYINT NOT NULL COMMENT '事务状态:0-执行中,1-已提交,2-已回滚',
   create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
   update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
   PRIMARY KEY (id),
   UNIQUE KEY uk_transaction_id (transaction_id),
   KEY idx_create_time (create_time)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='本地事务状态表';

-- 初始化库存数据
INSERT INTO t_stock (product_id, stock_num) VALUES (1, 1000);

1.4.3 核心实体类定义

package com.jam.demo.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;

import java.math.BigDecimal;
import java.time.LocalDateTime;

/**
* 订单实体类
* @author ken
*/

@Data
@TableName("t_order")
@Schema(name = "Order", description = "订单实体")
public class Order {
   @TableId(type = IdType.AUTO)
   @Schema(description = "主键ID")
   private Long id;

   @Schema(description = "订单编号")
   private String orderNo;

   @Schema(description = "用户ID")
   private Long userId;

   @Schema(description = "商品ID")
   private Long productId;

   @Schema(description = "购买数量")
   private Integer quantity;

   @Schema(description = "订单总金额")
   private BigDecimal totalAmount;

   @Schema(description = "订单状态:0-待支付,1-已支付,2-已取消,3-已完成")
   private Integer orderStatus;

   @Schema(description = "创建时间")
   private LocalDateTime createTime;

   @Schema(description = "更新时间")
   private LocalDateTime updateTime;
}

package com.jam.demo.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;

import java.time.LocalDateTime;

/**
* 库存实体类
* @author ken
*/

@Data
@TableName("t_stock")
@Schema(name = "Stock", description = "库存实体")
public class Stock {
   @TableId(type = IdType.AUTO)
   @Schema(description = "主键ID")
   private Long id;

   @Schema(description = "商品ID")
   private Long productId;

   @Schema(description = "库存数量")
   private Integer stockNum;

   @Schema(description = "创建时间")
   private LocalDateTime createTime;

   @Schema(description = "更新时间")
   private LocalDateTime updateTime;
}

package com.jam.demo.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;

import java.time.LocalDateTime;

/**
* 事务日志实体类
* @author ken
*/

@Data
@TableName("t_transaction_log")
@Schema(name = "TransactionLog", description = "事务日志实体")
public class TransactionLog {
   @TableId(type = IdType.AUTO)
   @Schema(description = "主键ID")
   private Long id;

   @Schema(description = "全局事务ID")
   private String transactionId;

   @Schema(description = "业务类型")
   private String businessType;

   @Schema(description = "事务状态:0-执行中,1-已提交,2-已回滚")
   private Integer transactionStatus;

   @Schema(description = "创建时间")
   private LocalDateTime createTime;

   @Schema(description = "更新时间")
   private LocalDateTime updateTime;
}

1.4.4 Mapper层定义

package com.jam.demo.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.Order;
import org.apache.ibatis.annotations.Mapper;

/**
* 订单Mapper
* @author ken
*/

@Mapper
public interface OrderMapper extends BaseMapper<Order> {
}

package com.jam.demo.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.Stock;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Update;

/**
* 库存Mapper
* @author ken
*/

@Mapper
public interface StockMapper extends BaseMapper<Stock> {

   /**
    * 扣减库存
    * @param productId 商品ID
    * @param quantity 扣减数量
    * @return 影响行数
    */

   @Update("UPDATE t_stock SET stock_num = stock_num - #{quantity} WHERE product_id = #{productId} AND stock_num >= #{quantity}")
   int deductStock(@Param("productId") Long productId, @Param("quantity") Integer quantity);
}

package com.jam.demo.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.TransactionLog;
import org.apache.ibatis.annotations.Mapper;

/**
* 事务日志Mapper
* @author ken
*/

@Mapper
public interface TransactionLogMapper extends BaseMapper<TransactionLog> {
}

1.4.5 核心业务Service层

package com.jam.demo.service;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.jam.demo.entity.Order;
import com.jam.demo.mapper.OrderMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;

/**
* 订单服务
* @author ken
*/

@Service
@RequiredArgsConstructor
public class OrderService {

   private final OrderMapper orderMapper;

   /**
    * 保存订单
    * @param order 订单实体
    * @return 保存结果
    */

   public boolean saveOrder(Order order) {
       return orderMapper.insert(order) > 0;
   }

   /**
    * 根据订单号查询订单
    * @param orderNo 订单号
    * @return 订单实体
    */

   public Order getOrderByNo(String orderNo) {
       LambdaQueryWrapper<Order> wrapper = new LambdaQueryWrapper<Order>()
               .eq(Order::getOrderNo, orderNo);
       return orderMapper.selectOne(wrapper);
   }

   /**
    * 取消订单
    * @param orderNo 订单号
    * @return 取消结果
    */

   public boolean cancelOrder(String orderNo) {
       Order order = new Order();
       order.setOrderStatus(2);
       LambdaQueryWrapper<Order> wrapper = new LambdaQueryWrapper<Order>()
               .eq(Order::getOrderNo, orderNo)
               .eq(Order::getOrderStatus, 0);
       return orderMapper.update(order, wrapper) > 0;
   }
}

package com.jam.demo.service;

import com.jam.demo.mapper.StockMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;

/**
* 库存服务
* @author ken
*/

@Service
@RequiredArgsConstructor
public class StockService {

   private final StockMapper stockMapper;

   /**
    * 扣减库存
    * @param productId 商品ID
    * @param quantity 扣减数量
    * @return 扣减结果
    */

   public boolean deductStock(Long productId, Integer quantity) {
       return stockMapper.deductStock(productId, quantity) > 0;
   }

   /**
    * 恢复库存
    * @param productId 商品ID
    * @param quantity 恢复数量
    * @return 恢复结果
    */

   public boolean restoreStock(Long productId, Integer quantity) {
       return stockMapper.deductStock(productId, -quantity) > 0;
   }
}

package com.jam.demo.service;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.jam.demo.entity.TransactionLog;
import com.jam.demo.mapper.TransactionLogMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;

/**
* 事务日志服务
* @author ken
*/

@Service
@RequiredArgsConstructor
public class TransactionLogService {

   private final TransactionLogMapper transactionLogMapper;

   /**
    * 保存事务日志
    * @param transactionLog 事务日志实体
    * @return 保存结果
    */

   public boolean saveTransactionLog(TransactionLog transactionLog) {
       return transactionLogMapper.insert(transactionLog) > 0;
   }

   /**
    * 更新事务状态
    * @param transactionId 事务ID
    * @param status 事务状态
    * @return 更新结果
    */

   public boolean updateTransactionStatus(String transactionId, Integer status) {
       TransactionLog transactionLog = new TransactionLog();
       transactionLog.setTransactionStatus(status);
       LambdaQueryWrapper<TransactionLog> wrapper = new LambdaQueryWrapper<TransactionLog>()
               .eq(TransactionLog::getTransactionId, transactionId);
       return transactionLogMapper.update(transactionLog, wrapper) > 0;
   }

   /**
    * 根据事务ID查询事务状态
    * @param transactionId 事务ID
    * @return 事务状态
    */

   public Integer getTransactionStatus(String transactionId) {
       LambdaQueryWrapper<TransactionLog> wrapper = new LambdaQueryWrapper<TransactionLog>()
               .eq(TransactionLog::getTransactionId, transactionId);
       TransactionLog transactionLog = transactionLogMapper.selectOne(wrapper);
       if (ObjectUtils.isEmpty(transactionLog)) {
           return null;
       }
       return transactionLog.getTransactionStatus();
   }
}

1.4.6 事务消息监听器与生产者配置

package com.jam.demo.config;

import com.jam.demo.listener.OrderTransactionListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* RocketMQ配置类
* @author ken
*/

@Slf4j
@Configuration
public class RocketMQConfig {

   @Value("${rocketmq.name-server}")
   private String nameServer;

   @Value("${rocketmq.producer.group}")
   private String producerGroup;

   private final OrderTransactionListener orderTransactionListener;

   public RocketMQConfig(OrderTransactionListener orderTransactionListener) {
       this.orderTransactionListener = orderTransactionListener;
   }

   /**
    * 事务消息生产者
    * @return TransactionMQProducer实例
    */

   @Bean
   public TransactionMQProducer transactionMQProducer() {
       ThreadPoolExecutor executor = new ThreadPoolExecutor(
               2,
               5,
               100,
               TimeUnit.SECONDS,
               new ArrayBlockingQueue<>(2000),
               r -> {
                   Thread thread = new Thread(r);
                   thread.setName("transaction-check-thread-%d");
                   return thread;
               }
       );
       TransactionMQProducer producer = new TransactionMQProducer(producerGroup);
       producer.setNamesrvAddr(nameServer);
       producer.setExecutorService(executor);
       producer.setTransactionListener(orderTransactionListener);
       try {
           producer.start();
           log.info("TransactionMQProducer start success");
       } catch (Exception e) {
           log.error("TransactionMQProducer start failed", e);
           throw new RuntimeException(e);
       }
       return producer;
   }

   /**
    * RocketMQTemplate
    * @param transactionMQProducer 事务消息生产者
    * @return RocketMQTemplate实例
    */

   @Bean
   public RocketMQTemplate rocketMQTemplate(TransactionMQProducer transactionMQProducer) {
       RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
       rocketMQTemplate.setProducer(transactionMQProducer);
       return rocketMQTemplate;
   }
}

package com.jam.demo.listener;

import com.alibaba.fastjson2.JSON;
import com.jam.demo.entity.Order;
import com.jam.demo.entity.TransactionLog;
import com.jam.demo.service.OrderService;
import com.jam.demo.service.StockService;
import com.jam.demo.service.TransactionLogService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

import java.util.UUID;

/**
* 订单事务消息监听器
* @author ken
*/

@Slf4j
@RequiredArgsConstructor
public class OrderTransactionListener implements TransactionListener {

   private final OrderService orderService;
   private final StockService stockService;
   private final TransactionLogService transactionLogService;
   private final TransactionTemplate transactionTemplate;

   /**
    * 执行本地事务
    * @param msg 半消息
    * @param arg 业务参数
    * @return 本地事务状态
    */

   @Override
   public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
       String transactionId = msg.getTransactionId();
       if (!StringUtils.hasText(transactionId)) {
           transactionId = UUID.randomUUID().toString().replace("-", "");
       }
       log.info("开始执行本地事务,transactionId:{}", transactionId);
       try {
           Order order = JSON.parseObject(msg.getBody(), Order.class);
           String finalTransactionId = transactionId;
           Boolean result = transactionTemplate.execute(new TransactionCallback<Boolean>() {
               @Override
               public Boolean doInTransaction(TransactionStatus status) {
                   // 1. 保存事务日志,状态为执行中
                   TransactionLog transactionLog = new TransactionLog();
                   transactionLog.setTransactionId(finalTransactionId);
                   transactionLog.setBusinessType("ORDER_CREATE");
                   transactionLog.setTransactionStatus(0);
                   boolean saveLog = transactionLogService.saveTransactionLog(transactionLog);
                   if (!saveLog) {
                       status.setRollbackOnly();
                       return false;
                   }
                   // 2. 保存订单
                   boolean saveOrder = orderService.saveOrder(order);
                   if (!saveOrder) {
                       status.setRollbackOnly();
                       return false;
                   }
                   // 3. 扣减库存
                   boolean deductStock = stockService.deductStock(order.getProductId(), order.getQuantity());
                   if (!deductStock) {
                       status.setRollbackOnly();
                       return false;
                   }
                   // 4. 更新事务状态为已提交
                   boolean updateStatus = transactionLogService.updateTransactionStatus(finalTransactionId, 1);
                   if (!updateStatus) {
                       status.setRollbackOnly();
                       return false;
                   }
                   return true;
               }
           });
           if (ObjectUtils.isEmpty(result) || !result) {
               log.error("本地事务执行失败,transactionId:{}", transactionId);
               return LocalTransactionState.ROLLBACK_MESSAGE;
           }
           log.info("本地事务执行成功,transactionId:{}", transactionId);
           return LocalTransactionState.COMMIT_MESSAGE;
       } catch (Exception e) {
           log.error("本地事务执行异常,transactionId:{}", transactionId, e);
           return LocalTransactionState.ROLLBACK_MESSAGE;
       }
   }

   /**
    * 事务回查
    * @param msg 消息
    * @return 本地事务状态
    */

   @Override
   public LocalTransactionState checkLocalTransaction(MessageExt msg) {
       String transactionId = msg.getTransactionId();
       log.info("开始事务回查,transactionId:{}", transactionId);
       if (!StringUtils.hasText(transactionId)) {
           return LocalTransactionState.UNKNOW;
       }
       try {
           Integer status = transactionLogService.getTransactionStatus(transactionId);
           if (ObjectUtils.isEmpty(status)) {
               log.warn("事务状态不存在,transactionId:{}", transactionId);
               return LocalTransactionState.UNKNOW;
           }
           return switch (status) {
               case 1 -> LocalTransactionState.COMMIT_MESSAGE;
               case 2 -> LocalTransactionState.ROLLBACK_MESSAGE;
               default -> LocalTransactionState.UNKNOW;
           };
       } catch (Exception e) {
           log.error("事务回查异常,transactionId:{}", transactionId, e);
           return LocalTransactionState.UNKNOW;
       }
   }
}

1.4.7 消息消费者

package com.jam.demo.consumer;

import com.alibaba.fastjson2.JSON;
import com.jam.demo.entity.Order;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
* 订单物流消息消费者
* @author ken
*/

@Slf4j
@Component
@RocketMQMessageListener(
       topic = "order_create_topic",
       consumerGroup = "order-logistics-consumer-group"
)
public class OrderLogisticsConsumer implements RocketMQListener<String> {

   @Override
   public void onMessage(String message) {
       log.info("收到订单创建消息,message:{}", message);
       Order order = JSON.parseObject(message, Order.class);
       // 初始化物流发货流程
       log.info("初始化订单物流流程,orderNo:{}", order.getOrderNo());
   }
}

1.4.8 接口层定义

package com.jam.demo.controller;

import com.alibaba.fastjson2.JSON;
import com.jam.demo.entity.Order;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

/**
* 订单事务消息接口
* @author ken
*/

@Slf4j
@RestController
@RequestMapping("/order")
@RequiredArgsConstructor
@Tag(name = "订单管理", description = "订单事务消息与定时消息相关接口")
public class OrderController {

   private final TransactionMQProducer transactionMQProducer;
   private final RocketMQTemplate rocketMQTemplate;

   private static final String ORDER_TOPIC = "order_create_topic";
   private static final String ORDER_CANCEL_TOPIC = "order_cancel_topic";
   // 30分钟延时,单位毫秒
   private static final long DELAY_TIME = 30 * 60 * 1000L;

   /**
    * 创建订单(事务消息)
    * @param order 订单实体
    * @return 订单创建结果
    */

   @PostMapping("/create")
   @Operation(summary = "创建订单", description = "基于RocketMQ事务消息创建订单,保证本地事务与消息发送的原子性")
   public String createOrder(@RequestBody Order order) {
       String transactionId = UUID.randomUUID().toString().replace("-", "");
       order.setOrderNo(transactionId);
       order.setOrderStatus(0);
       try {
           Message message = new Message(ORDER_TOPIC, JSON.toJSONBytes(order));
           message.setTransactionId(transactionId);
           SendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, null);
           log.info("事务消息发送结果,transactionId:{}, sendResult:{}", transactionId, sendResult);
           // 发送订单取消定时消息
           Message cancelMessage = new Message(ORDER_CANCEL_TOPIC, JSON.toJSONBytes(order));
           // 设置消息投递时间戳
           cancelMessage.setDeliverTimeMs(System.currentTimeMillis() + DELAY_TIME);
           SendResult cancelSendResult = rocketMQTemplate.getProducer().send(cancelMessage);
           log.info("定时消息发送结果,orderNo:{}, sendResult:{}", transactionId, cancelSendResult);
           return "订单创建成功,订单号:" + transactionId;
       } catch (Exception e) {
           log.error("订单创建失败,transactionId:{}", transactionId, e);
           return "订单创建失败:" + e.getMessage();
       }
   }
}

1.4.9 项目启动类

package com.jam.demo;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
* 项目启动类
* @author ken
*/

@SpringBootApplication
@MapperScan("com.jam.demo.mapper")
public class RocketmqDemoApplication {

   public static void main(String[] args) {
       SpringApplication.run(RocketmqDemoApplication.class, args);
   }

}

1.5 事务消息的最佳实践与避坑指南

1.5.1 最佳实践

  1. 必须设计本地事务状态表:事务回查的核心依据是本地事务状态记录,避免回查时无法判断事务结果,同时可实现回查逻辑的幂等性。
  2. 本地事务执行逻辑必须幂等:事务回查可能会多次触发,本地事务的执行与回查逻辑必须保证幂等,避免重复处理导致数据错误。
  3. 合理设置事务回查参数:根据业务场景调整Broker的事务回查间隔(transactionCheckInterval)与最大回查次数(transactionCheckMax),避免频繁回查影响性能,或回查次数不足导致数据不一致。
  4. 半消息体携带完整业务数据:半消息必须包含执行本地事务所需的完整业务数据,避免本地事务执行时缺少参数导致执行失败。
  5. 事务ID全局唯一:使用UUID、雪花算法等生成全局唯一的事务ID,作为事务的唯一标识,避免事务冲突。

1.5.2 常见避坑点

  1. 本地事务执行成功,但二次确认发送失败:网络波动或服务宕机可能导致二次确认丢失,必须依赖事务回查机制兜底,不可省略回查逻辑。
  2. 回查逻辑返回UNKNOWN次数过多:若回查逻辑持续返回UNKNOWN,超过最大回查次数后,Broker会默认回滚消息,导致本地事务执行成功但消息未投递,数据不一致。
  3. 半消息主题权限不足:生产者必须拥有半消息系统主题RMQ_SYS_TRANS_HALF_TOPIC的写入权限,否则半消息发送会直接失败。
  4. 本地事务未使用事务模板:本地事务的多个操作(创建订单、扣减库存、记录事务日志)必须在同一个数据库事务中,避免部分操作成功部分失败。
  5. 消费者未做幂等处理:RocketMQ可能会重复投递消息,消费者必须基于订单号等唯一标识做幂等处理,避免重复初始化物流流程。

二、RocketMQ延时/定时消息:定时调度的高性能实现

2.1 延时消息解决的核心痛点

在业务开发中,我们经常会遇到需要延迟处理的业务场景,典型场景如:

  • 电商订单超时未支付,自动取消订单并释放库存
  • 用户注册后,延迟24小时发送欢迎邮件
  • 业务操作失败后,延迟一定时间进行重试
  • 定时触发的业务统计任务

传统的实现方式是基于数据库轮询,定时扫描符合条件的记录进行处理。这种方式存在严重的性能问题:高频轮询会给数据库带来巨大的IO压力,同时轮询间隔会导致业务处理延迟,无法做到精准触发。而RocketMQ的延时/定时消息,基于Broker端的定时调度机制,完美解决了这一痛点,实现了高性能、高精度的延迟任务处理。

2.2 4.x固定级别延时消息的实现原理

RocketMQ 4.x版本仅支持固定级别的延时消息,预设了18个延时级别,每个级别对应固定的延时时间,用户无法自定义任意延时时间。

2.2.1 固定延时级别对应关系

延时级别 延时时间 延时级别 延时时间
1 1秒 10 6分钟
2 5秒 11 7分钟
3 10秒 12 8分钟
4 30秒 13 9分钟
5 1分钟 14 10分钟
6 2分钟 15 20分钟
7 3分钟 16 30分钟
8 4分钟 17 1小时
9 5分钟 18 2小时

2.2.2 底层实现原理

4.x延时消息的核心实现逻辑如下:

  1. 生产者发送延时消息时,通过setDelayTimeLevel方法设置延时级别,消息发送到Broker。
  2. Broker收到延时消息后,不会将其写入真实的业务主题,而是写入系统级延时主题SCHEDULE_TOPIC_XXXX,每个延时级别对应该主题下的一个独立队列。
  3. Broker为每个延时级别队列启动一个独立的调度线程,持续轮询队列中的消息,判断消息是否到达投递时间。
  4. 当消息到达投递时间后,调度线程会将消息从延时主题转移到用户指定的真实业务主题,此时消息对消费者可见,会被正常投递。
  5. 消费者监听业务主题,收到消息后执行对应的延迟业务逻辑。

该实现方案的优点是简单稳定,缺点是灵活性极差,仅支持固定的18个延时级别,无法满足自定义延时时间的业务需求。

2.3 5.x任意时间定时消息的底层实现

RocketMQ 5.0版本正式推出了定时消息(Timed Message)能力,支持毫秒级精度的任意时间延时,最大支持40天的延时时长,彻底解决了4.x版本的灵活性问题。

2.3.1 底层实现原理

5.x定时消息的核心实现基于TimerStore定时索引存储多级时间轮调度机制,核心逻辑如下:

  1. 生产者发送定时消息时,通过setDeliverTimeMs方法设置消息的投递时间戳(当前时间+延时时长),消息发送到Broker。
  2. Broker收到定时消息后,首先将消息完整写入CommitLog进行持久化,保证消息的可靠性。
  3. Broker为定时消息构建定时索引,索引包含消息的投递时间、物理偏移量、主题等信息,将索引持久化到基于RocksDB实现的TimerStore中,保证索引的可靠性。
  4. 索引持久化成功后,Broker向生产者返回发送成功的ACK确认。
  5. Broker内部启动基于多级时间轮的调度线程,持续扫描TimerStore中的定时索引,判断是否有消息到达投递时间。
  6. 当消息到达投递时间后,调度线程会根据索引中的物理偏移量,从CommitLog中读取完整的消息内容,将其写入用户指定的真实业务主题的CommitLog中。
  7. 消息写入业务主题后,会被正常投递给监听该主题的消费者,完成定时消息的全流程处理。

完整的定时消息执行流程如下:

2.3.2 核心优势

  1. 超高灵活性:支持毫秒级精度的任意时间延时,最大支持40天的延时时长,完全满足各类业务场景的需求。
  2. 高性能:基于RocksDB的定时索引存储与多级时间轮调度机制,支持百万级的定时消息并发调度,性能远高于数据库轮询方案。
  3. 高可靠:消息与定时索引均进行持久化存储,Broker主从高可用架构保证消息不会丢失,即使Broker宕机重启,也能恢复定时索引继续调度。
  4. 低侵入性:业务代码仅需设置投递时间戳,无需关注底层调度逻辑,侵入性极低。

2.4 两种延时方案的核心差异对比

为了清晰区分4.x延时消息与5.x定时消息的差异,避免使用时混淆,核心对比如下:

特性维度 4.x固定级别延时消息 5.x任意时间定时消息
延时时间 仅支持18个固定级别,不可自定义 支持毫秒级任意时间,最大40天
存储实现 基于固定主题SCHEDULE_TOPIC_XXXX的队列存储 基于CommitLog+TimerStore(RocksDB)的索引存储
调度机制 单级别单线程轮询调度 多级时间轮高效调度
时间精度 秒级 毫秒级
扩展性 差,新增级别需修改Broker配置并重启 好,无需修改配置,直接使用
版本支持 所有4.x版本 5.0及以上版本

2.5 定时消息的生产级实战

本次实战基于经典的电商订单超时取消场景,用户下单后发送30分钟的定时消息,30分钟后检查订单支付状态,若仍为待支付,则自动取消订单并释放库存。

2.5.1 Broker配置

使用5.x定时消息前,需要在Broker的配置文件broker.conf中开启定时消息功能,核心配置如下:

# 开启定时消息功能

enableTimer=true

# 定时消息最大延时时长,单位毫秒,默认40天,此处设置为30天

timerMaxDelay=2592000000

# 定时消息调度线程数,默认4

timerWheelNum=4

配置完成后重启Broker,即可使用定时消息能力。

2.5.2 定时消息消费者

package com.jam.demo.consumer;

import com.alibaba.fastjson2.JSON;
import com.jam.demo.entity.Order;
import com.jam.demo.service.OrderService;
import com.jam.demo.service.StockService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.ObjectUtils;

/**
* 订单取消定时消息消费者
* @author ken
*/

@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
       topic = "order_cancel_topic",
       consumerGroup = "order-cancel-consumer-group"
)
public class OrderCancelConsumer implements RocketMQListener<String> {

   private final OrderService orderService;
   private final StockService stockService;
   private final TransactionTemplate transactionTemplate;

   @Override
   public void onMessage(String message) {
       log.info("收到订单取消定时消息,message:{}", message);
       Order order = JSON.parseObject(message, Order.class);
       String orderNo = order.getOrderNo();
       try {
           // 幂等校验:查询订单状态
           Order existOrder = orderService.getOrderByNo(orderNo);
           if (ObjectUtils.isEmpty(existOrder)) {
               log.warn("订单不存在,orderNo:{}", orderNo);
               return;
           }
           if (existOrder.getOrderStatus() != 0) {
               log.info("订单已支付或已取消,无需处理,orderNo:{}", orderNo);
               return;
           }
           // 执行订单取消与库存恢复,使用编程式事务
           Boolean result = transactionTemplate.execute(new TransactionCallback<Boolean>() {
               @Override
               public Boolean doInTransaction(TransactionStatus status) {
                   // 1. 取消订单
                   boolean cancelOrder = orderService.cancelOrder(orderNo);
                   if (!cancelOrder) {
                       status.setRollbackOnly();
                       return false;
                   }
                   // 2. 恢复库存
                   boolean restoreStock = stockService.restoreStock(order.getProductId(), order.getQuantity());
                   if (!restoreStock) {
                       status.setRollbackOnly();
                       return false;
                   }
                   return true;
               }
           });
           if (ObjectUtils.isEmpty(result) || !result) {
               log.error("订单取消失败,orderNo:{}", orderNo);
               throw new RuntimeException("订单取消失败,触发重试");
           }
           log.info("订单超时取消成功,orderNo:{}", orderNo);
       } catch (Exception e) {
           log.error("订单取消处理异常,orderNo:{}", orderNo, e);
           throw e;
       }
   }
}

2.6 定时消息的最佳实践与避坑指南

2.6.1 最佳实践

  1. 消费者必须实现幂等处理:RocketMQ的定时消息可能会重复投递,消费者必须基于订单号等唯一标识做幂等校验,避免重复取消订单、重复恢复库存等问题。
  2. 合理设置延时时长:定时消息的最大延时时长不能超过Broker配置的timerMaxDelay,否则消息会被Broker拒绝。
  3. 投递时间戳必须是未来时间setDeliverTimeMs方法设置的是消息的投递时间戳,必须是大于当前时间的未来时间,否则消息会被立即投递。
  4. 区分使用4.x与5.x的API:4.x延时消息使用setDelayTimeLevel设置级别,5.x定时消息使用setDeliverTimeMs设置时间戳,不可混淆使用。
  5. 消费失败主动抛出异常触发重试:若消费过程中出现异常,需主动抛出异常,RocketMQ会根据重试策略重新投递消息,避免业务处理失败。

2.6.2 常见避坑点

  1. Broker未开启定时消息功能:5.x定时消息需要在Broker配置中设置enableTimer=true,否则定时消息会被当成普通消息立即投递。
  2. 延时时间超过最大限制:若设置的延时时间超过timerMaxDelay配置的最大值,消息会被Broker直接拒绝,发送失败。
  3. 混淆延时级别与时间戳API:4.x的setDelayTimeLevel与5.x的setDeliverTimeMs不可同时使用,同时设置时只有setDeliverTimeMs生效。
  4. 未处理消息重复投递:未做幂等处理会导致重复消费,引发数据错误,比如重复恢复库存导致库存超卖。
  5. 定时消息调度延迟:Broker节点宕机、时间轮调度线程阻塞等情况会导致定时消息调度延迟,需配置Broker主从高可用,避免单点故障。

三、总结

RocketMQ的事务消息与延时消息,是其区别于其他消息中间件的核心差异化能力,分别解决了分布式场景下的两大核心痛点:

  • 事务消息基于两阶段提交+事务回查机制,以极低的侵入性解决了「本地事务与消息发送的原子性」问题,实现了分布式场景下的最终一致性。
  • 5.x版本的定时消息基于TimerStore与时间轮机制,实现了毫秒级精度的任意时间延时,完美替代了传统的数据库轮询方案,大幅提升了延迟任务的处理性能。
目录
相关文章
|
21天前
|
消息中间件 存储 Java
击穿 Kafka 高可用核心:分区副本、ISR 机制与底层原理全链路拆解
本文深度解析Kafka高可用核心机制:从分区存储、副本分配、ISR同步模型,到HW/LEO语义、Leader选举与故障转移,结合代码实战与避坑指南,助你彻底掌握数据不丢失、低延迟、强一致的生产级实践。
161 3
|
21天前
|
存储 自然语言处理 算法
Elasticsearch 核心命脉:倒排索引、分片机制与全链路高性能调优实战
本文深度解析Elasticsearch三大核心:倒排索引(Term Dict/Posting List/FST压缩)、分片机制(主/副本协同、路由算法)及全链路调优(写入/查询/分片/JVM),辅以ES 8.x实战代码,助开发者突破性能瓶颈,构建高可用、高性能搜索系统。
317 1
|
24天前
|
关系型数据库 MySQL Java
分布式事务终极指南:2PC/XA/TCC/SAGA 从底层原理到生产选型全拆解
本文系统解析分布式事务四大主流方案:XA/2PC(强一致但性能差)、TCC(高并发柔性事务)、SAGA(长事务最终一致)及理论基石(ACID/CAP/BASE),涵盖原理、流程、实战代码、优劣对比与生产选型标准,助你深入掌握核心逻辑。
478 3
|
23天前
|
SQL 关系型数据库 Java
吃透 Seata 分布式事务:原理拆解 + 生产级落地 + 全场景避坑实战
本文深度解析阿里开源分布式事务框架Seata:剖析TC/TM/RM三大角色与全局事务流程,详解AT(零侵入)、TCC(强控制)、SAGA(长事务)、XA(强一致)四大模式原理、适用场景及核心对比,并通过电商下单实战演示AT模式落地,最后系统梳理生产环境高可用、SQL限制、幂等处理、XID传播等全链路避坑指南。
353 4
|
存储 算法 Nacos
Nacos支持哪些协议
Nacos支持哪些协议
|
1月前
|
缓存 Java 开发者
吃透 Spring Bean 生命周期:从源码底层到实战落地
本文深度解析Spring 6.2.3 Bean生命周期,涵盖BeanDefinition注册、实例化、属性填充、Aware回调、BeanPostProcessor前后置处理、初始化(@PostConstruct/InitializingBean/init-method)、AOP代理、单例缓存及销毁全流程,结合源码、实战示例与生产问题排查,助你彻底掌握IoC核心机制。
496 3
|
26天前
|
算法 Java 关系型数据库
JVM GC 深度破局:G1 与 ZGC 底层原理、生产调优全链路实战
本文深度解析JDK17主流GC:G1(默认,兼顾吞吐与延迟)与ZGC(革命性低延迟,STW&lt;1ms)。涵盖核心理论(可达性分析、三色标记)、内存布局、全流程机制(SATB写屏障 vs 染色指针+读屏障)、关键参数调优及生产选型指南,助你精准定位性能瓶颈,高效优化JVM。
472 4
|
1月前
|
SQL Java 关系型数据库
Spring 声明式事务 万字详解(通俗易懂)
Spring 第七节 声明式事务,万字详解!
211 4
|
22天前
|
消息中间件 存储 Java
消息队列选型终极指南:Kafka、RocketMQ、RabbitMQ 底层原理与场景化选型全解
本文深度解析消息队列核心原理与三大主流MQ(RabbitMQ、RocketMQ、Kafka)的架构、特性、代码实现及选型策略。涵盖异步解耦、流量削峰、数据分发三大价值,At-most/least/exactly-once投递语义,推拉模式差异,事务消息实现对比,并提供场景化选型指南与生产避坑实践。
252 1
|
19天前
|
存储 NoSQL Java
扛住百万级 QPS:高并发架构核心三板斧全解
本文系统阐述高并发架构三大核心支柱:流量削峰(前端拦截、网关限流、应用缓冲、分布式限流)、异步化(本地CompletableFuture与RocketMQ分布式解耦)及水平扩展(无状态化、服务注册发现、读写分离与分库分表),并以秒杀系统为例实战整合,兼顾避坑指南与概念辨析。
207 3

热门文章

最新文章