【Flink】(三)详解 Flink DataStream API(Environment、Source、Transform、Sink)1

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 【Flink】(三)详解 Flink DataStream API(Environment、Source、Transform、Sink)1

文章目录


一、Environment

     1.1 getExecutionEnvironment

1.2 createLocalEnvironment

1.3 createRemoteEnvironment


二、Source

2.1 从集合读取数据

2.2 从文件读取数据

2.3 以 kafka 消息队列的数据作为来源

2.4 自定义 Source


三、转换算子(Transform)

3.1 map

3.2 flatMap

3.3 Filter

3.4 KeyBy

3.5 滚动聚合算子(Rolling Aggregation)

3.6 Reduce

3.7 Split 和 Select

3.8 Connect 和 CoMap

3.9 Union

Connect 与 Union 区别:


四、支持的数据类型

4.1 基础数据类型

4.2 Java 和 Scala 元组(Tuples)

4.3 Scala 样例类(case classes)

4.4 Java 简单对象(POJOs)

4.5 其它(Arrays, Lists, Maps, Enums, 等等)


五、实现 UDF 函数——更细粒度的控制流

5.1 函数类(Function Classes)

5.2 匿名函数(Lambda Functions)

5.3 富函数(Rich Functions)


六、Sink

6.1 Kafka

6.2 Redis

6.3 Elasticsearch

6.4 JDBC 自定义 sink


七、总结


一、Environment


1.1 getExecutionEnvironment


创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。


val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment


val env = StreamExecutionEnvironment.getExecutionEnvironment


如果没有设置并行度,会以flink-conf.yaml中的配置为准,默认是1


1.2 createLocalEnvironment


返回本地执行环境,需要在调用时指定默认的并行度。


val env = StreamExecutionEnvironment.createLocalEnvironment(1)

1.3 createRemoteEnvironment


返回集群执行环境,将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包。


val env = ExecutionEnvironment.createRemoteEnvironment("jobmanager-hostname", 6123,"C://jar//flink//wordcount.jar")


二、Source


2.1 从集合读取数据


// 定义样例类,传感器 id,时间戳,温度
case class SensorReading(id: String, timestamp: Long, temperature: Double)
object Sensor {
 def main(args: Array[String]): Unit = {
 val env = StreamExecutionEnvironment.getExecutionEnvironment
 // 从自定义集合中读取数据
 val stream1 = env
  .fromCollection(List(
    SensorReading("sensor_1", 1547718199, 35.80018327300259),
    SensorReading("sensor_6", 1547718201, 15.402984393403084),
    SensorReading("sensor_7", 1547718202, 6.720945201171228),
    SensorReading("sensor_10", 1547718205, 38.101067604893444)
 ))
 stream1.print("stream1:").setParallelism(1)
 env.execute()
 } }

2.2 从文件读取数据


val stream2 = env.readTextFile("YOUR_FILE_PATH")

2.3 以 kafka 消息队列的数据作为来源


需要引入 kafka 连接器的依赖:


pom.xml
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
 <version>1.7.2</version>
</dependency>


具体代码如下:


val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group") 
properties.setProperty("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new 
SimpleStringSchema(), properties))

2.4 自定义 Source


除了以上的 source 数据来源,我们还可以自定义 source,主要是用于测试环境。需要做的,只是传入一个 SourceFunction 就可以。具体调用如下:


val stream4 = env.addSource( new MySensorSource() )

我们希望可以随机生成传感器数据,MySensorSource 具体的代码实现如下:


class MySensorSource extends SourceFunction[SensorReading]{
// flag: 表示数据源是否还在正常运行
var running: Boolean = true
override def cancel(): Unit = {
  running = false
}
  // 正常生成数据
override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
  // 初始化一个随机数发生器
  val rand = new Random()
  var curTemp = 1.to(10).map(
  i => ( "sensor_" + i, 65 + rand.nextGaussian() * 20 ) )
  // 用无线循环,产生数据流
  while(running){
  // 更新温度值
  curTemp = curTemp.map(
  t => (t._1, t._2 + rand.nextGaussian() )
  )
  // 获取当前时间戳
  val curTime = System.currentTimeMillis()
  curTemp.foreach(
  t => ctx.collect(SensorReading(t._1, curTime, t._2))
  )
  Thread.sleep(100) 
  } 
  }
 }

三、转换算子(Transform)


3.1 map


val streamMap = stream.map { x => x * 2 }


3.2 flatMap


flatMap 的函数签名:def flatMap[A,B](as: List[A])(f: A ⇒ List[B]): List[B]

例如: flatMap(List(1,2,3))(i ⇒ List(i,i))

结果是 List(1,1,2,2,3,3),

而 List(“a b”, “c d”).flatMap(line ⇒ line.split(" "))

结果是 List(a, b, c, d)。


val streamFlatMap = stream.flatMap{
    x => x.split(" ")
}


3.3 Filter


val streamFilter = stream.filter{
    x => x == 1
}


3.4 KeyBy


DataStream → KeyedStream:输入必须是Tuple类型,逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,在内部以hash的形式实现的。


3.5 滚动聚合算子(Rolling Aggregation)


这些算子可以针对 KeyedStream 的每一个支流做聚合。


sum()

min()

max()

minBy()

maxBy()


3.6 Reduce

KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。

val stream2 = env.readTextFile("YOUR_PATH\\sensor.txt")
 .map( data => {
 val dataArray = data.split(",")
 SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, 
dataArray(2).trim.toDouble)
 })
  .keyBy("id")
 .reduce( (x, y) => SensorReading(x.id, x.timestamp + 1, y.temperature) )


3.7 Split 和 Select


Split



DataStream → SplitStream:根据某些特征把一个DataStream拆分成两个或者多个DataStream。


Select



SplitStream→DataStream:从一个SplitStream中获取一个或者多个DataStream。


需求:传感器数据按照温度高低(以 30 度为界),拆分成两个流。


val splitStream = stream2
 .split( sensorData => {
 if (sensorData.temperature > 30) Seq("high") else Seq("low")
} )
val high = splitStream.select("high")
val low = splitStream.select("low")
val all = splitStream.select("high", "low")


3.8 Connect 和 CoMap


DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被 Connect 之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。


CoMap,CoFlatMap



ConnectedStreams → DataStream:作用于 ConnectedStreams 上,功能与 map 和 flatMap 一样,对 ConnectedStreams 中的每一个 Stream 分别进行 map 和 flatMap处理。


val warning = high.map( sensorData => (sensorData.id, 
sensorData.temperature) )
val connected = warning.connect(low)
val coMap = connected.map(
 warningData => (warningData._1, warningData._2, "warning"),
 lowData => (lowData.id, "healthy") )


3.9 Union


DataStream → DataStream:对两个或者两个以上的 DataStream 进行 union 操作,产生一个包含所有 DataStream 元素的新 DataStream。

//合并以后打印
val unionStream: DataStream[StartUpLog] = appStoreStream.union(otherStream)
unionStream.print("union:::")

Connect 与 Union 区别:


Union 之前两个流的类型必须是一样,Connect 可以不一样,在之后的 coMap中再去调整成为一样的。


Connect 只能操作两个流,Union 可以操作多个。



相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
Java 开发工具 流计算
flink最新master代码编译出现Java Runtime Environment 问题
在尝试编译Flink源码时遇到Java运行时环境致命错误:EXCEPTION_ACCESS_VIOLATION。问题出现在JVM.dll+0x88212。使用的是Java 11.0.28和Java HotSpot(TM) 64-Bit Server VM。系统为Windows客户端,没有生成核心dump文件。错误日志保存在hs_err_pid39364.log和replay_pid39364.log。要解决这个问题,建议检查JDK版本兼容性,更新JDK或参照错误报告文件提交Bug至http://bugreport.java.com/bugreport/crash.jsp。
|
1天前
|
SQL Java 关系型数据库
Flink DataSet API迁移到DataStream API实战
本文介绍了作者的Flink项目从DataSet API迁移到DataStream API的背景、方法和遇到的问题以及解决方案。
121 3
|
16天前
|
存储 算法 API
Flink DataStream API 批处理能力演进之路
本文由阿里云 Flink 团队郭伟杰老师撰写,旨在向 Flink Batch 社区用户介绍 Flink DataStream API 批处理能力的演进之路。
261 1
Flink DataStream API 批处理能力演进之路
|
20天前
|
消息中间件 关系型数据库 MySQL
[flink 实时流基础] 输出算子(Sink)
[flink 实时流基础] 输出算子(Sink)
|
20天前
|
Java 大数据 API
[AIGC] Flink入门教程:理解DataStream API(Java版)
[AIGC] Flink入门教程:理解DataStream API(Java版)
|
2月前
|
SQL 分布式计算 测试技术
Flink API的4个层次
【2月更文挑战第28天】
|
2月前
|
消息中间件 SQL Kafka
如何高效接入 Flink: Connecter / Catalog API 核心设计与社区进展
本文整理自阿里云实时计算团队 Apache Flink Committer 和 PMC Member 任庆盛在 FFA 2023 核心技术专场(二)中的分享。
304 1
如何高效接入 Flink: Connecter / Catalog API 核心设计与社区进展
|
2月前
|
分布式计算 API 数据处理
Flink【基础知识 01】(简介+核心架构+分层API+集群架构+应用场景+特点优势)(一篇即可大概了解flink)
【2月更文挑战第15天】Flink【基础知识 01】(简介+核心架构+分层API+集群架构+应用场景+特点优势)(一篇即可大概了解flink)
70 1
|
SQL 消息中间件 API
Flink关系型API的公共部分
关系型程序的公共部分 下面的代码段展示了Table&SQL API所编写流式程序的程序模式: val env = StreamExecutionEnvironment.getExecutionEnvironment //创建TableEnvironment对象 val tableEnv = TableEnvironment.
2729 0
|
3月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
598 5