优雅实现延时任务之Redis篇

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: 什么是延时任务延时任务,顾名思义,就是延迟一段时间后才执行的任务。举个例子,假设我们有个发布资讯的功能,运营需要在每天早上7点准时发布资讯,但是早上7点大家都还没上班,这个时候就可以使用延时任务来实现资讯的延时发布了。

什么是延时任务

延时任务,顾名思义,就是延迟一段时间后才执行的任务。举个例子,假设我们有个发布资讯的功能,运营需要在每天早上7点准时发布资讯,但是早上7点大家都还没上班,这个时候就可以使用延时任务来实现资讯的延时发布了。只要在前一天下班前指定第二天要发送资讯的时间,到了第二天指定的时间点资讯就能准时发出去了。如果大家有运营过公众号,就会知道公众号后台也有文章定时发送的功能。总而言之,延时任务的使用还是很广泛的。关于延时任务的实现方式,我知道的就不下于3种,后面会逐一介绍,今天就讲下如何用redis实现延时任务。

延时任务的特点

在介绍具体方案之前,我们不妨先想一下要实现一个延时系统,有哪些内容是必须存储下来的(这里的存储不一定是指持久化,也可以是放在内存中,取决于延时任务的重要程度)。首先要存储的就是任务的描述。假如你要处理的延时任务是延时发布资讯,那么你至少要存储资讯的id吧。另外,如果你有多种任务类型,比如:延时推送消息、延时清洗数据等等,那么你还需要存储任务的类型。以上总总,都归属于任务描述。除此之外,你还必须存储任务执行的时间点吧,一般来说就是时间戳。此外,我们还需要根据任务的执行时间进行排序,因为延时任务队列里的任务可能会有很多,只有到了时间点的任务才应该被执行,所以必须支持对任务执行时间进行排序。

使用Redis实现延时任务

以上就是一个延迟任务系统必须具备的要素了。回到redis,有什么数据结构可以既存储任务描述,又能存储任务执行时间,还能根据任务执行时间进行排序呢?想来想去,似乎只有Sorted Set了。我们可以把任务的描述序列化成字符串,放在Sorted Set的value中,然后把任务的执行时间戳作为score,利用Sorted Set天然的排序特性,执行时刻越早的会排在越前面。这样一来,我们只要开一个或多个定时线程,每隔一段时间去查一下这个Sorted Set中score小于或等于当前时间戳的元素(这可以通过zrangebyscore命令实现),然后再执行元素对应的任务即可。当然,执行完任务后,还要将元素从Sorted Set中删除,避免任务重复执行。如果是多个线程去轮询这个Sorted Set,还有考虑并发问题,假如说一个任务到期了,也被多个线程拿到了,这个时候必须保证只有一个线程能执行这个任务,这可以通过zrem命令来实现,只有删除成功了,才能执行任务,这样就能保证任务不被多个任务重复执行了。

接下来看代码。首先看下项目结构:

img_8d856335070018585d9b955af87b1677.png

一共4个类:Constants类定义了Redis key相关的常量。DelayTaskConsumer是延时任务的消费者,这个类负责从Redis拉取到期的任务,并封装了任务消费的逻辑。DelayTaskProducer则是延时任务的生产者,主要用于将延时任务放到Redis中。RedisClient则是Redis客户端的工具类。

最主要的类就是DelayTaskConsumer和DelayTaskProducer了。

我们先来看下生产者DelayTaskProducer:


img_df7d631226ec674092fd9d173b1beee5.png

代码很简单,就是将任务描述(为了方便,这里只存储资讯的id)和任务执行的时间戳放到Redis的Sorted Set中。

接下来是延时任务的消费者DelayTaskConsumer:

public class DelayTaskConsumer {

   private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

   public void start(){

       scheduledExecutorService.scheduleWithFixedDelay(new DelayTaskHandler(),1,1, TimeUnit.SECONDS);

   }

   public static class DelayTaskHandler implements Runnable{

       @Override

       public void run() {

           Jedis client = RedisClient.getClient();

           try {

               Set ids = client.zrangeByScore(Constants.DELAY_TASK_QUEUE, 0, System.currentTimeMillis(),

                       0, 1);

               if(ids==null||ids.isEmpty()){

                   return;

               }

               for(String id:ids){

                   Long count = client.zrem(Constants.DELAY_TASK_QUEUE, id);

                   if(count!=null&&count==1){

                       System.out.println(MessageFormat.format("发布资讯。id - {0} , timeStamp - {1} , " +

                               "threadName - {2}",id,System.currentTimeMillis(),Thread.currentThread().getName()));

                   }

               }

           }finally {

               client.close();

           }

       }

   }

}

首先看start方法。在这个方法里面我们利用Java的ScheduledExecutorService开了一个调度线程池,这个线程池会每隔1秒钟调度DelayTaskHandler中的run方法。

DelayTaskHandler类就是具体的调度逻辑了。主要有2个步骤,一个是从Redis Sorted Set中拉取到期的延时任务,另一个是执行到期的延时任务。拉取到期的延时任务是通过zrangeByScore命令实现的,处理多线程并发问题是通过zrem命令实现的。代码不复杂,这里就不多做解释了。

接下来测试一下:


img_6c71f7045d9109988bdd7c900f7d9a54.png

我们首先生产了4个延时任务,执行时间分别是程序开始运行后的5秒、10秒、15秒、20秒,然后启动了10个消费者去消费延时任务。运行效果如下:

img_69a5adc1abdb2ef67d143b3f21736b1e.png

可以看到,任务确实能够在相应的时间点左右被执行,不过有少许时间误差,这个是因为我们拉取到期任务是通过定时任务拉取而不是实时推送的,而且拉取任务时有一部分网络开销,再者,我们的任务处理逻辑是同步处理的,需要上一次的任务处理完,才能拉取下一批任务,这些因素都会造成延时任务的执行时间产生偏差。

总结

以上就是通过Redis实现延时任务的思路了。这里提供的只是一个最简单的版本,实际上还有很多地方可以优化。比如,我们可以把任务的处理逻辑再放到单独的线程池中去执行,这样的话任务消费者只需要负责任务的调度就可以了,好处就是可以减少任务执行时间偏差。还有就是,这里为了方便,任务的描述存储的只是任务的id,如果有多种不同类型的任务,像前面说的发送资讯任务和推送消息任务,那么就要通过额外存储任务的类型来进行区分,或者使用不同的Sorted Set来存放延时任务了。

除此之外,上面的例子每次拉取延时任务时,只拉取1个,如果说某一个时刻要处理的任务数非常多,那么会有一部分任务延迟比较严重,这里可以优化成每次拉取不止1个到期的任务,比如说10个,然后再逐个进行处理,这样的话可以极大地提升调度效率,因为如果是使用上面的方法,拉取10个任务需要10次调度,每次间隔1秒,总共需要10秒才能把10个任务拉取完,如果改成一次拉取10个,只需要1次就能完成了,效率提升还是挺大的。

当然还可以从另一个角度来优化。大家看上面的代码,当拉取到待执行任务时,就直接执行任务,任务执行完该线程也就退出了,但是这个时候,队列里可能还有很多待执行的任务(因为我们拉取任务时,限制了拉取的数量),所以其实在这里可以使用循环,当拉取不到待执行任务时,才结束调度,当有任务时,执行完还有顺便查询下有没有堆积的任务,直到没有堆积任务了才结束线程。

最后一个需要考虑的地方是,上面的代码并没有对任务执行失败的情况进行处理,也就是说如果某个任务执行失败了,那么连重试的机会都没有。因此,在生产环境使用时,还需要考虑任务处理失败的情况。有一个简单的方法是在任务处理时捕获异常,当在处理过程中出现异常时,就将该任务再放回Redis Sorted中,或者由当前线程再重试处理。不过这样做的话,任务的时效性就不能保证了,有可能本来定在早上7点执行的任务,因为失败重试的原因,延迟到7点10分才执行完成,这个要根据业务来进行权衡,比如可以在任务描述中增加重试次数或者是否允许重试的字段,这样在任务执行失败时,就能根据不同的任务采取不同的补偿措施了。

那么使用redis实现延时任务有什么优缺点呢?优点就是可以满足吞吐量。缺点则是存在任务丢失的风险(当redis实例挂了的时候)。因此,如果对性能要求比较高,同时又能容忍少数情况下任务的丢失,那么可以使用这种方式来实现。

欢迎工作一到五年的Java工程师朋友们加入Java架构开发:860113481

群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用自己每一分每一秒的时间来学习提升自己,不要再用"没有时间“来掩饰自己思想上的懒惰!趁年轻,使劲拼,给未来的自己一个交代!

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
6天前
|
缓存 NoSQL Java
面试官:Redis如何实现延迟任务?
延迟任务是计划任务,用于在未来特定时间执行。常见应用场景包括定时通知、异步处理、缓存管理、计划任务、订单处理、重试机制、提醒和数据采集。Redis虽无内置延迟任务功能,但可通过过期键通知、ZSet或Redisson实现。然而,这种方法精度有限,稳定性较差,适合轻量级需求。Redisson的RDelayedQueue提供更简单的延迟队列实现。
39 9
|
2月前
|
存储 NoSQL API
【小小思考】Redis实现去重任务队列
【2月更文挑战第1天】思考一下如何用Redis实现去重的任务队列,主要有List 、List + Set/Hash/Bloom Filter、ZSet、Lua和开源库等方式。
72 1
|
2月前
|
缓存 NoSQL Java
一次访问Redis延时高问题排查与总结
作者抽丝剥茧的记录了一次访问Redis延时高问题的排查和总结。
386 1
|
7月前
|
Arthas NoSQL Java
一次访问Redis延时高问题排查与总结(2)
本文是一次访问Redis延时高问题排查与总结的续篇,主要讲述了当时没有发现的一些问题和解决方案。
46945 22
|
7月前
|
NoSQL 安全 容灾
1分钟实现Redis数据迁移任务
NineData 基于全量复制、增量日志复制技术,提供了高效、安全可靠的 Redis 不停机迁移方案。当然,除了 Redis,NineData 已经支持数十种常见数据库的迁移复制,实现数据库迁移、数据容灾、数据双活、数据仓库实时集成等业务场景。同时,除了 SAAS 模式外,还提供了企业专属集群模式,满足企业最高的数据安全合规要求。
156 0
|
7月前
|
监控 NoSQL Java
面试官:Redis分布式锁超时了,任务还没执行完怎么办?
今天主要分享的是面试中常见的redis的一些面试内容。如果你正好需要刚好可以帮你回顾一下,如果不需要可以收藏起来后面用到的时候翻出来回顾。
|
10月前
|
NoSQL Redis
【Redis原理机制 四】基于Redis实现延时任务
【Redis原理机制 四】基于Redis实现延时任务
34 0
|
NoSQL Redis
Redis学习4:List数据类型、拓展操作、实现日志等
注意点:对存储空间的顺序进行分析!
Redis学习4:List数据类型、拓展操作、实现日志等
|
存储 NoSQL Redis
Redis学习3:hash类型操作、拓展操作、实现购物等
首先可以理解成一个redis里面有一个小的redis。同时要注意引入了一个field的名字。
Redis学习3:hash类型操作、拓展操作、实现购物等
|
缓存 NoSQL 安全
2021年你还不会Shiro?----10.使用redis实现Shiro的缓存
上一篇文章已经总结了使用ehCache来实现Shiro的缓存管理,步骤也很简单,引入依赖后,直接开启Realm的缓存管理器即可。如果使用Redis来实现缓存管理其实也是一样的,我们也是需要引入redis的依赖,然后开启缓存传入自定义的redis的缓存管理器就行。区别是我们需要为自定义的redis缓存管理器提供自定义的缓存管理类。这个缓存管理类中需要使用到redisTemplate模板,这个模板我们也是需要自己定义。
230 0
2021年你还不会Shiro?----10.使用redis实现Shiro的缓存