Flink – submitJob

The submitJob logic of the JobManager:

/**
   * Submits a job to the job manager. The job is registered at the libraryCacheManager which
   * creates the job's class loader. The job graph is appended to the corresponding execution
   * graph and the execution vertices are queued for scheduling.
   *
   * @param jobGraph representing the Flink job
   * @param jobInfo the job info
   * @param isRecovery Flag indicating whether this is a recovery or initial submission
   */
  private def submitJob(jobGraph: JobGraph, jobInfo: JobInfo, isRecovery: Boolean = false): Unit = {
    if (jobGraph == null) {
      jobInfo.notifyClients(
        decorateMessage(JobResultFailure(
          new SerializedThrowable(
            new JobSubmissionException(null, "JobGraph must not be null.")))))
    }
    else {
      val jobId = jobGraph.getJobID
      val jobName = jobGraph.getName
      var executionGraph: ExecutionGraph = null

      try {
        // Important: We need to make sure that the library registration is the first action,
        // because this makes sure that the uploaded jar files are removed in case of
        // unsuccessful
        try {
          libraryCacheManager.registerJob(jobGraph.getJobID, jobGraph.getUserJarBlobKeys,
            jobGraph.getClasspaths)
        }
        var userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID) // class loader for the user jars

        val restartStrategy = // load the restart strategy
          Option(jobGraph.getSerializedExecutionConfig()
            .deserializeValue(userCodeLoader)
            .getRestartStrategy())
            .map(RestartStrategyFactory.createRestartStrategy)
            .filter(p => p != null) match {
            case Some(strategy) => strategy
            case None => restartStrategyFactory.createRestartStrategy()
          }

        val jobMetrics = jobManagerMetricGroup match { // create the metric group for this job
          case Some(group) =>
            group.addJob(jobGraph) match {
              case (jobGroup:Any) => jobGroup
              case null => new UnregisteredMetricsGroup()
            }
          case None =>
            new UnregisteredMetricsGroup()
        }

        val numSlots = scheduler.getTotalNumberOfSlots() // current total number of slots

        // see if there already exists an ExecutionGraph for the corresponding job ID
        val registerNewGraph = currentJobs.get(jobGraph.getJobID) match {
          case Some((graph, currentJobInfo)) =>
            executionGraph = graph
            currentJobInfo.setLastActive()
            false
          case None =>
            true
        }

        executionGraph = ExecutionGraphBuilder.buildGraph( //build ExecutionGraph
          executionGraph,
          jobGraph,
          flinkConfiguration,
          futureExecutor,
          ioExecutor,
          userCodeLoader,
          checkpointRecoveryFactory,
          Time.of(timeout.length, timeout.unit),
          restartStrategy,
          jobMetrics,
          numSlots,
          log.logger)
        
        if (registerNewGraph) { // a new JobGraph is registered in currentJobs
          currentJobs.put(jobGraph.getJobID, (executionGraph, jobInfo))
        }

        // get notified about job status changes
        executionGraph.registerJobStatusListener( // register the JobManager as a status listener
          new StatusListenerMessenger(self, leaderSessionID.orNull))

        jobInfo.clients foreach { // register the clients as listeners
          // the sender wants to be notified about state changes
          case (client, ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) =>
            val listener  = new StatusListenerMessenger(client, leaderSessionID.orNull)
            executionGraph.registerExecutionListener(listener)
            executionGraph.registerJobStatusListener(listener)
          case _ => // do nothing
        }

      } catch { // submission failed
        case t: Throwable =>
          log.error(s"Failed to submit job $jobId ($jobName)", t)

          libraryCacheManager.unregisterJob(jobId)
          currentJobs.remove(jobId)

          if (executionGraph != null) {
            executionGraph.fail(t) //fail executionGraph
          }

          val rt: Throwable = if (t.isInstanceOf[JobExecutionException]) {
            t
          } else {
            new JobExecutionException(jobId, s"Failed to submit job $jobId ($jobName)", t)
          }

          jobInfo.notifyClients(
            decorateMessage(JobResultFailure(new SerializedThrowable(rt)))) // notify the clients of the failure
          return
      }

      // Everything above prepares the executionGraph; what follows submits it asynchronously.
      // execute the recovery/writing the jobGraph into the SubmittedJobGraphStore asynchronously
      // because it is a blocking operation
      future {
        try {
          if (isRecovery) {
            // this is a recovery of a master failure (this master takes over)
            executionGraph.restoreLatestCheckpointedState(false, false) // restore the checkpointed state
          }
          else {
            // load a savepoint only if this is not starting from a newer checkpoint
            // as part of an master failure recovery
            val savepointSettings = jobGraph.getSavepointRestoreSettings
            if (savepointSettings.restoreSavepoint()) { // restore from the savepoint
              try {
                val savepointPath = savepointSettings.getRestorePath()
                val allowNonRestored = savepointSettings.allowNonRestoredState()

                log.info(s"Starting job from savepoint '$savepointPath'" +
                  (if (allowNonRestored) " (allowing non restored state)" else "") + ".")

                // load the savepoint as a checkpoint into the system
                val savepoint: CompletedCheckpoint = SavepointLoader.loadAndValidateSavepoint(
                  jobId,
                  executionGraph.getAllVertices,
                  savepointPath,
                  executionGraph.getUserClassLoader,
                  allowNonRestored)

                executionGraph.getCheckpointCoordinator.getCheckpointStore
                  .addCheckpoint(savepoint)

                // Reset the checkpoint ID counter
                val nextCheckpointId: Long = savepoint.getCheckpointID + 1
                log.info(s"Reset the checkpoint ID to $nextCheckpointId")
                executionGraph.getCheckpointCoordinator.getCheckpointIdCounter
                  .setCount(nextCheckpointId)

                executionGraph.restoreLatestCheckpointedState(true, allowNonRestored)
              } catch {
                case e: Exception =>
                  jobInfo.notifyClients(
                    decorateMessage(JobResultFailure(new SerializedThrowable(e))))
                  throw new SuppressRestartsException(e)
              }
            }

            try {
              // persist the JobGraph in ZooKeeper (ZooKeeperSubmittedJobGraphStore)
              submittedJobGraphs.putJobGraph(new SubmittedJobGraph(jobGraph, jobInfo))
            } catch {
              case t: Throwable =>
                // Don't restart the execution if this fails. Otherwise, the
                // job graph will skip ZooKeeper in case of HA.
                jobInfo.notifyClients(
                  decorateMessage(JobResultFailure(new SerializedThrowable(t))))
                throw new SuppressRestartsException(t)
            }
          }

          jobInfo.notifyClients(
            decorateMessage(JobSubmitSuccess(jobGraph.getJobID))) // notify the clients of the successful submission

          if (leaderElectionService.hasLeadership) {
            // There is a small chance that multiple job managers schedule the same job after if
            // they try to recover at the same time. This will eventually be noticed, but can not be
            // ruled out from the beginning.

            // NOTE: Scheduling the job for execution is a separate action from the job submission.
            // The success of submitting the job must be independent from the success of scheduling
            // the job.
            log.info(s"Scheduling job $jobId ($jobName).")

            executionGraph.scheduleForExecution(scheduler) // start scheduling
          } else {
            // Remove the job graph. Otherwise it will be lingering around and possibly removed from
            // ZooKeeper by this JM.
            self ! decorateMessage(RemoveJob(jobId, removeJobFromStateBackend = false))

            log.warn(s"Submitted job $jobId, but not leader. The other leader needs to recover " +
              "this. I am not scheduling the job for execution.")
          }
        } catch {
          case t: Throwable => try {
            executionGraph.fail(t)
          } catch {
            case tt: Throwable =>
              log.error("Error while marking ExecutionGraph as failed.", tt)
          }
        }
      }(context.dispatcher)
    }
  }

Note that the clients are notified of a successful submission before the executionGraph is scheduled.
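
The ordering can be illustrated with a small plain-Java sketch. This is not Flink code: `SubmitOrderSketch`, `submit`, and the event strings are hypothetical names, and a `CompletableFuture` stands in for the Scala `future { ... }` block used by the JobManager.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;

// Hypothetical sketch of the submit ordering; none of these names are Flink APIs.
final class SubmitOrderSketch {

    // Records what happened, in order, so the sequence can be inspected.
    final List<String> events = new CopyOnWriteArrayList<>();

    CompletableFuture<Void> submit(boolean isLeader, Executor executor) {
        // Phase 1 (synchronous): build the execution graph and register listeners.
        events.add("graph-built");

        // Phase 2 (asynchronous): persist, acknowledge, then schedule.
        return CompletableFuture.runAsync(() -> {
            events.add("job-graph-persisted");
            // The client is told "submitted" before any scheduling has happened.
            events.add("client-notified-success");
            if (isLeader) {
                events.add("scheduled");
            } else {
                // A non-leader drops the job and leaves scheduling to the leader.
                events.add("removed-not-leader");
            }
        }, executor);
    }

    public static void main(String[] args) {
        SubmitOrderSketch sketch = new SubmitOrderSketch();
        // Runnable::run executes the asynchronous phase on the calling thread.
        sketch.submit(true, Runnable::run).join();
        System.out.println(sketch.events);
    }
}
```

Because the acknowledgement comes first, a scheduling problem cannot show up as a submission failure; as the comments in submitJob put it, the success of submitting the job must be independent of the success of scheduling it.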

 

When a job runs into a problem, tryRestartOrFail is called:

private boolean tryRestartOrFail() {
        JobStatus currentState = state;

        if (currentState == JobStatus.FAILING || currentState == JobStatus.RESTARTING) {
            synchronized (progressLock) {

                final boolean isFailureCauseAllowingRestart = !(failureCause instanceof SuppressRestartsException);
                final boolean isRestartStrategyAllowingRestart = restartStrategy.canRestart(); // does the restart strategy allow a restart?
                boolean isRestartable = isFailureCauseAllowingRestart && isRestartStrategyAllowingRestart;

                if (isRestartable && transitionState(currentState, JobStatus.RESTARTING)) {
                    restartStrategy.restart(this);

                    return true;
                } else if (!isRestartable && transitionState(currentState, JobStatus.FAILED, failureCause)) { // not restartable: transition to FAILED
                    final List<String> reasonsForNoRestart = new ArrayList<>(2);
                    if (!isFailureCauseAllowingRestart) {
                        reasonsForNoRestart.add("a type of SuppressRestartsException was thrown");
                    }
                    if (!isRestartStrategyAllowingRestart) {
                        reasonsForNoRestart.add("the restart strategy prevented it");
                    }

                    LOG.info("Could not restart the job {} ({}) because {}.", getJobName(), getJobID(),
                        StringUtils.join(reasonsForNoRestart, " and "), failureCause);
                    postRunCleanup();

                    return true;
                } else {
                    // we must have changed the state concurrently, thus we cannot complete this operation
                    return false;
                }
            }
        } else {
            // this operation is only allowed in the state FAILING or RESTARTING
            return false;
        }
    }
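
The decision at the heart of tryRestartOrFail can be isolated in a few lines. The following is a minimal sketch with hypothetical names (`RestartDecisionSketch`, `SuppressRestarts`, `Outcome`); it leaves out the locking and the state transition and keeps only the two conditions and the reasons reported when no restart happens.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the restart decision; these types are not Flink classes.
final class RestartDecisionSketch {

    // Stand-in for a failure cause that must never trigger a restart.
    static final class SuppressRestarts extends RuntimeException {
        SuppressRestarts(Throwable cause) {
            super(cause);
        }
    }

    enum Outcome { RESTART, FAIL }

    // Restart only if both the failure cause and the restart strategy allow it.
    static Outcome decide(Throwable failureCause, boolean strategyCanRestart) {
        boolean causeAllowsRestart = !(failureCause instanceof SuppressRestarts);
        return (causeAllowsRestart && strategyCanRestart) ? Outcome.RESTART : Outcome.FAIL;
    }

    // Collects the reasons that end up in the "could not restart" log line.
    static List<String> reasonsForNoRestart(Throwable failureCause, boolean strategyCanRestart) {
        List<String> reasons = new ArrayList<>(2);
        if (failureCause instanceof SuppressRestarts) {
            reasons.add("a type of SuppressRestartsException was thrown");
        }
        if (!strategyCanRestart) {
            reasons.add("the restart strategy prevented it");
        }
        return reasons;
    }

    public static void main(String[] args) {
        Throwable plain = new RuntimeException("task failed");
        Throwable suppressed = new SuppressRestarts(plain);
        System.out.println(decide(plain, true));
        System.out.println(decide(suppressed, true));
        System.out.println(String.join(" and ", reasonsForNoRestart(suppressed, false)));
    }
}
```

This is why submitJob wraps savepoint-loading and SubmittedJobGraphStore failures in a SuppressRestartsException: such a job fails permanently regardless of what the restart strategy would allow.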

 

tryRestartOrFail is called from two places:

1. ExecutionGraph.jobVertexInFinalState

void jobVertexInFinalState() {
    synchronized (progressLock) {
        if (numFinishedJobVertices >= verticesInCreationOrder.size()) {
            throw new IllegalStateException("All vertices are already finished, cannot transition vertex to finished.");
        }

        numFinishedJobVertices++;

        if (numFinishedJobVertices == verticesInCreationOrder.size()) { // all vertices have finished

            // we are done, transition to the final state
            JobStatus current;
            while (true) {
                current = this.state;

                if (current == JobStatus.RUNNING) {
                    if (transitionState(current, JobStatus.FINISHED)) {
                        postRunCleanup();
                        break;
                    }
                }
                else if (current == JobStatus.CANCELLING) {
                    if (transitionState(current, JobStatus.CANCELED)) {
                        postRunCleanup();
                        break;
                    }
                }
                else if (current == JobStatus.FAILING) {
                    if (tryRestartOrFail()) { // when FAILING, call tryRestartOrFail
                        break;
                    }
                    // concurrent job status change, let's check again
                }


2. An explicit call to ExecutionGraph.fail

} else if (current == JobStatus.RESTARTING) {
    this.failureCause = t;

    if (tryRestartOrFail()) {
        return;
    }
    // concurrent job status change, let's check again
}
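
Both call sites re-read the job status and try again when a transition fails ("concurrent job status change, let's check again"). A minimal sketch of that retry pattern follows, assuming transitionState behaves like a compare-and-set on the current status. `StateTransitionSketch` and its `Status` enum are hypothetical, and the FAILING branch is simplified to go straight to FAILED instead of calling tryRestartOrFail.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of a compare-and-set state machine; not the Flink ExecutionGraph.
final class StateTransitionSketch {

    enum Status { RUNNING, CANCELLING, FAILING, FINISHED, CANCELED, FAILED }

    private final AtomicReference<Status> state = new AtomicReference<>(Status.RUNNING);

    // Succeeds only if nobody changed the state since it was read.
    boolean transitionState(Status expected, Status target) {
        return state.compareAndSet(expected, target);
    }

    Status current() {
        return state.get();
    }

    // Re-reads the state and retries until one transition wins
    // or the state turns out to be terminal already.
    Status moveToTerminalState() {
        while (true) {
            Status current = state.get();
            Status target;
            switch (current) {
                case RUNNING:
                    target = Status.FINISHED;
                    break;
                case CANCELLING:
                    target = Status.CANCELED;
                    break;
                case FAILING:
                    target = Status.FAILED; // simplification: no restart attempt here
                    break;
                default:
                    return current; // already terminal
            }
            if (transitionState(current, target)) {
                return target;
            }
            // Concurrent status change: loop and check again.
        }
    }

    public static void main(String[] args) {
        StateTransitionSketch graph = new StateTransitionSketch();
        graph.transitionState(Status.RUNNING, Status.CANCELLING);
        System.out.println(graph.moveToTerminalState());
    }
}
```

A transition attempted with a stale expected status simply returns false, which is the case the "let's check again" comments guard against.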

 

The restart branch of tryRestartOrFail calls restartStrategy.restart(this);

There are several kinds of restartStrategy; let's look first at

FixedDelayRestartStrategy

 

@Override
    public void restart(final ExecutionGraph executionGraph) {
        currentRestartAttempt++;
        FlinkFuture.supplyAsync(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayBetweenRestartAttempts), executionGraph.getFutureExecutor());
    }
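
The idea behind a fixed-delay strategy can be sketched independently of Flink. In the sketch below, `FixedDelaySketch` is a hypothetical class: it counts attempts against a fixed budget and runs a restart action after a constant delay. A `ScheduledExecutorService` is used for the delay here purely as an assumption of the sketch; it is not how ExecutionGraphRestarter is implemented.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a fixed-delay restart policy; not the Flink implementation.
final class FixedDelaySketch {

    private final int maxAttempts;
    private final long delayMillis;
    private int currentAttempt = 0;

    FixedDelaySketch(int maxAttempts, long delayMillis) {
        this.maxAttempts = maxAttempts;
        this.delayMillis = delayMillis;
    }

    // Restarting is allowed while the attempt budget is not used up.
    boolean canRestart() {
        return currentAttempt < maxAttempts;
    }

    // Counts the attempt, then runs the restart action after the fixed delay.
    ScheduledFuture<?> restart(Runnable restartAction, ScheduledExecutorService executor) {
        currentAttempt++;
        return executor.schedule(restartAction, delayMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        FixedDelaySketch strategy = new FixedDelaySketch(2, 10L);
        try {
            while (strategy.canRestart()) {
                // Wait for each delayed restart before asking for the next one.
                strategy.restart(() -> System.out.println("restarting"), executor).get();
            }
        } finally {
            executor.shutdown();
        }
    }
}
```

With an attempt budget of Integer.MAX_VALUE, canRestart effectively never returns false, so a job using such a policy keeps restarting until the failure cause itself forbids it.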

restart invokes ExecutionGraphRestarter.restartWithDelay asynchronously,

which eventually calls

executionGraph.restart();
public void restart() {
        try {
            synchronized (progressLock) {
                this.currentExecutions.clear();

                Collection<CoLocationGroup> colGroups = new HashSet<>();

                for (ExecutionJobVertex jv : this.verticesInCreationOrder) {

                    CoLocationGroup cgroup = jv.getCoLocationGroup();
                    if(cgroup != null && !colGroups.contains(cgroup)){
                        cgroup.resetConstraints();
                        colGroups.add(cgroup);
                    }

                    jv.resetForNewExecution();
                }

                for (int i = 0; i < stateTimestamps.length; i++) {
                    if (i != JobStatus.RESTARTING.ordinal()) {
                        // Only clear the non restarting state in order to preserve when the job was
                        // restarted. This is needed for the restarting time gauge
                        stateTimestamps[i] = 0;
                    }
                }
                numFinishedJobVertices = 0;
                transitionState(JobStatus.RESTARTING, JobStatus.CREATED);

                // if we have checkpointed state, reload it into the executions
                if (checkpointCoordinator != null) {
                    checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), false, false);
                }
            }

            scheduleForExecution(slotProvider); // schedule the job again
        }
        catch (Throwable t) {
            LOG.warn("Failed to restart the job.", t);
            fail(t);
        }
    }

 

For details on restart strategies, see https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/restart_strategies.html, which states:

If checkpointing is not enabled, the “no restart” strategy is used. If checkpointing is activated and the restart strategy has not been configured, the fixed-delay strategy is used with Integer.MAX_VALUE restart attempts.

 

StreamingJobGraphGenerator
private void configureCheckpointing() {
        CheckpointConfig cfg = streamGraph.getCheckpointConfig();

        long interval = cfg.getCheckpointInterval();
        if (interval > 0) {
            // check if a restart strategy has been set, if not then set the FixedDelayRestartStrategy
            if (streamGraph.getExecutionConfig().getRestartStrategy() == null) {
                // if the user enabled checkpointing, the default number of exec retries is infinite.
                streamGraph.getExecutionConfig().setRestartStrategy(
                    RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY));
            }
        }
When checkpointing is enabled and no restart strategy has been set, the default is fixedDelayRestart with Integer.MAX_VALUE restart attempts.
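
Putting configureCheckpointing together with the fallback seen in submitJob, the effective restart policy appears to be chosen in three steps. The sketch below restates that order in plain Java; `RestartPolicyResolutionSketch`, `Policy`, and `resolve` are hypothetical names, and folding the two code paths into a single method is a simplification of this sketch (in Flink the checkpointing default is applied when the job graph is generated, while the cluster fallback is applied in the JobManager).

```java
import java.util.Optional;

// Hypothetical sketch of how the effective restart policy is chosen; not Flink code.
final class RestartPolicyResolutionSketch {

    // Minimal value type: a policy name plus its attempt budget.
    static final class Policy {
        final String name;
        final int maxAttempts;

        Policy(String name, int maxAttempts) {
            this.name = name;
            this.maxAttempts = maxAttempts;
        }

        @Override
        public String toString() {
            return name + "(" + maxAttempts + ")";
        }
    }

    static Policy resolve(Optional<Policy> jobLevel, long checkpointIntervalMillis, Policy clusterDefault) {
        // 1. A strategy set explicitly on the job always wins.
        if (jobLevel.isPresent()) {
            return jobLevel.get();
        }
        // 2. With checkpointing on, fall back to fixed-delay with unlimited attempts.
        if (checkpointIntervalMillis > 0) {
            return new Policy("fixed-delay", Integer.MAX_VALUE);
        }
        // 3. Otherwise use whatever the cluster is configured with.
        return clusterDefault;
    }

    public static void main(String[] args) {
        Policy noRestart = new Policy("no-restart", 0);
        System.out.println(resolve(Optional.empty(), 0L, noRestart));
        System.out.println(resolve(Optional.empty(), 5000L, noRestart));
        System.out.println(resolve(Optional.of(new Policy("fixed-delay", 3)), 5000L, noRestart));
    }
}
```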