Flink / Scala - DataStream Transformations 常用转换函数详解

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 本文介绍 Flink 的主要数据形式: DataStream,即流式数据的常用转换函数,通过 Transformation 可以将一个 DataStream 转换为新的 DataStream。

 一.引言

本文介绍 Flink 的主要数据形式: DataStream,即流式数据的常用转换函数,通过 Transformation 可以将一个 DataStream 转换为新的 DataStream。

Tips:

下述介绍 demo 均采用如下 case class 作为数据类型,并通过自定义的 SourceFromCycle 函数每s 生成10个元素。特别注意 Source 函数还增加了 isWait 参数,控制该 Source 是否延迟 3s 生成数据。全局 env 为 StreamExecutionEnvironment。

case class Data(num: Int)
  // 每s生成一批数据
  class SourceFromCycle(isWait: Boolean = false) extends RichSourceFunction[Data] {
    private var isRunning = true
    var start = 0
    override def run(ctx: SourceFunction.SourceContext[Data]): Unit = {
      if (isWait) {
        TimeUnit.SECONDS.sleep(3)
      }
      while (isRunning) {
        (start until (start + 100)).foreach(num => {
          ctx.collect(Data(num))
          if (num % 10 == 0) {
            TimeUnit.SECONDS.sleep(1)
          }
        })
        start += 100
      }
    }
    override def cancel(): Unit = {
      isRunning = false
    }
  }
  val env = StreamExecutionEnvironment.getExecutionEnvironment

image.gif

二.常用 Transformation

1.Map - DataStream → DataStream

处理单个元素并返回单个元素。mapDemo 针对每个元素返回该 num 与 num+1,最终 sink 保存至文件。

def mapDemo(env: StreamExecutionEnvironment): Unit = {
    val dataStream = env.addSource(new SourceFromCycle())
    env.setParallelism(1)
    val transformationStream = dataStream.map(data => {
      (data.num, data.num + 1)
    })
    val output = "./output/"
    transformationStream.writeAsText(output, WriteMode.OVERWRITE)
  }

image.gif

image.gif编辑

2.Filter - DataStream → DataStream

对每个元素计算布尔函数,并保留该函数返回true的元素。filterDemo 仅保留1开头的数据。

def filterDemo(env: StreamExecutionEnvironment): Unit = {
    val dataStream = env.addSource(new SourceFromCycle())
    env.setParallelism(1)
    dataStream.filter(data => {
      data.num.toString.startsWith("1")
    }).print()
  }

image.gif

image.gif编辑

3.FlatMap - DataStream → DataStream

通过一个元素生成0个、一个或多个元素。FlatMapDemo 返回自身与自身+1的 tuple。0 返回 0,1、1返回1,2 ... 以此类推。

def flatMapDemo(env: StreamExecutionEnvironment): Unit = {
    val dataStream = env.addSource(new SourceFromCycle())
    env.setParallelism(1)
    dataStream.flatMap(data => {
      val info = ArrayBuffer[Data]()
      info.append(data)
      info.append(Data(data.num + 1))
      info.iterator
    }).print()
  }

image.gif

image.gif编辑

4.KeyBy - DataStream → KeyedStream

将流划分为不相连的分区,具有相同 key 的记录会被分配到相同的分区, keyBy() 内部是通过 Hash 分区实现的,可以通过不同的方法指定 key。下面使用每个数字的第一位对数字进行分区,可以看到对应 TaskId 下的数字都有相同的第一位。Tips: 获取 TaskId 通过 RuntimeContext 得到。

def keyByDemo(env: StreamExecutionEnvironment): Unit = {
    val dataStream = env.addSource(new SourceFromCycle())
    env.setParallelism(5)
    dataStream.keyBy(data => {
      data.num.toString.slice(0, 1)
    }).process(new ProcessFunction[Data, (Int, Int)] {
      override def processElement(i: Data, context: ProcessFunction[Data, (Int, Int)]#Context, collector: Collector[(Int, Int)]): Unit = {
        val taskId = getRuntimeContext.getIndexOfThisSubtask
        collector.collect((taskId, i.num))
      }
    }).print()
  }

image.gif

image.gif编辑

5.Reduce - KeyedStream → DataStream

键控数据流上的“滚动”减记。将当前元素与最近的简化值组合,并发出新值。下述 reduce 聚合将相同开头的数字结果进行累加。由于是动态累加,所以产出的数字没有规律。

def reduceDemo(env: StreamExecutionEnvironment): Unit = {
    val dataStream = env.addSource(new SourceFromCycle())
    env.setParallelism(5)
    dataStream.keyBy(data => {
      data.num.toString.slice(0, 1)
    }).reduce(new ReduceFunction[Data] {
      override def reduce(o1: Data, o2: Data): Data = {
        Data(o1.num + o2.num)
      }
    }).print()
  }

image.gif

image.gif编辑

6.Window - KeyedStream → WindowedStream

Windows可以在已经分区的KeyedStreams上定义。Windows根据某些特征(例如,在最近5秒内到达的数据)对每个键中的数据进行分组。根据类型不同有滑动窗口与滚动窗口,下述 Demo 采用第一位数字分区,并生成5s间隔的窗口,窗口内包含5s内全部的数据。

def windowDemo(env: StreamExecutionEnvironment): Unit = {
    val dataStream = env.addSource(new SourceFromCycle())
    env.setParallelism(5)
    dataStream.keyBy(data => {
      data.num.toString.slice(0, 1)
    }).window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .process(new ProcessWindowFunction[Data, String, String, TimeWindow] {
        override def process(key: String, context: Context, elements: Iterable[Data], out: Collector[String]): Unit = {
          val log = key + "\t" + elements.toArray.mkString(",")
          out.collect(log)
        }
      })
      .print()
  }

image.gif

image.gif编辑

7.WindowAll - DataStream → AllWindowedStream

Windows可以在常规的数据流上定义。Windows根据某些特征(例如,在最近5秒内到达的数据)对所有流事件进行分组。Window 和 WindowAll 都是聚合指定时间内的数据,差别在于 Window 聚合每个分区的数据,即将相同 key 的数据聚合,所以会生成 HashNum 个 window,而 WindowAll 汇聚规定时间内的全部数据,不区分 key,所以其并行度只有1。由于 WindowAll 不区分分区,所以看到 windowAll 得到的窗口中包含的数据很多,而 window 得到的窗口中数据少,但都具有相同的 key 即首数字相同。

def windowAllDemo(env: StreamExecutionEnvironment): Unit = {
    val dataStream = env.addSource(new SourceFromCycle())
    env.setParallelism(5)
    dataStream.keyBy(data => {
      data.num.toString.slice(0, 1)
    }).windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .process(new ProcessAllWindowFunction[Data, String, TimeWindow] {
        override def process(context: Context, elements: Iterable[Data], out: Collector[String]): Unit = {
          val log = elements.toArray.map(_.num).mkString(",")
          out.collect(log)
        }
      })
      .print()
  }

image.gif

image.gif编辑

8.WindowReduce - WindowedStream → DataStream

将函数reduce函数应用到窗口并返回减少后的值。下述方法将 windowAll 内的全部数字累加并返回为 DataStream,每个数字为 5s 内所有数字的总和。

def windowReduceDemo(env: StreamExecutionEnvironment): Unit = {
    val dataStream = env.addSource(new SourceFromCycle())
    env.setParallelism(5)
    dataStream.keyBy(data => {
      data.num.toString.slice(0, 1)
    }).windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .reduce(new ReduceFunction[Data] {
        override def reduce(o1: Data, o2: Data): Data = {
          Data(o1.num + o2.num)
        }
      }).print()
  }

image.gif

image.gif编辑

9.Union - DataStream → DataStream

合并两个或多个数据流,创建一个包含所有流中的所有元素的新流。注意:如果你将一个数据流与它本身合并,你将在结果流中获得每个元素两次。由于 union 了数据流本身,所以每个元素可以获得两次。

def unionDemo(env: StreamExecutionEnvironment): Unit = {
    val dataStream = env.addSource(new SourceFromCycle())
    env.setParallelism(5)
    dataStream.union(dataStream).print()
  }

image.gif

image.gif编辑

10.Join - DataStream,DataStream → DataStream

在给定的键和公共窗口上连接两个数据流。将两流中的数据按照指定 key 进行连接并处理。下述 Demo 将相同数字连接并求和。

def joinDemo(env: StreamExecutionEnvironment): Unit = {
    val dataStream = env.addSource(new SourceFromCycle())
    env.setParallelism(5)
    dataStream
      .join(dataStream)
      .where(x => x.num).equalTo(x => x.num)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(3)))
      .apply(new JoinFunction[Data, Data, String] {
        override def join(in1: Data, in2: Data): String = {
          val out = in1.num.toString + " + " + in2.num.toString + " = " + (in1.num + in2.num).toString
          out
        }
      }).print()
  }

image.gif

image.gif编辑

11.InnerJoin - KeyedStream,KeyedStream → DataStream

在给定的时间间隔内用公共键连接两个键流中的两个元素,其中元素需满足:

e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound

image.gif

简言之就是两条相同 key 的数据只有在规定时间的上下界内才会被连接,如果超时即使 key 相同也不会聚合。

def windowInnerJoinDemo(env: StreamExecutionEnvironment): Unit = {
    val dataStream = env.addSource(new SourceFromCycle()).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Data] {
      override def extractAscendingTimestamp(t: Data): Long = System.currentTimeMillis()
    })
    val dataStreamOther = env.addSource(new SourceFromCycle()).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Data] {
      override def extractAscendingTimestamp(t: Data): Long = System.currentTimeMillis()
    })
    env.setParallelism(5)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    dataStream.keyBy(_.num).intervalJoin(dataStreamOther.keyBy(_.num))
      .between(Time.seconds(-1), Time.milliseconds(1))
      .upperBoundExclusive()
      .lowerBoundExclusive()
      .process((in1: Data, in2: Data, context: ProcessJoinFunction[Data, Data, String]#Context, collector: Collector[String]) => {
        val out = in1.num.toString + " + " + in2.num.toString + " = " + (in1.num + in2.num).toString
        collector.collect(out)
      })
      .print()
  }

image.gif

上述 demo 设置时间为 -1s -> 1s,即前后共 2s 的时间容忍度,首先运行下述示例,其中 isWait 参数均为 false,即数据流均不延迟,产出正常。

val dataStream = env.addSource(new SourceFromCycle(isWait = false)).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Data] {
      override def extractAscendingTimestamp(t: Data): Long = System.currentTimeMillis()
    })
    val dataStreamOther = env.addSource(new SourceFromCycle()).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Data] {
      override def extractAscendingTimestamp(t: Data): Long = System.currentTimeMillis()
    })

image.gif

数据可以正常联结。

image.gif编辑

下面将第一个流的 isWait 参数设置为 true:

val dataStream = env.addSource(new SourceFromCycle(isWait = true)).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Data] {
      override def extractAscendingTimestamp(t: Data): Long = System.currentTimeMillis()
    })
    val dataStreamOther = env.addSource(new SourceFromCycle()).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Data] {
      override def extractAscendingTimestamp(t: Data): Long = System.currentTimeMillis()
    })

image.gif

由于 isWait = true 时生成数据会延迟 3s,超过容忍度的 2s,所以数据无法联结,无数据产出。

12.WindowCoGroup - DataStream,DataStream → DataStream

在给定的键和公共窗口上对两个数据流进行协组。将相同 key 的数据组成 window 并按照该 key 将两流对应 key 的数据同时处理,下述 demo 处理两流中 5s 内首数字相同的所有数字。

def windowCoGroup(env: StreamExecutionEnvironment): Unit = {
    val dataStream = env.addSource(new SourceFromCycle())
    env.setParallelism(5)
    dataStream
      .coGroup(dataStream)
      .where(_.num.toString.head)
      .equalTo(_.num.toString.head)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .apply {
        (t1: Iterator[Data], t2: Iterator[Data], out: Collector[String]) =>
          val t1Output = t1.toArray.mkString(",")
          val t2Output = t2.toArray.mkString(",")
          val output = t1Output + "\t" + t2Output
          out.collect(output)
      }.print()
  }

image.gif

image.gif编辑

13.Connect - DataStream,DataStream → ConnectedStream

“连接”两个保持其类型的数据流。连接允许两个流之间的共享状态。Connect 有两种用处,一种是合并两个数据流,这里与 union 有一些不同,union 合并相同类型的数据流,即 Stream1,Stream2 都必须为 DataStream[T],Connect 可以合并不同类型的数据流,单数需要分别处理并最终 sink 相同的数据类型 T,例如 Stream1 为类型 A,Stream2 为类型 B,经过处理,二者都返回 T,则可以使用 Connect。第二种用法是 BroadcastStream,作为广播变量供另一个流共享,可以参考 Flink / Scala - DataStream Broadcast State 模式示例详解。下述 demo 对两个流的数据通过两个 Process 方法分开处理。

def connectDemo(env: StreamExecutionEnvironment): Unit = {
    val dataStream = env.addSource(new SourceFromCycle())
    env.setParallelism(5)
    val connectStream = dataStream.connect(dataStream)
    connectStream.process(new CoProcessFunction[Data, Data, String] {
      override def processElement1(in1: Data, context: CoProcessFunction[Data, Data, String]#Context, collector: Collector[String]): Unit = {
        collector.collect("[Stream1]-" + in1.num.toString)
      }
      override def processElement2(in2: Data, context: CoProcessFunction[Data, Data, String]#Context, collector: Collector[String]): Unit = {
        collector.collect("[Stream2]-" + in2.num.toString)
      }
    }).print()
  }

image.gif

image.gif编辑

14.CoMap - ConnectedStream → DataStream

类似于连接数据流上的map和flatMap。下述 demo 对两个流数据单独处理并汇总。

def windowCoMapDemo(env: StreamExecutionEnvironment): Unit = {
    env.setParallelism(5)
    val dataStream = env.addSource(new SourceFromCycle())
    dataStream.connect(dataStream).map(new CoMapFunction[Data, Data, String] {
      override def map1(in1: Data): String = "[1]:" + in1.num
      override def map2(in2: Data): String = "[2]:" + in2.num
    }).print()
  }

image.gif

image.gif编辑

15.CoFlatMap - ConnectedStream → DataStream

基本用法同上,可以返回0或多个数据,所以这里为 flatMap 函数提供了 Collector,可以生成多个数据,而 CoMap 则直接使用 map 函数构成 1-1 对应的关系。

def windowCoFlatMapDemo(env: StreamExecutionEnvironment): Unit = {
    env.setParallelism(5)
    val dataStream = env.addSource(new SourceFromCycle())
    dataStream.connect(dataStream).flatMap(new CoFlatMapFunction[Data, Data, String] {
      override def flatMap1(in1: Data, collector: Collector[String]): Unit = {
        collector.collect("[1]-" + in1.num)
        collector.collect("[1 pow2]-" + in1.num * in1.num)
      }
      override def flatMap2(in2: Data, collector: Collector[String]): Unit = {
        collector.collect("[2]-" + in2.num)
        collector.collect("[2 pow2]-" + in2.num * in2.num)
      }
    }).print()
  }

image.gif

16.Iterate - DataStream → IterativeStream → ConnectedStream

通过将一个操作符的输出重定向到前一个操作符,在流中创建一个“反馈”循环。这对于定义不断更新模型的算法特别有用。下面的代码从一个流开始,并连续地应用迭代体。大于0的元素被发送回反馈通道,其余的元素被下游转发。反馈循环流,该处理方式会将流内数据分为反馈流和下发流,前者的数据会持续循环的进行处理和迭代,而后者则会输出到 sink 结束其周期,多用于模型迭代,例如正样本匮乏的场景可以多次使用正样本反馈,而负样本则定时丢弃。下述 Demo 持续保留偶数,输出奇数。第一次 appear 奇偶数均有机会参与,但是到输出时只有 Data(奇数) 输出因为 Data(偶数) 继续进行反馈迭代。

def windowIterateDemo(env: StreamExecutionEnvironment): Unit = {
    val dataStream = env.addSource(new SourceFromCycle())
    dataStream.iterate {
      iteration => {
        val iterationBody = iteration.map(data => {
          println(data.num + " " + "appear!")
          data
        })
        // 反馈流,持续参与迭代 && 输出流,离开迭代
        (iterationBody.filter(_.num % 2 == 0).setParallelism(1), iterationBody.filter(_.num % 2 != 0).setParallelism(1))
      }
    }.print()
  }

image.gif

image.gif编辑

三.总结

image.gif编辑

Flink Stream 的基本 Transformation 大致就这些,以上方法官方 API 只给出简易 demo 无法运行所以匹配了数字的简单实例,一些 API 和 ProcessFunction 可能由于 Flink 版本不同有一些写法的不同,不过整体思路与思想不会改变,这里也顺便整理下 Flink Source -> Transformation -> Sink 的知识点。

FlinkDataSet Source -> Flink / Scala - DataSource 之 DataSet 获取数据总结

FlinkDataStream Source -> Flink / Scala - DataSource 之 DataStream 获取数据总结

FlinkDataSet Transformation -> Flink / Scala - DataSet Transformations 常用转换函数详解

FlinkDataStream Transformation -> Flink / Scala - DataStream Transformations 常用转换函数

Flink DataSet Sink -> Flink / Scala - DataSet Sink 输出数据详解

Flink DataStream Sink -> Flink / Scala - DataStream Sink 输出数据详解

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
SQL Oracle 关系型数据库
Flink的表值函数
【2月更文挑战第18天】Flink的表值函数
16 3
|
1月前
|
SQL Oracle 关系型数据库
Flink的表值函数(Table-Valued Function,TVF)是一种返回值是一张表的函数
【2月更文挑战第17天】Flink的表值函数(Table-Valued Function,TVF)是一种返回值是一张表的函数
20 1
|
1月前
|
SQL 存储 Apache
在 Apache Flink SQL 中,并没有内置的 GROUP_CONCAT 函数
【2月更文挑战第16天】在 Apache Flink SQL 中,并没有内置的 GROUP_CONCAT 函数
186 2
|
2月前
|
消息中间件 SQL canal
Flink转换问题之DataStream转成table失败如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
2月前
|
SQL 消息中间件 Apache
Flink报错问题之使用hive udf函数报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
3月前
|
消息中间件 Java Kafka
Flink中的DataStream和DataSet有什么区别?请解释其概念和用途。
Flink中的DataStream和DataSet有什么区别?请解释其概念和用途。
30 0
|
8月前
|
存储 API 流计算
Flink DataStream API-概念、模式、作业流程和程序
前几篇介绍了Flink的入门、架构原理、安装等,相信你对Flink已经了解入门。接下来开始介绍Flink DataStream API内容,先介绍DataStream API基本概念和使用,然后介绍核心概念,最后再介绍经典案例和代码实现。本篇内容:Flink DataStream API的概念、模式、作业流程和程序。
Flink DataStream API-概念、模式、作业流程和程序
|
8月前
|
并行计算 API 流计算
Flink之处理函数 (ProcessFunction)1
Flink之处理函数 (ProcessFunction)
75 0
|
8月前
|
Java 程序员 网络安全
Flink处理函数实战之四:窗口处理
学习Flink低阶处理函数中的ProcessAllWindowFunction和ProcessWindowFunction
Flink处理函数实战之四:窗口处理
|
4月前
|
SQL 传感器 分布式计算
Flink(五)【DataStream 转换算子(上)】
Flink(五)【DataStream 转换算子(上)】

热门文章

最新文章