背景
目前spark的repartition()
方法是随机分配数据到下游,这会导致一个问题,有时候如果我们用repartition
方法的时候,如果任务发生了重试,就有可能导致任务的数据不准确,那这个时候改怎么解决这个问题呢?
分析
在Spark RDD中存在着名为outputDeterministicLevel的变量,如下:
private[spark] final lazy val outputDeterministicLevel: DeterministicLevel.Value = { if (isReliablyCheckpointed) { DeterministicLevel.DETERMINATE } else { getOutputDeterministicLevel } }
那么该变量的作用是什么呢?让我们分析一下:
改变量最终会被Stage
的isIndeterminate
方法调用:
def isIndeterminate: Boolean = { rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE }
而该方法会被DAGScheduler
调用,有两处地方会被调用:
- submitMissingTasks中调用
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = { logDebug("submitMissingTasks(" + stage + ")") // Before find missing partition, do the intermediate state clean work first. // The operation here can make sure for the partially completed intermediate stage, // `findMissingPartitions()` returns all partitions every time. stage match { case sms: ShuffleMapStage if stage.isIndeterminate && !sms.isAvailable => mapOutputTracker.unregisterAllMapOutput(sms.shuffleDep.shuffleId) case _ => }
该方法主要用于在重新提交失败的stage时候,用来判断是否需要重新计算上游的所有任务。
- handleTaskCompletion中调用
case FetchFailed(bmAddress, shuffleId, _, mapIndex, _, failureMessage) => 。。。 val noResubmitEnqueued = !failedStages.contains(failedStage) failedStages += failedStage failedStages += mapStage if (noResubmitEnqueued) { // If the map stage is INDETERMINATE, which means the map tasks may return // different result when re-try, we need to re-try all the tasks of the failed // stage and its succeeding stages, because the input data will be changed after the // map tasks are re-tried. // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is // guaranteed to be determinate, so the input data of the reducers will not change // even if the map tasks are re-tried. if (mapStage.isIndeterminate) {
这里如果任务Fetch失败了,根据该shuffle所对应的上游stage是不是isIndeterminate
来向DAGScheduler
提交ResubmitFailedStages
事件,从而调用submitMissingTasks
方法进行上游所有任务或者单个任务的重试。
再回到outputDeterministicLevel
变量,该变量会调用getOutputDeterministicLevel
方法进行循环调用上游的outputDeterministicLevel
变量来确定outputDeterministicLevel
的值。
结论
所以根据以上分析,我们可以改写对应的RDD的outputDeterministicLevel
变量或者getOutputDeterministicLevel
方法来进行stage任务的全部重试与否