Flink运行时之流处理程序生成流图

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 流处理程序生成流图 DataStream API所编写的流处理应用程序在生成作业图(JobGraph)并提交给JobManager之前,会预先生成流图(StreamGraph)。 什么是流图 流图(StreamGraph)是表示流处理程序拓扑的数据结构,它封装了生成作业图(JobGraph)的必要信息。

流处理程序生成流图

DataStream API所编写的流处理应用程序在生成作业图(JobGraph)并提交给JobManager之前,会预先生成流图(StreamGraph)。

什么是流图

流图(StreamGraph)是表示流处理程序拓扑的数据结构,它封装了生成作业图(JobGraph)的必要信息。它的类继承关系如下图所示:

StreamGraph-class-diagram

当你基于StreamGraph的继承链向上追溯,会发现它实现了FlinkPlan接口。

Flink效仿了传统的关系型数据库在执行SQL时生成执行计划并对其进行优化的思路。FlinkPlan是Flink生成执行计划的基接口,定义在Flink优化器模块中,流处理程序对应的计划是StreamingPlan,但是当前针对流处理程序没有进行优化,因此这个类可看作是一个预留设计。

一个简单的实现“word count”的流处理程序,其StreamGraph的形象化表示如下图:

Flink-StreamGraph

Flink官方提供了一个计划可视化器来图形化执行计划,该计划可视化器基于Flink API所生成的计划的JSON格式表示绘制图形。但是需要注意的是,计划的JSON形式表示缺失了很多属性以及部分节点(比如虚拟节点等);

上面的图是由“节点”和“边”组成的。节点在Flink中对应的数据结构是StreamNode,而边对应的数据结构是StreamEdge,StreamNode和StreamEdge之间有着双向的依赖关系。StreamEdge包含了其连接的源节点sourceVertex和目的节点targetVertex:

StreamEdge-structure

而StreamNode中包含了与其连接的入边集合inEdges和出边集合outEdges:

StreamNode-structure

StreamEdge和StreamNode都有唯一的编号进行标识,但是各自编号的生成规则并不相同。

StreamNode的编号id的生成是通过调用StreamTransformation的静态方法getNewNodeId获得的,其实现是一个静态计数器:

protected static Integer idCounter = 0;
public static int getNewNodeId() {   
    idCounter++;   
    return idCounter;
}

StreamEdge的编号edgeId是字符串类型,其生成的规则为:

this.edgeId = sourceVertex + "_" + targetVertex + "_" + typeNumber + "_" + selectedNames 
                + "_" + outputPartitioner;

它是由多个段连接起来的,语义的文字表述如下:

源顶点_目的顶点_输入类型数量_输出选择器的名称_输出分区器

edgeId除了用来实现StreamEdge的hashCode及equals方法之外并没有其他实际意义。

StreamNode是表示流处理中算子的数据结构,source和sink在StreamGraph中也是以StreamNode表示,它们也是一种算子,只是因为它们是流的输入和输出因而有特定的称呼。

StreamNode除了存储了输入端和输出端的StreamEdge集合,还封装了算子的其他关键属性,比如其并行度、分区的键信息、输入与输出类型的序列化器等。

从直观上来看你已经知道了StreamNode和StreamEdge是StreamGraph的重要组成部分,但是为了生成JobGraph,StreamGraph很显然必须得包含更多的内容。总结一下,StreamGraph中包含的属性可分为三大类:

  • 流处理程序的执行配置;
  • 流处理程序拓扑中包含的节点和边的信息;
  • 迭代相关的信息;

当然围绕这些属性的方法非常多,比如添加边和节点,创建迭代的source/sink等。

其中的一个关键方法getJobGraph将用于生成JobGraph:

public JobGraph getJobGraph() {     
    if (isIterative() && checkpointConfig.isCheckpointingEnabled() 
        && !checkpointConfig.isForceCheckpointing()) {      
        throw new UnsupportedOperationException(            
            "Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. "                  
            + "State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. "                  
            + "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");   
    }   
    StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this);   
    return jobgraphGenerator.createJobGraph();
}

从上面的代码段也可见,当流处理程序中包含迭代逻辑时,检查点功能暂时不被支持,在异常信息中Flink阐述了缘由:在迭代作业中无法保证“恰好一次”的语义。

流处理程序依赖StreamingJobGraphGenerator来生成JobGraph,至于如何生成,后续会进行剖析。

生成流图的源码分析

了解了什么是流图(StreamGraph)之后,我们来分析它是如何生成的。流图的生成是通过StreamExecutionEnvironment的getStreamGraph实例方法触发的:

public StreamGraph getStreamGraph() {   
    if (transformations.size() <= 0) {      
        throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");   
    }   
    return StreamGraphGenerator.generate(this, transformations);
}

从代码段中可见,StreamGraph的生成依赖于一个名为transformations的集合对象,它是环境对象所收集到的所有的转换对象的集合,该集合中存储着一个流处理程序中所有的转换操作对应的StreamTransformation对象。

每当在DataStream对象上调用transform方法或者调用已经被实现了的一些内置的转换函数(如map、filter等,这些转换函数在内部也调用了transform方法),这些调用都会使得其对应的转换对象被加入到transformations集合中去。StreamTransformation表示创建DataStream对象的转换,流处理程序中存在多种DataStream,每种底层都对应着一个StreamTransformation。DataStream持有执行环境对象的引用,当调用transform方法时,它会调用执行环境对象的addOperator方法,将特定的StreamTransformation对象加入到transformations集合中去,这就是transformations集合中元素的来源。

DataStream API的设计存在着多重对象的封装,我们以flatMap转换操作为例图示各种对象之间的构建关系:

StreamTransformation-relationship-with-others

在Flink的源码中,这些对象的命名也并不是那么准确,比如上图中的SingleOutputStreamOperator其实是一种DataStream,但却以Operator结尾,让人匪夷所思。因此较为准确的鉴定它们类型的方式是通过查看它们的继承链来进行识别。

StreamGraph的生成依赖于生成器StreamGraphGenerator,每调用一次静态方法generate才会在内部创建一个StreamGraphGenerator的实例,一个实例对应着一个StreamGraph对象。StreamGraphGenerator调用内部的实例方法generateInternal来遍历transformations集合的每个对象:

private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {   
    for (StreamTransformation<?> transformation: transformations) {
        transform(transformation);   
    }   
    return streamGraph;
}

在transform方法中,它枚举了Flink中每一种转换类型,并对当前传入的转换类型进行判断,然后将其分发给特定的转换方法进行转换,最终返回当前StreamGraph对象中跟该转换有关的节点编号集合。

这里我们以常用的单输入转换方法transformOnInputTransform为例来进行分析:

private <IN, OUT> Collection<Integer> transformOnInputTransform(
    OneInputTransformation<IN, OUT> transform) {
    //递归地对该转换的输入端进行转换   
    Collection<Integer> inputIds = transform(transform.getInput());   
    // 递归调用可能会产生重复,这里需要以转换过的对象进行检查   
    if (alreadyTransformed.containsKey(transform)) {      
        return alreadyTransformed.get(transform);   
    }

    //结合输入端对应的节点编号来判断并得出槽共享组的名称   
    String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);   
    //将当前算子(节点)加入到流图中
    streamGraph.addOperator(transform.getId(),         
        slotSharingGroup,         
        transform.getOperator(),         
        transform.getInputType(),         
        transform.getOutputType(),         
        transform.getName());
    //如果有键选择器,则进行设置   
    if (transform.getStateKeySelector() != null) {      
        TypeSerializer<?> keySerializer = 
            transform.getStateKeyType().createSerializer(env.getConfig());      
        streamGraph.setOneInputStateKey(transform.getId(), 
            transform.getStateKeySelector(), keySerializer);   
    }   
    streamGraph.setParallelism(transform.getId(), transform.getParallelism()); 
    //构建从当前转换对应的节点到输入转换对应的节点之间的边  
    for (Integer inputId: inputIds) {      
        streamGraph.addEdge(inputId, transform.getId(), 0);   
    }   
    //返回当前转换对应的节点编号
    return Collections.singleton(transform.getId());
}

每遍历完一个转换对象,就离构建完整的流图更近一步。不同的转换操作类型,它们为流图提供的“部件”并不完全相同,有的转换只构建节点(如SourceTransformation),有的转换除了构建节点还构建边(如SinkTransformation),有的只构建虚拟节点(如PartitionTransformation、SelectTransformation等)。

关于虚拟节点,这里需要说明的是并非所有转换操作都具有实际的物理意义(即物理上对应具体的算子)。有些转换操作只是逻辑概念(例如select,split,partition,union),它们不会构建真实的StreamNode对象。比如某个流处理应用对应的转换树如下图:

StreamTransformation-demo

但在运行时,其生成的StreamGraph却是下面这种形式:

StreamGraph-demo

从图中可以看到,转换树中对应的一些逻辑操作在StreamGraph中并不存在,Flink将这些逻辑转换操作转换成了虚拟节点,它们的信息会被绑定到从source到map转换的边上。

Flink当前对于流处理的程序是不作优化的,所以StreamGraph就是它的执行计划。你可以通过Flink提供的执行计划的可视化器将StreamGraph所表述的信息以图形化的方式展示出来,就像上文我们展示的那幅图一样。那么我们如何查看我们自己所编写的程序的执行计划呢?其实很简单,我们以Flink源码中flink-examples-streaming模块中的SocketTextStreamWordCount为例,来看一下如何生成执行计划。

我们将SocketTextStreamWordCount最后一行代码注释掉:

env.execute("WordCount from SocketTextStream Example");

然后将其替换成下面这句:

System.out.println(env.getExecutionPlan());

这行语句的作用是打印当前这个程序的执行计划,它将在控制台产生该执行计划的JSON格式表示:

{"nodes":[{"id":1,"type":"Source: Socket Stream","pact":"Data Source","contents":"Source: Socket Stream",
"parallelism":1},{"id":2,"type":"Flat Map","pact":"Operator","contents":"Flat Map","parallelism":2,
"predecessors":[{"id":1,"ship_strategy":"REBALANCE","side":"second"}]},{"id":4,"type":"Keyed Aggregation",
"pact":"Operator","contents":"Keyed Aggregation","parallelism":2,"predecessors":[{"id":2,
"ship_strategy":"HASH","side":"second"}]},{"id":5,"type":"Sink: Unnamed","pact":"Data Sink",
"contents":"Sink: Unnamed","parallelism":2,"predecessors":[{"id":4,"ship_strategy":"FORWARD",
"side":"second"}]}]}

把上面这段JSON字符串复制到Flink的执行计划可视化器的输入框中,然后点击下方的“Draw”按钮,即可生成。


原文发布时间为:2017-02-05

本文作者:vinoYang

本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
消息中间件 分布式计算 大数据
大数据-113 Flink DataStreamAPI 程序输入源 自定义输入源 非并行源与并行源
大数据-113 Flink DataStreamAPI 程序输入源 自定义输入源 非并行源与并行源
44 0
|
1月前
|
Java 流计算
利用java8 的 CompletableFuture 优化 Flink 程序
本文探讨了Flink使用avatorscript脚本语言时遇到的性能瓶颈,并通过CompletableFuture优化代码,显著提升了Flink的QPS。文中详细介绍了avatorscript的使用方法,包括自定义函数、从Map中取值、使用Java工具类及AviatorScript函数等,帮助读者更好地理解和应用avatorscript。
利用java8 的 CompletableFuture 优化 Flink 程序
|
1月前
|
分布式计算 监控 大数据
大数据-114 Flink DataStreamAPI 程序输入源 自定义输入源 Rich并行源 RichParallelSourceFunction
大数据-114 Flink DataStreamAPI 程序输入源 自定义输入源 Rich并行源 RichParallelSourceFunction
46 0
|
3月前
|
Java 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何在程序因故停掉后能从之前的Binlog位置继续读取
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
消息中间件 监控 Java
大数据-109 Flink 体系结构 运行架构 ResourceManager JobManager 组件关系与原理剖析
大数据-109 Flink 体系结构 运行架构 ResourceManager JobManager 组件关系与原理剖析
65 1
|
1月前
|
消息中间件 资源调度 大数据
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
41 0
|
3月前
|
存储 缓存 Java
实时计算 Flink版操作报错合集之怎么处理在运行作业时遇到报错::ClassCastException
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
3月前
|
Java Spring 安全
Spring 框架邂逅 OAuth2:解锁现代应用安全认证的秘密武器,你准备好迎接变革了吗?
【8月更文挑战第31天】现代化应用的安全性至关重要,OAuth2 作为实现认证和授权的标准协议之一,被广泛采用。Spring 框架通过 Spring Security 提供了强大的 OAuth2 支持,简化了集成过程。本文将通过问答形式详细介绍如何在 Spring 应用中集成 OAuth2,包括 OAuth2 的基本概念、集成步骤及资源服务器保护方法。首先,需要在项目中添加 `spring-security-oauth2-client` 和 `spring-security-oauth2-resource-server` 依赖。
53 0
|
3月前
|
消息中间件 监控 关系型数据库
实时计算 Flink版产品使用问题之运行后,怎么进行监控和报警
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
消息中间件 数据挖掘 Kafka
揭秘大数据时代的极速王者!Flink:颠覆性流处理引擎,让实时数据分析燃爆你的想象力!
【8月更文挑战第29天】Apache Flink 是一个高性能的分布式流处理框架,适用于高吞吐量和低延迟的实时数据处理。它采用统一执行引擎处理有界和无界数据流,具备精确状态管理和灵活窗口操作等特性。Flink 支持毫秒级处理和广泛生态集成,但学习曲线较陡峭,社区相对较小。通过实时日志分析示例,我们展示了如何利用 Flink 从 Kafka 中读取数据并进行词频统计,体现了其强大功能和灵活性。
72 0