本节将探讨ElasticJob故障失效转移机制。我们知道ElasticJob是一款基于Qu-artz的分布式任务调度框架,主要是指数据的分布式。ElasticJob的核心设计理念是一个任务在多个节点上执行,每个节点处理一部分数据。那如果一个任务节点宕机后,则一次任务调度期间,一部分数据将不会被处理,为了解决由于任务节点宕机引起任务一个调度周期的一次任务执行部分数据未处理,可以设置开启故障失效转移,将本次任务转移到其他正常的节点上执行,实现与该任务在单节点上进行调度相同的效果(本次调度处理的数据量),ElasticJob故障失效转移类图如图所示:
- FailoverListenerManager:故障失效转移监听管理器。
- FailoverListenerManager#JobCrashedJobListener job实现(Job实例宕机)事件监听管理器。
- FailoverListenerManager#FailoverSettingsChangedJobListener 失效转移配置变化事件监听器。
注:这里有md不这次美元符号,故暂时用#代表内部类。
Job实例节点宕机事件监听器
1class JobCrashedJobListener extends AbstractJobListener { 2 protected void dataChanged(final String path, final Type eventType, final String data) { 3 if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) { // @1 4 String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1); // @2 5 if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) { // @3 6 return; 7 } 8 List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId); //@4 9 if (!failoverItems.isEmpty()) { //@5 10 for (int each : failoverItems) { 11 failoverService.setCrashedFailoverFlag(each); 12 failoverService.failoverIfNecessary(); 13 } 14 } else { 15 for (int each : shardingService.getShardingItems(jobInstanceId)) { //@6 16 failoverService.setCrashedFailoverFlag(each); 17 failoverService.failoverIfNecessary(); 18 } 19 } 20 } 21 } 22}
代码@1:如果配置文件中设置开启故障失效转移机制,监听到{namespace}/jo-bname/instances节点下子节点的删除事件时,则被认为有节点宕机,将执行故障失效转移相关逻辑。
代码@2:获取被宕机的任务实例ID(jo-bInstanceId)。
代码@3:如果被删除的任务节点ID与当前实例的ID相同,则忽略。
代码@4:根据jobInstanceId获取作业服务器的失效转移分片项集合。其具体实现逻辑如下:
1/** 2 * 获取作业服务器的失效转移分片项集合. 3 * 4 * @param jobInstanceId 作业运行实例主键 5 * @return 作业失效转移的分片项集合 6 */ 7 public List<Integer> getFailoverItems(final String jobInstanceId) { 8 List<String> items = jobNodeStorage.getJobNodeChildrenKeys(ShardingNode.ROOT); 9 List<Integer> result = new ArrayList<>(items.size()); 10 for (String each : items) { 11 int item = Integer.parseInt(each); 12 String node = FailoverNode.getExecutionFailoverNode(item); 13 if (jobNodeStorage.isJobNodeExisted(node) && jobInstanceId.equals(jobNodeStorage.getJobNodeDataDirectly(node))) { 14 result.add(item); 15 } 16 } 17 Collections.sort(result); 18 return result; 19 }
首先获取{namespace}/jobname/shard-ing目录下的直接子节点(当前的分片信息),判断{namespace}/jobname/shar-ding/{item}/failover节点是否存在,如果存在判断该分片是否为当前任务的分片节点,如果是,则返回。该方法的主要目的就是获取已经转移到当前任务节点的分片信息。
代码@5,判断是否有失败分片转移到当前节点,初始状态肯定为空,将执行代码@6,设置故障转移相关准备环境。
代码@6,获取分配给Crashed(宕机的job实例)的所有分片节点,遍历已发生故障的分片,将这些分片设置为故障,待故障转移,设置为故障的实现方法为:创建{namespace}/jobname/leader/failover/items/{item}。
代码@7:执行FailoverService#failover-IfNecessary是否执行故障转移。
1/** 2 * 如果需要失效转移, 则执行作业失效转移. 3 */ 4 public void failoverIfNecessary() { 5 if (needFailover()) { 6 jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback()); 7 } 8 } 9 10 private boolean needFailover() { 11 return jobNodeStorage.isJobNodeExisted(FailoverNode.ITEMS_ROOT) && !jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).isEmpty() 12 && !JobRegistry.getInstance().isJobRunning(jobName); 13 }
其实现思路:【needFailover方法】首先判断是否存在{namespace}/jobname/leader/failover/items节点是否存在,并且其节点下是否有子节点,并且节点也运行该任务,则需要执行故障失效转移。执行失效转移的逻辑也是进行失效转移选主,其分布式锁节点为:{nam-espace}/jobname/leader/failover/latch,谁先获得锁,则执行失效故障转移具体逻辑(FailoverLeaderExecutionCallba-ck),具体的失效转移算法为:
1FailoverService#FailoverLeaderExecutionCallback: 2class FailoverLeaderExecutionCallback implements LeaderExecutionCallback { 3 @Override 4 public void execute() { 5 if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) { // @1 6 return; 7 } 8 int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0)); // @2 9 log.debug("Failover job '{}' begin, crashed item '{}'", jobName, crashedItem); 10 jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId()); // @3 11 jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem)); // @4 12 // TODO 不应使用triggerJob, 而是使用executor统一调度 13 JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName); // @5 14 if (null != jobScheduleController) { 15 jobScheduleController.triggerJob(); 16 } 17 } 18 }
代码@1:如果当前实例停止运行该job或无需执行失效故障转移,则返回。
代码@2:获取第一个待故障转移的分片,获取{namespace}/jobname/leader/failover/items/{itemnum,获取分片序号itemnum。
代码@3:创建临时节点{namesp-ace}/j-obname/sharding/itemnum/failover节点。
代码@4:删除{namespace}/jobname/leader/failover/items/{itemnum}节点。
代码@5:触发任务调度,并结束当前节点的故障失效转移,然后释放锁,下一个节点获取锁,进行转移{namespac-e}/jobname/leader/failover/items目录下的失效分片。
PS:故障实现转移基本实现思路为:当一个任务节点宕机后,其他节点会监听到实例删除事件,从实例目录中获取其实例ID,并从ZK中获取原先分配故障实例的分片信息,并将这些分片标记为需要故障转移(创建{namespace}/jobname/leader/failover/items/{item}持久节点),然后判断是否需要执行故障转移操作。执行故障转移操作的前提条件是:
1、当前任务实例也调度该job
2、存在{namespace}/jobname/leader/failover/items节点并有子节点。如果满足上述两个条件,则执行失效转移,多个存活节点进行选主(LeaderLatch),创建分布式锁节点({namespace}/jobname/l-eader/failover/latch),获取锁的节点优先执行获取分片节点,其具体过程如上述所示,每个存活节点一次故障转移只竞争一个分片。
上述事件监听器主要的作用是当任务节点失效后,其他存活节点“瓜分”失效节点的分片,创建{namespace}/jobname/sharding/{item}/failover节点。但这些分片的任务并没有真正执行,本小结将梳理故障节点分片的执行。可以看得出来,分片故障转移,就是在对应的故障分片下创建了failover节点,在获取分片信息上下文时会优先处理,这也是在分析分片流程时并未重点讲解的(也就是本次故障的失效节点将在下次任务执行之前,先处理需要故障转移的分片节点)。
因此,在进入下述内容之前,请先阅读源码分析ElasticJob的分片机制。回到定时任务调度执行入口:AbstractElasticJ-obExecutor#execute
1 /** 2 * 执行作业. 3 */ 4 public final void execute() { 5 try { 6 jobFacade.checkJobExecutionEnvironment(); 7 } catch (final JobExecutionEnvironmentException cause) { 8 jobExceptionHandler.handleException(jobName, cause); 9 } 10 ShardingContexts shardingContexts = jobFacade.getShardingContexts(); // 获取分片上下文环境 11 ... 12 } 13 14LiteJobFacade#getShardingContexts 15@Override 16 public ShardingContexts getShardingContexts() { 17 boolean isFailover = configService.load(true).isFailover(); 18 if (isFailover) { // @1 19 List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems(); // @2 20 if (!failoverShardingItems.isEmpty()) { 21 return executionContextService.getJobShardingContext(failoverShardingItems); // @3 22 } 23 } 24 shardingService.shardingIfNecessary(); 25 List<Integer> shardingItems = shardingService.getLocalShardingItems(); 26 if (isFailover) { 27 shardingItems.removeAll(failoverService.getLocalTakeOffItems()); 28 } 29 shardingItems.removeAll(executionService.getDisabledItems(shardingItems)); 30 return executionContextService.getJobShardingContext(shardingItems); 31 }
代码@1:获取分片上下文时,如果启用了故障失效转移机制,优先获取故障失效转移的分片上下文。
代码@2:获取本节点获取的实现分片信息。其基本逻辑是遍历{namespace}/jo-bname/sharding下的字节点,获取该任务当前的所有分片信息,遍历每个节点,获取序号,然后依次判断是否存在({namespace}/jobname/sharding/{item}/failover),并且该节点的内容为当前的实例ID,则加入到分片结果中。
代码@3:根据失效分片序号构建分片上下文环境,执行该分片上的任务,根据分片上下文环境,执行任务。执行完本次任务调度后,将删除分片的故障标记,待下一次任务调度时重新分片。删除分片的故障标记代码如下:
1Facade#registerJobCompleted 2public void registerJobCompleted(final ShardingContexts shardingContexts) { 3 executionService.registerJobCompleted(shardingContexts); // @1 4 if (configService.load(true).isFailover()) { 5 failoverService.updateFailoverComplete(shardingContexts.getShardingItemParameters().keySet()); // @2 6 } 7}
代码@1:将分片的调度任务设置为执行完成,首先在内存中设置任务为非运行中(JobRegistry.getInstance().setJobRu-nning(false)),如果开启了monitorExec-ution,则需要删除分片的运行标记,具体做法是,删除{namespace}/jobname/sharding/{item}/running节点。
代码@2:如果启用了故障失效转移,调用updateFailoverComplete方法,更新故障实现转移处理完成,删除{names-pace}/jobname/sharding/{item}/failover节点,下次任务统一调度的时候,所有的分片会重新再分片,也就完成一次故障失效转移。
故障实现转移,其实就是在一次任务调度期间,分片节点宕机,导致分配在宕机服务上的分片任务未执行,那这一部数据在本次任务调度期间未被处理,为了及时处理那部分数据库,ElasticJob支持故障失效转移,就是在一次任务调度期间,将其他宕机服务所分配的分片上下文转移到当前存活的节点上执行,本次失效的分片节点任务将在下次任务开始调度之前,会优先处理失效分片节点的任务,然后才会开始新一轮调动任务。下一次调动任务运行时,会重新进行分片。
ElasticJob是一款分布式任务调度平台,这里的分布式更多指的还是数据的分布式,就是一个任务在多个分片上执行,每个节点根据分片上下文获取部分数据进行处理(数据分片)。