【Flink】(05)Apache Flink 漫谈系列 —— SocketWindowWordCount 程序执行过程源码分析2

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 【Flink】(05)Apache Flink 漫谈系列 —— SocketWindowWordCount 程序执行过程源码分析2


四、操作数据流


进行具体的转换操作:


DataStream<WordWithCount> windowCounts = text
        .flatMap(new FlatMapFunction<String, WordWithCount>() {
            @Override
            public void flatMap(String value, Collector<WordWithCount> out) {
                for (String word : value.split("\\s")) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        })
        .keyBy("word")
        .timeWindow(Time.seconds(5), Time.seconds(1))
        .reduce(new ReduceFunction<WordWithCount>() {
            @Override
            public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                return new WordWithCount(a.word, a.count + b.count);
            }
        });

这段逻辑中,对数据流做了四次操作,分别是flatMap、keyBy、timeWindow、reduce,接下来分别介绍每个转换都做了些什么操作。


4.1 flatMap 转换


flatMap的入参是一个FlatMapFunction的具体实现,功能就是将接收到的字符串,按空格切割成不同单词,然后每个单词构建一个WordWithCount实例,然后向下游转发,用于后续的数据统计。然后调用DataStream的flatMap方法,进行数据流的转换,如下:


public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
   TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
         getType(), Utils.getCallLocationName(), true);
   /** 根据传入的flatMapper这个Function,构建StreamFlatMap这个StreamOperator的具体子类实例 */
   return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));
}
public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
   /** 读取输入转换的输出类型, 如果是MissingTypeInfo, 则及时抛出异常, 终止操作 */
   transformation.getOutputType();
   OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
         this.transformation,
         operatorName,
         operator,
         outTypeInfo,
         environment.getParallelism());
   @SuppressWarnings({ "unchecked", "rawtypes" })
   SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
   getExecutionEnvironment().addOperator(resultTransform);
   return returnStream;
}

整个构建过程,与构建数据源的过程相似。


a、先根据传入的flatMapper这个Function构建一个StreamOperator的具体子类StreamFlatMap的实例;

b、根据a中构建的StreamFlatMap的实例,构建出OneInputTransFormation这个StreamTransformation的子类的实例;

c、再构建出DataStream的子类SingleOutputStreamOperator的实例。


除了构建出了 SingleOutputStreamOperator 这个实例为并返回外,还有代码:


getExecutionEnvironment().addOperator(resultTransform);
public void addOperator(StreamTransformation<?> transformation) {
   Preconditions.checkNotNull(transformation, "transformation must not be null.");
   this.transformations.add(transformation);
}

就是将上述构建的OneInputTransFormation的实例,添加到了StreamExecutionEnvironment的属性transformations这个类型为List。


4.2 keyBy 转换


这里的keyBy转换,入参是一个字符串”word”,意思是按照WordWithCount中的word字段进行分区操作。


public KeyedStream<T, Tuple> keyBy(String... fields) {
   return keyBy(new Keys.ExpressionKeys<>(fields, getType()));
}

先根据传入的字段名数组,以及数据流的输出数据类型信息,构建出用来描述key的ExpressionKeys的实例,ExpressionKeys有两个属性:


/** key字段的列表, FlatFieldDescriptor 描述了每个key, 在所在类型中的位置以及key自身的数据类信息 */
private List<FlatFieldDescriptor> keyFields;
/** 包含key的数据类型的类型信息, 与构造函数入参中的字段顺序一一对应 */
private TypeInformation<?>[] originalKeyTypes;


在获取key的描述之后,继续调用keyBy的重载方法:


private KeyedStream<T, Tuple> keyBy(Keys<T> keys) {
   return new KeyedStream<>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
         getType(), getExecutionConfig())));
}

这里首先构建了一个KeySelector的子类ComparableKeySelector的实例,作用就是从具体的输入实例中,提取出key字段对应的值(可能是多个key字段)组成的元组(Tuple)。


对于这里的例子,就是从每个WordWithCount实例中,提取出word字段的值。


然后构建一个KeyedStream的实例,KeyedStream也是DataStream的子类。构建过程如下:


public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
   this(dataStream, keySelector, TypeExtractor.getKeySelectorTypes(keySelector, dataStream.getType()));
}
public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
   super(
      dataStream.getExecutionEnvironment(),
      new PartitionTransformation<>(
         dataStream.getTransformation(),
         new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)));
   this.keySelector = keySelector;
   this.keyType = validateKeyType(keyType);
}


在进行父类构造函数调用之前,先基于keySelector构造了一个KeyGroupStreamPartitioner的实例,再进一步构造了一个PartitionTransformation实例。


这里与flatMap的转换略有不同:


a、flatMap中,根据传入的flatMapper这个Function构建的是StreamOperator这个接口的子类的实例,而keyBy中,则是根据keySelector构建了ChannelSelector接口的子类实例;

b、keyBy中构建的StreamTransformation实例,并没有添加到StreamExecutionEnvironment的属性transformations这个列表中。


ChannelSelector只有一个接口,根据传入的数据流中的具体数据记录,以及下个节点的并行度来决定该条记录需要转发到哪个通道。


public interface ChannelSelector<T extends IOReadableWritable> {
   int[] selectChannels(T record, int numChannels);
}
    KeyGroupStreamPartitioner中该方法的实现如下:
public int[] selectChannels(
   SerializationDelegate<StreamRecord<T>> record,
   int numberOfOutputChannels) {
   K key;
   try {
      /** 通过keySelector从传入的record中提取出对应的key */
      key = keySelector.getKey(record.getInstance().getValue());
   } catch (Exception e) {
      throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
   }
   /** 根据提取的key,最大并行度,以及输出通道数,决定出record要转发到的通道编号 */
   returnArray[0] = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfOutputChannels);
   return returnArray;
}


再进一步看一下KeyGroupRangerAssignment的assignKeyToParallelOperator方法的实现逻辑。


public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
   return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
}
public static int assignToKeyGroup(Object key, int maxParallelism) {
   return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
}
public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
   return MathUtils.murmurHash(keyHash) % maxParallelism;
}
public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
   return keyGroupId * parallelism / maxParallelism;
}


a、先通过key的hashCode,算出maxParallelism的余数,也就是可以得到一个[0, maxParallelism)的整数;

b、在通过公式 keyGroupId * parallelism / maxParallelism ,计算出一个[0, parallelism)区间的整数,从而实现分区功能。


4.3 timeWindow 转换


这里timeWindow转换的入参是两个时间,第一个参数表示窗口长度,第二个参数表示窗口滑动的时间间隔。


public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
   if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
      return window(SlidingProcessingTimeWindows.of(size, slide));
   } else {
      return window(SlidingEventTimeWindows.of(size, slide));
   }
}


根据环境配置的数据流处理时间特征构建不同的WindowAssigner的具体实例。


WindowAssigner的功能就是对于给定的数据流中的记录,决定出该记录应该放入哪些窗口中,并提供触发器等供。默认的时间特征是ProcessingTime,所以这里会构建一个SlidingProcessingTimeWindow实例,来看下SlidingProcessingTimeWindow类的assignWindows方法的实现逻辑。


public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
   /** 根据传入的WindowAssignerContext获取当前处理时间 */
   timestamp = context.getCurrentProcessingTime();
   List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
   /** 获取最近一次的窗口的开始时间 */
   long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
   /** 循环找出满足条件的所有窗口 */
   for (long start = lastStart;
      start > timestamp - size;
      start -= slide) {
      windows.add(new TimeWindow(start, start + size));
   }
   return windows;
}


看一下根据给定时间戳获取最近一次的窗口的开始时间的实现逻辑。


public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
   return timestamp - (timestamp - offset + windowSize) % windowSize;
}

通过一个例子来解释上述代码的逻辑。比如:


a、timestamp = 1520406257000 // 2018-03-07 15:04:17

b、offset = 0

c、windowSize = 60000

d、(timestamp - offset + windowSize) % windowSize = 17000

e、说明在时间戳 1520406257000 之前最近的窗口是在 17000 毫秒的地方

f、timestamp - (timestamp - offset + windowSize) % windowSize = 1520406240000 // 2018-03-07 15:04:00

g、这样就可以保证每个时间窗口都是从整点开始, 而offset则是由于时区等原因需要时间调整而设置。


通过上述获取WindowAssigner的子类实例后,调用window方法:


public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
   return new WindowedStream<>(this, assigner);
}

比keyBy转换的逻辑还简单,就是构建了一个WindowedStream实例,然后返回,就结束了。而WindowedStream是一个新的数据流,不是DataStream的子类。


WindowedStream描述一个数据流中的元素会基于key进行分组,并且对于每个key,对应的元素会被划分到多个时间窗口内。然后窗口会基于触发器,将对应窗口中的数据转发到下游节点。



相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
12天前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
163 5
|
8天前
|
SQL 关系型数据库 分布式数据库
Flink问题之程序直接结束如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
76 1
|
8天前
|
SQL 存储 数据处理
Flink SQL 问题之提交程序运行报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
25 3
|
28天前
|
SQL 并行计算 大数据
【大数据技术攻关专题】「Apache-Flink零基础入门」手把手+零基础带你玩转大数据流式处理引擎Flink(基础加强+运行原理)
关于Flink服务的搭建与部署,由于其涉及诸多实战操作而理论部分相对较少,小编打算采用一个独立的版本和环境来进行详尽的实战讲解。考虑到文字描述可能无法充分展现操作的细节和流程,我们决定以视频的形式进行分析和介绍。因此,在本文中,我们将暂时不涉及具体的搭建和部署步骤。
372 3
【大数据技术攻关专题】「Apache-Flink零基础入门」手把手+零基础带你玩转大数据流式处理引擎Flink(基础加强+运行原理)
|
2月前
|
SQL 关系型数据库 Apache
Apache Doris 整合 FLINK CDC 、Paimon 构建实时湖仓一体的联邦查询入门
Apache Doris 整合 FLINK CDC 、Paimon 构建实时湖仓一体的联邦查询入门
380 1
|
2月前
|
SQL 关系型数据库 MySQL
Apache Flink 和 Paimon 在自如数据集成场景中的使用
Apache Flink 和 Paimon 在自如数据集成场景中的使用
130 0
|
2月前
|
搜索推荐 大数据 数据处理
Apache Flink:开启实时数据流处理的新纪元
Apache Flink 是一个强大的开源数据流处理框架,它引领着实时数据处理的新潮流。本文将介绍 Apache Flink 的基本概念和核心特性,并探讨其在实践中的应用场景和优势。通过深入了解 Apache Flink,我们可以看到它对于大数据处理和分析的重要意义,并且为读者提供了一些实践上的启示。
82 0
|
3月前
|
SQL 运维 API
Apache Flink 学习教程----持续更新
Apache Flink 学习教程----持续更新
140 0
|
3月前
|
Apache 流计算
Apache Flink教程
Apache Flink教程
128 0
|
2月前
|
SQL 存储 关系型数据库
Apache Flink 和 Paimon 在自如数据集成场景中的使用
自如目前线上有基于 Hive 的离线数仓和基于 Flink、Kafka 的实时数仓,随着业务发展,我们也在探索引入湖仓一体的架构更好的支持业务,我们对比了 Iceberg、Hudi、Paimon 后,最终选择 Paimon 作为我们湖仓一体的存储引擎,本文分享下自如在引入 Paimon 做数据集成的一些探索实践。
663 1
Apache Flink 和 Paimon 在自如数据集成场景中的使用

热门文章

最新文章

推荐镜像

更多