1.背景
通过上文学习知道了死信队列,如果只是网络抖动,出现异常那么直接进入死信队列,那么是不合理的。这就可以使用延时重试队列,本文将介绍如何实现延时重试队列。
2.原理
图是俺在网上找的,请原作者谅解。
- 发送到业务队里 如果正常收到 正常运行
- 如果处理失败 重试 并投入延时队列 如果超过延时时间 重新投入业务队列
- 如果重试次数大于3 那么进入死信队列
3.代码实现
1.业务队列
这里声明业务队列与绑定关系。
@Configuration public class BusinessConfig { /** * yewu1模块direct交换机的名字 */ public static final String YEWU1_EXCHANGE = "yewu1_direct_exchange"; /** * demo业务的队列名称 */ public static final String YEWU1_DEMO_QUEUE = "yewu1_demo_queue"; /** * demo业务的routekey */ public static final String YEWU1_DEMO_ROUTINGKEY = "yewu1_demo_key"; /** * 业务交换机交换机(一个项目一个业务交换机即可) * 1.定义direct exchange,绑定queueTest * 2.durable="true" rabbitmq重启的时候不需要创建新的交换机 * 3.direct交换器相对来说比较简单,匹配规则为:如果路由键匹配,消息就被投送到相关的队列 * fanout交换器中没有路由键的概念,他会把消息发送到所有绑定在此交换器上面的队列中。 * topic交换器你采用模糊匹配路由键的原则进行转发消息到队列中 */ @Bean public DirectExchange yewu1Exchange() { DirectExchange directExchange = new DirectExchange(YEWU1_EXCHANGE, true, false); return directExchange; } /** * 新建队列(一个业务需要一个队列一个routekey 命名格式 项目名-业务名) * 1.队列名称 * 2.durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列 * 3.exclusive 表示该消息队列是否只在当前connection生效,默认是false * 4.auto-delete 表示消息队列没有在使用时将被自动删除 默认是false * 5.对nack或者发送超时的 发送给死信队列 args是绑定死信队列 * */ @Bean public Queue yewu1DemoQueue() { return new Queue(YEWU1_DEMO_QUEUE, true, false, false); } /** * 交换机与routekey绑定 * * @return */ @Bean public Binding yewu1DemoBinding() { return BindingBuilder.bind(yewu1DemoQueue()).to(yewu1Exchange()) .with(YEWU1_DEMO_ROUTINGKEY); } }
2.延时队列
声明延时队列与绑定关系。
3.死信队列
声明私信队列与绑定关系。
@Configuration public class DeadConfig { /** * 死信队列 */ public final static String FAIL_QUEUE_NAME = "fail_queue"; /** * 死信交换机 */ public final static String FAIL_EXCHANGE_NAME = "fail_exchange"; /** * 死信routing */ public final static String FAIL_ROUTING_KEY = "fail_routing"; /** * 创建配置死信队列 * */ @Bean public Queue deadQueue() { return new Queue(FAIL_QUEUE_NAME, true, false, false); } /** * 死信交换机 * * @return */ @Bean public DirectExchange deadExchange() { DirectExchange directExchange = new DirectExchange(FAIL_EXCHANGE_NAME, true, false); return directExchange; } /** * 绑定关系 * * @return */ @Bean public Binding failBinding() { return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(FAIL_ROUTING_KEY); } }
4.生产者
生产者如上文,通用代码。
@RestController @RequestMapping("/TestRabbit") public class ProducerDemo { @Resource private RabbitTemplate rabbitTemplate; //@RequestMapping("/sendDirect") String sendDirect(@RequestBody String message) throws Exception { System.out.println("开始生产"); CorrelationData data = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend(BusinessConfig.YEWU1_EXCHANGE, BusinessConfig.YEWU1_DEMO_ROUTINGKEY, message, data); System.out.println("结束生产"); System.out.println("发送id:" + data); return "OK,sendDirect:" + message; } }
5.消费者
大量的逻辑,请参考注释。
public enum RabbitEnum { /** * 处理成功 */ ACCEPT, /** * 可以重试的错误 */ RETRY, /** * 无需重试的错误 */ REJECT @Component public class ConsumerDemo { private final static Logger logger = LoggerFactory.getLogger(ConsumerDemo.class); @Resource private RabbitTemplate rabbitTemplate; // @RabbitListener(queues = "yewu1_demo_queue") protected void consumer(Message message, Channel channel) throws Exception { RabbitEnum ackSign = RabbitEnum.RETRY; System.out.println(message.getMessageProperties().getCorrelationId()); try { // 可以加入重复消费判断 int i = 1 / 0; } catch (Exception e) { ackSign = RabbitEnum.RETRY; throw e; } finally { // 通过finally块来保证Ack/Nack会且只会执行一次 if (ackSign == RabbitEnum.ACCEPT) { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } else if (ackSign == RabbitEnum.RETRY) { String correlationData = (String)message.getMessageProperties().getHeaders().get("spring_returned_message_correlation"); System.out.println(message.getMessageProperties().getCorrelationId()); long retryCount = getRetryCount(message.getMessageProperties()); if (retryCount >= 3) { // 重试次数超过3次,则将消息发送到失败队列等待特定消费者处理或者人工处理 try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); rabbitTemplate.convertAndSend(DeadConfig.FAIL_EXCHANGE_NAME, DeadConfig.FAIL_ROUTING_KEY, message, new CorrelationData(correlationData)); logger.info("连续失败三次,将消息发送到死信队列,发送消息:" + new String(message.getBody())); } catch (Exception e1) { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); logger.error("发送死信队列报错:" + e1.getMessage() + ",原始消息:" + new String(message.getBody())); } } else { try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 重试次数不超过3次,则将消息发送到重试队列等待重新被消费 rabbitTemplate.convertAndSend(RetryConfig.YEWU1_RETRY_EXCHANGE_NAME, RetryConfig.YEWU1_DEMO_RETRY_ROUTING_KEY, message, new CorrelationData(correlationData)); logger.info("消费失败,消息发送到重试队列;" + "原始消息:" + new String(message.getBody()) + ";第" + (retryCount + 1) + "次重试"); } catch (Exception e1) { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); logger.error("消息发送到重试队列的时候,异常了:" + e1.getMessage() + ",重新发送消息"); } } } } } /** * 获取消息被重试的次数 */ public long getRetryCount(MessageProperties messageProperties) { Long retryCount = 0L; if (null != messageProperties) { List<Map<String, ?>> deaths = messageProperties.getXDeathHeader(); if (deaths != null && deaths.size() > 0) { Map<String, Object> death = (Map<String, Object>)deaths.get(0); retryCount = (Long)death.get("count"); } } return retryCount; } }