【Flink】(三)详解 Flink DataStream API(Environment、Source、Transform、Sink)2

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
简介: 【Flink】(三)详解 Flink DataStream API(Environment、Source、Transform、Sink)2


四、支持的数据类型


Flink 流应用程序处理的是以数据对象表示的事件流。所以在 Flink 内部,我们需要能够处理这些对象。它们需要被序列化和反序列化,以便通过网络传送它们;或者从状态后端、检查点和保存点读取它们。为了有效地做到这一点,Flink 需要明确知道应用程序所处理的数据类型。Flink 使用类型信息的概念来表示数据类型,并为每个数据类型生成特定的序列化器、反序列化器和比较器。


Flink 还具有一个类型提取系统,该系统分析函数的输入和返回类型,以自动获取类型信息,从而获得序列化器和反序列化器。但是,在某些情况下,例如 lambda函数或泛型类型,需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。


Flink 支持 Java 和 Scala 中所有常见数据类型。使用最广泛的类型有以下几种。


4.1 基础数据类型


Flink 支持所有的 Java 和 Scala 基础数据类型,Int, Double, Long, String, …


val numbers: DataStream[Long] = env.fromElements(1L, 2L, 3L, 4L)
numbers.map( n => n + 1 )


4.2 Java 和 Scala 元组(Tuples)


val persons: DataStream[(String, Integer)] = env.fromElements( 
  ("Adam", 17),
  ("Sarah", 23) )
persons.filter(p => p._2 > 18)


4.3 Scala 样例类(case classes)


case class Person(name: String, age: Int)
val persons: DataStream[Person] = env.fromElements(
  Person("Adam", 17),
  Person("Sarah", 23) )
persons.filter(p => p.age > 18)


4.4 Java 简单对象(POJOs)

public class Person {
public String name;
public int age;
 public Person() {}
 public Person(String name, int age) { 
this.name = name; 
this.age = age; 
  }
 }
DataStream<Person> persons = env.fromElements( 
new Person("Alex", 42), 
new Person("Wendy", 23));

4.5 其它(Arrays, Lists, Maps, Enums, 等等)


Flink 对 Java 和 Scala 中的一些特殊目的的类型也都是支持的,比如 Java 的 ArrayList,HashMap,Enum 等等。


五、实现 UDF 函数——更细粒度的控制流


5.1 函数类(Function Classes)


Flink 暴露了所有 udf 函数的接口(实现方式为接口或者抽象类)。例如 MapFunction, FilterFunction, ProcessFunction 等等。


下面例子实现了 FilterFunction 接口:


class FilterFilter extends FilterFunction[String] {
override def filter(value: String): Boolean = {
 value.contains("flink")
  } 
 }
val flinkTweets = tweets.filter(new FlinkFilter)

还可以将函数实现成匿名类


val flinkTweets = tweets.filter(
new RichFilterFunction[String] {
override def filter(value: String): Boolean = {
value.contains("flink") 
  }
 } )

我们 filter 的字符串"flink"还可以当作参数传进去。

val tweets: DataStream[String] = ...
val flinkTweets = tweets.filter(new KeywordFilter("flink"))
class KeywordFilter(keyWord: String) extends FilterFunction[String] {
override def filter(value: String): Boolean = {
value.contains(keyWord) 
  } 
}


5.2 匿名函数(Lambda Functions)


val tweets: DataStream[String] = ...
val flinkTweets = tweets.filter(_.contains("flink"))


5.3 富函数(Rich Functions)


“富函数”是 DataStream API 提供的一个函数类的接口,所有 Flink 函数类都有其 Rich 版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

RichMapFunction
RichFlatMapFunction
RichFilterFunction
Rich Function 有一个生命周期的概念。典型的生命周期方法有:
open()方法是 rich function 的初始化方法,当一个算子例如 map 或者 filter被调用之前 open()会被调用。
close()方法是生命周期中的最后一个调用的方法,做一些清理工作。
getRuntimeContext()方法提供了函数的 RuntimeContext 的一些信息,例如函数执行的并行度,任务的名字,以及 state 状态
class MyFlatMap extends RichFlatMapFunction[Int, (Int, Int)] {
var subTaskIndex = 0
override def open(configuration: Configuration): Unit = {
subTaskIndex = getRuntimeContext.getIndexOfThisSubtask
// 以下可以做一些初始化工作,例如建立一个和 HDFS 的连接
}
override def flatMap(in: Int, out: Collector[(Int, Int)]): Unit = {
if (in % 2 == subTaskIndex) {
out.collect((subTaskIndex, in))
  } 
}
override def close(): Unit = {
// 以下做一些清理工作,例如断开和 HDFS 的连接。
  }
 }

六、Sink


Flink 没有类似于 spark 中 foreach 方法,让用户进行迭代的操作。虽有对外的输出操作都要利用 Sink 完成。最后通过类似如下方式完成整个任务最终输出操作。

stream.addSink(new MySink(xxxx))

官方提供了一部分的框架的 sink。除此以外,需要用户自定义实现 sink。

6.1 Kafka


pom.xml


<!--
https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11
-->
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
  <version>1.7.2</version>
</dependency>

主函数中添加 sink:


val union = high.union(low).map(_.temperature.toString)
union.addSink(new FlinkKafkaProducer011[String]("localhost:9092", 
"test", new SimpleStringSchema()))


6.2 Redis


pom.xml


<!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis
-->
<dependency>
 <groupId>org.apache.bahir</groupId>
 <artifactId>flink-connector-redis_2.11</artifactId>
 <version>1.0</version>
</dependency>

定义一个 redis 的 mapper 类,用于定义保存到 redis 时调用的命令:


class MyRedisMapper extends RedisMapper[SensorReading]{
 override def getCommandDescription: RedisCommandDescription = {
 new RedisCommandDescription(RedisCommand.HSET, "sensor_temperature")
 }
 override def getValueFromData(t: SensorReading): String = 
t.temperature.toString
 override def getKeyFromData(t: SensorReading): String = t.id
}
val conf = new 
FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build()
dataStream.addSink( new RedisSink[SensorReading](conf, new MyRedisMapper) )


6.3 Elasticsearch


pom.xml


<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
 <version>1.7.2</version>
</dependency>

在主函数中调用:


val httpHosts = new util.ArrayList[HttpHost]()
httpHosts.add(new HttpHost("localhost", 9200))
val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading]( httpHosts, 
new ElasticsearchSinkFunction[SensorReading] {
 override def process(t: SensorReading, runtimeContext: RuntimeContext, 
requestIndexer: RequestIndexer): Unit = {
  println("saving data: " + t)
  val json = new util.HashMap[String, String]()
  json.put("data", t.toString)
  val indexRequest = 
Requests.indexRequest().index("sensor").`type`("readingData").source(json)
  requestIndexer.add(indexRequest)
  println("saved successfully")
 }
} )
dataStream.addSink( esSinkBuilder.build() )

6.4 JDBC 自定义 sink


<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java --> 
<dependency>
 <groupId>mysql</groupId>
 <artifactId>mysql-connector-java</artifactId>
 <version>5.1.44</version>
</dependency>

添加 MyJdbcSink


class MyJdbcSink() extends RichSinkFunction[SensorReading]{
 var conn: Connection = _
 var insertStmt: PreparedStatement = _
 var updateStmt: PreparedStatement = _
 // open 主要是创建连接
 override def open(parameters: Configuration): Unit = {
 super.open(parameters)
 conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", 
"root", "123456")
 insertStmt = conn.prepareStatement("INSERT INTO temperatures (sensor, 
temp) VALUES (?, ?)")
 updateStmt = conn.prepareStatement("UPDATE temperatures SET temp = ? WHERE 
sensor = ?")
 }
 // 调用连接,执行 sql
 override def invoke(value: SensorReading, context: 
SinkFunction.Context[_]): Unit = {
updateStmt.setDouble(1, value.temperature)
updateStmt.setString(2, value.id)
 updateStmt.execute()
 if (updateStmt.getUpdateCount == 0) {
  insertStmt.setString(1, value.id)
  insertStmt.setDouble(2, value.temperature)
  insertStmt.execute()
  }
 }
 override def close(): Unit = {
  insertStmt.close()
  updateStmt.close()
  conn.close()
  } 
 }

在 main 方法中增加,把明细保存到 mysql 中


dataStream.addSink(new MyJdbcSink())

七、总结


本文主要介绍了 Flink DataStream API,它是当前 Flink 中比较底层的一套 API。在实际的开发中,基于该 API 需要用户自己处理 State 与 Time 等一些概念,因此需要较大的工作量。后续课程还会介绍更上层的 Table / SQL 层的 API,未来 Table / SQL 可能会成为 Flink 主流的 API,但是对于接口来说,越底层的接口表达能力越强,在一些需要精细操作的情况下,仍然需要依赖于 DataStream API。


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1天前
|
SQL 关系型数据库 MySQL
实时数仓 Hologres操作报错合集之Flink CTAS Source(Mysql) 表字段从可空改为非空的原因是什么
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
1天前
|
SQL 关系型数据库 测试技术
实时数仓 Hologres操作报错合集之执行Flink的sink操作时出现报错,是什么原因
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
1天前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1天前
|
存储 SQL Java
实时数仓 Hologres产品使用合集之如何使用Flink的sink连接
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
1月前
|
存储 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何在一个任务中使用多个source
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
消息中间件 NoSQL Redis
实时计算 Flink版产品使用问题之配置了最大连续失败数不为1,在Kafka的精准一次sink中,如果ck失败了,这批数据是否会丢失
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之在自定义RichSinkFunction中,如何获取source的schema
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之怎么使用DataStream生成结果表
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
SQL 分布式计算 测试技术
概述Flink API中的4个层次
【7月更文挑战第14天】Flink的API分为4个层次:核心底层API(如ProcessFunction)、DataStream/DataSet API、Table API和SQL。
|
2月前
|
NoSQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何确保多并发sink同时更新Redis值时,数据能按事件时间有序地更新并且保持一致性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。