RabbitMQ高阶使用延时任务

简介: RabbitMQ高阶使用延时任务

1 从打车开始说起

我们把滴滴打车的流程简化下

  1. 登录app后点击打车开始进行打车
  2. 打车服务开始为司机派单
  3. 司机接单后开始给来接驾
  4. 上车乘客后处于行程中
  5. 行程结束后完成本次打车服务

1.1 需要解决的问题

我们需要实现派单服务,用户发送打车订单后需要进行进行派单,如果在指定时间内没有找到司机就会收到派单超时的通知,并且能够实时查看当前排队的抢单人数

下面我们来介绍下涉打车涉及到的一些问题

1.1.1 打车超时

主要讲解打车服务在超时后的处理,比如打车后等待多长时间没有打到车后会通知等待超时

2 延时任务

我们先看下如何来解决打车超时的问题

2.1 什么是延时任务

在开发中,往往会遇到一些关于延时任务的需求,例如

  • 生成订单30分钟未支付,则自动取消
  • 生成订单60秒后,给用户发短信
  • 滴滴打车订单完成后,如果用户一直不评价,48小时后会将自动评价为5星。

2.1.1 和定时任务区别

对上述的任务,我们给一个专业的名字来形容,那就是 延时任务,那么这里就会产生一个问题,这个 延时任务定时任务的区别究竟在哪里呢?一共有如下几点区别
  1. 定时任务有明确的触发时间,延时任务没有
  1. 定时任务有执行周期,而延时任务在某事件触发后一段时间内执行,没有执行周期
  2. 定时任务一般执行的是批处理操作是多个任务,而延时任务一般是单个任务

2.2 延时队列使用场景

那么什么时候需要用延时队列呢?考虑一下以下场景:

  1. 订单在十分钟之内未支付则自动取消。
  2. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
  3. 账单在一周内未支付,则自动结算。
  1. 用户注册成功后,如果三天内没有登陆则进行短信提醒。
  2. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
  3. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

可以想一下美团点餐,超时时间

2.3 常见方案

下面我们来介绍下常见的延时任务的解决方案

2.3.1 数据库轮询

该方案通常是在小型项目中使用,即通过一个线程定时的去扫描数据库,通过订单时间来判断是否有超时的订单,然后进行update或delete等操作

优点

简单易行,支持集群操作

缺点

  1. 对服务器内存消耗大
  1. 存在延迟,比如你每隔3分钟扫描一次,那最坏的延迟时间就是3分钟
  2. 假设你的订单有几千万条,每隔几分钟这样扫描一次,数据库损耗极大

2.3.1 JDK的延迟队列

该方案是利用JDK自带的DelayQueue来实现,这是一个无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素,放入DelayQueue中的对象,是必须实现Delayed接口的。

优点

效率高,任务触发时间延迟低。

缺点

  1. 服务器重启后,数据全部消失,怕宕机
  2. 集群扩展相当麻烦
  3. 因为内存条件限制的原因,比如下单未付款的订单数太多,那么很容易就出现OOM异常
  4. 代码复杂度较高

2.3.3 netty时间轮算法

时间轮算法可以类比于时钟,如上图箭头(指针)按某一个方向按固定频率轮动,每一次跳动称为一个 tick

这样可以看出定时轮由个3个重要的属性参数,ticksPerWheel(一轮的tick数),tickDuration(一个tick的持续时间)以及 timeUnit(时间单位),例如当ticksPerWheel=60,tickDuration=1,timeUnit=秒,这就和现实中的始终的秒针走动完全类似了。


如果当前指针指在1上面,我有一个任务需要4秒以后执行,那么这个执行的线程回调或者消息将会被放在5上。那如果需要在20秒之后执行怎么办,由于这个环形结构槽数只到8,如果要20秒,指针需要多转2圈,位置是在2圈之后的5上面(20 % 8 + 1)


优点


效率高,任务触发时间延迟时间比delayQueue低,代码复杂度比delayQueue低。


缺点


服务器重启后,数据全部消失,怕宕机

集群扩展相当麻烦

因为内存条件限制的原因,比如下单未付款的订单数太多,那么很容易就出现OOM异常

2.3.4 使用消息队列

我们可以采用RabbitMQ的延时队列,RabbitMQ具有以下两个特性,可以实现延迟队列


RabbitMQ可以针对Queue和Message设置 x-message-ttl,来控制消息的生存时间,如果超时,则消息变为dead letter

RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,用来控制队列内出现了dead letter,则按照这两个参数重新路由。

优点


高效,可以利用rabbitmq的分布式特性轻易的进行横向扩展,消息支持持久化增加了可靠性。


缺点


本身的易用度要依赖于RabbitMq的运维,因为要引用RabbitMq,所以复杂度和成本变高

2.4 延时队列

RabbitMQ中没有对消息延迟进行实现,但是我们可以通过TTL以及死信路由来实现消息延迟。

2.4.1 TTL(消息过期时间)

在介绍延时队列之前,还需要先介绍一下RabbitMQ中的一个高级特性——TTL(Time To Live)。


TTL是RabbitMQ中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒,换句话说,如果一条消息设置了TTL属性或者进入了设置TTL属性的队列,那么这条消息如果在TTL设置的时间内没有被消费,则会成为“死信”,如果不设置TTL,表示消息永远不会过期,如果将TTL设置为0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

2.4.1.1 配置队列TTL

一种是在创建队列的时候设置队列的“x-message-ttl”属性

@Bean
public Queue taxiOverQueue() {
    Map<String, Object> args = new HashMap<>(2);
    args.put("x-message-ttl", 30000);
    return QueueBuilder.durable(TAXI_OVER_QUEUE).withArguments(args).build();
}

这样所有被投递到该队列的消息都最多不会存活超过30s,但是消息会到哪里呢,如果没有任何处理,消息会被丢弃,如果配置有死信队列,超时的消息会被投递到死信队列

2.5 死信队列

讲到延时消息就不能不讲死信队列

2.5.1 什么是死信队列

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解

一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信,自然就有了死信队列;

2.5.2 死信队列使用场景

RabbitMQ中的死信交换器(dead letter exchange)可以接收下面三种场景中的消息:

  • 消费者对消息使用了basicReject或者basicNack回复,并且requeue参数设置为false,即不再将该消息重新在消费者间进行投递
  • 消息在队列中超时,RabbitMQ可以在单个消息或者队列中设置TTL属性
  • 队列中的消息已经超过其设置的最大消息个数

2.5.3 死信队列如何使用

死信交换器不是默认的设置,这里是被投递消息被拒绝后的一个可选行为,是在创建队列的时进行声明的,往往用在对问题消息的诊断上。

死信交换器仍然只是一个普通的交换器,创建时并没有特别要求和操作,在创建队列的时候,声明该交换器将用作保存被拒绝的消息即可,相关的参数是x-dead-letter-exchange

2.5.4 相关代码

@Bean
public Queue taxiOverQueue() {
    Map<String, Object> args = new HashMap<>(2);
    // x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
    args.put("x-dead-letter-exchange", TAXI_DEAD_QUEUE_EXCHANGE);
    // x-dead-letter-routing-key  这里声明当前队列的死信路由key
    args.put("x-dead-letter-routing-key", TAXI_DEAD_KEY);
    return QueueBuilder.durable(TAXI_OVER_QUEUE).withArguments(args).build();
}

2.6 打车超时处理

用户通过调用打车服务将数据放进RabbitMQ的死信队列进行延时操作,等待一段时间后,正常的业务处理还没有处理到我们发起的数据,将会进行超时处理,通过通知服务将我们的处理结构通过websocket方式推送到我们的客户端。

2.6.1 打车超时实现

在创建队列的时候配置死信交换器并设置队列的“x-message-ttl”属性

@Bean
public Queue taxiDeadQueue() {
    return new Queue(TAXI_DEAD_QUEUE,true);
}
@Bean
public Queue taxiOverQueue() {
    Map<String, Object> args = new HashMap<>(2);
    // x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
    args.put("x-dead-letter-exchange", TAXI_DEAD_QUEUE_EXCHANGE);
    // x-dead-letter-routing-key  这里声明当前队列的死信路由key
    args.put("x-dead-letter-routing-key", TAXI_DEAD_KEY);
    // x-message-ttl  声明队列的TTL
    args.put("x-message-ttl", 30000);
    return QueueBuilder.durable(TAXI_OVER_QUEUE).withArguments(args).build();
}

这样所有被投递到该队列的消息都最多不会存活超过30s,超时后的消息会被投递到死信交换器

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
746 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
|
存储 消息中间件 Java
SpringBoot整合RocketMQ发送延时消息
当消息写入到Broker后,在指定的时长后才可被消费处理的消息,称为延时消息
1080 0
|
2月前
|
消息中间件 Java 调度
"解锁RabbitMQ云版:揭秘电商巨头、日志大师、任务狂人的秘密武器,你的系统升级就差这一步!"
【8月更文挑战第14天】在分布式与微服务架构中,RabbitMQ云版本作为消息队列服务,助力系统间解耦与异步通信。通过三个场景展示其实用性:1) 订单处理系统中,利用RabbitMQ实现跨服务流程的解耦;2) 日志收集与分析,异步发送日志至中央系统,保障业务流畅;3) 任务调度,处理耗时任务避免阻塞主线程。这些应用充分展现了RabbitMQ云版本的强大功能和灵活性。
30 0
|
4月前
|
消息中间件 RocketMQ
消息队列 MQ产品使用合集之在开源延时消息插件方案中和原生延时消息方案中,同时设置参数是否会出现错乱
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 存储 RocketMQ
RocketMQ-初体验RocketMQ(09)-广播消息、延时消息、批量消息
RocketMQ-初体验RocketMQ(09)-广播消息、延时消息、批量消息
61 0
|
消息中间件 SQL 存储
微服务 RocketMQ-延时消息 消息过滤 管控台搜索问题
微服务 RocketMQ-延时消息 消息过滤 管控台搜索问题
129 0
|
消息中间件 Kafka
MQ 学习日志(八) 消息队列的延时以及过期失效问题处理
消息队列的延时以及过期失效问题处理
288 0
|
消息中间件 Shell RocketMQ
RocketMQ进阶-延时消息
RocketMQ进阶-延时消息
1111 0
|
消息中间件 缓存 Java
5 张图带你理解 RocketMQ 延时消息机制
5 张图带你理解 RocketMQ 延时消息机制
381 1
5 张图带你理解 RocketMQ 延时消息机制
|
消息中间件 监控 NoSQL
python3.7+Tornado5.1.1+Celery3.1+Rabbitmq3.7.16实现异步队列任务
在之前的一篇文章中提到了用[Django+Celery+Redis实现了异步任务队列](https://v3u.cn/a_id_54),只不过消息中间件使用了redis,redis作为消息中间件可谓是差强人意,功能和性能上都不如Rabbitmq,所以本次使用tornado框架结合celery,同时消息中间件使用Rabbitmq来实现异步发邮件,并且使用flower来监控任务队列。
python3.7+Tornado5.1.1+Celery3.1+Rabbitmq3.7.16实现异步队列任务

热门文章

最新文章