springboot整合延迟队列

简介: springboot整合延迟队列

公众号merlinsea


  • 背景介绍
  • JD、淘系、天猫、拼多多电商平台,规定新注册的商家,审核通过后需要在【规定时间】内上架商品,否则冻结账号。 核心点:消息发布一段时间以后才能进行消费,这一段时间体现在普通队列中!!!

  • 延迟队列的项目模型

640.jpg

前提:项目中引入amqp依赖,application.yml中配置好rabbitmq的服务端信息

  • RabbitMQConfig类【核心】
  • 普通队列创建的时候是带上参数进行创建的,这里的参数包括普通队列消息过期时间+过期后投递的死信交换机名称+过期后私信消息的新的路由key


@Configuration
public class RabbitMQConfig {
    /**
     * 死信队列名称
     */
    public static final String LOCK_MERCHANT_DEAD_QUEUE = "lock_merchant_dead_queue";
    /**
     * 死信交换机名称
     */
    public static final String LOCK_MERCHANT_DEAD_EXCHANGE = "lock_merchant_dead_exchange";
    /**
     * 死信交换机->死信队列的绑定key
     */
    public static final String LOCK_MERCHANT_ROUTING_KEY = "lock_merchant_routing_key";
    /**
     * 创建死信交换机
     * @return
     */
    @Bean
    public Exchange lockMerchantDeadExchange(){
        return new TopicExchange(LOCK_MERCHANT_DEAD_EXCHANGE,true,false);
    }
    /**
     * 创建死信队列
     * @return
     */
    @Bean
    public Queue lockMerchantDeadQueue(){
        return QueueBuilder.durable(LOCK_MERCHANT_DEAD_QUEUE).build();
    }
    /**
     * 绑定死信交换机和死信队列,并指定绑定key
     * @return
     */
    @Bean
    public Binding lockMerchantBinding(){
        return new Binding(LOCK_MERCHANT_DEAD_QUEUE,Binding.DestinationType.QUEUE,
                LOCK_MERCHANT_DEAD_EXCHANGE,LOCK_MERCHANT_ROUTING_KEY,null);
    }
    /**
     * 普通队列名称
     */
    public static final String NEW_MERCHANT_QUEUE = "new_merchant_queue";
    /**
     * 普通的交换名称
     */
    public static final String NEW_MERCHANT_EXCHANGE = "new_merchant_exchange";
    /**
     * 普通交换机->普通队列的绑定key
     */
    public static final String NEW_MERCHANT_ROUTIING_KEY = "new_merchant_routing_key";
    /**
     * 创建普通交换机
     * @return
     */
    @Bean
    public Exchange newMerchantExchange(){
        return new TopicExchange(NEW_MERCHANT_EXCHANGE,true,false);
    }
    /**
     * 创建普通队列【重点】
     * 核心:需要设置参数,队列的过期时间,队列绑定的死信交换机是哪个,重新指定的死信消息的路由key
     * @return
     */
    @Bean
    public Queue newMerchantQueue(){
        Map<String,Object> args = new HashMap<>(3);
        //消息过期后,进入到死信交换机
        args.put("x-dead-letter-exchange",LOCK_MERCHANT_DEAD_EXCHANGE);
        //消息过期后,进入到死信交换机的重新指定的路由key
        args.put("x-dead-letter-routing-key","lock_merchant_routing_key");
        //过期时间,单位毫秒
        args.put("x-message-ttl",10000);
        return QueueBuilder.durable(NEW_MERCHANT_QUEUE).withArguments(args).build();
    }
    /**
     * 绑定普通交换机和普通队列,指定绑定key
     * @return
     */
    @Bean
    public Binding newMerchantBinding(){
        return new Binding(NEW_MERCHANT_QUEUE,Binding.DestinationType.QUEUE,
                NEW_MERCHANT_EXCHANGE,NEW_MERCHANT_ROUTIING_KEY,null);
    }
}


  • 生产者发布消息到普通交换机


@RestController
@RequestMapping("/api/admin/merchant")
public class MerchantAccountController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @GetMapping("check")
    public Object check(){
        //修改数据库的商家账号状态  TODO
        //审核通过,发布消息
        rabbitTemplate.convertAndSend(RabbitMQConfig.NEW_MERCHANT_EXCHANGE,RabbitMQConfig.NEW_MERCHANT_ROUTIING_KEY,"商家账号通过审核");
        Map<String,Object> map = new HashMap<>();
        map.put("code",0);
        map.put("msg","账号审核通过,请10秒内上传1个商品");
        return map;
    }
}


  • 消费者监听死信队列
  • 注意这里消费者是监听死信队列,这可以保证消息一定是延迟一段时间【这段时间在普通队列中保存】后才会被消息,有点类似定时任务。
@Component
@RabbitListener(queues = "lock_merchant_dead_queue")
public class MerchantMQDeadListener {
    @RabbitHandler
    public void messageHandler(String body, Message message, Channel channel) throws IOException {
        long msgTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("死信队列msgTag="+msgTag);
        System.out.println("死信队列body="+body);
        //做复杂业务逻辑  TODO
        //告诉broker,消息已经被确认
        channel.basicAck(msgTag,false);
    }
}


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
消息中间件 Java
SpringBoot RabbitMQ死信队列
SpringBoot RabbitMQ死信队列
113 0
|
7月前
|
消息中间件 Java Spring
SpringBoot实现RabbitMQ的简单队列(SpringAMQP 实现简单队列)
SpringBoot实现RabbitMQ的简单队列(SpringAMQP 实现简单队列)
62 1
|
4月前
|
编解码 NoSQL Java
使用Spring Boot + Redis 队列实现视频文件上传及FFmpeg转码的技术分享
【8月更文挑战第30天】在当前的互联网应用中,视频内容的处理与分发已成为不可或缺的一部分。对于视频平台而言,高效、稳定地处理用户上传的视频文件,并对其进行转码以适应不同设备的播放需求,是提升用户体验的关键。本文将围绕使用Spring Boot结合Redis队列技术来实现视频文件上传及FFmpeg转码的过程,分享一系列技术干货。
251 3
|
7月前
|
消息中间件 监控 Java
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
507 0
|
6月前
|
监控 NoSQL Java
在Spring Boot中集成Redisson实现延迟队列
在Spring Boot中集成Redisson实现延迟队列
352 6
|
7月前
|
消息中间件 Java
SpringBoot基于RabbitMQ实现死信队列 (SpringBoot整合RabbitMQ实战篇)
SpringBoot基于RabbitMQ实现死信队列 (SpringBoot整合RabbitMQ实战篇)
155 1
|
7月前
|
消息中间件 Java Maven
springboot 使用注解的方式创建rabbitmq的交换机、路由key、以及监听队列的名称
springboot 使用注解的方式创建rabbitmq的交换机、路由key、以及监听队列的名称
|
消息中间件 Java
Springboot与RabbitMQ消息超时时间、队列消息超时时间
Springboot与RabbitMQ消息超时时间、队列消息超时时间
398 0
|
消息中间件 Java
springboot rabbitmq 找不到队列
springboot rabbitmq 找不到队列
|
消息中间件 Java Maven
SpringBoot整合RabbitMQ实现死信队列
前面一文通过[Java整合RabbitMQ实现生产消费(7种通讯方式)](https://wangbinguang.blog.csdn.net/article/details/128284902),本文基于SpringBoot实现RabbitMQ中的死信队列和延迟队列。
196 0