Flink原理与实现系列文章 :
Flink 原理与实现:架构和拓扑概览
Flink 原理与实现:如何生成 StreamGraph
Flink 原理与实现:如何生成 JobGraph
Flink原理与实现:如何生成ExecutionGraph及物理执行图
Flink的逻辑/执行计划优化,有一个很大的特点就是,会将多个operator,串在一起作为一个operator chain来执行。关于operator chain,在 Flink 原理与实现:理解 Flink 中的计算资源 中已经有了初步的介绍,在阅读本文之前,建议先阅读上文。
本文将从源码上进一步分析,探究operator chain内部是如何实现的。
OperatorChain是在StreamTask的invoke
方法中被创建的:
// ...
operatorChain = new OperatorChain<>(this);
headOperator = operatorChain.getHeadOperator();
// ...
在Flink原理与实现:如何生成ExecutionGraph及物理执行图中提到,StreamTask是真正的执行task中的invokable operator(的基类),因此所有的task都会创建OperatorChain这个对象。只是在执行的时候,如果一个operator无法被chain起来,那它就只有headOperator,chain里就没有其他operator了。
OperatorChain构造函数:
List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());
this.chainEntryPoint = createOutputCollector(containingTask, configuration,
chainedConfigs, userCodeClassloader, streamOutputMap, allOps);
if (headOperator != null) {
headOperator.setup(containingTask, configuration, getChainEntryPoint());
}
// add head operator to end of chain
allOps.add(headOperator);
这里headerOperator.setup
方法第三个参数为Output,相当于把chainEntryPoint作为output传入head operator。setup方法一路调用,直到基类AbstractStreamOperator,可以看到:
this.output = new CountingOutput(output,((OperatorMetricGroup)this.metrics).getIOMetricGroup().getNumRecordsOutCounter());
即对output封装成了AbstractStreamOperator.CountingOutput,主要是为了统计metrics信息。
而output自身在operator chain中,是一个CopyingChainingOutput,或者ChainingOutput(根据是否配置了reuse objects)。
这里的headOperator即为operator chain中第一个operator,在这里即为StreamGroupedReduce。
它在执行processElement的时候,如果有调用output.collect,则会调用CountingOutput。它的collect方法很简单:
@Override
public void collect(StreamRecord<OUT> record) {
numRecordsOut.inc();
output.collect(record);
}
即更新metrics和调用ChainingOutput.collect方法,看看这个方法:
@Override
public void collect(StreamRecord<T> record) {
try {
numRecordsIn.inc();
StreamRecord<T> copy = record.copy(serializer.copy(record.getValue()));
operator.setKeyContextElement1(copy);
operator.processElement(copy);
}
catch (Exception e) {
throw new RuntimeException("Could not forward element to next operator", e);
}
}
这里的operator是chainedOperator,即除了headOperator之外,剩余的operators的chain。
调用这个operator.processElement,就会像上面一样,循环调用operator chain里的所有operator,一直到chain end。
以word count为例,应用代码如下:
// ...
DataStream<String> text = env.fromElements(WordCountData.WORDS);
DataStream<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
.keyBy(0).sum(1).filter(new FilterFunction<Tuple2<String, Integer>>() {
@Override
public boolean filter(Tuple2<String, Integer> value) throws Exception {
return value.f1 > 1;
}
});
env.execute("Streaming WordCount");
它实际上形成了以下的调用链:
StreamGroupedReduce.processElement
--> CountingOutput.collect
--> CopyChainingOutput.collect
--> StreamFilter.processElement
--> CountingOutput.collect
--> CopyChainingOutput.collect
--> StreamSink.processElement
--> CountingOutput.collect
--> BroadcastingOutputCollector.collect
下面会解析如何生成这个调用链。我们返回到OperatorChain的构造函数中,看一下这行代码:
this.chainEntryPoint = createOutputCollector(containingTask, configuration,
chainedConfigs, userCodeClassloader, streamOutputMap, allOps);
到底做了什么。
这个方法的重要代码如下:
// 遍历当前operatorConfig的输出边
for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
// 下游operator id
int outputId = outputEdge.getTargetId();
// 得到下游operator的stream config
StreamConfig chainedOpConfig = chainedConfigs.get(outputId);
// 根据下游operator的stream config,创建chained operator
Output<StreamRecord<T>> output = createChainedOperator(
containingTask,
chainedOpConfig,
chainedConfigs,
userCodeClassloader,
streamOutputs,
allOperators);
allOutputs.add(new Tuple2<>(output, outputEdge));
}
再看下createChainedOperator
方法:
// 第一行就递归调用了createOutputCollector方法,创建当前operator下游operator的collector
Output<StreamRecord<OUT>> output = createOutputCollector(
containingTask, operatorConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators);
// setup当前operator,其实是把下游operator的collector作为当前operator的output
// 这样当前operator调用collect的时候,就会调用下游operator的方法。
OneInputStreamOperator<IN, OUT> chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);
chainedOperator.setup(containingTask, operatorConfig, output);
allOperators.add(chainedOperator);
// 根据是否reuse object,创建ChainingOutput或者CopyingChainingOutput
if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
return new ChainingOutput<>(chainedOperator);
}
else {
TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
return new CopyingChainingOutput<>(chainedOperator, inSerializer);
}
由于这个过程是递归的,所以chained operators实际上是从下游往上游去反向一个个创建和setup的。以word count为例,chained operators为:StreamGroupedReduce - StreamFilter - StreamSink,而实际初始化顺序则相反:StreamSink - StreamFilter - StreamGroupedReduce。
在OperatorChain类中,headOperator为StreamGroupedReduce。createOutputCollector的调用过程如下:
createOutputCollector(operatorConfig=<StreamGroupedReduce config>, ...)
--> chainedOpConfig = <StreamFilter config>
--> createChainedOperator(chainedOpConfig=<StreamFilter config>)
--> createOutputCollector(<StreamFilter config>)
--> chainedOpConfig = <StreamSink config>
--> createChainedOperator(<StreamSink config>)
--> createOutputCollector(<StreamSink config>)
--> chainedOpConfig = null, 返回BroadcastingOutputCollector
--> StreamSink.setup(<output=BroadcastingOutputCollector>)
--> return CopyingChainingOutput
--> output = CopyingChainingOutput
--> StreamFilter.setup(<output=CopyingChainingOutput>)
--> return CopyingChainingOutput
--> output = CopyingChainingOutput
--> headOperator.setup(<output=CopyingChainingOutput>)
最后我们来看一下,如果operator chain中只有一个operator的情况,它生成了什么。
在word count的例子中,在StreamSource之后的flatMap
,就是这种情况,它不能跟后面的操作chain在一起。
首先OperatorChain构造函数中的chainedConfigs
会为空,因为下游没有跟它chain在一起的operator。接下来看下它的chainEntryPoint
:
在createOutputCollector
方法中,由于没有chained outputs,因此会直接返回RecordWriterOutput,即headOperator的output就直接交给record writer输出了。