如何用 Spring Boot 2.x 使用 Elastic Job 来实现定时任务呢?

简介: 我是小假 期待与你的下一次相遇 ~

Elastic Job

Elastic Job的前生是当当开源的一款分布式任务调度框架,而目前已经加入到了Apache基金会。

该项目下有两个分支:ElasticJob-Lite和ElasticJob-Cloud。ElasticJob-Lite是一个轻量级的任务管理方案,本文接下来的案例就用这个来实现。而 ElasticJob-Cloud则相对重一些,因为它使用容器来管理任务和隔离资源。

更多关于ElasticJob的介绍,也可以点击这里直达官方网站(https://shardingsphere.apache.org/elasticjob/)了解更多信息。

操作实例

第一步:创建一个最基础的Spring Boot项目

第二步:pom.xml中添加elasticjob-lite的starter

  1. <dependencies>
  2.    <dependency>
  3.        <groupId>org.apache.shardingsphere.elasticjob</groupId>
  4.        <artifactId>elasticjob-lite-spring-boot-starter</artifactId>
  5.        <version>3.0.0</version>
  6.    </dependency>
  7.    // ...
  8. </dependencies>

第三步:创建一个简单任务

  1. @Slf4j
  2. @Service
  3. public class MySimpleJob implements SimpleJob {
  4.    @Override
  5.    public void execute(ShardingContext context) {
  6.        log.info("MySimpleJob start : didispace.com {}", System.currentTimeMillis());
  7.    }
  8. }

第四步:编辑配置文件

  1. elasticjob.reg-center.server-lists=localhost:2181
  2. elasticjob.reg-center.namespace=didispace
  3. elasticjob.jobs.my-simple-job.elastic-job-class=com.didispace.chapter72.MySimpleJob
  4. elasticjob.jobs.my-simple-job.cron=0/5 * * * * ?
  5. elasticjob.jobs.my-simple-job.sharding-total-count=1

这里主要有两个部分:

第一部分:elasticjob.reg-center开头的,主要配置elastic job的注册中心和namespace

第二部分:任务配置,以elasticjob.jobs开头,这里的my-simple-job是任务的名称,根据喜好命名即可,但不要重复。任务的下的配置elastic-job-class是任务的实现类,cron是执行规则表达式,sharding-total-count是任务分片的总数。可以通过这个参数来把任务切分,实现并行处理。这里先设置为1,后面讲分片的使用。

运行与测试

完成了上面所有操作时候,可以尝试运行一下上面应用,因为这里需要用到ZooKeeper来协调分布式环境下的任务调度。所以,需要先在本地安装ZooKeeper,然后启动它。注意:上面elasticjob.reg-center.server-lists配置,根据实际使用的ZooKeeper地址和端口做相应修改。

在启动上述Spring Boot应用之后,可以看到如下日志输出:

  1. 2021-07-20 15:33:39.541  INFO 56365 --- [           main] org.quartz.impl.StdSchedulerFactory      : Quartz scheduler 'my-simple-job' initialized from an externally provided properties instance.
  2. 2021-07-20 15:33:39.541  INFO 56365 --- [           main] org.quartz.impl.StdSchedulerFactory      : Quartz scheduler version: 2.3.2
  3. 2021-07-20 15:33:39.551  INFO 56365 --- [           main] org.apache.curator.utils.Compatibility   : Using org.apache.zookeeper.server.quorum.MultipleAddresses
  4. 2021-07-20 15:33:40.067  INFO 56365 --- [           main] c.d.chapter72.Chapter72Application       : Started Chapter72Application in 3.25 seconds (JVM running for 4.965)
  5. 2021-07-20 15:33:40.069  INFO 56365 --- [           main] .s.b.j.ScheduleJobBootstrapStartupRunner : Starting ElasticJob Bootstrap.
  6. 2021-07-20 15:33:40.078  INFO 56365 --- [           main] org.quartz.core.QuartzScheduler          : Scheduler my-simple-job_$_NON_CLUSTERED started.
  7. 2021-07-20 15:33:40.078  INFO 56365 --- [           main] .s.b.j.ScheduleJobBootstrapStartupRunner : ElasticJob Bootstrap started.
  8. 2021-07-20 15:33:45.157  INFO 56365 --- [le-job_Worker-1] com.didispace.chapter72.MySimpleJob      : MySimpleJob start : didispace.com 1626766425157
  9. 2021-07-20 15:33:50.010  INFO 56365 --- [le-job_Worker-1] com.didispace.chapter72.MySimpleJob      : MySimpleJob start : didispace.com 1626766430010
  10. 2021-07-20 15:33:55.013  INFO 56365 --- [le-job_Worker-1] com.didispace.chapter72.MySimpleJob      : MySimpleJob start : didispace.com 1626766435013

既然是分布式任务调度,那么再启动一个(注意,在同一台机器启动的时候,会端口冲突,可以在启动命令中加入-Dserver.port=8081来区分端口),在第二个启动的服务日志也打印了类似的内容

  1. 2021-07-20 15:34:06.430  INFO 56371 --- [           main] org.quartz.impl.StdSchedulerFactory      : Quartz scheduler 'my-simple-job' initialized from an externally provided properties instance.
  2. 2021-07-20 15:34:06.430  INFO 56371 --- [           main] org.quartz.impl.StdSchedulerFactory      : Quartz scheduler version: 2.3.2
  3. 2021-07-20 15:34:06.436  INFO 56371 --- [           main] org.apache.curator.utils.Compatibility   : Using org.apache.zookeeper.server.quorum.MultipleAddresses
  4. 2021-07-20 15:34:06.786  INFO 56371 --- [           main] c.d.chapter72.Chapter72Application       : Started Chapter72Application in 1.446 seconds (JVM running for 1.884)
  5. 2021-07-20 15:34:06.787  INFO 56371 --- [           main] .s.b.j.ScheduleJobBootstrapStartupRunner : Starting ElasticJob Bootstrap.
  6. 2021-07-20 15:34:06.792  INFO 56371 --- [           main] org.quartz.core.QuartzScheduler          : Scheduler my-simple-job_$_NON_CLUSTERED started.
  7. 2021-07-20 15:34:06.792  INFO 56371 --- [           main] .s.b.j.ScheduleJobBootstrapStartupRunner : ElasticJob Bootstrap started.
  8. 2021-07-20 15:34:10.182  INFO 56371 --- [le-job_Worker-1] com.didispace.chapter72.MySimpleJob      : MySimpleJob start : didispace.com 1626766450182
  9. 2021-07-20 15:34:15.010  INFO 56371 --- [le-job_Worker-1] com.didispace.chapter72.MySimpleJob      : MySimpleJob start : didispace.com 1626766455010
  10. 2021-07-20 15:34:20.013  INFO 56371 --- [le-job_Worker-1] com.didispace.chapter72.MySimpleJob      : MySimpleJob start : didispace.com 1626766460013

此时,在回头看看之前第一个启动的应用,日志输出停止了。由于设置了分片总数为1,所以这个任务启动之后,只会有一个实例接管执行。这样就避免了多个进行同时重复的执行相同逻辑而产生问题的情况。同时,这样也支持了任务执行的高可用。比如:可以尝试把第二个启动的应用(正在打印日志的)终止掉。可以发现,第一个启动的应用(之前已经停止输出日志)继续开始打印任务日志了。

在整个实现过程中,并没有自己手工的去编写任何的分布式锁等代码去实现任务调度逻辑,只需要关注任务逻辑本身,然后通过配置分片的方式来控制任务的分割,就可以轻松的实现分布式集群环境下的定时任务管理了。是不是在复杂场景下,这种方式实现起来要比@Scheduled更方便呢?

使用Elastic Job的namespace配置,防止任务名称的冲突

报错

  1. Caused by: org.apache.shardingsphere.elasticjob.infra.exception.JobConfigurationException: Job conflict with register center. The job 'my-simple-job' in register center's class is 'com.didispace.chapter72.MySimpleJob', your job class is 'com.didispace.chapter74.MySimpleJob'
  2. at org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService.checkConflictJob(ConfigurationService.java:86) ~[elasticjob-lite-core-3.0.0.jar:3.0.0]
  3. at org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService.setUpJobConfiguration(ConfigurationService.java:70) ~[elasticjob-lite-core-3.0.0.jar:3.0.0]
  4. at org.apache.shardingsphere.elasticjob.lite.internal.setup.SetUpFacade.setUpJobConfiguration(SetUpFacade.java:66) ~[elasticjob-lite-core-3.0.0.jar:3.0.0]
  5. at org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobScheduler.<init>(JobScheduler.java:84) ~[elasticjob-lite-core-3.0.0.jar:3.0.0]
  6. at org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap.<init>(ScheduleJobBootstrap.java:36) ~[elasticjob-lite-core-3.0.0.jar:3.0.0]
  7. at org.apache.shardingsphere.elasticjob.lite.spring.boot.job.ElasticJobBootstrapConfiguration.registerClassedJob(ElasticJobBootstrapConfiguration.java:101) ~[elasticjob-lite-spring-boot-starter-3.0.0.jar:3.0.0]
  8. at org.apache.shardingsphere.elasticjob.lite.spring.boot.job.ElasticJobBootstrapConfiguration.constructJobBootstraps(ElasticJobBootstrapConfiguration.java:84) ~[elasticjob-lite-spring-boot-starter-3.0.0.jar:3.0.0]
  9. at org.apache.shardingsphere.elasticjob.lite.spring.boot.job.ElasticJobBootstrapConfiguration.createJobBootstrapBeans(ElasticJobBootstrapConfiguration.java:57) ~[elasticjob-lite-spring-boot-starter-3.0.0.jar:3.0.0]
  10. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_151]
  11. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_151]
  12. at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_151]
  13. at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_151]
  14. at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:389) ~[spring-beans-5.3.8.jar:5.3.8]
  15. at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:333) ~[spring-beans-5.3.8.jar:5.3.8]
  16. at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:157) ~[spring-beans-5.3.8.jar:5.3.8]
  17. ... 17 common frames omitted

根据错误消息Job conflict with register center. The job ‘my-simple-job’ in register center’s,初步判断是ZooKeeper中存储的任务配置出现冲突:任务名一样,但实现类不同。

如果在一个大一些的团队做开发的时候,只要存在多系统的话,那么定时任务的重名其实是很有可能发生。比如:很多应用都可能存在一些定时清理某些资源的任务,就很可能起一样的名字,然后注册到同一个ZooKeeper,最后出现冲突。那么有什么好办法来解决这个问题吗?

方法一:任务创建的统一管理

最原始的处理方法,就是集中的管理任务创建流程,比如:可以开一个Wiki页面,所有任务在这个页面上登记,每个人登记的时候,可以查一下想起的名字是否已经存在。如果存在了就再想一个名字,并做好登记。

这种方法很简单,也很好理解。但存在的问题是,当任务非常非常多的时候,这个页面内容就很大,维护起来也是非常麻烦的。

方法二:巧用Elastic Job的namespace属性来隔离任务名称

回忆一下之前第一篇写定时任务的时候,关于注册中心的配置是不是有下面两项:

  1. elasticjob.reg-center.server-lists=localhost:2181
  2. elasticjob.reg-center.namespace=didispace

第一个elasticjob.reg-center.server-lists不多说,就是ZooKeeper的访问地址。这里要重点讲的就是第二个参数elasticjob.reg-center.namespace

其实在ZooKeeper中注册任务的时候,真正冲突的并不纯粹是因为任务名称,而是namespace + 任务名称,全部一样,才会出现问题。所以,是不是引入把每个应用创建的任务都设定一个独立的namespace,那么是不是就隔离了呢?

继续思考一下,每个应用是不是肯定有一些属性是肯定不一样的呢?最后,给出了下面这样的建议:

  1. spring.application.name=chapter74
  2. elasticjob.reg-center.server-lists=localhost:2181
  3. elasticjob.reg-center.namespace=${spring.application.name}

即:将定时任务服务的elasticjob.reg-center.namespace设置为当前Spring Boot应用的名称一致spring.application.name

通常,在规划各个Spring Boot应用的时候,都会做好唯一性的规划,这样未来注册到Eureka、Nacos等注册中心的时候,也可以保证唯一。利用好这个唯一参数,也可以方便把各个应用的定时任务也都隔离出来。

使用Elastic Job的分片配置

然而,还有一类问题是在做定时任务时候容易出现的,就是任务执行速度时间过长;同时,为了实现定时任务的高可用,还启动了很多任务实例,但每个任务执行时候就一个实例在跑,资源利用率不高。

所以,接下来就来继续介绍,使用Elastic Job的分片配置,来为任务执行加加速,资源利用抬抬高的目标!

操作实践

第一步:创建一个分片执行的任务

  1. @Slf4j
  2. @Service
  3. public class MyShardingJob implements SimpleJob {
  4.    @Override
  5.    public void execute(ShardingContext context) {
  6.        switch (context.getShardingItem()) {
  7.            case 0:
  8.                log.info("分片1:执行任务");
  9.                break;
  10.            case 1:
  11.                log.info("分片2:执行任务");
  12.                break;
  13.            case 2:
  14.                log.info("分片3:执行任务");
  15.                break;
  16.        }
  17.    }
  18. }

这里通过switch来判断当前任务上下文的sharding-item值来执行不同的分片任务。sharding-item的值取决于后面将要配置的分片总数,但注意是从0开始计数的。这里仅采用了日志打印的方式,来展示分片效果,真正实现业务逻辑的时候,一定记得根据分片数量对执行任务也要做分片操作的设计。比如:可以根据批量任务的id求模的方式来区分不同分片处理不同的数据,以避免又重复执行而出现问题。

第二步:在配置文件中,设置配置任务的实现类、执行表达式、以及将要重要测试的分片总数参数

  1. elasticjob.jobs.my-sharding-job.elastic-job-class=com.didispace.chapter73.MyShardingJob
  2. elasticjob.jobs.my-sharding-job.cron=0/5 * * * * ?
  3. elasticjob.jobs.my-sharding-job.sharding-total-count=3

这里设置为3,所以任务会被分为3个分片,每个分片对应第一步中一个switch的分支。

运行与测试

单实例运行

在完成了上面的代码之后,尝试启动上面实现的第一个实例。

此时,可以看到,每间隔5秒,这个实例会打印这样的日志:

  1. 2021-07-21 17:42:00.122  INFO 63478 --- [           main] .s.b.j.ScheduleJobBootstrapStartupRunner : Starting ElasticJob Bootstrap.
  2. 2021-07-21 17:42:00.126  INFO 63478 --- [           main] org.quartz.core.QuartzScheduler          : Scheduler my-sharding-job_$_NON_CLUSTERED started.
  3. 2021-07-21 17:42:00.126  INFO 63478 --- [           main] .s.b.j.ScheduleJobBootstrapStartupRunner : ElasticJob Bootstrap started.
  4. 2021-07-21 17:42:05.254  INFO 63478 --- [-sharding-job-1] com.didispace.chapter73.MyShardingJob    : 分片1:执行任务
  5. 2021-07-21 17:42:05.254  INFO 63478 --- [-sharding-job-3] com.didispace.chapter73.MyShardingJob    : 分片3:执行任务
  6. 2021-07-21 17:42:05.254  INFO 63478 --- [-sharding-job-2] com.didispace.chapter73.MyShardingJob    : 分片2:执行任务
  7. 2021-07-21 17:42:10.011  INFO 63478 --- [-sharding-job-4] com.didispace.chapter73.MyShardingJob    : 分片1:执行任务
  8. 2021-07-21 17:42:10.011  INFO 63478 --- [-sharding-job-5] com.didispace.chapter73.MyShardingJob    : 分片2:执行任务
  9. 2021-07-21 17:42:10.011  INFO 63478 --- [-sharding-job-6] com.didispace.chapter73.MyShardingJob    : 分片3:执行任务

每次任务都被拆分成了3个分片任务,就如上文中所说的,每个分片对应一个switch的分支。由于当前情况下,只启动了一个实例,所以3个分片任务都被分配到了这个唯一的实例上。

双实例运行

接下来,再启动一个实例(注意使用-Dserver.port来改变不同的端口,不然本地会启动不成功)。此时,两个实例的日志出现了变化:

实例1的日志:

  1. 2021-07-21 17:44:50.190  INFO 63478 --- [ng-job_Worker-1] com.didispace.chapter73.MyShardingJob    : 分片2:执行任务
  2. 2021-07-21 17:44:55.007  INFO 63478 --- [ng-job_Worker-1] com.didispace.chapter73.MyShardingJob    : 分片2:执行任务
  3. 2021-07-21 17:45:00.010  INFO 63478 --- [ng-job_Worker-1] com.didispace.chapter73.MyShardingJob    : 分片2:执行任务

实例2的日志:

  1. 2021-07-21 17:44:50.272  INFO 63484 --- [-sharding-job-1] com.didispace.chapter73.MyShardingJob    : 分片1:执行任务
  2. 2021-07-21 17:44:50.273  INFO 63484 --- [-sharding-job-2] com.didispace.chapter73.MyShardingJob    : 分片3:执行任务
  3. 2021-07-21 17:44:55.009  INFO 63484 --- [-sharding-job-3] com.didispace.chapter73.MyShardingJob    : 分片1:执行任务
  4. 2021-07-21 17:44:55.009  INFO 63484 --- [-sharding-job-4] com.didispace.chapter73.MyShardingJob    : 分片3:执行任务

随着实例数量的增加,可以看到分片的分配发生了变化。这也就意味着,当一个任务开始执行的时候,两个任务执行实例都被利用了起来,这样任务执行和资源利用的效率就可以得到优化。


相关文章
|
Java 数据库 Spring
Spring Boot 实现定时任务的动态增删启停
Spring Boot 实现定时任务的动态增删启停
236 0
|
6月前
|
监控 Java BI
《深入理解Spring》定时任务——自动化调度的时间管理者
Spring定时任务通过@Scheduled注解和Cron表达式实现灵活调度,支持固定频率、延迟执行及动态配置,结合线程池与异常处理可提升可靠性,适用于报表生成、健康检查等场景,助力企业级应用自动化。
|
10月前
|
Java Spring
使用 Spring Boot 多个定时任务阻塞问题的解决方案
我是小假 期待与你的下一次相遇 ~
552 5
|
资源调度 Java 调度
Spring Cloud Alibaba 集成分布式定时任务调度功能
定时任务在企业应用中至关重要,常用于异步数据处理、自动化运维等场景。在单体应用中,利用Java的`java.util.Timer`或Spring的`@Scheduled`即可轻松实现。然而,进入微服务架构后,任务可能因多节点并发执行而重复。Spring Cloud Alibaba为此发布了Scheduling模块,提供轻量级、高可用的分布式定时任务解决方案,支持防重复执行、分片运行等功能,并可通过`spring-cloud-starter-alibaba-schedulerx`快速集成。用户可选择基于阿里云SchedulerX托管服务或采用本地开源方案(如ShedLock)
531 1
|
Java 调度 Spring
Spring之定时任务基本使用篇
本文介绍了在Spring Boot项目中使用定时任务的基本方法。主要通过`@Scheduled`注解实现,需添加`@EnableScheduling`开启定时任务功能。文中详细解析了Cron表达式的语法及常见实例,如每秒、每天特定时间执行等。此外,还探讨了多个定时任务的执行方式(并行或串行)及其潜在问题,并留待后续深入讨论。
800 64
|
资源调度 Java 调度
Spring Cloud Alibaba 集成分布式定时任务调度功能
Spring Cloud Alibaba 发布了 Scheduling 任务调度模块 [#3732]提供了一套开源、轻量级、高可用的定时任务解决方案,帮助您快速开发微服务体系下的分布式定时任务。
16428 114
|
NoSQL Java Redis
Spring Boot 监听 Redis Key 失效事件实现定时任务
Spring Boot 监听 Redis Key 失效事件实现定时任务
437 0
|
Java BI 调度
Java Spring的定时任务的配置和使用
遵循上述步骤,你就可以在Spring应用中轻松地配置和使用定时任务,满足各种定时处理需求。
862 2
|
存储 Java API
简单两步,Spring Boot 写死的定时任务也能动态设置:技术干货分享
【10月更文挑战第4天】在Spring Boot开发中,定时任务通常通过@Scheduled注解来实现,这种方式简单直接,但存在一个显著的限制:任务的执行时间或频率在编译时就已经确定,无法在运行时动态调整。然而,在实际工作中,我们往往需要根据业务需求或外部条件的变化来动态调整定时任务的执行计划。本文将分享一个简单两步的解决方案,让你的Spring Boot应用中的定时任务也能动态设置,从而满足更灵活的业务需求。
1286 4
|
SQL Java 调度
实时计算 Flink版产品使用问题之使用Spring Boot启动Flink处理任务时,使用Spring Boot的@Scheduled注解进行定时任务调度,出现内存占用过高,该怎么办
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。