源码分析ElasticJob故障失效转移机制

简介: 本节将探讨ElasticJob故障失效转移机制。我们知道ElasticJob是一款基于Quartz的分布式任务调度框架,这里的分布式是数据的分布式,ElasticJob的核心设计理念是一个任务在多个节点上执行,每个节点处理一部分数据(任务待处理数据分片)。

本节将探讨ElasticJob故障失效转移机制。我们知道ElasticJob是一款基于Quartz的分布式任务调度框架,这里的分布式是数据的分布式,ElasticJob的核心设计理念是一个任务在多个节点上执行,每个节点处理一部分数据(任务待处理数据分片)。那如果一个任务节点宕机后,则一次任务调度期间,一部分数据将不会被处理,为了解决由于任务节点宕机引起任务一个调度周期的一次任务执行部分数据未处理,可以设置开启故障失效转移,将本次任务转移到其他正常的节点上执行,实现与该任务在单节点上进行调度相同的效果(本次调度处理的数据量),ElasticJob故障失效转移类图如图所示:
这里写图片描述

  • FailoverListenerManager:故障失效转移监听管理器。
  • FailoverListenerManager$JobCrashedJobListener job实现(Job实例宕机)事件监听管理器。
  • FailoverListenerManager$FailoverSettingsChangedJobListener 失效转移配置变化事件监听器。

1、故障失效转移事件监听管理器详解

1.1 JobCrashedJobListener Job实例节点宕机事件监听器

class JobCrashedJobListener extends AbstractJobListener {
        protected void dataChanged(final String path, final Type eventType, final String data) {
            if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {     // @1
                String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);                                 // @2
                if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {               // @3
                    return;
                }
                List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);                                                  //@4
                if (!failoverItems.isEmpty()) {                                                                                                                               //@5
                    for (int each : failoverItems) {
                        failoverService.setCrashedFailoverFlag(each);                                                                                          
                        failoverService.failoverIfNecessary();
                    }
                } else {
                    for (int each : shardingService.getShardingItems(jobInstanceId)) {                                                                //@6
                        failoverService.setCrashedFailoverFlag(each);
                        failoverService.failoverIfNecessary();
                    }
                }
            }
        }
    }

代码@1:如果配置文件中设置开启故障失效转移机制,监听到${namespace}/jobname/instances节点下子节点的删除事件时,则被认为有节点宕机,将执行故障失效转移相关逻辑。
代码@2:获取被宕机的任务实例ID(jobInstanceId)。
代码@3:如果被删除的任务节点ID与当前实例的ID相同,则忽略。
代码@4:根据jobInstanceId获取作业服务器的失效转移分片项集合。

其实现逻辑如下:FailoverService#getFailoverItems

/**
     * 获取作业服务器的失效转移分片项集合.
     *
     * @param jobInstanceId 作业运行实例主键
     * @return 作业失效转移的分片项集合
     */
    public List<Integer> getFailoverItems(final String jobInstanceId) {
        List<String> items = jobNodeStorage.getJobNodeChildrenKeys(ShardingNode.ROOT);
        List<Integer> result = new ArrayList<>(items.size());
        for (String each : items) {
            int item = Integer.parseInt(each);
            String node = FailoverNode.getExecutionFailoverNode(item);
            if (jobNodeStorage.isJobNodeExisted(node) && jobInstanceId.equals(jobNodeStorage.getJobNodeDataDirectly(node))) {
                result.add(item);
            }
        }
        Collections.sort(result);
        return result;
    }

首先获取${namespace}/jobname/sharding目录下的直接子节点(当前的分片信息),判断${namespace}/jobname/sharding/{item}/failover节点是否存在,如果存在判断该分片是否为当前任务的分片节点,如果是,则返回。该方法的主要目的就是获取已经转移到当前任务节点的分片信息。

代码@5,判断是否有失败分片转移到当前节点,初始状态肯定为空,将执行代码@6,设置故障转移相关准备环境。

代码@6,获取分配给Crashed(宕机的job实例)的所有分片节点,遍历已发生故障的分片,将这些分片设置为故障,待故障转移,设置为故障的实现方法为:创建${namespace}/jobname/leader/failover/items/{item}。

代码@7:执行FailoverService#failoverIfNecessary是否执行故障转移。

/**
     * 如果需要失效转移, 则执行作业失效转移.
     */
    public void failoverIfNecessary() {
        if (needFailover()) {
            jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
        }
    }
    
    private boolean needFailover() {
        return jobNodeStorage.isJobNodeExisted(FailoverNode.ITEMS_ROOT) && !jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).isEmpty()
                && !JobRegistry.getInstance().isJobRunning(jobName);
    }

其实现思路:【needFailover方法】首先判断是否存在&dollar;{namespace}/jobname/leader/failover/items节点是否存在,并且其节点下是否有子节点,并且节点也运行该任务,则需要执行故障失效转移。执行失效转移的逻辑也是进行失效转移选主,其分布式锁节点为:${namespace}/jobname/leader/failover/latch,谁先获得锁,则执行失效故障转移具体逻辑(FailoverLeaderExecutionCallback),具体的失效转移算法为:

FailoverService#FailoverLeaderExecutionCallback:

class FailoverLeaderExecutionCallback implements LeaderExecutionCallback {
        @Override
        public void execute() {
            if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) {      // @1
                return;
            }
            int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));    // @2
            log.debug("Failover job '{}' begin, crashed item '{}'", jobName, crashedItem);
            jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());  // @3
            jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));     // @4
            // TODO 不应使用triggerJob, 而是使用executor统一调度
            JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);    // @5
            if (null != jobScheduleController) {
                jobScheduleController.triggerJob();  
            }
        }
    }

代码@1:如果当前实例停止运行该job或无需执行失效故障转移,则返回。

代码@2:获取第一个待故障转移的分片,获取${namespace}/jobname/leader/failover/items/{itemnum,获取分片序号itemnum。

代码@3:创建临时节点${namespace}/jobname/sharding/itemnum/failover节点。

代码@4:删除${namespace}/jobname/leader/failover/items/{itemnum}节点。

代码@5:触发任务调度,并结束当前节点的故障失效转移,然后释放锁,下一个节点获取锁,进行转移${namespace}/jobname/leader/failover/items目录下的失效分片。

PS:故障实现转移基本实现思路为:当一个任务节点宕机后,其他节点会监听到实例删除事件,从实例目录中获取其实例ID,并从ZK中获取原先分配故障实例的分片信息,并将这些分片标记为需要故障转移(创建${namespace}/jobname/leader/failover/items/{item}持久节点),然后判断是否需要执行故障转移操作。

执行故障转移操作的前提条件是:

  1. 当前任务实例也调度该job;
  2. 存在${namespace}/jobname/leader/failover/items节点并有子节点。如果满足上述两个条件,则执行失效转移,多个存活节点进行选主(LeaderLatch),创建分布式锁节点(${namespace}/jobname/leader/failover/latch),获取锁的节点优先执行获取分片节点,其具体过程如上述所示,每个存活节点一次故障转移只竞争一个分片。

2、故障分片重新执行逻辑分析

上述事件监听器主要的作用是当任务节点失效后,其他存活节点“瓜分”失效节点的分片,创建${namespace}/jobname/sharding/{item}/failover节点。但这些分片的任务并没有真正执行,本小结将梳理故障节点分片的执行。

可以看得出来,分片故障转移,就是在对应的故障分片下创建了failover节点,在获取分片信息上下文时会优先处理,这也是在分析分片流程时并未重点讲解的。因此,在进入下述内容之前,请先阅读 源码分析ElasticJob的分片机制。

回到定时任务调度执行入口:AbstractElasticJobExecutor#execute

/**
     * 执行作业.
     */
    public final void execute() {
        try {
            jobFacade.checkJobExecutionEnvironment();
        } catch (final JobExecutionEnvironmentException cause) {
            jobExceptionHandler.handleException(jobName, cause);
        }
        ShardingContexts shardingContexts = jobFacade.getShardingContexts();  // 获取分片上下文环境
        ... 
    }

LiteJobFacade#getShardingContexts
@Override
    public ShardingContexts getShardingContexts() {
        boolean isFailover = configService.load(true).isFailover();
        if (isFailover) {    // @1
            List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();    // @2
            if (!failoverShardingItems.isEmpty()) {
                return executionContextService.getJobShardingContext(failoverShardingItems);    // @3
            }
        }
        shardingService.shardingIfNecessary();
        List<Integer> shardingItems = shardingService.getLocalShardingItems();
        if (isFailover) {
            shardingItems.removeAll(failoverService.getLocalTakeOffItems());
        }
        shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
        return executionContextService.getJobShardingContext(shardingItems);
    }

代码@1:获取分片上下文时,如果启用了故障失效转移机制,优先获取故障失效转移的分片上下文。

代码@2:获取本节点获取的实现分片信息。其基本逻辑是遍历&dollar;{namespace}/jobname/sharding下的字节点,获取该任务当前的所有分片信息,遍历每个节点,获取序号,然后依次判断是否存在(&dollar;{namespace}/jobname/sharding/{item}/failover),并且该节点的内容为当前的实例ID,则加入到分片结果中。

代码@3:根据失效分片序号构建分片上下文环境,执行该分片上的任务,根据分片上下文环境,执行任务。【AbstractElasticJob#execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);】执行完本次任务调度后,将删除分片的故障标记,待下一次任务调度时重新分片。

删除分片的故障标记代码如下:LiteJobFacade#registerJobCompleted

public void registerJobCompleted(final ShardingContexts shardingContexts) {
        executionService.registerJobCompleted(shardingContexts);  // @1
        if (configService.load(true).isFailover()) {
            failoverService.updateFailoverComplete(shardingContexts.getShardingItemParameters().keySet());   // @2
        }
}

代码@1:将分片的调度任务设置为执行完成,首先在内存中设置任务为非运行中(JobRegistry.getInstance().setJobRunning(false)),如果开启了monitorExecution,则需要删除分片的运行标记,具体做法是,删除&dollar;{namespace}/jobname/sharding/{item}/running节点。

代码@2:如果启用了故障失效转移,调用updateFailoverComplete方法,更新故障实现转移处理完成,删除${namespace}/jobname/sharding/{item}/failover节点,下次任务统一调度的时候,所有的分片会重新再分片,也就完成一次故障失效转移。

总结

故障实现转移,其实就是在一次任务调度期间,分片节点宕机,导致分配在宕机服务上的分片任务未执行,那这一部数据在本次任务调度期间未被处理,为了及时处理那部分数据库,ElasticJob支持故障失效转移,就是在一次任务调度期间,将其他宕机服务所分配的分片上下文转移到当前存活的节点上执行,执行完毕后,才会开始下一次调动任务。

下一次调动任务运行时,会重新进行分片。
ElasticJob是一款分布式任务调度平台,这里的分布式更多指的还是数据的分布式,就是一个任务在多个分片上执行,每个节点根据分片上下文获取部分数据进行处理(数据分片)。


原文发布时间为:2018-12-03
本文作者:丁威,《RocketMQ技术内幕》作者。
本文来自中间件兴趣圈,了解相关信息可以关注中间件兴趣圈

目录
相关文章
|
机器学习/深度学习 应用服务中间件 Linux
API一键搭建智能时光相册,记录你的美
API时代,要搭建一个云相册,就相对来说简单很多,或者说一个开发人员就可以快速实现,并且还能具备智能分析识别、归类、搜索等功能齐全的智能云相册。
4305 0
|
8月前
|
机器学习/深度学习 存储 PyTorch
PyTorch内存优化的10种策略总结:在有限资源环境下高效训练模型
在大规模深度学习模型训练中,GPU内存容量常成为瓶颈,特别是在训练大型语言模型和视觉Transformer时。本文系统介绍了多种内存优化策略,包括混合精度训练、低精度训练(如BF16)、梯度检查点、梯度累积、张量分片与分布式训练、
346 14
PyTorch内存优化的10种策略总结:在有限资源环境下高效训练模型
|
机器学习/深度学习 搜索推荐 大数据
深度解析:如何通过精妙的特征工程与创新模型结构大幅提升推荐系统中的召回率,带你一步步攻克大数据检索难题
【10月更文挑战第2天】在处理大规模数据集的推荐系统项目时,提高检索模型的召回率成为关键挑战。本文分享了通过改进特征工程(如加入用户活跃时段和物品相似度)和优化模型结构(引入注意力机制)来提升召回率的具体策略与实现代码。严格的A/B测试验证了新模型的有效性,为改善用户体验奠定了基础。这次实践加深了对特征工程与模型优化的理解,并为未来的技术探索提供了方向。
530 2
深度解析:如何通过精妙的特征工程与创新模型结构大幅提升推荐系统中的召回率,带你一步步攻克大数据检索难题
|
分布式计算 Java Hadoop
NameNode 处理线程配置(心跳并发)
NameNode线程池处理客户端和数据节点请求,如读写文件及心跳、块报告。通过调整`dfs.namenode.handler.count`(默认10,示例设为21)在`hdfs-site.xml`中可控制并发处理能力。线程数过多或过少都可能影响性能,需平衡资源使用并进行基准测试以确定最佳值。合理线程数可通过公式`int(math.log(N) * 20)`计算,N为服务器数量。例如,3台服务器的计算结果为21。
504 4
|
12月前
|
数据采集 监控 数据挖掘
拼多多商品评价API的获取与应用
在数字化商业时代,拼多多商品评价API为开发者和企业提供深入理解消费者反馈、优化产品策略及提升用户体验的重要途径。本文详述了该API的获取方法及其在电商平台运营优化、品牌商市场调研与产品改进、数据分析与市场洞察等领域的广泛应用,强调了遵守使用规范、数据质量处理及性能优化的重要性。
829 0
|
Linux Docker 容器
docker启动完美容器的过程
本文详细介绍了使用Docker创建和管理容器的过程,包括拉取镜像、搜索镜像、创建容器、启动、停止、删除容器,以及查看容器日志和进程信息的常用命令。
587 2
|
机器学习/深度学习 人工智能 算法框架/工具
使用Python实现深度学习模型:智能身份验证与防伪
使用Python实现深度学习模型:智能身份验证与防伪
517 1
|
数据采集 自然语言处理 大数据
​「Python大数据」LDA主题分析模型
使用Python进行文本聚类,流程包括读取VOC数据、jieba分词、去除停用词,应用LDA模型(n_components=5)进行主题分析,并通过pyLDAvis生成可视化HTML。关键代码涉及数据预处理、CountVectorizer、LatentDirichletAllocation以及HTML文件的本地化处理。停用词和业务术语列表用于优化分词效果。
818 0
​「Python大数据」LDA主题分析模型
|
机器学习/深度学习 分布式计算 DataWorks
MaxCompute产品使用合集之如何对分区表进行合并小文件操作
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
运维 Kubernetes 监控
微服务架构中服务的编排
微服务架构中服务的编排
314 0