Preface
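Our company currently uses Elastic-Job, version 2.1.5, as its job scheduling tool. In March we had a production incident caused by the failover configuration. While troubleshooting it I skimmed through the source code, and I am using this opportunity to understand Elastic-Job in more depth.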
Overall architecture
Note: the figure is from https://github.com/elasticjob/elastic-job-lite
Overview
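Elastic-Job is a distributed scheduling solution made up of two independent subprojects, Elastic-Job-Lite and Elastic-Job-Cloud.
Elastic-Job-Lite is positioned as a lightweight, decentralized solution that provides coordination services for distributed jobs in the form of a jar library.
Elastic-Job-Cloud is built on Mesos + Docker and additionally provides resource management, application distribution, and process isolation. (Not covered in this article.)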
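Elastic-Job's core components are Quartz and ZooKeeper.
Quartz schedules the jobs on each machine (that is, it decides when the shard tasks on each machine run), while ZooKeeper serves as the distributed coordination center.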
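Features
Elastic-Job-Lite
Distributed scheduling coordination
Elastic scale-out and scale-in
Failover
Re-triggering of misfired jobs
Job sharding consistency: a given shard has only one executing instance in the distributed environment
Self-diagnosis and repair of problems caused by distributed instability
Parallel scheduling
Job lifecycle operations
Rich job types
Spring integration and namespace support
Operations console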
Source code walkthrough
Job initialization
JobScheduler
public class JobScheduler {
/**
* Two fixed keys under which values are stored in Quartz's JobDetail#JobDataMap
*/
public static final String ELASTIC_JOB_DATA_MAP_KEY = "elasticJob";
private static final String JOB_FACADE_DATA_MAP_KEY = "jobFacade";
...
private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {
// Register the job instance
JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance());
// Job configuration
this.liteJobConfig = liteJobConfig;
// Registry center
this.regCenter = regCenter;
List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);
// Provides before/after extension points fired when a distributed job starts or completes; users can plug in custom start and completion logic here
setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);
schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);
// Job facade, one jobFacade per job, encapsulating job start, failover, misfire re-execution, job events, etc.
jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
}
private void setGuaranteeServiceForElasticJobListeners(final CoordinatorRegistryCenter regCenter, final List<ElasticJobListener> elasticJobListeners) {
GuaranteeService guaranteeService = new GuaranteeService(regCenter, liteJobConfig.getJobName());
for (ElasticJobListener each : elasticJobListeners) {
if (each instanceof AbstractDistributeOnceElasticJobListener) {
((AbstractDistributeOnceElasticJobListener) each).setGuaranteeService(guaranteeService);
}
}
}
/**
* Initialize the job.
*/
public void init() {
// Write the job configuration to ZK
LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
// Set the sharding total count
JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
// Create the job schedule controller
JobScheduleController jobScheduleController = new JobScheduleController(
createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
// Locally associate the job with its jobScheduleController and the registry center; a node named after jobName is also created in the zk registry
JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
// Register the job startup information
schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
// Schedule the job
jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
}
/**
* Configure Quartz.
* This is where Elastic Job and Quartz meet: the Quartz call that actually schedules the job is scheduler.scheduleJob(jobDetail, createTrigger(cron)).
* When the scheduled time arrives, Quartz invokes org.quartz.Job#execute(org.quartz.JobExecutionContext); Elastic Job's implementation of that Job is LiteJob.
*
*/
private JobDetail createJobDetail(final String jobClass) {
// JobDetail is the Quartz interface that describes a scheduled job
JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
// Note: this jobFacade is the same object that LiteJob receives
result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
if (elasticJobInstance.isPresent()) {
result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
} else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
try {
// Note: this elasticJob is the same object that LiteJob receives; it is used at execution time to determine the job type
result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
} catch (final ReflectiveOperationException ex) {
throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
}
}
return result;
}
...
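The instance-resolution branch of createJobDetail boils down to three cases: use an instance that was supplied, skip script jobs, otherwise instantiate the configured class reflectively. Below is a minimal sketch of just that branch, assuming java.util.Optional and a placeholder string for ScriptJob's canonical class name; JobInstanceResolver and SCRIPT_JOB_CLASS are hypothetical names. It also swaps Class.forName(jobClass).newInstance(), which is deprecated since Java 9, for getDeclaredConstructor().newInstance(), and keeps the reflective exception as the cause instead of dropping it.

```java
import java.util.Optional;

// Hypothetical helper mirroring the instance-resolution branch of createJobDetail.
final class JobInstanceResolver {
    // Placeholder standing in for ScriptJob.class.getCanonicalName(); script jobs have no Java object to create.
    static final String SCRIPT_JOB_CLASS = "example.ScriptJob";

    private JobInstanceResolver() { }

    static Optional<Object> resolve(final Optional<?> suppliedInstance, final String jobClass) {
        // 1. Prefer an instance supplied by the caller.
        if (suppliedInstance.isPresent()) {
            return Optional.of(suppliedInstance.get());
        }
        // 2. Script jobs carry no job object.
        if (jobClass.equals(SCRIPT_JOB_CLASS)) {
            return Optional.empty();
        }
        // 3. Otherwise instantiate the configured class through its no-arg constructor.
        try {
            return Optional.of(Class.forName(jobClass).getDeclaredConstructor().newInstance());
        } catch (ReflectiveOperationException ex) {
            // Keep the original exception as the cause for easier diagnosis.
            throw new IllegalStateException(String.format("Job class '%s' can not initialize.", jobClass), ex);
        }
    }
}
```

The resolved object is what ends up in the JobDataMap under ELASTIC_JOB_DATA_MAP_KEY; for a script job nothing is stored under that key.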
Job triggering
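Within a job instance, triggering is handled by Quartz: it fires at the times set by the cron expression and calls LiteJob#execute, Elastic Job's implementation of the Quartz Job interface.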
LiteJob
public final class LiteJob implements Job {
/**
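* Note the two constants in JobScheduler: during initialization, JobScheduler stores these two entries in Quartz's JobDetail#JobDataMap.
* When Quartz instantiates LiteJob, it reads these values from the JobDetail's JobDataMap and sets them through the setters below; see org.quartz.simpl.PropertySettingJobFactory#newJob(org.quartz.spi.TriggerFiredBundle, org.quartz.Scheduler)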
* public static final String ELASTIC_JOB_DATA_MAP_KEY = "elasticJob";
* private static final String JOB_FACADE_DATA_MAP_KEY = "jobFacade";
*/
@Setter
private ElasticJob elasticJob;
@Setter
private JobFacade jobFacade;
@Override
public void execute(final JobExecutionContext context) throws JobExecutionException {
// Entry point that Quartz calls on each trigger
JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
}
}
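The hand-off between createJobDetail and LiteJob is easier to see in isolation. The sketch below is a simplified model written in the spirit of Quartz's PropertySettingJobFactory, not its actual code: it creates the job through its no-arg constructor and, for each data-map key, calls the public single-argument setter whose name matches the key (elasticJob becomes setElasticJob). DemoLiteJob and JobDataMapInjector are hypothetical names, and plain Objects stand in for ElasticJob and JobFacade. It also shows why LiteJob only needs Lombok's @Setter on its two fields: the generated public setters are what the factory looks for.

```java
import java.lang.reflect.Method;
import java.util.Map;

// Hypothetical stand-in for LiteJob: two properties exposed through public setters.
class DemoLiteJob {
    private Object elasticJob;
    private Object jobFacade;

    public void setElasticJob(final Object elasticJob) { this.elasticJob = elasticJob; }
    public void setJobFacade(final Object jobFacade) { this.jobFacade = jobFacade; }

    Object getElasticJob() { return elasticJob; }
    Object getJobFacade() { return jobFacade; }
}

// Hypothetical, simplified model of setter injection from a job data map.
final class JobDataMapInjector {
    private JobDataMapInjector() { }

    static <T> T newJob(final Class<T> jobClass, final Map<String, Object> jobDataMap) {
        try {
            T job = jobClass.getDeclaredConstructor().newInstance();
            for (Map.Entry<String, Object> entry : jobDataMap.entrySet()) {
                String key = entry.getKey();
                if (key.isEmpty()) {
                    continue;
                }
                // "elasticJob" -> "setElasticJob"
                String setterName = "set" + Character.toUpperCase(key.charAt(0)) + key.substring(1);
                for (Method method : jobClass.getMethods()) {
                    // Only inject when a matching setter exists and the value fits its parameter type.
                    if (method.getName().equals(setterName) && method.getParameterCount() == 1
                            && method.getParameterTypes()[0].isInstance(entry.getValue())) {
                        method.invoke(job, entry.getValue());
                        break;
                    }
                }
            }
            return job;
        } catch (ReflectiveOperationException ex) {
            throw new IllegalStateException("Cannot create job " + jobClass.getName(), ex);
        }
    }
}
```

Keys without a matching setter are simply ignored in this model.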
AbstractElasticJobExecutor
public abstract class AbstractElasticJobExecutor {
... (code omitted)
/**
* Execute the job.
*/
public final void execute() {
try {
jobFacade.checkJobExecutionEnvironment();
} catch (final JobExecutionEnvironmentException cause) {
jobExceptionHandler.handleException(jobName, cause);
}
ShardingContexts shardingContexts = jobFacade.getShardingContexts();
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), JobStatusTraceEvent.State.TASK_STAGING, String.format("Job '%s' execute begin.", jobName));
}
// If the job is still running, mark every shard assigned to this job instance as misfired
if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), JobStatusTraceEvent.State.TASK_FINISHED, String.format(
"Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName,
shardingContexts.getShardingItemParameters().keySet()));
}
return;
}
try {
// Pre-execution extension point
jobFacade.beforeJobExecuted(shardingContexts);
//CHECKSTYLE:OFF
} catch (final Throwable cause) {
//CHECKSTYLE:ON
jobExceptionHandler.handleException(jobName, cause);
}
// 1. Execute the job
execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
// 2. Re-execute misfired shards
while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
}
// 3. Failover
jobFacade.failoverIfNecessary();
try {
// Post-execution extension point
jobFacade.afterJobExecuted(shardingContexts);
//CHECKSTYLE:OFF
} catch (final Throwable cause) {
//CHECKSTYLE:ON
jobExceptionHandler.handleException(jobName, cause);
}
}
private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
if (shardingContexts.getShardingItemParameters().isEmpty()) {
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), JobStatusTraceEvent.State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobName));
}
return;
}
// Record that the job has started
jobFacade.registerJobBegin(shardingContexts);
String taskId = shardingContexts.getTaskId();
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(taskId, JobStatusTraceEvent.State.TASK_RUNNING, "");
}
try {
// Execute the job
process(shardingContexts, executionSource);
} finally {
// TODO consider adding a job-failed state, and how to handle the overall job-failure loop
jobFacade.registerJobCompleted(shardingContexts);
if (itemErrorMessages.isEmpty()) {
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(taskId, JobStatusTraceEvent.State.TASK_FINISHED, "");
}
} else {
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(taskId, JobStatusTraceEvent.State.TASK_ERROR, itemErrorMessages.toString());
}
}
}
}
private void process(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
// Single shard: the current job instance owns exactly one shard, so it runs on the current thread
if (1 == items.size()) {
int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, item);
process(shardingContexts, item, jobExecutionEvent);
return;
}
final CountDownLatch latch = new CountDownLatch(items.size());
// Multiple shards: one thread per shard
for (final int each : items) {
final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, each);
if (executorService.isShutdown()) {
return;
}
executorService.submit(new Runnable() {
@Override
public void run() {
try {
process(shardingContexts, each, jobExecutionEvent);
} finally {
latch.countDown();
}
}
});
}
try {
// Wait until all shards have finished
latch.await();
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobExecutionEvent(startEvent);
}
log.trace("Job '{}' executing, item is: '{}'.", jobName, item);
JobExecutionEvent completeEvent;
try {
// Process a single shard; this ultimately delegates to the user-defined job logic
process(new ShardingContext(shardingContexts, item));
completeEvent = startEvent.executionSuccess();
log.trace("Job '{}' executed, item is: '{}'.", jobName, item);
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobExecutionEvent(completeEvent);
}
// CHECKSTYLE:OFF
} catch (final Throwable cause) {
// CHECKSTYLE:ON
completeEvent = startEvent.executionFailure(cause);
jobFacade.postJobExecutionEvent(completeEvent);
itemErrorMessages.put(item, ExceptionUtil.transform(cause));
jobExceptionHandler.handleException(jobName, cause);
}
}
protected abstract void process(ShardingContext shardingContext);
}
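The misfire handling in execute() is easier to follow as a small state model. In the sketch below, two in-memory sets stand in for the per-shard running and misfire flags that Elastic Job tracks; MisfireModel and its methods are hypothetical, single-threaded, and assume misfire re-execution is enabled in the job configuration. A trigger that arrives while shards are still running only marks them as misfired; the run already in progress then re-executes once per pending misfire before returning.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, single-threaded model of the running/misfire bookkeeping in AbstractElasticJobExecutor#execute().
// Local sets stand in for the per-shard flags; misfire re-execution is assumed to be enabled.
final class MisfireModel {
    private final Set<Integer> running = new HashSet<>();
    private final Set<Integer> misfired = new HashSet<>();
    private final List<String> log = new ArrayList<>();
    private Runnable jobBody = () -> { };

    void setJobBody(final Runnable jobBody) {
        this.jobBody = jobBody;
    }

    List<String> log() {
        return log;
    }

    void trigger(final Collection<Integer> items) {
        // A previous run still holds some of these shards: flag all of them as misfired and skip this trigger.
        if (anyFlagged(running, items)) {
            misfired.addAll(items);
            log.add("SKIPPED_AS_MISFIRE");
            return;
        }
        runOnce(items, "NORMAL_TRIGGER");
        // While a misfire flag is pending, clear it and run again.
        while (anyFlagged(misfired, items)) {
            misfired.removeAll(items);
            runOnce(items, "MISFIRE");
        }
    }

    private void runOnce(final Collection<Integer> items, final String source) {
        running.addAll(items);
        try {
            log.add(source);
            jobBody.run();
        } finally {
            running.removeAll(items);
        }
    }

    private static boolean anyFlagged(final Set<Integer> flags, final Collection<Integer> items) {
        for (int each : items) {
            if (flags.contains(each)) {
                return true;
            }
        }
        return false;
    }
}
```

For example, if the job body calls trigger(...) once for the same shards (simulating the next cron fire arriving mid-run), the log ends up as [NORMAL_TRIGGER, SKIPPED_AS_MISFIRE, MISFIRE]: the overlapping trigger is skipped and its work is made up right after the first run.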
SimpleJobExecutor
Only SimpleJobExecutor is shown here; AbstractElasticJobExecutor has two other implementations, ScriptJobExecutor and DataflowJobExecutor, which are similar and not covered.
public final class SimpleJobExecutor extends AbstractElasticJobExecutor {
private final SimpleJob simpleJob;
public SimpleJobExecutor(final SimpleJob simpleJob, final JobFacade jobFacade) {
super(jobFacade);
this.simpleJob = simpleJob;
}
@Override
protected void process(final ShardingContext shardingContext) {
// User-defined job logic
simpleJob.execute(shardingContext);
}
}
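SimpleJobExecutor only fills in the last step of a template method: AbstractElasticJobExecutor decides how shards are run and how failures are recorded, and the subclass supplies process(ShardingContext). The sketch below isolates that fan-out pattern in plain Java: a single shard runs on the calling thread, several shards run as one task each, and a CountDownLatch waits for all of them. ShardExecutor is a hypothetical class; an int shard item stands in for ShardingContext, the thread pool is supplied by the caller, and only RuntimeExceptions are caught, whereas the original catches Throwable.

```java
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;

// Hypothetical, simplified counterpart of AbstractElasticJobExecutor: the base class owns the fan-out,
// subclasses only supply process(item), the way SimpleJobExecutor delegates to SimpleJob#execute.
abstract class ShardExecutor {
    private final ExecutorService executorService;
    // Error message per failed shard; written concurrently by worker threads.
    private final Map<Integer, String> itemErrorMessages = new ConcurrentHashMap<>();

    ShardExecutor(final ExecutorService executorService) {
        this.executorService = executorService;
    }

    Map<Integer, String> getItemErrorMessages() {
        return itemErrorMessages;
    }

    // Template method: one shard runs on the calling thread, several shards run as one task each.
    final void execute(final Collection<Integer> items) {
        if (items.size() == 1) {
            processSafely(items.iterator().next());
            return;
        }
        CountDownLatch latch = new CountDownLatch(items.size());
        for (int each : items) {
            executorService.submit(() -> {
                try {
                    processSafely(each);
                } finally {
                    latch.countDown();
                }
            });
        }
        try {
            // Block until every shard has finished, whether it succeeded or failed.
            latch.await();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    private void processSafely(final int item) {
        try {
            process(item);
        } catch (RuntimeException ex) {
            // Record the failure instead of letting it escape, so the other shards are unaffected.
            itemErrorMessages.put(item, String.valueOf(ex.getMessage()));
        }
    }

    protected abstract void process(int item);
}
```

A failing shard therefore never cancels its siblings; it only leaves an entry in the error map, which mirrors how itemErrorMessages later decides between TASK_FINISHED and TASK_ERROR in the original.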
Sharding strategy
/**
* Sharding strategy based on an average allocation algorithm.
*
* <p>
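* If the shards cannot be divided evenly, the leftover shards are appended one by one to the lower-numbered servers.
* For example:
* 1. With 3 servers and 9 shards, the servers get: 1=[0,1,2], 2=[3,4,5], 3=[6,7,8].
* 2. With 3 servers and 8 shards, the servers get: 1=[0,1,6], 2=[2,3,7], 3=[4,5].
* 3. With 3 servers and 10 shards, the servers get: 1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8].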
* </p>
*
* @author zhangliang
*/
public final class AverageAllocationJobShardingStrategy implements JobShardingStrategy {
@Override
public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
if (jobInstances.isEmpty()) {
return Collections.emptyMap();
}
// Evenly divisible part: every machine gets the same number of shards
Map<JobInstance, List<Integer>> result = shardingAliquot(jobInstances, shardingTotalCount);
// Remainder: starting from the first machine, give one extra shard to each machine until none are left
addAliquant(jobInstances, shardingTotalCount, result);
return result;
}
private Map<JobInstance, List<Integer>> shardingAliquot(final List<JobInstance> shardingUnits, final int shardingTotalCount) {
Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(shardingTotalCount, 1);
int itemCountPerSharding = shardingTotalCount / shardingUnits.size();
int count = 0;
for (JobInstance each : shardingUnits) {
List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1);
for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
shardingItems.add(i);
}
result.put(each, shardingItems);
count++;
}
return result;
}
private void addAliquant(final List<JobInstance> shardingUnits, final int shardingTotalCount, final Map<JobInstance, List<Integer>> shardingResults) {
int aliquant = shardingTotalCount % shardingUnits.size();
int count = 0;
// Starting from the first instance, append the leftover shards one per instance until all are assigned
for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
if (count < aliquant) {
entry.getValue().add(shardingTotalCount / shardingUnits.size() * shardingUnits.size() + count);
}
count++;
}
}
}
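Because the strategy is a pure function, it is easy to check by hand. The following is a standalone re-implementation of the same two-step algorithm (even split, then remainder), with plain String instance names standing in for JobInstance to keep the example dependency-free; AverageAllocation is a hypothetical name, and instance names are assumed to be distinct. It reproduces the three cases listed in the Javadoc above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical standalone version of AverageAllocationJobShardingStrategy; Strings stand in for JobInstance.
final class AverageAllocation {
    private AverageAllocation() { }

    static Map<String, List<Integer>> sharding(final List<String> instances, final int shardingTotalCount) {
        if (instances.isEmpty()) {
            return Collections.emptyMap();
        }
        int perInstance = shardingTotalCount / instances.size();
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        // Step 1: instance i gets the contiguous block [i * perInstance, (i + 1) * perInstance).
        for (int i = 0; i < instances.size(); i++) {
            List<Integer> items = new ArrayList<>(perInstance + 1);
            for (int item = i * perInstance; item < (i + 1) * perInstance; item++) {
                items.add(item);
            }
            result.put(instances.get(i), items);
        }
        // Step 2: leftover items start at perInstance * instances.size(); hand them out one per instance from the first.
        int remainder = shardingTotalCount % instances.size();
        int firstLeftover = perInstance * instances.size();
        for (int i = 0; i < remainder; i++) {
            result.get(instances.get(i)).add(firstLeftover + i);
        }
        return result;
    }
}
```

For example, AverageAllocation.sharding(List.of("s1", "s2", "s3"), 8) yields {s1=[0, 1, 6], s2=[2, 3, 7], s3=[4, 5]}. A consequence of the "lowest-numbered first" rule is that the instances at the front of the list always carry the extra shards, so which instance gets them depends entirely on the order of the instance list passed in.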
Distributed coordination
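Elastic Job uses ZooKeeper (zk) as its registry center: job instance information, sharding information, configuration, job running state and so on are all stored as zk nodes. Distributed coordination is driven by zk node change events: node additions, updates and removals are pushed to every job instance in the cluster as they happen, and Elastic Job provides several listeners to handle them. The listeners build on AbstractJobListener and TreeCacheListener.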
TreeCacheListener
The node-change listener interface provided by Apache Curator, the ZooKeeper client library Elastic Job uses
/**
* Listener for {@link TreeCache} changes
*/
public interface TreeCacheListener
{
/**
* Listens for zk change events
* Called when a change has occurred
*
* @param client the client
* @param event describes the change
* @throws Exception errors
*/
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception;
}
AbstractJobListener
The job listener base class defined by Elastic Job
public abstract class AbstractJobListener implements TreeCacheListener {
@Override
public final void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
ChildData childData = event.getData();
if (null == childData) {
return;
}
String path = childData.getPath();
if (path.isEmpty()) {
return;
}
dataChanged(path, event.getType(), null == childData.getData() ? "" : new String(childData.getData(), Charsets.UTF_8));
}
// Abstract method; each listener subclass implements it as needed
protected abstract void dataChanged(final String path, final Type eventType, final String data);
}
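AbstractJobListener is essentially an adapter: it ignores events with no ChildData or an empty path, decodes the node payload as UTF-8 (an empty string when there is none), and passes (path, type, data) to dataChanged. To show that filtering without a ZooKeeper server, the sketch below replaces Curator's TreeCacheEvent with a hypothetical NodeEvent value class; EventType, SketchJobListener and RecordingListener are likewise hypothetical names, not Elastic Job or Curator types.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Curator's TreeCacheEvent.Type.
enum EventType { NODE_ADDED, NODE_UPDATED, NODE_REMOVED }

// Hypothetical stand-in for TreeCacheEvent plus ChildData.
final class NodeEvent {
    final EventType type;
    final String path;   // null stands in for an event whose ChildData is null
    final byte[] data;   // null when the node carries no payload

    NodeEvent(final EventType type, final String path, final byte[] data) {
        this.type = type;
        this.path = path;
        this.data = data;
    }
}

abstract class SketchJobListener {
    // Same guard clauses as AbstractJobListener#childEvent, then delegate to the subclass.
    final void childEvent(final NodeEvent event) {
        if (event.path == null || event.path.isEmpty()) {
            return;
        }
        String data = event.data == null ? "" : new String(event.data, StandardCharsets.UTF_8);
        dataChanged(event.path, event.type, data);
    }

    protected abstract void dataChanged(String path, EventType eventType, String data);
}

// Example subclass that just records what it receives.
final class RecordingListener extends SketchJobListener {
    final List<String> received = new ArrayList<>();

    @Override
    protected void dataChanged(final String path, final EventType eventType, final String data) {
        received.add(eventType + " " + path + " '" + data + "'");
    }
}
```

Keeping childEvent final, as the original does, means every concrete listener sees the same normalized input and only has to match on path and event type.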
JobCrashedJobListener
Failover listener
class JobCrashedJobListener extends AbstractJobListener {
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
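// Three conditions: (1) failover is enabled, (2) the registry event is NODE_REMOVED, i.e. a server went offline, (3) the path is an instance path, i.e. under jobName/instances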
if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {
// path looks like jobName/instances/{ip}@-@{pid}
// so jobInstanceId looks like {ip}@-@{pid}
String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
// Skip if the removed instance is this machine's own instance
if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
return;
}
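// Get the shards that had already failed over to the crashed instance: zk node jobName/sharding/{item}/failover stores the ID of the instance that took over the shard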
List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
if (!failoverItems.isEmpty()) {
// The crashed instance was running failed-over shards
for (int each : failoverItems) {
// Store the shard under leader/failover/items
failoverService.setCrashedFailoverFlag(each);
failoverService.failoverIfNecessary();
}
} else {
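// If the crashed instance had no failed-over shards, take its own assigned shards, store each under leader/failover/items/{item}, and fail them over
// At first glance any server crash always triggers failover here, but that is not quite true:
// shardingService.getShardingItems(jobInstanceId) checks whether the server is still available and returns an empty list if it is not
// However, when a server is only briefly unavailable, e.g. during a memory dump, this can go wrong; the abnormal job start in our incident happened here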
for (int each : shardingService.getShardingItems(jobInstanceId)) {
failoverService.setCrashedFailoverFlag(each);
failoverService.failoverIfNecessary();
}
}
}
}
}
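Stripped of ZooKeeper access, the listener's decision is a function of a few inputs, which makes the two branches easy to trace. The sketch below models it with in-memory data and assumes the failover-enabled and NODE_REMOVED checks have already passed. FailoverDecision and its parameters are hypothetical: a map stands in for the jobName/sharding/{item}/failover nodes, another for each instance's normal sharding assignment, and a boolean for the availability check inside shardingService.getShardingItems(jobInstanceId). The returned list is the set of items that would be flagged under leader/failover/items; the real listener calls setCrashedFailoverFlag and failoverIfNecessary for each one.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical, in-memory model of the decision JobCrashedJobListener makes for one NODE_REMOVED event.
final class FailoverDecision {
    private FailoverDecision() { }

    /**
     * @param instancesPath   full path of the instances node, e.g. "/demoJob/instances"
     * @param removedPath     path of the removed node
     * @param localInstanceId ID of the instance processing the event
     * @param failoverOwners  shard item -> instance that took it over (models jobName/sharding/{item}/failover)
     * @param assignments     instance -> shard items normally assigned to it
     * @param serverAvailable whether the crashed instance's server still counts as available
     * @return shard items that would be flagged under leader/failover/items, in ascending order
     */
    static List<Integer> itemsToFlag(final String instancesPath, final String removedPath, final String localInstanceId,
                                     final Map<Integer, String> failoverOwners,
                                     final Map<String, List<Integer>> assignments,
                                     final boolean serverAvailable) {
        // Only children of the instances node are relevant.
        if (!removedPath.startsWith(instancesPath + "/")) {
            return Collections.emptyList();
        }
        String crashedId = removedPath.substring(instancesPath.length() + 1);
        // The listener ignores the removal of its own instance.
        if (crashedId.equals(localInstanceId)) {
            return Collections.emptyList();
        }
        // Branch 1: shards that had already failed over to the crashed instance.
        List<Integer> result = new ArrayList<>();
        for (Map.Entry<Integer, String> entry : failoverOwners.entrySet()) {
            if (crashedId.equals(entry.getValue())) {
                result.add(entry.getKey());
            }
        }
        if (!result.isEmpty()) {
            Collections.sort(result);
            return result;
        }
        // Branch 2: the crashed instance's own shards, but only if its server is still considered available.
        if (!serverAvailable) {
            return Collections.emptyList();
        }
        List<Integer> assigned = new ArrayList<>(assignments.getOrDefault(crashedId, Collections.emptyList()));
        Collections.sort(assigned);
        return assigned;
    }
}
```

The model makes it visible that branch 2 depends entirely on the availability check, which is exactly the point the source comments above single out for the memory-dump case.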
FailoverSettingsChangedJobListener
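Failover settings change listener: handles failover being turned off from the console; when failover is turned on, nothing needs to be done locally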
class FailoverSettingsChangedJobListener extends AbstractJobListener {
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
if (configNode.isConfigPath(path) && Type.NODE_UPDATED == eventType && !LiteJobConfigurationGsonFactory.fromJson(data).isFailover()) {
failoverService.removeFailoverInfo();
}
}
}
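Other classes
JobRegistry: job registry, one singleton per JVM; records the mapping between jobs and registry centers, job status, and job instances
SchedulerFacade: job scheduling facade, one per job
JobNodeStorage: access to job nodes
ShardingNode: naming rules for the zk sharding nodes
JobNodePath: builds job node paths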
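Summary
This article walked through the Elastic Job source code from four angles: job initialization, job triggering, the sharding strategy, and distributed coordination. It is partly a record for myself and partly an attempt to help other developers quickly understand how Elastic Job works.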