公众号merlinsea
- 背景介绍
- JD、淘系、天猫、拼多多电商平台,规定新注册的商家,审核通过后需要在【规定时间】内上架商品,否则冻结账号。 核心点:消息发布一段时间以后才能进行消费,这一段时间体现在普通队列中!!!
- 延迟队列的项目模型
前提:项目中引入amqp依赖,application.yml中配置好rabbitmq的服务端信息
- RabbitMQConfig类【核心】
- 普通队列创建的时候是带上参数进行创建的,这里的参数包括普通队列消息过期时间+过期后投递的死信交换机名称+过期后私信消息的新的路由key
@Configuration public class RabbitMQConfig { /** * 死信队列名称 */ public static final String LOCK_MERCHANT_DEAD_QUEUE = "lock_merchant_dead_queue"; /** * 死信交换机名称 */ public static final String LOCK_MERCHANT_DEAD_EXCHANGE = "lock_merchant_dead_exchange"; /** * 死信交换机->死信队列的绑定key */ public static final String LOCK_MERCHANT_ROUTING_KEY = "lock_merchant_routing_key"; /** * 创建死信交换机 * @return */ @Bean public Exchange lockMerchantDeadExchange(){ return new TopicExchange(LOCK_MERCHANT_DEAD_EXCHANGE,true,false); } /** * 创建死信队列 * @return */ @Bean public Queue lockMerchantDeadQueue(){ return QueueBuilder.durable(LOCK_MERCHANT_DEAD_QUEUE).build(); } /** * 绑定死信交换机和死信队列,并指定绑定key * @return */ @Bean public Binding lockMerchantBinding(){ return new Binding(LOCK_MERCHANT_DEAD_QUEUE,Binding.DestinationType.QUEUE, LOCK_MERCHANT_DEAD_EXCHANGE,LOCK_MERCHANT_ROUTING_KEY,null); } /** * 普通队列名称 */ public static final String NEW_MERCHANT_QUEUE = "new_merchant_queue"; /** * 普通的交换名称 */ public static final String NEW_MERCHANT_EXCHANGE = "new_merchant_exchange"; /** * 普通交换机->普通队列的绑定key */ public static final String NEW_MERCHANT_ROUTIING_KEY = "new_merchant_routing_key"; /** * 创建普通交换机 * @return */ @Bean public Exchange newMerchantExchange(){ return new TopicExchange(NEW_MERCHANT_EXCHANGE,true,false); } /** * 创建普通队列【重点】 * 核心:需要设置参数,队列的过期时间,队列绑定的死信交换机是哪个,重新指定的死信消息的路由key * @return */ @Bean public Queue newMerchantQueue(){ Map<String,Object> args = new HashMap<>(3); //消息过期后,进入到死信交换机 args.put("x-dead-letter-exchange",LOCK_MERCHANT_DEAD_EXCHANGE); //消息过期后,进入到死信交换机的重新指定的路由key args.put("x-dead-letter-routing-key","lock_merchant_routing_key"); //过期时间,单位毫秒 args.put("x-message-ttl",10000); return QueueBuilder.durable(NEW_MERCHANT_QUEUE).withArguments(args).build(); } /** * 绑定普通交换机和普通队列,指定绑定key * @return */ @Bean public Binding newMerchantBinding(){ return new Binding(NEW_MERCHANT_QUEUE,Binding.DestinationType.QUEUE, NEW_MERCHANT_EXCHANGE,NEW_MERCHANT_ROUTIING_KEY,null); } }
- 生产者发布消息到普通交换机
@RestController @RequestMapping("/api/admin/merchant") public class MerchantAccountController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("check") public Object check(){ //修改数据库的商家账号状态 TODO //审核通过,发布消息 rabbitTemplate.convertAndSend(RabbitMQConfig.NEW_MERCHANT_EXCHANGE,RabbitMQConfig.NEW_MERCHANT_ROUTIING_KEY,"商家账号通过审核"); Map<String,Object> map = new HashMap<>(); map.put("code",0); map.put("msg","账号审核通过,请10秒内上传1个商品"); return map; } }
- 消费者监听死信队列
- 注意这里消费者是监听死信队列,这可以保证消息一定是延迟一段时间【这段时间在普通队列中保存】后才会被消息,有点类似定时任务。
@Component @RabbitListener(queues = "lock_merchant_dead_queue") public class MerchantMQDeadListener { @RabbitHandler public void messageHandler(String body, Message message, Channel channel) throws IOException { long msgTag = message.getMessageProperties().getDeliveryTag(); System.out.println("死信队列msgTag="+msgTag); System.out.println("死信队列body="+body); //做复杂业务逻辑 TODO //告诉broker,消息已经被确认 channel.basicAck(msgTag,false); } }