源码分析ElasticJob故障失效转移机制

简介: 本节将探讨ElasticJob故障失效转移机制。我们知道ElasticJob是一款基于Quartz的分布式任务调度框架,这里的分布式是数据的分布式,ElasticJob的核心设计理念是一个任务在多个节点上执行,每个节点处理一部分数据(任务待处理数据分片)。

本节将探讨ElasticJob故障失效转移机制。我们知道ElasticJob是一款基于Quartz的分布式任务调度框架,这里的分布式是数据的分布式,ElasticJob的核心设计理念是一个任务在多个节点上执行,每个节点处理一部分数据(任务待处理数据分片)。那如果一个任务节点宕机后,则一次任务调度期间,一部分数据将不会被处理,为了解决由于任务节点宕机引起任务一个调度周期的一次任务执行部分数据未处理,可以设置开启故障失效转移,将本次任务转移到其他正常的节点上执行,实现与该任务在单节点上进行调度相同的效果(本次调度处理的数据量),ElasticJob故障失效转移类图如图所示:
这里写图片描述

  • FailoverListenerManager:故障失效转移监听管理器。
  • FailoverListenerManager$JobCrashedJobListener job实现(Job实例宕机)事件监听管理器。
  • FailoverListenerManager$FailoverSettingsChangedJobListener 失效转移配置变化事件监听器。

1、故障失效转移事件监听管理器详解

1.1 JobCrashedJobListener Job实例节点宕机事件监听器

class JobCrashedJobListener extends AbstractJobListener {
        protected void dataChanged(final String path, final Type eventType, final String data) {
            if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {     // @1
                String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);                                 // @2
                if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {               // @3
                    return;
                }
                List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);                                                  //@4
                if (!failoverItems.isEmpty()) {                                                                                                                               //@5
                    for (int each : failoverItems) {
                        failoverService.setCrashedFailoverFlag(each);                                                                                          
                        failoverService.failoverIfNecessary();
                    }
                } else {
                    for (int each : shardingService.getShardingItems(jobInstanceId)) {                                                                //@6
                        failoverService.setCrashedFailoverFlag(each);
                        failoverService.failoverIfNecessary();
                    }
                }
            }
        }
    }

代码@1:如果配置文件中设置开启故障失效转移机制,监听到${namespace}/jobname/instances节点下子节点的删除事件时,则被认为有节点宕机,将执行故障失效转移相关逻辑。
代码@2:获取被宕机的任务实例ID(jobInstanceId)。
代码@3:如果被删除的任务节点ID与当前实例的ID相同,则忽略。
代码@4:根据jobInstanceId获取作业服务器的失效转移分片项集合。

其实现逻辑如下:FailoverService#getFailoverItems

/**
     * 获取作业服务器的失效转移分片项集合.
     *
     * @param jobInstanceId 作业运行实例主键
     * @return 作业失效转移的分片项集合
     */
    public List<Integer> getFailoverItems(final String jobInstanceId) {
        List<String> items = jobNodeStorage.getJobNodeChildrenKeys(ShardingNode.ROOT);
        List<Integer> result = new ArrayList<>(items.size());
        for (String each : items) {
            int item = Integer.parseInt(each);
            String node = FailoverNode.getExecutionFailoverNode(item);
            if (jobNodeStorage.isJobNodeExisted(node) && jobInstanceId.equals(jobNodeStorage.getJobNodeDataDirectly(node))) {
                result.add(item);
            }
        }
        Collections.sort(result);
        return result;
    }

首先获取${namespace}/jobname/sharding目录下的直接子节点(当前的分片信息),判断${namespace}/jobname/sharding/{item}/failover节点是否存在,如果存在判断该分片是否为当前任务的分片节点,如果是,则返回。该方法的主要目的就是获取已经转移到当前任务节点的分片信息。

代码@5,判断是否有失败分片转移到当前节点,初始状态肯定为空,将执行代码@6,设置故障转移相关准备环境。

代码@6,获取分配给Crashed(宕机的job实例)的所有分片节点,遍历已发生故障的分片,将这些分片设置为故障,待故障转移,设置为故障的实现方法为:创建${namespace}/jobname/leader/failover/items/{item}。

代码@7:执行FailoverService#failoverIfNecessary是否执行故障转移。

/**
     * 如果需要失效转移, 则执行作业失效转移.
     */
    public void failoverIfNecessary() {
        if (needFailover()) {
            jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
        }
    }
    
    private boolean needFailover() {
        return jobNodeStorage.isJobNodeExisted(FailoverNode.ITEMS_ROOT) && !jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).isEmpty()
                && !JobRegistry.getInstance().isJobRunning(jobName);
    }

其实现思路:【needFailover方法】首先判断是否存在&dollar;{namespace}/jobname/leader/failover/items节点是否存在,并且其节点下是否有子节点,并且节点也运行该任务,则需要执行故障失效转移。执行失效转移的逻辑也是进行失效转移选主,其分布式锁节点为:${namespace}/jobname/leader/failover/latch,谁先获得锁,则执行失效故障转移具体逻辑(FailoverLeaderExecutionCallback),具体的失效转移算法为:

FailoverService#FailoverLeaderExecutionCallback:

class FailoverLeaderExecutionCallback implements LeaderExecutionCallback {
        @Override
        public void execute() {
            if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) {      // @1
                return;
            }
            int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));    // @2
            log.debug("Failover job '{}' begin, crashed item '{}'", jobName, crashedItem);
            jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());  // @3
            jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));     // @4
            // TODO 不应使用triggerJob, 而是使用executor统一调度
            JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);    // @5
            if (null != jobScheduleController) {
                jobScheduleController.triggerJob();  
            }
        }
    }

代码@1:如果当前实例停止运行该job或无需执行失效故障转移,则返回。

代码@2:获取第一个待故障转移的分片,获取${namespace}/jobname/leader/failover/items/{itemnum,获取分片序号itemnum。

代码@3:创建临时节点${namespace}/jobname/sharding/itemnum/failover节点。

代码@4:删除${namespace}/jobname/leader/failover/items/{itemnum}节点。

代码@5:触发任务调度,并结束当前节点的故障失效转移,然后释放锁,下一个节点获取锁,进行转移${namespace}/jobname/leader/failover/items目录下的失效分片。

PS:故障实现转移基本实现思路为:当一个任务节点宕机后,其他节点会监听到实例删除事件,从实例目录中获取其实例ID,并从ZK中获取原先分配故障实例的分片信息,并将这些分片标记为需要故障转移(创建${namespace}/jobname/leader/failover/items/{item}持久节点),然后判断是否需要执行故障转移操作。

执行故障转移操作的前提条件是:

  1. 当前任务实例也调度该job;
  2. 存在${namespace}/jobname/leader/failover/items节点并有子节点。如果满足上述两个条件,则执行失效转移,多个存活节点进行选主(LeaderLatch),创建分布式锁节点(${namespace}/jobname/leader/failover/latch),获取锁的节点优先执行获取分片节点,其具体过程如上述所示,每个存活节点一次故障转移只竞争一个分片。

2、故障分片重新执行逻辑分析

上述事件监听器主要的作用是当任务节点失效后,其他存活节点“瓜分”失效节点的分片,创建${namespace}/jobname/sharding/{item}/failover节点。但这些分片的任务并没有真正执行,本小结将梳理故障节点分片的执行。

可以看得出来,分片故障转移,就是在对应的故障分片下创建了failover节点,在获取分片信息上下文时会优先处理,这也是在分析分片流程时并未重点讲解的。因此,在进入下述内容之前,请先阅读 源码分析ElasticJob的分片机制。

回到定时任务调度执行入口:AbstractElasticJobExecutor#execute

/**
     * 执行作业.
     */
    public final void execute() {
        try {
            jobFacade.checkJobExecutionEnvironment();
        } catch (final JobExecutionEnvironmentException cause) {
            jobExceptionHandler.handleException(jobName, cause);
        }
        ShardingContexts shardingContexts = jobFacade.getShardingContexts();  // 获取分片上下文环境
        ... 
    }

LiteJobFacade#getShardingContexts
@Override
    public ShardingContexts getShardingContexts() {
        boolean isFailover = configService.load(true).isFailover();
        if (isFailover) {    // @1
            List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();    // @2
            if (!failoverShardingItems.isEmpty()) {
                return executionContextService.getJobShardingContext(failoverShardingItems);    // @3
            }
        }
        shardingService.shardingIfNecessary();
        List<Integer> shardingItems = shardingService.getLocalShardingItems();
        if (isFailover) {
            shardingItems.removeAll(failoverService.getLocalTakeOffItems());
        }
        shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
        return executionContextService.getJobShardingContext(shardingItems);
    }

代码@1:获取分片上下文时,如果启用了故障失效转移机制,优先获取故障失效转移的分片上下文。

代码@2:获取本节点获取的实现分片信息。其基本逻辑是遍历&dollar;{namespace}/jobname/sharding下的字节点,获取该任务当前的所有分片信息,遍历每个节点,获取序号,然后依次判断是否存在(&dollar;{namespace}/jobname/sharding/{item}/failover),并且该节点的内容为当前的实例ID,则加入到分片结果中。

代码@3:根据失效分片序号构建分片上下文环境,执行该分片上的任务,根据分片上下文环境,执行任务。【AbstractElasticJob#execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);】执行完本次任务调度后,将删除分片的故障标记,待下一次任务调度时重新分片。

删除分片的故障标记代码如下:LiteJobFacade#registerJobCompleted

public void registerJobCompleted(final ShardingContexts shardingContexts) {
        executionService.registerJobCompleted(shardingContexts);  // @1
        if (configService.load(true).isFailover()) {
            failoverService.updateFailoverComplete(shardingContexts.getShardingItemParameters().keySet());   // @2
        }
}

代码@1:将分片的调度任务设置为执行完成,首先在内存中设置任务为非运行中(JobRegistry.getInstance().setJobRunning(false)),如果开启了monitorExecution,则需要删除分片的运行标记,具体做法是,删除&dollar;{namespace}/jobname/sharding/{item}/running节点。

代码@2:如果启用了故障失效转移,调用updateFailoverComplete方法,更新故障实现转移处理完成,删除${namespace}/jobname/sharding/{item}/failover节点,下次任务统一调度的时候,所有的分片会重新再分片,也就完成一次故障失效转移。

总结

故障实现转移,其实就是在一次任务调度期间,分片节点宕机,导致分配在宕机服务上的分片任务未执行,那这一部数据在本次任务调度期间未被处理,为了及时处理那部分数据库,ElasticJob支持故障失效转移,就是在一次任务调度期间,将其他宕机服务所分配的分片上下文转移到当前存活的节点上执行,执行完毕后,才会开始下一次调动任务。

下一次调动任务运行时,会重新进行分片。
ElasticJob是一款分布式任务调度平台,这里的分布式更多指的还是数据的分布式,就是一个任务在多个分片上执行,每个节点根据分片上下文获取部分数据进行处理(数据分片)。


原文发布时间为:2018-12-03
本文作者:丁威,《RocketMQ技术内幕》作者。
本文来自中间件兴趣圈,了解相关信息可以关注中间件兴趣圈

目录
相关文章
采用zookeeper的EPHEMERAL节点机制实现服务集群的陷阱
在集群管理中使用Zookeeper的EPHEMERAL节点机制存在很多的陷阱,毛估估,第一次使用zk来实现集群管理的人应该有80%以上会掉坑,有些坑比较隐蔽,在网络问题或者异常的场景时才会出现,可能很长一段时间才会暴露出来。
14486 1
|
14天前
|
运维 监控 安全
自动恢复机制在哪些情况下可能无法正常工作,有哪些替代方案?
自动恢复机制在哪些情况下可能无法正常工作,有哪些替代方案?
|
6月前
|
存储 监控 负载均衡
保证Redis的高可用性是一个涉及多个层面的任务,主要包括数据持久化、复制与故障转移、集群化部署等方面
【5月更文挑战第15天】保证Redis高可用性涉及数据持久化、复制与故障转移、集群化及优化策略。RDB和AOF是数据持久化方法,哨兵模式确保故障自动恢复。Redis Cluster实现分布式部署,提高负载均衡和容错性。其他措施包括身份认证、多线程、数据压缩和监控报警,以增强安全性和稳定性。通过综合配置与监控,可确保Redis服务的高效、可靠运行。
226 2
|
3月前
|
机器学习/深度学习 传感器 算法
GTS自动补偿机制的作用
【8月更文挑战第25天】
37 2
|
3月前
|
监控 网络协议 Linux
心跳机制方案
心跳机制方案
54 1
|
4月前
|
消息中间件 存储 监控
深入理解Kafka核心设计及原理(六):Controller选举机制,分区副本leader选举机制,再均衡机制
深入理解Kafka核心设计及原理(六):Controller选举机制,分区副本leader选举机制,再均衡机制
90 1
|
4月前
|
负载均衡 中间件 定位技术
中间件故障转移和容错实现方法
【7月更文挑战第24天】
62 2
|
6月前
|
监控 物联网 Java
打造高可用系统:深入了解心跳检测机制
本文介绍了分布式系统中**心跳检测**的重要机制,用于监测系统节点的健康状态和通信畅通。心跳检测通过定期发送信号,若节点在预定期限内未响应则视为可能失效。处理机制包括重试、报警和自动修复。文章还提到了**周期检测**和**累计失效检测**两种策略,并给出Java代码示例展示心跳检测实现。此外,列举了心跳检测在分布式数据库、微服务和物联网等场景的应用,以及优化策略如动态调整心跳频率和优化超时机制。最后,强调了心跳检测对系统稳定性和高可用性的关键作用。
488 2
|
6月前
|
Kubernetes Java 索引
Elasticsearch 源码探究 001——故障探测和恢复机制
Elasticsearch 源码探究 001——故障探测和恢复机制
67 0
|
6月前
|
XML 负载均衡 Dubbo
了解Dubbo配置:优先级、重试和容错机制的秘密【五】
了解Dubbo配置:优先级、重试和容错机制的秘密【五】
299 0