Flink运行时之客户端提交作业图-下

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: submitJob方法分析 JobClientActor通过向JobManager的Actor发送SubmitJob消息来提交Job,JobManager接收到消息对象之后,构建一个JobInfo对象以封装Job的基本信息,然后将这两个对象传递给submitJob方法: case SubmitJo.

submitJob方法分析

JobClientActor通过向JobManager的Actor发送SubmitJob消息来提交Job,JobManager接收到消息对象之后,构建一个JobInfo对象以封装Job的基本信息,然后将这两个对象传递给submitJob方法:

我们会以submitJob的关键方法调用来串讲其主要逻辑。首先判断jobGraph参数,如果为空则直接回应JobResultFailure消息:

接着,向类库缓存管理器注册该Job相关的库文件、类路径:

必须确保该步骤率先成功执行,因为一旦后续产生任何异常才可以确保上传的类库和Jar等被成功从类库缓存管理器中移除。从这开始的整个代码段都被包裹在try语句块中,一旦捕获到任何异常,会通过libraryCacheManager的unregisterJob方法将相关Jar文件删除:

接下来是获得用户代码的类加载器classLoader以及发生失败时的重启策略restartStrategy:

接着,获得执行图ExecutionGraph对象的实例。首先尝试从缓存中查找,如果缓存中存在则直接返回,否则直接创建然后加入缓存:

获得了executionGraph之后会对其相关属性进行设置,这些属性包括调度模式、是否允许被加入调度队列、计划的Json格式表示。

接下来初始化JobVertex的一些属性:

获得JobGraph中从source开始的按照拓扑顺序排序的顶点集合,然后将该集合附加到ExecutionGraph上,附加的过程完成了很多事情,我们后续进行分析:

接下来将快照配置和检查点配置的信息写入ExecutionGraph:

JobManager自身会注册Job状态变更的事件回调:

如果Client也需要感知到执行结果以及Job状态的变更,那么也会为Client注册事件回调:

以上这些代码从将Job相关的Jar加入到类库缓存管理器开始,都被包裹在try块中,如果产生异常将进入catch代码块中进行异常处理:

异常处理时首先根据jobID移除类库缓存中跟当前Job有关的类库,接着从currentJobsMap中移除job对应的ExecutionGraph,JobInfo元组信息。然后调用ExecutionGraph的fail方法,促使其失败。最后,将产生的异常以JobResultFailure消息告知客户端并结束方法调用。

从当前开始直到最后的这段代码可能会造成阻塞,将会被包裹在future块中并以异步的方式执行。先判断当前的是否是恢复模式,如果是恢复模式则从最近的检查点恢复:

如果不是恢复模式,但快照配置中存在保存点路径,也将基于保存点来重置状态:

然后会把当前的JobGraph信息写入SubmittedJobGraphStore,它主要用于恢复的目的

执行到这一步,就可以向Client回复JobSubmitSuccess消息了:

接下来会基于ExecutionGraph触发Job的调度,这是Task被执行的前提:

为了防止多个JobManager同时调度相同的Job的情况产生,这里首先判断当前节点是否是Leader。如果是,才会进行调度。否则将会向自身发送一条RemoveJob消息,以进入其他处理逻辑。

到此为止,submitJob方法的梳理就算完成了。因为这是JobManager接收到Client提交的Job后的主要处理方法,所以包含的逻辑比较多。


原文发布时间为:2017-04-02

本文作者:vinoYang

本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
2月前
|
消息中间件 Kafka 流计算
如果有多个版本的Flink CDC在同一环境中运行,可能会导致Debezium版本冲突
【2月更文挑战第30天】如果有多个版本的Flink CDC在同一环境中运行,可能会导致Debezium版本冲突
22 2
|
2月前
|
Kubernetes 网络协议 Java
在Kubernetes上运行Flink应用程序时
【2月更文挑战第27天】在Kubernetes上运行Flink应用程序时
38 10
|
2月前
|
SQL 资源调度 Oracle
Flink CDC产品常见问题之sql运行中查看日志任务失败如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2月前
|
消息中间件 Java Kafka
Apache Hudi + Flink作业运行指南
Apache Hudi + Flink作业运行指南
95 1
|
2月前
|
监控 Apache 开发工具
Apache Flink 1.12.2集成Hudi 0.9.0运行指南
Apache Flink 1.12.2集成Hudi 0.9.0运行指南
68 0
|
2月前
|
关系型数据库 MySQL 数据处理
Flink CDC产品常见问题之运行mysql to doris pipeline报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
3月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
596 5
|
2月前
|
SQL Java API
官宣|Apache Flink 1.19 发布公告
Apache Flink PMC(项目管理委员)很高兴地宣布发布 Apache Flink 1.19.0。
1624 2
官宣|Apache Flink 1.19 发布公告
|
2月前
|
SQL Apache 流计算
Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
【2月更文挑战第25天】Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
289 3
|
2月前
|
XML Java Apache
Apache Flink自定义 logback xml配置
Apache Flink自定义 logback xml配置
169 0