Flink / Scala - DataSet & DataStream Sink 输出数据详解

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
实时计算 Flink 版,5000CU*H 3个月
简介: 一.引言Flink 的数据处理主要分三步,第一步 Source 为数据源,分为 DataSet 和 DataStreaming ,后一步为 Transformation 负责处理和转换数据,针对不同的 DataSource,Transformation 可能会存在差异,最后一步是 sink 负责将结果输出。前面介绍了 DataSet 的 Source 和 Transformation,这里介绍下 DataSet 和 DataStreaming 的 Sink 相关 API。Tips:下述代码区

 一.引言

Flink 的数据处理主要分三步,第一步 Source 为数据源,分为 DataSet 和 DataStreaming ,后一步为 Transformation 负责处理和转换数据,针对不同的 DataSource,Transformation 可能会存在差异,最后一步是 sink 负责将结果输出。前面介绍了 DataSet 的 Source 和 Transformation,这里介绍下 DataSet 和 DataStreaming 的 Sink 相关 API。

image.gif编辑

Tips:

下述代码区分为 DataSet 和 DataStreaming,所以执行环境会有不同:

// 二者都需要引入
    import org.apache.flink.streaming.api.scala._
    // DataSet 选择
    val env = ExecutionEnvironment.getExecutionEnvironment
    // DataStreaming 选择
    val env = StreamExecutionEnvironment.getExecutionEnvironment

image.gif

二.DataSet

1.writeAsText

A.存储在本机和HDFS

writeAsText 可以根据地址的不同自适应的存储在 Local FileSystem 和 Hdfs System 上:

// 读取本地路径并输出到本地
    val textLines = env.readTextFile("InputPath")
    textLines.writeAsText("OutputPath")
    // 读取 Hdfs 路径并输出到 Hdfs
    val textLinesOnHdfs = env.readTextFile("hdfs://nnHost:nnPort/file")
    textLinesOnHdfs.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS")

image.gif

B.模式选择

writeAsText 有下述两种模式可以选择:

· WriteMode.NO_OVERWRITE : 指定路径不存在文件,执行写操作

· WriteMode.OVERWRITE:指定路径不存在文件,执行写操作;存在文件则进行覆盖,注意这里不是追加

import org.apache.flink.core.fs.FileSystem.WriteMode
    val textLines = env.readTextFile("InputPath")
    textLines.writeAsText("OutputPath", WriteMode.OVERWRITE)

image.gif

C.输出数量

writeAsText 提供 setParallelism 方法,该方法控制输出文件数量,如果输出路径为 output,不设置该方法时会生成 output 文件夹并在文件夹下生成多个文件,如果 setParallelism(1) ,则会生成 output 文件并将全部输出写入该文件中:

val textLines = env.readTextFile("InputPath")
    textLines.writeAsText("OutputPath").setParallelism(1)

image.gif

2.writeAsCsv

writeAsCsv 将输出文件存储为 csv 格式 ,共提供4个参数:

def writeAsCsv(filePath : _root_.scala.Predef.String, 
                rowDelimiter : _root_.scala.Predef.String, 
                fieldDelimiter : _root_.scala.Predef.String, 
                writeMode : org.apache.flink.core.fs.FileSystem.WriteMode) : org.apache.flink.api.java.operators.DataSink[T] = { /* compiled code */ }

image.gif

·filePath:输出路径

· rowDelimiter:行分割符

· fieldDelimiter:csv 文件各字符分隔符

· wirteMode:输出模式

A.默认输出

val values: DataSet[(String, Int, Double)] = env.fromElements(("A", 1, 2D), ("B", 1, 2D), ("C", 1, 2D), ("D", 1, 2D))
    values.writeAsCsv("outputV1.csv")

image.gif

A,1,2.0
B,1,2.0
C,1,2.0
D,1,2.0

image.gif

B.选择参数输出

每行数据 \n 分割,每个元祖的字符 "|" 分割

val values: DataSet[(String, Int, Double)] = env.fromElements(("A", 1, 2D), ("B", 1, 2D), ("C", 1, 2D), ("D", 1, 2D))
    values.writeAsCsv("outputV2.csv", "\n", "|")

image.gif

A|1|2.0
B|1|2.0
C|1|2.0
D|1|2.0

image.gif

3.Stdout,Stderr

print \ printToErr 一般多见于测试数据输出,可以将计算结果输出到控制台的 Stdout 或者 Stderr 上。

val textLines = env.readTextFile("InputPath")
    textLines.print()
    textLines.printToErr()

image.gif

image.gif编辑

4.Output with OutputFormat

数据也可以根据自己自定义的 Format 进行输出:

val textLines = env.readTextFile("InputPath")
    val textFormat = new TextOutputFormat[String](new Path("OutputPath"))
    textLines.output(textFormat).setParallelism(1)

image.gif

Tips:

Flink 和 Spark 类似,其内部执行逻辑也是 Lazy Mode,因此 writeAsText,writeAsCsv,output 方法均不会触发 Flink 执行代码逻辑,除了 print 和 printToErr 可以直接触发,上述操作需要再额外调用 env.execute() 才会执行 :

env.execute()

image.gif

三.DataStream

DataStream 可以将数据写入文件、标准输出、标准错误输出和 Socket。除了官方提供的基本 Sink 组件外,Flink 还额外支持了下述连接器,可以很好地实现工程交互,截止 v1.14.3,Flink 支持一下第三方 Connector:

Connector 类别 支持方式
Apache Kafka source/sink
Apache Cassandra sink
Amazon Kinesis Streams source/sink
Elasticsearch sink
FileSystem sink
RabbitMQ source/sink
Google PubSub source/sink
Hybrid Source  source
Apache NiFi  source/sink
Apache Pulsar source
Twitter Streaming API source
JDBC sink

1.Write to Socket

DataSet 和 DataStream 相比后者比前者多一个 WriteToSocket 方法,支持将流数据写入到 Socket

streamSource.writeToSocket("ip", port, new SimpleStringSchema());

image.gif

可以通过上述方法将数据写入 Socket,例如写入本机 -> ip = "localhost",port=9999,后续通过下述方法即可监听到上述写到 Socket 中的数据并进行后续的处理逻辑。

env.socketTextStream("localhost", 9999)...

image.gif

2.Kafka Consumer / Producer

A.Consumer

Flink 的 Kafka consumer 称为 FlinkKafkaConsumer。它提供对一个或多个 Kafka topics 的访问。需要引入如下依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.14.3</version>
</dependency>

image.gif

Kafka 配置 :

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
val stream = env
    .addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))

image.gif

共有三个参数需要填写:

Topic: kafka 消费 topic 名称或名称列表

StringSchema: 用于反序列化 kafka 数据的 DeserializationSchema 或者KafkaDeserializationSchema,内部主要需要复写 deserialize 方法用于将 kafka 数据转换并写生成 DataStream。

Properties:kafka 配置,除相关配置外,还需要提供 bootstrao.servers 和 消费的 group_id。

生成 DataStream 并消费 :

val env = StreamExecutionEnvironment.getExecutionEnvironment()
val myConsumer = new FlinkKafkaConsumer[String](...)
myConsumer.setStartFromEarliest()      // 尽可能从最早的记录开始
myConsumer.setStartFromLatest()        // 从最新的记录开始
myConsumer.setStartFromTimestamp(...)  // 从指定的时间开始(毫秒)
myConsumer.setStartFromGroupOffsets()  // 默认的方法
val stream = env.addSource(myConsumer)
...

image.gif

B.Producer

Flink Kafka Producer 被称为 FlinkKafkaProducer。它允许将消息流写入一个或多个 Kafka topic,需要填写生成 kafka 的 Topic 并将相关输出的参数配置到 properties 中,最后一个参数控制容错语义。

val stream: DataStream[String] = ...
val properties = new Properties
properties.setProperty("bootstrap.servers", "localhost:9092")
val myProducer = new FlinkKafkaProducer[String](
        "my-topic",               // 目标 topic
        new SimpleStringSchema(), // 序列化 schema
        properties,               // producer 配置
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE) // 容错
stream.addSink(myProducer)

image.gif

3.Elasticsearch

Sink 也支持与 ES 交互,写入 ES 库中,由于版本比较多,因此 maven 也有较多选择:

ES 5.x  
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch5_2.11</artifactId>
    <version>1.14.3</version>
</dependency>
ES 6.x  
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.14.3</version>
</dependency>
ES 7 及更高版本  
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
    <version>1.14.3</version>
</dependency>

image.gif

下述 Demo 适用于 scala + ES 6.x 及以上:

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
import java.util.ArrayList
import java.util.List
val input: DataStream[String] = ...
val httpHosts = new java.util.ArrayList[HttpHost]
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"))
httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"))
val esSinkBuilder = new ElasticsearchSink.Builder[String](
  httpHosts,
  new ElasticsearchSinkFunction[String] {
     def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
          val json = new java.util.HashMap[String, String]
          json.put("data", element)
          val rqst: IndexRequest = Requests.indexRequest
            .index("my-index")
            .`type`("my-type")
            .source(json)
          indexer.add(rqst)
     } 
  }
// 批量请求的配置;下面的设置使 sink 在接收每个元素之后立即提交,否则这些元素将被缓存起来
esSinkBuilder.setBulkFlushMaxActions(1)
// 为内部创建的 REST 客户端提供一个自定义配置信息的 RestClientFactory
esSinkBuilder.setRestClientFactory(new RestClientFactory {
  override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
       restClientBuilder.setDefaultHeaders(...)
       restClientBuilder.setMaxRetryTimeoutMillis(...)
       restClientBuilder.setPathPrefix(...)
       restClientBuilder.setHttpClientConfigCallback(...)
  }
})
// 最后,构建并添加 sink 到作业管道中
input.addSink(esSinkBuilder.build)

image.gif

4.自定义 Sink

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

image.gif

如上述方法不能满足工程需求,则可以自己定义 Sink 方式,这里需要继承 RichSinkFunction[T],T 为上一个 DataStream 的数据类型,并重写下述 3 个函数完成 Sink 需求:

· open(parameters: Configuration):初始化函数,负责初始化 sink 相关的连接和客户端 client

· invoke(value: T): 每个要 sink 的数据都用通过 invoke 方法并处理 T 最终写入

· close:关闭 open 函数启动的 connection 和 client

四.总结

上述方法简单介绍了 DataSet 和 DataStream 的常用 Sink 方式,Flink 主要优势还是体现在 DataStream 流式处理上,所以 DataSet 相对内容较少。Kafka + Flink 是非常常见的处理流程,所以上面主要给出了 Kafka + Flink 的相关示例,如有更多需求可参考官方 API 提供的更详细的方法 -> Flink Connectors

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1天前
|
资源调度 Oracle Java
实时计算 Flink版产品使用问题之在YARN集群上运行时,如何查看每个并行度的详细处理数据情况
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1天前
|
SQL Oracle Java
实时计算 Flink版产品使用问题之采集Oracle数据时,为什么无法采集到其他TABLESPACE的表
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1天前
|
SQL Oracle 数据处理
实时计算 Flink版产品使用问题之如何优化数据读取速度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1天前
|
分布式计算 Oracle 关系型数据库
实时计算 Flink版产品使用问题之获取Oracle的数据时无法获取clob类型的数据,该怎么办
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
消息中间件 Java 关系型数据库
Flink实战(五) - DataStream API编程(下)
Flink实战(五) - DataStream API编程(下)
252 0
Flink实战(五) - DataStream API编程(下)
|
消息中间件 监控 Java
Flink实战(五) - DataStream API编程(上)
Flink实战(五) - DataStream API编程(上)
466 0
Flink实战(五) - DataStream API编程(上)
|
21天前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
580 7
阿里云实时计算Flink在多行业的应用和实践
|
1天前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1天前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之怎么调整Flink Web U显示的日志行数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之从mysql读数据写到hive报错,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。