其中的每一步都可能导致消息丢失,常见的丢失原因包括:
- 发送时丢失:
- 生产者发送的消息未送达exchange
- 消息到达exchange后未到达queue
- MQ宕机,queue将消息丢失
- consumer接收到消息后未消费就宕机
针对这些问题,RabbitMQ分别给出了解决方案:
- 生产者确认机制
- mq持久化
- 消费者确认机制
- 失败重试机制
本篇博客就来带大家解决消息的可靠性。
1、导入Demo工程
导入Demo课程。
把配置文件application.yml配置修改完毕。
然后我们在consumer 的config包的通用配置类创建一个队列。
运行consumer的启动类。
2、生产者消息确认
生产者确认机制:
RabbitMQ提供了publisherconfirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求: 1. publisher-confirm,发送者确认
- 消息成功投递到交换机,返回Ack(acknowledge 告知已收到)。
- 消息未投递到交换机,返回Nack(未收到)。
2. publisher-return,发送者回执 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。
注:
确认机制发送消息时,需要给每个消息设置一个全局唯一id以区分不同消息,避免ACK冲突。
2.1 修改配置
首先,修改publisher服务中的application.yml文件,添加下面的内容:
spring: rabbitmq: publisher-confirm-type: correlated publisher-returns: true template: mandatory: true
配置说明:
publish-confirm-type
:开启publisher-confirm,这里支持两种类型:
simple
:同步等待confirm结果,直到超时(可能会导致消息堵塞,不推荐)。correlated
:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback。(推荐)
publish-returns
:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback。template.mandatory
:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息。
2.2 定义Return回调
每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目加载时配置
修改publisher服务,添加一个:
package com.jie.mq.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @description:生产者通用配置 * @author: jie * @time: 2022/2/25 15:34 */ @Slf4j @Configuration public class CommonConfig implements ApplicationContextAware { /** * @description:每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目加载时配置 * @author: jie * @time: 2022/2/25 15:35 */ @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { // 获取RabbitTemplate RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); // 设置ReturnCallback rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { // 投递失败,记录日志 log.info("消息发送到队列失败,响应码{},失败原因{},交换机{},路由键{},消息{}", replyCode, replyText, exchange, routingKey, message.toString()); // 如果有业务需要,可以重发消息 }); } }
2.3 发送消息
我们发送消息通过一个单元测试来发。
这里面有一个最简单的消息发送代码。
我们要去为amq.topic这个交换机绑定一下simple.queue这个队列。这里我用的是手动的方式,大家可以选择使用代码的方式。
打开浏览器
点击进去
绑定完成,回到代码区,我现在发送消息,符合要求,那一定能发送成功,所以我们要修改一下代码。
@Test public void testSendMessage2SimpleQueue() throws InterruptedException { String routingKey = "simple.test"; //1、准备消息 String message = "hello, spring amqp!"; //2、准备CorrelationData //2.1 消息ID CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); //2.2 准备ConfirmCallback //成功回调 correlationData.getFuture().addCallback(confirm -> { //判断结果 if(confirm.isAck()){ //ACK 消息成功 log.error("消息成功投递到交换机!消息ID:{}",correlationData.getId()); }else { //NACK 消息失败 log.error("消息投递到交换机失败!消息ID:{}",correlationData.getId()); //重发消息 } //失败回调 }, throwable -> { //记录日志 log.error("消息发送失败!",throwable); }); //3、发送消息 rabbitTemplate.convertAndSend("amq.topic", routingKey, message,correlationData); }
我们可以先运行代码查看控制消息。
这个是成功的情况,接下来演示一下失败的情况。比如消息根本没有到达交换机,可能是交换机名称填错了。
还有一种就是交换机到达了,没有到达队列。比如队列的名称填错了。