RabbitMQ重试机制
RabbitMQ重试机制(阻塞)
RabbitMQ的消息重试机制,就是消息消费失败后进行重试,重试机制的触发条件是消费者显式的抛出异常,这个很类似@Transactional,如果没有显式地抛出异常或者try catch起来没有手动回滚,事务是不会回滚的。
if("ACK重试机制".equals(messageBody)){ message.getMessageProperties().getHeaders().put("x-death", count+1); throw new RuntimeException("手动出发异常,测试重试机制"); }
还有一种情况就是消息被拒绝后重新加入队列,比如basic.reject和basic.nack,并且requeue = true,但是这个是重新进入到了消息队列然后重新被消费,并且也不会触发我们重试机制的配置(如重试间隔、最大重试次数等等)。重试机制是默认开启的,但是如果没有重试机制相关的配置会导致消息一直无间隔的重试,直到消费成功,所以要使用重试机制一定要有相关配置。
死信队列
死信就是消息在特定场景下的一种表现形式,这些场景包括:
- 消息被拒绝(basic.reject / basic.nack),并且requeue = false
- 消息的 TTL 过期时
- 消息队列达到最大长度
- 达到最大重试限制
消息在这些场景中时,被称为死信。
死信队列就是用于储存死信的消息队列,在死信队列中,有且只有死信构成,不会存在其余类型的消息。死信队列也是一个普通队列,也可以被消费者消费,区别在于业务队列需要绑定在死信队列上,才能正常地把死信发送到死信队列上。
业务队列绑定死信队列
@Bean public Queue directQueue() { /** * 绑定死信交换机及路由key */ Map<String, Object> args = new HashMap<>(); // x-dead-letter-exchange:这里声明当前业务队列绑定的死信交换机 //消息被拒绝、消息过期,或者队列达到其最大长度。消息会变成死信 args.put("x-dead-letter-exchange", DEAD_TCP_DATA_DIRECT_EXCHANGE); // x-dead-letter-routing-key:这里声明当前业务队列的死信路由 key args.put("x-dead-letter-routing-key", DEAD_TCP_DATA_DIRECT_ROUTING); return QueueBuilder.durable(DIRECT_QUEUE).withArguments(args).build(); }
自动ACK + RabbitMQ重试机制(阻塞)
appliction.properties
# 消息重试机制: 自动ACK+MQ消息重试 spring.rabbitmq.listener.simple.acknowledge-mode=auto spring.rabbitmq.listener.simple.retry.enabled=true spring.rabbitmq.listener.simple.retry.max-attempts=5 spring.rabbitmq.listener.simple.retry.initial-interval=5000
消费者
@RabbitListener(queues = RabbitMqConfig.USER_ADD_QUEUE, concurrency = "3") public void userAddReceiver(String data, Message message, Channel channel) throws Exception { UserVo vo = OBJECT_MAPPER.readValue(data, UserVo.class); boolean success = messageHandle(vo); // 通过业务控制是否消费成功,消费失败则抛出异常触发重试 if (!success) { log.error("消费失败"); throw new Exception("消息消费失败"); } }
一定要开启自动ACK,才会在到达最大重试上限后发送到死信队列,而且在重试过程中会独占当前线程,如果是单线程的消费者会导致其他消息阻塞,直至重试完成,所以可以使用@RabbitListener上的concurrency属性来控制并发数量。
自动ACK后不需要
手动ACK + 手动重试机制(阻塞)
appliction.properties
# 手动ACK spring.rabbitmq.listener.simple.acknowledge-mode=manual
手动ACK配置了重试机制,在抛出异常的时候仍会触发重试,但是达到重试上限之后,会永远处于Unacked状态,不会进入到死信队列,必须要手动拒绝才可以进入死信队列,所以说这里不用配置重试机制而是采用手动重试的方式
消费者
@RabbitHandler @RabbitListener(queues = DirectExchangeConfig.DIRECT_QUEUE2,concurrency = "3") public void process3(Message message, Channel channel) throws InterruptedException, IOException { // 重试次数 int retryCount = 0; boolean success = false; // 消费失败并且重试次数<=重试上限次数 while (!success && retryCount < MAX_RETRIES) { retryCount++; // 具体业务逻辑 String messageBody = new String(message.getBody(), "UTF-8"); success = !messageBody.equals("ACK重试机制"); //如果消息体等于ACK重试机制 // 如果失败则重试 if (!success) { String errorTip = "第" + retryCount + "次消费失败" + ((retryCount < 3) ? "," + RETRY_INTERVAL + "s后重试" : ",进入死信队列"); log.error(errorTip); Thread.sleep(RETRY_INTERVAL * 1000); } } if (success) { // 消费成功,确认 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("消费成功"); } else { // 重试多次之后仍失败,进入死信队列 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); log.info("消费失败"); } }
使用spring-retry
pom.xml
<dependency> <groupId>org.springframework.retry</groupId> <artifactId>spring-retry</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
配置spring-retry
package com.autumn.retry; import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; import org.springframework.aop.interceptor.SimpleAsyncUncaughtExceptionHandler; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; /** * spring-retry的配置类 * 配置了一个线程池任务执行器,用于执行异步方法,但是用在rabbitmq上面还是会阻塞主线程 */ @Configuration @EnableAsync public class AsyncConfig implements AsyncConfigurer { @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); threadPoolTaskExecutor.setCorePoolSize(30); threadPoolTaskExecutor.setMaxPoolSize(100); threadPoolTaskExecutor.setQueueCapacity(10000); threadPoolTaskExecutor.initialize(); return threadPoolTaskExecutor; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return new SimpleAsyncUncaughtExceptionHandler(); } }
编写 RabbitMQ 的消息消费者,同时在方法上添加 @Retryable 注解来指定重试策略。但是注意这里依然会阻塞,所以尽量使用在死信上面
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.retry.annotation.Backoff; import org.springframework.retry.annotation.Retryable; import org.springframework.stereotype.Component; @Component public class RabbitMQConsumer { @RabbitListener(queues = "your-queue-name") @Retryable(value = {Exception.class}, maxAttempts = 5, backoff = @Backoff(delay = 5000)) public void handleMessage(Message message, Channel channel) throws Exception{ try { String messageBody = new String(message.getBody(), "UTF-8"); int count = (int) message.getMessageProperties().getHeaders().getOrDefault("x-death", 1); log.info("{} DirectReceiver消费者收到消息({}): {} ",Thread.currentThread(),count , messageBody); // 发送第几次 if (count == 3){ // 发送确认 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); return; } // 消息处理逻辑 // 如果发生异常,重试策略会在间隔5秒后再次尝试执行,最多重试5次 } }
启用重试机制:在 Spring Boot 的启动类上添加 @EnableRetry 注解以启用 Spring Retry。
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.retry.annotation.EnableRetry; @SpringBootApplication @EnableRetry public class YourApplication { public static void main(String[] args) { SpringApplication.run(YourApplication.class, args); } }
定时任务轮询死信队列(适合高并发)
启用定时任务
@EnableScheduling //启用定时任务 public class SpbootMpApplication { public static void main(String[] args) { SpringApplication.run(SpbootMpApplication.class, args); } }
定时任务遍历死信队列,receive时消息会从死信队列中移除,然后判断headers中的retrycount值为多少,小于3则把retrycount+1发送message到原始队列,大于3则不做处理直接被移除掉
package com.autumn.task; import com.autumn.rabbitmq.DirectExchangeConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; @Component @Slf4j public class DeadLetterQueueProcessorTask { @Autowired private RabbitTemplate rabbitTemplate; /** * 定时读取死信队列所有消息 */ @Scheduled(fixedRate = 100000) public void processDeadLetterQueue() { List<Message> messageList = new ArrayList<Message>(); // 持续从死信队列中接收消息,并处理每一条消息 while (true) { // 从死信队列中接收一条消息,接收成功后会从死信队列里移除 Message message = rabbitTemplate.receive(DirectExchangeConfig.DEAD_TCP_DATA_RETRY_QUEUE); messageList.add(message); // 如果消息为 null,则表示队列中没有更多消息,结束循环 if (message == null) { break; } } //循环获取的消息体 for (int i=0;i<messageList.size();i++){ Message message = messageList.get(i); if (message!=null){ // 处理消息 handleMessage(message); } } } /** * 处理消息 * @param message */ private void handleMessage(Message message) { // int count = (int) message.getMessageProperties().getHeaders().getOrDefault("retrycount", 1); message.getMessageProperties().getHeaders().put("retrycount", count+1); log.info("Processing dead letter message({}): {}" ,count, new String(message.getBody())); if (count <= 3) { //如果小于等于3,则发送回原始队列 // 将消息重新发送到原始队列 rabbitTemplate.convertAndSend(DirectExchangeConfig.RETRY_QUEUE, message); } } }