Flink实战(四) - DataSet API编程

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 1 你将学到◆ DataSet API开发概述◆ 计数器◆ DataSource◆ 分布式缓存◆ Transformation◆ Sink2 Data Set API 简介Flink中的DataSet程序是实现数据集转换(例如,过滤,映射,连接,分组)的常规程序.

1 你将学到

◆ DataSet API开发概述

◆ 计数器

◆ DataSource

◆ 分布式缓存

◆ Transformation

◆ Sink

2 Data Set API 简介

Flink中的DataSet程序是实现数据集转换(例如,过滤,映射,连接,分组)的常规程序.

最初从某些Source源创建数据集(例如,通过读取文件或从本地集合创建)

结果通过sink返回,接收器可以例如将数据写入(分布式)文件或标准输出(例如命令行终端)


Flink程序可以在各种环境中运行,单机运行或嵌入其他程序中

执行可以在本地JVM中执行,也可以在集群机器上执行.

  • 有关Flink API基本概念的介绍,请参阅本系列的上一篇

Flink实战(三) - 编程模型及核心概念

为了创建自己的Flink DataSet程序,鼓励从Flink程序的解剖开始,逐步添加自己的转换!

3 测试环境

4 Data Sources简介

数据源创建初始数据集,例如来自文件或Java集合。创建数据集的一般机制是在InputFormat后面抽象的

Flink附带了几种内置格式,可以从通用文件格式创建数据集。其中许多都在ExecutionEnvironment上有快捷方法。

4.1 基于文件

  • readTextFile(path)/ TextInputFormat
    按行读取文件并将它们作为字符串返回
  • readTextFileWithValue(path)/ TextValueInputFormat
    按行读取文件并将它们作为StringValues返回。 StringValues是可变字符串
  • readCsvFile(path)/ CsvInputFormat
    解析逗号(或其他字符)分隔字段的文件。返回元组,案例类对象或POJO的DataSet。支持基本的java类型及其Value对应的字段类型
  • readFileOfPrimitives(path,delimiter)/ PrimitiveInputFormat
    使用给定的分隔符解析新行(或其他char序列)分隔的原始数据类型(如String或Integer)的文件
  • readSequenceFile(Key,Value,path)/ SequenceFileInputFormat
    创建JobConf并从类型为SequenceFileInputFormat,Key class和Value类的指定路径中读取文件,并将它们作为Tuple2 返回。

4.2 基于集合

  • fromCollection(Iterable) - 从Iterable创建数据集。 Iterable返回的所有元素必须属于同一类型
  • fromCollection(Iterator) - 从迭代器创建数据集。该类指定迭代器返回的元素的数据类型
  • fromElements(elements:_ *) - 根据给定的对象序列创建数据集。所有对象必须属于同一类型
  • fromParallelCollection(SplittableIterator) - 并行地从迭代器创建数据集。该类指定迭代器返回的元素的数据类型
  • generateSequence(from,to) - 并行生成给定时间间隔内的数字序列。

4.3 通用

  • readFile(inputFormat,path)/ FileInputFormat
    接受文件输入格式
  • createInput(inputFormat)/ InputFormat
    接受通用输入格式5 从集合创建DataSet5.1 Scala实现

5.2 Java实现

6 从文件/文件夹创建DataSet

6.1 Scala实现

文件

文件夹

Java实现

7 从csv文件创建Dataset

7.1 Scala实现

  • 注意忽略第一行

  • includedFields参数使用
  • 定义一个POJO8 从递归文件夹的内容创建DataSet8.1 Scala实现

9从压缩文件中创建DataSet

Flink目前支持输入文件的透明解压缩,如果它们标有适当的文件扩展名。 特别是,这意味着不需要进一步配置输入格式,并且任何FileInputFormat都支持压缩,包括自定义输入格式。

压缩文件可能无法并行读取,从而影响作业可伸缩性。

下表列出了当前支持的压缩方法

9.1 Scala实现

10 Transformation

10.1 map

Map转换在DataSet的每个元素上应用用户定义的map函数。 它实现了一对一的映射,也就是说,函数必须返回一个元素。

以下代码将Integer对的DataSet转换为Integers的DataSet:

Scala实现

Java实现

10.2 filter

Scala实现

Java实现

10.3 mapPartition

MapPartition在单个函数调用中转换并行分区。 map-partition函数将分区作为Iterable获取,并且可以生成任意数量的结果值。 每个分区中的元素数量取决于并行度和先前的操作。

Scala实现

Java实现

10.4 first

Scala实现

10.5 Cross

11 Data Sinks

11.1 Java描述

Data Sinks使用DataSet并用于存储或返回它们

使用OutputFormat描述数据接收器操作

Flink带有各种内置输出格式,这些格式封装在DataSet上的操作后面:

  • writeAsText()/ TextOutputFormat
    将元素按行顺序写入字符串。通过调用每个元素的toString()方法获得字符串。
  • writeAsFormattedText()/ TextOutputFormat
    按字符串顺序写入元素。通过为每个元素调用用户定义的format()方法来获取字符串。
  • writeAsCsv(...)/ CsvOutputFormat
    将元组写为逗号分隔值文件。行和字段分隔符是可配置的。每个字段的值来自对象的toString()方法。
  • print()/ printToErr()/ print(String msg)/ printToErr(String msg)
    打印标准输出/标准错误流上每个元素的toString()值。可选地,可以提供前缀(msg),其前缀为输出。这有助于区分不同的打印调用。如果并行度大于1,则输出也将以生成输出的任务的标识符为前缀。
  • write()/ FileOutputFormat
    自定义文件输出的方法和基类。支持自定义对象到字节的转换。
  • output()/ OutputFormat
    最通用的输出方法,用于非基于文件的数据接收器(例如将结果存储在数据库中)。

可以将DataSet输入到多个操作。程序可以编写或打印数据集,同时对它们执行其他转换。

例子

标准数据接收方法:

// text data
DataSet<String> textData = // [...]

// write DataSet to a file on the local file system
textData.writeAsText("file:///my/result/on/localFS");

// write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS");

// write DataSet to a file and overwrite the file if it exists
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE);

// tuples as lines with pipe as the separator "a|b|c"
DataSet<Tuple3<String, Integer, Double>> values = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|");

// this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.writeAsText("file:///path/to/the/result/file");

// this writes values as strings using a user-defined TextFormatter object
values.writeAsFormattedText("file:///path/to/the/result/file",
    new TextFormatter<Tuple2<Integer, Integer>>() {
        public String format (Tuple2<Integer, Integer> value) {
            return value.f1 + " - " + value.f0;
        }
    });

使用自定义输出格式:

DataSet<Tuple3<String, Integer, Double>> myResult = [...]

// write Tuple DataSet to a relational database
myResult.output(
    // build and configure OutputFormat
    JDBCOutputFormat.buildJDBCOutputFormat()
                    .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                    .setDBUrl("jdbc:derby:memory:persons")
                    .setQuery("insert into persons (name, age, height) values (?,?,?)")
                    .finish()
    );

本地排序输出

可以使用元组字段位置或字段表达式以指定顺序在指定字段上对数据接收器的输出进行本地排序。 这适用于每种输出格式。

以下示例显示如何使用此功能:

DataSet<Tuple3<Integer, String, Double>> tData = // [...]
DataSet<Tuple2<BookPojo, Double>> pData = // [...]
DataSet<String> sData = // [...]

// sort output on String field in ascending order
tData.sortPartition(1, Order.ASCENDING).print();

// sort output on Double field in descending and Integer field in ascending order
tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print();

// sort output on the "author" field of nested BookPojo in descending order
pData.sortPartition("f0.author", Order.DESCENDING).writeAsText(...);

// sort output on the full tuple in ascending order
tData.sortPartition("*", Order.ASCENDING).writeAsCsv(...);

// sort atomic type (String) output in descending order
sData.sortPartition("*", Order.DESCENDING).writeAsText(...);

参考

DataSet Transformations

相关源码

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
1月前
|
JSON Shell API
免费导航规划API接口详解:调用指南与实战示例
该接口由接口盒子提供,支持根据起点、终点及途经点生成驾车或步行导航路线,可获取详细或简化导航数据。提供GET/POST请求方式,需传入坐标信息及用户认证参数,适用于物流路径优化、步行导航、旅游路线规划等场景。
|
2月前
|
数据采集 缓存 监控
唯品会 API 开发痛点解析:从权限申请到数据清洗的实战经验
本文深入解析唯品会开放平台(VOP)API开发全流程,涵盖权限申请、签名机制、数据清洗、性能优化等核心挑战与实战解决方案,助力开发者高效构建稳定可靠的电商数据整合系统。
|
30天前
|
人工智能 API 定位技术
MCP 开发实战:手把手教你封装高德地图与 arXiv API
本教程为 MCP(Model Context Protocol)开发实战第二阶段,带你从零封装第三方 API 为 AI 模型可用工具。通过高德地图地理编码与 arXiv 论文检索两个实例,涵盖项目搭建、工具声明、资源定义、错误处理等核心内容,助你快速上手 MCP 开发并集成至 Claude 使用。
|
1月前
|
缓存 前端开发 API
阿里巴巴国际站关键字搜索 API 实战:从多条件筛选到商品列表高效获客
本文详解了如何通过阿里巴巴国际站的关键字搜索接口实现多条件商品搜索功能,涵盖接口调用、参数设置、分页处理及数据解析,并提供可复用的 Python 实现代码,助力开发者高效构建跨境电商商品搜索系统。
|
1月前
|
机器学习/深度学习 JSON API
2025最新版天猫图片搜索API全解析:从图像识别到商品匹配实战
天猫图片搜索API(拍立淘)基于深度学习与CNN技术,实现以图搜商品,支持图片URL或二进制上传,适用于比价、推荐等场景。2025版新增多模态搜索优化与相似度动态调整。接口支持POST/GET请求,返回商品详情及排序结果,示例代码提供Python请求方式。
|
2月前
|
缓存 安全 测试技术
精选API实战问答,解决开发中的疑难杂症
这是一份精选的 API 开发高频问题与解决方案合集,涵盖基础概念、设计规范、安全认证、调试测试、性能优化等十大主题。每日学习 1-2 个问题,结合实战应用,快速掌握 API 开发核心技能,高效解决开发中的各类疑难杂症,适合开发者构建系统化知识体系。
|
2月前
|
存储 数据采集 监控
电商数据分析实战:利用 API 构建商品价格监控系统
在电商运营中,商品价格直接影响转化率和竞争力。本文介绍如何构建一套自动化价格监控系统,覆盖京东、淘宝双平台,实现数据采集、存储、分析与智能告警,助力企业实时掌握价格动态,优化定价策略。
|
2月前
|
存储 数据采集 API
小红书笔记详情API深度解析与实战指南(2025年最新版)
本文深入解析小红书开放平台笔记详情API的进阶使用与合规策略,涵盖接口升级、数据维度扩展、调用优化等内容,并提供Python调用示例及数据清洗存储方案。结合电商导购、舆情监控等实战场景,助力开发者高效获取并应用内容资产,同时强调数据隐私与平台政策合规要点,帮助构建稳定、安全的数据应用体系。
|
2月前
|
供应链 搜索推荐 安全
淘宝/京东/亚马逊API实战:中小商家的自动化生存指南
电商API是连接电商平台、商家、支付与物流系统的技术桥梁,具备商品管理、订单处理、用户服务、营销支持等功能,助力业务自动化与数据驱动决策,成为电商生态中提升效率与创新的关键基础设施。
|
2月前
|
JSON API 开发者
深入解析与实战应用:利用Python和Amazon Product Advertising API实战分析
本文介绍了如何通过接入亚马逊关键词搜索接口,高效获取商品信息,优化选品策略。内容涵盖注册开发者账号、获取API密钥、构建请求URL、调用搜索API及处理响应数据,并提供Python代码示例,助力商家提升运营效率。