在分布式系统架构中,消息队列作为异步通信的核心组件,承担着解耦、削峰、异步化的关键作用。而消息队列的核心价值,往往通过不同类型的消息模式来体现。普通消息、顺序消息、事务消息、定时消息作为主流消息队列(如RocketMQ、Kafka)的四大核心消息类型,各自解决着不同场景下的业务痛点。本文将从底层原理、应用场景、代码实现三个维度,全方位剖析这四种消息类型,让你既能理解其设计思想,又能落地到实际开发中。
一、普通消息:消息队列的基石
1.1 定义与核心特性
普通消息(Normal Message)是消息队列中最基础、最常用的消息类型。它遵循“生产者发送-队列存储-消费者接收”的基本模型,不保证消息的顺序性,也不提供额外的事务或定时能力,仅保证消息的可靠传递(取决于队列的配置)。
1.2 底层原理
普通消息的处理流程遵循典型的“生产-存储-消费”模型:
- 生产阶段:生产者将消息序列化后,通过TCP连接发送到消息队列服务器,服务器接收到消息后,将其存储到磁盘或内存中,并返回确认响应。
- 存储阶段:消息队列服务器将消息按照Topic进行分类存储,支持持久化(如RocketMQ的CommitLog)或非持久化存储。
- 消费阶段:消费者通过长轮询或推送模式获取消息,处理完成后发送ACK确认,服务器收到ACK后删除消息;若消费者处理失败,消息会被重新投递(取决于重试策略)。
1.3 实战案例(基于RocketMQ 5.1.4)
1.3.1 环境准备
pom.xml依赖配置:
<dependencies>
<!-- RocketMQ客户端 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.1.4</version>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>provided</scope>
</dependency>
<!-- Spring Context -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>6.1.2</version>
</dependency>
<!-- Fastjson2 -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.48</version>
</dependency>
</dependencies>
1.3.2 生产者实现
package com.jam.demo.mq.normal;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.util.ObjectUtils;
/**
* 普通消息生产者
* @author ken
*/
@Slf4j
public class NormalMessageProducer {
/**
* 生产者组名
*/
private static final String PRODUCER_GROUP = "normal_producer_group";
/**
* NameServer地址
*/
private static final String NAMESRV_ADDR = "localhost:9876";
/**
* Topic名称
*/
private static final String TOPIC = "normal_topic";
public static void main(String[] args) throws Exception {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
// 设置NameServer地址
producer.setNamesrvAddr(NAMESRV_ADDR);
// 启动生产者
producer.start();
try {
for (int i = 0; i < 5; i++) {
// 构建消息内容
String messageContent = "普通消息示例-" + i;
Message message = new Message(
TOPIC,
"tagA",
messageContent.getBytes("UTF-8")
);
// 发送消息
var sendResult = producer.send(message);
if (ObjectUtils.isEmpty(sendResult)) {
log.error("发送消息失败,结果为空");
continue;
}
log.info("发送消息成功,消息ID:{},发送状态:{}",
sendResult.getMsgId(), sendResult.getSendStatus());
}
} catch (Exception e) {
log.error("发送消息异常", e);
} finally {
// 关闭生产者
producer.shutdown();
}
}
}
1.3.3 消费者实现
package com.jam.demo.mq.normal;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.util.CollectionUtils;
import java.util.List;
/**
* 普通消息消费者
* @author ken
*/
@Slf4j
public class NormalMessageConsumer {
/**
* 消费者组名
*/
private static final String CONSUMER_GROUP = "normal_consumer_group";
/**
* NameServer地址
*/
private static final String NAMESRV_ADDR = "localhost:9876";
/**
* Topic名称
*/
private static final String TOPIC = "normal_topic";
public static void main(String[] args) throws Exception {
// 创建消费者实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
// 设置NameServer地址
consumer.setNamesrvAddr(NAMESRV_ADDR);
// 订阅Topic
consumer.subscribe(TOPIC, "tagA");
// 注册消息监听器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
if (CollectionUtils.isEmpty(msgs)) {
log.warn("接收到的消息列表为空");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
for (MessageExt msg : msgs) {
try {
String content = new String(msg.getBody(), "UTF-8");
log.info("消费消息:{},消息ID:{}", content, msg.getMsgId());
} catch (Exception e) {
log.error("消费消息异常", e);
// 消费失败,重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
// 消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 启动消费者
consumer.start();
log.info("消费者启动成功");
}
}
1.3.4 代码说明
- 生产者通过
DefaultMQProducer发送消息,指定Topic和Tag,消息发送成功后返回SendResult,包含消息ID和发送状态。 - 消费者通过
DefaultMQPushConsumer订阅Topic,使用并发消费模式(MessageListenerConcurrently),消费完成后返回CONSUME_SUCCESS确认消费,失败则返回RECONSUME_LATER触发重试。 - 使用
ObjectUtils和CollectionUtils进行空值判断,符合阿里巴巴开发手册规范。
1.4 应用场景
普通消息适用于大多数不需要严格顺序、不需要事务保证的场景,例如:
- 用户注册后的短信/邮件通知
- 日志收集与分析
- 非核心业务的异步处理(如积分更新)
二、顺序消息:保证消息的有序性
2.1 定义与核心特性
顺序消息(Ordered Message)是指消息的消费顺序与生产顺序完全一致的消息类型。它解决了普通消息在并发消费时可能出现的顺序错乱问题,适用于对业务流程顺序有严格要求的场景(如订单创建、支付、发货的流程)。
2.2 底层原理
顺序消息的实现依赖于“分区有序”的设计思想:
- 生产阶段:生产者发送消息时,指定一个ShardingKey(如订单ID),消息队列服务器根据ShardingKey的哈希值,将同一ShardingKey的消息分配到同一个消息队列中。
- 存储阶段:消息在队列中按照生产顺序存储,保证队列内的消息有序。
- 消费阶段:每个消息队列只分配给一个消费者线程处理,确保队列内的消息被顺序消费。
RocketMQ中,顺序消息分为全局顺序消息和分区顺序消息:
- 全局顺序消息:一个Topic只有一个队列,所有消息都在同一个队列中顺序消费(性能较低)。
- 分区顺序消息:一个Topic有多个队列,同一ShardingKey的消息进入同一个队列,保证分区内顺序(性能较高)。
2.3 实战案例(基于RocketMQ 5.1.4)
2.3.1 生产者实现
package com.jam.demo.mq.order;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.springframework.util.ObjectUtils;
import java.util.List;
/**
* 顺序消息生产者
* @author ken
*/
@Slf4j
public class OrderMessageProducer {
/**
* 生产者组名
*/
private static final String PRODUCER_GROUP = "order_producer_group";
/**
* NameServer地址
*/
private static final String NAMESRV_ADDR = "localhost:9876";
/**
* Topic名称
*/
private static final String TOPIC = "order_topic";
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
producer.setNamesrvAddr(NAMESRV_ADDR);
producer.start();
// 模拟订单流程:创建->支付->发货
String[] orderIds = {"order_001", "order_002", "order_003"};
String[] actions = {"创建订单", "支付订单", "发货"};
try {
for (String orderId : orderIds) {
for (String action : actions) {
String content = String.format("订单[%s]:%s", orderId, action);
Message message = new Message(
TOPIC,
"tag_order",
content.getBytes("UTF-8")
);
// 发送顺序消息,指定ShardingKey为orderId
SendResult sendResult = producer.send(
message,
(MessageQueueSelector) (mqs, msg, arg) -> {
String shardingKey = (String) arg;
int hashCode = Math.abs(shardingKey.hashCode());
return mqs.get(hashCode % mqs.size());
},
orderId
);
if (ObjectUtils.isEmpty(sendResult)) {
log.error("发送顺序消息失败,订单ID:{}", orderId);
continue;
}
log.info("发送顺序消息成功,订单ID:{},队列ID:{},内容:{}",
orderId, sendResult.getMessageQueue().getQueueId(), content);
}
}
} catch (Exception e) {
log.error("发送顺序消息异常", e);
} finally {
producer.shutdown();
}
}
}
2.3.2 消费者实现
package com.jam.demo.mq.order;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.util.CollectionUtils;
import java.util.List;
/**
* 顺序消息消费者
* @author ken
*/
@Slf4j
public class OrderMessageConsumer {
/**
* 消费者组名
*/
private static final String CONSUMER_GROUP = "order_consumer_group";
/**
* NameServer地址
*/
private static final String NAMESRV_ADDR = "localhost:9876";
/**
* Topic名称
*/
private static final String TOPIC = "order_topic";
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
consumer.setNamesrvAddr(NAMESRV_ADDR);
consumer.subscribe(TOPIC, "tag_order");
// 注册顺序消息监听器
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
if (CollectionUtils.isEmpty(msgs)) {
log.warn("接收到的顺序消息列表为空");
return ConsumeOrderlyStatus.SUCCESS;
}
// 锁定当前队列,保证顺序消费
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
try {
String content = new String(msg.getBody(), "UTF-8");
log.info("消费顺序消息:{},订单ID:{},队列ID:{}",
content, msg.getKeys(), msg.getQueueId());
} catch (Exception e) {
log.error("消费顺序消息异常", e);
// 消费失败,暂停当前队列的消费
context.setSuspendCurrentQueueTimeMillis(1000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
return ConsumeOrderlyStatus.SUCCESS;
});
consumer.start();
log.info("顺序消息消费者启动成功");
}
}
2.3.3 代码说明
- 生产者通过
MessageQueueSelector根据订单ID(ShardingKey)选择队列,保证同一订单的消息进入同一个队列。 - 消费者使用
MessageListenerOrderly进行顺序消费,每个队列由一个线程处理,确保消费顺序与生产顺序一致。 - 消费失败时,通过
SUSPEND_CURRENT_QUEUE_A_MOMENT暂停当前队列的消费,避免消息堆积。
2.4 应用场景
顺序消息适用于对业务流程顺序有严格要求的场景,例如:
- 订单处理流程(创建→支付→发货→完成)
- 物流轨迹更新(下单→接单→揽收→运输→派送→签收)
- 金融交易流水(充值→扣款→记账)
2.5 注意事项
- 顺序消息的性能取决于队列数量,队列越多,并发度越高,但需要保证同一ShardingKey的消息进入同一队列。
- 消费失败会导致整个队列暂停消费,因此需要合理设置重试策略和暂停时间。
三、事务消息:解决分布式事务问题
3.1 定义与核心特性
事务消息(Transactional Message)是一种支持分布式事务的消息类型,它保证“本地事务执行”和“消息发送”的原子性,即两者要么都成功,要么都失败。事务消息解决了分布式系统中跨服务的数据一致性问题。
3.2 底层原理
事务消息的实现基于“两阶段提交”和“消息回查”机制:
- 第一阶段(发送半消息):生产者发送一条“半消息”到消息队列服务器,半消息对消费者不可见。
- 第二阶段(执行本地事务):生产者执行本地事务(如数据库操作),并根据事务结果发送“提交”或“回滚”指令。
- 消息回查:若消息队列服务器未收到提交/回滚指令,会定时向生产者发起回查请求,生产者根据本地事务状态返回结果,服务器根据结果决定提交或回滚消息。
3.3 实战案例(基于RocketMQ 5.1.4 + MySQL 8.0 + MyBatis-Plus 3.5.5)
3.3.1 环境准备
pom.xml新增依赖:
<!-- MySQL驱动 -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>8.3.0</version>
</dependency>
<!-- MyBatis-Plus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.5</version>
</dependency>
<!-- Spring Boot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>3.2.0</version>
</dependency>
<!-- Swagger3 -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-boot-starter</artifactId>
<version>3.0.0</version>
</dependency>
3.3.2 数据库表设计
CREATE TABLE `t_order` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '订单ID',
`order_no` varchar(64) NOT NULL COMMENT '订单编号',
`user_id` bigint NOT NULL COMMENT '用户ID',
`amount` decimal(10,2) NOT NULL COMMENT '订单金额',
`status` tinyint NOT NULL DEFAULT '0' COMMENT '订单状态:0-创建中,1-已创建,2-已取消',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_order_no` (`order_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='订单表';
3.3.3 实体类与Mapper
Order实体类:
package com.jam.demo.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.math.BigDecimal;
import java.time.LocalDateTime;
/**
* 订单实体类
* @author ken
*/
@Data
@TableName("t_order")
public class Order {
/**
* 订单ID
*/
@TableId(type = IdType.AUTO)
private Long id;
/**
* 订单编号
*/
private String orderNo;
/**
* 用户ID
*/
private Long userId;
/**
* 订单金额
*/
private BigDecimal amount;
/**
* 订单状态:0-创建中,1-已创建,2-已取消
*/
private Integer status;
/**
* 创建时间
*/
private LocalDateTime createTime;
/**
* 更新时间
*/
private LocalDateTime updateTime;
}
OrderMapper接口:
package com.jam.demo.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.Order;
import org.apache.ibatis.annotations.Mapper;
/**
* 订单Mapper
* @author ken
*/
@Mapper
public interface OrderMapper extends BaseMapper<Order> {
}
3.3.4 事务消息生产者
package com.jam.demo.mq.transaction;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.jam.demo.entity.Order;
import com.jam.demo.mapper.OrderMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.UUID;
/**
* 事务消息生产者
* @author ken
*/
@Slf4j
@Component
public class TransactionMessageProducer {
/**
* 生产者组名
*/
private static final String PRODUCER_GROUP = "transaction_producer_group";
/**
* NameServer地址
*/
private static final String NAMESRV_ADDR = "localhost:9876";
/**
* Topic名称
*/
private static final String TOPIC = "transaction_topic";
private final TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);
@Autowired
private OrderMapper orderMapper;
/**
* 初始化生产者
*/
@PostConstruct
public void init() throws Exception {
producer.setNamesrvAddr(NAMESRV_ADDR);
// 设置事务监听器
producer.setTransactionListener(new TransactionListener() {
/**
* 执行本地事务
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String orderNo = (String) arg;
if (!StringUtils.hasText(orderNo, "订单编号不能为空")) {
log.error("订单编号为空,回滚事务");
return LocalTransactionState.ROLLBACK_MESSAGE;
}
try {
// 创建订单(本地事务)
Order order = new Order();
order.setOrderNo(orderNo);
order.setUserId(1001L);
order.setAmount(new BigDecimal("99.00"));
order.setStatus(1);
order.setCreateTime(LocalDateTime.now());
order.setUpdateTime(LocalDateTime.now());
orderMapper.insert(order);
log.info("本地事务执行成功,订单编号:{}", orderNo);
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
log.error("本地事务执行失败,订单编号:{}", orderNo, e);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
/**
* 事务回查
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String orderNo = msg.getKeys();
if (!StringUtils.hasText(orderNo)) {
log.error("回查事务时订单编号为空");
return LocalTransactionState.ROLLBACK_MESSAGE;
}
// 查询订单状态
Order order = orderMapper.selectOne(new LambdaQueryWrapper<Order>()
.eq(Order::getOrderNo, orderNo));
if (order == null) {
log.warn("订单不存在,回滚事务,订单编号:{}", orderNo);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
if (order.getStatus() == 1) {
log.info("回查事务:订单已创建,提交消息,订单编号:{}", orderNo);
return LocalTransactionState.COMMIT_MESSAGE;
} else {
log.warn("回查事务:订单状态异常,回滚事务,订单编号:{}", orderNo);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
});
producer.start();
log.info("事务消息生产者启动成功");
}
/**
* 发送事务消息
* @param orderNo 订单编号
*/
public void sendTransactionMessage(String orderNo) throws Exception {
String messageContent = "创建订单:" + orderNo;
Message message = new Message(
TOPIC,
"tag_transaction",
orderNo,
messageContent.getBytes("UTF-8")
);
producer.sendMessageInTransaction(message, orderNo);
log.info("发送事务消息成功,订单编号:{}", orderNo);
}
/**
* 销毁生产者
*/
@PreDestroy
public void destroy() {
producer.shutdown();
log.info("事务消息生产者已关闭");
}
}
3.3.5 事务消息消费者
package com.jam.demo.mq.transaction;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.List;
/**
* 事务消息消费者
* @author ken
*/
@Slf4j
@Component
public class TransactionMessageConsumer {
/**
* 消费者组名
*/
private static final String CONSUMER_GROUP = "transaction_consumer_group";
/**
* NameServer地址
*/
private static final String NAMESRV_ADDR = "localhost:9876";
/**
* Topic名称
*/
private static final String TOPIC = "transaction_topic";
private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
/**
* 初始化消费者
*/
@PostConstruct
public void init() throws Exception {
consumer.setNamesrvAddr(NAMESRV_ADDR);
consumer.subscribe(TOPIC, "tag_transaction");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
if (CollectionUtils.isEmpty(msgs)) {
log.warn("接收到的事务消息列表为空");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
for (MessageExt msg : msgs) {
try {
String content = new String(msg.getBody(), "UTF-8");
String orderNo = msg.getKeys();
log.info("消费事务消息:{},订单编号:{}", content, orderNo);
// 执行后续业务逻辑(如扣减库存)
} catch (Exception e) {
log.error("消费事务消息异常", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
log.info("事务消息消费者启动成功");
}
/**
* 销毁消费者
*/
@PreDestroy
public void destroy() {
consumer.shutdown();
log.info("事务消息消费者已关闭");
}
}
3.3.6 测试接口
package com.jam.demo.controller;
import com.jam.demo.mq.transaction.TransactionMessageProducer;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
/**
* 订单控制器
* @author ken
*/
@Slf4j
@RestController
@RequestMapping("/order")
@Api(tags = "订单管理")
public class OrderController {
@Autowired
private TransactionMessageProducer transactionMessageProducer;
/**
* 创建订单
*/
@PostMapping("/create")
@ApiOperation("创建订单")
public String createOrder() {
String orderNo = "ORDER_" + UUID.randomUUID().toString().replace("-", "");
try {
transactionMessageProducer.sendTransactionMessage(orderNo);
return "创建订单成功,订单编号:" + orderNo;
} catch (Exception e) {
log.error("创建订单失败", e);
return "创建订单失败:" + e.getMessage();
}
}
}
3.3.7 代码说明
- 生产者通过
TransactionMQProducer发送事务消息,实现TransactionListener接口处理本地事务和事务回查。 - 本地事务执行订单创建操作,成功则提交消息,失败则回滚消息。
- 事务回查机制确保在网络异常时,消息队列服务器能主动查询本地事务状态,保证数据一致性。
- 消费者消费提交后的消息,执行后续业务逻辑(如扣减库存)。
3.4 应用场景
事务消息适用于需要保证分布式事务一致性的场景,例如:
- 电商下单(创建订单+扣减库存)
- 金融转账(扣款+收款)
- 支付回调(更新订单状态+发送通知)
3.5 注意事项
- 事务消息的回查机制需要生产者提供幂等性实现,避免重复执行本地事务。
- 本地事务的执行时间不宜过长,否则会导致消息回查频繁,影响性能。
四、定时消息:延迟执行的消息
4.1 定义与核心特性
定时消息(Delayed Message)又称延迟消息,是指消息发送后,消费者不会立即收到,而是在指定的延迟时间后才被消费的消息类型。定时消息支持在未来的某个时间点触发业务逻辑,适用于延时任务场景。
4.2 底层原理
定时消息的实现依赖于“延迟队列”和“时间轮”机制:
- 生产阶段:生产者发送消息时,指定延迟级别(如RocketMQ支持18个延迟级别,对应不同的延迟时间),消息被存储到延迟队列中。
- 存储阶段:延迟队列中的消息不会被消费者立即消费,而是由定时任务定期扫描。
- 触发阶段:当消息到达指定的延迟时间后,定时任务将消息从延迟队列转移到正式队列,消费者才能接收到消息。
RocketMQ的延迟级别配置如下:
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
4.3 实战案例(基于RocketMQ 5.1.4)
4.3.1 生产者实现
package com.jam.demo.mq.delay;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.util.ObjectUtils;
/**
* 定时消息生产者
* @author ken
*/
@Slf4j
public class DelayMessageProducer {
/**
* 生产者组名
*/
private static final String PRODUCER_GROUP = "delay_producer_group";
/**
* NameServer地址
*/
private static final String NAMESRV_ADDR = "localhost:9876";
/**
* Topic名称
*/
private static final String TOPIC = "delay_topic";
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
producer.setNamesrvAddr(NAMESRV_ADDR);
producer.start();
try {
String messageContent = "定时消息示例:5秒后执行";
Message message = new Message(
TOPIC,
"tag_delay",
messageContent.getBytes("UTF-8")
);
// 设置延迟级别为2(对应5秒)
message.setDelayTimeLevel(2);
var sendResult = producer.send(message);
if (ObjectUtils.isEmpty(sendResult)) {
log.error("发送定时消息失败");
return;
}
log.info("发送定时消息成功,消息ID:{},延迟级别:{}",
sendResult.getMsgId(), message.getDelayTimeLevel());
} catch (Exception e) {
log.error("发送定时消息异常", e);
} finally {
producer.shutdown();
}
}
}
4.3.2 消费者实现
package com.jam.demo.mq.delay;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.List;
/**
* 定时消息消费者
* @author ken
*/
@Slf4j
public class DelayMessageConsumer {
/**
* 消费者组名
*/
private static final String CONSUMER_GROUP = "delay_consumer_group";
/**
* NameServer地址
*/
private static final String NAMESRV_ADDR = "localhost:9876";
/**
* Topic名称
*/
private static final String TOPIC = "delay_topic";
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
consumer.setNamesrvAddr(NAMESRV_ADDR);
consumer.subscribe(TOPIC, "tag_delay");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
if (CollectionUtils.isEmpty(msgs)) {
log.warn("接收到的定时消息列表为空");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
for (MessageExt msg : msgs) {
try {
String content = new String(msg.getBody(), "UTF-8");
log.info("消费定时消息:{},当前时间:{},延迟级别:{}",
content, LocalDateTime.now(), msg.getDelayTimeLevel());
} catch (Exception e) {
log.error("消费定时消息异常", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
log.info("定时消息消费者启动成功");
}
}
4.3.3 代码说明
- 生产者通过
setDelayTimeLevel设置延迟级别,RocketMQ的延迟级别对应固定的延迟时间,不支持自定义延迟时间。 - 消费者消费定时消息的逻辑与普通消息一致,但消息会在延迟时间到达后才被推送。
- 定时消息的延迟级别可以根据业务需求调整,最高支持2小时延迟。
4.4 应用场景
定时消息适用于需要延迟执行的业务场景,例如:
- 订单超时关闭(下单后30分钟未支付自动关闭)
- 定时通知(会议开始前15分钟提醒)
- 数据备份(每天凌晨2点执行备份)
4.5 注意事项
- RocketMQ的定时消息不支持自定义延迟时间,只能使用预设的延迟级别;若需要自定义延迟时间,可以使用RocketMQ的“定时消息V2”或其他消息队列(如RabbitMQ的Delayed Exchange)。
- 定时消息的精度受限于定时任务的扫描频率,可能存在少量误差。
五、四大消息类型对比与选型建议
5.1 核心特性对比
| 特性 | 普通消息 | 顺序消息 | 事务消息 | 定时消息 |
| 顺序性 | 不保证 | 保证分区内顺序 | 不保证 | 不保证 |
| 事务性 | 不支持 | 不支持 | 支持分布式事务 | 不支持 |
| 延迟执行 | 不支持 | 不支持 | 不支持 | 支持 |
| 性能 | 高 | 中(分区数决定) | 中(两阶段提交) | 高 |
| 适用场景 | 普通异步处理 | 顺序业务流程 | 分布式事务 | 延迟任务 |
5.2 选型建议
- 普通消息:适用于大多数不需要特殊特性的异步场景,优先选择。
- 顺序消息:仅在业务流程有严格顺序要求时使用,注意控制队列数量以平衡性能。
- 事务消息:用于解决分布式事务问题,避免使用2PC或TCC的复杂性。
- 定时消息:用于延迟执行的任务,根据是否需要自定义延迟时间选择合适的消息队列。
六、总结
普通消息、顺序消息、事务消息、定时消息作为消息队列的四大核心消息类型,各自解决了不同的业务痛点。普通消息是基础,顺序消息保证了业务流程的有序性,事务消息解决了分布式事务问题,定时消息支持延迟执行。在实际开发中,需要根据业务需求选择合适的消息类型,并结合消息队列的特性进行优化,以保证系统的高性能和高可用性。
通过本文的解析,相信你已经对这四种消息类型有了深入的理解,能够在实际项目中灵活运用,解决分布式系统中的异步通信和数据一致性问题。