Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。
Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务,外部仅依赖Zookeeper;Elastic-Job-Cloud采用自研Mesos Framework的解决方案,额外提供资源治理、应用分发以及进程隔离等功能。
笔者项目中采用的Lite版本,所以以下是梳理Elastic-Job-Lite的内容:
一. 整体架构图
整体架构图
二. 功能列表
- 分布式调度协调
Elastic-Job-Lite并无作业调度中心节点,而是基于部署作业框架的程序在到达相应时间点时各自触发调度。
注册中心仅用于作业注册和监控信息存储。而主作业节点仅用于处理分片和清理等功能。
- 弹性扩容缩容
运行中的作业服务器崩溃,或新增加n台作业服务器,作业框架将在下次作业执行前重新分片,不影响当前作业执行。
- 失效转移
Elastic-Job-Lite提供最安全的方式执行作业。将分片总数设置为1,并使用多于1台的服务器执行作业,作业将会以1主n从的方式执行。
一旦执行作业的服务器崩溃,等待执行的服务器将会在下次作业启动时替补执行。开启失效转移功能效果更好,可以保证在本次作业执行时崩溃,备机立即启动替补执行。
- 错过执行作业重触发
自动记录错过执行的作业,并在上次作业完成后自动触发。
- 作业分片一致性,保证同一分片在分布式环境中仅一个执行实例
- 自诊断并修复分布式不稳定造成的问题
- 支持并行调度
- 支持作业生命周期操作
- 丰富的作业类型
Elastic-Job提供Simple、Dataflow和Script 3种作业类型。
- Spring整合以及命名空间提供
支持spring容器,自定义命名空间,支持占位符。
- 运维平台
提供运维界面,可以管理作业和注册中心。
三. 作业类型:
Elastic-Job提供Simple、Dataflow和Script 3种作业类型。 方法参数shardingContext包含作业配置、片和运行时信息。可通过getShardingTotalCount(), getShardingItem()等方法分别获取分片总数,运行在本作业服务器的分片序列号等。
a. Simple类型作业
意为简单实现,未经任何封装的类型。需实现SimpleJob接口。该接口仅提供单一方法用于覆盖,此方法将定时执行。与Quartz原生接口相似,但提供了弹性扩缩容和分片等功能。
b. Dataflow类型作业
Dataflow类型用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。
c. Script类型作业
Script类型作业意为脚本类型作业,支持shell,python,perl等所有类型脚本。只需通过控制台或代码配置scriptCommandLine即可,无需编码。执行脚本路径可包含参数,参数传递完毕后,作业框架会自动追加最后一个参数为作业运行时信息。
四. 原理:
- 第一台服务器上线触发主服务器选举。主服务器一旦下线,则重新触发选举,选举过程中阻塞,只有主服务器选举完成,才会执行其他任务。
- 某作业服务器上线时会自动将服务器信息注册到注册中心,下线时会自动更新服务器状态。
- 主节点选举,服务器上下线,分片总数变更均更新重新分片标记。
- 定时任务触发时,如需重新分片,则通过主服务器分片,分片过程中阻塞,分片结束后才可执行任务。如分片过程中主服务器下线,则先选举主服务器,再分片。
- 通过上一项说明可知,为了维持作业运行时的稳定性,运行过程中只会标记分片状态,不会重新分片。分片仅可能发生在下次任务触发前。
- 每次分片都会按服务器IP排序,保证分片结果不会产生较大波动。
- 实现失效转移功能,在某台服务器执行完毕后主动抓取未分配的分片,并且在某台服务器下线后主动寻找可用的服务器执行任务。
服务启动流程:
启动流程图
作业执行流程:
作业执行流程图
五. Elastic-Job有何使用限制
•作业启动成功后修改作业名称视为新作业,原作业废弃。
•同一台作业服务器可以运行多个相同的作业实例,但每个作业实例必须使用不同的JobInstanceId,因为作业运行时是按照IP和JobInstanceId注册和管理的。
JobInstanceId可在作业配置中设置。
•一旦有服务器波动,或者修改分片项,将会触发重新分片;触发重新分片将会导致运行中的流式处理的作业在执行完本次作业后不再继续执行,等待分片结束后再恢复正常。
•开启monitorExecution才能实现分布式作业幂等性(即不会在多个作业服务器运行同一个分片)的功能,但monitorExecution对短时间内执行的作业(如每5秒一触发)性能影响较大,建议关闭并自行实现幂等性。
六. Simple类型作业应用示例
1.引入Maven依赖:
<!-- 引入elastic-job-lite核心模块 --> <dependency> <groupId>io.elasticjob</groupId> <artifactId>elastic-job-lite-core</artifactId> <version>${latest.release.version}</version> </dependency> <!-- 使用springframework自定义命名空间时引入 --> <dependency> <groupId>io.elasticjob</groupId> <artifactId>elastic-job-lite-spring</artifactId> <version>${latest.release.version}</version> </dependency>
2.作业开发:
public class MyElasticJob implements SimpleJob { @Override public void execute(ShardingContext context) { switch (context.getShardingItem()) { case 0: // do something by sharding item 0 break; case 1: // do something by sharding item 1 break; case 2: // do something by sharding item 2 break; // case n: ... } } }
3.作业配置:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:reg="http://www.dangdang.com/schema/ddframe/reg" xmlns:job="http://www.dangdang.com/schema/ddframe/job" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.dangdang.com/schema/ddframe/reg http://www.dangdang.com/schema/ddframe/reg/reg.xsd http://www.dangdang.com/schema/ddframe/job http://www.dangdang.com/schema/ddframe/job/job.xsd "> <!--配置作业注册中心 --> <reg:zookeeper id="regCenter" server-lists="yourhost:2181" namespace="dd-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" /> <!-- 配置作业--> <job:simple id="oneOffElasticJob" class="xxx.MyElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" /> </beans>
4.作业启动:
Spring启动方式: 将配置Spring命名空间的xml通过Spring启动,作业将自动加载。
5.配置属性详细说明:
可查看官方配置文档:http://elasticjob.io/docs/elastic-job-lite/02-guide/config-manual/
更多详细内容可以查看elasticjob开发指导文档:http://elasticjob.io/docs/elastic-job-lite/02-guide