Flink实战(四) - DataSet API编程

简介: 1 你将学到◆ DataSet API开发概述◆ 计数器◆ DataSource◆ 分布式缓存◆ Transformation◆ Sink2 Data Set API 简介Flink中的DataSet程序是实现数据集转换(例如,过滤,映射,连接,分组)的常规程序.

1 你将学到

◆ DataSet API开发概述

◆ 计数器

◆ DataSource

◆ 分布式缓存

◆ Transformation

◆ Sink

2 Data Set API 简介

Flink中的DataSet程序是实现数据集转换(例如,过滤,映射,连接,分组)的常规程序.

最初从某些Source源创建数据集(例如,通过读取文件或从本地集合创建)

结果通过sink返回,接收器可以例如将数据写入(分布式)文件或标准输出(例如命令行终端)


Flink程序可以在各种环境中运行,单机运行或嵌入其他程序中

执行可以在本地JVM中执行,也可以在集群机器上执行.

  • 有关Flink API基本概念的介绍,请参阅本系列的上一篇

Flink实战(三) - 编程模型及核心概念

为了创建自己的Flink DataSet程序,鼓励从Flink程序的解剖开始,逐步添加自己的转换!

3 测试环境

4 Data Sources简介

数据源创建初始数据集,例如来自文件或Java集合。创建数据集的一般机制是在InputFormat后面抽象的

Flink附带了几种内置格式,可以从通用文件格式创建数据集。其中许多都在ExecutionEnvironment上有快捷方法。

4.1 基于文件

  • readTextFile(path)/ TextInputFormat
    按行读取文件并将它们作为字符串返回
  • readTextFileWithValue(path)/ TextValueInputFormat
    按行读取文件并将它们作为StringValues返回。 StringValues是可变字符串
  • readCsvFile(path)/ CsvInputFormat
    解析逗号(或其他字符)分隔字段的文件。返回元组,案例类对象或POJO的DataSet。支持基本的java类型及其Value对应的字段类型
  • readFileOfPrimitives(path,delimiter)/ PrimitiveInputFormat
    使用给定的分隔符解析新行(或其他char序列)分隔的原始数据类型(如String或Integer)的文件
  • readSequenceFile(Key,Value,path)/ SequenceFileInputFormat
    创建JobConf并从类型为SequenceFileInputFormat,Key class和Value类的指定路径中读取文件,并将它们作为Tuple2 返回。

4.2 基于集合

  • fromCollection(Iterable) - 从Iterable创建数据集。 Iterable返回的所有元素必须属于同一类型
  • fromCollection(Iterator) - 从迭代器创建数据集。该类指定迭代器返回的元素的数据类型
  • fromElements(elements:_ *) - 根据给定的对象序列创建数据集。所有对象必须属于同一类型
  • fromParallelCollection(SplittableIterator) - 并行地从迭代器创建数据集。该类指定迭代器返回的元素的数据类型
  • generateSequence(from,to) - 并行生成给定时间间隔内的数字序列。

4.3 通用

  • readFile(inputFormat,path)/ FileInputFormat
    接受文件输入格式
  • createInput(inputFormat)/ InputFormat
    接受通用输入格式5 从集合创建DataSet5.1 Scala实现

5.2 Java实现

6 从文件/文件夹创建DataSet

6.1 Scala实现

文件

文件夹

Java实现

7 从csv文件创建Dataset

7.1 Scala实现

  • 注意忽略第一行

  • includedFields参数使用
  • 定义一个POJO8 从递归文件夹的内容创建DataSet8.1 Scala实现

9从压缩文件中创建DataSet

Flink目前支持输入文件的透明解压缩,如果它们标有适当的文件扩展名。 特别是,这意味着不需要进一步配置输入格式,并且任何FileInputFormat都支持压缩,包括自定义输入格式。

压缩文件可能无法并行读取,从而影响作业可伸缩性。

下表列出了当前支持的压缩方法

9.1 Scala实现

10 Transformation

10.1 map

Map转换在DataSet的每个元素上应用用户定义的map函数。 它实现了一对一的映射,也就是说,函数必须返回一个元素。

以下代码将Integer对的DataSet转换为Integers的DataSet:

Scala实现

Java实现

10.2 filter

Scala实现

Java实现

10.3 mapPartition

MapPartition在单个函数调用中转换并行分区。 map-partition函数将分区作为Iterable获取,并且可以生成任意数量的结果值。 每个分区中的元素数量取决于并行度和先前的操作。

Scala实现

Java实现

10.4 first

Scala实现

10.5 Cross

11 Data Sinks

11.1 Java描述

Data Sinks使用DataSet并用于存储或返回它们

使用OutputFormat描述数据接收器操作

Flink带有各种内置输出格式,这些格式封装在DataSet上的操作后面:

  • writeAsText()/ TextOutputFormat
    将元素按行顺序写入字符串。通过调用每个元素的toString()方法获得字符串。
  • writeAsFormattedText()/ TextOutputFormat
    按字符串顺序写入元素。通过为每个元素调用用户定义的format()方法来获取字符串。
  • writeAsCsv(...)/ CsvOutputFormat
    将元组写为逗号分隔值文件。行和字段分隔符是可配置的。每个字段的值来自对象的toString()方法。
  • print()/ printToErr()/ print(String msg)/ printToErr(String msg)
    打印标准输出/标准错误流上每个元素的toString()值。可选地,可以提供前缀(msg),其前缀为输出。这有助于区分不同的打印调用。如果并行度大于1,则输出也将以生成输出的任务的标识符为前缀。
  • write()/ FileOutputFormat
    自定义文件输出的方法和基类。支持自定义对象到字节的转换。
  • output()/ OutputFormat
    最通用的输出方法,用于非基于文件的数据接收器(例如将结果存储在数据库中)。

可以将DataSet输入到多个操作。程序可以编写或打印数据集,同时对它们执行其他转换。

例子

标准数据接收方法:

// text data
DataSet<String> textData = // [...]

// write DataSet to a file on the local file system
textData.writeAsText("file:///my/result/on/localFS");

// write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS");

// write DataSet to a file and overwrite the file if it exists
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE);

// tuples as lines with pipe as the separator "a|b|c"
DataSet<Tuple3<String, Integer, Double>> values = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|");

// this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.writeAsText("file:///path/to/the/result/file");

// this writes values as strings using a user-defined TextFormatter object
values.writeAsFormattedText("file:///path/to/the/result/file",
    new TextFormatter<Tuple2<Integer, Integer>>() {
        public String format (Tuple2<Integer, Integer> value) {
            return value.f1 + " - " + value.f0;
        }
    });

使用自定义输出格式:

DataSet<Tuple3<String, Integer, Double>> myResult = [...]

// write Tuple DataSet to a relational database
myResult.output(
    // build and configure OutputFormat
    JDBCOutputFormat.buildJDBCOutputFormat()
                    .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                    .setDBUrl("jdbc:derby:memory:persons")
                    .setQuery("insert into persons (name, age, height) values (?,?,?)")
                    .finish()
    );

本地排序输出

可以使用元组字段位置或字段表达式以指定顺序在指定字段上对数据接收器的输出进行本地排序。 这适用于每种输出格式。

以下示例显示如何使用此功能:

DataSet<Tuple3<Integer, String, Double>> tData = // [...]
DataSet<Tuple2<BookPojo, Double>> pData = // [...]
DataSet<String> sData = // [...]

// sort output on String field in ascending order
tData.sortPartition(1, Order.ASCENDING).print();

// sort output on Double field in descending and Integer field in ascending order
tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print();

// sort output on the "author" field of nested BookPojo in descending order
pData.sortPartition("f0.author", Order.DESCENDING).writeAsText(...);

// sort output on the full tuple in ascending order
tData.sortPartition("*", Order.ASCENDING).writeAsCsv(...);

// sort atomic type (String) output in descending order
sData.sortPartition("*", Order.DESCENDING).writeAsText(...);

参考

DataSet Transformations

相关源码

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
9月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
636 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
9月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
284 11
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
1056 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
SQL 大数据 API
大数据-118 - Flink DataSet 基本介绍 核心特性 创建、转换、输出等
大数据-118 - Flink DataSet 基本介绍 核心特性 创建、转换、输出等
278 0
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
本文整理自阿里云智能集团 Flink PMC Member & Committer 徐榜江(雪尽)在 FFA 2024 分论坛的分享,涵盖四大主题:Flink CDC、YAML API、Transform + AI 和 Community。文章详细介绍了 Flink CDC 的发展历程及其优势,特别是 YAML API 的设计与实现,以及如何通过 Transform 和 AI 模型集成提升数据处理能力。最后,分享了社区动态和未来规划,欢迎更多开发者加入开源社区,共同推动 Flink CDC 的发展。
804 12
Flink CDC YAML:面向数据集成的 API 设计
|
11月前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
探索Flink动态CEP:杭州银行的实战案例
517 5
|
11月前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
Flink CDC YAML:面向数据集成的 API 设计
557 5
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
本文由杭州银行大数据工程师唐占峰、欧阳武林撰写,介绍Flink动态CEP的定义、应用场景、技术实现及使用方式。Flink动态CEP是基于Flink的复杂事件处理库,支持在不重启服务的情况下动态更新规则,适应快速变化的业务需求。文章详细阐述了其在反洗钱、反欺诈和实时营销等金融领域的应用,并展示了某金融机构的实际应用案例。通过动态CEP,用户可以实时调整规则,提高系统的灵活性和响应速度,降低维护成本。文中还提供了具体的代码示例和技术细节,帮助读者理解和使用Flink动态CEP。
1680 3
探索Flink动态CEP:杭州银行的实战案例
|
Java Shell 流计算
Flink-02 Flink Java 3分钟上手 Stream SingleOutputStreamOpe ExecutionEnvironment DataSet FlatMapFunction
Flink-02 Flink Java 3分钟上手 Stream SingleOutputStreamOpe ExecutionEnvironment DataSet FlatMapFunction
162 1
Flink-02 Flink Java 3分钟上手 Stream SingleOutputStreamOpe ExecutionEnvironment DataSet FlatMapFunction
|
Kubernetes Cloud Native 流计算
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
422 3

热门文章

最新文章