消息队列(MQ)是中高级/专家级面试中绕不开的核心考点。面试官不会仅停留在“你用过什么MQ”这类基础问题,而是深挖“为什么用”“底层如何实现”“生产问题怎么解”等直击核心的问题。本文结合一线大厂专家级面试高频题,从基础认知到底层原理,再到生产级实战,全方位拆解MQ面试的核心考点。
一、基础认知:MQ面试的“开胃菜”
面试题1:什么是消息队列?它解决了什么核心问题?
消息队列是基于“生产者-消费者”模型的异步通信中间件,核心价值是解耦、异步、削峰填谷,这也是它区别于同步RPC调用的核心优势:
- 解耦:系统间不再直接耦合调用,通过MQ传递消息,一方系统变更无需修改另一方代码,比如订单系统无需关心短信系统的接口变更;
- 异步:生产者发送消息后无需等待消费者响应,主流程耗时大幅缩短,比如用户下单后,核心流程(扣库存、生成订单)同步完成,短信通知、物流推送等非核心流程通过MQ异步处理;
- 削峰:高并发请求先写入MQ,消费者按自身处理能力消费,避免下游服务被瞬间流量打垮,比如秒杀活动中,几十万请求瞬间涌入,MQ可缓冲流量,让数据库按每秒千级的速率处理。
面试题2:MQ的核心应用场景有哪些?请结合实际业务说明
MQ的核心应用场景均围绕“解耦、异步、削峰”三大核心价值展开,结合真实业务场景如下:
- 异步通信:电商下单场景,订单创建成功后发送MQ消息,短信服务、推送服务异步消费,主流程响应时间从500ms缩短至50ms;
- 系统解耦:支付系统完成支付后,发送“支付成功”消息,订单系统、积分系统、财务系统各自消费,无需支付系统逐个调用,后续新增“优惠券系统”只需新增消费者即可;
- 流量削峰:秒杀活动中,每秒10万级请求先写入Kafka,秒杀系统按每秒1000级的速率消费,避免数据库连接池耗尽、CPU打满;
- 数据分发:日志采集场景(ELK),业务系统将日志写入MQ,Logstash消费后分发到Elasticsearch,实现日志的统一收集和检索;
- 最终一致性:分布式事务场景,通过MQ实现柔性事务,保证跨库/跨服务数据最终一致(如订单创建后,库存扣减、积分增加最终一致)。
二、底层原理:MQ面试的“分水岭”
面试题3:主流MQ(RabbitMQ/Kafka/RocketMQ)的核心架构是什么?
不同MQ的架构设计决定了其性能、功能和适用场景,这是专家级面试的核心考察点。
1. RabbitMQ架构(AMQP协议)
核心组件解析:
- Broker:RabbitMQ服务节点,一个集群由多个Broker组成,负责存储和转发消息;
- Virtual Host:虚拟主机,用于隔离不同租户的资源(交换机、队列),类似数据库的“库”,不同业务线可使用不同Virtual Host;
- Exchange:交换机,接收生产者消息并按路由规则转发到队列,核心类型:Direct(精准路由)、Fanout(广播)、Topic(模糊路由);
- Channel:信道,建立在TCP连接之上的轻量级连接,避免频繁创建/销毁TCP连接,单TCP连接可创建上千个Channel,大幅提升性能;
- Queue:消息队列,存储消息,消费者从队列拉取消息,队列是RabbitMQ的最小存储单元。
2. Kafka架构(发布-订阅模型)
核心组件解析:
- Topic:逻辑上的消息分类,比如“order_topic”存储所有订单相关消息;
- Partition:Topic的物理拆分,每个Partition是有序的日志文件,Kafka通过分区实现并行消费(一个Topic的多个Partition可被多个消费者同时消费);
- Replica:副本,分为Leader(处理读写)和Follower(同步数据),默认副本数3,保证数据可靠性;
- ConsumerGroup:消费者组,组内多个消费者共同消费一个Topic的所有Partition(一个Partition只能被组内一个消费者消费),组间互不影响;
- Controller:Kafka 2.8+弃用ZooKeeper,改用内置Controller管理集群元数据(如Partition Leader选举)。
3. RocketMQ架构(自研协议,阿里开源)
核心组件解析:
- NameServer:轻量级注册中心,无状态,存储Broker路由信息,生产者/消费者通过NameServer获取Broker地址;
- Broker:核心节点,分Master和Slave,存储消息的核心文件是CommitLog(所有Topic的消息统一写入)和ConsumeQueue(CommitLog的索引文件,加速消费);
- CommitLog:全局消息存储文件,默认每个文件1GB,顺序写(性能远高于随机写);
- ConsumeQueue:消费队列,每个Topic的每个Queue对应一个ConsumeQueue,存储消息在CommitLog中的偏移量、大小等索引信息。
面试题4:MQ的消息持久化原理是什么?不同MQ的持久化方式有何差异?
持久化是保证MQ不丢消息的核心,不同MQ的持久化机制因架构设计不同而差异显著:
1. RabbitMQ的持久化
需同时满足两个条件才能保证消息不丢:
- 队列设置为持久化(
durable=true):队列元数据存储在Erlang内置的Mnesia数据库; - 消息设置为持久化(
deliveryMode=2):消息先写入内存缓存,再异步刷盘到磁盘文件(可配置同步刷盘)。
2. Kafka的持久化
Kafka默认持久化所有消息(可配置保留时间/大小),核心是日志文件+分段存储:
- 每个Partition对应一组日志文件(.log),消息按顺序追加写入(顺序IO性能是随机IO的10倍以上);
- 引入Segment分段:每个Partition的日志分为多个Segment(默认1GB),避免单个文件过大,方便清理过期数据;
- 刷盘策略:支持3种模式(
acks=0:不确认;acks=1:Leader刷盘确认;acks=all:所有同步副本刷盘确认),生产环境核心业务建议用acks=all。
3. RocketMQ的持久化
采用“混合存储”模式,兼顾性能和可靠性:
- CommitLog:所有Topic的消息统一写入CommitLog(顺序写),默认每个文件1GB;
- ConsumeQueue:每个Topic的每个Queue对应一个ConsumeQueue,存储消息在CommitLog中的偏移量、大小、Tag哈希值(索引);
- 刷盘策略:支持同步刷盘(Master写入CommitLog后立即刷盘,返回ACK)和异步刷盘(默认,写入内存后异步刷盘),核心业务建议用同步刷盘。
三、核心特性:MQ面试的“核心考点”(生产实践必问)
面试题5:如何保证MQ的消息不丢失?
消息丢失可能发生在生产者发送、MQ存储、消费者消费三个阶段,需分阶段兜底,以下是生产环境经过验证的完整方案:
1. 阶段1:生产者发送阶段防丢失
核心思路:生产者确认机制(确保MQ接收到消息后再返回成功),不同MQ的实现方式如下:
RabbitMQ生产者确认示例(JDK17 + Spring Boot 3.2.2)
第一步:添加核心依赖
<dependencies>
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>3.2.2</version>
</dependency>
<!-- RabbitMQ Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>3.2.2</version>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>provided</scope>
</dependency>
<!-- Swagger3 -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-boot-starter</artifactId>
<version>3.0.0</version>
</dependency>
<!-- Spring Utils -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>6.1.3</version>
</dependency>
</dependencies>
第二步:配置文件(application.yml)
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
# 开启生产者确认
publisher-confirm-type: correlated # 异步确认(推荐)
publisher-returns: true # 开启消息返回(路由失败时回调)
第三步:生产者代码(带确认回调)
package com.jam.demo.mq.rabbit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.annotation.PostConstruct;
import java.util.UUID;
/**
* RabbitMQ生产者(防丢失版)
* @author ken
* @date 2026-02-09
*/
@Component
@Slf4j
public class RabbitProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 初始化回调函数
*/
@PostConstruct
public void initCallback() {
// 1. 生产者确认回调(MQ接收到消息时触发)
rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause) -> {
String msgId = correlationData != null ? correlationData.getId() : "";
if (ack) {
log.info("消息[{}]已成功投递到MQ", msgId);
} else {
log.error("消息[{}]投递到MQ失败,原因:{}", msgId, cause);
// 生产环境:此处需触发重试机制(如定时任务重试、写入本地消息表补偿)
retrySend(msgId, correlationData);
}
});
// 2. 消息返回回调(路由到队列失败时触发)
rabbitTemplate.setReturnsCallback((Message returned) -> {
log.error("消息路由失败,交换机:{},路由键:{},响应码:{},原因:{}",
returned.getExchange(), returned.getRoutingKey(),
returned.getReplyCode(), returned.getReplyText());
});
}
/**
* 发送持久化消息
* @param exchange 交换机名称
* @param routingKey 路由键
* @param message 消息内容
*/
public void sendPersistentMsg(String exchange, String routingKey, String message) {
// 参数校验(符合阿里巴巴开发手册)
StringUtils.hasText(exchange, "交换机名称不能为空");
StringUtils.hasText(routingKey, "路由键不能为空");
StringUtils.hasText(message, "消息内容不能为空");
// 生成唯一消息ID(用于追踪)
String msgId = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(msgId);
try {
// 发送持久化消息(deliveryMode=2)
rabbitTemplate.convertAndSend(
exchange,
routingKey,
message,
msg -> {
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return msg;
},
correlationData
);
log.info("消息[{}]发送请求已提交", msgId);
} catch (Exception e) {
log.error("消息[{}]发送异常", msgId, e);
// 生产环境:写入本地消息表,后续通过定时任务重试
saveLocalMsg(msgId, exchange, routingKey, message);
}
}
/**
* 重试发送失败消息(简化版)
*/
private void retrySend(String msgId, CorrelationData correlationData) {
// 生产环境:需限制重试次数(如3次),避免死循环
log.info("开始重试发送消息[{}]", msgId);
// 此处省略重试逻辑
}
/**
* 保存本地消息表(用于补偿)
*/
private void saveLocalMsg(String msgId, String exchange, String routingKey, String message) {
// 生产环境:写入数据库,字段包括msgId、exchange、routingKey、message、status(0-待发送,1-发送成功)、createTime等
log.info("消息[{}]写入本地消息表待补偿", msgId);
}
}
Kafka生产者确认示例
package com.jam.demo.mq.kafka;
import com.alibaba.fastjson2.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.annotation.PostConstruct;
import java.util.Properties;
import java.util.concurrent.Future;
/**
* Kafka生产者(防丢失版)
* @author ken
* @date 2026-02-09
*/
@Component
@Slf4j
public class KafkaProducerDemo {
private KafkaProducer<String, String> kafkaProducer;
/**
* 初始化Kafka生产者
*/
@PostConstruct
public void initProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 关键配置:确保消息不丢失
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 所有同步副本确认后返回
props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试次数
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 开启幂等性,避免重复发送
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // 保证重试时消息顺序
kafkaProducer = new KafkaProducer<>(props);
}
/**
* 发送消息(同步确认)
* @param topic 主题名称
* @param message 消息内容
*/
public void sendMsg(String topic, Object message) {
StringUtils.hasText(topic, "主题名称不能为空");
if (ObjectUtils.isEmpty(message)) {
throw new IllegalArgumentException("消息内容不能为空");
}
String msgStr = JSON.toJSONString(message);
String msgId = UUID.randomUUID().toString();
ProducerRecord<String, String> record = new ProducerRecord<>(
topic,
msgId, // 消息ID作为key
msgStr
);
try {
// 同步发送(生产环境可改用异步+回调,避免阻塞)
Future<RecordMetadata> future = kafkaProducer.send(record, (metadata, exception) -> {
if (exception != null) {
log.error("消息[{}]发送失败", msgId, exception);
// 写入本地消息表补偿
saveLocalMsg(msgId, topic, msgStr);
} else {
log.info("消息[{}]发送成功,分区:{},偏移量:{}",
msgId, metadata.partition(), metadata.offset());
}
});
// 阻塞等待确认
future.get();
} catch (Exception e) {
log.error("消息[{}]发送异常", msgId, e);
saveLocalMsg(msgId, topic, msgStr);
}
}
/**
* 保存本地消息表
*/
private void saveLocalMsg(String msgId, String topic, String message) {
// 省略数据库存储逻辑
}
}
2. 阶段2:MQ存储阶段防丢失
核心思路:开启持久化+集群部署:
- RabbitMQ:创建持久化队列(
durable=true)+ 发送持久化消息(deliveryMode=2)+ 集群部署(镜像队列); - Kafka:
acks=all+ 副本数≥3 + 禁用unclean.leader.election.enable(避免非同步副本成为Leader); - RocketMQ:同步刷盘 + Master/Slave集群 + 副本数≥2。
3. 阶段3:消费者消费阶段防丢失
核心思路:关闭自动ACK,手动确认消费成功(确保业务处理完成后再告知MQ删除消息)。
RabbitMQ消费者手动ACK示例
package com.jam.demo.mq.rabbit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* RabbitMQ消费者(手动ACK版)
* @author ken
* @date 2026-02-09
*/
@Component
@Slf4j
public class RabbitConsumer {
/**
* 消费消息(手动ACK)
* @param message 消息内容
* @param channel 信道
*/
@RabbitListener(queues = "order_queue", ackMode = "MANUAL") // 关键:手动ACK
public void consumeMsg(String message, Channel channel, Message msg) {
long deliveryTag = msg.getMessageProperties().getDeliveryTag();
String msgId = msg.getMessageProperties().getMessageId();
try {
log.info("开始消费消息[{}]:{}", msgId, message);
// 1. 处理业务逻辑(如更新订单状态、发送短信)
handleBusiness(message);
// 2. 手动确认消费成功(单条确认)
channel.basicAck(deliveryTag, false);
log.info("消息[{}]消费成功并确认", msgId);
} catch (Exception e) {
log.error("消息[{}]消费失败", msgId, e);
try {
// 3. 消费失败:拒绝消息并重新入队(或进入死信队列)
// requeue=false:不再重新入队,直接进入死信队列
channel.basicNack(deliveryTag, false, false);
} catch (Exception ex) {
log.error("消息[{}]拒绝失败", msgId, ex);
}
}
}
/**
* 处理业务逻辑
*/
private void handleBusiness(String message) {
// 省略业务逻辑(如调用短信接口、更新数据库)
}
}
面试题6:如何保证消息不重复消费?(幂等性设计)
1. 重复消费的原因
- 生产者重试:生产者未收到MQ确认,重新发送同一条消息;
- MQ重试:消费者消费超时,MQ重新推送消息;
- 网络延迟:消费者ACK响应延迟,MQ认为消费失败,重新推送。
2. 核心解决方案:幂等性设计
核心思路:为每条消息生成唯一标识,消费前校验该标识是否已处理,常用方案如下:
方案1:数据库唯一索引(最常用)
适用场景:消息消费最终落地到数据库的场景(如订单状态更新、积分增加)。
数据库表设计(MySQL 8.0)
CREATE TABLE `mq_msg_consume_record` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键',
`msg_id` varchar(64) NOT NULL COMMENT '消息唯一ID',
`topic` varchar(64) NOT NULL COMMENT 'MQ主题/交换机',
`consume_status` tinyint NOT NULL DEFAULT '0' COMMENT '0-待消费 1-消费成功 2-消费失败',
`consume_time` datetime DEFAULT NULL COMMENT '消费时间',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_msg_id` (`msg_id`) COMMENT '唯一索引,防止重复消费'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='MQ消息消费记录表';
消费逻辑代码(MyBatis-Plus)
package com.jam.demo.service;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.jam.demo.entity.MqMsgConsumeRecord;
import com.jam.demo.mapper.MqMsgConsumeRecordMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StringUtils;
import java.util.Date;
/**
* 消息消费幂等性服务
* @author ken
* @date 2026-02-09
*/
@Service
@Slf4j
public class MqIdempotentService {
@Autowired
private MqMsgConsumeRecordMapper consumeRecordMapper;
/**
* 校验并记录消息消费状态
* @param msgId 消息唯一ID
* @param topic 主题/交换机
* @return true-可消费 false-已消费
*/
@Transactional(rollbackFor = Exception.class)
public boolean checkAndRecord(String msgId, String topic) {
StringUtils.hasText(msgId, "消息ID不能为空");
StringUtils.hasText(topic, "主题不能为空");
// 1. 校验是否已消费
LambdaQueryWrapper<MqMsgConsumeRecord> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(MqMsgConsumeRecord::getMsgId, msgId)
.eq(MqMsgConsumeRecord::getConsumeStatus, 1);
MqMsgConsumeRecord existRecord = consumeRecordMapper.selectOne(queryWrapper);
if (!ObjectUtils.isEmpty(existRecord)) {
log.info("消息[{}]已消费,无需重复处理", msgId);
return false;
}
// 2. 插入消费记录(唯一索引保证幂等)
MqMsgConsumeRecord record = new MqMsgConsumeRecord();
record.setMsgId(msgId);
record.setTopic(topic);
record.setConsumeStatus(0);
record.setCreateTime(new Date());
try {
consumeRecordMapper.insert(record);
return true;
} catch (Exception e) {
log.error("消息[{}]插入消费记录失败(可能重复)", msgId, e);
return false;
}
}
/**
* 更新消费成功状态
*/
public void updateConsumeSuccess(String msgId) {
LambdaQueryWrapper<MqMsgConsumeRecord> updateWrapper = new LambdaQueryWrapper<>();
updateWrapper.eq(MqMsgConsumeRecord::getMsgId, msgId);
MqMsgConsumeRecord updateRecord = new MqMsgConsumeRecord();
updateRecord.setConsumeStatus(1);
updateRecord.setConsumeTime(new Date());
consumeRecordMapper.update(updateRecord, updateWrapper);
}
}
方案2:Redis分布式锁+原子操作
适用场景:非数据库落地的场景(如缓存更新、接口调用)。
package com.jam.demo.redis;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
/**
* Redis幂等性工具类
* @author ken
* @date 2026-02-09
*/
@Component
@Slf4j
public class RedisIdempotentUtil {
@Resource
private RedisTemplate<String, String> redisTemplate;
private static final String LOCK_KEY_PREFIX = "mq:consume:lock:";
private static final String CONSUME_KEY_PREFIX = "mq:consume:status:";
/**
* 获取分布式锁并校验是否已消费
*/
public boolean acquireLockAndCheck(String msgId) {
StringUtils.hasText(msgId, "消息ID不能为空");
String lockKey = LOCK_KEY_PREFIX + msgId;
String consumeKey = CONSUME_KEY_PREFIX + msgId;
// 1. 检查是否已消费
String status = redisTemplate.opsForValue().get(consumeKey);
if ("1".equals(status)) {
return false;
}
// 2. 获取分布式锁(Lua脚本保证原子性)
String script = "if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end";
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);
Long result = redisTemplate.execute(
redisScript,
Collections.singletonList(lockKey),
"locked",
"30" // 锁过期时间30秒
);
return result != null && result == 1;
}
/**
* 释放锁并标记消费成功
*/
public void releaseLockAndMarkSuccess(String msgId) {
String lockKey = LOCK_KEY_PREFIX + msgId;
String consumeKey = CONSUME_KEY_PREFIX + msgId;
// 1. 标记消费成功(过期时间7天)
redisTemplate.opsForValue().set(consumeKey, "1", 7, TimeUnit.DAYS);
// 2. 释放锁
redisTemplate.delete(lockKey);
}
}
面试题7:如何保证消息的顺序性?
消息顺序性指“生产者发送消息的顺序”与“消费者消费消息的顺序”一致(如订单创建→支付→发货,需按此顺序消费),不同MQ的实现方案如下:
1. RabbitMQ保证顺序性
核心方案:单队列+单消费者(或分区队列+按业务ID路由到固定分区)。
/**
* RabbitMQ顺序消费示例
* @author ken
* @date 2026-02-09
*/
@Component
public class RabbitOrderProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 按订单ID路由到固定队列(保证同一订单的消息顺序)
*/
public void sendOrderMsg(String orderId, String message) {
// 按订单ID哈希取模,路由到固定队列
int queueIndex = Math.abs(orderId.hashCode()) % 3; // 假设有3个订单队列
String queueName = "order_queue_" + queueIndex;
String routingKey = "order.key." + queueIndex;
// 发送消息
rabbitTemplate.convertAndSend(
"order_exchange",
routingKey,
message,
msg -> {
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return msg;
},
new CorrelationData(UUID.randomUUID().toString())
);
}
}
2. Kafka保证顺序性
核心方案:同一业务ID的消息发送到同一Partition + 单消费者消费该Partition。
/**
* Kafka顺序消费示例
* @author ken
* @date 2026-02-09
*/
@Component
public class KafkaOrderProducer {
private KafkaProducer<String, String> kafkaProducer;
/**
* 发送订单消息(同一订单ID路由到同一Partition)
*/
public void sendOrderMsg(String orderId, String message) {
// 按订单ID计算Partition
int partition = Math.abs(orderId.hashCode()) % 3; // 假设Topic有3个Partition
ProducerRecord<String, String> record = new ProducerRecord<>(
"order_topic",
partition, // 指定Partition
orderId,
message
);
kafkaProducer.send(record, (metadata, exception) -> {
if (exception == null) {
log.info("订单[{}]消息发送到Partition:{}", orderId, metadata.partition());
}
});
}
}
面试题8:MQ的死信队列(DLQ)是什么?如何使用?
死信队列是存储“无法正常消费”消息的专用队列,触发死信的条件:
- 消息被消费者拒绝(
basicNack/basicReject)且requeue=false; - 消息过期(设置了TTL);
- 队列达到最大长度。
死信队列配置与使用示例(RabbitMQ)
package com.jam.demo.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 死信队列配置
* @author ken
* @date 2026-02-09
*/
@Configuration
public class DeadLetterQueueConfig {
// 普通业务队列
public static final String BUSINESS_QUEUE = "business_queue";
// 死信交换机
public static final String DLX_EXCHANGE = "dlx_exchange";
// 死信队列
public static final String DLX_QUEUE = "dlx_queue";
// 死信路由键
public static final String DLX_ROUTING_KEY = "dlx.key";
/**
* 声明死信交换机
*/
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange(DLX_EXCHANGE, true, false);
}
/**
* 声明死信队列
*/
@Bean
public Queue dlxQueue() {
return QueueBuilder.durable(DLX_QUEUE).build();
}
/**
* 绑定死信队列和死信交换机
*/
@Bean
public Binding dlxBinding() {
return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);
}
/**
* 声明普通业务队列(绑定死信交换机)
*/
@Bean
public Queue businessQueue() {
return QueueBuilder.durable(BUSINESS_QUEUE)
// 绑定死信交换机
.deadLetterExchange(DLX_EXCHANGE)
// 绑定死信路由键
.deadLetterRoutingKey(DLX_ROUTING_KEY)
// 消息过期时间(10秒)
.ttl(10000)
.build();
}
}
面试题9:如何基于MQ实现分布式事务的最终一致性?
分布式事务的核心痛点是“跨库/跨服务操作无法原子提交”,MQ实现最终一致性的主流方案是RocketMQ事务消息(阿里开源,专门解决分布式事务问题)。
RocketMQ事务消息核心原理
RocketMQ事务消息代码示例
第一步:添加依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
第二步:生产者代码
package com.jam.demo.mq.rocket;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
/**
* RocketMQ事务消息生产者
* @author ken
* @date 2026-02-09
*/
@Component
@Slf4j
public class RocketMqTransactionProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送事务消息
* @param orderId 订单ID
* @param amount 订单金额
*/
public void sendTransactionMsg(String orderId, Long amount) {
StringUtils.hasText(orderId, "订单ID不能为空");
if (ObjectUtils.isEmpty(amount) || amount <= 0) {
throw new IllegalArgumentException("订单金额必须大于0");
}
// 构建消息体
OrderMsgDTO msgDTO = new OrderMsgDTO();
msgDTO.setOrderId(orderId);
msgDTO.setAmount(amount);
// 构建消息(添加事务ID)
String transactionId = UUID.randomUUID().toString();
Message<OrderMsgDTO> message = MessageBuilder.withPayload(msgDTO)
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
.build();
// 发送事务消息
rocketMQTemplate.sendMessageInTransaction(
"order_transaction_group", // 生产者组
"order_topic", // 主题
message,
null // 附加参数
);
}
}
第三步:事务监听器
package com.jam.demo.mq.rocket;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.jam.demo.entity.Order;
import com.jam.demo.mapper.OrderMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
/**
* RocketMQ事务监听器
* @author ken
* @date 2026-02-09
*/
@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "order_transaction_group")
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
@Autowired
private OrderMapper orderMapper;
/**
* 执行本地事务
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
OrderMsgDTO msgDTO = (OrderMsgDTO) msg.getPayload();
String orderId = msgDTO.getOrderId();
try {
// 1. 执行本地事务(如创建订单)
createOrder(msgDTO);
// 2. 返回提交状态
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("本地事务执行失败,订单ID:{}", orderId, e);
// 3. 返回回滚状态
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 事务回查(MQ未收到Commit/Rollback时触发)
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
OrderMsgDTO msgDTO = (OrderMsgDTO) msg.getPayload();
String orderId = msgDTO.getOrderId();
// 查询订单状态
Order order = orderMapper.selectById(orderId);
if (ObjectUtils.isEmpty(order)) {
return RocketMQLocalTransactionState.ROLLBACK;
}
// 订单已创建,提交事务
if (order.getStatus() == 1) {
return RocketMQLocalTransactionState.COMMIT;
}
// 未知状态,继续回查
return RocketMQLocalTransactionState.UNKNOWN;
}
/**
* 创建订单(本地事务)
*/
private void createOrder(OrderMsgDTO msgDTO) {
Order order = new Order();
order.setOrderId(msgDTO.getOrderId());
order.setAmount(msgDTO.getAmount());
order.setStatus(1); // 1-已创建
orderMapper.insert(order);
}
}
四、生产问题排查:MQ面试的“实战题”
面试题10:MQ消息堆积如何排查和解决?
1. 堆积原因排查步骤
- 查看消费者状态:是否宕机、是否有异常日志(如数据库连接失败、接口超时);
- 查看消费速度:对比生产速度和消费速度,确认是否消费能力不足;
- 查看MQ指标:队列/Partition的消息数量、消费偏移量(lag值);
- 查看系统资源:消费者机器CPU/内存/磁盘是否满、MQ服务器资源是否瓶颈。
2. 解决方案
- 紧急扩容:增加消费者实例数(RabbitMQ)/ 增加消费者组内消费者数(Kafka,注意Partition数≥消费者数);
- 优化消费逻辑:减少消费耗时(如异步处理、批量处理、优化SQL);
- 临时分流:将堆积消息迁移到临时队列,分批次消费;
- 长期优化:调整MQ参数(如Kafka分区数、RabbitMQ预取数)、优化业务逻辑。
面试题11:MQ的性能优化手段有哪些?
1. 生产者优化
- 批量发送:RabbitMQ使用
batchSend、Kafka设置batch.size和linger.ms; - 异步发送:避免同步发送阻塞主线程;
- 压缩消息:Kafka开启
compression.type=lz4,减少网络传输量。
2. MQ服务端优化
- RabbitMQ:增加Channel数、调整预取数(
prefetch_count)、开启流控; - Kafka:合理设置Partition数(建议=CPU核心数)、使用SSD磁盘、调整日志刷盘策略;
- RocketMQ:调整CommitLog大小、开启内存映射文件(MMAP)。
3. 消费者优化
- 批量消费:RabbitMQ手动ACK批量确认、Kafka设置
fetch.min.bytes; - 并发消费:RabbitMQ多消费者、Kafka增加Partition数;
- 非阻塞消费:将耗时操作异步处理,快速ACK。
五、完整实战案例:SpringBoot整合RocketMQ实现订单异步处理
1. 项目结构
com.jam.demo
├── config/ // 配置类
├── controller/ // 接口层
├── entity/ // 实体类
├── mapper/ // Mapper层
├── mq/ // MQ相关
├── service/ // 业务层
└── MqDemoApplication.java // 启动类
2. 核心代码
启动类
package com.jam.demo;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import springfox.documentation.oas.annotations.EnableOpenApi;
/**
* 应用启动类
* @author ken
* @date 2026-02-09
*/
@SpringBootApplication
@EnableOpenApi // 开启Swagger3
@MapperScan("com.jam.demo.mapper")
public class MqDemoApplication {
public static void main(String[] args) {
SpringApplication.run(MqDemoApplication.class, args);
}
}
订单控制器(带Swagger3)
package com.jam.demo.controller;
import com.jam.demo.mq.rocket.RocketMqTransactionProducer;
import com.jam.demo.vo.CommonResult;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.util.StringUtils;
/**
* 订单控制器
* @author ken
* @date 2026-02-09
*/
@RestController
@RequestMapping("/order")
@Slf4j
@Tag(name = "订单接口", description = "订单创建、支付等接口")
public class OrderController {
@Autowired
private RocketMqTransactionProducer transactionProducer;
/**
* 创建订单(发送事务消息)
*/
@PostMapping("/create")
@Operation(summary = "创建订单", description = "创建订单并发送事务消息保证最终一致性")
public CommonResult<String> createOrder(
@Parameter(description = "订单ID", required = true) @RequestParam String orderId,
@Parameter(description = "订单金额", required = true) @RequestParam Long amount) {
try {
transactionProducer.sendTransactionMsg(orderId, amount);
return CommonResult.success("订单创建请求已提交,订单ID:" + orderId);
} catch (Exception e) {
log.error("创建订单失败", e);
return CommonResult.fail("创建订单失败:" + e.getMessage());
}
}
}
通用返回类
package com.jam.demo.vo;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
/**
* 通用返回结果
* @author ken
* @date 2026-02-09
*/
@Data
@Schema(description = "通用返回结果")
public class CommonResult<T> {
@Schema(description = "返回码,200-成功,500-失败")
private int code;
@Schema(description = "返回消息")
private String msg;
@Schema(description = "返回数据")
private T data;
public static <T> CommonResult<T> success(T data) {
CommonResult<T> result = new CommonResult<>();
result.setCode(200);
result.setMsg("成功");
result.setData(data);
return result;
}
public static <T> CommonResult<T> fail(String msg) {
CommonResult<T> result = new CommonResult<>();
result.setCode(500);
result.setMsg(msg);
result.setData(null);
return result;
}
}
总结
关键点回顾
- MQ核心价值:解耦、异步、削峰填谷,是分布式系统的核心中间件;
- 消息不丢失:生产者确认+MQ持久化+消费者手动ACK,三阶段兜底;
- 幂等性设计:唯一消息ID+数据库唯一索引/Redis原子操作,解决重复消费问题;
- 最终一致性:RocketMQ事务消息是分布式事务的最优方案之一,通过半消息+本地事务+回查机制保证;
- 生产问题排查:消息堆积需从消费者状态、消费速度、系统资源三个维度排查,紧急扩容+长期优化结合解决。