Spring Boot + Redis 实现延时队列,写得太好了!

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 首先我们分析下这个流程用户提交任务。首先将任务推送至延迟队列中。

来源:blog.csdn.net/qq330983778/article/details/99341671


业务流程

首先我们分析下这个流程


用户提交任务。首先将任务推送至延迟队列中。

延迟队列接收到任务后,首先将任务推送至job pool中,然后计算其执行时间。

然后生成延迟任务(仅仅包含任务id)放入某个桶中

时间组件时刻轮询各个桶,当时间到达的时候从job pool中获得任务元信息。

监测任务的合法性如果已经删除则pass。继续轮询。如果任务合法则再次计算时间

如果合法则计算时间,如果时间合法:根据topic将任务放入对应的ready queue,然后从bucket中移除。如果时间不合法,则重新计算时间再次放入bucket,并移除之前的bucket中的内容

消费端轮询对应topic的ready queue。获取job后做自己的业务逻辑。与此同时,服务端将已经被消费端获取的job按照其设定的TTR,重新计算执行时间,并将其放入bucket。

完成消费后,发送finish消息,服务端根据job id删除对应信息。

image.png


对象

我们现在可以了解到中间存在的几个组件


延迟队列,为Redis延迟队列。实现消息传递

Job pool 任务池保存job元信息。根据文章描述使用K/V的数据结构,key为ID,value为job

Delay Bucket 用来保存业务的延迟任务。文章中描述使用轮询方式放入某一个Bucket可以知道其并没有使用topic来区分,个人这里默认使用顺序插入

Timer 时间组件,负责扫描各个Bucket。根据文章描述存在多个Timer,但是同一个Timer同一时间只能扫描一个Bucket

Ready Queue 负责存放需要被完成的任务,但是根据描述根据Topic的不同存在多个Ready Queue。

其中Timer负责轮询,Job pool、Delay Bucket、Ready Queue都是不同职责的集合。


任务状态

ready:可执行状态,

delay:不可执行状态,等待时钟周期。

reserved:已被消费者读取,但没有完成消费。

deleted:已被消费完成或者已被删除。

对外提供的接口

image.png


额外的内容

首先根据状态状态描述,finish和delete操作都是将任务设置成deleted状态。

根据文章描述的操作,在执行finish或者delete的操作的时候任务已经从元数据中移除,此时deleted状态可能只存在极短时间,所以实际实现中就直接删除了。

文章中并没有说明响应超时后如何处理,所以个人现在将其重新投入了待处理队列。

文章中因为使用了集群,所以使用redis的setnx锁来保证多个时间循环处理多个桶的时候不会出现重复循环。这里因为是简单的实现,所以就很简单的每个桶设置一个时间队列处理。也是为了方便简单处理。关于分布式锁可以看我之前的文章里面有描述。

实现

现在我们根据设计内容完成设计。这一块设计我们分四步完成


任务及相关对象

目前需要两个对象,一个是任务对象(job)一个负责保存任务引用的对象(delay job),Spring Boot 基础就不介绍了,推荐下这个实战教程: https://github.com/javastacks/spring-boot-best-practice


任务对象

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Job implements Serializable {
    /**
     * 延迟任务的唯一标识,用于检索任务
     */
    @JsonSerialize(using = ToStringSerializer.class)
    private Long id;
    /**
     * 任务类型(具体业务类型)
     */
    private String topic;
    /**
     * 任务的延迟时间
     */
    private long delayTime;
    /**
     * 任务的执行超时时间
     */
    private long ttrTime;
    /**
     * 任务具体的消息内容,用于处理具体业务逻辑用
     */
    private String message;
    /**
     * 重试次数
     */
    private int retryCount;
    /**
     * 任务状态
     */
    private JobStatus status;
}

任务引用对象

@Data
@AllArgsConstructor
public class DelayJob implements Serializable {
    /**
     * 延迟任务的唯一标识
     */
    private long jodId;
    /**
     * 任务的执行时间
     */
    private long delayDate;
    /**
     * 任务类型(具体业务类型)
     */
    private String topic;
    public DelayJob(Job job) {
        this.jodId = job.getId();
        this.delayDate = System.currentTimeMillis() + job.getDelayTime();
        this.topic = job.getTopic();
    }
    public DelayJob(Object value, Double score) {
        this.jodId = Long.parseLong(String.valueOf(value));
        this.delayDate = System.currentTimeMillis() + score.longValue();
    }
}

容器

目前我们需要完成三个容器的创建,Job任务池、延迟任务容器、待完成任务容器

job任务池,为普通的K/V结构,提供基础的操作

@Component
@Slf4j
public class JobPool {
    @Autowired
    private RedisTemplate redisTemplate;
    private String NAME = "job.pool";
    private BoundHashOperations getPool () {
        BoundHashOperations ops = redisTemplate.boundHashOps(NAME);
        return ops;
    }
    /**
     * 添加任务
     * @param job
     */
    public void addJob (Job job) {
        log.info("任务池添加任务:{}", JSON.toJSONString(job));
        getPool().put(job.getId(),job);
        return ;
    }
    /**
     * 获得任务
     * @param jobId
     * @return
     */
    public Job getJob(Long jobId) {
        Object o = getPool().get(jobId);
        if (o instanceof Job) {
            return (Job) o;
        }
        return null;
    }
    /**
     * 移除任务
     * @param jobId
     */
    public void removeDelayJob (Long jobId) {
        log.info("任务池移除任务:{}",jobId);
        // 移除任务
        getPool().delete(jobId);
    }
}

延迟任务,使用可排序的ZSet保存数据,提供取出最小值等操作

@Slf4j
@Component
public class DelayBucket {
    @Autowired
    private RedisTemplate redisTemplate;
    private static AtomicInteger index = new AtomicInteger(0);
    @Value("${thread.size}")
    private int bucketsSize;
    private List <String> bucketNames = new ArrayList <>();
    @Bean
    public List <String> createBuckets() {
        for (int i = 0; i < bucketsSize; i++) {
            bucketNames.add("bucket" + i);
        }
        return bucketNames;
    }
    /**
     * 获得桶的名称
     * @return
     */
    private String getThisBucketName() {
        int thisIndex = index.addAndGet(1);
        int i1 = thisIndex % bucketsSize;
        return bucketNames.get(i1);
    }
    /**
     * 获得桶集合
     * @param bucketName
     * @return
     */
    private BoundZSetOperations getBucket(String bucketName) {
        return redisTemplate.boundZSetOps(bucketName);
    }
    /**
     * 放入延时任务
     * @param job
     */
    public void addDelayJob(DelayJob job) {
        log.info("添加延迟任务:{}", JSON.toJSONString(job));
        String thisBucketName = getThisBucketName();
        BoundZSetOperations bucket = getBucket(thisBucketName);
        bucket.add(job,job.getDelayDate());
    }
    /**
     * 获得最新的延期任务
     * @return
     */
    public DelayJob getFirstDelayTime(Integer index) {
        String name = bucketNames.get(index);
        BoundZSetOperations bucket = getBucket(name);
        Set<ZSetOperations.TypedTuple> set = bucket.rangeWithScores(0, 1);
        if (CollectionUtils.isEmpty(set)) {
            return null;
        }
        ZSetOperations.TypedTuple typedTuple = (ZSetOperations.TypedTuple) set.toArray()[0];
        Object value = typedTuple.getValue();
        if (value instanceof DelayJob) {
            return (DelayJob) value;
        }
        return null;
    }
    /**
     * 移除延时任务
     * @param index
     * @param delayJob
     */
    public void removeDelayTime(Integer index,DelayJob delayJob) {
        String name = bucketNames.get(index);
        BoundZSetOperations bucket = getBucket(name);
        bucket.remove(delayJob);
    }
}

待完成任务,内部使用topic进行细分,每个topic对应一个list集合

@Component
@Slf4j
public class ReadyQueue {
    @Autowired
    private RedisTemplate redisTemplate;
    private String NAME = "process.queue";
    private String getKey(String topic) {
        return NAME + topic;
    }
    /**
     * 获得队列
     * @param topic
     * @return
     */
    private BoundListOperations getQueue (String topic) {
        BoundListOperations ops = redisTemplate.boundListOps(getKey(topic));
        return ops;
    }
    /**
     * 设置任务
     * @param delayJob
     */
    public void pushJob(DelayJob delayJob) {
        log.info("执行队列添加任务:{}",delayJob);
        BoundListOperations listOperations = getQueue(delayJob.getTopic());
        listOperations.leftPush(delayJob);
    }
    /**
     * 移除并获得任务
     * @param topic
     * @return
     */
    public DelayJob popJob(String topic) {
        BoundListOperations listOperations = getQueue(topic);
        Object o = listOperations.leftPop();
        if (o instanceof DelayJob) {
            log.info("执行队列取出任务:{}", JSON.toJSONString((DelayJob) o));
            return (DelayJob) o;
        }
        return null;
    }
}

轮询处理

设置了线程池为每个bucket设置一个轮询操作

@Component
public class DelayTimer implements ApplicationListener <ContextRefreshedEvent> {
    @Autowired
    private DelayBucket delayBucket;
    @Autowired
    private JobPool     jobPool;
    @Autowired
    private ReadyQueue  readyQueue;
    @Value("${thread.size}")
    private int length;
    @Override 
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        ExecutorService executorService = new ThreadPoolExecutor(
                length, 
                length,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue <Runnable>());
        for (int i = 0; i < length; i++) {
            executorService.execute(
                    new DelayJobHandler(
                            delayBucket,
                            jobPool,
                            readyQueue,
                            i));
        }
    }
}

测试请求

/**
 * 测试用请求
 * @author daify
 **/
@RestController
@RequestMapping("delay")
public class DelayController {
    @Autowired
    private JobService jobService;
    /**
     * 添加
     * @param request
     * @return
     */
    @RequestMapping(value = "add",method = RequestMethod.POST)
    public String addDefJob(Job request) {
        DelayJob delayJob = jobService.addDefJob(request);
        return JSON.toJSONString(delayJob);
    }
    /**
     * 获取
     * @return
     */
    @RequestMapping(value = "pop",method = RequestMethod.GET)
    public String getProcessJob(String topic) {
        Job process = jobService.getProcessJob(topic);
        return JSON.toJSONString(process);
    }
    /**
     * 完成一个执行的任务
     * @param jobId
     * @return
     */
    @RequestMapping(value = "finish",method = RequestMethod.DELETE)
    public String finishJob(Long jobId) {
        jobService.finishJob(jobId);
        return "success";
    }
    @RequestMapping(value = "delete",method = RequestMethod.DELETE)
    public String deleteJob(Long jobId) {
        jobService.deleteJob(jobId);
        return "success";
    }
}

测试

添加延迟任务

通过postman请求:localhost:8000/delay/add

image.png

此时这条延时任务被添加进了线程池中

2019-08-12 21:21:36.589  INFO 21444 --- [nio-8000-exec-6] d.samples.redis.delay.container.JobPool  : 任务池添加任务:{"delayTime":10000,"id":3,"message":"tag:testid:3","retryCount":0,"status":"DELAY","topic":"test","ttrTime":10000}
2019-08-12 21:21:36.609  INFO 21444 --- [nio-8000-exec-6] d.s.redis.delay.container.DelayBucket    : 添加延迟任务:{"delayDate":1565616106609,"jodId":3,"topic":"test"}

根据设置10秒钟之后任务会被添加至ReadyQueue中

2019-08-12 21:21:46.744  INFO 21444 --- [pool-1-thread-4] d.s.redis.delay.container.ReadyQueue     : 执行队列添加任务:DelayJob(jodId=3, delayDate=1565616106609, topic=test)

获得任务

这时候我们请求localhost:8000/delay/pop

这个时候任务被响应,修改状态的同时设置其超时时间,然后放置在DelayBucket中

2019-08-09 19:36:02.342  INFO 58456 --- [nio-8000-exec-3] d.s.redis.delay.container.ReadyQueue     : 执行队列取出任务:{"delayDate":1565321728704,"jodId":1,"topic":"测试"}
2019-08-09 19:36:02.364  INFO 58456 --- [nio-8000-exec-3] d.samples.redis.delay.container.JobPool  : 任务池添加任务:{"delayTime":10000,"id":1,"message":"延迟10秒,超时30秒","retryCount":0,"status":"RESERVED","topic":"测试","ttrTime":30000}
2019-08-09 19:36:02.384  INFO 58456 --- [nio-8000-exec-3] d.s.redis.delay.container.DelayBucket    : 添加延迟任务:{"delayDate":1565321792364,"jodId":1,"topic":"测试"}

按照设计在30秒后,任务假如没有被消费将会重新放置在ReadyQueue中

2019-08-12 21:21:48.239  INFO 21444 --- [nio-8000-exec-7] d.s.redis.delay.container.ReadyQueue     : 执行队列取出任务:{"delayDate":1565616106609,"jodId":3,"topic":"test"}
2019-08-12 21:21:48.261  INFO 21444 --- [nio-8000-exec-7] d.samples.redis.delay.container.JobPool  : 任务池添加任务:{"delayTime":10000,"id":3,"message":"tag:testid:3","retryCount":0,"status":"RESERVED","topic":"test","ttrTime":10000}

任务的删除/消费

现在我们请求:localhost:8000/delay/delete


image.png


此时在Job pool中此任务将会被移除,此时元数据已经不存在,但任务还在DelayBucket中循环,然而在循环中当检测到元数据已经不存的话此延时任务会被移除。

2019-08-12 21:21:54.880  INFO 21444 --- [nio-8000-exec-8] d.samples.redis.delay.container.JobPool  : 任务池移除任务:3
2019-08-12 21:21:59.104  INFO 21444 --- [pool-1-thread-5] d.s.redis.delay.handler.DelayJobHandler  : 移除不存在任务:{"delayDate":1565616118261,"jodId":3,"topic":"test"}
相关文章
|
1月前
|
NoSQL Java 调度
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
分布式锁是分布式系统中用于同步多节点访问共享资源的机制,防止并发操作带来的冲突。本文介绍了基于Spring Boot和Redis实现分布式锁的技术方案,涵盖锁的获取与释放、Redis配置、服务调度及多实例运行等内容,通过Docker Compose搭建环境,验证了锁的有效性与互斥特性。
101 0
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
|
6月前
|
NoSQL 安全 Java
深入理解 RedisConnectionFactory:Spring Data Redis 的核心组件
在 Spring Data Redis 中,`RedisConnectionFactory` 是核心组件,负责创建和管理与 Redis 的连接。它支持单机、集群及哨兵等多种模式,为上层组件(如 `RedisTemplate`)提供连接抽象。Spring 提供了 Lettuce 和 Jedis 两种主要实现,其中 Lettuce 因其线程安全和高性能特性被广泛推荐。通过手动配置或 Spring Boot 自动化配置,开发者可轻松集成 Redis,提升应用性能与扩展性。本文深入解析其作用、实现方式及常见问题解决方法,助你高效使用 Redis。
598 4
|
7月前
|
NoSQL Java 关系型数据库
微服务——SpringBoot使用归纳——Spring Boot 中集成Redis——Redis 介绍
本文介绍在 Spring Boot 中集成 Redis 的方法。Redis 是一种支持多种数据结构的非关系型数据库(NoSQL),具备高并发、高性能和灵活扩展的特点,适用于缓存、实时数据分析等场景。其数据以键值对形式存储,支持字符串、哈希、列表、集合等类型。通过将 Redis 与 Mysql 集群结合使用,可实现数据同步,提升系统稳定性。例如,在网站架构中优先从 Redis 获取数据,故障时回退至 Mysql,确保服务不中断。
285 0
微服务——SpringBoot使用归纳——Spring Boot 中集成Redis——Redis 介绍
|
3月前
|
NoSQL Java Redis
Redis基本数据类型及Spring Data Redis应用
Redis 是开源高性能键值对数据库,支持 String、Hash、List、Set、Sorted Set 等数据结构,适用于缓存、消息队列、排行榜等场景。具备高性能、原子操作及丰富功能,是分布式系统核心组件。
412 2
|
5月前
|
消息中间件 缓存 NoSQL
基于Spring Data Redis与RabbitMQ实现字符串缓存和计数功能(数据同步)
总的来说,借助Spring Data Redis和RabbitMQ,我们可以轻松实现字符串缓存和计数的功能。而关键的部分不过是一些"厨房的套路",一旦你掌握了这些套路,那么你就像厨师一样可以准备出一道道饕餮美食了。通过这种方式促进数据处理效率无疑将大大提高我们的生产力。
200 32
|
9月前
|
XML Java 应用服务中间件
Spring Boot 两种部署到服务器的方式
本文介绍了Spring Boot项目的两种部署方式:jar包和war包。Jar包方式使用内置Tomcat,只需配置JDK 1.8及以上环境,通过`nohup java -jar`命令后台运行,并开放服务器端口即可访问。War包则需将项目打包后放入外部Tomcat的webapps目录,修改启动类继承`SpringBootServletInitializer`并调整pom.xml中的打包类型为war,最后启动Tomcat访问应用。两者各有优劣,jar包更简单便捷,而war包适合传统部署场景。需要注意的是,war包部署时,内置Tomcat的端口配置不会生效。
2202 17
Spring Boot 两种部署到服务器的方式
|
7月前
|
NoSQL Java API
微服务——SpringBoot使用归纳——Spring Boot 中集成Redis——Spring Boot 集成 Redis
本文介绍了在Spring Boot中集成Redis的方法,包括依赖导入、Redis配置及常用API的使用。通过导入`spring-boot-starter-data-redis`依赖和配置`application.yml`文件,可轻松实现Redis集成。文中详细讲解了StringRedisTemplate的使用,适用于字符串操作,并结合FastJSON将实体类转换为JSON存储。还展示了Redis的string、hash和list类型的操作示例。最后总结了Redis在缓存和高并发场景中的应用价值,并提供课程源代码下载链接。
1696 0
|
7月前
|
NoSQL Java Redis
微服务——SpringBoot使用归纳——Spring Boot 中集成Redis——Redis 安装
本教程介绍在 VMware 虚拟机(CentOS 7)或阿里云服务器中安装 Redis 的过程,包括安装 gcc 编译环境、下载 Redis(官网或 wget)、解压安装、修改配置文件(如 bind、daemonize、requirepass 等设置)、启动 Redis 服务及测试客户端连接。通过 set 和 get 命令验证安装是否成功。适用于初学者快速上手 Redis 部署。
163 0
|
7月前
|
Java 数据库 微服务
微服务——SpringBoot使用归纳——Spring Boot中的项目属性配置——指定项目配置文件
在实际项目中,开发环境和生产环境的配置往往不同。为简化配置切换,可通过创建 `application-dev.yml` 和 `application-pro.yml` 分别管理开发与生产环境配置,如设置不同端口(8001/8002)。在 `application.yml` 中使用 `spring.profiles.active` 指定加载的配置文件,实现环境快速切换。本节还介绍了通过配置类读取参数的方法,适用于微服务场景,提升代码可维护性。课程源码可从 [Gitee](https://gitee.com/eson15/springboot_study) 下载。
264 0
|
10月前
|
存储 NoSQL Java
使用lock4j-redis-template-spring-boot-starter实现redis分布式锁
通过使用 `lock4j-redis-template-spring-boot-starter`,我们可以轻松实现 Redis 分布式锁,从而解决分布式系统中多个实例并发访问共享资源的问题。合理配置和使用分布式锁,可以有效提高系统的稳定性和数据的一致性。希望本文对你在实际项目中使用 Redis 分布式锁有所帮助。
585 5