【Flink】(05)Apache Flink 漫谈系列 —— SocketWindowWordCount 程序执行过程源码分析3

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 【Flink】(05)Apache Flink 漫谈系列 —— SocketWindowWordCount 程序执行过程源码分析3


4.4 reduce 转换


reduce转换的入参是一个ReduceFunction的具体实现,这里的逻辑就是对收到的WordWithCount实例集合,将其中word字段相同的实际的count值累加。


public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function) {
   if (function instanceof RichFunction) {
      throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction. " +
         "Please use reduce(ReduceFunction, WindowFunction) instead.");
   }
   /** 闭包清理 */
   function = input.getExecutionEnvironment().clean(function);
   return reduce(function, new PassThroughWindowFunction<K, W, T>());
}
public <R> SingleOutputStreamOperator<R> reduce(
      ReduceFunction<T> reduceFunction,
      WindowFunction<T, R, K, W> function) {
   TypeInformation<T> inType = input.getType();
   TypeInformation<R> resultType = getWindowFunctionReturnType(function, inType);
   return reduce(reduceFunction, function, resultType);
}
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
   if (reduceFunction instanceof RichFunction) {
      throw new UnsupportedOperationException("ReduceFunction of apply can not be a RichFunction.");
   }
   function = input.getExecutionEnvironment().clean(function);
   reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);
   String callLocation = Utils.getCallLocationName();
   String udfName = "WindowedStream." + callLocation;
   String opName;
   KeySelector<T, K> keySel = input.getKeySelector();
   OneInputStreamOperator<T, R> operator;
   if (evictor != null) {
      @SuppressWarnings({"unchecked", "rawtypes"})
      TypeSerializer<StreamRecord<T>> streamRecordSerializer =
            (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
      ListStateDescriptor<StreamRecord<T>> stateDesc =
            new ListStateDescriptor<>("window-contents", streamRecordSerializer);
      opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
      operator =
            new EvictingWindowOperator<>(windowAssigner,
                  windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                  keySel,
                  input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                  stateDesc,
                  new InternalIterableProcessWindowFunction<>(new ReduceApplyProcessWindowFunction<>(reduceFunction, function)),
                  trigger,
                  evictor,
                  allowedLateness,
                  lateDataOutputTag);
   } else {
      ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
            reduceFunction,
            input.getType().createSerializer(getExecutionEnvironment().getConfig()));
      opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
      operator =
            new WindowOperator<>(windowAssigner,
                  windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                  keySel,
                  input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                  stateDesc,
                  new InternalSingleValueProcessWindowFunction<>(function),
                  trigger,
                  allowedLateness,
                  lateDataOutputTag);
   }
   return input.transform(opName, resultType, operator);
}

通过对reduce重载方法的逐步调用,会走到上述代码的else逻辑中,这里也是先构建了StreamOperator的具体子类实例。


public <R> SingleOutputStreamOperator<R> transform(String operatorName,
      TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
   SingleOutputStreamOperator<R> returnStream = super.transform(operatorName, outTypeInfo, operator);
   OneInputTransformation<T, R> transform = (OneInputTransformation<T, R>) returnStream.getTransformation();
   transform.setStateKeySelector(keySelector);
   transform.setStateKeyType(keyType);
   return returnStream;
}

父类的transform中的逻辑如下:


public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
   /** 读取输入转换的输出类型, 如果是MissingTypeInfo, 则及时抛出异常, 终止操作 */
   transformation.getOutputType();
   OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
         this.transformation,
         operatorName,
         operator,
         outTypeInfo,
         environment.getParallelism());
   @SuppressWarnings({ "unchecked", "rawtypes" })
   SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
   getExecutionEnvironment().addOperator(resultTransform);
   return returnStream;
}

逻辑与flatMap相似,也是基于StreamOperator构建了一个StreamTransformation的子类OneInputTransformation的实例,然后构建了DataStream的子类SingleOutputStreamOperator的实例,最后也将构建的StreamTransformation的子类实例添加到了StreamExecutionEnvironment的属性transformations这个列表中。


经过上述操作,对数据流中的数据进行分组聚合的操作就完成了。


五、输出统计结果


统计结果的输出如下:


windowCounts.print();

print方法就是在数据流的最后添加了一个Sink,用于承接统计结果。


public DataStreamSink<T> print() {
   PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
   return addSink(printFunction);
}

其中PrintSinkFunction的类继承图如下所示:


image.png


作为一个SinkFunction接口的实现,看下其对invoke方法的实现:


public void invoke(IN record) {
   if (prefix != null) {
      stream.println(prefix + record.toString());
   }
   else {
      stream.println(record.toString());
   }
}


实现逻辑很清晰,就是将记录输出打印。继续看addSink方法:


public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
   transformation.getOutputType();
   if (sinkFunction instanceof InputTypeConfigurable) {
      ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
   }
   StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));
   DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);
   getExecutionEnvironment().addOperator(sink.getTransformation());
   return sink;
}

实现逻辑与数据源是相似的,先构建StreamOperator,再构建DataStreamSink,在DataStreamSink的构建中,会构造出StreamTransformation实例,最后会将这个StreamTransformation实例添加到StreamExecutionEnvironment的属性transformations这个列表中。


经过上述步骤,就完成了数据流的源构造、数据流的转换操作、数据流的Sink构造,在这个过程中,每次转换都会产生一个新的数据流,而每个数据流下几乎都有一个StreamTransformation的子类实例,对于像flatMap、reduce这些转换得到的数据流里的StreamTransformation会被添加到StreamExecutionEnvironment的属性transformations这个列表中,这个属性在后续构建StreamGraph时会使用到。


另外在这个数据流的构建与转换过程中,每个DataStream中的StreamTransformation的具体子类中都有一个input属性,该属性会记录该StreamTransformation的上游的DataStream的StreamTransformation引用,从而使得整个数据流中的StreamTransformation构成了一个隐式的链表,由于一个数据流可能会转换成多个输出数据流,以及多个输入数据流又有可能会合并成一个输出数据流,确定的说,不是隐式列表,而是一张隐式的图。


述数据转换完成后,就会进行任务的执行,就是执行如下代码:


env.execute("Socket Window WordCount");


这里就会根据上述的转换过程,先生成StreamGraph,再根据StreamGraph生成JobGraph,然后通过客户端提交到集群进行调度执行。


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
打赏
0
0
0
0
89
分享
相关文章
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
457 33
The Past, Present and Future of Apache Flink
Apache Flink 2.0.0: 实时数据处理的新纪元
Apache Flink 2.0.0 正式发布!这是自 Flink 1.0 发布九年以来的首次重大更新,凝聚了社区两年的努力。此版本引入分离式状态管理、物化表、流批统一等创新功能,优化云原生环境下的资源利用与性能表现,并强化了对人工智能工作流的支持。同时,Flink 2.0 对 API 和配置进行了全面清理,移除了过时组件,为未来的发展奠定了坚实基础。感谢 165 位贡献者的辛勤付出,共同推动实时计算进入新纪元!
Apache Flink 2.0.0: 实时数据处理的新纪元
您有一份 Apache Flink 社区年度报告请查收~
您有一份 Apache Flink 社区年度报告请查收~
Apache Flink 2.0:Streaming into the Future
本文整理自阿里云智能高级技术专家宋辛童、资深技术专家梅源和高级技术专家李麟在 Flink Forward Asia 2024 主会场的分享。三位专家详细介绍了 Flink 2.0 的四大技术方向:Streaming、Stream-Batch Unification、Streaming Lakehouse 和 AI。主要内容包括 Flink 2.0 的存算分离云原生化、流批一体的 Materialized Table、Flink 与 Paimon 的深度集成,以及 Flink 在 AI 领域的应用。
740 13
Apache Flink 2.0:Streaming into the Future
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
2044 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
Flink CDC 在阿里云实时计算Flink版的云上实践
本文整理自阿里云高级开发工程师阮航在Flink Forward Asia 2024的分享,重点介绍了Flink CDC与实时计算Flink的集成、CDC YAML的核心功能及应用场景。主要内容包括:Flink CDC的发展及其在流批数据处理中的作用;CDC YAML支持的同步链路、Transform和Route功能、丰富的监控指标;典型应用场景如整库同步、Binlog原始数据同步、分库分表同步等;并通过两个Demo展示了MySQL整库同步到Paimon和Binlog同步到Kafka的过程。最后,介绍了未来规划,如脏数据处理、数据限流及扩展数据源支持。
253 0
Flink CDC 在阿里云实时计算Flink版的云上实践
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
245 56
实时计算UniFlow:Flink+Paimon构建流批一体实时湖仓
实时计算架构中,传统湖仓架构在数据流量管控和应用场景支持上表现良好,但在实际运营中常忽略细节,导致新问题。为解决这些问题,提出了流批一体的实时计算湖仓架构——UniFlow。该架构通过统一的流批计算引擎、存储格式(如Paimon)和Flink CDC工具,简化开发流程,降低成本,并确保数据一致性和实时性。UniFlow还引入了Flink Materialized Table,实现了声明式ETL,优化了调度和执行模式,使用户能灵活调整新鲜度与成本。最终,UniFlow不仅提高了开发和运维效率,还提供了更实时的数据支持,满足业务决策需求。
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

推荐镜像

更多
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等