【Flink】(三)详解 Flink DataStream API(Environment、Source、Transform、Sink)1

简介: 【Flink】(三)详解 Flink DataStream API(Environment、Source、Transform、Sink)1

文章目录


一、Environment

     1.1 getExecutionEnvironment

1.2 createLocalEnvironment

1.3 createRemoteEnvironment


二、Source

2.1 从集合读取数据

2.2 从文件读取数据

2.3 以 kafka 消息队列的数据作为来源

2.4 自定义 Source


三、转换算子(Transform)

3.1 map

3.2 flatMap

3.3 Filter

3.4 KeyBy

3.5 滚动聚合算子(Rolling Aggregation)

3.6 Reduce

3.7 Split 和 Select

3.8 Connect 和 CoMap

3.9 Union

Connect 与 Union 区别:


四、支持的数据类型

4.1 基础数据类型

4.2 Java 和 Scala 元组(Tuples)

4.3 Scala 样例类(case classes)

4.4 Java 简单对象(POJOs)

4.5 其它(Arrays, Lists, Maps, Enums, 等等)


五、实现 UDF 函数——更细粒度的控制流

5.1 函数类(Function Classes)

5.2 匿名函数(Lambda Functions)

5.3 富函数(Rich Functions)


六、Sink

6.1 Kafka

6.2 Redis

6.3 Elasticsearch

6.4 JDBC 自定义 sink


七、总结


一、Environment


1.1 getExecutionEnvironment


创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。


val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment


val env = StreamExecutionEnvironment.getExecutionEnvironment


如果没有设置并行度,会以flink-conf.yaml中的配置为准,默认是1


1.2 createLocalEnvironment


返回本地执行环境,需要在调用时指定默认的并行度。


val env = StreamExecutionEnvironment.createLocalEnvironment(1)

1.3 createRemoteEnvironment


返回集群执行环境,将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包。


val env = ExecutionEnvironment.createRemoteEnvironment("jobmanager-hostname", 6123,"C://jar//flink//wordcount.jar")


二、Source


2.1 从集合读取数据


// 定义样例类,传感器 id,时间戳,温度
case class SensorReading(id: String, timestamp: Long, temperature: Double)
object Sensor {
 def main(args: Array[String]): Unit = {
 val env = StreamExecutionEnvironment.getExecutionEnvironment
 // 从自定义集合中读取数据
 val stream1 = env
  .fromCollection(List(
    SensorReading("sensor_1", 1547718199, 35.80018327300259),
    SensorReading("sensor_6", 1547718201, 15.402984393403084),
    SensorReading("sensor_7", 1547718202, 6.720945201171228),
    SensorReading("sensor_10", 1547718205, 38.101067604893444)
 ))
 stream1.print("stream1:").setParallelism(1)
 env.execute()
 } }

2.2 从文件读取数据


val stream2 = env.readTextFile("YOUR_FILE_PATH")

2.3 以 kafka 消息队列的数据作为来源


需要引入 kafka 连接器的依赖:


pom.xml
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
 <version>1.7.2</version>
</dependency>


具体代码如下:


val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group") 
properties.setProperty("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new 
SimpleStringSchema(), properties))

2.4 自定义 Source


除了以上的 source 数据来源,我们还可以自定义 source,主要是用于测试环境。需要做的,只是传入一个 SourceFunction 就可以。具体调用如下:


val stream4 = env.addSource( new MySensorSource() )

我们希望可以随机生成传感器数据,MySensorSource 具体的代码实现如下:


class MySensorSource extends SourceFunction[SensorReading]{
// flag: 表示数据源是否还在正常运行
var running: Boolean = true
override def cancel(): Unit = {
  running = false
}
  // 正常生成数据
override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
  // 初始化一个随机数发生器
  val rand = new Random()
  var curTemp = 1.to(10).map(
  i => ( "sensor_" + i, 65 + rand.nextGaussian() * 20 ) )
  // 用无线循环,产生数据流
  while(running){
  // 更新温度值
  curTemp = curTemp.map(
  t => (t._1, t._2 + rand.nextGaussian() )
  )
  // 获取当前时间戳
  val curTime = System.currentTimeMillis()
  curTemp.foreach(
  t => ctx.collect(SensorReading(t._1, curTime, t._2))
  )
  Thread.sleep(100) 
  } 
  }
 }

三、转换算子(Transform)


3.1 map


val streamMap = stream.map { x => x * 2 }


3.2 flatMap


flatMap 的函数签名:def flatMap[A,B](as: List[A])(f: A ⇒ List[B]): List[B]

例如: flatMap(List(1,2,3))(i ⇒ List(i,i))

结果是 List(1,1,2,2,3,3),

而 List(“a b”, “c d”).flatMap(line ⇒ line.split(" "))

结果是 List(a, b, c, d)。


val streamFlatMap = stream.flatMap{
    x => x.split(" ")
}


3.3 Filter


val streamFilter = stream.filter{
    x => x == 1
}


3.4 KeyBy


DataStream → KeyedStream:输入必须是Tuple类型,逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,在内部以hash的形式实现的。


3.5 滚动聚合算子(Rolling Aggregation)


这些算子可以针对 KeyedStream 的每一个支流做聚合。


sum()

min()

max()

minBy()

maxBy()


3.6 Reduce

KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。

val stream2 = env.readTextFile("YOUR_PATH\\sensor.txt")
 .map( data => {
 val dataArray = data.split(",")
 SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, 
dataArray(2).trim.toDouble)
 })
  .keyBy("id")
 .reduce( (x, y) => SensorReading(x.id, x.timestamp + 1, y.temperature) )


3.7 Split 和 Select


Split



DataStream → SplitStream:根据某些特征把一个DataStream拆分成两个或者多个DataStream。


Select



SplitStream→DataStream:从一个SplitStream中获取一个或者多个DataStream。


需求:传感器数据按照温度高低(以 30 度为界),拆分成两个流。


val splitStream = stream2
 .split( sensorData => {
 if (sensorData.temperature > 30) Seq("high") else Seq("low")
} )
val high = splitStream.select("high")
val low = splitStream.select("low")
val all = splitStream.select("high", "low")


3.8 Connect 和 CoMap


DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被 Connect 之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。


CoMap,CoFlatMap



ConnectedStreams → DataStream:作用于 ConnectedStreams 上,功能与 map 和 flatMap 一样,对 ConnectedStreams 中的每一个 Stream 分别进行 map 和 flatMap处理。


val warning = high.map( sensorData => (sensorData.id, 
sensorData.temperature) )
val connected = warning.connect(low)
val coMap = connected.map(
 warningData => (warningData._1, warningData._2, "warning"),
 lowData => (lowData.id, "healthy") )


3.9 Union


DataStream → DataStream:对两个或者两个以上的 DataStream 进行 union 操作,产生一个包含所有 DataStream 元素的新 DataStream。

//合并以后打印
val unionStream: DataStream[StartUpLog] = appStoreStream.union(otherStream)
unionStream.print("union:::")

Connect 与 Union 区别:


Union 之前两个流的类型必须是一样,Connect 可以不一样,在之后的 coMap中再去调整成为一样的。


Connect 只能操作两个流,Union 可以操作多个。



相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
849 0
|
12月前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
本文整理自阿里云智能集团 Flink PMC Member & Committer 徐榜江(雪尽)在 FFA 2024 分论坛的分享,涵盖四大主题:Flink CDC、YAML API、Transform + AI 和 Community。文章详细介绍了 Flink CDC 的发展历程及其优势,特别是 YAML API 的设计与实现,以及如何通过 Transform 和 AI 模型集成提升数据处理能力。最后,分享了社区动态和未来规划,欢迎更多开发者加入开源社区,共同推动 Flink CDC 的发展。
792 12
Flink CDC YAML:面向数据集成的 API 设计
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
11月前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
Flink CDC YAML:面向数据集成的 API 设计
516 5
|
SQL 关系型数据库 MySQL
实时数仓 Hologres操作报错合集之Flink CTAS Source(Mysql) 表字段从可空改为非空的原因是什么
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
982 0
|
SQL 消息中间件 分布式计算
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
234 0
|
SQL 关系型数据库 测试技术
实时数仓 Hologres操作报错合集之执行Flink的sink操作时出现报错,是什么原因
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
6月前
|
存储 分布式计算 数据处理
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
阿里云实时计算Flink团队,全球领先的流计算引擎缔造者,支撑双11万亿级数据处理,推动Apache Flink技术发展。现招募Flink执行引擎、存储引擎、数据通道、平台管控及产品经理人才,地点覆盖北京、杭州、上海。技术深度参与开源核心,打造企业级实时计算解决方案,助力全球企业实现毫秒洞察。
612 0
「48小时极速反馈」阿里云实时计算Flink广招天下英雄

热门文章

最新文章