开发者社区> codyinnowhere> 正文

Flink原理与实现:如何生成ExecutionGraph及物理执行图

简介:
+关注继续查看

阅读本文之前,请先阅读Flink原理与实现系列前面的几篇文章 :

Flink 原理与实现:架构和拓扑概览
Flink 原理与实现:如何生成 StreamGraph
Flink 原理与实现:如何生成 JobGraph

ExecutionGraph生成过程

StreamGraph和JobGraph都是在client生成的,这篇文章将描述如何生成ExecutionGraph以及物理执行图。同时会讲解一个作业提交后如何被调度和执行。

client生成JobGraph之后,就通过submitJob提交至JobMaster。
在其构造函数中,会生成ExecutionGraph:

    this.executionGraph = ExecutionGraphBuilder.buildGraph(...)

看下这个方法,比较长,略过了一些次要的代码片断:


     // 流式作业中,schedule mode固定是EAGER的
        executionGraph.setScheduleMode(jobGraph.getScheduleMode());
        executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling());

     // 设置json plan
     // ...

     // 检查executableClass(即operator类),设置最大并发
     // ...
     
        // 按拓扑顺序,获取所有的JobVertex列表
        List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
        
        // 根据JobVertex列表,生成execution graph
        executionGraph.attachJobGraph(sortedTopology);
        
        // checkpoint检查

可以看到,生成execution graph的代码,主要是在最后一行,即ExecutionGraph.attachJobGraph方法:

    public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException, IOException {
       // 遍历job vertex
        for (JobVertex jobVertex : topologiallySorted) {
            // 根据每一个job vertex,创建对应的ExecutionVertex
            ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, 1, rpcCallTimeout, createTimestamp);
            // 将创建的ExecutionJobVertex与前置的IntermediateResult连接起来
            ejv.connectToPredecessors(this.intermediateResults);

            ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);

        // sanity check
        // ...
        
            this.verticesInCreationOrder.add(ejv);
        }
    }

可以看到,创建ExecutionJobVertex的重点就在它的构造函数中:

     // 上面是并行度相关的设置
     
     // 序列化后的TaskInformation,这个信息很重要
     // 后面deploy的时候会将TaskInformation分发到具体的Task中。
        this.serializedTaskInformation = new SerializedValue<>(new TaskInformation(
            jobVertex.getID(),
            jobVertex.getName(),
            parallelism,
            maxParallelism,
            // 这个就是Task将要执行的Operator的类名
            jobVertex.getInvokableClassName(),
            jobVertex.getConfiguration()));
     
     // ExecutionVertex列表,按照JobVertex并行度设置      
        this.taskVertices = new ExecutionVertex[numTaskVertices];
        
        this.inputs = new ArrayList<>(jobVertex.getInputs().size());
        
        // slot sharing和coLocation相关代码
        // ...
        
        // 创建intermediate results,这是由当前operator的出度确定的,如果当前operator只向下游一个operator输出,则为1
        // 注意一个IntermediateResult包含多个IntermediateResultPartition
        this.producedDataSets = new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];

        for (int i = 0; i < jobVertex.getProducedDataSets().size(); i++) {
            final IntermediateDataSet result = jobVertex.getProducedDataSets().get(i);

            this.producedDataSets[i] = new IntermediateResult(
                    result.getId(),
                    this,
                    numTaskVertices,
                    result.getResultType());
        }

        // 根据job vertex的并行度,创建对应的ExecutionVertex列表。
        // 即,一个JobVertex/ExecutionJobVertex代表的是一个operator,而
        // 具体的ExecutionVertex则代表了每一个Task
        for (int i = 0; i < numTaskVertices; i++) {
            ExecutionVertex vertex = new ExecutionVertex(
                    this, i, this.producedDataSets, timeout, createTimestamp, maxPriorAttemptsHistoryLength);

            this.taskVertices[i] = vertex;
        }
        
        // sanity check
        // ...
        
        // set up the input splits, if the vertex has any
        // 这是batch相关的代码
        // ...
             
        finishedSubtasks = new boolean[parallelism];

ExecutionJobVertex和ExecutionVertex是创建完了,但是ExecutionEdge还没有创建呢,接下来看一下attachJobGraph方法中这一行代码:

    ejv.connectToPredecessors(this.intermediateResults);

这个方法代码如下:

     // 获取输入的JobEdge列表
        List<JobEdge> inputs = jobVertex.getInputs();
                
        // 遍历每条JobEdge        
        for (int num = 0; num < inputs.size(); num++) {
            JobEdge edge = inputs.get(num);
            
            // 获取当前JobEdge的输入所对应的IntermediateResult
            IntermediateResult ires = intermediateDataSets.get(edge.getSourceId());
            if (ires == null) {
                throw new JobException("Cannot connect this job graph to the previous graph. No previous intermediate result found for ID "
                        + edge.getSourceId());
            }
            
            // 将IntermediateResult加入到当前ExecutionJobVertex的输入中。
            this.inputs.add(ires);
            
            // 为IntermediateResult注册consumer
            // consumerIndex跟IntermediateResult的出度相关
            int consumerIndex = ires.registerConsumer();
            
            for (int i = 0; i < parallelism; i++) {
                ExecutionVertex ev = taskVertices[i];
                // 将ExecutionVertex与IntermediateResult关联起来
                ev.connectSource(num, ires, edge, consumerIndex);
            }
        }

看下ExecutionVertex.connectSource方法代码:

    public void connectSource(int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) {

     // 只有forward的方式的情况下,pattern才是POINTWISE的,否则均为ALL_TO_ALL
        final DistributionPattern pattern = edge.getDistributionPattern();
        final IntermediateResultPartition[] sourcePartitions = source.getPartitions();

        ExecutionEdge[] edges;

        switch (pattern) {
            case POINTWISE:
                edges = connectPointwise(sourcePartitions, inputNumber);
                break;

            case ALL_TO_ALL:
                edges = connectAllToAll(sourcePartitions, inputNumber);
                break;

            default:
                throw new RuntimeException("Unrecognized distribution pattern.");

        }

        this.inputEdges[inputNumber] = edges;

        // 之前已经为IntermediateResult添加了consumer,这里为IntermediateResultPartition添加consumer,即关联到ExecutionEdge上
        for (ExecutionEdge ee : edges) {
            ee.getSource().addConsumer(ee, consumerNumber);
        }
    }

connectAllToAll方法:

        ExecutionEdge[] edges = new ExecutionEdge[sourcePartitions.length];

        for (int i = 0; i < sourcePartitions.length; i++) {
            IntermediateResultPartition irp = sourcePartitions[i];
            edges[i] = new ExecutionEdge(irp, this, inputNumber);
        }

        return edges;

看这个方法之前,需要知道,ExecutionVertex的inputEdges变量,是一个二维数据。它表示了这个ExecutionVertex上每一个input所包含的ExecutionEdge列表。

即,如果ExecutionVertex有两个不同的输入:输入A和B。其中输入A的partition=1, 输入B的partition=8,那么这个二维数组inputEdges如下(为简短,以irp代替IntermediateResultPartition)

[ ExecutionEdge[ A.irp[0]] ]
[ ExecutionEdge[ B.irp[0], B.irp[1], ..., B.irp[7] ]

所以上面的代码就很容易理解了。

到这里为止,ExecutionJobGraph就创建完成了。接下来看下这个ExecutionGraph是如何转化成Task并开始执行的。


Task调度和执行

接下来我们以最简单的mini cluster为例讲解一下Task如何被调度和执行。

简单略过client端job的提交和StreamGraph到JobGraph的翻译,以及上面ExecutionGraph的翻译。

提交后的job的流通过程大致如下:

env.execute('<job name>')
  --> MiniCluster.runJobBlocking(jobGraph)
  --> MiniClusterDispatcher.runJobBlocking(jobGraph)
  --> MiniClusterDispatcher.startJobRunners
    --> JobManagerRunner.start
    --> JobMaster.<init> (build ExecutionGraph)

创建完JobMaster之后,JobMaster就会进行leader election,得到leader之后,会回调grantLeadership方法,从而调用jobManager.start(leaderSessionID);开始运行job。

JobMaster.start 
    --> JobMaster.startJobExecution(这里还没开始执行呢..)
    --> resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());    

重点是在下面这行,获取到resource manage之后,就会回调ResourceManagerLeaderListener.notifyLeaderAddress,整个调用流如下:

ResourceManagerLeaderListener.notifyLeaderAddress
    --> JobMaster.notifyOfNewResourceManagerLeader
    --> ResourceManagerConnection.start
    --> ResourceManagerConnection.onRegistrationSuccess(callback,由flink rpc框架发送并回调)
    --> JobMaster.onResourceManagerRegistrationSuccess

然后终于来到了最核心的调度代码,在JobMaster.onResourceManagerRegistrationSuccess方法中:

    executionContext.execute(new Runnable() {
        @Override
        public void run() {
            try {
                executionGraph.restoreExternalCheckpointedStore();
                executionGraph.setQueuedSchedulingAllowed(true);
                executionGraph.scheduleForExecution(slotPool.getSlotProvider());
            }
            catch (Throwable t) {
                executionGraph.fail(t);
            }
        }
    });

ExecutionGraph.scheduleForExecution --> ExecutionGraph.scheduleEager

这个方法会计算所有的ExecutionVertex总数,并为每个ExecutionVertex分配一个SimpleSlot(暂时不考虑slot sharing的情况),然后封装成ExecutionAndSlot,顾名思义,即ExecutionVertex + Slot(更为贴切地说,应该是ExecutionAttempt + Slot)。

然后调用execAndSlot.executionAttempt.deployToSlot(slot);进行deploy,即Execution.deployToSlot

这个方法先会进行一系列状态迁移和检查,然后进行deploy,比较核心的代码如下:

        final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
            attemptId,
            slot,
            taskState,
            attemptNumber);

        // register this execution at the execution graph, to receive call backs
        vertex.getExecutionGraph().registerExecution(this);
            
        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();        final Future<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, timeout);

ExecutionVertex.createDeploymentDescriptor方法中,包含了从Execution Graph到真正物理执行图的转换。如将IntermediateResultPartition转化成ResultPartition,ExecutionEdge转成InputChannelDeploymentDescriptor(最终会在执行时转化成InputGate)。

最后通过RPC方法提交task,实际会调用到TaskExecutor.submitTask方法中。
这个方法会创建真正的Task,然后调用task.startTaskThread();开始task的执行。

在Task构造函数中,会根据输入的参数,创建InputGate, ResultPartition, ResultPartitionWriter等。

startTaskThread方法,则会执行executingThread.start,从而调用Task.run方法。
它的最核心的代码如下:

     // ...
     
        // now load the task's invokable code
        invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass);

      // ...
      invokable.setEnvironment(env);
      
      // ...
      this.invokable = invokable;
      invokable.invoke();
      
      // task finishes or fails, do cleanup
      // ...

这里的invokable即为operator对象实例,通过反射创建。具体地,即为OneInputStreamTask,或者SourceStreamTask等。这个nameOfInvokableClass是哪里生成的呢?其实早在生成StreamGraph的时候,这就已经确定了,见StreamGraph.addOperator方法:

        if (operatorObject instanceof StoppableStreamSource) {
            addNode(vertexID, slotSharingGroup, StoppableSourceStreamTask.class, operatorObject, operatorName);
        } else if (operatorObject instanceof StreamSource) {
            addNode(vertexID, slotSharingGroup, SourceStreamTask.class, operatorObject, operatorName);
        } else {
            addNode(vertexID, slotSharingGroup, OneInputStreamTask.class, operatorObject, operatorName);
        }

这里的OneInputStreamTask.class即为生成的StreamNode的vertexClass。这个值会一直传递,当StreamGraph被转化成JobGraph的时候,这个值会被传递到JobVertex的invokableClass。然后当JobGraph被转成ExecutionGraph的时候,这个值被传入到ExecutionJobVertex.TaskInformation.invokableClassName中,一直传到Task中。

那么用户真正写的逻辑代码在哪里呢?比如word count中的Tokenizer,去了哪里呢?
OneInputStreamTask的基类StreamTask,包含了headOperator和operatorChain。当我们调用dataStream.flatMap(new Tokenizer())的时候,会生成一个StreamFlatMap的operator,这个operator是一个AbstractUdfStreamOperator,而用户的代码new Tokenizer,即为它的userFunction。

所以再串回来,以OneInputStreamTask为例,Task的核心执行代码即为OneInputStreamTask.invoke方法,它会调用StreamTask.run方法,这是个抽象方法,最终会调用其派生类的run方法,即OneInputStreamTask, SourceStreamTask等。

OneInputStreamTask的run方法代码如下:

    final OneInputStreamOperator<IN, OUT> operator = this.headOperator;
    final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
    final Object lock = getCheckpointLock();
        
    while (running && inputProcessor.processInput(operator, lock)) {
        // all the work happens in the "processInput" method
    }

就是一直不停地循环调用inputProcessor.processInput(operator, lock)方法,即StreamInputProcessor.processInput方法:

    public boolean processInput(OneInputStreamOperator<IN, ?> streamOperator, final Object lock) throws Exception {
     // ...
     
        while (true) {
            if (currentRecordDeserializer != null) {
           // ...
           
                if (result.isFullRecord()) {
                    StreamElement recordOrMark = deserializationDelegate.getInstance();
                    
              // 处理watermark,则框架处理
                    if (recordOrMark.isWatermark()) {
                       // watermark处理逻辑
                       // ...
                        continue;
                    } else if(recordOrMark.isLatencyMarker()) {
                        // 处理latency mark,也是由框架处理
                        synchronized (lock) {
                            streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
                        }
                        continue;
                    } else {
                        // ***** 这里是真正的用户逻辑代码 *****
                        StreamRecord<IN> record = recordOrMark.asRecord();
                        synchronized (lock) {
                            numRecordsIn.inc();
                            streamOperator.setKeyContextElement1(record);
                            streamOperator.processElement(record);
                        }
                        return true;
                    }
                }
            }

        // 其他处理逻辑
        // ...
        }
    }

上面的代码中,streamOperator.processElement(record);才是真正处理用户逻辑的代码,以StreamFlatMap为例,即为它的processElement方法:

    public void processElement(StreamRecord<IN> element) throws Exception {
        collector.setTimestamp(element);
        userFunction.flatMap(element.getValue(), collector);
    }

这样,整个调度和执行逻辑就全部串起来啦。

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
【POI word】使用POI实现对Word的读取以及生成
项目结构如下:   那第一部分:先是读取Word文档 1 package com.it.WordTest; 2 3 import java.io.FileInputStream; 4 import java.
2111 0
Flink运行时之TaskManager执行Task
TaskManager执行任务 当一个任务被JobManager部署到TaskManager之后,它将会被执行。本篇我们将分析任务的执行细节。 submitTask方法分析 一个任务实例被部署所产生的实际影响就是JobManager会将一个TaskDeploymentDescriptor对象封装在SubmitTask消息中发送给TaskManager。
1309 0
C语言下泊松分布以及指数分布随机数生成器实现
最近实验室的项目需要实现模拟文件访问序列,要求单位时间内的数据请求次数符合泊松分布,而两次请求见的时间间隔符合指数分布。没办法只好重新捡起已经丢掉多时的概率知识。于是也就有了这篇关于在C语言下符合泊松分布和指数分布的随机数生成器的实现。 泊松分布 在实际的事例中,当某一事件,比如进站乘客数量,电话交换机接收到的通话请求以固定的瞬时速率λ独立且随机地出现时,就可以认为该事件在单位时间内发
2700 0
用字符串连接SQL语句并用EXEC执行时,出现名称 '‘不是有效的标识符
原文:用字符串连接SQL语句并用EXEC执行时,出现名称 '‘不是有效的标识符  用字符串连接SQL语句并用EXEC执行时,出现名称 '这里是字符串连接的一条SQL语句‘不是有效的标识符  才发现,在写exec @sql 时,忘了在@sql加(),这样写 exec (@sql) 就不会出错了!
646 0
Python:Flask使用ThreadPoolExecutor执行异步任务
Python:Flask使用ThreadPoolExecutor执行异步任务
324 0
Flink运行时之流处理程序生成流图
流处理程序生成流图 DataStream API所编写的流处理应用程序在生成作业图(JobGraph)并提交给JobManager之前,会预先生成流图(StreamGraph)。 什么是流图 流图(StreamGraph)是表示流处理程序拓扑的数据结构,它封装了生成作业图(JobGraph)的必要信息。
1625 0
生成lua的静态库.动态库.lua.exe和luac.exe
前些日子准备学习下关于lua coroutine更为强大的功能,然而发现根据lua 5.1.4版本来运行一段代码的话也会导致 “lua: attempt to yield across metamethod/C-call boundary”的错误(据悉主线程中调用yield也会如此)。
1063 0
ArrayList 和 LinkedList的执行效率比较
一、概念:     一般我们都知道ArrayList* 由一个数组后推得到的 List。作为一个常规用途的对象容器使用,用于替换原先的 Vector。允许我们快速访问元素,但在从列表中部插入和删除元素时,速度却嫌稍慢。
808 0
4
文章
10
问答
文章排行榜
最热
最新
相关电子书
更多
JS零基础入门教程(上册)
立即下载
性能优化方法论
立即下载
手把手学习日志服务SLS,云启实验室实战指南
立即下载