Flink运行时之客户端提交作业图-上

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 客户端提交作业图 作业图(JobGraph)是Flink的运行时所能理解的作业表示,无论程序通过是DataStream还是DataSet API编写的,它们的JobGraph提交给JobManager以及之后的处理都将得到统一。


客户端提交作业图

作业图(JobGraph)是Flink的运行时所能理解的作业表示,无论程序通过是DataStream还是DataSet API编写的,它们的JobGraph提交给JobManager以及之后的处理都将得到统一。本篇我们将分析客户端如何提交JobGraph给JobManager。

流处理程序提交作业图

在前面讲解Flink的核心概念的时候我们谈到了Flink利用了“惰性求值”的概念,只有当最终调用execute方法时,才会真正开始执行。因此,execute方法是我们的切入点。

DataStream API所编写的程序生成作业图之后,在提交时产生的方法调用时序图示意如下:

上图中的多个run方法是同名的方法重载。

从时序图中可以看到,ClusterClient对其自身抽象方法submitJob的调用是触发作业图提交的方法。随后真正的提交逻辑由JobClient实现。

ClusterClient封装了提交一个程序到远程集群的必要的功能,而StandaloneClusterClient则扩展了ClusterClient的功能,它专门针对独立的集群提供服务,这两个类都位于flink-clients模块中。JobClient则负责将用户的Job提交给JobManager,它充当了提交代理的角色,并返回表示作业执行结果的JobExecutionResult对象。

JobClient是提交所有类型的Job的统一入口,具体的提交细节我们将会在“公共的提交流程”中详细分析。

批处理程序提交作业图

利用DataSet API所编写的程序生成作业图之后,在提交时产生的方法调用的时序图如下:

上图中出现多个重名的run方法为同名方法重载。

从上图中可以看到,批处理程序的JobGraph跟流处理程序的JobGraph在提交之前有非常明显的不同。它引入了PlanExecutor作为Flink程序的计划执行器。而RemoteExecutor是PlanExecutor的实现,用于将程序提交给远程的Flink集群。具体的提交动作被进一步委托给ClusterClient及其实现(StandaloneClusterClient)最终同样被JobClient代理提交给JobManager。

公共的提交流程

从前面的时序图可见Flink对于不同类型的程序的提交流程最终是殊途同归的。因此,接下来我们将对公共的提交流程进行分析。一个程序的JobGraph真正被提交始于对JobClient的submitJobAndWait方法的调用。

submitJobAndWait方法用于将一个JobGraph发送到指定的JobClient actor,随后它会将该JobGraph转发给JobManager。该方法会一直阻塞,直到该作业执行完成或者感知不到JobManager的存活。如果作业被顺利执行完成则返回JobExecutionResult对象而如果JobManager产生故障,则抛出抛出JobExecutionException异常。

一个JobGraph从提交开始会经过多个对象层层递交,各个对象之间的交互关系如下图所示:

JobClient在其中起到了“桥接”作用,它桥接了同步的方法调用和异步的消息通信。更具体得说,JobClient可以看做是一个“静态类”提供了一些静态方法,这里我们主要关注上面的submitJobAndWait方法,该方法内部封装了Actor之间的异步通信(具体的通信对象是JobClientActor,它负责跟JobManager的ActorSystem的Actor对象进行通信),并以阻塞的形式返回结果。而ClusterClient只需调用JobClient的submitJobAndWait方法,而无需关注其内部是如何实现的。

通过调用JobClient的submitJobAndWait静态方法,会触发基于Akka的Actor之间的消息通信来完成后续的提交JobGraph的动作。这之间的交互示意图如下:

这里总共有两个ActorSystem,一个归属于JobClient,另一个归属于JobManager。在submitJobAndWait方法中,其首先会创建一个JobClientActor的ActorRef:

ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);

然后向其发起一个SubmitJobAndWait消息,该消息将JobGraph的实例提交给JobClientActor。发起模式是ask,它表示需要一个应答消息。

Akka的消息通信模型有两种:

  1. Fire and forget:消息的生产者不期望从消息的消费者那里得到应答。这种消息会以异步的形式发送,发送方法在发送完成之后立即返回。Akka的actor使用tell方法发送这种消息。
  2. Send and receive:消息的生产者期待并将等待从消费者那里得到应答。这种消息也会以异步的形式发送,发送完成后会返回一个Future对象,该对象表示一个潜在的应答。Akka的actor使用ask方法发送这种消息,并通过Future来获取响应。

JobClient向JobClientActor发送消息的代码段如下:

Future<Object> future = Patterns.ask(jobClientActor,
                                     new JobClientMessages.SubmitJobAndWait(jobGraph),
                                     new Timeout(AkkaUtils.INF_TIMEOUT()));
answer = Await.result(future, AkkaUtils.INF_TIMEOUT());

该SubmitJobAndWait消息被JobClientActor接收后,最终通过调用tryToSubmitJob方法触发真正的提交动作。在tryToSubmitJob方法中,一个JobGraph的提交将会分为两步:

  1. 将用户程序相关的Jar包上传至JobManager;
  2. 给JobManager Actor发送封装JobGraph的SubmitJob消息;

随后,JobManager Actor会接收到来自JobClientActor的SubmitJob消息,进而触发submitJob方法,该方法的执行主体已经是JobManager了。submitJob包含的逻辑较为复杂,且任何一个检测或者子调用所产生的异常都可能会导致提交失败。我们列举一下该方法完成的主要任务:

  1. 向BlobLibraryCacheManager注册该Job;
  2. 构建ExecutionGraph对象;
  3. 对JobGraph中的每个顶点进行初始化;
  4. 将DAG拓扑中从source开始排序,排序后的顶点集合附加到ExecutionGraph对象;
  5. 获取检查点相关的配置,并将其设置到ExecutionGraph对象;
  6. 向ExecutionGraph注册相关的listener;
  7. 执行恢复操作或者将JobGraph信息写入SubmittedJobGraphStore以在后续用于恢复目的;
  8. 响应给客户端JobSubmitSuccess消息;
  9. 对ExecutionGraph对象进行调度执行;

如果提交流程顺利,用户程序包以及描述Job的JobGraph将会被JobManager接收,随后JobManager会对Job进行调度、部署并执行。JobClient会阻塞等待提交结果返回。在得到返回结果之后,先进行解析判断它是否是Job被成功执行后返回的结果:

if (answer instanceof JobManagerMessages.JobResultSuccess) {
    SerializedJobExecutionResult result = ((JobManagerMessages.JobResultSuccess) answer).result();
    if (result != null) {
        try {
            return result.toJobExecutionResult(classLoader);
         } catch (Throwable t) {
             throw new JobExecutionException(jobGraph.getJobID(),
             "Job was successfully executed but JobExecutionResult could not be deserialized.");
        }
    } else {
         throw new JobExecutionException(jobGraph.getJobID(),
         "Job was successfully executed but result contained a null JobExecutionResult.");
    }
}

还是失败后返回的结果:

if (answer instanceof JobManagerMessages.JobResultFailure) {
    LOG.info("Job execution failed");
    SerializedThrowable serThrowable = ((JobManagerMessages.JobResultFailure) answer).cause();
    if (serThrowable != null) {
        Throwable cause = serThrowable.deserializeError(classLoader);
        if (cause instanceof JobExecutionException) {
            throw (JobExecutionException) cause;
        } else {
            throw new JobExecutionException(jobGraph.getJobID(), "Job execution failed", cause);
        }
    } else {
        throw new JobExecutionException(jobGraph.getJobID(),
        "Job execution failed with null as failure cause.");
    }
} else {
    throw new JobExecutionException(jobGraph.getJobID(),
        "Unknown answer from JobManager after submitting the job: " + answer);
}

以上就是批处理作业和流处理作业共同的提交流程,这中间涉及了JobManager接收到用户提交后一系列处理,这部分的处理细节我们随后进行分析。


原文发布时间为:2017-03-31

本文作者:vinoYang

本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
30天前
|
消息中间件 分布式计算 大数据
大数据-123 - Flink 并行度 相关概念 全局、作业、算子、Slot并行度 Flink并行度设置与测试
大数据-123 - Flink 并行度 相关概念 全局、作业、算子、Slot并行度 Flink并行度设置与测试
95 0
|
30天前
|
消息中间件 监控 Java
大数据-109 Flink 体系结构 运行架构 ResourceManager JobManager 组件关系与原理剖析
大数据-109 Flink 体系结构 运行架构 ResourceManager JobManager 组件关系与原理剖析
58 1
|
30天前
|
消息中间件 分布式计算 大数据
大数据-128 - Flink 并行度设置 细节详解 全局、作业、算子、Slot
大数据-128 - Flink 并行度设置 细节详解 全局、作业、算子、Slot
83 0
|
3月前
|
消息中间件 监控 关系型数据库
实时计算 Flink版产品使用问题之运行后,怎么进行监控和报警
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
Oracle 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何从savepoint重新启动作业
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
机器学习/深度学习 人工智能 运维
美团 Flink 大作业部署问题之Flink在生态技术演进上有什么主要方向
美团 Flink 大作业部署问题之Flink在生态技术演进上有什么主要方向
|
3月前
|
监控 Serverless Apache
美团 Flink 大作业部署问题之如何体现Flink在业界的影响力
美团 Flink 大作业部署问题之如何体现Flink在业界的影响力
|
2月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
7天前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
569 10
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
|
3月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。