Flink 原理与实现:Operator Chain原理

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介:

Flink原理与实现系列文章 :

Flink 原理与实现:架构和拓扑概览
Flink 原理与实现:如何生成 StreamGraph
Flink 原理与实现:如何生成 JobGraph
Flink原理与实现:如何生成ExecutionGraph及物理执行图

Flink的逻辑/执行计划优化,有一个很大的特点就是,会将多个operator,串在一起作为一个operator chain来执行。关于operator chain,在 Flink 原理与实现:理解 Flink 中的计算资源 中已经有了初步的介绍,在阅读本文之前,建议先阅读上文。
本文将从源码上进一步分析,探究operator chain内部是如何实现的。

OperatorChain是在StreamTask的invoke方法中被创建的:

     // ...
        operatorChain = new OperatorChain<>(this);
        headOperator = operatorChain.getHeadOperator();
        // ...

Flink原理与实现:如何生成ExecutionGraph及物理执行图中提到,StreamTask是真正的执行task中的invokable operator(的基类),因此所有的task都会创建OperatorChain这个对象。只是在执行的时候,如果一个operator无法被chain起来,那它就只有headOperator,chain里就没有其他operator了。

OperatorChain构造函数:

            List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());
            this.chainEntryPoint = createOutputCollector(containingTask, configuration,
                    chainedConfigs, userCodeClassloader, streamOutputMap, allOps);

            if (headOperator != null) {
                headOperator.setup(containingTask, configuration, getChainEntryPoint());
            }

            // add head operator to end of chain
            allOps.add(headOperator);

这里headerOperator.setup方法第三个参数为Output,相当于把chainEntryPoint作为output传入head operator。setup方法一路调用,直到基类AbstractStreamOperator,可以看到:

    this.output = new CountingOutput(output,((OperatorMetricGroup)this.metrics).getIOMetricGroup().getNumRecordsOutCounter());

即对output封装成了AbstractStreamOperator.CountingOutput,主要是为了统计metrics信息。

而output自身在operator chain中,是一个CopyingChainingOutput,或者ChainingOutput(根据是否配置了reuse objects)。

这里的headOperator即为operator chain中第一个operator,在这里即为StreamGroupedReduce。
它在执行processElement的时候,如果有调用output.collect,则会调用CountingOutput。它的collect方法很简单:

        @Override
        public void collect(StreamRecord<OUT> record) {
            numRecordsOut.inc();
            output.collect(record);
        }

即更新metrics和调用ChainingOutput.collect方法,看看这个方法:

        @Override
        public void collect(StreamRecord<T> record) {
            try {
                numRecordsIn.inc();
                StreamRecord<T> copy = record.copy(serializer.copy(record.getValue()));
                operator.setKeyContextElement1(copy);
                operator.processElement(copy);
            }
            catch (Exception e) {
                throw new RuntimeException("Could not forward element to next operator", e);
            }
        }

这里的operator是chainedOperator,即除了headOperator之外,剩余的operators的chain。
调用这个operator.processElement,就会像上面一样,循环调用operator chain里的所有operator,一直到chain end。

以word count为例,应用代码如下:

    // ...
    DataStream<String> text = env.fromElements(WordCountData.WORDS);
    DataStream<Tuple2<String, Integer>> counts =
            // split up the lines in pairs (2-tuples) containing: (word,1)
            text.flatMap(new Tokenizer())
                    // group by the tuple field "0" and sum up tuple field "1"
                    .keyBy(0).sum(1).filter(new FilterFunction<Tuple2<String, Integer>>() {
                @Override
                public boolean filter(Tuple2<String, Integer> value) throws Exception {
                    return value.f1 > 1;
                }
            });
      env.execute("Streaming WordCount");

它实际上形成了以下的调用链:

StreamGroupedReduce.processElement
--> CountingOutput.collect
--> CopyChainingOutput.collect
    --> StreamFilter.processElement
    --> CountingOutput.collect
    --> CopyChainingOutput.collect
        --> StreamSink.processElement
        --> CountingOutput.collect
        --> BroadcastingOutputCollector.collect

下面会解析如何生成这个调用链。我们返回到OperatorChain的构造函数中,看一下这行代码:

            this.chainEntryPoint = createOutputCollector(containingTask, configuration,
                    chainedConfigs, userCodeClassloader, streamOutputMap, allOps);

到底做了什么。

这个方法的重要代码如下:

        // 遍历当前operatorConfig的输出边
        for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
           // 下游operator id
            int outputId = outputEdge.getTargetId();
            // 得到下游operator的stream config
            StreamConfig chainedOpConfig = chainedConfigs.get(outputId);
        
        // 根据下游operator的stream config,创建chained operator
            Output<StreamRecord<T>> output = createChainedOperator(
                    containingTask,
                    chainedOpConfig,
                    chainedConfigs,
                    userCodeClassloader,
                    streamOutputs,
                    allOperators);
            allOutputs.add(new Tuple2<>(output, outputEdge));
        }

再看下createChainedOperator方法:

     // 第一行就递归调用了createOutputCollector方法,创建当前operator下游operator的collector
        Output<StreamRecord<OUT>> output = createOutputCollector(
                containingTask, operatorConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators);

     // setup当前operator,其实是把下游operator的collector作为当前operator的output
     // 这样当前operator调用collect的时候,就会调用下游operator的方法。
        OneInputStreamOperator<IN, OUT> chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);
        chainedOperator.setup(containingTask, operatorConfig, output);

        allOperators.add(chainedOperator);

      // 根据是否reuse object,创建ChainingOutput或者CopyingChainingOutput
        if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
            return new ChainingOutput<>(chainedOperator);
        }
        else {
            TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
            return new CopyingChainingOutput<>(chainedOperator, inSerializer);
        }

由于这个过程是递归的,所以chained operators实际上是从下游往上游去反向一个个创建和setup的。以word count为例,chained operators为:StreamGroupedReduce - StreamFilter - StreamSink,而实际初始化顺序则相反:StreamSink - StreamFilter - StreamGroupedReduce。

在OperatorChain类中,headOperator为StreamGroupedReduce。createOutputCollector的调用过程如下:

createOutputCollector(operatorConfig=<StreamGroupedReduce config>, ...)
 --> chainedOpConfig = <StreamFilter config>
 --> createChainedOperator(chainedOpConfig=<StreamFilter config>)
    --> createOutputCollector(<StreamFilter config>)
    --> chainedOpConfig = <StreamSink config>
        --> createChainedOperator(<StreamSink config>)
            --> createOutputCollector(<StreamSink config>)
            --> chainedOpConfig = null, 返回BroadcastingOutputCollector
            --> StreamSink.setup(<output=BroadcastingOutputCollector>)
            --> return CopyingChainingOutput
    --> output = CopyingChainingOutput
    --> StreamFilter.setup(<output=CopyingChainingOutput>)
    --> return CopyingChainingOutput
--> output = CopyingChainingOutput
--> headOperator.setup(<output=CopyingChainingOutput>)            

最后我们来看一下,如果operator chain中只有一个operator的情况,它生成了什么。
在word count的例子中,在StreamSource之后的flatMap,就是这种情况,它不能跟后面的操作chain在一起。

首先OperatorChain构造函数中的chainedConfigs会为空,因为下游没有跟它chain在一起的operator。接下来看下它的chainEntryPoint

createOutputCollector方法中,由于没有chained outputs,因此会直接返回RecordWriterOutput,即headOperator的output就直接交给record writer输出了。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3天前
|
Java 流计算
【极数系列】Flink搭建入门项目Demo & 秒懂Flink开发运行原理(05)
【极数系列】Flink搭建入门项目Demo & 秒懂Flink开发运行原理(05)
|
3天前
|
消息中间件 缓存 监控
Flink背压原理以及解决优化
Flink背压原理以及解决优化
158 0
|
1天前
|
SQL Prometheus Kubernetes
实时计算 Flink版产品使用合集之时间戳读取的原理是什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
9 0
|
3天前
|
存储 NoSQL 分布式数据库
【Flink】Flink分布式快照的原理是什么?
【4月更文挑战第21天】【Flink】Flink分布式快照的原理是什么?
|
3天前
|
运维 监控 Java
面经:Storm实时计算框架原理与应用场景
【4月更文挑战第11天】本文是关于Apache Storm实时流处理框架的面试攻略和核心原理解析。文章分享了面试常见主题,包括Storm的架构与核心概念(如Spout、Bolt、Topology、Tuple和Ack机制),编程模型与API,部署与运维,以及应用场景与最佳实践。通过代码示例展示了如何构建一个简单的WordCountTopology,强调理解和运用Storm的关键知识点对于面试和实际工作的重要性。
40 4
面经:Storm实时计算框架原理与应用场景
|
3天前
|
机器学习/深度学习 分布式计算 BI
Flink实时流处理框架原理与应用:面试经验与必备知识点解析
【4月更文挑战第9天】本文详尽探讨了Flink实时流处理框架的原理,包括运行时架构、数据流模型、状态管理和容错机制、资源调度与优化以及与外部系统的集成。此外,还介绍了Flink在实时数据管道、分析、数仓与BI、机器学习等领域的应用实践。同时,文章提供了面试经验与常见问题解析,如Flink与其他系统的对比、实际项目挑战及解决方案,并展望了Flink的未来发展趋势。附带Java DataStream API代码样例,为学习和面试准备提供了实用素材。
92 0
|
3天前
|
SQL 缓存 分布式计算
flink1.18 SqlGateway 的使用和原理分析
# 了解flink1.18 sqlGateway 的安装和使用步骤 # 启动sqlgateway 流程,了解核心的结构 # sql提交流程,了解sql 的流转逻辑 # select 查询的ResultSet的对接流程,了解数据的返回和获取逻辑
|
3天前
|
SQL 并行计算 大数据
【大数据技术攻关专题】「Apache-Flink零基础入门」手把手+零基础带你玩转大数据流式处理引擎Flink(基础加强+运行原理)
关于Flink服务的搭建与部署,由于其涉及诸多实战操作而理论部分相对较少,小编打算采用一个独立的版本和环境来进行详尽的实战讲解。考虑到文字描述可能无法充分展现操作的细节和流程,我们决定以视频的形式进行分析和介绍。因此,在本文中,我们将暂时不涉及具体的搭建和部署步骤。
500 3
【大数据技术攻关专题】「Apache-Flink零基础入门」手把手+零基础带你玩转大数据流式处理引擎Flink(基础加强+运行原理)
|
3天前
|
存储 NoSQL MongoDB
阿里云 Flink 原理分析与应用:深入探索 MongoDB Schema Inference
本文整理自阿里云 Flink 团队归源老师关于阿里云 Flink 原理分析与应用:深入探索 MongoDB Schema Inference 的研究。
46954 2
阿里云 Flink 原理分析与应用:深入探索 MongoDB Schema Inference
|
3天前
|
Java 数据处理 分布式数据库
Flink中的Exactly-Once语义是什么?请解释其作用和实现原理。
Flink中的Exactly-Once语义是什么?请解释其作用和实现原理。
38 0