如何用 Spring Boot 2.x 使用 Elastic Job 来实现定时任务呢?

简介: 我是小假 期待与你的下一次相遇 ~

Elastic Job

Elastic Job的前生是当当开源的一款分布式任务调度框架,而目前已经加入到了Apache基金会。

该项目下有两个分支:ElasticJob-Lite和ElasticJob-Cloud。ElasticJob-Lite是一个轻量级的任务管理方案,本文接下来的案例就用这个来实现。而 ElasticJob-Cloud则相对重一些,因为它使用容器来管理任务和隔离资源。

更多关于ElasticJob的介绍,也可以点击这里直达官方网站(https://shardingsphere.apache.org/elasticjob/)了解更多信息。

操作实例

第一步:创建一个最基础的Spring Boot项目

第二步:pom.xml中添加elasticjob-lite的starter

  1. <dependencies>
  2.    <dependency>
  3.        <groupId>org.apache.shardingsphere.elasticjob</groupId>
  4.        <artifactId>elasticjob-lite-spring-boot-starter</artifactId>
  5.        <version>3.0.0</version>
  6.    </dependency>
  7.    // ...
  8. </dependencies>

第三步:创建一个简单任务

  1. @Slf4j
  2. @Service
  3. public class MySimpleJob implements SimpleJob {
  4.    @Override
  5.    public void execute(ShardingContext context) {
  6.        log.info("MySimpleJob start : didispace.com {}", System.currentTimeMillis());
  7.    }
  8. }

第四步:编辑配置文件

  1. elasticjob.reg-center.server-lists=localhost:2181
  2. elasticjob.reg-center.namespace=didispace
  3. elasticjob.jobs.my-simple-job.elastic-job-class=com.didispace.chapter72.MySimpleJob
  4. elasticjob.jobs.my-simple-job.cron=0/5 * * * * ?
  5. elasticjob.jobs.my-simple-job.sharding-total-count=1

这里主要有两个部分:

第一部分:elasticjob.reg-center开头的,主要配置elastic job的注册中心和namespace

第二部分:任务配置,以elasticjob.jobs开头,这里的my-simple-job是任务的名称,根据喜好命名即可,但不要重复。任务的下的配置elastic-job-class是任务的实现类,cron是执行规则表达式,sharding-total-count是任务分片的总数。可以通过这个参数来把任务切分,实现并行处理。这里先设置为1,后面讲分片的使用。

运行与测试

完成了上面所有操作时候,可以尝试运行一下上面应用,因为这里需要用到ZooKeeper来协调分布式环境下的任务调度。所以,需要先在本地安装ZooKeeper,然后启动它。注意:上面elasticjob.reg-center.server-lists配置,根据实际使用的ZooKeeper地址和端口做相应修改。

在启动上述Spring Boot应用之后,可以看到如下日志输出:

  1. 2021-07-20 15:33:39.541  INFO 56365 --- [           main] org.quartz.impl.StdSchedulerFactory      : Quartz scheduler 'my-simple-job' initialized from an externally provided properties instance.
  2. 2021-07-20 15:33:39.541  INFO 56365 --- [           main] org.quartz.impl.StdSchedulerFactory      : Quartz scheduler version: 2.3.2
  3. 2021-07-20 15:33:39.551  INFO 56365 --- [           main] org.apache.curator.utils.Compatibility   : Using org.apache.zookeeper.server.quorum.MultipleAddresses
  4. 2021-07-20 15:33:40.067  INFO 56365 --- [           main] c.d.chapter72.Chapter72Application       : Started Chapter72Application in 3.25 seconds (JVM running for 4.965)
  5. 2021-07-20 15:33:40.069  INFO 56365 --- [           main] .s.b.j.ScheduleJobBootstrapStartupRunner : Starting ElasticJob Bootstrap.
  6. 2021-07-20 15:33:40.078  INFO 56365 --- [           main] org.quartz.core.QuartzScheduler          : Scheduler my-simple-job_$_NON_CLUSTERED started.
  7. 2021-07-20 15:33:40.078  INFO 56365 --- [           main] .s.b.j.ScheduleJobBootstrapStartupRunner : ElasticJob Bootstrap started.
  8. 2021-07-20 15:33:45.157  INFO 56365 --- [le-job_Worker-1] com.didispace.chapter72.MySimpleJob      : MySimpleJob start : didispace.com 1626766425157
  9. 2021-07-20 15:33:50.010  INFO 56365 --- [le-job_Worker-1] com.didispace.chapter72.MySimpleJob      : MySimpleJob start : didispace.com 1626766430010
  10. 2021-07-20 15:33:55.013  INFO 56365 --- [le-job_Worker-1] com.didispace.chapter72.MySimpleJob      : MySimpleJob start : didispace.com 1626766435013

既然是分布式任务调度,那么再启动一个(注意,在同一台机器启动的时候,会端口冲突,可以在启动命令中加入-Dserver.port=8081来区分端口),在第二个启动的服务日志也打印了类似的内容

  1. 2021-07-20 15:34:06.430  INFO 56371 --- [           main] org.quartz.impl.StdSchedulerFactory      : Quartz scheduler 'my-simple-job' initialized from an externally provided properties instance.
  2. 2021-07-20 15:34:06.430  INFO 56371 --- [           main] org.quartz.impl.StdSchedulerFactory      : Quartz scheduler version: 2.3.2
  3. 2021-07-20 15:34:06.436  INFO 56371 --- [           main] org.apache.curator.utils.Compatibility   : Using org.apache.zookeeper.server.quorum.MultipleAddresses
  4. 2021-07-20 15:34:06.786  INFO 56371 --- [           main] c.d.chapter72.Chapter72Application       : Started Chapter72Application in 1.446 seconds (JVM running for 1.884)
  5. 2021-07-20 15:34:06.787  INFO 56371 --- [           main] .s.b.j.ScheduleJobBootstrapStartupRunner : Starting ElasticJob Bootstrap.
  6. 2021-07-20 15:34:06.792  INFO 56371 --- [           main] org.quartz.core.QuartzScheduler          : Scheduler my-simple-job_$_NON_CLUSTERED started.
  7. 2021-07-20 15:34:06.792  INFO 56371 --- [           main] .s.b.j.ScheduleJobBootstrapStartupRunner : ElasticJob Bootstrap started.
  8. 2021-07-20 15:34:10.182  INFO 56371 --- [le-job_Worker-1] com.didispace.chapter72.MySimpleJob      : MySimpleJob start : didispace.com 1626766450182
  9. 2021-07-20 15:34:15.010  INFO 56371 --- [le-job_Worker-1] com.didispace.chapter72.MySimpleJob      : MySimpleJob start : didispace.com 1626766455010
  10. 2021-07-20 15:34:20.013  INFO 56371 --- [le-job_Worker-1] com.didispace.chapter72.MySimpleJob      : MySimpleJob start : didispace.com 1626766460013

此时,在回头看看之前第一个启动的应用,日志输出停止了。由于设置了分片总数为1,所以这个任务启动之后,只会有一个实例接管执行。这样就避免了多个进行同时重复的执行相同逻辑而产生问题的情况。同时,这样也支持了任务执行的高可用。比如:可以尝试把第二个启动的应用(正在打印日志的)终止掉。可以发现,第一个启动的应用(之前已经停止输出日志)继续开始打印任务日志了。

在整个实现过程中,并没有自己手工的去编写任何的分布式锁等代码去实现任务调度逻辑,只需要关注任务逻辑本身,然后通过配置分片的方式来控制任务的分割,就可以轻松的实现分布式集群环境下的定时任务管理了。是不是在复杂场景下,这种方式实现起来要比@Scheduled更方便呢?

使用Elastic Job的namespace配置,防止任务名称的冲突

报错

  1. Caused by: org.apache.shardingsphere.elasticjob.infra.exception.JobConfigurationException: Job conflict with register center. The job 'my-simple-job' in register center's class is 'com.didispace.chapter72.MySimpleJob', your job class is 'com.didispace.chapter74.MySimpleJob'
  2. at org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService.checkConflictJob(ConfigurationService.java:86) ~[elasticjob-lite-core-3.0.0.jar:3.0.0]
  3. at org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService.setUpJobConfiguration(ConfigurationService.java:70) ~[elasticjob-lite-core-3.0.0.jar:3.0.0]
  4. at org.apache.shardingsphere.elasticjob.lite.internal.setup.SetUpFacade.setUpJobConfiguration(SetUpFacade.java:66) ~[elasticjob-lite-core-3.0.0.jar:3.0.0]
  5. at org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobScheduler.<init>(JobScheduler.java:84) ~[elasticjob-lite-core-3.0.0.jar:3.0.0]
  6. at org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap.<init>(ScheduleJobBootstrap.java:36) ~[elasticjob-lite-core-3.0.0.jar:3.0.0]
  7. at org.apache.shardingsphere.elasticjob.lite.spring.boot.job.ElasticJobBootstrapConfiguration.registerClassedJob(ElasticJobBootstrapConfiguration.java:101) ~[elasticjob-lite-spring-boot-starter-3.0.0.jar:3.0.0]
  8. at org.apache.shardingsphere.elasticjob.lite.spring.boot.job.ElasticJobBootstrapConfiguration.constructJobBootstraps(ElasticJobBootstrapConfiguration.java:84) ~[elasticjob-lite-spring-boot-starter-3.0.0.jar:3.0.0]
  9. at org.apache.shardingsphere.elasticjob.lite.spring.boot.job.ElasticJobBootstrapConfiguration.createJobBootstrapBeans(ElasticJobBootstrapConfiguration.java:57) ~[elasticjob-lite-spring-boot-starter-3.0.0.jar:3.0.0]
  10. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_151]
  11. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_151]
  12. at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_151]
  13. at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_151]
  14. at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:389) ~[spring-beans-5.3.8.jar:5.3.8]
  15. at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:333) ~[spring-beans-5.3.8.jar:5.3.8]
  16. at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:157) ~[spring-beans-5.3.8.jar:5.3.8]
  17. ... 17 common frames omitted

根据错误消息Job conflict with register center. The job ‘my-simple-job’ in register center’s,初步判断是ZooKeeper中存储的任务配置出现冲突:任务名一样,但实现类不同。

如果在一个大一些的团队做开发的时候,只要存在多系统的话,那么定时任务的重名其实是很有可能发生。比如:很多应用都可能存在一些定时清理某些资源的任务,就很可能起一样的名字,然后注册到同一个ZooKeeper,最后出现冲突。那么有什么好办法来解决这个问题吗?

方法一:任务创建的统一管理

最原始的处理方法,就是集中的管理任务创建流程,比如:可以开一个Wiki页面,所有任务在这个页面上登记,每个人登记的时候,可以查一下想起的名字是否已经存在。如果存在了就再想一个名字,并做好登记。

这种方法很简单,也很好理解。但存在的问题是,当任务非常非常多的时候,这个页面内容就很大,维护起来也是非常麻烦的。

方法二:巧用Elastic Job的namespace属性来隔离任务名称

回忆一下之前第一篇写定时任务的时候,关于注册中心的配置是不是有下面两项:

  1. elasticjob.reg-center.server-lists=localhost:2181
  2. elasticjob.reg-center.namespace=didispace

第一个elasticjob.reg-center.server-lists不多说,就是ZooKeeper的访问地址。这里要重点讲的就是第二个参数elasticjob.reg-center.namespace

其实在ZooKeeper中注册任务的时候,真正冲突的并不纯粹是因为任务名称,而是namespace + 任务名称,全部一样,才会出现问题。所以,是不是引入把每个应用创建的任务都设定一个独立的namespace,那么是不是就隔离了呢?

继续思考一下,每个应用是不是肯定有一些属性是肯定不一样的呢?最后,给出了下面这样的建议:

  1. spring.application.name=chapter74
  2. elasticjob.reg-center.server-lists=localhost:2181
  3. elasticjob.reg-center.namespace=${spring.application.name}

即:将定时任务服务的elasticjob.reg-center.namespace设置为当前Spring Boot应用的名称一致spring.application.name

通常,在规划各个Spring Boot应用的时候,都会做好唯一性的规划,这样未来注册到Eureka、Nacos等注册中心的时候,也可以保证唯一。利用好这个唯一参数,也可以方便把各个应用的定时任务也都隔离出来。

使用Elastic Job的分片配置

然而,还有一类问题是在做定时任务时候容易出现的,就是任务执行速度时间过长;同时,为了实现定时任务的高可用,还启动了很多任务实例,但每个任务执行时候就一个实例在跑,资源利用率不高。

所以,接下来就来继续介绍,使用Elastic Job的分片配置,来为任务执行加加速,资源利用抬抬高的目标!

操作实践

第一步:创建一个分片执行的任务

  1. @Slf4j
  2. @Service
  3. public class MyShardingJob implements SimpleJob {
  4.    @Override
  5.    public void execute(ShardingContext context) {
  6.        switch (context.getShardingItem()) {
  7.            case 0:
  8.                log.info("分片1:执行任务");
  9.                break;
  10.            case 1:
  11.                log.info("分片2:执行任务");
  12.                break;
  13.            case 2:
  14.                log.info("分片3:执行任务");
  15.                break;
  16.        }
  17.    }
  18. }

这里通过switch来判断当前任务上下文的sharding-item值来执行不同的分片任务。sharding-item的值取决于后面将要配置的分片总数,但注意是从0开始计数的。这里仅采用了日志打印的方式,来展示分片效果,真正实现业务逻辑的时候,一定记得根据分片数量对执行任务也要做分片操作的设计。比如:可以根据批量任务的id求模的方式来区分不同分片处理不同的数据,以避免又重复执行而出现问题。

第二步:在配置文件中,设置配置任务的实现类、执行表达式、以及将要重要测试的分片总数参数

  1. elasticjob.jobs.my-sharding-job.elastic-job-class=com.didispace.chapter73.MyShardingJob
  2. elasticjob.jobs.my-sharding-job.cron=0/5 * * * * ?
  3. elasticjob.jobs.my-sharding-job.sharding-total-count=3

这里设置为3,所以任务会被分为3个分片,每个分片对应第一步中一个switch的分支。

运行与测试

单实例运行

在完成了上面的代码之后,尝试启动上面实现的第一个实例。

此时,可以看到,每间隔5秒,这个实例会打印这样的日志:

  1. 2021-07-21 17:42:00.122  INFO 63478 --- [           main] .s.b.j.ScheduleJobBootstrapStartupRunner : Starting ElasticJob Bootstrap.
  2. 2021-07-21 17:42:00.126  INFO 63478 --- [           main] org.quartz.core.QuartzScheduler          : Scheduler my-sharding-job_$_NON_CLUSTERED started.
  3. 2021-07-21 17:42:00.126  INFO 63478 --- [           main] .s.b.j.ScheduleJobBootstrapStartupRunner : ElasticJob Bootstrap started.
  4. 2021-07-21 17:42:05.254  INFO 63478 --- [-sharding-job-1] com.didispace.chapter73.MyShardingJob    : 分片1:执行任务
  5. 2021-07-21 17:42:05.254  INFO 63478 --- [-sharding-job-3] com.didispace.chapter73.MyShardingJob    : 分片3:执行任务
  6. 2021-07-21 17:42:05.254  INFO 63478 --- [-sharding-job-2] com.didispace.chapter73.MyShardingJob    : 分片2:执行任务
  7. 2021-07-21 17:42:10.011  INFO 63478 --- [-sharding-job-4] com.didispace.chapter73.MyShardingJob    : 分片1:执行任务
  8. 2021-07-21 17:42:10.011  INFO 63478 --- [-sharding-job-5] com.didispace.chapter73.MyShardingJob    : 分片2:执行任务
  9. 2021-07-21 17:42:10.011  INFO 63478 --- [-sharding-job-6] com.didispace.chapter73.MyShardingJob    : 分片3:执行任务

每次任务都被拆分成了3个分片任务,就如上文中所说的,每个分片对应一个switch的分支。由于当前情况下,只启动了一个实例,所以3个分片任务都被分配到了这个唯一的实例上。

双实例运行

接下来,再启动一个实例(注意使用-Dserver.port来改变不同的端口,不然本地会启动不成功)。此时,两个实例的日志出现了变化:

实例1的日志:

  1. 2021-07-21 17:44:50.190  INFO 63478 --- [ng-job_Worker-1] com.didispace.chapter73.MyShardingJob    : 分片2:执行任务
  2. 2021-07-21 17:44:55.007  INFO 63478 --- [ng-job_Worker-1] com.didispace.chapter73.MyShardingJob    : 分片2:执行任务
  3. 2021-07-21 17:45:00.010  INFO 63478 --- [ng-job_Worker-1] com.didispace.chapter73.MyShardingJob    : 分片2:执行任务

实例2的日志:

  1. 2021-07-21 17:44:50.272  INFO 63484 --- [-sharding-job-1] com.didispace.chapter73.MyShardingJob    : 分片1:执行任务
  2. 2021-07-21 17:44:50.273  INFO 63484 --- [-sharding-job-2] com.didispace.chapter73.MyShardingJob    : 分片3:执行任务
  3. 2021-07-21 17:44:55.009  INFO 63484 --- [-sharding-job-3] com.didispace.chapter73.MyShardingJob    : 分片1:执行任务
  4. 2021-07-21 17:44:55.009  INFO 63484 --- [-sharding-job-4] com.didispace.chapter73.MyShardingJob    : 分片3:执行任务

随着实例数量的增加,可以看到分片的分配发生了变化。这也就意味着,当一个任务开始执行的时候,两个任务执行实例都被利用了起来,这样任务执行和资源利用的效率就可以得到优化。


相关文章
|
存储 缓存 文件存储
如何保证分布式文件系统的数据一致性
分布式文件系统需要向上层应用提供透明的客户端缓存,从而缓解网络延时现象,更好地支持客户端性能水平扩展,同时也降低对文件服务器的访问压力。当考虑客户端缓存的时候,由于在客户端上引入了多个本地数据副本(Replica),就相应地需要提供客户端对数据访问的全局数据一致性。
31853 78
如何保证分布式文件系统的数据一致性
|
前端开发 容器
HTML5+CSS3前端入门教程---从0开始通过一个商城实例手把手教你学习PC端和移动端页面开发第8章FlexBox布局(上)
HTML5+CSS3前端入门教程---从0开始通过一个商城实例手把手教你学习PC端和移动端页面开发第8章FlexBox布局
17655 18
|
人工智能 负载均衡 网络性能优化
灵骏可预期网络:Built for AI Infrastructure
通用人工智能离我们越来越近,全世界的关注和投入正在带来日新“周”异的变化。回顾人工智能的诞生和发展历程,人类计算能力的进步几乎牵动了每一次的重大技术突破,当前的大模型热潮更是如此,只是动辄千万亿参数级的模型体量,所需计算资源远超单颗芯片的上限,超大规模的计算集群成为支撑技术发展和应用创新的关键基础设施。面向智能:云基础设施网络技术面临新挑战如何突破单个芯片、单个服务器节点的算力上限,在超大规模情况
31193 10
灵骏可预期网络:Built for AI Infrastructure
|
设计模式 存储 监控
设计模式(C++版)
看懂UML类图和时序图30分钟学会UML类图设计原则单一职责原则定义:单一职责原则,所谓职责是指类变化的原因。如果一个类有多于一个的动机被改变,那么这个类就具有多于一个的职责。而单一职责原则就是指一个类或者模块应该有且只有一个改变的原因。bad case:IPhone类承担了协议管理(Dial、HangUp)、数据传送(Chat)。good case:里式替换原则定义:里氏代换原则(Liskov 
36193 19
设计模式(C++版)
|
存储 编译器 C语言
抽丝剥茧C语言(初阶 下)(下)
抽丝剥茧C语言(初阶 下)
|
机器学习/深度学习 人工智能 自然语言处理
带你简单了解Chatgpt背后的秘密:大语言模型所需要条件(数据算法算力)以及其当前阶段的缺点局限性
带你简单了解Chatgpt背后的秘密:大语言模型所需要条件(数据算法算力)以及其当前阶段的缺点局限性
24468 14
|
机器学习/深度学习 弹性计算 监控
重生之---我测阿里云U1实例(通用算力型)
阿里云产品全线降价的一力作,2023年4月阿里云推出新款通用算力型ECS云服务器Universal实例,该款服务器的真实表现如何?让我先测为敬!
36515 15
重生之---我测阿里云U1实例(通用算力型)
为笔记本更换固态硬盘的方法
本文介绍为笔记本电脑拆机、更换固态硬盘的具体方法~
18011 41
为笔记本更换固态硬盘的方法
|
SQL 存储 弹性计算
Redis性能高30%,阿里云倚天ECS性能摸底和迁移实践
Redis在倚天ECS环境下与同规格的基于 x86 的 ECS 实例相比,Redis 部署在基于 Yitian 710 的 ECS 上可获得高达 30% 的吞吐量优势。成本方面基于倚天710的G8y实例售价比G7实例低23%,总性价比提高50%;按照相同算法,相对G8a,性价比为1.4倍左右。
|
存储 算法 Java
【分布式技术专题】「分布式技术架构」手把手教你如何开发一个属于自己的限流器RateLimiter功能服务
随着互联网的快速发展,越来越多的应用程序需要处理大量的请求。如果没有限制,这些请求可能会导致应用程序崩溃或变得不可用。因此,限流器是一种非常重要的技术,可以帮助应用程序控制请求的数量和速率,以保持稳定和可靠的运行。
29747 52