手把手实现一条延时消息(上)

简介: 近期在维护公司的调度平台,其中有个关键功能那就是定时任务;定时任务大家平时肯定接触的不少,比如 JDK 中的 Timer、ScheduledExecutorService、调度框架 Quartz 等。

前言


近期在维护公司的调度平台,其中有个关键功能那就是定时任务;定时任务大家平时肯定接触的不少,比如 JDK 中的 TimerScheduledExecutorService、调度框架 Quartz 等。


通常用于实现 XX 时间后的延时任务,或周期性任务;


比如一个常见的业务场景:用户下单 N 分钟未能支付便自动取消订单。


实现这类需求通常有两种方式:


  • 轮询定时任务:给定周期内扫描所有未支付的订单,查看时间是否到期。


  • 延时消息:订单创建的时候发送一条 N 分钟到期的信息,一旦消息消费后便可判断订单是否可以取消。


先看第一种,这类方式实现较为简单,只需要启动一个定时任务即可;但缺点同样也很明显,这个间隔扫描的时间不好控制。


给短了会造成很多无意义的扫描,增大数据库压力,给长了又会使得误差较大。


当然最大的问题还是效率较低,随着订单增多耗时会呈线性增长,最差的情况甚至会出现上一波轮询还没有扫描完,下一波调度又来了。


这时第二种方案就要显得靠谱多了,通过延时消息可以去掉不必要的订单扫描,实时性也比较高。


延时消息


这里我们不过多讨论这类需求如何实现;重点聊聊这个延时消息,看它是如何实现的,基于实现延时消息的数据结构还能实现定时任务。


我在之前的开源 IM 项目中也加入了此类功能,可以很直观的发送一条延时消息,效果如下:



使用 :delay hahah 2 发送了一条两秒钟的延时消息,另外一个客户端将会在两秒钟之后收到该消息。


具体的实现步骤会在后文继续分析。


时间轮


要实现延时消息就不得不提到一种数据结构【时间轮】,时间轮听这名字可以很直观的抽象出它的数据结构。



其实本质上它就是一个环形的数组,如图所示,假设我们创建了一个长度为 8 的时间轮。


task0 = 当我们需要新建一个 5s 延时消息,则只需要将它放到下标为 5 的那个槽中。


task1 = 而如果是一个 10s 的延时消息,则需要将它放到下标为 2 的槽中,但同时需要记录它所对应的圈数,不然就和 2 秒的延时消息重复了。


task2= 当创建一个 21s 的延时消息时,它所在的位置就和 task0 相同了,都在下标为 5 的槽中,所以为了区别需要为他加上圈数为 2。



通过这张图可以更直观的理解。


当我们需要取出延时消息时,只需要每秒往下移动这个指针,然后取出该位置的所有任务即可。


当然取出任务之前还得判断圈数是否为 0 ,不为 0 时说明该任务还得再轮几圈,同时需要将圈数 -1 。


这样就可避免轮询所有的任务,不过如果时间轮的槽比较少,导致某一个槽上的任务非常多那效率也比较低,这就和 HashMaphash 冲突是一样的。


编码实现


理论讲完后我们来看看实际的编码实现,为此我创建了一个 RingBufferWheel 类。


它的主要功能如下:


  • 可以添加指定时间的延时任务,在这个任务中可以实现自己的业务逻辑。


  • 停止运行(包含强制停止和所有任务完成后停止)。


  • 查看待执行任务数量。


首先直接看看这个类是如何使用的。



我在这里创建了 65 个延时任务,每个任务都比前一个延后 1s 执行;同时自定义了一个 Job 类来实现自己的业务逻辑,最后调用 stop(false) 会在所有任务执行完毕后退出。



相关文章
|
JSON 数据格式 物联网
HTTP协议接入物联网平台(Getman模拟)
物联网平台通过HTTP连接通信(Getman模拟)
3589 0
HTTP协议接入物联网平台(Getman模拟)
|
druid 数据库
数据库连接池——Druid
数据库连接池——Druid 一、好处 更方便地获取连接对象,效率高 资源可以更好的重复利用
|
消息中间件 存储 canal
3分钟白话RocketMQ系列—— 如何保证消息不丢失
3分钟白话RocketMQ系列—— 如何保证消息不丢失
4939 1
|
7月前
|
存储 消息中间件 缓存
RocketMQ原理—3.源码设计简单分析下
本文介绍了Producer作为生产者是如何创建出来的、启动时是如何准备好相关资源的、如何从拉取Topic元数据的、如何选择MessageQueue的、与Broker是如何进行网络通信的,Broker收到一条消息后是如何存储的、如何实时更新索引文件的、如何实现同步刷盘以及异步刷盘的、如何清理存储较久的磁盘数据的,Consumer作为消费者是如何创建和启动的、消费者组的多个Consumer会如何分配消息、Consumer会如何从Broker拉取一批消息。
372 11
RocketMQ原理—3.源码设计简单分析下
|
消息中间件 存储 RocketMQ
Rocketmq如何保证消息不丢失
文章分析了RocketMQ如何通过生产者端的同步发送与重试机制、Broker端的持久化存储与消息重试投递策略、以及消费者端的手动提交ack与幂等性处理,来确保消息在整个传输和消费过程中的不丢失。
|
人工智能 异构计算 Python
解锁视频生成新时代! 探索智谱CogVideoX-2b:轻松生成6秒视频的详细指南
解锁视频生成新时代! 探索智谱CogVideoX-2b:轻松生成6秒视频的详细指南
570 1
解锁视频生成新时代! 探索智谱CogVideoX-2b:轻松生成6秒视频的详细指南
|
缓存 算法 网络协议
一文详细理解计算机网络 - 数据链路层(考试和面试必备)
这篇文章详细介绍了计算机网络中数据链路层的概念、基本问题、点对点信道和广播信道的数据链路协议(如PPP和CSMA/CD),以及局域网和以太网的相关知识。
2511 0
一文详细理解计算机网络 - 数据链路层(考试和面试必备)
|
安全 Java 编译器
JDK8到JDK21版本升级的新特性问题之JDK17重要的新特性有哪些
JDK8到JDK21版本升级的新特性问题之JDK17重要的新特性有哪些
|
人工智能 编解码 自然语言处理
通义万相功能使用实战
【7月更文第2天】阿里云的通义万相是款AI绘画工具,让用户通过文本描述创建个性化头像。首先,注册阿里云账号并登录平台。明确头像风格、特征和背景,然后在平台上选择“文本生成图像”,输入详细描述。设定尺寸后提交生成。系统会提供多个选项,用户可选择、调整或重新生成。满意后下载头像,应用于社交平台。记得提供清晰的描述以获取最佳效果,勇于探索不同的创意组合。通义万相,让AI助你实现艺术想象。
1302 0
|
Java 网络安全 Maven
要在云效中使用JDK 21进行打包
【2月更文挑战第18天】要在云效中使用JDK 21进行打包
529 4