RabbitMQ的死信队列

简介: 如果有有错误消息,如果手动nack同时将消息放回到队列中,那么这条消息会反复消费,留在队列中 。如果nack后将消息丢弃,那么如果碰到网络抖动,消息也会丢失 。所以 通过建立死信队列避免消息丢失。


image.png

1.业务背景

如果有有错误消息,如果手动nack同时将消息放回到队列中,那么这条消息会反复消费,留在队列中 。

如果nack后将消息丢弃,那么如果碰到网络抖动,消息也会丢失 。所以 通过建立死信队列避免消息丢失。

2.实现

文件目录如下:

image.png

1.原理

我们额外建立一条队列。当消息进入进入业务队列后,如果收到nack那么就将这条消息放入这条条队列中 。

2.修改pom文件

       <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

3.修改配置文件

server:
  port: 8088
spring:
  rabbitmq:
    host: 192.168.*.*
    port: 5672
    username: root
    password: root
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual  #手动应答
        prefetch: 1 # 每次只处理一个信息
    publisher-confirms: true #开启消息确认机制
    publisher-returns: true #支持消息发送失败返回队列

4.rabbitmq的配置

@Configuration
public class RabbitMqConfig {
    /**
     * 连接工厂
     */
    @Autowired
    private ConnectionFactory connectionFactory;
    /**
     * 定制化amqp模版
     *
     * ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调 即消息发送到exchange ack
     * ReturnCallback接口用于实现消息发送到RabbitMQ 交换器,但无相应队列与交换器绑定时的回调 即消息发送不到任何一个队列中 ack
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        Logger logger = LoggerFactory.getLogger(RabbitTemplate.class);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
        rabbitTemplate.setMandatory(true);
        // 发送消息确认, yml需要配置 publisher-confirms: true
        rabbitTemplate.setConfirmCallback(msgSendConfirmCallBack());
        // 消息返回, yml需要配置 publisher-returns: true
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            String correlationId = message.getMessageProperties().getCorrelationId().toString();
            logger.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", correlationId, replyCode, replyText, exchange,
                routingKey);
        });
        return rabbitTemplate;
    }
    /**
     * 确认发送消息是否成功(调用util方法)
     *
     * @return
     */
    @Bean
    public MsgSendConfirmCallBack msgSendConfirmCallBack() {
        return new MsgSendConfirmCallBack();
    }
}

5.util类

发送是否成功的回调方法。

public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
    /**
     * 回调方法
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("MsgSendConfirmCallBack  , 回调id:" + correlationData);
        if (ack) {
            System.out.println("消息发送成功");
        } else {
            //可以将消息写入本地,使用定时任务重新发送
            System.out.println("消息发送失败:" + cause + "\n重新发送");
        }
    }
}

这里有一个点,如果想做实现消息失败重新发送,在注释处可以实现。需要将消息写入本地,如果失败从本地读取,然后发送,如果成功删除本地信息。

6.业务队列(如:订单业务)

这里声明了一个业务队列 ,关键点在于x-dead-letter-exchange,x-dead-letter-routing-key 两个参数。

@Configuration
public class BusinessConfig {
    /**
     * 业务1模块direct交换机的名字
     */
    public static final String YEWU1_EXCHANGE = "yewu1_direct_exchange";
    /**
     * 业务1 demo业务的队列名称
     */
    public static final String YEWU1_DEMO_QUEUE = "yewu1_demo_queue";
    /**
     * 业务1 demo业务的routekey
     */
    public static final String YEWU1_DEMO_ROUTINGKEY = "yewu1_demo_key";
    @Bean
    public Queue yewu1DemoDeadQueue() {
        // 将普通队列绑定到死信队列交换机上
        Map<String, Object> args = new HashMap<>(2);
        args.put(RetryConfig.RETRY_LETTER_QUEUE_KEY, DeadConfig.FAIL_EXCHANGE_NAME);
        args.put(RetryConfig.RETRY_LETTER_ROUTING_KEY, DeadConfig.FAIL_ROUTING_KEY);
        return new Queue("yewu1_demo_dead_queue", true, false, false, args);
    }
    /**
     * 将消息队列和交换机进行绑定
     */
    @Bean
    public Binding binding_one() {
        return BindingBuilder.bind(yewu1DemoDeadQueue()).to(yewu1Exchange())
            .with("yewu1_demo_dead_key");
    }
}

这里有一个点如果想持久化消息到磁盘,需要新建队列时,new Queue将第二个参数输入为true,但是面对大并发时效率会变低 。

7.死信队列

这里声明死信队列与绑定关系。

@Configuration
public class DeadConfig {
    /**
     * 死信队列
     */
    public final static String FAIL_QUEUE_NAME = "fail_queue";
    /**
     * 死信交换机
     */
    public final static String FAIL_EXCHANGE_NAME = "fail_exchange";
    /**
     * 死信routing
     */
    public final static String FAIL_ROUTING_KEY = "fail_routing";
    /**
     * 创建配置死信队列
     *
     */
    @Bean
    public Queue deadQueue() {
        return new Queue(FAIL_QUEUE_NAME, true, false, false);
    }
    /**
     * 死信交换机
     *
     * @return
     */
    @Bean
    public DirectExchange deadExchange() {
        DirectExchange directExchange = new DirectExchange(FAIL_EXCHANGE_NAME, true, false);
        return directExchange;
    }
    /**
     * 绑定关系
     * 
     * @return
     */
    @Bean
    public Binding failBinding() {
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(FAIL_ROUTING_KEY);
    }
}

8.生产者消费者

生产者与消费者的代码实现。

public enum RabbitEnum {
    /**
     * 处理成功
     */
    ACCEPT,
    /**
     * 可以重试的错误
     */
    RETRY,
    /**
     * 无需重试的错误
     */
    REJECT
@RequestMapping("/sendDirectDead")
        String sendDirectDead(@RequestBody String message) throws Exception {
        System.out.println("开始生产");
        CorrelationData data = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(BusinessConfig.YEWU1_EXCHANGE, "yewu1_demo_dead_key",
                message, data);
        System.out.println("结束生产");
        System.out.println("发送id:" + data);
        return "OK,sendDirect:" + message;
    }
    @RabbitListener(queues = "yewu1_demo_dead_queue")
    protected void consumerDead(Message message, Channel channel) throws Exception {
        RabbitEnum ackSign = RabbitEnum.RETRY;
        try {
            int i = 10 / 0;
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            ackSign = RabbitEnum.RETRY;
            throw e;
        } finally {
            // 通过finally块来保证Ack/Nack会且只会执行一次
            if (ackSign == RabbitEnum.ACCEPT) {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } else if (ackSign == RabbitEnum.RETRY) {
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            }
        }
    }

9.实验

当发送yewu1_demo_dead_queue队列时,如果抛出异常,会放入死信队列中。


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
9月前
|
消息中间件 存储 开发者
死信是什么,如何运用RabbitMQ的死信机制?
死信是什么,如何运用RabbitMQ的死信机制?
183 0
|
消息中间件
RabbitMQ的死信队列和延时队列
RabbitMQ的死信队列和延时队列
|
21天前
|
消息中间件 NoSQL Java
RabbitMQ的延时队列
RabbitMQ的延时队列
17 0
|
5月前
|
消息中间件
消息中间件系列教程(17) -RabbitMQ-死信队列
消息中间件系列教程(17) -RabbitMQ-死信队列
123 0
|
2月前
|
消息中间件 监控 数据挖掘
兔子的后院奇遇:深入了解RabbitMQ中的死信队列【RabbitMQ 四】
兔子的后院奇遇:深入了解RabbitMQ中的死信队列【RabbitMQ 四】
48 0
|
7月前
|
Java 消息中间件 Spring
浅析RabbitMQ死信队列
浅析RabbitMQ死信队列
137 0
|
3月前
|
消息中间件
RabbitMQ之死信队列
【1月更文挑战第10天】先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。 应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。
176 1
|
9月前
|
消息中间件 Java 中间件
rabbitmq简单队列
rabbitmq简单队列
|
6月前
|
消息中间件 存储 Java
RabbitMQ之死信队列解读
RabbitMQ之死信队列解读
|
9月前
|
消息中间件
RabbitMQ 的死信队列、延迟队列
RabbitMQ 的死信队列、延迟队列
70 0