Apache Flink Client生成StreamGraph

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 概述 上文我们分析提交流程时,RemoteStreamEnvironment类的execute方法的第一步就是生成StreamGraph。 StreamGraph是用于表示流的拓扑结构的数据结构,它包含了生成JobGraph的必要信息。

概述

上文我们分析提交流程时,RemoteStreamEnvironment类的execute方法的第一步就是生成StreamGraph

StreamGraph是用于表示流的拓扑结构的数据结构,它包含了生成JobGraph的必要信息。它的类继承关系图如下:

StreamGraph-class-diagram

如果你按照StreamGraph的继承链向上追溯,最终会发现它实现了接口FlinkPlan。Flink在这里效仿的是数据库的执行SQL是产生执行计划的机制,FlinkPlan定义在Flink的优化器相关的包中,针对流应用的计划是StreamingPlan

针对Batch类的应用的计划类是OptimizedPlan。Flink会对Batch类的应用进行优化(这点我们后面会分析),而当前针对Streaming类的应用没有优化措施。

StreamGraph的形象化表示如下图:

Flink-StreamGraph

Flink官方提供了一个计划可视化器来图形化执行计划

节点和边

上面的图是由“节点”和“边”组成的。节点在Flink中对应的数据结构是StreamNode,而边在Flink中对应的数据结构是StreamEdgeStreamNodeStreamEdge之间存在着组合的依赖关系,依赖关系可见下图:

StreamNode-StreamEdge-relationship

StreamEdge包含了其连接的源节点sourceVertex和目的节点targetVertex,而StreamNode中包含了与其连接的入边集合inEdges和出边集合outEdgesStreamEdgeStreamNode都有唯一的编号进行标识,但是各自编号的生成规则并不相同。

StreamNode的编号id的生成是通过调用StreamTransformation的静态方法getNewNodeId获得的,其实现是一个静态计数器:

// This is used to assign a unique ID to every StreamTransformation
protected static Integer idCounter = 0;

public static int getNewNodeId() {   
    idCounter++;   
    return idCounter;
}

StreamEdge的编号edgeId是字符串类型,其生成的规则为:

this.edgeId = sourceVertex + "_" + targetVertex + "_" + typeNumber + "_" + selectedNames + "_" + outputPartitioner;

它是由多个段连接起来的,语义的文字表述如下:

源顶点_目的顶点_输入类型数量_输出选择器的名称_输出分区器

edgeId除了用来实现StreamEdge的hashCode及equals方法之外并没有其他实际意义。

StreamNode其实是表示operator的数据结构,了解这一点很重要。从Flink开始生成StreamGraph开始,source、sink都是图中的一个节点都是operator,都通过StreamNode这一数据结构来表示,我们常将它们单独拎出来讲是因为它们是流的的输入和输出,但在数据结构层面上它们是一致的。

StreamNode除了存储了输入端和输出端的StreamEdge集合,还封装了operator的其他关键属性,基于这不是我们关注的重点,所以不再赘述。

回过头来我们看JobGraph就不是那么难理解了。它包含了表述整个流拓扑的所有必要信息(比如所有的节点集合、所有的source集合、所有的sink集合、虚拟输出选择节点、虚拟分区节点)。同时还包含了大量操作这些信息的方法。

生成StreamGraph

了解了基础的数据结构之后,我们来分析如何生成JobGraph。定位到getStreamGraph的实现:


public StreamGraph getStreamGraph() {   
    if (transformations.size() <= 0) {      
        throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");   
    }   

    return StreamGraphGenerator.generate(this, transformations);
}

它依赖于transformations集合,该集合中存储着一个Streaming程序中所有的转换操作对应的StreamTransformation对象。

每当在DataStream对象上调用transform方法或者调用已经被实现了的一些转换操作(如map、flter等,这些转换操作在内部也调用了transform方法),这些调用都会被加入到transformations集合中。

StreamTransformation表示创建DataStream的操作,其实每个DataStream底层都对应着一个StreamTransformation。DataStream持有执行环境对象的引用,当调用transform方法时,它会调用执行环境对象的addOperator方法,将特定的StreamTransformation对象加入到transformations集合中去,这就是transformations集合中元素的来源。

到目前为止我们提到了多个名词,它们之前拥有着强依赖关系,为了避免混淆,我们以flatMap转换操作为例图示各种对象之间的构建关系:

Stream-Object-relationship

在源码中,其实Flink自身的命名也并不是那么准确,比如上图中的SingleOutputStreamOperator其实是一种DataStream,但却以Operator结尾,让人匪夷所思。这种情况下,鉴定它们类型的方式可以通过查看它们的继承链来进行识别。

StreamGraph的生成依赖于生成器StreamGraphGenerator,每调用一次静态方法generate才会在内部创建一个StreamGraphGenerator的实例,一个实例对应着一个StreamGraph对象。StreamGraphGenerator调用内部的实例方法generateInternal来遍历transformations集合的每个对象:


private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {   
    for (StreamTransformation<?> transformation: transformations) {
        transform(transformation);   
    }   

    return streamGraph;
}

transform方法中,它枚举了Flink中每一种转换类型,并对当前传入的转换类型进行判断,然后将其分发给特定的转换方法进行转换,最终返回当前StreamGraph对象中跟该转换有关的节点编号集合。

你可以将整个过程看作是玩拼图游戏,每遍历完一个转换对象,就离构建完整的StreamGraph更近一步。所有类型各异的转换操作各自持有整个StreamGraph的一部分小图片,根据不同的转换操作类型,它们为StreamGraph提供的“部件”并不完全相同,有的转换只构建节点(如SourceTransformation),有的转换除了构建节点还构建边(如SinkTransformation),有的只构建虚拟节点(如PartitionTransformationSplitTransformationSelectTransformation)。

关于虚拟节点,这里需要说明的是并非所有转换操作都具有实际的物理意义(即物理上对应operator)。有些转换操作只具有逻辑概念,例如unionsplitselectpartition。这些转换操作不会构建真实的StreamNode对象。比如某个流处理应用对应的转换树如下图:

StreamTransformation-demo

但在运行时,其生成的执行计划,这里也就等同于StreamGraph却是下图这种形式:

StreamGraph-demo

从图中可以看到,转换图中对应的一些逻辑操作在产生的执行计划时并不存在,Flink将这些逻辑转换操作转换成了虚拟节点,它们的信息会被绑定到从sourcemap转换的这条边上。

在给StreamGraph创建并添加一个operator时,需要给该operator指定slotSharingGroup,这时需要调用方法determineSlotSharingGroup来获得SlotSharingGroup的名称:

private String determineSlotSharingGroup(String specifiedGroup, Collection<Integer> inputIds) {   
    if (specifiedGroup != null) {      
        return specifiedGroup;   
    } else {      
        String inputGroup = null;      
        for (int id: inputIds) {         
            String inputGroupCandidate = streamGraph.getSlotSharingGroup(id);         
            if (inputGroup == null) {            
                inputGroup = inputGroupCandidate;         
            } else if (!inputGroup.equals(inputGroupCandidate)) {            
                return "default";         
            }      
        }      

        return inputGroup == null ? "default" : inputGroup;   
    }
}

当用户指定了组名,则直接使用用户指定的名称。如果用户没有指定特定的名称,则需要结合输入节点来做决定:第一种情况如果所有的输入节点都拥有相同的slotSharingGroup名称,那么就使用该组名;否则组名将被命名为default

Flink当前对于流处理的应用是不作优化的,所以其执行计划就是StreamGraph。Flink提供了一个执行计划的可视化器,它将客户端生成的执行计划以图形的方式展示出来,就像本节开始我们展示的那幅图就是可视化器生成的。那么我们怎么来查看我们自己编写的程序的执行计划呢?其实很简单,我们以Flink的flink-examples-streaming包中的SocketTextStreamWordCount为例,来看一下如何生成执行计划。

我们将SocketTextStreamWordCount最后一行代码注释掉:

env.execute("WordCount from SocketTextStream Example");

然后将其替换成下面这句:

System.out.println(env.getExecutionPlan());

这行语句的作用是打印当前这个程序的执行计划,它将在控制台产生该执行计划的JSON格式表示:

{"nodes":[{"id":1,"type":"Source: Socket Stream","pact":"Data Source","contents":"Source: Socket Stream",
"parallelism":1},{"id":2,"type":"Flat Map","pact":"Operator","contents":"Flat Map","parallelism":2,
"predecessors":[{"id":1,"ship_strategy":"REBALANCE","side":"second"}]},{"id":4,"type":"Keyed Aggregation",
"pact":"Operator","contents":"Keyed Aggregation","parallelism":2,"predecessors":[{"id":2,
"ship_strategy":"HASH","side":"second"}]},{"id":5,"type":"Sink: Unnamed","pact":"Data Sink",
"contents":"Sink: Unnamed","parallelism":2,"predecessors":[{"id":4,"ship_strategy":"FORWARD",
"side":"second"}]}]}System.out.println(env.getExecutionPlan());

把上面这段JSON复制到Flink的执行计划可视化器,点击下方的Draw按钮,即可生成。

小结

本文我们谈论了StreamGraph的数据结构以及StreamGraphGenerator如何生成StreamGraph。鉴于StreamEdgeStreamNode是组成StreamGraph不可或缺的部分,我们还对这两个数据结构进行了简单的分析。当然,StreamGraph还有一个关键的实例方法:getJobGraph,它用于获取流处理程序的JobGraph(该方法继承自StreamingPlan)。至于什么是JobGraph以及如何获取它,我们将在下文进行讨论。



原文发布时间为:2016-07-23


本文作者:vinoYang


本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
2月前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
413 33
The Past, Present and Future of Apache Flink
|
4月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
1054 13
Apache Flink 2.0-preview released
|
4月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
182 3
|
5月前
|
消息中间件 资源调度 API
Apache Flink 流批融合技术介绍
本文源自阿里云高级研发工程师周云峰在Apache Asia Community OverCode 2024的分享,内容涵盖从“流批一体”到“流批融合”的演进、技术解决方案及社区进展。流批一体已在API、算子和引擎层面实现统一,但用户仍需手动配置作业模式。流批融合旨在通过动态调整优化策略,自动适应不同场景需求。文章详细介绍了如何通过量化指标(如isProcessingBacklog和isInsertOnly)实现这一目标,并展示了针对不同场景的具体优化措施。此外,还概述了社区当前进展及未来规划,包括将优化方案推向Flink社区、动态调整算子流程结构等。
487 31
Apache Flink 流批融合技术介绍
|
4月前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
128 1
|
4月前
|
数据挖掘 物联网 数据处理
深入探讨Apache Flink:实时数据流处理的强大框架
在数据驱动时代,企业需高效处理实时数据流。Apache Flink作为开源流处理框架,以其高性能和灵活性成为首选平台。本文详细介绍Flink的核心特性和应用场景,包括实时流处理、强大的状态管理、灵活的窗口机制及批处理兼容性。无论在实时数据分析、金融服务、物联网还是广告技术领域,Flink均展现出巨大潜力,是企业实时数据处理的理想选择。随着大数据需求增长,Flink将继续在数据处理领域发挥重要作用。
414 0
|
4月前
|
消息中间件 druid Kafka
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
123 0
|
6月前
|
存储 消息中间件 Java
Apache Flink 实践问题之原生TM UI日志问题如何解决
Apache Flink 实践问题之原生TM UI日志问题如何解决
64 1
|
5月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
6月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
409 2

热门文章

最新文章

推荐镜像

更多