开发者社区> oreo123> 正文

RabbitMq的延时队列以及本地延迟队列

简介: RabbitMq的延时队列以及本地延迟队列
+关注继续查看

之前对延迟队列业务场景的理解,应该就是像延时订单未付款的自动取消、短信重发通知、或者其他终结状态的触发。但是也有另外一种场景可以用:比如场景切换,服务复用的时候可以考虑一下。比如讲讲开发中碰到的问题吧。是一个信息授权的场景。以前新增一个下级渠道商,需要手动去授权本渠道的销售信息给下一级渠道。然后现在需要改造,新增的时候同时授权过去。以前授权信息的消费者是定时(几分钟一次)去读渠道商的数据库信息,其他时候是拿到缓存里的渠道商去同步授权。所以就存在一个问题,即刻创建下一级渠道即刻去触发授权,授权消费者还没重新去捞渠道商,目前渠道商的缓存里没有新增的那一个,就没有授权信息下去。然而捞不到就不要同步,没毛病页不用报错。只能说这设计不适用当前我们改造的场景罢了。其实也很纳闷,消息带过去的渠道ID消费者在当前缓存找不到渠道,就去数据库查,如果有就直接加载到缓存一起同步授权不就好了,不就能适应更多场景,毕竟作为一个公用同步模块。但是已有的没办法改造,想想别的办法把。那就。。。等消费者去执行下一次捞渠道商再去触发他吧,那就是延迟发送消息了,延迟队列想法就这样用上了。
尝试了两种延迟队列,分别是java.util.concurrent.Delayed包下的本地延迟队列和RabbitMq的死信队列。对比了下肯定是要是要使用RabbitMq异步中间件的。DelayQueue是一个无界的BlockingQueue,实现的是一个单机的、JVM内存中的延迟队列,并没有集群的支持,而且无法满足在对业务系统泵机(即服务重启也会丢失)的时、消息消费异常的时候做相应的逻辑处理。但是也附上本地延迟队列的实现:
(1)先定义一个实现Delayed是队列消息
image
(2)再定义消费者,实现线程Runnable,等待线程池调用
image
(3)发送延迟队列,等待消费
DelayQueue queue = new DelayQueue();
AuthGoodsMessage message = new AuthGoodsMessage(new Random().nextInt(), dto, 120);
queue.offer(message);
ExecutorService exec = Executors.newFixedThreadPool(1);
exec.execute(new AuthGoodsConsumer(queue));
exec.shutdown();

对于延迟消息RabbitMq的实战:
image
(1)定义队列以及路由、交换机的配置(类加上注解@Configuration)

@Bean
public Queue businessTestDeadQueue(){
    Map<String,Object> args = new HashMap<String, Object>();
    args.put("x-dead-letter-exchange","business.test.dead.exchange.name");
    args.put("x-dead-letter-routing-key","business.test.dead.routing.key.name");
    args.put("x-message-ttl",120 * 1000);
    return new Queue("business.test.dead.queue.name",true,false,false,args);
}

@Bean
 public TopicExchange businessTestDeadExchange(){
    return new TopicExchange("business.test.dead.produce.exchange.name",true,false);
}

@Bean
public Binding businessTestDeadBinding() {
    return BindingBuilder.bind(goodsAuthDeadQueue()).to(goodsAuthDeadExchange()).with("business.test.dead.produce.routing.key.name");
}

@Bean
public Queue businessTestDeadRealQueue(){
    return new Queue("business.test.dead.real.queue.name",true);
}

@Bean
public TopicExchange businessTestDeadRealExchange(){
    return new TopicExchange("business.test.dead.exchange.name",true,false);
}

@Bean
public Binding businessTestDeadRealBinding() {
    return BindingBuilder.bind(goodsAuthDeadRealQueue()).to(goodsAuthDeadRealExchange()).with("business.test.dead.routing.key.name");
}

(2)定义真实消费者(类加上注解@RabbitListener(queues = "business.test.dead.real.queue.name", containerFactory = ListenerSelector.multiThread)

public void exe(@Payload byte[] body) {
    String data = new String(body);
    logger.info("----------接受延时信息:{}------------------", data);
    具体实体 object = JSONObject.parseObject(data, 具体实体.class);

 }

(3)发送延迟消息
image

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
18-微服务技术栈(高级):RabbitMQ仲裁队列与集群扩容
从RabbitMQ 3.8版本开始,引入了新的仲裁队列,他具备与镜像队里类似的功能,但使用更加方便。
24 0
RabbitMQ学习(十):幂等性、优先级队列、惰性队列
以支付为例,用户购买商品后支付,扣款成功,但是返回结果的时候网络异常, 此时钱已经扣了;用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱 了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误 立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等。
30 0
RabbitMQ学习(九):延迟队列
延时队列中,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理。简单来说,延时队列就是用来存放需要在指定时间内被处理的 元素的队列。 其实延迟队列就是死信队列的一种。
24 0
RabbitMQ学习(八):死信队列
死信,顾名思义就是无法被消费的消息,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息 进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有 后续的处理,就变成了死信。
59 0
【RabbitMQ】Basic Queue 简单队列模型与WorkQueue
【RabbitMQ】Basic Queue 简单队列模型与WorkQueue
12 0
RabbitMQ学习笔记(一)----RabbitMQ的基本概念以及5种队列模式
今天开始学习消息中间件,根据项目需求,目前选择的消息中间件是RabbitMQ。让我们一起来认识下RabbitMQ吧。
75 0
14、RabbitMQ教程-死信队列
14、RabbitMQ教程-死信队列
74 0
springcloud:RabbitMQ死信队列与延迟交换机实现(四)
死信队列是消息队列中非常重要的概念,同时我们需要业务场景中都需要延迟发送的概念,比如12306中的30分钟后未支付订单取消。那么本期,我们就来讲解死信队列,以及如何通过延迟交换机来实现延迟发送的需求。
164 0
springcloud:安装rabbitmq并配置延迟队列插件
本期主要讲解如何利用docker快速安装rabbitmq并且配置延迟队列插件
162 0
rabbitMQ延时队列与TTL和DLX、延迟队列的相关介绍
TTL是Time To Live的缩写, 也就是生存时间。 RabbitMq支持对消息和队列设置TTL,对消息这设置是在发送的时候指定,对队列设置是从消息入队列开始计算, 只要超过了队列的超时时间配置, 那么消息会自动清除。 如果两种方式一起使用消息的TTL和队列的TTL之间较小的为准,也就是消息5s过期,队列是10s,那么5s的生效。 默认是没有过期时间的,表示消息没有过期时间;如果设置为0,表示消息在投递到消费者的时候直接被消费,否则丢弃。
45 0
+关注
文章
问答
文章排行榜
最热
最新
相关电子书
更多
阿里云栖开发者沙龙PHP技术专场-RabbitMQ 的延时队列和镜像队列原理与实战-钱文品
立即下载
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载