【Flink】(05)Apache Flink 漫谈系列 —— SocketWindowWordCount 程序执行过程源码分析3

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 【Flink】(05)Apache Flink 漫谈系列 —— SocketWindowWordCount 程序执行过程源码分析3


4.4 reduce 转换


reduce转换的入参是一个ReduceFunction的具体实现,这里的逻辑就是对收到的WordWithCount实例集合,将其中word字段相同的实际的count值累加。


public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function) {
   if (function instanceof RichFunction) {
      throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction. " +
         "Please use reduce(ReduceFunction, WindowFunction) instead.");
   }
   /** 闭包清理 */
   function = input.getExecutionEnvironment().clean(function);
   return reduce(function, new PassThroughWindowFunction<K, W, T>());
}
public <R> SingleOutputStreamOperator<R> reduce(
      ReduceFunction<T> reduceFunction,
      WindowFunction<T, R, K, W> function) {
   TypeInformation<T> inType = input.getType();
   TypeInformation<R> resultType = getWindowFunctionReturnType(function, inType);
   return reduce(reduceFunction, function, resultType);
}
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
   if (reduceFunction instanceof RichFunction) {
      throw new UnsupportedOperationException("ReduceFunction of apply can not be a RichFunction.");
   }
   function = input.getExecutionEnvironment().clean(function);
   reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);
   String callLocation = Utils.getCallLocationName();
   String udfName = "WindowedStream." + callLocation;
   String opName;
   KeySelector<T, K> keySel = input.getKeySelector();
   OneInputStreamOperator<T, R> operator;
   if (evictor != null) {
      @SuppressWarnings({"unchecked", "rawtypes"})
      TypeSerializer<StreamRecord<T>> streamRecordSerializer =
            (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
      ListStateDescriptor<StreamRecord<T>> stateDesc =
            new ListStateDescriptor<>("window-contents", streamRecordSerializer);
      opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
      operator =
            new EvictingWindowOperator<>(windowAssigner,
                  windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                  keySel,
                  input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                  stateDesc,
                  new InternalIterableProcessWindowFunction<>(new ReduceApplyProcessWindowFunction<>(reduceFunction, function)),
                  trigger,
                  evictor,
                  allowedLateness,
                  lateDataOutputTag);
   } else {
      ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
            reduceFunction,
            input.getType().createSerializer(getExecutionEnvironment().getConfig()));
      opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
      operator =
            new WindowOperator<>(windowAssigner,
                  windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                  keySel,
                  input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                  stateDesc,
                  new InternalSingleValueProcessWindowFunction<>(function),
                  trigger,
                  allowedLateness,
                  lateDataOutputTag);
   }
   return input.transform(opName, resultType, operator);
}

通过对reduce重载方法的逐步调用,会走到上述代码的else逻辑中,这里也是先构建了StreamOperator的具体子类实例。


public <R> SingleOutputStreamOperator<R> transform(String operatorName,
      TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
   SingleOutputStreamOperator<R> returnStream = super.transform(operatorName, outTypeInfo, operator);
   OneInputTransformation<T, R> transform = (OneInputTransformation<T, R>) returnStream.getTransformation();
   transform.setStateKeySelector(keySelector);
   transform.setStateKeyType(keyType);
   return returnStream;
}

父类的transform中的逻辑如下:


public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
   /** 读取输入转换的输出类型, 如果是MissingTypeInfo, 则及时抛出异常, 终止操作 */
   transformation.getOutputType();
   OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
         this.transformation,
         operatorName,
         operator,
         outTypeInfo,
         environment.getParallelism());
   @SuppressWarnings({ "unchecked", "rawtypes" })
   SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
   getExecutionEnvironment().addOperator(resultTransform);
   return returnStream;
}

逻辑与flatMap相似,也是基于StreamOperator构建了一个StreamTransformation的子类OneInputTransformation的实例,然后构建了DataStream的子类SingleOutputStreamOperator的实例,最后也将构建的StreamTransformation的子类实例添加到了StreamExecutionEnvironment的属性transformations这个列表中。


经过上述操作,对数据流中的数据进行分组聚合的操作就完成了。


五、输出统计结果


统计结果的输出如下:


windowCounts.print();

print方法就是在数据流的最后添加了一个Sink,用于承接统计结果。


public DataStreamSink<T> print() {
   PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
   return addSink(printFunction);
}

其中PrintSinkFunction的类继承图如下所示:


image.png


作为一个SinkFunction接口的实现,看下其对invoke方法的实现:


public void invoke(IN record) {
   if (prefix != null) {
      stream.println(prefix + record.toString());
   }
   else {
      stream.println(record.toString());
   }
}


实现逻辑很清晰,就是将记录输出打印。继续看addSink方法:


public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
   transformation.getOutputType();
   if (sinkFunction instanceof InputTypeConfigurable) {
      ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
   }
   StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));
   DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);
   getExecutionEnvironment().addOperator(sink.getTransformation());
   return sink;
}

实现逻辑与数据源是相似的,先构建StreamOperator,再构建DataStreamSink,在DataStreamSink的构建中,会构造出StreamTransformation实例,最后会将这个StreamTransformation实例添加到StreamExecutionEnvironment的属性transformations这个列表中。


经过上述步骤,就完成了数据流的源构造、数据流的转换操作、数据流的Sink构造,在这个过程中,每次转换都会产生一个新的数据流,而每个数据流下几乎都有一个StreamTransformation的子类实例,对于像flatMap、reduce这些转换得到的数据流里的StreamTransformation会被添加到StreamExecutionEnvironment的属性transformations这个列表中,这个属性在后续构建StreamGraph时会使用到。


另外在这个数据流的构建与转换过程中,每个DataStream中的StreamTransformation的具体子类中都有一个input属性,该属性会记录该StreamTransformation的上游的DataStream的StreamTransformation引用,从而使得整个数据流中的StreamTransformation构成了一个隐式的链表,由于一个数据流可能会转换成多个输出数据流,以及多个输入数据流又有可能会合并成一个输出数据流,确定的说,不是隐式列表,而是一张隐式的图。


述数据转换完成后,就会进行任务的执行,就是执行如下代码:


env.execute("Socket Window WordCount");


这里就会根据上述的转换过程,先生成StreamGraph,再根据StreamGraph生成JobGraph,然后通过客户端提交到集群进行调度执行。


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
消息中间件 API Apache
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
本文整理自阿里云开源大数据平台徐榜江 (雪尽),关于阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会。
1416 1
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
|
1月前
|
SQL Java API
官宣|Apache Flink 1.19 发布公告
Apache Flink PMC(项目管理委员)很高兴地宣布发布 Apache Flink 1.19.0。
1355 1
官宣|Apache Flink 1.19 发布公告
|
1月前
|
SQL 关系型数据库 MySQL
Flink CDC产品常见问题之pg cdc程序已经停了但是执行删不掉如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
SQL Apache 流计算
Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
【2月更文挑战第25天】Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
143 3
|
1月前
|
Oracle 关系型数据库 流计算
flink cdc 同步问题之报错org.apache.flink.util.SerializedThrowable:如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
XML Java Apache
Apache Flink自定义 logback xml配置
Apache Flink自定义 logback xml配置
152 0
|
2月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
482 5
|
1月前
|
消息中间件 Java Kafka
Apache Hudi + Flink作业运行指南
Apache Hudi + Flink作业运行指南
86 1
|
1月前
|
缓存 分布式计算 Apache
Apache Hudi与Apache Flink更好地集成,最新方案了解下?
Apache Hudi与Apache Flink更好地集成,最新方案了解下?
61 0
|
1月前
|
监控 Apache 开发工具
Apache Flink 1.12.2集成Hudi 0.9.0运行指南
Apache Flink 1.12.2集成Hudi 0.9.0运行指南
67 0

推荐镜像

更多