RabbitMq的延时队列以及本地延迟队列

简介: RabbitMq的延时队列以及本地延迟队列

之前对延迟队列业务场景的理解,应该就是像延时订单未付款的自动取消、短信重发通知、或者其他终结状态的触发。但是也有另外一种场景可以用:比如场景切换,服务复用的时候可以考虑一下。比如讲讲开发中碰到的问题吧。是一个信息授权的场景。以前新增一个下级渠道商,需要手动去授权本渠道的销售信息给下一级渠道。然后现在需要改造,新增的时候同时授权过去。以前授权信息的消费者是定时(几分钟一次)去读渠道商的数据库信息,其他时候是拿到缓存里的渠道商去同步授权。所以就存在一个问题,即刻创建下一级渠道即刻去触发授权,授权消费者还没重新去捞渠道商,目前渠道商的缓存里没有新增的那一个,就没有授权信息下去。然而捞不到就不要同步,没毛病页不用报错。只能说这设计不适用当前我们改造的场景罢了。其实也很纳闷,消息带过去的渠道ID消费者在当前缓存找不到渠道,就去数据库查,如果有就直接加载到缓存一起同步授权不就好了,不就能适应更多场景,毕竟作为一个公用同步模块。但是已有的没办法改造,想想别的办法把。那就。。。等消费者去执行下一次捞渠道商再去触发他吧,那就是延迟发送消息了,延迟队列想法就这样用上了。
尝试了两种延迟队列,分别是java.util.concurrent.Delayed包下的本地延迟队列和RabbitMq的死信队列。对比了下肯定是要是要使用RabbitMq异步中间件的。DelayQueue是一个无界的BlockingQueue,实现的是一个单机的、JVM内存中的延迟队列,并没有集群的支持,而且无法满足在对业务系统泵机(即服务重启也会丢失)的时、消息消费异常的时候做相应的逻辑处理。但是也附上本地延迟队列的实现:
(1)先定义一个实现Delayed是队列消息
image
(2)再定义消费者,实现线程Runnable,等待线程池调用
image
(3)发送延迟队列,等待消费
DelayQueue queue = new DelayQueue();
AuthGoodsMessage message = new AuthGoodsMessage(new Random().nextInt(), dto, 120);
queue.offer(message);
ExecutorService exec = Executors.newFixedThreadPool(1);
exec.execute(new AuthGoodsConsumer(queue));
exec.shutdown();

对于延迟消息RabbitMq的实战:
image
(1)定义队列以及路由、交换机的配置(类加上注解@Configuration)

@Bean
public Queue businessTestDeadQueue(){
    Map<String,Object> args = new HashMap<String, Object>();
    args.put("x-dead-letter-exchange","business.test.dead.exchange.name");
    args.put("x-dead-letter-routing-key","business.test.dead.routing.key.name");
    args.put("x-message-ttl",120 * 1000);
    return new Queue("business.test.dead.queue.name",true,false,false,args);
}

@Bean
 public TopicExchange businessTestDeadExchange(){
    return new TopicExchange("business.test.dead.produce.exchange.name",true,false);
}

@Bean
public Binding businessTestDeadBinding() {
    return BindingBuilder.bind(goodsAuthDeadQueue()).to(goodsAuthDeadExchange()).with("business.test.dead.produce.routing.key.name");
}

@Bean
public Queue businessTestDeadRealQueue(){
    return new Queue("business.test.dead.real.queue.name",true);
}

@Bean
public TopicExchange businessTestDeadRealExchange(){
    return new TopicExchange("business.test.dead.exchange.name",true,false);
}

@Bean
public Binding businessTestDeadRealBinding() {
    return BindingBuilder.bind(goodsAuthDeadRealQueue()).to(goodsAuthDeadRealExchange()).with("business.test.dead.routing.key.name");
}

(2)定义真实消费者(类加上注解@RabbitListener(queues = "business.test.dead.real.queue.name", containerFactory = ListenerSelector.multiThread)

public void exe(@Payload byte[] body) {
    String data = new String(body);
    logger.info("----------接受延时信息:{}------------------", data);
    具体实体 object = JSONObject.parseObject(data, 具体实体.class);

 }

(3)发送延迟消息
image

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
5月前
|
消息中间件 存储 监控
RabbitMQ 队列之战:Classic 和 Quorum 的性能洞察
RabbitMQ 是一个功能强大的消息代理,用于分布式应用程序间的通信。它通过队列临时存储消息,支持异步通信和解耦。经典队列适合高吞吐量和低延迟场景,而仲裁队列则提供高可用性和容错能力,适用于关键任务系统。选择哪种队列取决于性能、持久性和容错性的需求。
315 6
|
6月前
|
消息中间件 JSON Java
|
6月前
|
消息中间件
rabbitmq,&队列
rabbitmq,&队列
|
7月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
231 2
|
6月前
|
消息中间件 JSON Java
玩转RabbitMQ声明队列交换机、消息转换器
玩转RabbitMQ声明队列交换机、消息转换器
135 0
|
7月前
|
消息中间件 Kafka Apache
kafka vs rocketmq: 不要只顾着吞吐量而忘了延迟这个指标
这篇文章讨论了Apache RocketMQ和Kafka的对比,强调RocketMQ在低延迟、消息重试与追踪、海量Topic、多租户等方面进行了优化,特别是在小包非批量和大量分区场景下的吞吐量超越Kafka,适合电商和金融领域等高并发、高可靠和高可用场景。
220 0
|
消息中间件 Linux
centos7 yum快速安装rabbitmq服务
centos7 yum快速安装rabbitmq服务
274 0
|
消息中间件 中间件 微服务
RabbitMQ 入门简介及安装
RabbitMQ 入门简介及安装
142 0
|
消息中间件 Ubuntu Shell
ubuntu安装rabbitmq教程 避坑
ubuntu安装rabbitmq教程 避坑
588 0
|
消息中间件 存储 网络协议
Rabbitmq的安装与使用
Rabbitmq的安装与使用
277 0