2.1.3 虚拟 Transformation 的转换
虚拟的 Transformation 生成的时候不会转换为 SteramNode,而是添加为虚拟节点。
private void addEdgeInternal(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber, StreamPartitioner<?> partitioner, List<String> outputNames, OutputTag outputTag, ShuffleMode shuffleMode) { //当上游是sideoutput时,递归调用,并传入sideoutput信息 if (virtualSideOutputNodes.containsKey(upStreamVertexID)) { int virtualId = upStreamVertexID; upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0; if (outputTag == null) { outputTag = virtualSideOutputNodes.get(virtualId).f1; } addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, shuffleMode); } //当上游是select时,递归调用,并传入select信息 else if (virtualSelectNodes.containsKey(upStreamVertexID)) { int virtualId = upStreamVertexID; upStreamVertexID = virtualSelectNodes.get(virtualId).f0; if (outputNames.isEmpty()) { // selections that happen downstream override earlier selections outputNames = virtualSelectNodes.get(virtualId).f1; } addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode); } //当上游是Partition时,递归调用,并传入Partition信息 else if (virtualPartitionNodes.containsKey(upStreamVertexID)) { int virtualId = upStreamVertexID; upStreamVertexID = virtualPartitionNodes.get(virtualId).f0; if (partitioner == null) { partitioner = virtualPartitionNodes.get(virtualId).f1; } shuffleMode = virtualPartitionNodes.get(virtualId).f2; addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode); } //不是以上逻辑转换的情况,真正构建StreamEdge else { StreamNode upstreamNode = getStreamNode(upStreamVertexID); StreamNode downstreamNode = getStreamNode(downStreamVertexID); // If no partitioner was specified and the parallelism of upstream and downstream // operator matches use forward partitioning, use rebalance otherwise. //没有指定partitioner时,会为其选择forward或者rebalance if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) { partitioner = new ForwardPartitioner<Object>(); } else if (partitioner == null) { partitioner = new RebalancePartitioner<Object>(); } if (partitioner instanceof ForwardPartitioner) { if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) { throw new UnsupportedOperationException("Forward partitioning does not allow " + "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() + ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() + " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global."); } } if (shuffleMode == null) { shuffleMode = ShuffleMode.UNDEFINED; } //创建StreamEdge,并将该SteramEdge添加到上游的输出,下游的输入。 StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag, shuffleMode); getStreamNode(edge.getSourceId()).addOutEdge(edge); getStreamNode(edge.getTargetId()).addInEdge(edge); } }
2.2 作业图
JobGraph 可以由流计算的 StreamGraph 和批处理的 OptimizedPlan 转换而来。流计算中,在 StreamGraph 的基础上进行了一些优化,如果通过 OperatorChain 机制将算子合并起来,在执行时,调度在同一个 Task 线程上,避免数据的跨线程、跨网段的传递。
2.2.1 JobGraph 核心对象
- JobVertex
经过算子融合优化后符合条件的多个 SteramNode 可能会融合在一起生成一个 JobVertex,即一个 JobVertex 包含一个或多个算子,JobVertex 的输入是 JobEdge,输出是 IntermediateDataSet。 - JobEdge
JobEdge 是 JobGraph 中连接 IntermediateDataSet 和 JobVertex 的边,表示 JobGraph 中的一个数据流转通道,其上游数据源是 IntermediateDataSet,下游消费者是 JobVertex。数据通过 JobEdge 由 IntermediateDataSet 传递给 JobVertex。 - IntermediateDataSet
中间数据集 IntermediateDataSet 是一种逻辑结构,用来表示 JobVertex 的输出,即该 JobVertex 中包含的算子会产生的数据集。不同的执行模式下,其对应的结果分区类型不同,决定了在执行时刻数据交换的模式。
IntermediateDataSet 的个数与该 JobVertex 对应的 StreamNode 的出边数量相同,可以是一个或者多个。
2.2.2 JobGraph 生成过程
StreamingJobGraphGenerator 负责流计算 JobGraph 的生成,在转换前需要进行一系列的预处理。
private JobGraph createJobGraph() { preValidate(); // make sure that all vertices start immediately //设置调度模式 jobGraph.setScheduleMode(streamGraph.getScheduleMode()); // Generate deterministic hashes for the nodes in order to identify them across // submission iff they didn't change. //为每个节点生成确定的hashid作为唯一表示,在提交和执行过程中保持不变。 Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph); // Generate legacy version hashes for backwards compatibility //为了向后保持兼容,为每个节点生成老版本的hash id List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size()); for (StreamGraphHasher hasher : legacyStreamGraphHashers) { legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph)); } Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>(); //真正对SteramGraph进行转换,生成JobGraph图 setChaining(hashes, legacyHashes, chainedOperatorHashes); setPhysicalEdges(); //设置共享slotgroup setSlotSharingAndCoLocation(); setManagedMemoryFraction( Collections.unmodifiableMap(jobVertices), Collections.unmodifiableMap(vertexConfigs), Collections.unmodifiableMap(chainedConfigs), id -> streamGraph.getStreamNode(id).getMinResources(), id -> streamGraph.getStreamNode(id).getManagedMemoryWeight()); //配置checkpoint configureCheckpointing(); jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings()); //如果有之前的缓存文件的配置,则重新读入 JobGraphGenerator.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph); // set the ExecutionConfig last when it has been finalized try { //设置执行环境配置 jobGraph.setExecutionConfig(streamGraph.getExecutionConfig()); } catch (IOException e) { throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." + "This indicates that non-serializable types (like custom serializers) were registered"); } return jobGraph; }
预处理完毕后,开始构建 JobGraph 中的点和边,从 Source 向下遍历 StreamGraph,逐步创建 JObGraph,在创建的过程中同事完成算子融合(OperatorChain)优化。
执行具体的 Chain 和 JobVertex 生成、JobEdge 的关联、IntermediateDataSet。从 StreamGraph 读取数据的 StreamNode 开始,递归遍历同时将 StreamOperator 连接在一起。
整理构建的逻辑如下(看上图!!!):
1)从 Source 开始,Source 与下游的 FlatMap 不可连接,Source 是起始节点,自己成为一个 JobVertx。
2)此时开始一个新的连接分析,FlatMap 是起始节点,与下游的 KeyedAgg 也不可以连接,那么 FlatMap 自己成为一个 JobVertex。
3)此时开始一个新的连接分析。KeyedAgg 是起始节点,并且与下游的 Sink 可以连接,那么递归地分析 Sink 节点,构造 Sink 与其下游是否可以连接,因为 Slink 没有下游,所以 KeyedAgg 和 Sink 节点连接在一起,共同构成了一个 JobVertex。在这个 JobVertex 中,KeyedAgg 是起始节点,index 编号为 0,sink 节点 index 编号为 1.
构建 JobVertex 的时候需要将 StreamNode 中的重要配置信息复制到 JobVertex 中。构建好 JobVertex 之后,需要构建 JobEdge 将 JobVertex 连接起来。KeyedAgg 和 Sink 之间构成了一个算子连接,连接内部的算子之间无序构成 JobEdge 进行连接。
在构建 JobEdge 的时候,很重要的一点是确定上游 JobVertex 和下游 JobVertex 的数据交换方式。此时根据 ShuffleMode 来确定 ResultPartition 类型,用 FlinkPartition 来确定 JobVertex 的连接方式。
Shuffle 确定了 ResultPartition,那么就可以确定上游 JobVertex 输出的 IntermediateDataSet 的类型了,也就知道 JobEdge 的输入 IntermediateDataSet。
ForwardPartitioner 和 RescalePartitioner 两种类型的 Partitioner 转换为 DistributionPattern.POINTWISE 的分发模式。其他类型的 Partitioner 统一转换为 DistributionPattern.ALL_TO_ALL 模式。
JobGraph 的构建和 OperatorChain 优化:
private List<StreamEdge> createChain( Integer startNodeId, Integer currentNodeId, Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, int chainIndex, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) { if (!builtVertices.contains(startNodeId)) { List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>(); List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>(); List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>(); StreamNode currentNode = streamGraph.getStreamNode(currentNodeId); //获取当前节点的出边,判断是否符合OperatorChain的条件 //分为两类:chainableoutputs,nonchainableoutputs for (StreamEdge outEdge : currentNode.getOutEdges()) { if (isChainable(outEdge, streamGraph)) { chainableOutputs.add(outEdge); } else { nonChainableOutputs.add(outEdge); } } //对于chainable的边,递归调用createchain //返回值添加到transitiveOutEdges中 for (StreamEdge chainable : chainableOutputs) { transitiveOutEdges.addAll( createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes)); } //对于无法chain在一起的边,边的下游节点作为Operatorchain的Head节点 //进行递归调用,返回值添加到transitiveOutEdges中 for (StreamEdge nonChainable : nonChainableOutputs) { transitiveOutEdges.add(nonChainable); createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes); } List<Tuple2<byte[], byte[]>> operatorHashes = chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>()); byte[] primaryHashBytes = hashes.get(currentNodeId); OperatorID currentOperatorId = new OperatorID(primaryHashBytes); for (Map<Integer, byte[]> legacyHash : legacyHashes) { operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId))); } chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs)); chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs)); chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs)); if (currentNode.getInputFormat() != null) { getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat()); } if (currentNode.getOutputFormat() != null) { getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat()); } //如果当前节点是起始节点,则直接创建JobVertex,否则返回一个空的StreamConfig StreamConfig config = currentNodeId.equals(startNodeId) ? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes) : new StreamConfig(new Configuration()); //将StreamNode中的配置信息序列化到Streamconfig中。 setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs); //再次判断,如果是Chain的起始节点,执行connect()方法,创建JobEdge和IntermediateDataset //否则将当前节点的StreamConfig 添加到chainedConfig中。 if (currentNodeId.equals(startNodeId)) { config.setChainStart(); config.setChainIndex(0); config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName()); for (StreamEdge edge : transitiveOutEdges) { connect(startNodeId, edge); } config.setOutEdgesInOrder(transitiveOutEdges); config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId)); } else { chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>()); config.setChainIndex(chainIndex); StreamNode node = streamGraph.getStreamNode(currentNodeId); config.setOperatorName(node.getOperatorName()); chainedConfigs.get(startNodeId).put(currentNodeId, config); } config.setOperatorID(currentOperatorId); if (chainableOutputs.isEmpty()) { config.setChainEnd(); } return transitiveOutEdges; } else { return new ArrayList<>(); } }
2.2.3 算子融合
一个 Operatorchain 在同一个 Task 线程内执行。OperatorChain 内的算子之间,在同一个线程内通过方法调用的方式传递数据,能减少线程之间的切换,减少消息的序列化/反序列化,无序借助内存缓存区,也无须通过网络在算子间传递数据,可在减少延迟的同时提高整体吞吐量
operatorchain 的条件:
1)下游节点的入度为 1
2)SteramEdge 的下游节点对应的算子不为 null
3)StreamEdge 的上游节点对应的算子不为 null
4)StreamEdge 的上下游节点拥有相同的 slotSharingGroup,默认都是 default.
5)下游算子的连接策略为 ALWAYS.
6)上游算子的连接策略为 ALWAYS 或者 HEAD.
7)StreamEdge 的分区类型为 ForwardPartitioner
8)上下游节点的并行度一致
9)当前 StreamGraph 允许 chain
2.3 执行图
2.3.1 ExecutionGraph 核心对象
- ExecutionJobVertex
该对象和 JobGraph 中的 JobVertex 一一对应。该对象还包含了一组 ExecutionVertex,数量与该 JobVertex 中所包含的 SteramNode 的并行度一致。
ExecutionJobVertex 用来将一个 JobVertex 封装成一 ExecutionJobVertex,并以此创建 ExecutionVertex、Execution、IntermediateResult 和 IntermediateResultPartition,用于丰富 ExecutionGraph。
在 ExecutionJobVertex 的构造函数中,首先是依据对应的 JobVertex 的并发度,生成对应个数的 ExecutionVertex。其中,一个 ExecutionVertex 代表一个 ExecutionJobVertex 的并发子 Task。然后是将原来 JobVertex 的中间结果 IntermediateDataSet 转化为 ExecutionGrap 中 IntermediateResult - ExecutionVertex
ExecutionJobVertex 中会对作业进行并行化处理,构造可以并行执行的实例,每个并行执行的实例就是 ExecutionVertex.
构建 ExecutionVertex 的同时,也回构建 ExecutionVertex 的输出 IntermediateResult。并且将 ExecutionEdge 输出为 IntermediatePartition。
ExecutionVertex 的构造函数中,首先会创建 IntermediatePartition,并通过 IntermediateResult.setPartition()建立 IntermediateResult 和 IntermediateResultPartition 之间的关系,然后生成 Execution,并配置资源相关。 - IntermediateResult
IntermediateResult 又叫做中间结果集,该对象是个逻辑概念,表示 ExecutionJobVertex 的输出,和 JobGraph 中的 IntermediateDataSet 一一对应,同样,一个 ExecutionJobVertex 可以有多个中间二级果,取决于当前 JobVertex 有几个出边。
一个中间结果集包含多个中间结果分区 IntermediateResultPartition,其个数等于该 JobVertex 的并发度。 - IntermediateResultPartition IntermediateResultPartition 又叫做中间结果分区,表示 1 个 ExecutionVertex 输出结果,与 ExecutionEdge 相关联。
- ExecutionEdge
表示 ExecutionVertex 的输入,连接到上游产生的 IntermediateResultPartition。一个 Execution 对应于唯一的一个 IntermediateResultPartition 和一个 ExecutionVertex。一个 ExecutionVertex 可以有多个 ExecutionEdge。 - Execution
ExecutionVertex 相当于每个 Task 的模板,在真正执行的时候,会将 ExecutionVertex 中的信息包装为一个 Execution,执行一个 ExecutionVertex 的一次尝试。JobManager 和 TaskManager 之间关于 Task 的部署和 Task 执行状态的更新都是通过 ExecutionAttemptID 来标识实例的。在故障或者数据需要重算的情况下,ExecutionVertex 可能会有多个 ExecutionAttemptID.一个 Execution 通过 ExecutionAttemptID 标识。
2.3.2 ExecutionGrap 生成过程
初始话作业调度器的时候,根据 JobGra ph 生活 ExecutionGraph。在 SchedulerBase 的构造方法中触发构建,最终调用 SchedulerBase#createExecutionGraph 触发实际的构建动作,使用 ExecutionGraphBuiler 构建 ExecutionGraph。
核心代码 attachJobGraph:
构建 ExecutionEdge 的连接策略:
- 点对点连接(DistributionPatter n.POINTWISE)
该策略用来连接当前 ExecutionVertex 与上游的 IntermediataeResultParition。
连接分三种情况
1)一对一连接:并发的 Task 数量与分区数相等。
2)多对一连接:下游的 Task 数量小于上游的分区数,此时分两种情况:
a:下游 Task 可以分配同数量的结果分区 IntermediataeResultParition。如上游有 4 个结果分区,下游有 2 个 Task,那么每个 Task 会分配两个结果分区进行消费。
b:每个 Task 消费的上游分区结果数据不均,如上游有 3 个结果分区,下游有两个 Task,那么一个 Task 分配 2 个结果分区消费,另一个分配一个结果分区消费。
3)一对多连接:下游的 Task 数量多余上游的分区数,此时两种情况:
- a:每个结果分区的下游消费 Task 数据量相同,如上游有两个结果分区,下游有 4 个 Task,每个结果分区被两个 Task 消费。
- b:每个结果分区的下游消费 Task 数量不相同,如上游有两个结果分区,下游有 3 个 Task,那么一个结果分区分配 2 个 Task 消费,另一个结果分区分配一个 Task 消费。
- 全连接(DistributionPattern.ALL_TO_ALL)
该策略下游的 ExecutionVertex 与上 游的所有 IntermediataeResultParition 建立连接,消费其生产的数据。一般全连接的情况意味着数据在 Shuffle。