RabbitMQ重试机制

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: RabbitMQ重试机制

RabbitMQ重试机制

RabbitMQ重试机制(阻塞)

RabbitMQ的消息重试机制,就是消息消费失败后进行重试,重试机制的触发条件是消费者显式的抛出异常,这个很类似@Transactional,如果没有显式地抛出异常或者try catch起来没有手动回滚,事务是不会回滚的。

if("ACK重试机制".equals(messageBody)){
    message.getMessageProperties().getHeaders().put("x-death", count+1);
    throw new RuntimeException("手动出发异常,测试重试机制");
}

还有一种情况就是消息被拒绝后重新加入队列,比如basic.reject和basic.nack,并且requeue = true,但是这个是重新进入到了消息队列然后重新被消费,并且也不会触发我们重试机制的配置(如重试间隔、最大重试次数等等)。重试机制是默认开启的,但是如果没有重试机制相关的配置会导致消息一直无间隔的重试,直到消费成功,所以要使用重试机制一定要有相关配置。

死信队列

死信就是消息在特定场景下的一种表现形式,这些场景包括:

  1. 消息被拒绝(basic.reject / basic.nack),并且requeue = false
  2. 消息的 TTL 过期时
  3. 消息队列达到最大长度
  4. 达到最大重试限制

消息在这些场景中时,被称为死信。

死信队列就是用于储存死信的消息队列,在死信队列中,有且只有死信构成,不会存在其余类型的消息。死信队列也是一个普通队列,也可以被消费者消费,区别在于业务队列需要绑定在死信队列上,才能正常地把死信发送到死信队列上。

业务队列绑定死信队列

@Bean
    public Queue directQueue() {
        /**
         * 绑定死信交换机及路由key
         */
        Map<String, Object> args = new HashMap<>();
        // x-dead-letter-exchange:这里声明当前业务队列绑定的死信交换机
        //消息被拒绝、消息过期,或者队列达到其最大长度。消息会变成死信
        args.put("x-dead-letter-exchange", DEAD_TCP_DATA_DIRECT_EXCHANGE);
        // x-dead-letter-routing-key:这里声明当前业务队列的死信路由 key
        args.put("x-dead-letter-routing-key", DEAD_TCP_DATA_DIRECT_ROUTING);
        return QueueBuilder.durable(DIRECT_QUEUE).withArguments(args).build();
    }

自动ACK + RabbitMQ重试机制(阻塞)

appliction.properties

# 消息重试机制: 自动ACK+MQ消息重试
spring.rabbitmq.listener.simple.acknowledge-mode=auto
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.max-attempts=5
spring.rabbitmq.listener.simple.retry.initial-interval=5000

消费者

@RabbitListener(queues = RabbitMqConfig.USER_ADD_QUEUE, concurrency = "3")
    public void userAddReceiver(String data, Message message, Channel channel) throws Exception {
        UserVo vo = OBJECT_MAPPER.readValue(data, UserVo.class);
        boolean success = messageHandle(vo);
        // 通过业务控制是否消费成功,消费失败则抛出异常触发重试
        if (!success) {
            log.error("消费失败");
            throw new Exception("消息消费失败");
        }
    }

一定要开启自动ACK,才会在到达最大重试上限后发送到死信队列,而且在重试过程中会独占当前线程,如果是单线程的消费者会导致其他消息阻塞,直至重试完成,所以可以使用@RabbitListener上的concurrency属性来控制并发数量。

自动ACK后不需要

手动ACK + 手动重试机制(阻塞)

appliction.properties

# 手动ACK
spring.rabbitmq.listener.simple.acknowledge-mode=manual

手动ACK配置了重试机制,在抛出异常的时候仍会触发重试,但是达到重试上限之后,会永远处于Unacked状态,不会进入到死信队列,必须要手动拒绝才可以进入死信队列,所以说这里不用配置重试机制而是采用手动重试的方式

消费者

@RabbitHandler
    @RabbitListener(queues = DirectExchangeConfig.DIRECT_QUEUE2,concurrency = "3")
    public void process3(Message message, Channel channel) throws InterruptedException, IOException {
        // 重试次数
        int retryCount = 0;
        boolean success = false;
        // 消费失败并且重试次数<=重试上限次数
        while (!success && retryCount < MAX_RETRIES) {
            retryCount++;
            // 具体业务逻辑
            String messageBody = new String(message.getBody(), "UTF-8");
            success = !messageBody.equals("ACK重试机制");  //如果消息体等于ACK重试机制
            // 如果失败则重试
            if (!success) {
                String errorTip = "第" + retryCount + "次消费失败" +
                        ((retryCount < 3) ? "," + RETRY_INTERVAL + "s后重试" : ",进入死信队列");
                log.error(errorTip);
                Thread.sleep(RETRY_INTERVAL * 1000);
            }
        }
        if (success) {
            // 消费成功,确认
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            log.info("消费成功");
        } else {
            // 重试多次之后仍失败,进入死信队列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            log.info("消费失败");
        }
    }

使用spring-retry

pom.xml

<dependency>
    <groupId>org.springframework.retry</groupId>
    <artifactId>spring-retry</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置spring-retry

package com.autumn.retry;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.aop.interceptor.SimpleAsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
/**
 * spring-retry的配置类
 * 配置了一个线程池任务执行器,用于执行异步方法,但是用在rabbitmq上面还是会阻塞主线程
 */
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(30);
        threadPoolTaskExecutor.setMaxPoolSize(100);
        threadPoolTaskExecutor.setQueueCapacity(10000);
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new SimpleAsyncUncaughtExceptionHandler();
    }
}

编写 RabbitMQ 的消息消费者,同时在方法上添加 @Retryable 注解来指定重试策略。但是注意这里依然会阻塞,所以尽量使用在死信上面

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Component;
@Component
public class RabbitMQConsumer {
    @RabbitListener(queues = "your-queue-name")
    @Retryable(value = {Exception.class}, maxAttempts = 5, backoff = @Backoff(delay = 5000))
    public void handleMessage(Message message, Channel channel)  throws Exception{
        try {
            String messageBody = new String(message.getBody(), "UTF-8");
            int count = (int) message.getMessageProperties().getHeaders().getOrDefault("x-death", 1);
            log.info("{} DirectReceiver消费者收到消息({}): {} ",Thread.currentThread(),count , messageBody);
            // 发送第几次
            if (count == 3){
                // 发送确认
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                return;
            }
        // 消息处理逻辑
        // 如果发生异常,重试策略会在间隔5秒后再次尝试执行,最多重试5次
    }
}

启用重试机制:在 Spring Boot 的启动类上添加 @EnableRetry 注解以启用 Spring Retry。

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.retry.annotation.EnableRetry;
@SpringBootApplication
@EnableRetry
public class YourApplication {
    public static void main(String[] args) {
        SpringApplication.run(YourApplication.class, args);
    }
}

定时任务轮询死信队列(适合高并发)

启用定时任务

@EnableScheduling  //启用定时任务
public class SpbootMpApplication {
  public static void main(String[] args) {
    SpringApplication.run(SpbootMpApplication.class, args);
  }
}

定时任务遍历死信队列,receive时消息会从死信队列中移除,然后判断headers中的retrycount值为多少,小于3则把retrycount+1发送message到原始队列,大于3则不做处理直接被移除掉

package com.autumn.task;
import com.autumn.rabbitmq.DirectExchangeConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
@Component
@Slf4j
public class DeadLetterQueueProcessorTask {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 定时读取死信队列所有消息
     */
    @Scheduled(fixedRate = 100000)
    public void processDeadLetterQueue() {
        List<Message> messageList = new ArrayList<Message>();
        // 持续从死信队列中接收消息,并处理每一条消息
        while (true) {
            // 从死信队列中接收一条消息,接收成功后会从死信队列里移除
            Message message = rabbitTemplate.receive(DirectExchangeConfig.DEAD_TCP_DATA_RETRY_QUEUE);
            messageList.add(message);
            // 如果消息为 null,则表示队列中没有更多消息,结束循环
            if (message == null) {
                break;
            }
        }
        //循环获取的消息体
        for (int i=0;i<messageList.size();i++){
            Message message = messageList.get(i);
            if (message!=null){
                // 处理消息
                handleMessage(message);
            }
        }
    }
    /**
     * 处理消息
     * @param message
     */
    private void handleMessage(Message message) {
        //
        int count = (int) message.getMessageProperties().getHeaders().getOrDefault("retrycount", 1);
        message.getMessageProperties().getHeaders().put("retrycount", count+1);
        log.info("Processing dead letter message({}): {}" ,count, new String(message.getBody()));
        if (count <= 3) {  //如果小于等于3,则发送回原始队列
            // 将消息重新发送到原始队列
            rabbitTemplate.convertAndSend(DirectExchangeConfig.RETRY_QUEUE, message);
        }
    }
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 缓存
spring-boot + rabbitmq消息手动确认模式的几点说明(重试机制)
前提:使用rabbitmq的手动确认消息的模式 消息手动确认模式的几点说明 监听的方法内部必须使用channel进行消息确认,包括消费成功或消费失败 如果不手动确认,也不抛出异常,消息不会自动重新推送(包括其他消费者),因为对于rabbitmq来说始终没有接收到消息消费是否成功的确认,并且Chan.
17370 0
|
3月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
3月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
11天前
|
消息中间件 运维 监控
云消息队列RabbitMQ实践解决方案评测报告
本报告旨在对《云消息队列RabbitMQ实践》解决方案进行综合评测。通过对该方案的原理理解、部署体验、设计验证以及实际应用价值等方面进行全面分析,为用户提供详尽的反馈与建议。
44 15
|
10天前
|
消息中间件 弹性计算 运维
阿里云云消息队列RabbitMQ实践解决方案评测报告
阿里云云消息队列RabbitMQ实践解决方案评测报告
37 9
|
6天前
|
消息中间件 监控 数据处理
解决方案 | 云消息队列RabbitMQ实践
解决方案 | 云消息队列RabbitMQ实践
14 1
|
7天前
|
消息中间件 弹性计算 运维
云消息队列RabbitMQ实践
本评测报告详细分析了阿里云云消息队列 RabbitMQ 版的实践原理、部署体验及核心优势。报告认为其在解决消息积压、脑裂难题及弹性伸缩方面表现优秀,但建议进一步细化架构优化策略和技术细节描述。部署文档详尽,对初学者友好,但仍需加强网络配置和版本兼容性说明。实际部署展示了其高可用性和成本优化能力,适用于高并发消息处理和分布式系统数据同步。为进一步提升方案,建议增加安全性配置指导、性能调优建议及监控告警系统设置。
|
20天前
|
消息中间件
手撸MQ消息队列——循环数组
队列是一种常用的数据结构,类似于栈,但采用先进先出(FIFO)的原则。生活中常见的排队场景就是队列的应用实例。在数据结构中,队列通常用数组实现,包括入队(队尾插入元素)和出队(队头移除元素)两种基本操作。本文介绍了如何用数组实现队列,包括定义数组长度、维护队头和队尾下标(front 和 tail),并通过取模运算解决下标越界问题。此外,还讨论了队列的空与满状态判断,以及并发和等待机制的实现。通过示例代码展示了队列的基本操作及优化方法,确保多线程环境下的正确性和高效性。
24 0
手撸MQ消息队列——循环数组
|
2月前
|
消息中间件 存储 Java
【揭秘】RocketMQ内部运作大揭秘:一探究竟,原来消息队列是这样工作的!
【8月更文挑战第19天】RocketMQ是一款高性能、高可用的消息中间件,在分布式系统中至关重要。它采用发布/订阅模式,支持高吞吐量的消息传递。核心组件包括管理元数据的NameServer、存储消息的Broker以及Producer和Consumer。RocketMQ支持发布/订阅与点对点两种模型,并具备复杂的消息持久化和路由机制。通过Java API示例,可轻松实现消息的发送与接收。RocketMQ凭借其出色的特性和可靠性,成为大型分布式系统首选的消息解决方案。
56 5
|
2月前
|
消息中间件 存储 缓存
一个用过消息队列的人,竟不知为何要用 MQ?
一个用过消息队列的人,竟不知为何要用 MQ?
91 1

热门文章

最新文章

下一篇
无影云桌面