【Flink】(05)Apache Flink 漫谈系列 —— SocketWindowWordCount 程序执行过程源码分析3

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 【Flink】(05)Apache Flink 漫谈系列 —— SocketWindowWordCount 程序执行过程源码分析3


4.4 reduce 转换


reduce转换的入参是一个ReduceFunction的具体实现,这里的逻辑就是对收到的WordWithCount实例集合,将其中word字段相同的实际的count值累加。


public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function) {
   if (function instanceof RichFunction) {
      throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction. " +
         "Please use reduce(ReduceFunction, WindowFunction) instead.");
   }
   /** 闭包清理 */
   function = input.getExecutionEnvironment().clean(function);
   return reduce(function, new PassThroughWindowFunction<K, W, T>());
}
public <R> SingleOutputStreamOperator<R> reduce(
      ReduceFunction<T> reduceFunction,
      WindowFunction<T, R, K, W> function) {
   TypeInformation<T> inType = input.getType();
   TypeInformation<R> resultType = getWindowFunctionReturnType(function, inType);
   return reduce(reduceFunction, function, resultType);
}
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
   if (reduceFunction instanceof RichFunction) {
      throw new UnsupportedOperationException("ReduceFunction of apply can not be a RichFunction.");
   }
   function = input.getExecutionEnvironment().clean(function);
   reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);
   String callLocation = Utils.getCallLocationName();
   String udfName = "WindowedStream." + callLocation;
   String opName;
   KeySelector<T, K> keySel = input.getKeySelector();
   OneInputStreamOperator<T, R> operator;
   if (evictor != null) {
      @SuppressWarnings({"unchecked", "rawtypes"})
      TypeSerializer<StreamRecord<T>> streamRecordSerializer =
            (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
      ListStateDescriptor<StreamRecord<T>> stateDesc =
            new ListStateDescriptor<>("window-contents", streamRecordSerializer);
      opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
      operator =
            new EvictingWindowOperator<>(windowAssigner,
                  windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                  keySel,
                  input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                  stateDesc,
                  new InternalIterableProcessWindowFunction<>(new ReduceApplyProcessWindowFunction<>(reduceFunction, function)),
                  trigger,
                  evictor,
                  allowedLateness,
                  lateDataOutputTag);
   } else {
      ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
            reduceFunction,
            input.getType().createSerializer(getExecutionEnvironment().getConfig()));
      opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
      operator =
            new WindowOperator<>(windowAssigner,
                  windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                  keySel,
                  input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                  stateDesc,
                  new InternalSingleValueProcessWindowFunction<>(function),
                  trigger,
                  allowedLateness,
                  lateDataOutputTag);
   }
   return input.transform(opName, resultType, operator);
}

通过对reduce重载方法的逐步调用,会走到上述代码的else逻辑中,这里也是先构建了StreamOperator的具体子类实例。


public <R> SingleOutputStreamOperator<R> transform(String operatorName,
      TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
   SingleOutputStreamOperator<R> returnStream = super.transform(operatorName, outTypeInfo, operator);
   OneInputTransformation<T, R> transform = (OneInputTransformation<T, R>) returnStream.getTransformation();
   transform.setStateKeySelector(keySelector);
   transform.setStateKeyType(keyType);
   return returnStream;
}

父类的transform中的逻辑如下:


public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
   /** 读取输入转换的输出类型, 如果是MissingTypeInfo, 则及时抛出异常, 终止操作 */
   transformation.getOutputType();
   OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
         this.transformation,
         operatorName,
         operator,
         outTypeInfo,
         environment.getParallelism());
   @SuppressWarnings({ "unchecked", "rawtypes" })
   SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
   getExecutionEnvironment().addOperator(resultTransform);
   return returnStream;
}

逻辑与flatMap相似,也是基于StreamOperator构建了一个StreamTransformation的子类OneInputTransformation的实例,然后构建了DataStream的子类SingleOutputStreamOperator的实例,最后也将构建的StreamTransformation的子类实例添加到了StreamExecutionEnvironment的属性transformations这个列表中。


经过上述操作,对数据流中的数据进行分组聚合的操作就完成了。


五、输出统计结果


统计结果的输出如下:


windowCounts.print();

print方法就是在数据流的最后添加了一个Sink,用于承接统计结果。


public DataStreamSink<T> print() {
   PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
   return addSink(printFunction);
}

其中PrintSinkFunction的类继承图如下所示:


image.png


作为一个SinkFunction接口的实现,看下其对invoke方法的实现:


public void invoke(IN record) {
   if (prefix != null) {
      stream.println(prefix + record.toString());
   }
   else {
      stream.println(record.toString());
   }
}


实现逻辑很清晰,就是将记录输出打印。继续看addSink方法:


public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
   transformation.getOutputType();
   if (sinkFunction instanceof InputTypeConfigurable) {
      ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
   }
   StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));
   DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);
   getExecutionEnvironment().addOperator(sink.getTransformation());
   return sink;
}

实现逻辑与数据源是相似的,先构建StreamOperator,再构建DataStreamSink,在DataStreamSink的构建中,会构造出StreamTransformation实例,最后会将这个StreamTransformation实例添加到StreamExecutionEnvironment的属性transformations这个列表中。


经过上述步骤,就完成了数据流的源构造、数据流的转换操作、数据流的Sink构造,在这个过程中,每次转换都会产生一个新的数据流,而每个数据流下几乎都有一个StreamTransformation的子类实例,对于像flatMap、reduce这些转换得到的数据流里的StreamTransformation会被添加到StreamExecutionEnvironment的属性transformations这个列表中,这个属性在后续构建StreamGraph时会使用到。


另外在这个数据流的构建与转换过程中,每个DataStream中的StreamTransformation的具体子类中都有一个input属性,该属性会记录该StreamTransformation的上游的DataStream的StreamTransformation引用,从而使得整个数据流中的StreamTransformation构成了一个隐式的链表,由于一个数据流可能会转换成多个输出数据流,以及多个输入数据流又有可能会合并成一个输出数据流,确定的说,不是隐式列表,而是一张隐式的图。


述数据转换完成后,就会进行任务的执行,就是执行如下代码:


env.execute("Socket Window WordCount");


这里就会根据上述的转换过程,先生成StreamGraph,再根据StreamGraph生成JobGraph,然后通过客户端提交到集群进行调度执行。


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
6天前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
80 5
|
2天前
|
SQL 关系型数据库 分布式数据库
Flink问题之程序直接结束如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
73 1
|
3天前
|
SQL 存储 数据处理
Flink SQL 问题之提交程序运行报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
22 3
|
23天前
|
SQL 并行计算 大数据
【大数据技术攻关专题】「Apache-Flink零基础入门」手把手+零基础带你玩转大数据流式处理引擎Flink(基础加强+运行原理)
关于Flink服务的搭建与部署,由于其涉及诸多实战操作而理论部分相对较少,小编打算采用一个独立的版本和环境来进行详尽的实战讲解。考虑到文字描述可能无法充分展现操作的细节和流程,我们决定以视频的形式进行分析和介绍。因此,在本文中,我们将暂时不涉及具体的搭建和部署步骤。
253 3
【大数据技术攻关专题】「Apache-Flink零基础入门」手把手+零基础带你玩转大数据流式处理引擎Flink(基础加强+运行原理)
|
1月前
|
SQL 关系型数据库 Apache
Apache Doris 整合 FLINK CDC 、Paimon 构建实时湖仓一体的联邦查询入门
Apache Doris 整合 FLINK CDC 、Paimon 构建实时湖仓一体的联邦查询入门
373 1
|
1月前
|
SQL 关系型数据库 MySQL
Apache Flink 和 Paimon 在自如数据集成场景中的使用
Apache Flink 和 Paimon 在自如数据集成场景中的使用
124 0
|
1月前
|
搜索推荐 大数据 数据处理
Apache Flink:开启实时数据流处理的新纪元
Apache Flink 是一个强大的开源数据流处理框架,它引领着实时数据处理的新潮流。本文将介绍 Apache Flink 的基本概念和核心特性,并探讨其在实践中的应用场景和优势。通过深入了解 Apache Flink,我们可以看到它对于大数据处理和分析的重要意义,并且为读者提供了一些实践上的启示。
82 0
|
SQL 架构师 API
《Apache Flink 知其然,知其所以然》系列视频课程
# 课程简介 目前在我的公众号新推出了《Apache Flink 知其然,知其所以然》的系列视频课程。在内容上会先对Flink整体架构和所适用的场景做一个基础介绍,让你对Flink有一个整体的认识!然后对核心概念进行详细介绍,让你深入了解流计算中一些核心术语的含义,然后对Flink 各个层面的API,如 SQL/Table&DataStreamAPI/PythonAPI 进行详细的介绍,以及
1273 0
《Apache Flink 知其然,知其所以然》系列视频课程
|
2月前
|
SQL 运维 API
Apache Flink 学习教程----持续更新
Apache Flink 学习教程----持续更新
90 0
|
2月前
|
Apache 流计算
Apache Flink教程
Apache Flink教程
76 0

推荐镜像

更多