【万字长文】详解Flink作业提交流程(二)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 【万字长文】详解Flink作业提交流程

2.1.3 虚拟 Transformation 的转换

虚拟的 Transformation 生成的时候不会转换为 SteramNode,而是添加为虚拟节点。

640.png

private void addEdgeInternal(Integer upStreamVertexID,
   Integer downStreamVertexID,
   int typeNumber,
   StreamPartitioner<?> partitioner,
   List<String> outputNames,
   OutputTag outputTag,
   ShuffleMode shuffleMode) {
  //当上游是sideoutput时,递归调用,并传入sideoutput信息
  if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
   int virtualId = upStreamVertexID;
   upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
   if (outputTag == null) {
    outputTag = virtualSideOutputNodes.get(virtualId).f1;
   }
   addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, shuffleMode);
  }
  //当上游是select时,递归调用,并传入select信息
  else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
   int virtualId = upStreamVertexID;
   upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
   if (outputNames.isEmpty()) {
    // selections that happen downstream override earlier selections
    outputNames = virtualSelectNodes.get(virtualId).f1;
   }
   addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
  }
  //当上游是Partition时,递归调用,并传入Partition信息
  else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
   int virtualId = upStreamVertexID;
   upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
   if (partitioner == null) {
    partitioner = virtualPartitionNodes.get(virtualId).f1;
   }
   shuffleMode = virtualPartitionNodes.get(virtualId).f2;
   addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
  }
  //不是以上逻辑转换的情况,真正构建StreamEdge
  else {
   StreamNode upstreamNode = getStreamNode(upStreamVertexID);
   StreamNode downstreamNode = getStreamNode(downStreamVertexID);
   // If no partitioner was specified and the parallelism of upstream and downstream
   // operator matches use forward partitioning, use rebalance otherwise.
   //没有指定partitioner时,会为其选择forward或者rebalance
   if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
    partitioner = new ForwardPartitioner<Object>();
   } else if (partitioner == null) {
    partitioner = new RebalancePartitioner<Object>();
   }
   if (partitioner instanceof ForwardPartitioner) {
    if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
     throw new UnsupportedOperationException("Forward partitioning does not allow " +
       "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
       ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
       " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
    }
   }
   if (shuffleMode == null) {
    shuffleMode = ShuffleMode.UNDEFINED;
   }
   //创建StreamEdge,并将该SteramEdge添加到上游的输出,下游的输入。
   StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag, shuffleMode);
   getStreamNode(edge.getSourceId()).addOutEdge(edge);
   getStreamNode(edge.getTargetId()).addInEdge(edge);
  }
 }

2.2 作业图

JobGraph 可以由流计算的 StreamGraph 和批处理的 OptimizedPlan 转换而来。流计算中,在 StreamGraph 的基础上进行了一些优化,如果通过 OperatorChain 机制将算子合并起来,在执行时,调度在同一个 Task 线程上,避免数据的跨线程、跨网段的传递。

640.png

2.2.1 JobGraph 核心对象

  • JobVertex
    经过算子融合优化后符合条件的多个 SteramNode 可能会融合在一起生成一个 JobVertex,即一个 JobVertex 包含一个或多个算子,JobVertex 的输入是 JobEdge,输出是 IntermediateDataSet。
  • JobEdge
    JobEdge 是 JobGraph 中连接 IntermediateDataSet 和 JobVertex 的边,表示 JobGraph 中的一个数据流转通道,其上游数据源是 IntermediateDataSet,下游消费者是 JobVertex。数据通过 JobEdge 由 IntermediateDataSet 传递给 JobVertex。
  • IntermediateDataSet
    中间数据集 IntermediateDataSet 是一种逻辑结构,用来表示 JobVertex 的输出,即该 JobVertex 中包含的算子会产生的数据集。不同的执行模式下,其对应的结果分区类型不同,决定了在执行时刻数据交换的模式。
    IntermediateDataSet 的个数与该 JobVertex 对应的 StreamNode 的出边数量相同,可以是一个或者多个。

2.2.2 JobGraph 生成过程

640.png

640.png

StreamingJobGraphGenerator 负责流计算 JobGraph 的生成,在转换前需要进行一系列的预处理。

private JobGraph createJobGraph() {
     preValidate();
     // make sure that all vertices start immediately
     //设置调度模式
     jobGraph.setScheduleMode(streamGraph.getScheduleMode());
     // Generate deterministic hashes for the nodes in order to identify them across
     // submission iff they didn't change.
     //为每个节点生成确定的hashid作为唯一表示,在提交和执行过程中保持不变。
     Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
     // Generate legacy version hashes for backwards compatibility
     //为了向后保持兼容,为每个节点生成老版本的hash id
     List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
     for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
      legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
     }
     Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
     //真正对SteramGraph进行转换,生成JobGraph图
     setChaining(hashes, legacyHashes, chainedOperatorHashes);
     setPhysicalEdges();
     //设置共享slotgroup
     setSlotSharingAndCoLocation();
     setManagedMemoryFraction(
      Collections.unmodifiableMap(jobVertices),
      Collections.unmodifiableMap(vertexConfigs),
      Collections.unmodifiableMap(chainedConfigs),
      id -> streamGraph.getStreamNode(id).getMinResources(),
      id -> streamGraph.getStreamNode(id).getManagedMemoryWeight());
     //配置checkpoint
     configureCheckpointing();
     jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());
     //如果有之前的缓存文件的配置,则重新读入
     JobGraphGenerator.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph);
     // set the ExecutionConfig last when it has been finalized
     try {
      //设置执行环境配置
      jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
     }
     catch (IOException e) {
      throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
        "This indicates that non-serializable types (like custom serializers) were registered");
     }
     return jobGraph;
    }

预处理完毕后,开始构建 JobGraph 中的点和边,从 Source 向下遍历 StreamGraph,逐步创建 JObGraph,在创建的过程中同事完成算子融合(OperatorChain)优化。

640.png

执行具体的 Chain 和 JobVertex 生成、JobEdge 的关联、IntermediateDataSet。从 StreamGraph 读取数据的 StreamNode 开始,递归遍历同时将 StreamOperator 连接在一起。

整理构建的逻辑如下(看上图!!!):

1)从 Source 开始,Source 与下游的 FlatMap 不可连接,Source 是起始节点,自己成为一个 JobVertx。

2)此时开始一个新的连接分析,FlatMap 是起始节点,与下游的 KeyedAgg 也不可以连接,那么 FlatMap 自己成为一个 JobVertex。

3)此时开始一个新的连接分析。KeyedAgg 是起始节点,并且与下游的 Sink 可以连接,那么递归地分析 Sink 节点,构造 Sink 与其下游是否可以连接,因为 Slink 没有下游,所以 KeyedAgg 和 Sink 节点连接在一起,共同构成了一个 JobVertex。在这个 JobVertex 中,KeyedAgg 是起始节点,index 编号为 0,sink 节点 index 编号为 1.

构建 JobVertex 的时候需要将 StreamNode 中的重要配置信息复制到 JobVertex 中。构建好 JobVertex 之后,需要构建 JobEdge 将 JobVertex 连接起来。KeyedAgg 和 Sink 之间构成了一个算子连接,连接内部的算子之间无序构成 JobEdge 进行连接。

在构建 JobEdge 的时候,很重要的一点是确定上游 JobVertex 和下游 JobVertex 的数据交换方式。此时根据 ShuffleMode 来确定 ResultPartition 类型,用 FlinkPartition 来确定 JobVertex 的连接方式。

Shuffle 确定了 ResultPartition,那么就可以确定上游 JobVertex 输出的 IntermediateDataSet 的类型了,也就知道 JobEdge 的输入 IntermediateDataSet。

ForwardPartitioner 和 RescalePartitioner 两种类型的 Partitioner 转换为 DistributionPattern.POINTWISE 的分发模式。其他类型的 Partitioner 统一转换为 DistributionPattern.ALL_TO_ALL 模式。

JobGraph 的构建和 OperatorChain 优化:

private List<StreamEdge> createChain(
Integer startNodeId,
Integer currentNodeId,
Map<Integer, byte[]> hashes,
List<Map<Integer, byte[]>> legacyHashes,
int chainIndex,
Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
     if (!builtVertices.contains(startNodeId)) {
      List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
      List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
      List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
      StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
      //获取当前节点的出边,判断是否符合OperatorChain的条件
      //分为两类:chainableoutputs,nonchainableoutputs
      for (StreamEdge outEdge : currentNode.getOutEdges()) {
       if (isChainable(outEdge, streamGraph)) {
        chainableOutputs.add(outEdge);
       } else {
        nonChainableOutputs.add(outEdge);
       }
      }
      //对于chainable的边,递归调用createchain
      //返回值添加到transitiveOutEdges中
      for (StreamEdge chainable : chainableOutputs) {
       transitiveOutEdges.addAll(
         createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
      }
      //对于无法chain在一起的边,边的下游节点作为Operatorchain的Head节点
      //进行递归调用,返回值添加到transitiveOutEdges中
      for (StreamEdge nonChainable : nonChainableOutputs) {
       transitiveOutEdges.add(nonChainable);
       createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
      }
      List<Tuple2<byte[], byte[]>> operatorHashes =
       chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());
      byte[] primaryHashBytes = hashes.get(currentNodeId);
      OperatorID currentOperatorId = new OperatorID(primaryHashBytes);
      for (Map<Integer, byte[]> legacyHash : legacyHashes) {
       operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
      }
      chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
      chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
      chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));
      if (currentNode.getInputFormat() != null) {
       getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());
      }
      if (currentNode.getOutputFormat() != null) {
       getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
      }
      //如果当前节点是起始节点,则直接创建JobVertex,否则返回一个空的StreamConfig
      StreamConfig config = currentNodeId.equals(startNodeId)
        ? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
        : new StreamConfig(new Configuration());
      //将StreamNode中的配置信息序列化到Streamconfig中。
      setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
      //再次判断,如果是Chain的起始节点,执行connect()方法,创建JobEdge和IntermediateDataset
      //否则将当前节点的StreamConfig 添加到chainedConfig中。
      if (currentNodeId.equals(startNodeId)) {
       config.setChainStart();
       config.setChainIndex(0);
       config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
       for (StreamEdge edge : transitiveOutEdges) {
        connect(startNodeId, edge);
       }
       config.setOutEdgesInOrder(transitiveOutEdges);
       config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
      } else {
       chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());
       config.setChainIndex(chainIndex);
       StreamNode node = streamGraph.getStreamNode(currentNodeId);
       config.setOperatorName(node.getOperatorName());
       chainedConfigs.get(startNodeId).put(currentNodeId, config);
      }
      config.setOperatorID(currentOperatorId);
      if (chainableOutputs.isEmpty()) {
       config.setChainEnd();
      }
      return transitiveOutEdges;
     } else {
      return new ArrayList<>();
     }
    }

2.2.3 算子融合

一个 Operatorchain 在同一个 Task 线程内执行。OperatorChain 内的算子之间,在同一个线程内通过方法调用的方式传递数据,能减少线程之间的切换,减少消息的序列化/反序列化,无序借助内存缓存区,也无须通过网络在算子间传递数据,可在减少延迟的同时提高整体吞吐量

operatorchain 的条件:

1)下游节点的入度为 1

2)SteramEdge 的下游节点对应的算子不为 null

3)StreamEdge 的上游节点对应的算子不为 null

4)StreamEdge 的上下游节点拥有相同的 slotSharingGroup,默认都是 default.

5)下游算子的连接策略为 ALWAYS.

6)上游算子的连接策略为 ALWAYS 或者 HEAD.

7)StreamEdge 的分区类型为 ForwardPartitioner

8)上下游节点的并行度一致

9)当前 StreamGraph 允许 chain

2.3 执行图

640.png

2.3.1 ExecutionGraph 核心对象

  • ExecutionJobVertex
    该对象和 JobGraph 中的 JobVertex 一一对应。该对象还包含了一组 ExecutionVertex,数量与该 JobVertex 中所包含的 SteramNode 的并行度一致。
    ExecutionJobVertex 用来将一个 JobVertex 封装成一 ExecutionJobVertex,并以此创建 ExecutionVertex、Execution、IntermediateResult 和 IntermediateResultPartition,用于丰富 ExecutionGraph。
    在 ExecutionJobVertex 的构造函数中,首先是依据对应的 JobVertex 的并发度,生成对应个数的 ExecutionVertex。其中,一个 ExecutionVertex 代表一个 ExecutionJobVertex 的并发子 Task。然后是将原来 JobVertex 的中间结果 IntermediateDataSet 转化为 ExecutionGrap 中 IntermediateResult
  • ExecutionVertex
    ExecutionJobVertex 中会对作业进行并行化处理,构造可以并行执行的实例,每个并行执行的实例就是 ExecutionVertex.
    构建 ExecutionVertex 的同时,也回构建 ExecutionVertex 的输出 IntermediateResult。并且将 ExecutionEdge 输出为 IntermediatePartition。
    ExecutionVertex 的构造函数中,首先会创建 IntermediatePartition,并通过 IntermediateResult.setPartition()建立 IntermediateResult 和 IntermediateResultPartition 之间的关系,然后生成 Execution,并配置资源相关。
  • IntermediateResult
    IntermediateResult 又叫做中间结果集,该对象是个逻辑概念,表示 ExecutionJobVertex 的输出,和 JobGraph 中的 IntermediateDataSet 一一对应,同样,一个 ExecutionJobVertex 可以有多个中间二级果,取决于当前 JobVertex 有几个出边。
    一个中间结果集包含多个中间结果分区 IntermediateResultPartition,其个数等于该 JobVertex 的并发度。
  • IntermediateResultPartition IntermediateResultPartition 又叫做中间结果分区,表示 1 个 ExecutionVertex 输出结果,与 ExecutionEdge 相关联。
  • ExecutionEdge
    表示 ExecutionVertex 的输入,连接到上游产生的 IntermediateResultPartition。一个 Execution 对应于唯一的一个 IntermediateResultPartition 和一个 ExecutionVertex。一个 ExecutionVertex 可以有多个 ExecutionEdge。
  • Execution
    ExecutionVertex 相当于每个 Task 的模板,在真正执行的时候,会将 ExecutionVertex 中的信息包装为一个 Execution,执行一个 ExecutionVertex 的一次尝试。JobManager 和 TaskManager 之间关于 Task 的部署和 Task 执行状态的更新都是通过 ExecutionAttemptID 来标识实例的。在故障或者数据需要重算的情况下,ExecutionVertex 可能会有多个 ExecutionAttemptID.一个 Execution 通过 ExecutionAttemptID 标识。

2.3.2 ExecutionGrap 生成过程

初始话作业调度器的时候,根据 JobGra ph 生活 ExecutionGraph。在 SchedulerBase 的构造方法中触发构建,最终调用 SchedulerBase#createExecutionGraph 触发实际的构建动作,使用 ExecutionGraphBuiler 构建 ExecutionGraph。

640.png

核心代码 attachJobGraph:

640.png

构建 ExecutionEdge 的连接策略:

  • 点对点连接(DistributionPatter n.POINTWISE)
    该策略用来连接当前 ExecutionVertex 与上游的 IntermediataeResultParition。
    连接分三种情况
    1)一对一连接:并发的 Task 数量与分区数相等。
    2)多对一连接:下游的 Task 数量小于上游的分区数,此时分两种情况:
    a:下游 Task 可以分配同数量的结果分区 IntermediataeResultParition。如上游有 4 个结果分区,下游有 2 个 Task,那么每个 Task 会分配两个结果分区进行消费。

      b:每个 Task 消费的上游分区结果数据不均,如上游有 3 个结果分区,下游有两个 Task,那么一个 Task 分配 2 个结果分区消费,另一个分配一个结果分区消费。

     3)一对多连接:下游的 Task 数量多余上游的分区数,此时两种情况:

  • a:每个结果分区的下游消费 Task 数据量相同,如上游有两个结果分区,下游有 4 个 Task,每个结果分区被两个 Task 消费。
  • b:每个结果分区的下游消费 Task 数量不相同,如上游有两个结果分区,下游有 3 个 Task,那么一个结果分区分配 2 个 Task 消费,另一个结果分区分配一个 Task 消费。
  • 全连接(DistributionPattern.ALL_TO_ALL)
    该策略下游的 ExecutionVertex 与上 游的所有 IntermediataeResultParition 建立连接,消费其生产的数据。一般全连接的情况意味着数据在 Shuffle。
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
消息中间件 分布式计算 大数据
大数据-123 - Flink 并行度 相关概念 全局、作业、算子、Slot并行度 Flink并行度设置与测试
大数据-123 - Flink 并行度 相关概念 全局、作业、算子、Slot并行度 Flink并行度设置与测试
206 0
|
3月前
|
消息中间件 分布式计算 大数据
大数据-128 - Flink 并行度设置 细节详解 全局、作业、算子、Slot
大数据-128 - Flink 并行度设置 细节详解 全局、作业、算子、Slot
199 0
|
5月前
|
Oracle 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何从savepoint重新启动作业
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
机器学习/深度学习 人工智能 运维
美团 Flink 大作业部署问题之Flink在生态技术演进上有什么主要方向
美团 Flink 大作业部署问题之Flink在生态技术演进上有什么主要方向
|
5月前
|
监控 Serverless Apache
美团 Flink 大作业部署问题之如何体现Flink在业界的影响力
美团 Flink 大作业部署问题之如何体现Flink在业界的影响力
|
5月前
|
监控 Serverless 数据库
美团 Flink 大作业部署问题之端云联调并将流量恢复到云端实例如何结束
美团 Flink 大作业部署问题之端云联调并将流量恢复到云端实例如何结束
|
5月前
|
监控 Java Serverless
美团 Flink 大作业部署问题之想在Serverless平台上实时查看Spring Boot应用的日志要怎么操作
美团 Flink 大作业部署问题之想在Serverless平台上实时查看Spring Boot应用的日志要怎么操作
|
5月前
|
Java 流计算
美团 Flink 大作业部署问题之files-to-delete 的执行为什么能够异步进行呢
美团 Flink 大作业部署问题之files-to-delete 的执行为什么能够异步进行呢
|
5月前
|
缓存 流计算
美团 Flink 大作业部署问题之根据已存在的 Checkpoint 副本进行增量的副本制作如何实现
美团 Flink 大作业部署问题之根据已存在的 Checkpoint 副本进行增量的副本制作如何实现
|
5月前
|
分布式计算 流计算
美团 Flink 大作业部署问题之Checkpoint Replicate Service 跨 HDFS 集群的副本制作是如何实现的
美团 Flink 大作业部署问题之Checkpoint Replicate Service 跨 HDFS 集群的副本制作是如何实现的

热门文章

最新文章