3. 消息可靠性
消息丢失原因?
- 发送时丢失:
- 生产者发送的消息未送达exchange
- 消息到达exchange后未到达queue
- MQ宕机,queue将消息丢失
- consumer接收到消息后未消费就宕机
如何确保RabbitMQ消息的可靠性?
- 开启生产者确认机制,确保生产者的消息能到达队列
- 开启持久化功能,确保消息未消费前在队列中不会丢失
- 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
- 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理
3.1. 生产者确认机制
三种方式:
- 发送成功,返回ack publish-confirm
- 调用ConfirmCallback,进行打印“消息发送成功, ID:{}”
- 发送到交换机,但没有到达队列,返回ack publish-return
- 调用ConfirmCallback,进行打印“消息发送成功, ID:{}”
- 调用ReturnCallback,进行日志打印“消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}”,还可进行消息重发。
- 没有发送成功,返回 nack publish-confirm
- 调用ConfirmCallback,进行打印“消息发送失败, ID:{}, 原因{}”
ConfirmCallback和ReturnCallback区别?
ConfirmCallback(发布确认回调):处理返回确认消息
ReturnCallback(发布返回回调):处理返回消息
回调函数理解,由于发送消息对于返回消息是异步回调
实现方式:修改配置
publish-confirm-type:开启publisher-confirm,这里支持两种类型:
- simple:同步等待confirm结果,直到超时
- correlated:异步回调,定义ConfirmCallback
publish-returns:回调机制,定义ReturnCallback
template.mandatory:定义消息路由失败时的策略。
- true,则调用ReturnCallback
- false:则直接丢弃消息
spring: rabbitmq: publisher-confirm-type: correlated publisher-returns: true template: mandatory: true
public void testSendMessage2SimpleQueue() throws InterruptedException { // 1.消息体 String message = "hello!"; // 2.全局唯一的消息ID,需要封装到CorrelationData中 CorrelationData cd = new CorrelationData(UUID.randomUUID().toString()); // 3.添加callback cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() { public void onSuccess(CorrelationData.Confirm result) { if (result.isAck()) { log.debug("消息投递到交换机成功, ID:{}", cd.getId()); } else { log.error("消息投递到交换机失败,消息ID:{},原因:{}", cd.getId(), cd.getReturnedMessage()); } } public void onFailure(Throwable ex) { log.error("消息发送异常,ID:{},原因:{}", cd.getId(), ex.getMessage()); rabbitTemplate.convertAndSend("amq.topic", routingKey, message, cd); } }); // cd.getFuture().addCallback(result -> { // if (result.isAck()) { // log.debug("消息投递到交换机成功, ID:{}", cd.getId()); // } else { // log.error("消息投递到交换机失败,消息ID:{},原因:{}", cd.getId(), cd.getReturnedMessage()); // } // }, ex -> log.error("消息发送异常,ID:{},原因:{}", cd.getId(), ex.getMessage())); // 4.发送消息 rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData); // 休眠一会儿,等待ack回执 Thread.sleep(2000); }
package cn.itcast.mq.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Configuration; public class CommonConfig implements ApplicationContextAware { public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { // 获取RabbitTemplate RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); // 设置ReturnCallback rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { // 投递失败,记录日志 log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}", replyCode, replyText, exchange, routingKey, message.toString()); // 如果有业务需要,可以重发消息 }); } }
3.2. 消息持久化
交换机持久化,队列持久化,消息持久化
默认情况下三者都是持久化的,记住关键字durable,在MQ上看到属性feature带D
3.3. 消费者确认机制
manual
手动ack,需要在业务代码结束后,调用api发送ack。
none
关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
消费者接收消息后,消费者异常,消息依然被RabbitMQ删除了。
auto
自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
消费者接收消息后,消费者异常,消息会重入队,在重新发送给消费者,进行无限循环。
本地重试
- 消息处理过程中抛出异常,不会重入队,而是在消费者本地重试
- 重试达到最大次数后,执行失败策略
失败策略
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
spring: rabbitmq: listener: simple: retry: enabled: true # 开启消费者失败重试 initial-interval: 1000 # 初识的失败等待时长为1秒 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval multiplier: 1 max-attempts: 3 # 最大重试次数 # true无状态;false有状态。如果业务中包含事务,这里改为false stateless: true
package cn.itcast.mq.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.retry.MessageRecoverer; import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer; import org.springframework.context.annotation.Bean; public class ErrorMessageConfig { public DirectExchange errorMessageExchange(){ return new DirectExchange("error.direct"); } public Queue errorQueue(){ return new Queue("error.queue", true); } public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){ return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error"); } public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){ return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); } }