手把手实现一条延时消息(下)

简介: 近期在维护公司的调度平台,其中有个关键功能那就是定时任务;定时任务大家平时肯定接触的不少,比如 JDK 中的 Timer、ScheduledExecutorService、调度框架 Quartz 等。

构造函数



先来看看其中的构造函数,这里一共有两个构造函数,用于接收一个线程池及时间轮的大小。


线程池的作用会在后面讲到。


这里的时间轮大小也是有讲究的,它的长度必须得是 2∧n,至于为什么有这个要求后面也会讲到。


默认情况下会初始化一个长度为 64 的数组。


添加任务



下面来看看添加任务的逻辑,根据我们之前的那张抽象图其实很容易实现。



首先我们要定义一个 Task 类,用于抽象任务;它本身也是一个线程,一旦延时到期便会执行其中的 run 函数,所以使用时便可继承该类,将业务逻辑写在 run() 中即可。

它其中还有两个成员变量,也很好理解。


  • cycleNum 用于记录该任务所在时间轮的圈数。


  • key 在这里其实就是延时时间。



//通过 key 计算应该存放的位置
    private Set<Task> get(int key) {
        int index = mod(key, bufferSize);
        return (Set<Task>) ringBuffer[index];
    }
    private int mod(int target, int mod) {
        // equals target % mod
        target = target + tick.get() ;
        return target & (mod - 1);
    }


首先是根据延时时间 (key) 计算出所在的位置,其实就和 HashMap 一样的取模运算,只不过这里使用了位运算替代了取模,同时效率会高上不少。


这样也解释了为什么数组长度一定得是 2∧n


然后查看该位置上是否存在任务,不存在就新建一个;存在自然就是将任务写入这个集合并更新回去。


private int cycleNum(int target, int mod) {
        //equals target/mod
        return target >> Integer.bitCount(mod - 1);
    }


其中的 cycleNum() 自然是用于计算该任务所处的圈数,也是考虑到效率问题,使用位运算替代了除法。


private void put(int key, Set<Task> tasks) {
        int index = mod(key, bufferSize);
        ringBuffer[index] = tasks;
    }


put() 函数就非常简单了,就是将任务写入指定数组下标即可。


启动时间轮


任务写进去后下一步便是启动这个时间轮了,我这里定义了一个 start() 函数。



其实本质上就是开启了一个后台线程来做这个事情:



它会一直从时间轮中取出任务来运行,而运行这些任务的线程便是我们在初始化时传入的线程池;所以所有的延时任务都是由自定义的线程池调度完成的,这样可以避免时间轮的阻塞。


这里调用的 remove(index) 很容易猜到是用于获取当前数组中的所有任务。



逻辑很简单就不再赘述,不过其中的 size2Notify() 倒是值得说一下。



他是用于在停止任务时,主线程等待所有延时任务执行完毕的唤醒条件。这类用法几乎是所有线程间通信的常规套路,值得收入技能包。


停止时间轮


刚才提到的唤醒主线程得配合这里的停止方法使用:



如果是强制停止那便什么也不管,直接更新停止标志,同时关闭线程池即可。


但如果是软停止(等待所有任务执行完毕)时,那就得通过上文提到的方式阻塞主线程,直到任务执行完毕后被唤醒。


CIM 中的应用


介绍了核心原理和基本 API 后,我们来看看实际业务场景如何结合使用(背景是一个即时通讯项目)。


我这里所使用的场景在文初也提到了,就是真的发送一条延时消息;



现有的消息都是实时消息,所以要实现一个延时消息便是在现有的发送客户端处将延时消息放入到这个时间轮中,在任务到期时再执行真正的消息发送逻辑。


由于项目本身结合了 Spring,所以第一步自然是配置 bean



bean 配置好后其实就可以使用了。



每当发送的是延时消息时,只需要将这个消息封装为一个 Job 放到时间轮中,然后在自己的业务类中完成业务即可。


后续可以优化下 api,不用每次新增任务都要调用 start() 方法。


这样一个延时消息的应用便完成了。


总结


时间轮这样的应用还非常多,比如 Netty 中的 HashedWheelTimer 工具原理也差不多,可以用于维护长连接心跳信息。


甚至 Kafka 在这基础上还优化出了层级时间轮,大家感兴趣的话可以自行搜索资料


本文的所有源码都可在此处查阅:


github.com/crossoverJi…


相关文章
|
4月前
|
消息中间件 数据安全/隐私保护 RocketMQ
消息队列 MQ产品使用合集之如何自定义时间间隔
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
736 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
|
存储 消息中间件 NoSQL
延时消息常见实现方案
延时消息常见实现方案
延时消息常见实现方案
|
2月前
|
消息中间件 监控 UED
【揭秘消息队列背后的秘密!】如何解决消息队列的延时及过期失效问题?深入剖析与实战指南!
【8月更文挑战第24天】本文以随笔形式探讨了消息队列在实际应用中面临的消息延时及过期失效问题。针对消息延时,文章提出了包括优化消息队列配置、提高消费者效率和利用优先级队列在内的解决方案;并通过示例代码展示了如何优化RabbitMQ中的消费者处理流程。对于消息过期失效问题,则建议设置消息TTL、采用死信队列并实施监控报警机制;同样提供了基于RabbitMQ设置消息TTL的具体实现。这些策略有助于提升消息队列的性能和系统的整体稳定性。
45 2
|
4月前
|
消息中间件 RocketMQ
消息队列 MQ产品使用合集之在开源延时消息插件方案中和原生延时消息方案中,同时设置参数是否会出现错乱
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 负载均衡 调度
个推延迟收到消息问题原因分析
个推延迟收到消息问题原因分析
92 1
|
5月前
|
消息中间件 Docker 微服务
RabbitMQ入门指南(十一):延迟消息-延迟消息插件
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了DelayExchange插件、延迟消息插件实现延迟消息等内容。
804 0
|
5月前
|
消息中间件 存储 Java
RabbitMQ之延迟队列(手把手教你学习延迟队列)
【1月更文挑战第12天】延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列的。
929 6
|
5月前
|
消息中间件 存储 RocketMQ
大白话-设计RocketMQ延迟消息
RocketMQ的延迟消息使用上非常便捷,但是不支持任意时间的延迟,这一点对于有强迫症的朋友来说就比较难受,但是搞明白为什么这么设计后,就自然释怀了。
|
消息中间件 Shell RocketMQ
谁让你再使用cron发送延时消息,你直接给他一jio!(文末送书)
谁让你再使用cron发送延时消息,你直接给他一jio!(文末送书)
126 0
下一篇
无影云桌面