SPARK outputDeterministicLevel的作用--任务全部重试或者部分重试

简介: SPARK outputDeterministicLevel的作用--任务全部重试或者部分重试

背景


目前spark的repartition()方法是随机分配数据到下游,这会导致一个问题,有时候如果我们用repartition方法的时候,如果任务发生了重试,就有可能导致任务的数据不准确,那这个时候改怎么解决这个问题呢?


分析


在Spark RDD中存在着名为outputDeterministicLevel的变量,如下:

private[spark] final lazy val outputDeterministicLevel: DeterministicLevel.Value = {
    if (isReliablyCheckpointed) {
      DeterministicLevel.DETERMINATE
    } else {
      getOutputDeterministicLevel
    }
  }

那么该变量的作用是什么呢?让我们分析一下:


改变量最终会被StageisIndeterminate方法调用:

 def isIndeterminate: Boolean = {
    rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
  }

而该方法会被DAGScheduler调用,有两处地方会被调用:


  • submitMissingTasks中调用
   private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
    logDebug("submitMissingTasks(" + stage + ")")
    // Before find missing partition, do the intermediate state clean work first.
    // The operation here can make sure for the partially completed intermediate stage,
    // `findMissingPartitions()` returns all partitions every time.
    stage match {
      case sms: ShuffleMapStage if stage.isIndeterminate && !sms.isAvailable =>
        mapOutputTracker.unregisterAllMapOutput(sms.shuffleDep.shuffleId)
      case _ =>
    }

该方法主要用于在重新提交失败的stage时候,用来判断是否需要重新计算上游的所有任务。


  • handleTaskCompletion中调用
      case FetchFailed(bmAddress, shuffleId, _, mapIndex, _, failureMessage) =>
            。。。
            val noResubmitEnqueued = !failedStages.contains(failedStage)
            failedStages += failedStage
            failedStages += mapStage
            if (noResubmitEnqueued) {
              // If the map stage is INDETERMINATE, which means the map tasks may return
              // different result when re-try, we need to re-try all the tasks of the failed
              // stage and its succeeding stages, because the input data will be changed after the
              // map tasks are re-tried.
              // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is
              // guaranteed to be determinate, so the input data of the reducers will not change
              // even if the map tasks are re-tried.
             if (mapStage.isIndeterminate) {

这里如果任务Fetch失败了,根据该shuffle所对应的上游stage是不是isIndeterminate来向DAGScheduler提交ResubmitFailedStages事件,从而调用submitMissingTasks方法进行上游所有任务或者单个任务的重试。


再回到outputDeterministicLevel变量,该变量会调用getOutputDeterministicLevel方法进行循环调用上游的outputDeterministicLevel变量来确定outputDeterministicLevel的值。


结论


所以根据以上分析,我们可以改写对应的RDD的outputDeterministicLevel变量或者getOutputDeterministicLevel方法来进行stage任务的全部重试与否

相关文章
|
3月前
|
分布式计算 监控 Spark
Spark 任务运行时日志分析
Spark 任务运行时日志分析
75 0
|
2月前
|
分布式计算 运维 Serverless
EMR Serverless Spark PySpark流任务体验报告
阿里云EMR Serverless Spark是一款全托管的云原生大数据计算服务,旨在简化数据处理流程,降低运维成本。测评者通过EMR Serverless Spark提交PySpark流任务,体验了从环境准备、集群创建、网络连接到任务管理的全过程。通过这次测评,可以看出阿里云EMR Serverless Spark适合有一定技术基础的企业,尤其是需要高效处理大规模数据的场景,但新用户需要投入时间和精力学习和适应。
7157 43
EMR Serverless Spark PySpark流任务体验报告
|
1月前
|
分布式计算 Java Serverless
EMR Serverless Spark 实践教程 | 通过 spark-submit 命令行工具提交 Spark 任务
本文以 ECS 连接 EMR Serverless Spark 为例,介绍如何通过 EMR Serverless spark-submit 命令行工具进行 Spark 任务开发。
292 7
EMR Serverless Spark 实践教程 | 通过 spark-submit 命令行工具提交 Spark 任务
|
1月前
|
分布式计算 运维 Serverless
EMR Serverless Spark 实践教程 | 通过 EMR Serverless Spark 提交 PySpark 流任务
在大数据快速发展的时代,流式处理技术对于实时数据分析至关重要。EMR Serverless Spark提供了一个强大而可扩展的平台,它不仅简化了实时数据处理流程,还免去了服务器管理的烦恼,提升了效率。本文将指导您使用EMR Serverless Spark提交PySpark流式任务,展示其在流处理方面的易用性和可运维性。
206 7
EMR Serverless Spark 实践教程 | 通过 EMR Serverless Spark 提交 PySpark 流任务
|
3天前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
24 0
|
2月前
|
弹性计算 分布式计算 DataWorks
DataWorks产品使用合集之spark任务如何跨空间取表数据
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
22 1
|
2月前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之使用spark.sql执行rename分区操作,遇到任务报错退出的情况,该怎么办
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
2月前
|
分布式计算 运维 Serverless
通过Serverless Spark提交PySpark流任务的实践体验
EMR Serverless Spark服务是阿里云推出的一种全托管、一站式的数据计算平台,旨在简化大数据计算的工作流程,让用户更加专注于数据分析和价值提炼,而非基础设施的管理和运维。下面就跟我一起通过Serverless Spark提交PySpark流任务吧。
66 1
|
2月前
|
分布式计算 资源调度 Java
Scala+Spark+Hadoop+IDEA实现WordCount单词计数,上传并执行任务(简单实例-下)
Scala+Spark+Hadoop+IDEA实现WordCount单词计数,上传并执行任务(简单实例-下)
33 0
|
2月前
|
分布式计算 Shell 调度
看看airflow怎样调度python写的spark任务吧
看看airflow怎样调度python写的spark任务吧
26 0