死信队列可以实现消息在未被正常消费的场景下,对这些消息进行其他处理,保证消息不会被丢弃。
概念:
消息会变成死信消息的场景:
- 消息被
(basic.reject() or basic.nack()) and requeue = false
,即消息被消费者拒绝签收,并且重新入队为false。
1.1 有一种场景需要注意下:消费者设置了自动ACK,当重复投递次数达到了设置的最大retry次数之后,消息也会投递到死信队列,但是内部的原理还是调用了nack
/reject
。 - 消息过期,过了TTL存活时间。
- 队列设置了
x-max-length
最大消息数量且当前队列中的消息已经达到了这个数量,再次投递,消息将被挤掉,被挤掉的是最靠近被消费那一端的消息。
- 消息被
代码编写流程是:
- 有一个(n个)正常业务的Exchange,比如为
user-exchange
。 - 有一个(n个)正常业务的Queue,比如为
user-queue
。(因为该队列需要绑定死信交换机,所以需要加俩参数:死信交换机:x-dead-letter-exchange
,死信消息路由键:x-dead-letter-routing-key
) - 进行正常业务的交换机和队列绑定。
- 定义一个死信交换机,比如为
common-dead-letter-exchange
。 - 将正常业务的队列绑定到死信交换机(队列设置了
x-dead-letter-exchange
即会自动绑定)。 - 定义死信队列
user-dead-letter-queue
用于接收死信消息,绑定死信交换机。
- 有一个(n个)正常业务的Exchange,比如为
业务流程是:
- 正常业务消息被投递到正常业务的Exchange,该Exchange根据路由键将消息路由到绑定的正常队列。
- 正常业务队列中的消息变成了死信消息之后,会被自动投递到该队列绑定的死信交换机上(并带上配置的路由键,如果没有指定死信消息的路由键,则默认继承该消息在正常业务时设定的路由键)。
- 死信交换机收到消息后,将消息根据路由规则路由到指定的死信队列。
- 消息到达死信队列后,可监听该死信队列,处理死信消息。
死信交换机
、死信队列
也是普通的交换机和队列,只不过是我们人为的将某个交换机和队列来处理死信消息。- 流程图
代码实现
- 配置
spring:
application:
name: learn-rabbitmq
rabbitmq:
host: localhost
port: 5672
username: futao
password: 123456789
virtual-host: deadletter-vh
connection-timeout: 15000
# 发送确认
publisher-confirms: true
# 路由失败回调
publisher-returns: true
template:
# 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃
mandatory: true
listener:
simple:
# 每次从RabbitMQ获取的消息数量
prefetch: 1
default-requeue-rejected: false
# 每个队列启动的消费者数量
concurrency: 1
# 每个队列最大的消费者数量
max-concurrency: 1
# 签收模式为手动签收-那么需要在代码中手动ACK
acknowledge-mode: manual
app:
rabbitmq:
# 队列定义
queue:
# 正常业务队列
user: user-queue
# 死信队列
user-dead-letter: user-dead-letter-queue
# 交换机定义
exchange:
# 正常业务交换机
user: user-exchange
# 死信交换机
common-dead-letter: common-dead-letter-exchange
- 队列、交换机定义与绑定。
/**
* 队列与交换机定义与绑定
*
* @author futao
* @date 2020/4/7.
*/
@Configuration
public class Declare {
/**
* 用户队列
*
* @param userQueueName 用户队列名
* @return
*/
@Bean
public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName,
@Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) {
return QueueBuilder
.durable(userQueueName)
//声明该队列的死信消息发送到的 交换机 (队列添加了这个参数之后会自动与该交换机绑定,并设置路由键,不需要开发者手动设置)
.withArgument("x-dead-letter-exchange", commonDeadLetterExchange)
//声明该队列死信消息在交换机的 路由键
.withArgument("x-dead-letter-routing-key", "user-dead-letter-routing-key")
.build();
}
/**
* 用户交换机
*
* @param userExchangeName 用户交换机名
* @return
*/
@Bean
public Exchange userExchange(@Value("${app.rabbitmq.exchange.user}") String userExchangeName) {
return ExchangeBuilder
.topicExchange(userExchangeName)
.durable(true)
.build();
}
/**
* 用户队列与交换机绑定
*
* @param userQueue 用户队列名
* @param userExchange 用户交换机名
* @return
*/
@Bean
public Binding userBinding(Queue userQueue, Exchange userExchange) {
return BindingBuilder
.bind(userQueue)
.to(userExchange)
.with("user.*")
.noargs();
}
/**
* 死信交换机
*
* @param commonDeadLetterExchange 通用死信交换机名
* @return
*/
@Bean
public Exchange commonDeadLetterExchange(@Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) {
return ExchangeBuilder
.topicExchange(commonDeadLetterExchange)
.durable(true)
.build();
}
/**
* 用户队列的死信消息 路由的队列
* 用户队列user-queue的死信投递到死信交换机`common-dead-letter-exchange`后再投递到该队列
* 用这个队列来接收user-queue的死信消息
*
* @return
*/
@Bean
public Queue userDeadLetterQueue(@Value("${app.rabbitmq.queue.user-dead-letter}") String userDeadLetterQueue) {
return QueueBuilder
.durable(userDeadLetterQueue)
.build();
}
/**
* 死信队列绑定死信交换机
*
* @param userDeadLetterQueue user-queue对应的死信队列
* @param commonDeadLetterExchange 通用死信交换机
* @return
*/
@Bean
public Binding userDeadLetterBinding(Queue userDeadLetterQueue, Exchange commonDeadLetterExchange) {
return BindingBuilder
.bind(userDeadLetterQueue)
.to(commonDeadLetterExchange)
.with("user-dead-letter-routing-key")
.noargs();
}
}
- 定义好之后启动程序,springboot会读取Spring容器中类型为Queue和Exchange的bean进行队列和交换机的初始化与绑定。当然也可以自己在RabbitMQ的管理后台进行手动创建与绑定。
- 查看管理后台
测试
- 消息生产者
/**
* @author futao
* @date 2020/4/7.
*/
@Component
public class DeadLetterSender {
@Autowired
private RabbitTemplate rabbitTemplate;
@Value("${app.rabbitmq.exchange.user}")
private String userExchange;
public void send() {
User user = User.builder()
.userName("天文")
.address("浙江杭州")
.birthday(LocalDate.now(ZoneOffset.ofHours(8)))
.build();
rabbitTemplate.convertAndSend(userExchange, "user.abc", user);
}
}
1. 场景1.1
消息被(basic.reject() or basic.nack()) and requeue = false,即消息被消费者拒绝或者nack,并且重新入队为false。
nack()与reject()的区别是:reject()不支持批量拒绝,而nack()可以.
- 消费者代码
/**
* @author futao
* @date 2020/4/9.
*/
@Slf4j
@Component
public class Consumer {
/**
* 正常用户队列消息监听消费者
*
* @param user
* @param message
* @param channel
*/
@RabbitListener(queues = "${app.rabbitmq.queue.user}")
public void userConsumer(User user, Message message, Channel channel) {
log.info("正常用户业务监听:接收到消息:[{}]", JSON.toJSONString(user));
try {
//参数为:消息的DeliveryTag,是否批量拒绝,是否重新入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
log.info("拒绝签收...消息的路由键为:[{}]", message.getMessageProperties().getReceivedRoutingKey());
} catch (IOException e) {
log.error("消息拒绝签收失败", e);
}
}
/**
* @param user
* @param message
* @param channel
*/
@RabbitListener(queues = "${app.rabbitmq.queue.user-dead-letter}")
public void userDeadLetterConsumer(User user, Message message, Channel channel) {
log.info("接收到死信消息:[{}]", JSON.toJSONString(user));
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("死信队列签收消息....消息路由键为:[{}]", message.getMessageProperties().getReceivedRoutingKey());
} catch (IOException e) {
log.error("死信队列消息签收失败", e);
}
}
}
- 可以看到,正常消息被NACK之后最后到了死信队列,且路由键发生了变化。
1. 场景1.2
消费者设置了自动签收,当重复投递次数达到了设置的最大retry次数之后,消息也会投递到死信队列,但是内部的原理还是调用了
nack
/reject
。
- application.yml中需要更改一些配置
spring:
application:
name: learn-rabbitmq
rabbitmq:
listener:
simple:
# 每次从RabbitMQ获取的消息数量
prefetch: 1
default-requeue-rejected: false
# 每个队列启动的消费者数量
concurrency: 1
# 每个队列最大的消费者数量
max-concurrency: 1
# 自动签收
acknowledge-mode: auto
retry:
enabled: true
# 第一次尝试时间间隔
initial-interval: 10S
# 两次尝试之间的最长持续时间。
max-interval: 10S
# 最大重试次数(=第一次正常投递1+重试次数4)
max-attempts: 5
# 上一次重试时间的乘数
multiplier: 1.0
- 消费者代码
/**
* @author futao
* @date 2020/4/9.
*/
@Slf4j
@Configuration
public class AutoAckConsumer {
/**
* 正常用户队列消息监听消费者
*
* @param user
*/
@RabbitListener(queues = "${app.rabbitmq.queue.user}")
public void userConsumer(User user) {
log.info("正常用户业务监听:接收到消息:[{}]", JSON.toJSONString(user));
throw new RuntimeException("模拟发生异常");
}
/**
* @param user
*/
@RabbitListener(queues = "${app.rabbitmq.queue.user-dead-letter}")
public void userDeadLetterConsumer(User user) {
log.info("接收到死信消息并自动签收:[{}]", JSON.toJSONString(user));
}
}
- 测试结果:
- 从测试结果可以看出,消息如果未被正常消费,则进行重试,如果最终还未被正常消费,则会被投递到死信队列。
initial-interval
,max-interval
这两个参数啥作用不知道,现在测试的结果是一直都会取最短的那个时间作为下次投递时间...
2. 测试场景 2
消息过期,过了TTL存活时间。
- 需要修改队列定义,设置队列消息的过期时间
x-message-ttl
.
/**
* 用户队列
*
* @param userQueueName 用户队列名
* @return
*/
@Bean
public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName,
@Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) {
return QueueBuilder
.durable(userQueueName)
//声明该队列的死信消息发送到的 交换机 (队列添加了这个参数之后会自动与该交换机绑定,并设置路由键,不需要开发者手动设置)
.withArgument("x-dead-letter-exchange", commonDeadLetterExchange)
//声明该队列死信消息在交换机的 路由键
.withArgument("x-dead-letter-routing-key", "user-dead-letter-routing-key")
//该队列的消息的过期时间-超过这个时间还未被消费则路由到死信队列
.withArgument("x-message-ttl", 5000)
.build();
}
- 把
user-queue
的消费者注释,使消息无法被消费,直到消息在队列中的时间达到设定的存活时间。
- 根据日志可以看到,消息在5S后会被投递到死信队列。
- 注意:可以给队列设置消息过期时间,那么所有投递到这个队列的消息都自动具有这个属性。还可以在消息投递之前,给每条消息设定指定的过期时间。(当两者都设置了,则默认取较短的值)
下面测试给每条消息设置指定的过期时间:
- 修改消息生产者:
/**
* @author futao
* @date 2020/4/7.
*/
@Slf4j
@Component
public class DeadLetterSender {
@Autowired
private RabbitTemplate rabbitTemplate;
@Value("${app.rabbitmq.exchange.user}")
private String userExchange;
public void send(String exp) {
User user = User.builder()
.userName("天文")
.address("浙江杭州")
.birthday(LocalDate.now(ZoneOffset.ofHours(8)))
.build();
log.info("消息投递...指定的存活时长为:[{}]ms", exp);
rabbitTemplate.convertAndSend(userExchange, "user.abc", user, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties messageProperties = message.getMessageProperties();
//为每条消息设定过期时间
messageProperties.setExpiration(exp);
return message;
}
});
}
}
- 从测试结果可以看出,每条消息都在指定的时间投递到了死信队列。
【坑】重点注意!!!:RabbitMQ对于消息过期的检测:只会检测最近将要被消费的那条消息是否到达了过期时间,不会检测非末端消息是否过期。造成的问题是:非末端消息已经过期了,但是因为末端消息还未过期,非末端消息处于阻塞状态,所以非末端消息不会被检测到已经过期。使业务产生与预期严重不一致的结果。
- 对上面的问题进行测试:(第一条消息的过期时间设置成10S,第二条消息设置成5S)
- 从测试结果可以看出,id为1的消息存活时长为10S,id为2的消息存活时间为5S。但是只有当第一条消息(id=1)过期之后,id=2的消息到达队列末端,才会被检测到已经过期。
3. 测试场景3
队列设置了
x-max-length
最大消息数量且当前队列中的消息已经达到了这个数量,再次投递,消息将被挤掉,被挤掉的是最靠近被消费那一端的消息。
- 修改队列定义
/**
* 用户队列
*
* @param userQueueName 用户队列名
* @return
*/
@Bean
public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName,
@Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) {
return QueueBuilder
.durable(userQueueName)
//声明该队列的死信消息发送到的 交换机 (队列添加了这个参数之后会自动与该交换机绑定,并设置路由键,不需要开发者手动设置)
.withArgument("x-dead-letter-exchange", commonDeadLetterExchange)
//声明该队列死信消息在交换机的 路由键
.withArgument("x-dead-letter-routing-key", "user-dead-letter-routing-key")
//队列最大消息数量
.withArgument("x-max-length", 2)
.build();
}
- 向队列中投递消息
- 从结果可以看出,当投递第3条消息的时候,RabbitMQ会把在最靠经被消费那一端的消息移出队列,并投递到死信队列。
队列中将始终保持最多两个消息。
其他:
- Queue的可配置项可在RabbitMQ的管理后台查看:
相关:
[SpringBoot RabbitMQ实现消息可靠投递
](https://www.jianshu.com/p/432167bbe95f)
TODO:
- 消费端限流保护
- 延迟队列