正文
一、死信队列
DLX(Dead-Letter-Exchange)即死信交换机。死信队列本身也是普通的队列:当消息变成死信时,会被重新投递到死信交换机(DLX),再由它按路由键转发到所绑定的死信队列中,最后由该队列的消费者进行消费。
二、产生原因
1、当消息投递到mq后,没有消费者去消费,而消息过期后会进入死信队列。
package com.xiaojie.springboot.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * Declares the demo topology for dead-lettering: a working direct queue and
 * exchange, plus a dead-letter queue and exchange that the working queue
 * points at through its queue arguments.
 *
 * @author xiaojie
 * @version 1.0
 * @date 2021/10/8 21:07
 */
@Component
public class DLXConfig {

    // Name of the working queue.
    private static final String MY_DIRECT_QUEUE = "snail_direct_queue";
    // Name of the dead-letter queue.
    private static final String MY_DIRECT_DLX_QUEUE = "xiaojie_direct_dlx_queue";
    // Name of the dead-letter exchange.
    private static final String MY_DIRECT_DLX_EXCHANGE = "xiaojie_direct_dlx_exchange";
    // Name of the working exchange.
    private static final String MY_DIRECT_EXCHANGE = "snail_direct_exchange";
    // Routing key used when a message is re-published as a dead letter.
    private static final String DIRECT_DLX_ROUTING_KEY = "msg.dlx";

    /**
     * Declares the dead-letter queue.
     *
     * @return the dead-letter queue
     */
    @Bean
    public Queue dlxQueue() {
        return new Queue(MY_DIRECT_DLX_QUEUE);
    }

    /**
     * Declares the dead-letter exchange.
     *
     * @return the dead-letter direct exchange
     */
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(MY_DIRECT_DLX_EXCHANGE);
    }

    /**
     * Declares the working queue and attaches the dead-letter exchange and
     * dead-letter routing key to it as queue arguments.
     *
     * @return the working queue
     */
    @Bean
    public Queue snailQueue() {
        final boolean durable = true;
        final boolean exclusive = false;
        final boolean autoDelete = false;

        Map<String, Object> queueArguments = new HashMap<>(2);
        // Routing key applied to dead-lettered messages.
        queueArguments.put("x-dead-letter-routing-key", DIRECT_DLX_ROUTING_KEY);
        // Exchange that receives dead-lettered messages.
        queueArguments.put("x-dead-letter-exchange", MY_DIRECT_DLX_EXCHANGE);

        return new Queue(MY_DIRECT_QUEUE, durable, exclusive, autoDelete, queueArguments);
    }

    /**
     * Declares the working exchange.
     *
     * @return the working direct exchange
     */
    @Bean
    public DirectExchange snailExchange() {
        return new DirectExchange(MY_DIRECT_EXCHANGE);
    }

    /**
     * Binds the working queue to the working exchange.
     *
     * @param snailQueue    the working queue bean
     * @param snailExchange the working exchange bean
     * @return the binding using routing key {@code msg.send}
     */
    @Bean
    public Binding snailBindingExchange(Queue snailQueue, DirectExchange snailExchange) {
        return BindingBuilder
                .bind(snailQueue)
                .to(snailExchange)
                .with("msg.send");
    }

    /**
     * Binds the dead-letter queue to the dead-letter exchange.
     *
     * @param dlxQueue    the dead-letter queue bean
     * @param dlxExchange the dead-letter exchange bean
     * @return the binding using the dead-letter routing key
     */
    @Bean
    public Binding dlxBindingExchange(Queue dlxQueue, DirectExchange dlxExchange) {
        return BindingBuilder
                .bind(dlxQueue)
                .to(dlxExchange)
                .with(DIRECT_DLX_ROUTING_KEY);
    }
}
生产者发送消息后,并没有消费者去消费,等消息过期后,会自动进入死信队列
public class DLXProvider { //定义交换机 private static final String MY_DIRECT_EXCHANGE = "snail_direct_exchange"; //普通队列路由键 private static final String DIRECT_ROUTING_KEY = "msg.send"; @Autowired private RabbitTemplate rabbitTemplate; public String sendDlxMsg(){ String msg="我是模拟死信队列的消息。。。。。"; rabbitTemplate.convertAndSend(MY_DIRECT_EXCHANGE, DIRECT_ROUTING_KEY, msg, (message) -> { //设置有效时间,如果消息不被消费,进入死信队列 message.getMessageProperties().setExpiration("10000"); return message; }); return "success"; } }
2、当队列满了之后
@Bean public Queue snailQueue() { Map<String, Object> args = new HashMap<>(2); // 绑定我们的死信交换机 args.put("x-dead-letter-exchange", MY_DIRECT_DLX_EXCHANGE); // 绑定我们的路由key args.put("x-dead-letter-routing-key", DIRECT_DLX_ROUTING_KEY); // args.put("x-message-ttl", 5000); //为队列设置过期时间 // x-max-length:队列最大容纳消息条数,大于该值,mq拒绝接受消息,消息进入死信队列 args.put("x-max-length", 5); return new Queue(MY_DIRECT_QUEUE, true, false, false, args); }
注意:如果添加队列长度参数后启动报错(已存在队列的参数不能修改,新声明的参数与原队列不一致会导致声明失败),请先删除原有的交换机和队列,再重新启动程序,重新声明并绑定。
3、消费者拒绝消费消息(消费端处理发生异常时调用 basicNack/basicReject 且 requeue=false,消息不再重新入队,而是进入死信队列)
package com.xiaojie.springboot.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
 * Consumer of the snail queue that demonstrates rejecting a message so that
 * it is routed to the dead-letter queue.
 *
 * @author xiaojie
 * @date 2021.10.09
 */
@Component
@Slf4j
public class SnailConsumer {

    /**
     * Handles a message from {@code snail_direct_queue}. Processing fails on
     * purpose (division by zero), so the message is negatively acknowledged
     * without requeue.
     *
     * @param message the received AMQP message
     * @param headers the message headers (not needed for the delivery tag,
     *                which is read from the message properties)
     * @param channel the channel used for manual ack/nack
     * @throws Exception if the ack or nack call itself fails
     */
    @RabbitListener(queues = "snail_direct_queue")
    public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
        String messageId = message.getMessageProperties().getMessageId();
        // Use the charset constant instead of a charset name string.
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        // Single source for the delivery tag: the primitive from the message
        // properties avoids unboxing a possibly missing header value.
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        log.info("获取到的消息>>>>>>>{},消息id>>>>>>{}", msg, messageId);
        try {
            // Deliberate failure to simulate a consumer-side exception.
            int result = 1 / 0;
            log.info("result{}", result);
            // Manual acknowledgement of this single delivery.
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            log.error("Failed to process message, rejecting without requeue. messageId={}", messageId, e);
            // multiple=false, requeue=false: the message is not put back on the queue.
            channel.basicNack(deliveryTag, false, false);
        }
    }
}
三、解决订单超时
代码实现
绑定订单死信队列
package com.xiaojie.springboot.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * Declares the order queue/exchange and the order dead-letter queue/exchange
 * used to handle orders that time out without payment. All names and routing
 * keys come from configuration properties.
 *
 * @author xiaojie
 * @version 1.0
 * @date 2021/10/8 23:12
 */
@Component
public class OrderDlxConfig {

    // Order queue name.
    @Value(value = "${xiaojie.order.queue}")
    private String orderQueueName;

    // Order exchange name.
    @Value(value = "${xiaojie.order.exchange}")
    private String orderExchangeName;

    // Order dead-letter queue name.
    @Value(value = "${xiaojie.dlx.queue}")
    private String deadLetterQueueName;

    // Order dead-letter exchange name.
    @Value(value = "${xiaojie.dlx.exchange}")
    private String deadLetterExchangeName;

    // Routing key of the order queue.
    @Value(value = "${xiaojie.order.routingKey}")
    private String orderRoutingKey;

    // Routing key of the dead-letter queue.
    @Value(value = "${xiaojie.dlx.routingKey}")
    private String deadLetterRoutingKey;

    /**
     * Declares the order queue with the dead-letter exchange and dead-letter
     * routing key attached as queue arguments.
     *
     * @return the order queue
     */
    @Bean
    public Queue orderQueue() {
        final boolean durable = true;
        final boolean exclusive = false;
        final boolean autoDelete = false;

        Map<String, Object> queueArguments = new HashMap<>(2);
        // Routing key applied to dead-lettered messages.
        queueArguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        // Exchange that receives dead-lettered messages.
        queueArguments.put("x-dead-letter-exchange", deadLetterExchangeName);

        return new Queue(orderQueueName, durable, exclusive, autoDelete, queueArguments);
    }

    /**
     * Declares the order dead-letter queue.
     *
     * @return the dead-letter queue
     */
    @Bean
    public Queue orderDeadQueue() {
        return new Queue(deadLetterQueueName);
    }

    /**
     * Declares the order exchange.
     *
     * @return the order direct exchange
     */
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange(orderExchangeName);
    }

    /**
     * Declares the order dead-letter exchange.
     *
     * @return the dead-letter direct exchange
     */
    @Bean
    public DirectExchange orderDeadExchange() {
        return new DirectExchange(deadLetterExchangeName);
    }

    /**
     * Binds the order queue to the order exchange.
     *
     * @param orderQueue    the order queue bean
     * @param orderExchange the order exchange bean
     * @return the binding using the order routing key
     */
    @Bean
    public Binding orderBindingExchange(Queue orderQueue, DirectExchange orderExchange) {
        return BindingBuilder
                .bind(orderQueue)
                .to(orderExchange)
                .with(orderRoutingKey);
    }

    /**
     * Binds the dead-letter queue to the dead-letter exchange.
     *
     * @param orderDeadQueue    the dead-letter queue bean
     * @param orderDeadExchange the dead-letter exchange bean
     * @return the binding using the dead-letter routing key
     */
    @Bean
    public Binding deadBindingExchange(Queue orderDeadQueue, DirectExchange orderDeadExchange) {
        return BindingBuilder
                .bind(orderDeadQueue)
                .to(orderDeadExchange)
                .with(deadLetterRoutingKey);
    }
}
创建订单完成之后,发送消息
package com.xiaojie.springboot.service.impl;

import com.alibaba.fastjson.JSONObject;
import com.xiaojie.springboot.entity.Order;
import com.xiaojie.springboot.mapper.OrderMapper;
import com.xiaojie.springboot.service.OrderService;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.UUID;

/**
 * Order service implementation: persists an order and publishes an expiring
 * message for it so that unpaid orders can be handled after the timeout.
 *
 * @author xiaojie
 * @version 1.0
 * @date 2021/10/8 22:16
 */
@Service
public class OrderServiceImpl implements OrderService {

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Value(value = "${xiaojie.order.exchange}")
    private String orderExchange;

    @Value(value = "${xiaojie.order.routingKey}")
    private String orderRoutingKey;

    /**
     * Assigns a random order id and demo name/amount to the order, inserts it,
     * and on success publishes the order as JSON to the order exchange.
     *
     * @param order the order to save; its id, name and pay amount are overwritten
     * @return {@code "success"} when the insert reported at least one row,
     *         otherwise {@code "fail"}
     */
    @Override
    public String saveOrder(Order order) {
        String orderId = UUID.randomUUID().toString();
        order.setOrderId(orderId);
        order.setOrderName("test");
        order.setPayMoney(3000D);

        Integer inserted = orderMapper.addOrder(order);
        // Guard against a null row count before unboxing.
        if (inserted == null || inserted <= 0) {
            return "fail";
        }
        sendMsg(JSONObject.toJSONString(order), orderId);
        return "success";
    }

    /**
     * Publishes the order message with an expiration of {@code "30000"}.
     *
     * <p>NOTE: {@link #saveOrder(Order)} calls this method on {@code this}, so
     * the call does not go through the Spring proxy and {@code @Async} may not
     * take effect; moving this method to a separate bean is required for it to
     * run asynchronously.
     *
     * @param msg     the JSON payload to publish
     * @param orderId the order id; currently not attached to the message
     */
    @Async
    public void sendMsg(String msg, String orderId) {
        rabbitTemplate.convertAndSend(orderExchange, orderRoutingKey, msg, message -> {
            // Expiration for this message (30s per the original author).
            message.getMessageProperties().setExpiration("30000");
            return message;
        });
    }

    /**
     * Looks up an order by id.
     *
     * @param orderId the order id
     * @return whatever the mapper returns for that id
     */
    @Override
    public Order getByOrderId(String orderId) {
        return orderMapper.getOrder(orderId);
    }

    /**
     * Updates the status of the given order through the mapper.
     *
     * @param orderId the order id
     * @return the mapper's result for the update
     */
    @Override
    public Integer updateOrderStatus(String orderId) {
        return orderMapper.updateOrder(orderId);
    }
}
死信队列消费者
package com.xiaojie.springboot.service;

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.xiaojie.springboot.entity.Order;
import com.xiaojie.springboot.myenum.OrderStatus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

/**
 * Dead-letter queue consumer that handles order timeouts: for each
 * dead-lettered order message it marks the order as changed if it is still
 * unpaid.
 *
 * @author xiaojie
 * @version 1.0
 * @date 2021/10/8 22:43
 */
@Component
@RabbitListener(bindings = @QueueBinding(
        value = @Queue("xiaojie_order_dlx_queue"),
        exchange = @Exchange(value = "xiaojie_order_dlx_exchange", type = ExchangeTypes.DIRECT),
        key = "order.dlx"))
@Slf4j
public class Consumer {

    @Autowired
    private OrderService orderService;

    /**
     * Consumes a dead-lettered order message and acknowledges it manually.
     * Messages that cannot be mapped to an existing order are acknowledged as
     * well, so they do not stay unacknowledged on the channel.
     *
     * <p>The original author notes that manual acknowledgement must be enabled
     * in the application configuration.
     *
     * @param msg     the JSON order payload
     * @param headers the message headers; the delivery tag is read from them
     * @param channel the channel used for the manual ack
     * @throws IOException declared for interface compatibility; ack failures
     *                     are logged here
     */
    @RabbitHandler
    public void handlerMsg(@Payload String msg, @Headers Map<String, Object> headers, Channel channel) throws IOException {
        log.info("接收到的消息是direct:{}", msg);

        // The delivery tag comes from the message headers.
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        if (deliveryTag == null) {
            log.error("Missing delivery tag header, cannot acknowledge message: {}", msg);
            return;
        }

        try {
            closeOrderIfUnpaid(msg);
            // Second argument false: acknowledge only this delivery, not a batch.
            channel.basicAck(deliveryTag, false);
        } catch (IOException e) {
            // TODO compensation mechanism for failed acknowledgements.
            log.error("Failed to ack dead-lettered order message, deliveryTag={}", deliveryTag, e);
        }
    }

    /** Updates the order referenced by the payload when it is still unpaid. */
    private void closeOrderIfUnpaid(String msg) {
        Order orderEntity = JSONObject.parseObject(msg, Order.class);
        if (orderEntity == null) {
            log.warn("Dead-lettered message has no order payload, skipping: {}", msg);
            return;
        }

        // Check that the order actually exists.
        Order order = orderService.getByOrderId(orderEntity.getOrderId());
        if (order == null) {
            log.warn("Order {} not found, skipping", orderEntity.getOrderId());
            return;
        }

        // NOTE(review): '==' is only safe if getStatus() returns a primitive on
        // at least one side; confirm the types of OrderStatus/Order status.
        if (OrderStatus.UNPAY.getStatus() == order.getStatus()) {
            // Still unpaid: update the order status.
            orderService.updateOrderStatus(orderEntity.getOrderId());
            // Placeholder for returning the reserved stock.
            log.info("库存+1");
        }
    }
}