熟练使用 Elastic Job系列之入门Demo(三)

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: Elastic-Job-Lite和Elastic-Job-Cloud提供统一作业接口,开发者仅需作业接口做自己的实现,再进行不同的配置以及部署即可完成一个分布式的Job。

系列导航:

熟练使用 Elastic Job系列之概念介绍(一)

juejin.cn/post/685559…

Elastic-Job-Lite和Elastic-Job-Cloud提供统一作业接口,开发者仅需作业接口做自己的实现,再进行不同的配置以及部署即可完成一个分布式的Job。

Java配置启动

  1. 首先引入maven依赖
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>2.1.5</version>
</dependency>
复制代码
  1. 接着实现统一作业接口进行业务操作
@Slf4j
public class MyElasticJob implements SimpleJob {
    @Override
    public void execute(ShardingContext context) {
        switch (context.getShardingItem()) {
            case 0:
                log.info("0000");
                break;
            case 1:
                log.info("1111");
                break;
            case 2:
                log.info("2222");
                break;
        }
    }
}
复制代码
  1. 对作业进行配置工作
// 作业的基本配置内容
private static LiteJobConfiguration createJobConfiguration() {
    // 定义作业核心配置
    JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob", "0/15 * * * * ?", 10).build();
    // 定义SIMPLE类型配置
    SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MyElasticJob.class.getCanonicalName());
    // 定义Lite作业根配置
    return LiteJobConfiguration.newBuilder(simpleJobConfig).build();
}
// zookeeper注册中心配置
private static CoordinatorRegistryCenter createRegistryCenter() {
    CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("zk_host:2181", "elastic-job-demo"));
    regCenter.init();
    return regCenter;
}
复制代码
  1. 启动运行作业
public class JavaMain {
    public static void main(String[] args) {
        new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
    }
}
复制代码

通过控制台打印输出的内容,可以发现Demo编写运行成功,恭喜你!

[IMFO]2020-07-30 14:39:15,227--main--[org quartz.impl.StaschedulerFactory] Quartz scheduler'demoSimpleJb’initialized from an externally provided properties instance
[I1FO]2020-07-3014:39:15,227--main-- [orgquartz.impl.StdSchedulerFactory] Quartz scheduler version: 2.2.1
[I1FO ]2020-07-30 14:39:15,512 --main-- [org quartz.core.QuartzScheduler] Scheduler demoSimpleJob_$_NOI_CLUSTERED started
[I1FO ]2020-07-30 14:39:31,027 --inner-job-demoSimpleJob-1- [example.MyElasticJob] 0000 
[INFO ]2020-07-30 14:39:31,027 --inner-job-demoSimpleJob-2- [example.MyElasticJob] 1111 
[I1FO]2020-07-30 14:39:31,028 --inner-job-demoSimpleJob-3-  [example.MyElasticJob] 2222 
[INFO ]2020-07-30 14:39:45,387 --inner-job-demoSimpleJob-3- [example.MyElasticJob] 0000 
[INFO]2020-07-30 14:39:45,387 --inner-job-demoSimpleJob-4-  [example.MyElasticJob] 1111 
[INFO]2020-07-30 14:39:45,387 --inner-job-demoSimpleJob-5-  [example.MyElasticJob] 2222 
[I1FO]2020-07-30 14:40:00,511 --inner-job-demoSimpleJob-6-  example MvElasticTob] 0000  
[INF0]2020-07-3014:40:00.511--inner-job-demoSimpleJob-4--[example.MvElasticJob] 1111
[IMF012020-07-3014:40:00.512--inner-iob-demoSimpleJob-6--[example.MvFlasticJob] 2222
[zk:localhost:2181(CONNECTED)2] ls / elastic-iob-lite-springboot,log dir event notification isr change notification, zookeeper, admin, consumers cluster, config,latest producer_id block, brokers, controller epoch]
Telastic-job-lite-springboot,log_dir_event_notification,isr_change_notification, zookeeper, admin elastic-job-demo-quick-demo consume erscluster, config,latest_producer id block, brokers, controller epoch] zk:localhost:2181(CONNECTED)3] ls /
Telastic-job-lite-springboot, log dir event notification,dd-job, isr change notification, zookeeper, admin,elastic-job-demo-quick-demo consumerscluster,config,latest producer id block, brokes税ohtralledepoth区 zk:localhost:2181(CONNECTED)4]

由于大部分人都用spring或Spring boot, 所以这2中方式接下来也是要介绍的。

使用Spring配置启动

  1. 添加如下maven 依赖
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
    <version>2.1.5</version>
</dependency>
 <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
</dependency>
复制代码
  1. 作业的Spring配置 在resource目录下添加文件applicationContext.xml,内容如下
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
       xmlns:job="http://www.dangdang.com/schema/ddframe/job"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.dangdang.com/schema/ddframe/reg
                        http://www.dangdang.com/schema/ddframe/reg/reg.xsd
                        http://www.dangdang.com/schema/ddframe/job
                        http://www.dangdang.com/schema/ddframe/job/job.xsd
                        ">
    <!--配置作业注册中心 -->
    <reg:zookeeper id="regCenter" server-lists="192.168.104.102:2181" namespace="dd-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000"
                   max-retries="3"/>
    <!-- 配置作业-->
    <job:simple id="demoSimpleSpringJob" class="example.MyElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3"
                sharding-item-parameters="0=A,1=B,2=C"/>
</beans>
复制代码
  1. 启动作业 程序启动读取Spring配置文件,作业将自动加载。
public class JavaMain {
    public static void main(String[] args) {
        new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
    }
}
复制代码
[IFO ]2020-07-3014:57:32,967--main-- [orgquartz.impl.StaschedulerFactory] Quartz scheduler'demoSimpleSpringJob’initialized from an pyterng 7t provided properties instance
[I1FO ]2020-07-30 14:57:32,967--main-- [orgquartz.impl.StdSchedulerFactory] Quartz scheduler version: 2.2.1
[I1FO ]2020-07-3014:57:33,230 --main-- [org.quartz.core.QuartzScheduler]SchedulerdemoSimpleSpringJob_$_NON_CLUSTERED started
[IFO]2020-07-3014:57:40,385--inner-job-demoSimpleSpringJob-1--[example.MyElasticJob] 0000
[INFO]2020-07-3014:57:40,385--inner-job-demoSimpleSpringJob-2--[example.MyElasticJob] 1111
[INFO]2020-07-3014:57:40,385--inner-job-demoSimpleSpringJob-3--[example.MyElasticJob] 2222
[INFO]2020-07-30 14:57:50,159 --inner-job-demoSimpleSpringJob-4--[example.MyElasticJob] 0000
[I1FO]2020-07-30 14:57:50,160 --inner-job-demoSimpleSpringJob-5--[example.MyElasticJob] 1111
[I1FO]2020-07-3014:57:50,160--inner-job-demoSimpleSpringJob-6--[example.MyElasticJob] 2222  
zk:localhost:2181(CONNECTED) 1] ls /
elastic-job-lite-springboot,log_dir_event_notification,isr_change_notification, zookeeper, admin, consumers, cluster, config, latest_p oroducer id block, brokers, controller epoch] zk:localhost:2181(CONNECTED)2] ls /
[elastic-job-lite-springboot, log dir event notification,isr change notification, zookeeper,admin, elastic-job-demo-quick-demo, consume erscluster. config,latest producer id block, brokers, controller epoch] Izk:localhost:2181(CONNECTED)3] ls/
7k:1ncalhnst:2181(CONNECTEn)41 elastic-job-lite-springboot, log dir event notification, dd-job isr change notification, zookeeper, admin,elastic-job-demo-quick-demo consumers,cluster,config,latest_producer_id_block, brokers, con@roiter

作业类型介绍

Elastic-Job提供Simple、Dataflow和Script 3种作业类型。

Simple类型作业

意为简单实现,未经任何封装的类型。需实现SimpleJob接口。该接口仅提供单一方法用于覆盖,此方法将定时执行。与Quartz原生接口相似,但提供了弹性扩缩容和分片等功能。

public class MyElasticJob implements SimpleJob {
    @Override
    public void execute(ShardingContext context) {
        switch (context.getShardingItem()) {
            case 0: 
                // do something by sharding item 0
                break;
            case 1: 
                // do something by sharding item 1
                break;
            case 2: 
                // do something by sharding item 2
                break;
            // case n: ...
        }
    }
}
复制代码

Dataflow类型作业

Dataflow类型用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。

可通过DataflowJobConfiguration配置是否流式处理。

流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去; 非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业。

如果采用流式作业处理方式,建议processData处理数据后更新其状态,避免fetchData再次抓取到,从而使得作业永不停止。 流式数据处理参照TbSchedule设计,适用于不间歇的数据处理。

public class MyElasticJob implements DataflowJob<Foo> {
    @Override
    public List<Foo> fetchData(ShardingContext context) {
        switch (context.getShardingItem()) {
            case 0: 
                List<Foo> data = // get data from database by sharding item 0
                return data;
            case 1: 
                List<Foo> data = // get data from database by sharding item 1
                return data;
            case 2: 
                List<Foo> data = // get data from database by sharding item 2
                return data;
            // case n: ...
        }
    }
    @Override
    public void processData(ShardingContext shardingContext, List<Foo> data) {
        // process data
        // ...
    }
}
复制代码

Script类型作业

Script类型作业意为脚本类型作业,支持shell,python,perl等所有类型脚本。只需通过控制台或代码配置scriptCommandLine即可,无需编码。执行脚本路径可包含参数,参数传递完毕后,作业框架会自动追加最后一个参数为作业运行时信息。

#!/bin/bash
echo sharding execution context is $*
复制代码

作业运行时会输出

sharding execution context is {“jobName”:“scriptElasticDemoJob”,“shardingTotalCount”:10,“jobParameter”:“”,“shardingItem”:0,“shardingParameter”:“A”}
复制代码

以上方法中的参数shardingContext包含作业配置、片和运行时信息。可通过getShardingTotalCount(), getShardingItem()等方法分别获取分片总数,运行在本作业服务器的分片序列号等。

配置介绍

配置说明

Elastic-Job配置分为3个层级,分别是Core, Type和Root。每个层级使用相似于装饰者模式的方式装配。

Core对应JobCoreConfiguration,用于提供作业核心配置信息,如:作业名称、分片总数、CRON表达式等。

Type对应JobTypeConfiguration,有3个子类分别对应SIMPLE, DATAFLOW和SCRIPT类型作业,提供3种作业需要的不同配置,如:DATAFLOW类型是否流式处理或SCRIPT类型的命令行等。

Root对应JobRootConfiguration,有2个子类分别对应Lite和Cloud部署类型,提供不同部署类型所需的配置,如:Lite类型的是否需要覆盖本地配置或Cloud占用CPU或Memory数量等。

类似如下代码

// 定义作业核心配置
JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob", "0/15 * * * * ?", 10).build();
// 定义SIMPLE类型配置
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, SimpleDemoJob.class.getCanonicalName());
// 定义Lite作业根配置
JobRootConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
复制代码

Spring的配置说明

与Spring容器配合使用作业,可将作业Bean配置为Spring Bean,并在作业中通过依赖注入使用Spring容器管理的数据源等对象。可用placeholder占位符从属性文件中取值。Lite可考虑使用Spring命名空间方式简化配置。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
    xmlns:job="http://www.dangdang.com/schema/ddframe/job"
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
                        http://www.springframework.org/schema/beans/spring-beans.xsd 
                        http://www.dangdang.com/schema/ddframe/reg 
                        http://www.dangdang.com/schema/ddframe/reg/reg.xsd 
                        http://www.dangdang.com/schema/ddframe/job 
                        http://www.dangdang.com/schema/ddframe/job/job.xsd 
                        ">
    <!--配置作业注册中心 -->
    <reg:zookeeper id="regCenter" server-lists="yourhost:2181" namespace="dd-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />
    <!-- 配置简单作业-->
    <job:simple id="simpleElasticJob" class="xxx.MySimpleElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />
    <bean id="yourRefJobBeanId" class="xxx.MySimpleRefElasticJob">
        <property name="fooService" ref="xxx.FooService"/>
    </bean>
    <!-- 配置关联Bean作业-->
    <job:simple id="simpleRefElasticJob" job-ref="yourRefJobBeanId" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />
    <!-- 配置数据流作业-->
    <job:dataflow id="throughputDataflow" class="xxx.MyThroughputDataflowElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />
    <!-- 配置脚本作业-->
    <job:script id="scriptElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" script-command-line="/your/file/path/demo.sh" />
    <!-- 配置带监听的简单作业-->
    <job:simple id="listenerElasticJob" class="xxx.MySimpleListenerElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C">
        <job:listener class="xx.MySimpleJobListener"/>
        <job:distributed-listener class="xx.MyOnceSimpleJobListener" started-timeout-milliseconds="1000" completed-timeout-milliseconds="2000" />
    </job:simple>
    <!-- 配置带作业数据库事件追踪的简单作业-->
    <job:simple id="eventTraceElasticJob" class="xxx.MySimpleListenerElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" event-trace-rdb-data-source="yourDataSource">
    </job:simple>
</beans>
复制代码

Java的配置说明

public final class JavaMain {
    public static void main(String[] args) {
        new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
    }
    private static CoordinatorRegistryCenter createRegistryCenter() {
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(
                new ZookeeperConfiguration("192.168.104.102:2181", "elastic-job-demo-quick-demo"));
        regCenter.init();
        return regCenter;
    }
    private static LiteJobConfiguration createJobConfiguration() {
        // 定义作业核心配置
        JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob", "0/15 * * * * ?", 10).build();
        // 定义SIMPLE类型配置
        SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MyElasticJob.class.getCanonicalName());
        // 定义Lite作业根配置
        return LiteJobConfiguration.newBuilder(simpleJobConfig).build();
    }
}
复制代码

详细的规则配置请参考配置手册



相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
运维 监控 搜索推荐
熟练使用 Elastic Job系列之概念介绍(一)
熟练使用 Elastic Job系列之概念介绍(一)
|
Java 调度 Maven
接私活/工作必备-SpringBoot+Elastic-Job的快速启动Demo
接私活/工作必备-SpringBoot+Elastic-Job的快速启动Demo
接私活/工作必备-SpringBoot+Elastic-Job的快速启动Demo
|
Kubernetes 数据可视化 API
k8s学习二:学习基本概念和搭建dashboard
k8s学习二:学习基本概念和搭建dashboard
242 0
k8s学习二:学习基本概念和搭建dashboard
|
SQL IDE Java
xxl-job Demo搭建流程
xxl-job Demo搭建流程
403 0
xxl-job Demo搭建流程
|
运维 算法 Java
Elastic-Job源码解读
文章以任务初始化、任务触发、分片策略、分布式为切入点讲述Elastic Job的源码,一方面自己总结记录、另一方面希望可以帮助到其他的开发者快读理解Elastic Job工作原理。
1249 0
|
C++ Python
ROS入门笔记(十一):编写与测试简单的Service和Client (Python)
ROS入门笔记(十一):编写与测试简单的Service和Client (Python)
528 0
ROS入门笔记(十一):编写与测试简单的Service和Client (Python)
|
弹性计算 运维 jenkins
Flow vs Jenkins 实操对比,如何将Java应用快速发布至ECS
Jenkins 由于其开源特性以及丰富插件能力,长久以来都是中小企业搭建 CICD 流程的首选。不过 Jenkins 存在维护成本高、配置复杂等缺点,云效 Flow 较好地解决了这些问题。 本文从一个 Java 应用部署到云服务器(ECS)的场景切入,对比使用阿里云云效流水线 Flow 和 Jenkins 两种构建部署方式,供大家选型参考。
2298 1
Flow vs Jenkins 实操对比,如何将Java应用快速发布至ECS
|
运维 安全 Java
Elastic-Job使用及原理
Elastic-Job使用及原理
|
存储 分布式计算 资源调度
Spark从入门到入土(一):集群环境搭建(下)
Spark可以不依赖Hadoop运行。如果运行的结果(包括中间结果)不需要存储到HDFS,并且集群管理器不采用YARN的情况下是可以不依赖hadoop的。
Spark从入门到入土(一):集群环境搭建(下)
|
存储 SQL 自然语言处理
elastic基础知识
ElasticSearch是一个分布式、RESTful风格的搜索和数据分析引擎,能够结果不断涌现出的各种用例。作为Elastic Stack的核心,它集中存储数据、帮助您发现意料之中及意料之外的情况。
171 0