RabbitMq的延时队列以及本地延迟队列-阿里云开发者社区

开发者社区> 开发与运维> 正文

RabbitMq的延时队列以及本地延迟队列

简介: RabbitMq的延时队列以及本地延迟队列

之前对延迟队列业务场景的理解,应该就是像延时订单未付款的自动取消、短信重发通知、或者其他终结状态的触发。但是也有另外一种场景可以用:比如场景切换,服务复用的时候可以考虑一下。比如讲讲开发中碰到的问题吧。是一个信息授权的场景。以前新增一个下级渠道商,需要手动去授权本渠道的销售信息给下一级渠道。然后现在需要改造,新增的时候同时授权过去。以前授权信息的消费者是定时(几分钟一次)去读渠道商的数据库信息,其他时候是拿到缓存里的渠道商去同步授权。所以就存在一个问题,即刻创建下一级渠道即刻去触发授权,授权消费者还没重新去捞渠道商,目前渠道商的缓存里没有新增的那一个,就没有授权信息下去。然而捞不到就不要同步,没毛病页不用报错。只能说这设计不适用当前我们改造的场景罢了。其实也很纳闷,消息带过去的渠道ID消费者在当前缓存找不到渠道,就去数据库查,如果有就直接加载到缓存一起同步授权不就好了,不就能适应更多场景,毕竟作为一个公用同步模块。但是已有的没办法改造,想想别的办法把。那就。。。等消费者去执行下一次捞渠道商再去触发他吧,那就是延迟发送消息了,延迟队列想法就这样用上了。
尝试了两种延迟队列,分别是java.util.concurrent.Delayed包下的本地延迟队列和RabbitMq的死信队列。对比了下肯定是要是要使用RabbitMq异步中间件的。DelayQueue是一个无界的BlockingQueue,实现的是一个单机的、JVM内存中的延迟队列,并没有集群的支持,而且无法满足在对业务系统泵机(即服务重启也会丢失)的时、消息消费异常的时候做相应的逻辑处理。但是也附上本地延迟队列的实现:
(1)先定义一个实现Delayed是队列消息
image
(2)再定义消费者,实现线程Runnable,等待线程池调用
image
(3)发送延迟队列,等待消费
DelayQueue queue = new DelayQueue();
AuthGoodsMessage message = new AuthGoodsMessage(new Random().nextInt(), dto, 120);
queue.offer(message);
ExecutorService exec = Executors.newFixedThreadPool(1);
exec.execute(new AuthGoodsConsumer(queue));
exec.shutdown();

对于延迟消息RabbitMq的实战:
image
(1)定义队列以及路由、交换机的配置(类加上注解@Configuration)

@Bean
public Queue businessTestDeadQueue(){
    Map<String,Object> args = new HashMap<String, Object>();
    args.put("x-dead-letter-exchange","business.test.dead.exchange.name");
    args.put("x-dead-letter-routing-key","business.test.dead.routing.key.name");
    args.put("x-message-ttl",120 * 1000);
    return new Queue("business.test.dead.queue.name",true,false,false,args);
}

@Bean
 public TopicExchange businessTestDeadExchange(){
    return new TopicExchange("business.test.dead.produce.exchange.name",true,false);
}

@Bean
public Binding businessTestDeadBinding() {
    return BindingBuilder.bind(goodsAuthDeadQueue()).to(goodsAuthDeadExchange()).with("business.test.dead.produce.routing.key.name");
}

@Bean
public Queue businessTestDeadRealQueue(){
    return new Queue("business.test.dead.real.queue.name",true);
}

@Bean
public TopicExchange businessTestDeadRealExchange(){
    return new TopicExchange("business.test.dead.exchange.name",true,false);
}

@Bean
public Binding businessTestDeadRealBinding() {
    return BindingBuilder.bind(goodsAuthDeadRealQueue()).to(goodsAuthDeadRealExchange()).with("business.test.dead.routing.key.name");
}

(2)定义真实消费者(类加上注解@RabbitListener(queues = "business.test.dead.real.queue.name", containerFactory = ListenerSelector.multiThread)

public void exe(@Payload byte[] body) {
    String data = new String(body);
    logger.info("----------接受延时信息:{}------------------", data);
    具体实体 object = JSONObject.parseObject(data, 具体实体.class);

 }

(3)发送延迟消息
image

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:
开发与运维
使用钉钉扫一扫加入圈子
+ 订阅

集结各类场景实战经验,助你开发运维畅行无忧

其他文章