RabbitMQ的延时重试队列

简介: RabbitMQ的延时重试队列


image.png

1.背景

通过上文学习知道了死信队列,如果只是网络抖动,出现异常那么直接进入死信队列,那么是不合理的。这就可以使用延时重试队列,本文将介绍如何实现延时重试队列。

2.原理                image.png



图是俺在网上找的,请原作者谅解。

  1. 发送到业务队里 如果正常收到 正常运行
  2. 如果处理失败 重试 并投入延时队列 如果超过延时时间 重新投入业务队列
  3. 如果重试次数大于3 那么进入死信队列

3.代码实现

1.业务队列

这里声明业务队列与绑定关系。

@Configuration
public class BusinessConfig {
    /**
     * yewu1模块direct交换机的名字
     */
    public static final String YEWU1_EXCHANGE = "yewu1_direct_exchange";
    /**
     * demo业务的队列名称
     */
    public static final String YEWU1_DEMO_QUEUE = "yewu1_demo_queue";
    /**
     * demo业务的routekey
     */
    public static final String YEWU1_DEMO_ROUTINGKEY = "yewu1_demo_key";
    /**
     * 业务交换机交换机(一个项目一个业务交换机即可)
     * 1.定义direct exchange,绑定queueTest
     * 2.durable="true" rabbitmq重启的时候不需要创建新的交换机
     * 3.direct交换器相对来说比较简单,匹配规则为:如果路由键匹配,消息就被投送到相关的队列
     * fanout交换器中没有路由键的概念,他会把消息发送到所有绑定在此交换器上面的队列中。
     * topic交换器你采用模糊匹配路由键的原则进行转发消息到队列中
     */
    @Bean
    public DirectExchange yewu1Exchange() {
        DirectExchange directExchange = new DirectExchange(YEWU1_EXCHANGE, true, false);
        return directExchange;
    }
    /**
     * 新建队列(一个业务需要一个队列一个routekey 命名格式 项目名-业务名)
     * 1.队列名称
     * 2.durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列
     * 3.exclusive 表示该消息队列是否只在当前connection生效,默认是false
     * 4.auto-delete 表示消息队列没有在使用时将被自动删除 默认是false
     * 5.对nack或者发送超时的 发送给死信队列 args是绑定死信队列
     *
     */
    @Bean
    public Queue yewu1DemoQueue() {
        return new Queue(YEWU1_DEMO_QUEUE, true, false, false);
    }
    /**
     * 交换机与routekey绑定
     * 
     * @return
     */
    @Bean
    public Binding yewu1DemoBinding() {
        return BindingBuilder.bind(yewu1DemoQueue()).to(yewu1Exchange())
            .with(YEWU1_DEMO_ROUTINGKEY);
    }
}

2.延时队列

声明延时队列与绑定关系。

 
         

3.死信队列

声明私信队列与绑定关系。

@Configuration
public class DeadConfig {
    /**
     * 死信队列
     */
    public final static String FAIL_QUEUE_NAME = "fail_queue";
    /**
     * 死信交换机
     */
    public final static String FAIL_EXCHANGE_NAME = "fail_exchange";
    /**
     * 死信routing
     */
    public final static String FAIL_ROUTING_KEY = "fail_routing";
    /**
     * 创建配置死信队列
     *
     */
    @Bean
    public Queue deadQueue() {
        return new Queue(FAIL_QUEUE_NAME, true, false, false);
    }
    /**
     * 死信交换机
     *
     * @return
     */
    @Bean
    public DirectExchange deadExchange() {
        DirectExchange directExchange = new DirectExchange(FAIL_EXCHANGE_NAME, true, false);
        return directExchange;
    }
    /**
     * 绑定关系
     * 
     * @return
     */
    @Bean
    public Binding failBinding() {
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(FAIL_ROUTING_KEY);
    }
}

4.生产者

生产者如上文,通用代码。

@RestController
@RequestMapping("/TestRabbit")
public class ProducerDemo {
    @Resource
    private RabbitTemplate rabbitTemplate;
    //@RequestMapping("/sendDirect")
    String sendDirect(@RequestBody String message) throws Exception {
        System.out.println("开始生产");
        CorrelationData data = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(BusinessConfig.YEWU1_EXCHANGE, BusinessConfig.YEWU1_DEMO_ROUTINGKEY,
            message, data);
        System.out.println("结束生产");
        System.out.println("发送id:" + data);
        return "OK,sendDirect:" + message;
    }
}

5.消费者

大量的逻辑,请参考注释。

public enum RabbitEnum {
    /**
     * 处理成功
     */
    ACCEPT,
    /**
     * 可以重试的错误
     */
    RETRY,
    /**
     * 无需重试的错误
     */
    REJECT
@Component
public class ConsumerDemo {
    private final static Logger logger = LoggerFactory.getLogger(ConsumerDemo.class);
    @Resource
    private RabbitTemplate rabbitTemplate;
    // @RabbitListener(queues = "yewu1_demo_queue")
    protected void consumer(Message message, Channel channel) throws Exception {
        RabbitEnum ackSign = RabbitEnum.RETRY;
        System.out.println(message.getMessageProperties().getCorrelationId());
        try {
            // 可以加入重复消费判断
            int i = 1 / 0;
        } catch (Exception e) {
            ackSign = RabbitEnum.RETRY;
            throw e;
        } finally {
            // 通过finally块来保证Ack/Nack会且只会执行一次
            if (ackSign == RabbitEnum.ACCEPT) {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } else if (ackSign == RabbitEnum.RETRY) {
                String correlationData =
                    (String)message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
                System.out.println(message.getMessageProperties().getCorrelationId());
                long retryCount = getRetryCount(message.getMessageProperties());
                if (retryCount >= 3) {
                    // 重试次数超过3次,则将消息发送到失败队列等待特定消费者处理或者人工处理
                    try {
                        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                        rabbitTemplate.convertAndSend(DeadConfig.FAIL_EXCHANGE_NAME, DeadConfig.FAIL_ROUTING_KEY,
                            message, new CorrelationData(correlationData));
                        logger.info("连续失败三次,将消息发送到死信队列,发送消息:" + new String(message.getBody()));
                    } catch (Exception e1) {
                        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                        logger.error("发送死信队列报错:" + e1.getMessage() + ",原始消息:" + new String(message.getBody()));
                    }
                } else {
                    try {
                        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                        // 重试次数不超过3次,则将消息发送到重试队列等待重新被消费
                        rabbitTemplate.convertAndSend(RetryConfig.YEWU1_RETRY_EXCHANGE_NAME,
                            RetryConfig.YEWU1_DEMO_RETRY_ROUTING_KEY, message,
                            new CorrelationData(correlationData));
                        logger.info("消费失败,消息发送到重试队列;" + "原始消息:" + new String(message.getBody()) + ";第"
                            + (retryCount + 1) + "次重试");
                    } catch (Exception e1) {
                        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                        logger.error("消息发送到重试队列的时候,异常了:" + e1.getMessage() + ",重新发送消息");
                    }
                }
            }
        }
    }
    /**
     * 获取消息被重试的次数
     */
    public long getRetryCount(MessageProperties messageProperties) {
        Long retryCount = 0L;
        if (null != messageProperties) {
            List<Map<String, ?>> deaths = messageProperties.getXDeathHeader();
            if (deaths != null && deaths.size() > 0) {
                Map<String, Object> death = (Map<String, Object>)deaths.get(0);
                retryCount = (Long)death.get("count");
            }
        }
        return retryCount;
    }
}

参考:https://www.cnblogs.com/mfrank/p/11260355.html


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
ly~
|
3月前
|
消息中间件 存储 监控
如何查看 RocketMQ 消息的重试次数和时间间隔?
RocketMQ消息重试次数和时间间隔可通过查看消费者和Broker日志、使用管理控制台的监控页面和消息查询功能,或通过分析消费者代码和RocketMQ客户端库代码等方式获取。日志中常有消费失败重试的明确记录,控制台可监控消费情况推断重试状态,代码分析则适合技术用户深入了解。
ly~
283 3
|
3月前
|
消息中间件 存储 监控
RabbitMQ 队列之战:Classic 和 Quorum 的性能洞察
RabbitMQ 是一个功能强大的消息代理,用于分布式应用程序间的通信。它通过队列临时存储消息,支持异步通信和解耦。经典队列适合高吞吐量和低延迟场景,而仲裁队列则提供高可用性和容错能力,适用于关键任务系统。选择哪种队列取决于性能、持久性和容错性的需求。
231 6
|
4月前
|
消息中间件 JSON Java
|
4月前
|
消息中间件
rabbitmq,&队列
rabbitmq,&队列
|
5月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
147 2
|
4月前
|
消息中间件 JSON Java
玩转RabbitMQ声明队列交换机、消息转换器
玩转RabbitMQ声明队列交换机、消息转换器
107 0
|
6月前
|
消息中间件 RocketMQ
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
|
6月前
|
消息中间件 Java Kafka
说说RabbitMQ延迟队列实现原理?
说说RabbitMQ延迟队列实现原理?
94 0
说说RabbitMQ延迟队列实现原理?
|
6月前
|
消息中间件 NoSQL 关系型数据库
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
175 1
|
7月前
|
消息中间件 RocketMQ
消息队列 MQ产品使用合集之在开源延时消息插件方案中和原生延时消息方案中,同时设置参数是否会出现错乱
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。