五分钟带你玩转rabbitmq(五)死信队列

简介: 五分钟带你玩转rabbitmq(五)死信队列


文件目录如下

image.png

业务背景:

如果有有错误消息 如果手动nack同时将消息放回到队列中 那么这条消息会反复消费 留在队列中

如果nack后将消息丢弃 那么如果碰到网络抖动 消息也会丢失 。 所以 建立死信队列避免消息丢失。

原理 :

当消息进入进入业务队列后 如果收到nack那么就将这条消息放入另一条队列中 。

1.pom文件

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.配置文件

server:
  port: 8088
spring:
  rabbitmq:
    host: 192.168.*.*
    port: 5672
    username: root
    password: root
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual  #手动应答
        prefetch: 1 # 每次只处理一个信息
    publisher-confirms: true #开启消息确认机制
    publisher-returns: true #支持消息发送失败返回队列

3.rabbitmq的配置

@Configuration
public class RabbitMqConfig {
    /**
     * 连接工厂
     */
    @Autowired
    private ConnectionFactory connectionFactory;
    /**
     * 定制化amqp模版
     *
     * ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调 即消息发送到exchange ack
     * ReturnCallback接口用于实现消息发送到RabbitMQ 交换器,但无相应队列与交换器绑定时的回调 即消息发送不到任何一个队列中 ack
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        Logger logger = LoggerFactory.getLogger(RabbitTemplate.class);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
        rabbitTemplate.setMandatory(true);
        // 发送消息确认, yml需要配置 publisher-confirms: true
        rabbitTemplate.setConfirmCallback(msgSendConfirmCallBack());
        // 消息返回, yml需要配置 publisher-returns: true
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            String correlationId = message.getMessageProperties().getCorrelationId().toString();
            logger.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", correlationId, replyCode, replyText, exchange,
                routingKey);
        });
        return rabbitTemplate;
    }
    /**
     * 确认发送消息是否成功(调用util方法)
     *
     * @return
     */
    @Bean
    public MsgSendConfirmCallBack msgSendConfirmCallBack() {
        return new MsgSendConfirmCallBack();
    }
}

util发送回调方法

public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
    /**
     * 回调方法
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("MsgSendConfirmCallBack  , 回调id:" + correlationData);
        if (ack) {
            System.out.println("消息发送成功");
        } else {
            //可以将消息写入本地,使用定时任务重新发送
            System.out.println("消息发送失败:" + cause + "\n重新发送");
        }
    }
}

这里有一个点 如果想做实现消息失败重新发送 在注释处可以实现

需要将消息写入本地 如果失败从本地读取 然后发送 如果成功删除本地信息

4.业务队列(如:订单业务)

这里声明了一个业务队列

关键点在于x-dead-letter-exchange,x-dead-letter-routing-key 两个参数

@Configuration
public class BusinessConfig {
    /**
     * 业务1模块direct交换机的名字
     */
    public static final String YEWU1_EXCHANGE = "yewu1_direct_exchange";
    /**
     * 业务1 demo业务的队列名称
     */
    public static final String YEWU1_DEMO_QUEUE = "yewu1_demo_queue";
    /**
     * 业务1 demo业务的routekey
     */
    public static final String YEWU1_DEMO_ROUTINGKEY = "yewu1_demo_key";
    @Bean
    public Queue yewu1DemoDeadQueue() {
        // 将普通队列绑定到死信队列交换机上
        Map<String, Object> args = new HashMap<>(2);
        args.put(RetryConfig.RETRY_LETTER_QUEUE_KEY, DeadConfig.FAIL_EXCHANGE_NAME);
        args.put(RetryConfig.RETRY_LETTER_ROUTING_KEY, DeadConfig.FAIL_ROUTING_KEY);
        return new Queue("yewu1_demo_dead_queue", true, false, false, args);
    }
    /**
     * 将消息队列和交换机进行绑定
     */
    @Bean
    public Binding binding_one() {
        return BindingBuilder.bind(yewu1DemoDeadQueue()).to(yewu1Exchange())
            .with("yewu1_demo_dead_key");
    }
}

这里有一个点如果想持久化消息到磁盘 需要新建队列时 new Queue将第二个参数输入为true 但是面对大并发时效率会变低

5.死信队列

@Configuration
public class DeadConfig {
    /**
     * 死信队列
     */
    public final static String FAIL_QUEUE_NAME = "fail_queue";
    /**
     * 死信交换机
     */
    public final static String FAIL_EXCHANGE_NAME = "fail_exchange";
    /**
     * 死信routing
     */
    public final static String FAIL_ROUTING_KEY = "fail_routing";
    /**
     * 创建配置死信队列
     *
     */
    @Bean
    public Queue deadQueue() {
        return new Queue(FAIL_QUEUE_NAME, true, false, false);
    }
    /**
     * 死信交换机
     *
     * @return
     */
    @Bean
    public DirectExchange deadExchange() {
        DirectExchange directExchange = new DirectExchange(FAIL_EXCHANGE_NAME, true, false);
        return directExchange;
    }
    /**
     * 绑定关系
     * 
     * @return
     */
    @Bean
    public Binding failBinding() {
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(FAIL_ROUTING_KEY);
    }
}

6.生产者消费者

public enum RabbitEnum {
    /**
     * 处理成功
     */
    ACCEPT,
    /**
     * 可以重试的错误
     */
    RETRY,
    /**
     * 无需重试的错误
     */
    REJECT
@RequestMapping("/sendDirectDead")
        String sendDirectDead(@RequestBody String message) throws Exception {
        System.out.println("开始生产");
        CorrelationData data = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(BusinessConfig.YEWU1_EXCHANGE, "yewu1_demo_dead_key",
                message, data);
        System.out.println("结束生产");
        System.out.println("发送id:" + data);
        return "OK,sendDirect:" + message;
    }
    @RabbitListener(queues = "yewu1_demo_dead_queue")
    protected void consumerDead(Message message, Channel channel) throws Exception {
        RabbitEnum ackSign = RabbitEnum.RETRY;
        try {
            int i = 10 / 0;
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            ackSign = RabbitEnum.RETRY;
            throw e;
        } finally {
            // 通过finally块来保证Ack/Nack会且只会执行一次
            if (ackSign == RabbitEnum.ACCEPT) {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } else if (ackSign == RabbitEnum.RETRY) {
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            }
        }
    }

7.实验

当发送yewu1_demo_dead_queue队列时 如果抛出异常 会放入死信队列中。


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1月前
|
消息中间件 Java Spring
SpringBoot实现RabbitMQ的简单队列(SpringAMQP 实现简单队列)
SpringBoot实现RabbitMQ的简单队列(SpringAMQP 实现简单队列)
26 1
|
12天前
|
消息中间件 存储 监控
RabbitMQ 死信队列
RabbitMQ的死信队列(DLQ)是存储无法正常消费消息的特殊队列,常见于消息被拒绝、过期或队列满时。DLQ用于异常处理、任务调度和监控,通过绑定到普通队列自动路由死信消息。通过监听死信队列,可以对异常消息进行补偿和进一步处理,提升系统稳定性和可维护性。
12 1
|
12天前
|
消息中间件 存储 负载均衡
消息队列 MQ产品使用合集之如何排查是哪个队列导致的异常TPS增加
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
1月前
|
消息中间件 Java
SpringBoot基于RabbitMQ实现死信队列 (SpringBoot整合RabbitMQ实战篇)
SpringBoot基于RabbitMQ实现死信队列 (SpringBoot整合RabbitMQ实战篇)
53 1
|
1月前
|
消息中间件
RabbitMQ 死信队列
RabbitMQ 死信队列
31 0
RabbitMQ 死信队列
|
1月前
|
消息中间件 微服务
RabbitMQ入门指南(十):延迟消息-死信交换机
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了死信交换机、死信交换机实现延迟消息等内容。
81 0
|
1月前
|
消息中间件 存储 监控
解析RocketMQ:高性能分布式消息队列的原理与应用
RocketMQ是阿里开源的高性能分布式消息队列,具备低延迟、高吞吐和高可靠性,广泛应用于电商、金融等领域。其核心概念包括Topic、Producer、Consumer、Message和Name Server/Broker。RocketMQ支持异步通信、系统解耦、异步处理和流量削峰。关键特性有分布式架构、顺序消息、高可用性设计和消息事务。提供发布/订阅和点对点模型,以及消息过滤功能。通过集群模式、存储方式、发送和消费方式的选择进行性能优化。RocketMQ易于部署,可与Spring集成,并与Kafka等系统对比各有优势,拥有丰富的生态系统。
318 4
|
1月前
|
消息中间件
第十五章 RabbitMQ 延迟队列
第十五章 RabbitMQ 延迟队列
21 0
|
1月前
|
消息中间件 Java API
RabbitMQ入门指南(五):Java声明队列、交换机以及绑定
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了Java声明队列、交换机以及绑定队列和交换机等内容。
94 0
|
10月前
|
消息中间件 Linux
centos7 yum快速安装rabbitmq服务
centos7 yum快速安装rabbitmq服务
158 0