四、支持的数据类型
Flink 流应用程序处理的是以数据对象表示的事件流。所以在 Flink 内部,我们需要能够处理这些对象。它们需要被序列化和反序列化,以便通过网络传送它们;或者从状态后端、检查点和保存点读取它们。为了有效地做到这一点,Flink 需要明确知道应用程序所处理的数据类型。Flink 使用类型信息的概念来表示数据类型,并为每个数据类型生成特定的序列化器、反序列化器和比较器。
Flink 还具有一个类型提取系统,该系统分析函数的输入和返回类型,以自动获取类型信息,从而获得序列化器和反序列化器。但是,在某些情况下,例如 lambda函数或泛型类型,需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。
Flink 支持 Java 和 Scala 中所有常见数据类型。使用最广泛的类型有以下几种。
4.1 基础数据类型
Flink 支持所有的 Java 和 Scala 基础数据类型,Int, Double, Long, String, …
val numbers: DataStream[Long] = env.fromElements(1L, 2L, 3L, 4L) numbers.map( n => n + 1 )
4.2 Java 和 Scala 元组(Tuples)
val persons: DataStream[(String, Integer)] = env.fromElements( ("Adam", 17), ("Sarah", 23) ) persons.filter(p => p._2 > 18)
4.3 Scala 样例类(case classes)
case class Person(name: String, age: Int) val persons: DataStream[Person] = env.fromElements( Person("Adam", 17), Person("Sarah", 23) ) persons.filter(p => p.age > 18)
4.4 Java 简单对象(POJOs)
public class Person { public String name; public int age; public Person() {} public Person(String name, int age) { this.name = name; this.age = age; } } DataStream<Person> persons = env.fromElements( new Person("Alex", 42), new Person("Wendy", 23));
4.5 其它(Arrays, Lists, Maps, Enums, 等等)
Flink 对 Java 和 Scala 中的一些特殊目的的类型也都是支持的,比如 Java 的 ArrayList,HashMap,Enum 等等。
五、实现 UDF 函数——更细粒度的控制流
5.1 函数类(Function Classes)
Flink 暴露了所有 udf 函数的接口(实现方式为接口或者抽象类)。例如 MapFunction, FilterFunction, ProcessFunction 等等。
下面例子实现了 FilterFunction 接口:
class FilterFilter extends FilterFunction[String] { override def filter(value: String): Boolean = { value.contains("flink") } } val flinkTweets = tweets.filter(new FlinkFilter)
还可以将函数实现成匿名类
val flinkTweets = tweets.filter( new RichFilterFunction[String] { override def filter(value: String): Boolean = { value.contains("flink") } } )
我们 filter 的字符串"flink"还可以当作参数传进去。
val tweets: DataStream[String] = ... val flinkTweets = tweets.filter(new KeywordFilter("flink")) class KeywordFilter(keyWord: String) extends FilterFunction[String] { override def filter(value: String): Boolean = { value.contains(keyWord) } }
5.2 匿名函数(Lambda Functions)
val tweets: DataStream[String] = ... val flinkTweets = tweets.filter(_.contains("flink"))
5.3 富函数(Rich Functions)
“富函数”是 DataStream API 提供的一个函数类的接口,所有 Flink 函数类都有其 Rich 版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
RichMapFunction RichFlatMapFunction RichFilterFunction … Rich Function 有一个生命周期的概念。典型的生命周期方法有: open()方法是 rich function 的初始化方法,当一个算子例如 map 或者 filter被调用之前 open()会被调用。 close()方法是生命周期中的最后一个调用的方法,做一些清理工作。 getRuntimeContext()方法提供了函数的 RuntimeContext 的一些信息,例如函数执行的并行度,任务的名字,以及 state 状态 class MyFlatMap extends RichFlatMapFunction[Int, (Int, Int)] { var subTaskIndex = 0 override def open(configuration: Configuration): Unit = { subTaskIndex = getRuntimeContext.getIndexOfThisSubtask // 以下可以做一些初始化工作,例如建立一个和 HDFS 的连接 } override def flatMap(in: Int, out: Collector[(Int, Int)]): Unit = { if (in % 2 == subTaskIndex) { out.collect((subTaskIndex, in)) } } override def close(): Unit = { // 以下做一些清理工作,例如断开和 HDFS 的连接。 } }
六、Sink
Flink 没有类似于 spark 中 foreach 方法,让用户进行迭代的操作。虽有对外的输出操作都要利用 Sink 完成。最后通过类似如下方式完成整个任务最终输出操作。
stream.addSink(new MySink(xxxx))
官方提供了一部分的框架的 sink。除此以外,需要用户自定义实现 sink。
6.1 Kafka
pom.xml
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.11_2.11</artifactId> <version>1.7.2</version> </dependency>
主函数中添加 sink:
val union = high.union(low).map(_.temperature.toString) union.addSink(new FlinkKafkaProducer011[String]("localhost:9092", "test", new SimpleStringSchema()))
6.2 Redis
pom.xml
<!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis --> <dependency> <groupId>org.apache.bahir</groupId> <artifactId>flink-connector-redis_2.11</artifactId> <version>1.0</version> </dependency>
定义一个 redis 的 mapper 类,用于定义保存到 redis 时调用的命令:
class MyRedisMapper extends RedisMapper[SensorReading]{ override def getCommandDescription: RedisCommandDescription = { new RedisCommandDescription(RedisCommand.HSET, "sensor_temperature") } override def getValueFromData(t: SensorReading): String = t.temperature.toString override def getKeyFromData(t: SensorReading): String = t.id } val conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build() dataStream.addSink( new RedisSink[SensorReading](conf, new MyRedisMapper) )
6.3 Elasticsearch
pom.xml
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch6_2.11</artifactId> <version>1.7.2</version> </dependency>
在主函数中调用:
val httpHosts = new util.ArrayList[HttpHost]() httpHosts.add(new HttpHost("localhost", 9200)) val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading]( httpHosts, new ElasticsearchSinkFunction[SensorReading] { override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = { println("saving data: " + t) val json = new util.HashMap[String, String]() json.put("data", t.toString) val indexRequest = Requests.indexRequest().index("sensor").`type`("readingData").source(json) requestIndexer.add(indexRequest) println("saved successfully") } } ) dataStream.addSink( esSinkBuilder.build() )
6.4 JDBC 自定义 sink
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.44</version> </dependency>
添加 MyJdbcSink
class MyJdbcSink() extends RichSinkFunction[SensorReading]{ var conn: Connection = _ var insertStmt: PreparedStatement = _ var updateStmt: PreparedStatement = _ // open 主要是创建连接 override def open(parameters: Configuration): Unit = { super.open(parameters) conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456") insertStmt = conn.prepareStatement("INSERT INTO temperatures (sensor, temp) VALUES (?, ?)") updateStmt = conn.prepareStatement("UPDATE temperatures SET temp = ? WHERE sensor = ?") } // 调用连接,执行 sql override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = { updateStmt.setDouble(1, value.temperature) updateStmt.setString(2, value.id) updateStmt.execute() if (updateStmt.getUpdateCount == 0) { insertStmt.setString(1, value.id) insertStmt.setDouble(2, value.temperature) insertStmt.execute() } } override def close(): Unit = { insertStmt.close() updateStmt.close() conn.close() } }
在 main 方法中增加,把明细保存到 mysql 中
dataStream.addSink(new MyJdbcSink())
七、总结
本文主要介绍了 Flink DataStream API,它是当前 Flink 中比较底层的一套 API。在实际的开发中,基于该 API 需要用户自己处理 State 与 Time 等一些概念,因此需要较大的工作量。后续课程还会介绍更上层的 Table / SQL 层的 API,未来 Table / SQL 可能会成为 Flink 主流的 API,但是对于接口来说,越底层的接口表达能力越强,在一些需要精细操作的情况下,仍然需要依赖于 DataStream API。