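4.4 The reduce transformation
The argument of the reduce transformation is a concrete implementation of ReduceFunction. Here, its logic takes the WordWithCount instances it receives and adds up the count values of instances whose word field is the same.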
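The accumulation this ReduceFunction performs can be illustrated without Flink. The sketch below uses only the JDK and makes a few substitutions:

- java.util.function.BinaryOperator stands in for ReduceFunction.
- A map keyed by word stands in for the per-key grouping that keyBy provides.
- Windows are left out.

The class WordCountReduceSketch and its reduceByWord helper are hypothetical. WordWithCount here is a simplified version of the example's POJO.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class WordCountReduceSketch {

    // Simplified POJO: a word and how often it was seen.
    public static final class WordWithCount {
        public final String word;
        public final long count;

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }

    // Stand-in for a ReduceFunction<WordWithCount>: combines two records with the same word.
    static final BinaryOperator<WordWithCount> SUM_COUNTS =
            (a, b) -> new WordWithCount(a.word, a.count + b.count);

    // Hypothetical helper: groups by word (the role keyBy plays) and folds each group with SUM_COUNTS.
    public static Map<String, WordWithCount> reduceByWord(List<WordWithCount> input) {
        Map<String, WordWithCount> acc = new LinkedHashMap<>();
        for (WordWithCount value : input) {
            // merge() applies SUM_COUNTS only when this word already has a value.
            acc.merge(value.word, value, SUM_COUNTS);
        }
        return acc;
    }

    public static void main(String[] args) {
        List<WordWithCount> batch = new ArrayList<>();
        for (String w : "to be or not to be".split(" ")) {
            batch.add(new WordWithCount(w, 1L));
        }
        reduceByWord(batch).values().forEach(System.out::println);
    }
}
```

Running main prints `to : 2`, `be : 2`, `or : 1` and `not : 1` on separate lines. They appear in first-seen order because a LinkedHashMap is used.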
```java
public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function) {
    if (function instanceof RichFunction) {
        throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction. " +
            "Please use reduce(ReduceFunction, WindowFunction) instead.");
    }

    // clean the closure
    function = input.getExecutionEnvironment().clean(function);
    return reduce(function, new PassThroughWindowFunction<K, W, T>());
}

public <R> SingleOutputStreamOperator<R> reduce(
        ReduceFunction<T> reduceFunction,
        WindowFunction<T, R, K, W> function) {

    TypeInformation<T> inType = input.getType();
    TypeInformation<R> resultType = getWindowFunctionReturnType(function, inType);
    return reduce(reduceFunction, function, resultType);
}

public <R> SingleOutputStreamOperator<R> reduce(
        ReduceFunction<T> reduceFunction,
        WindowFunction<T, R, K, W> function,
        TypeInformation<R> resultType) {

    if (reduceFunction instanceof RichFunction) {
        throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction.");
    }

    // clean the closures
    function = input.getExecutionEnvironment().clean(function);
    reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);

    String callLocation = Utils.getCallLocationName();
    String udfName = "WindowedStream." + callLocation;

    String opName;
    KeySelector<T, K> keySel = input.getKeySelector();

    OneInputStreamOperator<T, R> operator;

    if (evictor != null) {
        // with an evictor, every element of the window is buffered in list state
        @SuppressWarnings({"unchecked", "rawtypes"})
        TypeSerializer<StreamRecord<T>> streamRecordSerializer =
            (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(
                input.getType().createSerializer(getExecutionEnvironment().getConfig()));

        ListStateDescriptor<StreamRecord<T>> stateDesc =
            new ListStateDescriptor<>("window-contents", streamRecordSerializer);

        opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";

        operator = new EvictingWindowOperator<>(
            windowAssigner,
            windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
            keySel,
            input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
            stateDesc,
            new InternalIterableWindowFunction<>(new ReduceApplyWindowFunction<>(reduceFunction, function)),
            trigger,
            evictor,
            allowedLateness,
            lateDataOutputTag);
    } else {
        // without an evictor, elements are pre-aggregated in reducing state
        ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>(
            "window-contents",
            reduceFunction,
            input.getType().createSerializer(getExecutionEnvironment().getConfig()));

        opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";

        operator = new WindowOperator<>(
            windowAssigner,
            windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
            keySel,
            input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
            stateDesc,
            new InternalSingleValueWindowFunction<>(function),
            trigger,
            allowedLateness,
            lateDataOutputTag);
    }

    return input.transform(opName, resultType, operator);
}
```
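The two branches above differ mainly in the window state they declare:

- **Without an evictor**, a ReducingStateDescriptor is used. Each incoming element is combined with the stored value as soon as it arrives, so only one value per key and window is kept.
- **With an evictor**, a ListStateDescriptor buffers every element, because the evictor has to see individual elements to decide which ones to remove.

The JDK-only sketch below illustrates just this storage difference. The classes WindowStateSketch, ReducingStateSketch and ListStateSketch are hypothetical. They ignore windows, serialization and fault tolerance, and they are not Flink's state API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class WindowStateSketch {

    // Hypothetical analogue of reducing state: one pre-aggregated value per key.
    public static final class ReducingStateSketch<K, T> {
        private final Map<K, T> values = new HashMap<>();
        private final BinaryOperator<T> reducer;

        public ReducingStateSketch(BinaryOperator<T> reducer) {
            this.reducer = reducer;
        }

        public void add(K key, T value) {
            // Each element is folded in immediately, so nothing else is stored.
            values.merge(key, value, reducer);
        }

        public T get(K key) {
            return values.get(key);
        }

        public int storedElements() {
            return values.size();
        }
    }

    // Hypothetical analogue of list state: every element is kept until the window fires.
    public static final class ListStateSketch<K, T> {
        private final Map<K, List<T>> values = new HashMap<>();

        public void add(K key, T value) {
            values.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }

        public List<T> get(K key) {
            return values.getOrDefault(key, List.of());
        }

        public int storedElements() {
            int n = 0;
            for (List<T> buffered : values.values()) {
                n += buffered.size();
            }
            return n;
        }
    }

    public static void main(String[] args) {
        ReducingStateSketch<String, Long> reducing = new ReducingStateSketch<>(Long::sum);
        ListStateSketch<String, Long> buffered = new ListStateSketch<>();
        for (String w : "to be or not to be".split(" ")) {
            reducing.add(w, 1L);
            buffered.add(w, 1L);
        }
        System.out.println("reducing: to=" + reducing.get("to") + ", stored=" + reducing.storedElements());
        System.out.println("list: to=" + buffered.get("to") + ", stored=" + buffered.storedElements());
    }
}
```

Running main prints `reducing: to=2, stored=4` and then `list: to=[1, 1], stored=6`. The reducing variant keeps one value per distinct word, while the list variant keeps all six elements.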
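The reduce overloads call one another until execution reaches the else branch of the last reduce overload shown above, since no evictor is configured. Here too, a concrete StreamOperator subclass instance (a WindowOperator) is created first. It is then passed to input.transform, where input is the KeyedStream the window was created from. KeyedStream's transform method is: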
```java
public <R> SingleOutputStreamOperator<R> transform(String operatorName,
        TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {

    SingleOutputStreamOperator<R> returnStream = super.transform(operatorName, outTypeInfo, operator);

    // attach the key selector and key type so the operator's state is scoped by key
    OneInputTransformation<T, R> transform = (OneInputTransformation<T, R>) returnStream.getTransformation();
    transform.setStateKeySelector(keySelector);
    transform.setStateKeyType(keyType);

    return returnStream;
}
```
The transform method of the parent class, DataStream, is as follows:
```java
public <R> SingleOutputStreamOperator<R> transform(String operatorName,
        TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {

    // Read the output type of the input transformation; if it is a MissingTypeInfo,
    // an exception is thrown right away and the operation is aborted
    transformation.getOutputType();

    OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
        this.transformation,
        operatorName,
        operator,
        outTypeInfo,
        environment.getParallelism());

    @SuppressWarnings({ "unchecked", "rawtypes" })
    SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

    getExecutionEnvironment().addOperator(resultTransform);

    return returnStream;
}
```
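The logic is similar to flatMap, in three steps:

1. A OneInputTransformation (a StreamTransformation subclass) is built around the StreamOperator.
2. A SingleOutputStreamOperator (a DataStream subclass) is built around that transformation.
3. The transformation is added to the transformations list of the StreamExecutionEnvironment.

With that, the grouping and aggregation of the data in the stream is defined.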
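The pattern shared by flatMap and reduce can be reduced to a few lines: create a transformation that points at the current one, wrap it in a new stream, and register it with the environment. The classes below are hypothetical: TransformationRegistrySketch, Transformation, Environment and SimpleStream. They only mirror the shape of StreamTransformation, StreamExecutionEnvironment and DataStream. Operators, types and parallelism are omitted.

```java
import java.util.ArrayList;
import java.util.List;

public class TransformationRegistrySketch {

    // Hypothetical stand-in for StreamTransformation: a name plus a reference to its upstream input.
    public static final class Transformation {
        public final String name;
        public final Transformation input; // null for a source

        public Transformation(String name, Transformation input) {
            this.name = name;
            this.input = input;
        }
    }

    // Hypothetical stand-in for StreamExecutionEnvironment: it only records transformations.
    public static final class Environment {
        public final List<Transformation> transformations = new ArrayList<>();

        public void addOperator(Transformation t) {
            transformations.add(t);
        }
    }

    // Hypothetical stand-in for DataStream: it wraps exactly one transformation.
    public static final class SimpleStream {
        public final Environment env;
        public final Transformation transformation;

        public SimpleStream(Environment env, Transformation transformation) {
            this.env = env;
            this.transformation = transformation;
        }

        // Same three steps as DataStream.transform above: new transformation pointing at
        // this one, new stream wrapping it, registration with the environment.
        public SimpleStream transform(String operatorName) {
            Transformation result = new Transformation(operatorName, this.transformation);
            SimpleStream returnStream = new SimpleStream(env, result);
            env.addOperator(result);
            return returnStream;
        }
    }

    public static void main(String[] args) {
        Environment env = new Environment();
        // In this sketch the source is wrapped in a stream but never registered.
        SimpleStream source = new SimpleStream(env, new Transformation("Source", null));
        SimpleStream windowed = source.transform("Flat Map").transform("TriggerWindow");
        for (Transformation t : env.transformations) {
            System.out.println(t.name + " <- " + t.input.name);
        }
        System.out.println(windowed.transformation.name);
    }
}
```

Running main prints `Flat Map <- Source`, `TriggerWindow <- Flat Map` and then `TriggerWindow`. Each registered transformation knows its upstream, and the returned stream wraps the most recent one.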
5. Outputting the results
The results are printed with:
```java
windowCounts.print();
```
The print method simply appends a Sink to the end of the data stream to receive the results:
```java
public DataStreamSink<T> print() {
    PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
    return addSink(printFunction);
}
```
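The class hierarchy of PrintSinkFunction is shown in the figure below: PrintSinkFunction extends RichSinkFunction, which implements the SinkFunction interface.
As an implementation of the SinkFunction interface, PrintSinkFunction implements the invoke method as follows: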
```java
public void invoke(IN record) {
    if (prefix != null) {
        stream.println(prefix + record.toString());
    } else {
        stream.println(record.toString());
    }
}
```
The logic is straightforward: it prints each record, preceded by prefix when one is set. Next, the addSink method:
```java
public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
    // read the output type of the input transformation to surface MissingTypeInfo errors early
    transformation.getOutputType();

    // configure the input type if the sink needs it
    if (sinkFunction instanceof InputTypeConfigurable) {
        ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
    }

    StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));

    DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);

    getExecutionEnvironment().addOperator(sink.getTransformation());
    return sink;
}
```
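The construction mirrors that of the data source, in three steps:

1. A StreamOperator (here a StreamSink) is built.
2. A DataStreamSink is built. Its constructor creates a StreamTransformation instance, a SinkTransformation.
3. That transformation is added to the transformations list of the StreamExecutionEnvironment.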
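The path from print() to the sink can be condensed into a small JDK-only sketch. Sink, PrintSink and SinkOperator below are hypothetical stand-ins for SinkFunction, PrintSinkFunction and StreamSink, inside a hypothetical PrintSinkSketch class:

- The operator wraps the user function and passes each record it receives to invoke.
- PrintSink reproduces the prefix logic of the invoke method quoted above.

Output goes to an in-memory PrintStream so it can be inspected. In Flink the prefix is typically the subtask index (such as "2> ") when the sink runs with parallelism greater than 1. Here it is simply a constructor argument.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.List;

public class PrintSinkSketch {

    // Hypothetical sink contract with the same single-method shape as SinkFunction.invoke.
    public interface Sink<IN> {
        void invoke(IN record);
    }

    // Hypothetical print sink reproducing the prefix logic of PrintSinkFunction.invoke.
    public static final class PrintSink<IN> implements Sink<IN> {
        private final PrintStream stream;
        private final String prefix; // null means "no prefix"

        public PrintSink(PrintStream stream, String prefix) {
            this.stream = stream;
            this.prefix = prefix;
        }

        @Override
        public void invoke(IN record) {
            if (prefix != null) {
                stream.println(prefix + record.toString());
            } else {
                stream.println(record.toString());
            }
        }
    }

    // Hypothetical sink operator: wraps a user sink and hands it every record it receives.
    public static final class SinkOperator<IN> {
        private final Sink<IN> userFunction;

        public SinkOperator(Sink<IN> userFunction) {
            this.userFunction = userFunction;
        }

        public void processElement(IN record) {
            userFunction.invoke(record);
        }
    }

    public static void main(String[] args) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        PrintStream out = new PrintStream(buffer, true);
        SinkOperator<String> operator = new SinkOperator<>(new PrintSink<>(out, "2> "));
        for (String r : List.of("hello : 1", "world : 3")) {
            operator.processElement(r);
        }
        System.out.print(buffer);
    }
}
```

Running main prints `2> hello : 1` and `2> world : 3`. Passing null as the prefix would print the records unchanged.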
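With these steps, the source of the data stream, its transformations and its sink have all been constructed. Along the way, every transformation produces a new data stream, and almost every data stream holds an instance of a StreamTransformation subclass. The StreamTransformation instances of streams produced by transformations such as flatMap and reduce, as well as that of the sink, are added to the transformations list of the StreamExecutionEnvironment. This list is used later when the StreamGraph is built.
In addition, each concrete StreamTransformation subclass held by a DataStream has an input attribute that references the StreamTransformation of its upstream DataStream. As a result, the StreamTransformation instances of the whole pipeline form an implicit linked list. Strictly speaking, though, it is an implicit graph rather than an implicit linked list. One stream can be transformed into several output streams, and several input streams can be merged into one output stream.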
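Because every transformation keeps a reference to its upstream, the transformations list does not need to contain every node. The rest of the graph can be reached by following the input references. The JDK-only sketch below shows this with a hypothetical TransformationGraphSketch class, a Transformation type and a collectEdges helper.

- Unlike the single input attribute described above, the hypothetical Transformation holds a list of inputs, to model the merging case.
- Each transformation is visited only once, so a shared upstream node (one stream feeding two branches) is not processed twice.

Flink's StreamGraph generation also walks the list recursively and remembers what it has already handled, in a broadly similar way. The code below is only an illustration, not its implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TransformationGraphSketch {

    // Hypothetical transformation: an id, a name and references to its upstream inputs.
    public static final class Transformation {
        public final int id;
        public final String name;
        public final List<Transformation> inputs;

        public Transformation(int id, String name, Transformation... inputs) {
            this.id = id;
            this.name = name;
            this.inputs = List.of(inputs);
        }
    }

    // Recovers all edges from a list that holds only some transformations:
    // inputs are visited recursively, and each transformation is handled once.
    public static List<String> collectEdges(List<Transformation> registered) {
        Map<Integer, Transformation> visited = new LinkedHashMap<>();
        List<String> edges = new ArrayList<>();
        for (Transformation t : registered) {
            visit(t, visited, edges);
        }
        return edges;
    }

    private static void visit(Transformation t, Map<Integer, Transformation> visited, List<String> edges) {
        if (visited.containsKey(t.id)) {
            return; // already handled through another path
        }
        for (Transformation in : t.inputs) {
            visit(in, visited, edges); // handle upstream nodes first
            edges.add(in.name + " -> " + t.name);
        }
        visited.put(t.id, t);
    }

    public static void main(String[] args) {
        Transformation source = new Transformation(1, "Source");
        Transformation flatMap = new Transformation(2, "Flat Map", source);
        Transformation window = new Transformation(3, "Window", flatMap);
        Transformation printSink = new Transformation(4, "Print Sink", window);
        Transformation otherSink = new Transformation(5, "Other Sink", flatMap); // second branch
        // The source is deliberately absent: it is reached only through input references.
        List<Transformation> registered = List.of(flatMap, window, printSink, otherSink);
        collectEdges(registered).forEach(System.out::println);
    }
}
```

Running main prints four edges: `Source -> Flat Map`, `Flat Map -> Window`, `Window -> Print Sink` and `Flat Map -> Other Sink`. Source never appears in the list yet is still reached. Flat Map, which feeds two branches, is processed only once.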
Once the transformations above are defined, the job is executed by the following code:
```java
env.execute("Socket Window WordCount");
```
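Based on the transformations above, this call first generates a StreamGraph, then generates a JobGraph from the StreamGraph, and finally submits the job through the client to the cluster for scheduling and execution.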