Flink运行时之客户端提交作业图-上

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 客户端提交作业图 作业图(JobGraph)是Flink的运行时所能理解的作业表示,无论程序通过是DataStream还是DataSet API编写的,它们的JobGraph提交给JobManager以及之后的处理都将得到统一。


客户端提交作业图

作业图(JobGraph)是Flink的运行时所能理解的作业表示,无论程序通过是DataStream还是DataSet API编写的,它们的JobGraph提交给JobManager以及之后的处理都将得到统一。本篇我们将分析客户端如何提交JobGraph给JobManager。

流处理程序提交作业图

在前面讲解Flink的核心概念的时候我们谈到了Flink利用了“惰性求值”的概念,只有当最终调用execute方法时,才会真正开始执行。因此,execute方法是我们的切入点。

DataStream API所编写的程序生成作业图之后,在提交时产生的方法调用时序图示意如下:

上图中的多个run方法是同名的方法重载。

从时序图中可以看到,ClusterClient对其自身抽象方法submitJob的调用是触发作业图提交的方法。随后真正的提交逻辑由JobClient实现。

ClusterClient封装了提交一个程序到远程集群的必要的功能,而StandaloneClusterClient则扩展了ClusterClient的功能,它专门针对独立的集群提供服务,这两个类都位于flink-clients模块中。JobClient则负责将用户的Job提交给JobManager,它充当了提交代理的角色,并返回表示作业执行结果的JobExecutionResult对象。

JobClient是提交所有类型的Job的统一入口,具体的提交细节我们将会在“公共的提交流程”中详细分析。

批处理程序提交作业图

利用DataSet API所编写的程序生成作业图之后,在提交时产生的方法调用的时序图如下:

上图中出现多个重名的run方法为同名方法重载。

从上图中可以看到,批处理程序的JobGraph跟流处理程序的JobGraph在提交之前有非常明显的不同。它引入了PlanExecutor作为Flink程序的计划执行器。而RemoteExecutor是PlanExecutor的实现,用于将程序提交给远程的Flink集群。具体的提交动作被进一步委托给ClusterClient及其实现(StandaloneClusterClient)最终同样被JobClient代理提交给JobManager。

公共的提交流程

从前面的时序图可见Flink对于不同类型的程序的提交流程最终是殊途同归的。因此,接下来我们将对公共的提交流程进行分析。一个程序的JobGraph真正被提交始于对JobClient的submitJobAndWait方法的调用。

submitJobAndWait方法用于将一个JobGraph发送到指定的JobClient actor,随后它会将该JobGraph转发给JobManager。该方法会一直阻塞,直到该作业执行完成或者感知不到JobManager的存活。如果作业被顺利执行完成则返回JobExecutionResult对象而如果JobManager产生故障,则抛出抛出JobExecutionException异常。

一个JobGraph从提交开始会经过多个对象层层递交,各个对象之间的交互关系如下图所示:

JobClient在其中起到了“桥接”作用,它桥接了同步的方法调用和异步的消息通信。更具体得说,JobClient可以看做是一个“静态类”提供了一些静态方法,这里我们主要关注上面的submitJobAndWait方法,该方法内部封装了Actor之间的异步通信(具体的通信对象是JobClientActor,它负责跟JobManager的ActorSystem的Actor对象进行通信),并以阻塞的形式返回结果。而ClusterClient只需调用JobClient的submitJobAndWait方法,而无需关注其内部是如何实现的。

通过调用JobClient的submitJobAndWait静态方法,会触发基于Akka的Actor之间的消息通信来完成后续的提交JobGraph的动作。这之间的交互示意图如下:

这里总共有两个ActorSystem,一个归属于JobClient,另一个归属于JobManager。在submitJobAndWait方法中,其首先会创建一个JobClientActor的ActorRef:

ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);

然后向其发起一个SubmitJobAndWait消息,该消息将JobGraph的实例提交给JobClientActor。发起模式是ask,它表示需要一个应答消息。

Akka的消息通信模型有两种:

  1. Fire and forget:消息的生产者不期望从消息的消费者那里得到应答。这种消息会以异步的形式发送,发送方法在发送完成之后立即返回。Akka的actor使用tell方法发送这种消息。
  2. Send and receive:消息的生产者期待并将等待从消费者那里得到应答。这种消息也会以异步的形式发送,发送完成后会返回一个Future对象,该对象表示一个潜在的应答。Akka的actor使用ask方法发送这种消息,并通过Future来获取响应。

JobClient向JobClientActor发送消息的代码段如下:

Future<Object> future = Patterns.ask(jobClientActor,
                                     new JobClientMessages.SubmitJobAndWait(jobGraph),
                                     new Timeout(AkkaUtils.INF_TIMEOUT()));
answer = Await.result(future, AkkaUtils.INF_TIMEOUT());

该SubmitJobAndWait消息被JobClientActor接收后,最终通过调用tryToSubmitJob方法触发真正的提交动作。在tryToSubmitJob方法中,一个JobGraph的提交将会分为两步:

  1. 将用户程序相关的Jar包上传至JobManager;
  2. 给JobManager Actor发送封装JobGraph的SubmitJob消息;

随后,JobManager Actor会接收到来自JobClientActor的SubmitJob消息,进而触发submitJob方法,该方法的执行主体已经是JobManager了。submitJob包含的逻辑较为复杂,且任何一个检测或者子调用所产生的异常都可能会导致提交失败。我们列举一下该方法完成的主要任务:

  1. 向BlobLibraryCacheManager注册该Job;
  2. 构建ExecutionGraph对象;
  3. 对JobGraph中的每个顶点进行初始化;
  4. 将DAG拓扑中从source开始排序,排序后的顶点集合附加到ExecutionGraph对象;
  5. 获取检查点相关的配置,并将其设置到ExecutionGraph对象;
  6. 向ExecutionGraph注册相关的listener;
  7. 执行恢复操作或者将JobGraph信息写入SubmittedJobGraphStore以在后续用于恢复目的;
  8. 响应给客户端JobSubmitSuccess消息;
  9. 对ExecutionGraph对象进行调度执行;

如果提交流程顺利,用户程序包以及描述Job的JobGraph将会被JobManager接收,随后JobManager会对Job进行调度、部署并执行。JobClient会阻塞等待提交结果返回。在得到返回结果之后,先进行解析判断它是否是Job被成功执行后返回的结果:

if (answer instanceof JobManagerMessages.JobResultSuccess) {
    SerializedJobExecutionResult result = ((JobManagerMessages.JobResultSuccess) answer).result();
    if (result != null) {
        try {
            return result.toJobExecutionResult(classLoader);
         } catch (Throwable t) {
             throw new JobExecutionException(jobGraph.getJobID(),
             "Job was successfully executed but JobExecutionResult could not be deserialized.");
        }
    } else {
         throw new JobExecutionException(jobGraph.getJobID(),
         "Job was successfully executed but result contained a null JobExecutionResult.");
    }
}

还是失败后返回的结果:

if (answer instanceof JobManagerMessages.JobResultFailure) {
    LOG.info("Job execution failed");
    SerializedThrowable serThrowable = ((JobManagerMessages.JobResultFailure) answer).cause();
    if (serThrowable != null) {
        Throwable cause = serThrowable.deserializeError(classLoader);
        if (cause instanceof JobExecutionException) {
            throw (JobExecutionException) cause;
        } else {
            throw new JobExecutionException(jobGraph.getJobID(), "Job execution failed", cause);
        }
    } else {
        throw new JobExecutionException(jobGraph.getJobID(),
        "Job execution failed with null as failure cause.");
    }
} else {
    throw new JobExecutionException(jobGraph.getJobID(),
        "Unknown answer from JobManager after submitting the job: " + answer);
}

以上就是批处理作业和流处理作业共同的提交流程,这中间涉及了JobManager接收到用户提交后一系列处理,这部分的处理细节我们随后进行分析。


原文发布时间为:2017-03-31

本文作者:vinoYang

本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
消息中间件 Kafka 流计算
如果有多个版本的Flink CDC在同一环境中运行,可能会导致Debezium版本冲突
【2月更文挑战第30天】如果有多个版本的Flink CDC在同一环境中运行,可能会导致Debezium版本冲突
20 2
|
1月前
|
Kubernetes 网络协议 Java
在Kubernetes上运行Flink应用程序时
【2月更文挑战第27天】在Kubernetes上运行Flink应用程序时
37 10
|
1月前
|
SQL 资源调度 Oracle
Flink CDC产品常见问题之sql运行中查看日志任务失败如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
消息中间件 Java Kafka
Apache Hudi + Flink作业运行指南
Apache Hudi + Flink作业运行指南
86 1
|
1月前
|
监控 Apache 开发工具
Apache Flink 1.12.2集成Hudi 0.9.0运行指南
Apache Flink 1.12.2集成Hudi 0.9.0运行指南
67 0
|
1月前
|
Java 关系型数据库 数据库
在Windows上运行Flink
【2月更文挑战第16天】在Windows上运行Flink
63 2
|
2月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
482 5
|
1月前
|
SQL Java API
官宣|Apache Flink 1.19 发布公告
Apache Flink PMC(项目管理委员)很高兴地宣布发布 Apache Flink 1.19.0。
1354 1
官宣|Apache Flink 1.19 发布公告
|
1月前
|
SQL Apache 流计算
Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
【2月更文挑战第25天】Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
143 3
|
1月前
|
XML Java Apache
Apache Flink自定义 logback xml配置
Apache Flink自定义 logback xml配置
152 0