elastic-job 定时任务集成

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: elastic-job 定时任务集成

“前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。”


一、基本使用


elastic-job的任务类型分为三种:


a. Simple类型作业,意为简单实现,未经任何封装的类型。需实现SimpleJob接口。该接口仅提供单一方法用于覆盖,此方法将定时执行。与Quartz原生接口相似,但提供了弹性扩缩容和分片等功能。


b. Dataflow类型用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。


c.Script类型作业意为脚本类型作业,支持shell,python,perl等所有类型脚本。只需通过控制台或代码配置scriptCommandLine即可,无需编码。执行脚本路径可包含参数,参数传递完毕后,作业框架会自动追加最后一个参数为作业运行时信息。


本文档提供的jar包是对elastic-job进行了二次封装,封装成了springboot的starter启动器,使用更加简单。后续开发引入我们自定义的starter完成定时任务功能即可。


先以最简单的SimpleJob类型说明如何使用集成:


第一步添加依赖:


<!--elastic-job的starter集成-->
<dependency>
  <groupId>com.wuzheng</groupId>
  <artifactId>spring-boot-elastic-job-starter</artifactId>
  <version>1.0.4</version>
</dependency>


点击此处查看starter源码,可以自己下载到本地或私服,引入依赖


第二步:增加Zookeeper注册中心的配置


elastic.job.zk.serverLists=192.168.0.121:2181,192.168.0.122:2181,192.168.0.129:2181
elastic.job.zk.namespace=yourself namespace


Zookeeper配置的前缀是elastic.job.zk,详细的属性配置请查看ZookeeperProperties


第三步:开启Elastic-Job自动配置


开启自动配置只需要在Spring Boot的启动类上增加@EnableElasticJob注解


import java.util.concurrent.CountDownLatch;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import com.cxytiandi.elasticjob.annotation.EnableElasticJob;
/**
 * ElasticJob Spring Boot集成案例
 * 
 * @author hfl
 *
 */
@SpringBootApplication
@EnableElasticJob
public class JobApplication {
  public static void main(String[] args) {
  SpringApplication.run(JobApplication.class, args);
  }
}


第四步 配置任务


@ElasticJobConf(name = "MySimpleJob", cron = "0/10 * * * * ?", 
  shardingItemParameters = "0=0,1=1", description = "简单任务")
public class MySimpleJob implements SimpleJob {
  public void execute(ShardingContext context) {
  String shardParamter = context.getShardingParameter();
  System.out.println("分片参数:"+shardParamter);
  int value = Integer.parseInt(shardParamter);
  for (int i = 0; i < 3; i++) {
    if (i % 2 == value) {
    String time = new SimpleDateFormat("HH:mm:ss").format(new Date());
    System.out.println(time + ":开始执行简单任务" + i);
    }
  }
  }
}


任务的配置只需要在任务类上增加一个ElasticJobConf注解,注解中有很多属性,这些属性都是任务的配置,详细的属性配置如下链接:

ElasticJobConf.java


测试下,启动自己的任务所在微服务,看下效果

1dc618a0ed9580ce8bfa6facb208c08f.png


自此,最简单的小demo其实已经完成了,但是实际中还有一些高级功能需要了解,在此也一并说明下:


二、扩展功能


扩展包括:


1.想在配置文件设置任务参数怎么写?

2.每次任务执行日志如何记录?

3.想执行业务脚本任务如何操作?

4.任务执行前后如何添加监听事件?

5.任务业务执行失败如何自定义异常处理?

6.动态作业怎么实现?

下面具体说明:


1.想在配置文件设置任务参数怎么写?


这个问题即application.properties中如何配置任务信息?


使用注解是比较方便,但是在属性文件中配置任务的参数信息更灵活,也便于以后使用appolo或者necos等工具进行集中配置文件管理。


我们可以同时指定注解方法,也可以同时在属性文件中设置,但是当属性文件中配置了任务的信息,优先级就比注解中的高。具体做法如下:


首先还是在任务类上加@ElasticJobConf(name = “MySimpleJob”)注解,只需要增加一个name即可,任务名是唯一的。


剩下的配置都可以在属性文件中进行配置,格式为elastic.job.任务名.配置属性=属性值


elastic.job.MySimpleJob.cron=0/10 * * * * ?
elastic.job.MySimpleJob.overwrite=true
elastic.job.MySimpleJob.shardingTotalCount=1
elastic.job.MySimpleJob.shardingItemParameters=0=0,1=1
elastic.job.MySimpleJob.jobParameter=test
elastic.job.MySimpleJob.failover=true
elastic.job.MySimpleJob.misfire=true
elastic.job.MySimpleJob.description=simple job
elastic.job.MySimpleJob.monitorExecution=false
elastic.job.MySimpleJob.listener=com.cxytiandi.job.core.MessageElasticJobListener
elastic.job.MySimpleJob.jobExceptionHandler=com.cxytiandi.job.core.CustomJobExceptionHandler
elastic.job.MySimpleJob.disabled=true



1dc618a0ed9580ce8bfa6facb208c08f.png

5d4c6812c8535adbb050f4ddf2e1bce8.png


2.每次任务执行日志如何记录?


elastic-job已经为我们提供了事件追踪数据源功能,可以将每一次的任务执行情况,任务执行轨迹,任务执行历史持久化到数据库或者其他日志收集框架中,下面演示如何保存到数据库中:


事件追踪功能在注解中也只需要配置eventTraceRdbDataSource=你的数据源 就可以使用了,数据源用什么连接池无限制,唯一需要注意的一点是你的数据源必须在spring-boot-elastic-job-starter之前创建,因为spring-boot-elastic-job-starter中依赖了你的数据源,下面我以druid作为连接池来进行讲解。


<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>druid-spring-boot-starter</artifactId>
  <version>1.1.2</version>
</dependency>


配置连接池属性:


spring.datasource.druid.log.url=jdbc:mysql://localhost:3306/event_log
spring.datasource.druid.log.username=root
spring.datasource.druid.log.password=123456
spring.datasource.druid.log.driver-class-name=com.mysql.jdbc.Driver


然后在项目中定义一个配置类,配置连接池,手动配置的原因是连接池可以在elastic-job-starter之前被初始化。


@Configuration
public class BeanConfig {
  /**
  * 任务执行事件数据源
  * @return
  */
  @Bean("datasource")
  @ConfigurationProperties("spring.datasource.druid.log")
  public DataSource dataSourceTwo(){
     return DruidDataSourceBuilder.create().build();
  }
}


然后在注解中增加数据源的配置eventTraceRdbDataSource 即可:

注解配置方式如下:


@ElasticJobConf(name = "MySimpleJob", cron = "0/10 * * * * ?", 
  shardingItemParameters = "0=0,1=1", description = "简单任务", eventTraceRdbDataSource = "datasource")


1dc618a0ed9580ce8bfa6facb208c08f.png5d4c6812c8535adbb050f4ddf2e1bce8.png


也可以通过官方提供的可视化工具看到任务执行情况

46a9d80a6e05e4e3b19d57a0ee70bcdf.png


注意: 经测试:


1:经测试: 该功能Mysql数据库的话只支持5.7,Mysql5.8并不会生效


2: 数据库名必须是elastic_job_log 才会自动创建这2张表,其他数据库名请手动添加job_execution_log,job_status_trace_log这2张表(后面附2张表建表sql)


3:zookeepr注册中心必须能够连到mysql


CREATE TABLE `job_execution_log` (
  `id` varchar(40) NOT NULL,
  `job_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '作业名称',
  `task_id` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '任务名称,每次作业运行生成新任务',
  `hostname` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '主机名称',
  `ip` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '主机IP',
  `sharding_item` int NOT NULL COMMENT '分片项',
  `execution_source` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '作业执行来源。可选值为NORMAL_TRIGGER, MISFIRE, FAILOVER',
  `failure_cause` varchar(4000) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '执行失败原因',
  `is_success` int NOT NULL COMMENT '是否执行成功',
  `start_time` timestamp NULL DEFAULT NULL COMMENT '作业开始执行时间',
  `complete_time` timestamp NULL DEFAULT NULL COMMENT '作业结束执行时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='历史轨迹表';
CREATE TABLE `job_status_trace_log` (
  `id` varchar(40) NOT NULL,
  `job_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '作业名称',
  `original_task_id` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '原任务id',
  `task_id` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '任务id',
  `slave_id` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '执行作业服务器的名称,Lite版本为服务器的IP地址,Cloud版本为Mesos执行机主键',
  `source` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '任务执行源,可选值为CLOUD_SCHEDULER, CLOUD_EXECUTOR, LITE_EXECUTOR',
  `execution_type` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '任务执行类型,可选值为NORMAL_TRIGGER, MISFIRE, FAILOVER',
  `sharding_item` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '分片项集合,多个分片项以逗号分隔',
  `state` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '任务执行状态,可选值为TASK_STAGING, TASK_RUNNING, TASK_FINISHED, TASK_KILLED, TASK_LOST, TASK_FAILED, TASK_ERROR',
  `message` varchar(4000) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '相关信息',
  `creation_time` timestamp NULL DEFAULT NULL COMMENT '记录创建时间',
  PRIMARY KEY (`id`),
  KEY `TASK_ID_STATE_INDEX` (`task_id`,`state`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='历史状态表';


3.想执行业务脚本任务如何操作?


elastic-job提供了Script任务类型.

由于Script任务的执行逻辑是在具体的脚本中,是通过scriptCommandLine来指定执行脚本的路径。我这边为了统一的去发现项目中的任务列表,还是需要建一个脚本的Java类,加上ElasticJobConf注解,只是不需要写逻辑而已,示例如下:


/**
 * 脚本任务不需要写逻辑,逻辑在被执行的脚本中,这边只是定义一个任务而已
 *
 */
@ElasticJobConf(name = "MyScriptJob")
public class MyScriptJob implements ScriptJob {
  public void execute(ShardingContext context) {
  }
}


配置:


elastic.job.MyScriptJob.cron=0/10 * * * * ?
elastic.job.MyScriptJob.overwrite=true
elastic.job.MyScriptJob.scriptCommandLine=D:\\1.bat


1.bat脚本内容如下:


@echo ------【脚本任务】Sharding Context: %*

1dc618a0ed9580ce8bfa6facb208c08f.png


4.任务执行前后如何添加监听事件?


首先书写自己的监听事件:


/**
 * 作业监听器, 执行前后发送钉钉消息进行通知
 * @author hfl
 */
public class MessageElasticJobListener implements ElasticJobListener {
    @Override
    public void beforeJobExecuted(ShardingContexts shardingContexts) {
        String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        String msg = date + " 【五征定时任务-" + shardingContexts.getJobName() + "】任务开始执行====" + JsonUtils.toJson(shardingContexts);
//        DingDingMessageUtil.sendTextMessage(msg);
        System.out.println(msg);
    }
    @Override
    public void afterJobExecuted(ShardingContexts shardingContexts) {
      String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        String msg = date + " 【五征定时任务-" + shardingContexts.getJobName() + "】任务执行结束====" + JsonUtils.toJson(shardingContexts);
//        DingDingMessageUtil.sendTextMessage(msg);
        System.out.println(msg);
    }
}


然后在配置文件中job参数上添加listener参数:


elastic.job.MySimpleJob.listener=com.wuzheng.job.core.MessageElasticJobListener


1dc618a0ed9580ce8bfa6facb208c08f.png


运行MySimpleJob看下执行结果:

5d4c6812c8535adbb050f4ddf2e1bce8.png

可以看到任务前后执行了我们自己的逻辑.


5.任务业务执行失败如何自定义异常处理?


和监听器配置一样,第一步书写自定义异常,第二步配置job参数指定自定义异常类。


/**
 * 自定义异常处理,在任务异常时使用钉钉发送通知
 * @author hfl
 */
public class CustomJobExceptionHandler implements JobExceptionHandler {
    private Logger logger = LoggerFactory.getLogger(CustomJobExceptionHandler.class);
    @Override
    public void handleException(String jobName, Throwable cause) {
        logger.error(String.format("Job '%s' exception occur in job processing", jobName), cause);
        DingDingMessageUtil.sendTextMessage("【"+jobName+"】任务异常。" + cause.getMessage());
    }
}


异常参数设置


elastic.job.MySimpleJob.jobExceptionHandler=com.wuzheng.job.core.CustomJobExceptionHandler



我们自定义个异常测试下(System.out.println(2/0);)

@ElasticJobConf(name = "MySimpleJob")
public class MySimpleJob implements SimpleJob {
  @Override
  public void execute(ShardingContext context) {
  System.out.println(2/0);
  String shardParamter = context.getShardingParameter();
  System.out.println("分片参数:"+shardParamter);
  for (int i = 0; i < 2; i++) {
    String time = new SimpleDateFormat("HH:mm:ss").format(new Date());
    System.out.println(time + ":开始执行简单任务" + i);
  }
  }
}


看下效果:


已经指向了我们自定义的异常的类及其类里面的异常处理逻辑代码:

1dc618a0ed9580ce8bfa6facb208c08f.png


6.动态作业怎么实现?


以上的定时任务基本都是这个流程:首先配置job参数,然后编写业务job,最后启动定时任务所在的微服务,这样该微服务下的所有定时任务job都统一注册进zookeeper注册中心了,然后自动就会启动这些定时任务。


如果有个定时任务,不想微服务启动的时候让其启用,而是想动态的注册任务参数并控制何时启用。这个需求实现也很简单,我们封装的elastic-job的starter就已提供实现。

自定义的starter提供了restful的2个接口,一个注册任务接口: /job;一个删除任务接口:/job/remove?jobName=任务名。


首先书写自己的任务


package com.wuzheng.job.demo.dynamic;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
/**
 * 动态添加任务演示
 * @author hfl
 *
 */
public class DynamicJob implements SimpleJob {
  @Override
  public void execute(ShardingContext shardingContext) {
  // 可以根据JobParameter来对不同的数据进行操作
  System.out.println(shardingContext.getJobParameter());
  System.out.println(shardingContext.getShardingParameter());
  }
}



置好之后,启动这个定时任务微服务项目并不会启用这个任务,因为我们没有任务参数,也没在job上加@ElasticJobConf注解,扫描不到这个任务,但是通过REST API可以动态的注册任务,API列表如下:


/job
添加任务是POST请求,数据格式为JSON体提交,格式如下:
{
"jobName":"DynamicJob333",
"cron":"0/10 * * * * ?",
"jobType":"SIMPLE",
"jobClass":"com.wuzheng.job.demo.dynamic.DynamicJob",
"jobParameter":"2222222",
"shardingTotalCount":1
}


点击查看完整字段地址


注意:jobClass必须事先存在于服务中

使用postman测试:

1dc618a0ed9580ce8bfa6facb208c08f.png


去控制台参看,显示已经成功注册到zookeeper中心,作业状态正常!

5d4c6812c8535adbb050f4ddf2e1bce8.png


/job/remove 删除任务是GET请求,参数只要任务名称即可,比如:/job/remove?jobName=任务名。可以用于任务完成之后清空注册中心的任务信息。


以上所有demo的源码下载地址请点击:https://gitee.com/hufanglei/elastic-job-learn/tree/master/spring-boot-elastic-job-example



相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
4月前
|
资源调度 Java 调度
Spring Cloud Alibaba 集成分布式定时任务调度功能
定时任务在企业应用中至关重要,常用于异步数据处理、自动化运维等场景。在单体应用中,利用Java的`java.util.Timer`或Spring的`@Scheduled`即可轻松实现。然而,进入微服务架构后,任务可能因多节点并发执行而重复。Spring Cloud Alibaba为此发布了Scheduling模块,提供轻量级、高可用的分布式定时任务解决方案,支持防重复执行、分片运行等功能,并可通过`spring-cloud-starter-alibaba-schedulerx`快速集成。用户可选择基于阿里云SchedulerX托管服务或采用本地开源方案(如ShedLock)
144 1
|
5月前
|
资源调度 Java 调度
Spring Cloud Alibaba 集成分布式定时任务调度功能
Spring Cloud Alibaba 发布了 Scheduling 任务调度模块 [#3732]提供了一套开源、轻量级、高可用的定时任务解决方案,帮助您快速开发微服务体系下的分布式定时任务。
15055 32
|
前端开发 Java 调度
SpringCloud微服务实战——搭建企业级开发框架(四十二):集成分布式任务调度平台XXL-JOB,实现定时任务功能
定时任务几乎是每个业务系统必不可少的功能,计算到期时间、过期时间等,定时触发某项任务操作。在使用单体应用时,基本使用Spring提供的注解即可实现定时任务,而在使用微服务集群时,这种方式就要考虑添加分布式锁来防止多个微服务同时运行定时任务而导致同一个任务重复执行。
998 55
SpringCloud微服务实战——搭建企业级开发框架(四十二):集成分布式任务调度平台XXL-JOB,实现定时任务功能
|
7月前
|
SQL Java 调度
SpringBoot集成quartz定时任务trigger_state状态ERROR解决办法
SpringBoot集成quartz定时任务trigger_state状态ERROR解决办法
|
SQL 存储 Java
SpringBoot集成Quartz(定时任务)
SpringBoot集成Quartz(定时任务)
SpringBoot集成Quartz(定时任务)
|
存储 NoSQL Java
SpringBoot集成Redis业务功能 02、定时任务+Redis删除特定前缀key的优雅实现
SpringBoot集成Redis业务功能 02、定时任务+Redis删除特定前缀key的优雅实现
|
监控 Java 测试技术
听说可以十分钟掌握Spring Boot 集成定时任务、异步调用?
在项目开发中,经常需要定时任务来帮助我们来做一些内容,比如定时发送短信/站内信息、数据汇总统计、业务监控等,所以就要用到我们的定时任务,在Spring Boot中编写定时任务是非常简单的事,下面通过实例介绍如何在Spring Boot中创建定时任务
165 0
听说可以十分钟掌握Spring Boot 集成定时任务、异步调用?
|
数据可视化 Java 调度
可视化定时任务,quartz集成全解析
在日常的工作中,定时任务也是一个非常常见的需求,可以使用注解实现,但是使用这种方式有几个问题,首先就是如果修改比较麻烦,而且没有提供特定的页面对定时任务进行可视化管理。所以quartz就应运而生。本文将介绍如何实现springboot与quartz的整合。
623 0
可视化定时任务,quartz集成全解析
|
2月前
|
Java Maven Docker
gitlab-ci 集成 k3s 部署spring boot 应用
gitlab-ci 集成 k3s 部署spring boot 应用