手把手实现一条延时消息(上)

简介: 近期在维护公司的调度平台,其中有个关键功能那就是定时任务;定时任务大家平时肯定接触的不少,比如 JDK 中的 Timer、ScheduledExecutorService、调度框架 Quartz 等。

前言


近期在维护公司的调度平台,其中有个关键功能那就是定时任务;定时任务大家平时肯定接触的不少,比如 JDK 中的 TimerScheduledExecutorService、调度框架 Quartz 等。


通常用于实现 XX 时间后的延时任务,或周期性任务;


比如一个常见的业务场景:用户下单 N 分钟未能支付便自动取消订单。


实现这类需求通常有两种方式:


  • 轮询定时任务:给定周期内扫描所有未支付的订单,查看时间是否到期。


  • 延时消息:订单创建的时候发送一条 N 分钟到期的信息,一旦消息消费后便可判断订单是否可以取消。


先看第一种,这类方式实现较为简单,只需要启动一个定时任务即可;但缺点同样也很明显,这个间隔扫描的时间不好控制。


给短了会造成很多无意义的扫描,增大数据库压力,给长了又会使得误差较大。


当然最大的问题还是效率较低,随着订单增多耗时会呈线性增长,最差的情况甚至会出现上一波轮询还没有扫描完,下一波调度又来了。


这时第二种方案就要显得靠谱多了,通过延时消息可以去掉不必要的订单扫描,实时性也比较高。


延时消息


这里我们不过多讨论这类需求如何实现;重点聊聊这个延时消息,看它是如何实现的,基于实现延时消息的数据结构还能实现定时任务。


我在之前的开源 IM 项目中也加入了此类功能,可以很直观的发送一条延时消息,效果如下:



使用 :delay hahah 2 发送了一条两秒钟的延时消息,另外一个客户端将会在两秒钟之后收到该消息。


具体的实现步骤会在后文继续分析。


时间轮


要实现延时消息就不得不提到一种数据结构【时间轮】,时间轮听这名字可以很直观的抽象出它的数据结构。



其实本质上它就是一个环形的数组,如图所示,假设我们创建了一个长度为 8 的时间轮。


task0 = 当我们需要新建一个 5s 延时消息,则只需要将它放到下标为 5 的那个槽中。


task1 = 而如果是一个 10s 的延时消息,则需要将它放到下标为 2 的槽中,但同时需要记录它所对应的圈数,不然就和 2 秒的延时消息重复了。


task2= 当创建一个 21s 的延时消息时,它所在的位置就和 task0 相同了,都在下标为 5 的槽中,所以为了区别需要为他加上圈数为 2。



通过这张图可以更直观的理解。


当我们需要取出延时消息时,只需要每秒往下移动这个指针,然后取出该位置的所有任务即可。


当然取出任务之前还得判断圈数是否为 0 ,不为 0 时说明该任务还得再轮几圈,同时需要将圈数 -1 。


这样就可避免轮询所有的任务,不过如果时间轮的槽比较少,导致某一个槽上的任务非常多那效率也比较低,这就和 HashMaphash 冲突是一样的。


编码实现


理论讲完后我们来看看实际的编码实现,为此我创建了一个 RingBufferWheel 类。


它的主要功能如下:


  • 可以添加指定时间的延时任务,在这个任务中可以实现自己的业务逻辑。


  • 停止运行(包含强制停止和所有任务完成后停止)。


  • 查看待执行任务数量。


首先直接看看这个类是如何使用的。



我在这里创建了 65 个延时任务,每个任务都比前一个延后 1s 执行;同时自定义了一个 Job 类来实现自己的业务逻辑,最后调用 stop(false) 会在所有任务执行完毕后退出。



相关文章
|
4月前
|
消息中间件 数据安全/隐私保护 RocketMQ
消息队列 MQ产品使用合集之如何自定义时间间隔
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
736 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
|
存储 消息中间件 NoSQL
延时消息常见实现方案
延时消息常见实现方案
延时消息常见实现方案
|
2月前
|
消息中间件 监控 UED
【揭秘消息队列背后的秘密!】如何解决消息队列的延时及过期失效问题?深入剖析与实战指南!
【8月更文挑战第24天】本文以随笔形式探讨了消息队列在实际应用中面临的消息延时及过期失效问题。针对消息延时,文章提出了包括优化消息队列配置、提高消费者效率和利用优先级队列在内的解决方案;并通过示例代码展示了如何优化RabbitMQ中的消费者处理流程。对于消息过期失效问题,则建议设置消息TTL、采用死信队列并实施监控报警机制;同样提供了基于RabbitMQ设置消息TTL的具体实现。这些策略有助于提升消息队列的性能和系统的整体稳定性。
45 2
|
4月前
|
消息中间件 RocketMQ
消息队列 MQ产品使用合集之在开源延时消息插件方案中和原生延时消息方案中,同时设置参数是否会出现错乱
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 负载均衡 调度
个推延迟收到消息问题原因分析
个推延迟收到消息问题原因分析
92 1
|
5月前
|
消息中间件 Docker 微服务
RabbitMQ入门指南(十一):延迟消息-延迟消息插件
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了DelayExchange插件、延迟消息插件实现延迟消息等内容。
804 0
|
5月前
|
消息中间件 存储 Java
RabbitMQ之延迟队列(手把手教你学习延迟队列)
【1月更文挑战第12天】延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列的。
929 6
|
5月前
|
消息中间件 存储 RocketMQ
大白话-设计RocketMQ延迟消息
RocketMQ的延迟消息使用上非常便捷,但是不支持任意时间的延迟,这一点对于有强迫症的朋友来说就比较难受,但是搞明白为什么这么设计后,就自然释怀了。
|
消息中间件 Shell RocketMQ
谁让你再使用cron发送延时消息,你直接给他一jio!(文末送书)
谁让你再使用cron发送延时消息,你直接给他一jio!(文末送书)
126 0
下一篇
无影云桌面