一.引言
Flink 的数据处理主要分三步,第一步 Source 为数据源,分为 DataSet 和 DataStreaming ,后一步为 Transformation 负责处理和转换数据,针对不同的 DataSource,Transformation 可能会存在差异,最后一步是 sink 负责将结果输出。前面介绍了 DataSet 的 Source 和 Transformation,这里介绍下 DataSet 和 DataStreaming 的 Sink 相关 API。
编辑
Tips:
下述代码区分为 DataSet 和 DataStreaming,所以执行环境会有不同:
// 二者都需要引入 import org.apache.flink.streaming.api.scala._ // DataSet 选择 val env = ExecutionEnvironment.getExecutionEnvironment // DataStreaming 选择 val env = StreamExecutionEnvironment.getExecutionEnvironment
二.DataSet
1.writeAsText
A.存储在本机和HDFS
writeAsText 可以根据地址的不同自适应的存储在 Local FileSystem 和 Hdfs System 上:
// 读取本地路径并输出到本地 val textLines = env.readTextFile("InputPath") textLines.writeAsText("OutputPath") // 读取 Hdfs 路径并输出到 Hdfs val textLinesOnHdfs = env.readTextFile("hdfs://nnHost:nnPort/file") textLinesOnHdfs.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS")
B.模式选择
writeAsText 有下述两种模式可以选择:
· WriteMode.NO_OVERWRITE : 指定路径不存在文件,执行写操作
· WriteMode.OVERWRITE:指定路径不存在文件,执行写操作;存在文件则进行覆盖,注意这里不是追加
import org.apache.flink.core.fs.FileSystem.WriteMode val textLines = env.readTextFile("InputPath") textLines.writeAsText("OutputPath", WriteMode.OVERWRITE)
C.输出数量
writeAsText 提供 setParallelism 方法,该方法控制输出文件数量,如果输出路径为 output,不设置该方法时会生成 output 文件夹并在文件夹下生成多个文件,如果 setParallelism(1) ,则会生成 output 文件并将全部输出写入该文件中:
val textLines = env.readTextFile("InputPath") textLines.writeAsText("OutputPath").setParallelism(1)
2.writeAsCsv
writeAsCsv 将输出文件存储为 csv 格式 ,共提供4个参数:
def writeAsCsv(filePath : _root_.scala.Predef.String, rowDelimiter : _root_.scala.Predef.String, fieldDelimiter : _root_.scala.Predef.String, writeMode : org.apache.flink.core.fs.FileSystem.WriteMode) : org.apache.flink.api.java.operators.DataSink[T] = { /* compiled code */ }
·filePath:输出路径
· rowDelimiter:行分割符
· fieldDelimiter:csv 文件各字符分隔符
· wirteMode:输出模式
A.默认输出
val values: DataSet[(String, Int, Double)] = env.fromElements(("A", 1, 2D), ("B", 1, 2D), ("C", 1, 2D), ("D", 1, 2D)) values.writeAsCsv("outputV1.csv")
A,1,2.0 B,1,2.0 C,1,2.0 D,1,2.0
B.选择参数输出
每行数据 \n 分割,每个元祖的字符 "|" 分割
val values: DataSet[(String, Int, Double)] = env.fromElements(("A", 1, 2D), ("B", 1, 2D), ("C", 1, 2D), ("D", 1, 2D)) values.writeAsCsv("outputV2.csv", "\n", "|")
A|1|2.0 B|1|2.0 C|1|2.0 D|1|2.0
3.Stdout,Stderr
print \ printToErr 一般多见于测试数据输出,可以将计算结果输出到控制台的 Stdout 或者 Stderr 上。
val textLines = env.readTextFile("InputPath") textLines.print() textLines.printToErr()
编辑
4.Output with OutputFormat
数据也可以根据自己自定义的 Format 进行输出:
val textLines = env.readTextFile("InputPath") val textFormat = new TextOutputFormat[String](new Path("OutputPath")) textLines.output(textFormat).setParallelism(1)
Tips:
Flink 和 Spark 类似,其内部执行逻辑也是 Lazy Mode,因此 writeAsText,writeAsCsv,output 方法均不会触发 Flink 执行代码逻辑,除了 print 和 printToErr 可以直接触发,上述操作需要再额外调用 env.execute() 才会执行 :
env.execute()
三.DataStream
DataStream 可以将数据写入文件、标准输出、标准错误输出和 Socket。除了官方提供的基本 Sink 组件外,Flink 还额外支持了下述连接器,可以很好地实现工程交互,截止 v1.14.3,Flink 支持一下第三方 Connector:
Connector 类别 | 支持方式 |
Apache Kafka | source/sink |
Apache Cassandra | sink |
Amazon Kinesis Streams | source/sink |
Elasticsearch | sink |
FileSystem | sink |
RabbitMQ | source/sink |
Google PubSub | source/sink |
Hybrid Source | source |
Apache NiFi | source/sink |
Apache Pulsar | source |
Twitter Streaming API | source |
JDBC | sink |
1.Write to Socket
DataSet 和 DataStream 相比后者比前者多一个 WriteToSocket 方法,支持将流数据写入到 Socket
streamSource.writeToSocket("ip", port, new SimpleStringSchema());
可以通过上述方法将数据写入 Socket,例如写入本机 -> ip = "localhost",port=9999,后续通过下述方法即可监听到上述写到 Socket 中的数据并进行后续的处理逻辑。
env.socketTextStream("localhost", 9999)...
2.Kafka Consumer / Producer
A.Consumer
Flink 的 Kafka consumer 称为 FlinkKafkaConsumer
。它提供对一个或多个 Kafka topics 的访问。需要引入如下依赖:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>1.14.3</version> </dependency>
Kafka 配置 :
val properties = new Properties() properties.setProperty("bootstrap.servers", "localhost:9092") properties.setProperty("group.id", "test") val stream = env .addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))
共有三个参数需要填写:
Topic: kafka 消费 topic 名称或名称列表
StringSchema: 用于反序列化 kafka 数据的 DeserializationSchema 或者KafkaDeserializationSchema,内部主要需要复写 deserialize 方法用于将 kafka 数据转换并写生成 DataStream。
Properties:kafka 配置,除相关配置外,还需要提供 bootstrao.servers 和 消费的 group_id。
生成 DataStream 并消费 :
val env = StreamExecutionEnvironment.getExecutionEnvironment() val myConsumer = new FlinkKafkaConsumer[String](...) myConsumer.setStartFromEarliest() // 尽可能从最早的记录开始 myConsumer.setStartFromLatest() // 从最新的记录开始 myConsumer.setStartFromTimestamp(...) // 从指定的时间开始(毫秒) myConsumer.setStartFromGroupOffsets() // 默认的方法 val stream = env.addSource(myConsumer) ...
B.Producer
Flink Kafka Producer 被称为 FlinkKafkaProducer
。它允许将消息流写入一个或多个 Kafka topic,需要填写生成 kafka 的 Topic 并将相关输出的参数配置到 properties 中,最后一个参数控制容错语义。
val stream: DataStream[String] = ... val properties = new Properties properties.setProperty("bootstrap.servers", "localhost:9092") val myProducer = new FlinkKafkaProducer[String]( "my-topic", // 目标 topic new SimpleStringSchema(), // 序列化 schema properties, // producer 配置 FlinkKafkaProducer.Semantic.EXACTLY_ONCE) // 容错 stream.addSink(myProducer)
3.Elasticsearch
Sink 也支持与 ES 交互,写入 ES 库中,由于版本比较多,因此 maven 也有较多选择:
ES 5.x <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch5_2.11</artifactId> <version>1.14.3</version> </dependency> ES 6.x <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch6_2.11</artifactId> <version>1.14.3</version> </dependency> ES 7 及更高版本 <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch7_2.11</artifactId> <version>1.14.3</version> </dependency>
下述 Demo 适用于 scala + ES 6.x 及以上:
import org.apache.flink.api.common.functions.RuntimeContext import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink import org.apache.http.HttpHost import org.elasticsearch.action.index.IndexRequest import org.elasticsearch.client.Requests import java.util.ArrayList import java.util.List val input: DataStream[String] = ... val httpHosts = new java.util.ArrayList[HttpHost] httpHosts.add(new HttpHost("127.0.0.1", 9200, "http")) httpHosts.add(new HttpHost("10.2.3.1", 9200, "http")) val esSinkBuilder = new ElasticsearchSink.Builder[String]( httpHosts, new ElasticsearchSinkFunction[String] { def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) { val json = new java.util.HashMap[String, String] json.put("data", element) val rqst: IndexRequest = Requests.indexRequest .index("my-index") .`type`("my-type") .source(json) indexer.add(rqst) } } // 批量请求的配置;下面的设置使 sink 在接收每个元素之后立即提交,否则这些元素将被缓存起来 esSinkBuilder.setBulkFlushMaxActions(1) // 为内部创建的 REST 客户端提供一个自定义配置信息的 RestClientFactory esSinkBuilder.setRestClientFactory(new RestClientFactory { override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = { restClientBuilder.setDefaultHeaders(...) restClientBuilder.setMaxRetryTimeoutMillis(...) restClientBuilder.setPathPrefix(...) restClientBuilder.setHttpClientConfigCallback(...) } }) // 最后,构建并添加 sink 到作业管道中 input.addSink(esSinkBuilder.build)
4.自定义 Sink
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
如上述方法不能满足工程需求,则可以自己定义 Sink 方式,这里需要继承 RichSinkFunction[T],T 为上一个 DataStream 的数据类型,并重写下述 3 个函数完成 Sink 需求:
· open(parameters: Configuration):初始化函数,负责初始化 sink 相关的连接和客户端 client
· invoke(value: T): 每个要 sink 的数据都用通过 invoke 方法并处理 T 最终写入
· close:关闭 open 函数启动的 connection 和 client
四.总结
上述方法简单介绍了 DataSet 和 DataStream 的常用 Sink 方式,Flink 主要优势还是体现在 DataStream 流式处理上,所以 DataSet 相对内容较少。Kafka + Flink 是非常常见的处理流程,所以上面主要给出了 Kafka + Flink 的相关示例,如有更多需求可参考官方 API 提供的更详细的方法 -> Flink Connectors。