【Spring专题】「开发指南」手把手教你将@Schedule任务调度升级为分布式调度@DistributeSchedule

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 【Spring专题】「开发指南」手把手教你将@Schedule任务调度升级为分布式调度@DistributeSchedule

背景介绍


很多小伙伴们都跟我留言说过一个类似的问题,就是针对于任务调度框架而言的选取,很多公司都会采用任务调度框架的鼻祖Quartz,那么我们来梳理以下Java领域的任务调度框架吧。




Java领域的定时任务的框架


单机级别任务调度


  • TimeTask:是一个定时器类,通过该类可以为指定的定时任务进行配置。TimerTask类是一个定时任务类,该类实现了Runnable接口,缺点异常未检查会中止线程。


  • ScheduledExecutorService:相对延迟或者周期作为定时任务调度,缺点没有绝对的日期或者时间


  • spring定时框架:配置简单功能较多,如果系统使用单机的话可以优先考虑spring定时器


给予Java原生的一些任务调度框架后有很多组织机构推陈出新,出现了很多的优秀的任务调度框架,它们除了可以实现任务调度,还可以实现了分布式体系下的运行任务。例如以下:



分布式级别任务调度


  • Quartz:Java实现的定时任务调度框架,但Quartz关注点在于定时任务而非数据,并没有一套根据数据处理而定制化的流程。虽然Quartz可以基于数据库实现作业的高可用,但缺少分布式并行调度的功能。


  • TBSchedule:阿里早期开源的分布式任务调度系统,代码较为陈旧,使用timer而非线程池执行任务调度。众所周知,timer在处理异常状况时是有缺陷的。而且TBSchedule作业类型较为单一,只能是获取/处理数据一种模式。还有就是文档缺失比较严重,而且目前已经没有对应的人进行维护了,(处理方式主要分为“抢占式”和“协同分配式”,通过集群的节点分担大批量任务的处理,提高批量任务的处理效率)。


  • Elastic-job:当当开发的弹性分布式任务调度系统,功能丰富强大,采用zookeeper实现分布式协调,实现任务高可用以及分片,并且可以支持云开发,但是开发的模式有点侵入性,但也是一个不错的选择方向


  • Saturn:是唯品会自主研发的分布式的定时任务的调度平台,基于当当的elastic-job 版本开发的,并且可以很好的部署到docker容器上,目前本人还没有做过


  • Xxl-job: 是大众点评员工徐雪里于2015年发布的分布式任务调度平台,是一个轻量级分布式任务调度框架,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。





回到主旨如何建立Spring的分布式框架


这时候你会说,如果我们已经有了很多以上的分布式能力的任务调度框架了,我们为什么还要再进行改造Spring模式的任务调度框架呢,好的,就这事一个问题,如果当我们的任务调度不需要那么大的资源以及不能够依靠以上的开源框架去处理的时候,我们只能采用单机版的场景下,那该怎么办呢?


比如说,公司不需要提供额外的部署分布式调度、资源不足不想暴漏任务调度服务的单点问题等场景、甚至很多人习惯了使用任务调度了@Schedule,你让他直接改成以上的开发模式,成本是否也会很高?这该咋办?那么种种不能使用以上开源的分布式任务调度框架的时候,才有了我们今天的课题系列。




废话不多说直接上干货


先说以下我们所需要的原材料有哪些,首先Spring框架是必须的,之后还需要就是可以连接Redis的组件如:jedis、lettuce、Redission等。本次我们采用的是Redission框架,如果有用其他的小伙伴可以直接替换理念就好。



分析以下Spring的@Schedule的工作原理


@EnableScheduling这个注解


以下是源码:

image.png


可以看出Schedule调度框架主要启动需要依靠SchedulingConfiguration这个类进行解析扫描所有的组件。我们深入进去看一下。

image.png


别的我们都不看,我们就关注一个类:ScheduledAnnotationBeanPostProcessor,相信对Spring了解的小伙伴们应该知道了,这个是一个注解注入的扫描器类,专门处理bean的生命周期的,专门讲@Scheduled的方法对应的类进行动态代理植入到对应的容器内,并且分配对应的任务调度触发器机制,以下就是ScheduledAnnotationBeanPostProcessor的处理类的源码。

image.png


再次我们重点关注代码,后置处理的类拦截机制,他会扫描出所有相关的@Schedule对应的类和方法:

image.png

然后进行循环调用processScheduled的方法:

image.png


我们再来看一下processScheduled方法里面最直接封装到对应的schedule调用机制的方法片段。


cron模式

image.png


delay模式

image.png

fix模式

image.png

以上类型分为了三种模式:Cron模式、fix模式、delay模式等,相信了解任务调度的小伙伴们并不陌生吧!



最重要的改造点来了


我们如何通过以上分析得出的结论讲我们的@Schedule改造为分布式模式的呢,关键点就在这里,下图标黄的位置createRunnable方法,这也是Spring提供给我们唯一可以改造的地方。

image.png

createRunnable

image.png


进入createRunnable方法之后,我们需要看一下最后创建出的对象是什么?就是这个ScheduledMethodRunnable,进入这里的ScheduledMethodRunnable的类后可以看到如下

image.png

改造位置

image.png


这里就是真正要执行的代码,我们只需要再这里加入分布式锁,进行拦截,保证多个任务调度,同时只会有一个方法进行执行是否就可以保证了呢。但是如果你说需要进行分片处理,也是可以通过数据条件进行分隔,但是那又是另外一回事了。所以是不是很好就可以解决了?那我们来实现一下啊,以下就是小编完成的分布式改造后的成品。



我的分布式任务调度


定义属于我的分布式调度注解


我才用了加入前缀Distribute用来区分和原来的任务调度。


启动分布式任务调度机制
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(DistributeSchedulingConfiguration.class)
@Documented
public @interface EnableDistributeScheduling {}
复制代码



标识任务调度机制(1)

源码直接拷贝过来改一下改一下名称就好,调整成为对应的注解名称

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DistributeScheduleds {
    DistributeScheduled[] value();
}
复制代码



标识任务调度机制(2)

源码直接拷贝过来改一下改一下名称就好,调整成为对应的注解名称

image.png


对应的EnableDistributeScheduling的配置类服务


和源码保持一致,只是讲原来的ScheduledAnnotationBeanPostProcessor改为DistributeScheduledAnnotationBeanPostProcessor的对象即可。

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class DistributeSchedulingConfiguration {
    @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public DistributeScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
        return new DistributeScheduledAnnotationBeanPostProcessor();
    }
}
复制代码



定义DistributeSchedulingConfiguration中的
DistributeScheduledAnnotationBeanPostProcessor处理器

只需要找原来的样子调整成为return new DistributeScheduledMethodRunnable(target, invocableMethod,redisDistributionLock);即可。

/**
     * Create a {@link Runnable} for the given bean instance,
     * calling the specified scheduled method.
     * <p>The default implementation creates a {@link ScheduledMethodRunnable}.
     * @param target the target bean instance
     * @param method the scheduled method to call
     * @since 5.1
     * @see ScheduledMethodRunnable#ScheduledMethodRunnable(Object, Method)
     */
    protected Runnable createRunnable(Object target, Method method) {
        Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @DistributeScheduled");
        Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass());
        RedisDistributionLock redisDistributionLock = applicationContext.getBean(RedisDistributionLock.class);
        return new DistributeScheduledMethodRunnable(target, invocableMethod,redisDistributionLock);
    }
复制代码
定义我的DistributeScheduledMethodRunnable的runnable对象


添加分布式锁组件


添加了我自己封装的分布式所有的组件依赖之后即可。

image.png


改造run方法

添加对执行方法之前的加锁操作,我们采用的key为ClassName+methodName,如果可以还可以加入参数的调整。

image.png

亲测有效!不信可以试试看!



相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
3月前
|
资源调度 Java 调度
Spring Cloud Alibaba 集成分布式定时任务调度功能
定时任务在企业应用中至关重要,常用于异步数据处理、自动化运维等场景。在单体应用中,利用Java的`java.util.Timer`或Spring的`@Scheduled`即可轻松实现。然而,进入微服务架构后,任务可能因多节点并发执行而重复。Spring Cloud Alibaba为此发布了Scheduling模块,提供轻量级、高可用的分布式定时任务解决方案,支持防重复执行、分片运行等功能,并可通过`spring-cloud-starter-alibaba-schedulerx`快速集成。用户可选择基于阿里云SchedulerX托管服务或采用本地开源方案(如ShedLock)
126 1
|
3月前
|
存储 缓存 监控
分布式链路监控系统问题之kywalking在后期维护过程中可能会遇到中间件版本升级的问题如何解决
分布式链路监控系统问题之kywalking在后期维护过程中可能会遇到中间件版本升级的问题如何解决
|
17天前
|
存储 NoSQL Java
Java调度任务如何使用分布式锁保证相同任务在一个周期里只执行一次?
【10月更文挑战第29天】Java调度任务如何使用分布式锁保证相同任务在一个周期里只执行一次?
44 1
|
2月前
|
消息中间件 Java 对象存储
数据一致性挑战:Spring Cloud与Netflix OSS下的分布式事务管理
数据一致性挑战:Spring Cloud与Netflix OSS下的分布式事务管理
52 2
|
4月前
|
资源调度 Java 调度
Spring Cloud Alibaba 集成分布式定时任务调度功能
Spring Cloud Alibaba 发布了 Scheduling 任务调度模块 [#3732]提供了一套开源、轻量级、高可用的定时任务解决方案,帮助您快速开发微服务体系下的分布式定时任务。
14948 29
|
3月前
|
Java 微服务 Spring
SpringBoot+Vue+Spring Cloud Alibaba 实现大型电商系统【分布式微服务实现】
文章介绍了如何利用Spring Cloud Alibaba快速构建大型电商系统的分布式微服务,包括服务限流降级等主要功能的实现,并通过注解和配置简化了Spring Cloud应用的接入和搭建过程。
SpringBoot+Vue+Spring Cloud Alibaba 实现大型电商系统【分布式微服务实现】
|
3月前
|
机器学习/深度学习 资源调度 PyTorch
面向大规模分布式训练的资源调度与优化策略
【8月更文第15天】随着深度学习模型的复杂度不断提高,对计算资源的需求也日益增长。为了加速训练过程并降低运行成本,高效的资源调度和优化策略变得至关重要。本文将探讨在大规模分布式训练场景下如何有效地进行资源调度,并通过具体的代码示例来展示这些策略的实际应用。
387 1
|
3月前
|
Dubbo Java 调度
揭秘!Spring Cloud Alibaba的超级力量——如何轻松驾驭分布式定时任务调度?
【8月更文挑战第20天】在现代微服务架构中,Spring Cloud Alibaba通过集成分布式定时任务调度功能解决了一致性和可靠性挑战。它利用TimerX实现任务的分布式编排与调度,并通过`@SchedulerLock`确保任务不被重复执行。示例代码展示了如何配置定时任务及其分布式锁,以实现每5秒仅由一个节点执行任务,适合构建高可用的微服务系统。
68 0
|
4月前
|
消息中间件 缓存 架构师
对抗软件复杂度问题之降低代码的复杂度,如何解决
对抗软件复杂度问题之降低代码的复杂度,如何解决
|
2月前
|
SQL 监控 druid
springboot-druid数据源的配置方式及配置后台监控-自定义和导入stater(推荐-简单方便使用)两种方式配置druid数据源
这篇文章介绍了如何在Spring Boot项目中配置和监控Druid数据源,包括自定义配置和使用Spring Boot Starter两种方法。