Flink实战(四) - DataSet API编程

简介: 1 你将学到◆ DataSet API开发概述◆ 计数器◆ DataSource◆ 分布式缓存◆ Transformation◆ Sink2 Data Set API 简介Flink中的DataSet程序是实现数据集转换(例如,过滤,映射,连接,分组)的常规程序.

1 你将学到

◆ DataSet API开发概述

◆ 计数器

◆ DataSource

◆ 分布式缓存

◆ Transformation

◆ Sink

2 Data Set API 简介

Flink中的DataSet程序是实现数据集转换(例如,过滤,映射,连接,分组)的常规程序.

最初从某些Source源创建数据集(例如,通过读取文件或从本地集合创建)

结果通过sink返回,接收器可以例如将数据写入(分布式)文件或标准输出(例如命令行终端)


Flink程序可以在各种环境中运行,单机运行或嵌入其他程序中

执行可以在本地JVM中执行,也可以在集群机器上执行.

  • 有关Flink API基本概念的介绍,请参阅本系列的上一篇

Flink实战(三) - 编程模型及核心概念

为了创建自己的Flink DataSet程序,鼓励从Flink程序的解剖开始,逐步添加自己的转换!

3 测试环境

4 Data Sources简介

数据源创建初始数据集,例如来自文件或Java集合。创建数据集的一般机制是在InputFormat后面抽象的

Flink附带了几种内置格式,可以从通用文件格式创建数据集。其中许多都在ExecutionEnvironment上有快捷方法。

4.1 基于文件

  • readTextFile(path)/ TextInputFormat
    按行读取文件并将它们作为字符串返回
  • readTextFileWithValue(path)/ TextValueInputFormat
    按行读取文件并将它们作为StringValues返回。 StringValues是可变字符串
  • readCsvFile(path)/ CsvInputFormat
    解析逗号(或其他字符)分隔字段的文件。返回元组,案例类对象或POJO的DataSet。支持基本的java类型及其Value对应的字段类型
  • readFileOfPrimitives(path,delimiter)/ PrimitiveInputFormat
    使用给定的分隔符解析新行(或其他char序列)分隔的原始数据类型(如String或Integer)的文件
  • readSequenceFile(Key,Value,path)/ SequenceFileInputFormat
    创建JobConf并从类型为SequenceFileInputFormat,Key class和Value类的指定路径中读取文件,并将它们作为Tuple2 返回。

4.2 基于集合

  • fromCollection(Iterable) - 从Iterable创建数据集。 Iterable返回的所有元素必须属于同一类型
  • fromCollection(Iterator) - 从迭代器创建数据集。该类指定迭代器返回的元素的数据类型
  • fromElements(elements:_ *) - 根据给定的对象序列创建数据集。所有对象必须属于同一类型
  • fromParallelCollection(SplittableIterator) - 并行地从迭代器创建数据集。该类指定迭代器返回的元素的数据类型
  • generateSequence(from,to) - 并行生成给定时间间隔内的数字序列。

4.3 通用

  • readFile(inputFormat,path)/ FileInputFormat
    接受文件输入格式
  • createInput(inputFormat)/ InputFormat
    接受通用输入格式5 从集合创建DataSet5.1 Scala实现

5.2 Java实现

6 从文件/文件夹创建DataSet

6.1 Scala实现

文件

文件夹

Java实现

7 从csv文件创建Dataset

7.1 Scala实现

  • 注意忽略第一行

  • includedFields参数使用
  • 定义一个POJO8 从递归文件夹的内容创建DataSet8.1 Scala实现

9从压缩文件中创建DataSet

Flink目前支持输入文件的透明解压缩,如果它们标有适当的文件扩展名。 特别是,这意味着不需要进一步配置输入格式,并且任何FileInputFormat都支持压缩,包括自定义输入格式。

压缩文件可能无法并行读取,从而影响作业可伸缩性。

下表列出了当前支持的压缩方法

9.1 Scala实现

10 Transformation

10.1 map

Map转换在DataSet的每个元素上应用用户定义的map函数。 它实现了一对一的映射,也就是说,函数必须返回一个元素。

以下代码将Integer对的DataSet转换为Integers的DataSet:

Scala实现

Java实现

10.2 filter

Scala实现

Java实现

10.3 mapPartition

MapPartition在单个函数调用中转换并行分区。 map-partition函数将分区作为Iterable获取,并且可以生成任意数量的结果值。 每个分区中的元素数量取决于并行度和先前的操作。

Scala实现

Java实现

10.4 first

Scala实现

10.5 Cross

11 Data Sinks

11.1 Java描述

Data Sinks使用DataSet并用于存储或返回它们

使用OutputFormat描述数据接收器操作

Flink带有各种内置输出格式,这些格式封装在DataSet上的操作后面:

  • writeAsText()/ TextOutputFormat
    将元素按行顺序写入字符串。通过调用每个元素的toString()方法获得字符串。
  • writeAsFormattedText()/ TextOutputFormat
    按字符串顺序写入元素。通过为每个元素调用用户定义的format()方法来获取字符串。
  • writeAsCsv(...)/ CsvOutputFormat
    将元组写为逗号分隔值文件。行和字段分隔符是可配置的。每个字段的值来自对象的toString()方法。
  • print()/ printToErr()/ print(String msg)/ printToErr(String msg)
    打印标准输出/标准错误流上每个元素的toString()值。可选地,可以提供前缀(msg),其前缀为输出。这有助于区分不同的打印调用。如果并行度大于1,则输出也将以生成输出的任务的标识符为前缀。
  • write()/ FileOutputFormat
    自定义文件输出的方法和基类。支持自定义对象到字节的转换。
  • output()/ OutputFormat
    最通用的输出方法,用于非基于文件的数据接收器(例如将结果存储在数据库中)。

可以将DataSet输入到多个操作。程序可以编写或打印数据集,同时对它们执行其他转换。

例子

标准数据接收方法:

// text data
DataSet<String> textData = // [...]

// write DataSet to a file on the local file system
textData.writeAsText("file:///my/result/on/localFS");

// write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS");

// write DataSet to a file and overwrite the file if it exists
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE);

// tuples as lines with pipe as the separator "a|b|c"
DataSet<Tuple3<String, Integer, Double>> values = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|");

// this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.writeAsText("file:///path/to/the/result/file");

// this writes values as strings using a user-defined TextFormatter object
values.writeAsFormattedText("file:///path/to/the/result/file",
    new TextFormatter<Tuple2<Integer, Integer>>() {
        public String format (Tuple2<Integer, Integer> value) {
            return value.f1 + " - " + value.f0;
        }
    });

使用自定义输出格式:

DataSet<Tuple3<String, Integer, Double>> myResult = [...]

// write Tuple DataSet to a relational database
myResult.output(
    // build and configure OutputFormat
    JDBCOutputFormat.buildJDBCOutputFormat()
                    .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                    .setDBUrl("jdbc:derby:memory:persons")
                    .setQuery("insert into persons (name, age, height) values (?,?,?)")
                    .finish()
    );

本地排序输出

可以使用元组字段位置或字段表达式以指定顺序在指定字段上对数据接收器的输出进行本地排序。 这适用于每种输出格式。

以下示例显示如何使用此功能:

DataSet<Tuple3<Integer, String, Double>> tData = // [...]
DataSet<Tuple2<BookPojo, Double>> pData = // [...]
DataSet<String> sData = // [...]

// sort output on String field in ascending order
tData.sortPartition(1, Order.ASCENDING).print();

// sort output on Double field in descending and Integer field in ascending order
tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print();

// sort output on the "author" field of nested BookPojo in descending order
pData.sortPartition("f0.author", Order.DESCENDING).writeAsText(...);

// sort output on the full tuple in ascending order
tData.sortPartition("*", Order.ASCENDING).writeAsCsv(...);

// sort atomic type (String) output in descending order
sData.sortPartition("*", Order.DESCENDING).writeAsText(...);

参考

DataSet Transformations

相关源码

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
2月前
|
缓存 监控 前端开发
顺企网 API 开发实战:搜索 / 详情接口从 0 到 1 落地(附 Elasticsearch 优化 + 错误速查)
企业API开发常陷参数、缓存、错误处理三大坑?本指南拆解顺企网双接口全流程,涵盖搜索优化、签名验证、限流应对,附可复用代码与错误速查表,助你2小时高效搞定开发,提升响应速度与稳定性。
|
2月前
|
缓存 自然语言处理 API
阿里巴巴国际站关键字搜索 API 实战:3 步搞定多语言适配 + 限流破局,询盘量提升 40%
跨境电商API开发常陷合规、多语言、限流等坑。本文详解从国际合规(GDPR/CCPA)到参数优化、数据结构化及区域化搜索的全链路方案,附Python代码模板与缓存重试架构,助力提升调用成功率至99%+,精准询盘增长42%。
|
2月前
|
开发者 API 机器学习/深度学习
淘宝 / 1688 / 义乌购图搜 API 实战指南:接口调用与商业场景应用
本文详解淘宝、1688、义乌购三大平台图片搜索接口的核心特点、调用流程与实战代码。涵盖跨平台对比、参数配置、响应解析及避坑指南,支持URL/Base64上传,返回商品ID、价格、销量等关键信息,助力开发者快速实现商品识别与比价功能。
淘宝 / 1688 / 义乌购图搜 API 实战指南:接口调用与商业场景应用
|
2月前
|
Cloud Native 算法 API
Python API接口实战指南:从入门到精通
🌟蒋星熠Jaxonic,技术宇宙的星际旅人。深耕API开发,以Python为舟,探索RESTful、GraphQL等接口奥秘。擅长requests、aiohttp实战,专注性能优化与架构设计,用代码连接万物,谱写极客诗篇。
Python API接口实战指南:从入门到精通
|
3月前
|
人工智能 运维 监控
阿里云 API 聚合实战:破解接口碎片化难题,3 类场景方案让业务响应提速 60%
API聚合破解接口碎片化困局,助力开发者降本增效。通过统一中间层整合微服务、第三方接口与AI模型,实现调用次数减少60%、响应提速70%。阿里云实测:APISIX+函数计算+ARMS监控组合,支撑百万级并发,故障定位效率提升90%。
347 0
|
3月前
|
JSON API 调度
Midjourney 技术拆解与阿里云开发者实战指南:从扩散模型到 API 批量生成
Midjourney深度解析:基于优化Stable Diffusion,实现文本到图像高效生成。涵盖技术架构、扩散模型原理、API调用、批量生成系统及阿里云生态协同,助力开发者快速落地AIGC图像创作。
621 0
|
3月前
|
数据采集 缓存 API
小红书笔记详情 API 实战指南:从开发对接、场景落地到收益挖掘(附避坑技巧)
本文详解小红书笔记详情API的开发对接、实战场景与收益模式,涵盖注册避坑、签名生成、数据解析全流程,并分享品牌营销、内容创作、SAAS工具等落地应用,助力开发者高效掘金“种草经济”。
小红书笔记详情 API 实战指南:从开发对接、场景落地到收益挖掘(附避坑技巧)
|
3月前
|
供应链 监控 安全
1688商品详情API接口实战指南:合规获取数据,驱动B2B业务增长
1688商品详情API(alibaba.product.get)是合规获取B2B商品数据的核心工具,支持全维度信息调用,助力企业实现智能选品、供应链优化与市场洞察,推动数字化转型。
|
3月前
|
缓存 监控 供应链
亚马逊 MWS API 实战:商品详情精准获取与跨境电商数据整合方案
本文详细解析亚马逊MWS API接口的技术实现,重点解决跨境商品数据获取中的核心问题。文章首先介绍MWS接口体系的特点,包括多站点数据获取、AWS签名认证等关键环节,并对比普通电商接口的差异。随后深入拆解API调用全流程,提供签名工具类、多站点客户端等可复用代码。针对跨境业务场景,文章还给出数据整合工具实现方案,支持缓存、批量处理等功能。最后通过实战示例展示多站点商品对比和批量选品分析的应用,并附常见问题解决方案。该技术方案可直接应用于跨境选品、价格监控等业务场景,帮助开发者高效获取亚马逊商品数据。
|
3月前
|
数据采集 JSON API
微店商品列表API接口开发指南:从零到实战
微店商品列表API(vdian.shop.item.list.get)用于获取店铺商品数据,支持分页、签名认证,返回JSON格式。适用于商品同步、竞品分析、多平台展示及数据清洗。提供Python请求示例,便于快速接入。