消息队列四大核心消息类型深度解析:普通、顺序、事务、定时消息原理与实战

简介: 本文深入剖析了分布式系统中消息队列的四大核心消息类型。普通消息作为基础模型实现异步通信;顺序消息通过分区有序机制保证关键业务流程的顺序性;事务消息基于两阶段提交解决分布式事务问题;定时消息则支持延迟任务执行。文章从原理、实现到应用场景,结合RocketMQ实例代码(包括事务消息与MySQL的整合)进行了全面讲解,并提供了选型对比建议。这四种消息类型各具特点,开发者应根据业务需求在解耦、顺序保证、事务一致性和延迟执行等维度进行合理选择,以构建高性能、高可用的分布式系统。

在分布式系统架构中,消息队列作为异步通信的核心组件,承担着解耦、削峰、异步化的关键作用。而消息队列的核心价值,往往通过不同类型的消息模式来体现。普通消息、顺序消息、事务消息、定时消息作为主流消息队列(如RocketMQ、Kafka)的四大核心消息类型,各自解决着不同场景下的业务痛点。本文将从底层原理、应用场景、代码实现三个维度,全方位剖析这四种消息类型,让你既能理解其设计思想,又能落地到实际开发中。

一、普通消息:消息队列的基石

1.1 定义与核心特性

普通消息(Normal Message)是消息队列中最基础、最常用的消息类型。它遵循“生产者发送-队列存储-消费者接收”的基本模型,不保证消息的顺序性,也不提供额外的事务或定时能力,仅保证消息的可靠传递(取决于队列的配置)。

1.2 底层原理

普通消息的处理流程遵循典型的“生产-存储-消费”模型:

image.png

  • 生产阶段:生产者将消息序列化后,通过TCP连接发送到消息队列服务器,服务器接收到消息后,将其存储到磁盘或内存中,并返回确认响应。
  • 存储阶段:消息队列服务器将消息按照Topic进行分类存储,支持持久化(如RocketMQ的CommitLog)或非持久化存储。
  • 消费阶段:消费者通过长轮询或推送模式获取消息,处理完成后发送ACK确认,服务器收到ACK后删除消息;若消费者处理失败,消息会被重新投递(取决于重试策略)。

1.3 实战案例(基于RocketMQ 5.1.4)

1.3.1 环境准备

pom.xml依赖配置

<dependencies>
   <!-- RocketMQ客户端 -->
   <dependency>
       <groupId>org.apache.rocketmq</groupId>
       <artifactId>rocketmq-client</artifactId>
       <version>5.1.4</version>
   </dependency>
   <!-- Lombok -->
   <dependency>
       <groupId>org.projectlombok</groupId>
       <artifactId>lombok</artifactId>
       <version>1.18.30</version>
       <scope>provided</scope>
   </dependency>
   <!-- Spring Context -->
   <dependency>
       <groupId>org.springframework</groupId>
       <artifactId>spring-context</artifactId>
       <version>6.1.2</version>
   </dependency>
   <!-- Fastjson2 -->
   <dependency>
       <groupId>com.alibaba.fastjson2</groupId>
       <artifactId>fastjson2</artifactId>
       <version>2.0.48</version>
   </dependency>
</dependencies>

1.3.2 生产者实现

package com.jam.demo.mq.normal;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.util.ObjectUtils;

/**
* 普通消息生产者
* @author ken
*/

@Slf4j
public class NormalMessageProducer {
   /**
    * 生产者组名
    */

   private static final String PRODUCER_GROUP = "normal_producer_group";
   /**
    * NameServer地址
    */

   private static final String NAMESRV_ADDR = "localhost:9876";
   /**
    * Topic名称
    */

   private static final String TOPIC = "normal_topic";

   public static void main(String[] args) throws Exception {
       // 创建生产者实例
       DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
       // 设置NameServer地址
       producer.setNamesrvAddr(NAMESRV_ADDR);
       // 启动生产者
       producer.start();

       try {
           for (int i = 0; i < 5; i++) {
               // 构建消息内容
               String messageContent = "普通消息示例-" + i;
               Message message = new Message(
                       TOPIC,
                       "tagA",
                       messageContent.getBytes("UTF-8")
               );
               // 发送消息
               var sendResult = producer.send(message);
               if (ObjectUtils.isEmpty(sendResult)) {
                   log.error("发送消息失败,结果为空");
                   continue;
               }
               log.info("发送消息成功,消息ID:{},发送状态:{}",
                       sendResult.getMsgId(), sendResult.getSendStatus());
           }
       } catch (Exception e) {
           log.error("发送消息异常", e);
       } finally {
           // 关闭生产者
           producer.shutdown();
       }
   }
}

1.3.3 消费者实现

package com.jam.demo.mq.normal;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.util.CollectionUtils;

import java.util.List;

/**
* 普通消息消费者
* @author ken
*/

@Slf4j
public class NormalMessageConsumer {
   /**
    * 消费者组名
    */

   private static final String CONSUMER_GROUP = "normal_consumer_group";
   /**
    * NameServer地址
    */

   private static final String NAMESRV_ADDR = "localhost:9876";
   /**
    * Topic名称
    */

   private static final String TOPIC = "normal_topic";

   public static void main(String[] args) throws Exception {
       // 创建消费者实例
       DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
       // 设置NameServer地址
       consumer.setNamesrvAddr(NAMESRV_ADDR);
       // 订阅Topic
       consumer.subscribe(TOPIC, "tagA");
       // 注册消息监听器
       consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
           if (CollectionUtils.isEmpty(msgs)) {
               log.warn("接收到的消息列表为空");
               return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
           }
           for (MessageExt msg : msgs) {
               try {
                   String content = new String(msg.getBody(), "UTF-8");
                   log.info("消费消息:{},消息ID:{}", content, msg.getMsgId());
               } catch (Exception e) {
                   log.error("消费消息异常", e);
                   // 消费失败,重试
                   return ConsumeConcurrentlyStatus.RECONSUME_LATER;
               }
           }
           // 消费成功
           return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
       });
       // 启动消费者
       consumer.start();
       log.info("消费者启动成功");
   }
}

1.3.4 代码说明

  • 生产者通过DefaultMQProducer发送消息,指定Topic和Tag,消息发送成功后返回SendResult,包含消息ID和发送状态。
  • 消费者通过DefaultMQPushConsumer订阅Topic,使用并发消费模式(MessageListenerConcurrently),消费完成后返回CONSUME_SUCCESS确认消费,失败则返回RECONSUME_LATER触发重试。
  • 使用ObjectUtilsCollectionUtils进行空值判断,符合阿里巴巴开发手册规范。

1.4 应用场景

普通消息适用于大多数不需要严格顺序、不需要事务保证的场景,例如:

  • 用户注册后的短信/邮件通知
  • 日志收集与分析
  • 非核心业务的异步处理(如积分更新)

二、顺序消息:保证消息的有序性

2.1 定义与核心特性

顺序消息(Ordered Message)是指消息的消费顺序与生产顺序完全一致的消息类型。它解决了普通消息在并发消费时可能出现的顺序错乱问题,适用于对业务流程顺序有严格要求的场景(如订单创建、支付、发货的流程)。

2.2 底层原理

顺序消息的实现依赖于“分区有序”的设计思想:

image.png

  • 生产阶段:生产者发送消息时,指定一个ShardingKey(如订单ID),消息队列服务器根据ShardingKey的哈希值,将同一ShardingKey的消息分配到同一个消息队列中。
  • 存储阶段:消息在队列中按照生产顺序存储,保证队列内的消息有序。
  • 消费阶段:每个消息队列只分配给一个消费者线程处理,确保队列内的消息被顺序消费。

RocketMQ中,顺序消息分为全局顺序消息分区顺序消息

  • 全局顺序消息:一个Topic只有一个队列,所有消息都在同一个队列中顺序消费(性能较低)。
  • 分区顺序消息:一个Topic有多个队列,同一ShardingKey的消息进入同一个队列,保证分区内顺序(性能较高)。

2.3 实战案例(基于RocketMQ 5.1.4)

2.3.1 生产者实现

package com.jam.demo.mq.order;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.springframework.util.ObjectUtils;

import java.util.List;

/**
* 顺序消息生产者
* @author ken
*/

@Slf4j
public class OrderMessageProducer {
   /**
    * 生产者组名
    */

   private static final String PRODUCER_GROUP = "order_producer_group";
   /**
    * NameServer地址
    */

   private static final String NAMESRV_ADDR = "localhost:9876";
   /**
    * Topic名称
    */

   private static final String TOPIC = "order_topic";

   public static void main(String[] args) throws Exception {
       DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
       producer.setNamesrvAddr(NAMESRV_ADDR);
       producer.start();

       // 模拟订单流程:创建->支付->发货
       String[] orderIds = {"order_001", "order_002", "order_003"};
       String[] actions = {"创建订单", "支付订单", "发货"};

       try {
           for (String orderId : orderIds) {
               for (String action : actions) {
                   String content = String.format("订单[%s]:%s", orderId, action);
                   Message message = new Message(
                           TOPIC,
                           "tag_order",
                           content.getBytes("UTF-8")
                   );
                   // 发送顺序消息,指定ShardingKey为orderId
                   SendResult sendResult = producer.send(
                           message,
                           (MessageQueueSelector) (mqs, msg, arg) -> {
                               String shardingKey = (String) arg;
                               int hashCode = Math.abs(shardingKey.hashCode());
                               return mqs.get(hashCode % mqs.size());
                           },
                           orderId
                   );
                   if (ObjectUtils.isEmpty(sendResult)) {
                       log.error("发送顺序消息失败,订单ID:{}", orderId);
                       continue;
                   }
                   log.info("发送顺序消息成功,订单ID:{},队列ID:{},内容:{}",
                           orderId, sendResult.getMessageQueue().getQueueId(), content);
               }
           }
       } catch (Exception e) {
           log.error("发送顺序消息异常", e);
       } finally {
           producer.shutdown();
       }
   }
}

2.3.2 消费者实现

package com.jam.demo.mq.order;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.util.CollectionUtils;

import java.util.List;

/**
* 顺序消息消费者
* @author ken
*/

@Slf4j
public class OrderMessageConsumer {
   /**
    * 消费者组名
    */

   private static final String CONSUMER_GROUP = "order_consumer_group";
   /**
    * NameServer地址
    */

   private static final String NAMESRV_ADDR = "localhost:9876";
   /**
    * Topic名称
    */

   private static final String TOPIC = "order_topic";

   public static void main(String[] args) throws Exception {
       DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
       consumer.setNamesrvAddr(NAMESRV_ADDR);
       consumer.subscribe(TOPIC, "tag_order");

       // 注册顺序消息监听器
       consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
           if (CollectionUtils.isEmpty(msgs)) {
               log.warn("接收到的顺序消息列表为空");
               return ConsumeOrderlyStatus.SUCCESS;
           }
           // 锁定当前队列,保证顺序消费
           context.setAutoCommit(true);
           for (MessageExt msg : msgs) {
               try {
                   String content = new String(msg.getBody(), "UTF-8");
                   log.info("消费顺序消息:{},订单ID:{},队列ID:{}",
                           content, msg.getKeys(), msg.getQueueId());
               } catch (Exception e) {
                   log.error("消费顺序消息异常", e);
                   // 消费失败,暂停当前队列的消费
                   context.setSuspendCurrentQueueTimeMillis(1000);
                   return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
               }
           }
           return ConsumeOrderlyStatus.SUCCESS;
       });

       consumer.start();
       log.info("顺序消息消费者启动成功");
   }
}

2.3.3 代码说明

  • 生产者通过MessageQueueSelector根据订单ID(ShardingKey)选择队列,保证同一订单的消息进入同一个队列。
  • 消费者使用MessageListenerOrderly进行顺序消费,每个队列由一个线程处理,确保消费顺序与生产顺序一致。
  • 消费失败时,通过SUSPEND_CURRENT_QUEUE_A_MOMENT暂停当前队列的消费,避免消息堆积。

2.4 应用场景

顺序消息适用于对业务流程顺序有严格要求的场景,例如:

  • 订单处理流程(创建→支付→发货→完成)
  • 物流轨迹更新(下单→接单→揽收→运输→派送→签收)
  • 金融交易流水(充值→扣款→记账)

2.5 注意事项

  • 顺序消息的性能取决于队列数量,队列越多,并发度越高,但需要保证同一ShardingKey的消息进入同一队列。
  • 消费失败会导致整个队列暂停消费,因此需要合理设置重试策略和暂停时间。

三、事务消息:解决分布式事务问题

3.1 定义与核心特性

事务消息(Transactional Message)是一种支持分布式事务的消息类型,它保证“本地事务执行”和“消息发送”的原子性,即两者要么都成功,要么都失败。事务消息解决了分布式系统中跨服务的数据一致性问题。

3.2 底层原理

事务消息的实现基于“两阶段提交”和“消息回查”机制:

image.png

  • 第一阶段(发送半消息):生产者发送一条“半消息”到消息队列服务器,半消息对消费者不可见。
  • 第二阶段(执行本地事务):生产者执行本地事务(如数据库操作),并根据事务结果发送“提交”或“回滚”指令。
  • 消息回查:若消息队列服务器未收到提交/回滚指令,会定时向生产者发起回查请求,生产者根据本地事务状态返回结果,服务器根据结果决定提交或回滚消息。

3.3 实战案例(基于RocketMQ 5.1.4 + MySQL 8.0 + MyBatis-Plus 3.5.5)

3.3.1 环境准备

pom.xml新增依赖

<!-- MySQL驱动 -->
<dependency>
   <groupId>com.mysql</groupId>
   <artifactId>mysql-connector-j</artifactId>
   <version>8.3.0</version>
</dependency>
<!-- MyBatis-Plus -->
<dependency>
   <groupId>com.baomidou</groupId>
   <artifactId>mybatis-plus-boot-starter</artifactId>
   <version>3.5.5</version>
</dependency>
<!-- Spring Boot -->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter</artifactId>
   <version>3.2.0</version>
</dependency>
<!-- Swagger3 -->
<dependency>
   <groupId>io.springfox</groupId>
   <artifactId>springfox-boot-starter</artifactId>
   <version>3.0.0</version>
</dependency>

3.3.2 数据库表设计

CREATE TABLE `t_order` (
 `id` bigint NOT NULL AUTO_INCREMENT COMMENT '订单ID',
 `order_no` varchar(64) NOT NULL COMMENT '订单编号',
 `user_id` bigint NOT NULL COMMENT '用户ID',
 `amount` decimal(10,2) NOT NULL COMMENT '订单金额',
 `status` tinyint NOT NULL DEFAULT '0' COMMENT '订单状态:0-创建中,1-已创建,2-已取消',
 `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
 `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
 PRIMARY KEY (`id`),
 UNIQUE KEY `uk_order_no` (`order_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='订单表';

3.3.3 实体类与Mapper

Order实体类

package com.jam.demo.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

import java.math.BigDecimal;
import java.time.LocalDateTime;

/**
* 订单实体类
* @author ken
*/

@Data
@TableName("t_order")
public class Order {
   /**
    * 订单ID
    */

   @TableId(type = IdType.AUTO)
   private Long id;
   /**
    * 订单编号
    */

   private String orderNo;
   /**
    * 用户ID
    */

   private Long userId;
   /**
    * 订单金额
    */

   private BigDecimal amount;
   /**
    * 订单状态:0-创建中,1-已创建,2-已取消
    */

   private Integer status;
   /**
    * 创建时间
    */

   private LocalDateTime createTime;
   /**
    * 更新时间
    */

   private LocalDateTime updateTime;
}

OrderMapper接口

package com.jam.demo.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.Order;
import org.apache.ibatis.annotations.Mapper;

/**
* 订单Mapper
* @author ken
*/

@Mapper
public interface OrderMapper extends BaseMapper<Order> {
}

3.3.4 事务消息生产者

package com.jam.demo.mq.transaction;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.jam.demo.entity.Order;
import com.jam.demo.mapper.OrderMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.UUID;

/**
* 事务消息生产者
* @author ken
*/

@Slf4j
@Component
public class TransactionMessageProducer {
   /**
    * 生产者组名
    */

   private static final String PRODUCER_GROUP = "transaction_producer_group";
   /**
    * NameServer地址
    */

   private static final String NAMESRV_ADDR = "localhost:9876";
   /**
    * Topic名称
    */

   private static final String TOPIC = "transaction_topic";

   private final TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);

   @Autowired
   private OrderMapper orderMapper;

   /**
    * 初始化生产者
    */

   @PostConstruct
   public void init() throws Exception {
       producer.setNamesrvAddr(NAMESRV_ADDR);
       // 设置事务监听器
       producer.setTransactionListener(new TransactionListener() {
           /**
            * 执行本地事务
            */

           @Override
           public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
               String orderNo = (String) arg;
               if (!StringUtils.hasText(orderNo, "订单编号不能为空")) {
                   log.error("订单编号为空,回滚事务");
                   return LocalTransactionState.ROLLBACK_MESSAGE;
               }
               try {
                   // 创建订单(本地事务)
                   Order order = new Order();
                   order.setOrderNo(orderNo);
                   order.setUserId(1001L);
                   order.setAmount(new BigDecimal("99.00"));
                   order.setStatus(1);
                   order.setCreateTime(LocalDateTime.now());
                   order.setUpdateTime(LocalDateTime.now());
                   orderMapper.insert(order);
                   log.info("本地事务执行成功,订单编号:{}", orderNo);
                   return LocalTransactionState.COMMIT_MESSAGE;
               } catch (Exception e) {
                   log.error("本地事务执行失败,订单编号:{}", orderNo, e);
                   return LocalTransactionState.ROLLBACK_MESSAGE;
               }
           }

           /**
            * 事务回查
            */

           @Override
           public LocalTransactionState checkLocalTransaction(MessageExt msg) {
               String orderNo = msg.getKeys();
               if (!StringUtils.hasText(orderNo)) {
                   log.error("回查事务时订单编号为空");
                   return LocalTransactionState.ROLLBACK_MESSAGE;
               }
               // 查询订单状态
               Order order = orderMapper.selectOne(new LambdaQueryWrapper<Order>()
                       .eq(Order::getOrderNo, orderNo));
               if (order == null) {
                   log.warn("订单不存在,回滚事务,订单编号:{}", orderNo);
                   return LocalTransactionState.ROLLBACK_MESSAGE;
               }
               if (order.getStatus() == 1) {
                   log.info("回查事务:订单已创建,提交消息,订单编号:{}", orderNo);
                   return LocalTransactionState.COMMIT_MESSAGE;
               } else {
                   log.warn("回查事务:订单状态异常,回滚事务,订单编号:{}", orderNo);
                   return LocalTransactionState.ROLLBACK_MESSAGE;
               }
           }
       });
       producer.start();
       log.info("事务消息生产者启动成功");
   }

   /**
    * 发送事务消息
    * @param orderNo 订单编号
    */

   public void sendTransactionMessage(String orderNo) throws Exception {
       String messageContent = "创建订单:" + orderNo;
       Message message = new Message(
               TOPIC,
               "tag_transaction",
               orderNo,
               messageContent.getBytes("UTF-8")
       );
       producer.sendMessageInTransaction(message, orderNo);
       log.info("发送事务消息成功,订单编号:{}", orderNo);
   }

   /**
    * 销毁生产者
    */

   @PreDestroy
   public void destroy() {
       producer.shutdown();
       log.info("事务消息生产者已关闭");
   }
}

3.3.5 事务消息消费者

package com.jam.demo.mq.transaction;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.List;

/**
* 事务消息消费者
* @author ken
*/

@Slf4j
@Component
public class TransactionMessageConsumer {
   /**
    * 消费者组名
    */

   private static final String CONSUMER_GROUP = "transaction_consumer_group";
   /**
    * NameServer地址
    */

   private static final String NAMESRV_ADDR = "localhost:9876";
   /**
    * Topic名称
    */

   private static final String TOPIC = "transaction_topic";

   private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);

   /**
    * 初始化消费者
    */

   @PostConstruct
   public void init() throws Exception {
       consumer.setNamesrvAddr(NAMESRV_ADDR);
       consumer.subscribe(TOPIC, "tag_transaction");
       consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
           if (CollectionUtils.isEmpty(msgs)) {
               log.warn("接收到的事务消息列表为空");
               return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
           }
           for (MessageExt msg : msgs) {
               try {
                   String content = new String(msg.getBody(), "UTF-8");
                   String orderNo = msg.getKeys();
                   log.info("消费事务消息:{},订单编号:{}", content, orderNo);
                   // 执行后续业务逻辑(如扣减库存)
               } catch (Exception e) {
                   log.error("消费事务消息异常", e);
                   return ConsumeConcurrentlyStatus.RECONSUME_LATER;
               }
           }
           return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
       });
       consumer.start();
       log.info("事务消息消费者启动成功");
   }

   /**
    * 销毁消费者
    */

   @PreDestroy
   public void destroy() {
       consumer.shutdown();
       log.info("事务消息消费者已关闭");
   }
}

3.3.6 测试接口

package com.jam.demo.controller;

import com.jam.demo.mq.transaction.TransactionMessageProducer;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

/**
* 订单控制器
* @author ken
*/

@Slf4j
@RestController
@RequestMapping("/order")
@Api(tags = "订单管理")
public class OrderController {

   @Autowired
   private TransactionMessageProducer transactionMessageProducer;

   /**
    * 创建订单
    */

   @PostMapping("/create")
   @ApiOperation("创建订单")
   public String createOrder() {
       String orderNo = "ORDER_" + UUID.randomUUID().toString().replace("-", "");
       try {
           transactionMessageProducer.sendTransactionMessage(orderNo);
           return "创建订单成功,订单编号:" + orderNo;
       } catch (Exception e) {
           log.error("创建订单失败", e);
           return "创建订单失败:" + e.getMessage();
       }
   }
}

3.3.7 代码说明

  • 生产者通过TransactionMQProducer发送事务消息,实现TransactionListener接口处理本地事务和事务回查。
  • 本地事务执行订单创建操作,成功则提交消息,失败则回滚消息。
  • 事务回查机制确保在网络异常时,消息队列服务器能主动查询本地事务状态,保证数据一致性。
  • 消费者消费提交后的消息,执行后续业务逻辑(如扣减库存)。

3.4 应用场景

事务消息适用于需要保证分布式事务一致性的场景,例如:

  • 电商下单(创建订单+扣减库存)
  • 金融转账(扣款+收款)
  • 支付回调(更新订单状态+发送通知)

3.5 注意事项

  • 事务消息的回查机制需要生产者提供幂等性实现,避免重复执行本地事务。
  • 本地事务的执行时间不宜过长,否则会导致消息回查频繁,影响性能。

四、定时消息:延迟执行的消息

4.1 定义与核心特性

定时消息(Delayed Message)又称延迟消息,是指消息发送后,消费者不会立即收到,而是在指定的延迟时间后才被消费的消息类型。定时消息支持在未来的某个时间点触发业务逻辑,适用于延时任务场景。

4.2 底层原理

定时消息的实现依赖于“延迟队列”和“时间轮”机制:

image.png

  • 生产阶段:生产者发送消息时,指定延迟级别(如RocketMQ支持18个延迟级别,对应不同的延迟时间),消息被存储到延迟队列中。
  • 存储阶段:延迟队列中的消息不会被消费者立即消费,而是由定时任务定期扫描。
  • 触发阶段:当消息到达指定的延迟时间后,定时任务将消息从延迟队列转移到正式队列,消费者才能接收到消息。

RocketMQ的延迟级别配置如下:

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

4.3 实战案例(基于RocketMQ 5.1.4)

4.3.1 生产者实现

package com.jam.demo.mq.delay;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.util.ObjectUtils;

/**
* 定时消息生产者
* @author ken
*/

@Slf4j
public class DelayMessageProducer {
   /**
    * 生产者组名
    */

   private static final String PRODUCER_GROUP = "delay_producer_group";
   /**
    * NameServer地址
    */

   private static final String NAMESRV_ADDR = "localhost:9876";
   /**
    * Topic名称
    */

   private static final String TOPIC = "delay_topic";

   public static void main(String[] args) throws Exception {
       DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
       producer.setNamesrvAddr(NAMESRV_ADDR);
       producer.start();

       try {
           String messageContent = "定时消息示例:5秒后执行";
           Message message = new Message(
                   TOPIC,
                   "tag_delay",
                   messageContent.getBytes("UTF-8")
           );
           // 设置延迟级别为2(对应5秒)
           message.setDelayTimeLevel(2);
           var sendResult = producer.send(message);
           if (ObjectUtils.isEmpty(sendResult)) {
               log.error("发送定时消息失败");
               return;
           }
           log.info("发送定时消息成功,消息ID:{},延迟级别:{}",
                   sendResult.getMsgId(), message.getDelayTimeLevel());
       } catch (Exception e) {
           log.error("发送定时消息异常", e);
       } finally {
           producer.shutdown();
       }
   }
}

4.3.2 消费者实现

package com.jam.demo.mq.delay;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.util.CollectionUtils;

import java.time.LocalDateTime;
import java.util.List;

/**
* 定时消息消费者
* @author ken
*/

@Slf4j
public class DelayMessageConsumer {
   /**
    * 消费者组名
    */

   private static final String CONSUMER_GROUP = "delay_consumer_group";
   /**
    * NameServer地址
    */

   private static final String NAMESRV_ADDR = "localhost:9876";
   /**
    * Topic名称
    */

   private static final String TOPIC = "delay_topic";

   public static void main(String[] args) throws Exception {
       DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
       consumer.setNamesrvAddr(NAMESRV_ADDR);
       consumer.subscribe(TOPIC, "tag_delay");
       consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
           if (CollectionUtils.isEmpty(msgs)) {
               log.warn("接收到的定时消息列表为空");
               return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
           }
           for (MessageExt msg : msgs) {
               try {
                   String content = new String(msg.getBody(), "UTF-8");
                   log.info("消费定时消息:{},当前时间:{},延迟级别:{}",
                           content, LocalDateTime.now(), msg.getDelayTimeLevel());
               } catch (Exception e) {
                   log.error("消费定时消息异常", e);
                   return ConsumeConcurrentlyStatus.RECONSUME_LATER;
               }
           }
           return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
       });
       consumer.start();
       log.info("定时消息消费者启动成功");
   }
}

4.3.3 代码说明

  • 生产者通过setDelayTimeLevel设置延迟级别,RocketMQ的延迟级别对应固定的延迟时间,不支持自定义延迟时间。
  • 消费者消费定时消息的逻辑与普通消息一致,但消息会在延迟时间到达后才被推送。
  • 定时消息的延迟级别可以根据业务需求调整,最高支持2小时延迟。

4.4 应用场景

定时消息适用于需要延迟执行的业务场景,例如:

  • 订单超时关闭(下单后30分钟未支付自动关闭)
  • 定时通知(会议开始前15分钟提醒)
  • 数据备份(每天凌晨2点执行备份)

4.5 注意事项

  • RocketMQ的定时消息不支持自定义延迟时间,只能使用预设的延迟级别;若需要自定义延迟时间,可以使用RocketMQ的“定时消息V2”或其他消息队列(如RabbitMQ的Delayed Exchange)。
  • 定时消息的精度受限于定时任务的扫描频率,可能存在少量误差。

五、四大消息类型对比与选型建议

5.1 核心特性对比

特性 普通消息 顺序消息 事务消息 定时消息
顺序性 不保证 保证分区内顺序 不保证 不保证
事务性 不支持 不支持 支持分布式事务 不支持
延迟执行 不支持 不支持 不支持 支持
性能 中(分区数决定) 中(两阶段提交)
适用场景 普通异步处理 顺序业务流程 分布式事务 延迟任务

5.2 选型建议

  • 普通消息:适用于大多数不需要特殊特性的异步场景,优先选择。
  • 顺序消息:仅在业务流程有严格顺序要求时使用,注意控制队列数量以平衡性能。
  • 事务消息:用于解决分布式事务问题,避免使用2PC或TCC的复杂性。
  • 定时消息:用于延迟执行的任务,根据是否需要自定义延迟时间选择合适的消息队列。

六、总结

普通消息、顺序消息、事务消息、定时消息作为消息队列的四大核心消息类型,各自解决了不同的业务痛点。普通消息是基础,顺序消息保证了业务流程的有序性,事务消息解决了分布式事务问题,定时消息支持延迟执行。在实际开发中,需要根据业务需求选择合适的消息类型,并结合消息队列的特性进行优化,以保证系统的高性能和高可用性。

通过本文的解析,相信你已经对这四种消息类型有了深入的理解,能够在实际项目中灵活运用,解决分布式系统中的异步通信和数据一致性问题。

目录
相关文章
|
3月前
|
SQL 关系型数据库 MySQL
MySQL锁机制深度剖析:从底层原理到实战避坑,一篇吃透所有锁!
本文深入解析MySQL锁机制,涵盖全局锁、表锁、行锁、间隙锁等核心概念,结合实战案例剖析死锁、锁等待等问题根源,并提供乐观锁、索引优化等避坑方案,助你全面提升高并发场景下的数据库性能与一致性控制能力。
257 2
|
架构师 中间件
阿里中间件首席架构师钟华:《企业IT架构转型之道:阿里巴巴中台战略思想与架构实战》新书出版(含试读PDF)!
阿里中间件首席架构师钟华:《企业IT架构转型之道:阿里巴巴中台战略思想与架构实战》新书出版!
37797 93
|
2月前
|
Java Nacos Sentinel
SpringCloud 微服务解决方案:企业级架构实战
全面介绍 SpringCloud 微服务解决方案,涵盖服务注册发现、网关路由、熔断限流、分布式事务等企业级实践
|
3月前
|
存储 分布式计算 数据库
ETL vs ELT:到底谁更牛?别被名字骗了,这俩是两种世界观
ETL vs ELT:到底谁更牛?别被名字骗了,这俩是两种世界观
166 12
|
3月前
|
Java 数据库 开发者
为什么我的Java代码越来越“胖”?浅析职责单一原则
为什么我的Java代码越来越“胖”?浅析职责单一原则
136 64
|
3月前
|
存储 安全 Java
微服务安全之Token机制:从认证到授权的深度实践指南
本文深入解析微服务架构下Token认证与授权机制,涵盖JWT、OAuth2.0核心原理,结合Spring Boot实战代码,详解Token生成、验证、安全加固及细粒度权限控制,助你构建安全可靠的分布式系统认证体系。
507 2
|
2月前
|
安全 Java 测试技术
Groovy 脚本语法全解析:从入门到精通的干货指南
本文全面介绍基于JVM的动态脚本语言Groovy,涵盖从基础语法到高级特性的完整知识体系。主要内容包括:Groovy环境搭建与Maven集成;基础语法(变量、数据类型、运算符、流程控制);核心特性(集合操作、方法定义、类与对象、闭包);高级特性(元编程、异常处理、文件操作);与Java的差异对比;以及自动化测试、数据迁移、Jenkins Pipeline等实战场景。文章通过大量可直接运行的代码示例,帮助开发者快速掌握Groovy在提高开发效率、简化代码方面的优势,同时提供性能优化建议和学习资源。
233 2
|
消息中间件 存储 RocketMQ
Rocketmq如何保证消息不丢失
文章分析了RocketMQ如何通过生产者端的同步发送与重试机制、Broker端的持久化存储与消息重试投递策略、以及消费者端的手动提交ack与幂等性处理,来确保消息在整个传输和消费过程中的不丢失。
|
6月前
|
SQL 关系型数据库 Apache
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
本文将深入解析 Flink-Doris-Connector 三大典型场景中的设计与实现,并结合 Flink CDC 详细介绍了整库同步的解决方案,助力构建更加高效、稳定的实时数据处理体系。
2590 0
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路