Flink实战(三) - 编程范式及核心概念(四)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink实战(三) - 编程范式及核心概念(四)

7 支持的数据类型

Flink对DataSet或DataStream中可以包含的元素类型设置了一些限制。 原因是系统分析类型以确定有效的执行策略。

有六种不同类别的数据类型:

  • Java 元组 and Scala Case 类
  • Java POJOs
  • 原生类型
  • Regular Classes
  • Values
  • Hadoop Writables
  • Special Types

7.1 元组 and Case 类

7.1.1 Java版本

元组是包含固定数量的具有各种类型的字段的复合类型。 Java API提供从Tuple0到Tuple25的类。

1.png

元组的每个字段都可以是包含更多元组的任意的Flink的类型,从而产生嵌套元组。 可以使用字段名称tuple.f4直接访问元组的字段,也可以使用通用getter方法tuple.getField(int position)。 字段索引从0开始。

这与Scala的元组形成对比,但Java的常规索引更为一致。

DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
    new Tuple2<String, Integer>("hello", 1),
    new Tuple2<String, Integer>("world", 2));
wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
    @Override
    public Integer map(Tuple2<String, Integer> value) throws Exception {
        return value.f1;
    }
});
wordCounts.keyBy(0); // also valid .keyBy("f0")

7.1.2 Scala版本

Scala case类(和Scala元组是case类的特例)是包含固定数量的具有各种类型的字段的复合类型。 元组字段由它们的1偏移名称寻址,例如第一个字段的_1。 字段按名称访问。

case class WordCount(word: String, count: Int)
val input = env.fromElements(
    WordCount("hello", 1),
    WordCount("world", 2)) // Case Class Data Set
input.keyBy("word")// key by field expression "word"
val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set
input2.keyBy(0, 1) // key by field positions 0 and 1

7.2 POJOs

如果满足以下要求,则Flink将Java和Scala类视为特殊的POJO数据类型:

  • public限定
  • 它必须有一个没有参数的公共构造函数(默认构造函数)。
  • 所有字段都是public的,或者必须通过getter和setter函数访问。 对于名为foo的字段,getter和setter方法必须命名为getFoo()和setFoo()。
  • Flink必须支持字段的类型。 目前,Flink使用Avro序列化任意对象(例如Date)。


Flink分析POJO类型的结构,即它了解POJO的字段。 因此,POJO类型比一般类型更容易使用。 此外,Flink可以比一般类型更有效地处理POJO。

以下示例显示了一个包含两个公共字段的简单POJO。

7.2.1 Java版本

public class WordWithCount {
    public String word;
    public int count;
    public WordWithCount() {}
    public WordWithCount(String word, int count) {
        this.word = word;
        this.count = count;
    }
}
DataStream<WordWithCount> wordCounts = env.fromElements(
    new WordWithCount("hello", 1),
    new WordWithCount("world", 2));
wordCounts.keyBy("word"); // key by field expression "word"

7.2.2 Scala 版本

class WordWithCount(var word: String, var count: Int) {
    def this() {
      this(null, -1)
    }
}
val input = env.fromElements(
    new WordWithCount("hello", 1),
    new WordWithCount("world", 2)) // Case Class Data Set
input.keyBy("word")// key by field expression "word"

7.3 原生类型

Flink支持所有Java和Scala原生类型,如Integer,String和Double。

7.4 General Class Types

Flink支持大多数Java和Scala类(API和自定义)。 限制适用于包含无法序列化的字段的类,如文件指针,I / O流或其他本机资源。 遵循Java Beans约定的类通常可以很好地工作。


所有未标识为POJO类型的类都由Flink作为常规类类型处理。 Flink将这些数据类型视为黑盒子,并且无法访问其内容(即,用于有效排序)。 使用序列化框架Kryo对常规类型进行反序列化。


7.5 Values

值类型手动描述其序列化和反序列化。

它们不是通过通用序列化框架,而是通过使用读取和写入方法实现org.apache.flinktypes.Value接口来为这些操作提供自定义代码。当通用序列化效率非常低时,使用值类型是合理的。


一个示例是将元素的稀疏向量实现为数组的数据类型。知道数组大部分为零,可以对非零元素使用特殊编码,而通用序列化只需编写所有数组元素。


org.apache.flinktypes.CopyableValue接口以类似的方式支持手动内部克隆逻辑。


Flink带有与基本数据类型对应的预定义值类型。 (ByteValue,ShortValue,IntValue,LongValue,FloatValue,DoubleValue,StringValue,CharValue,BooleanValue)。这些值类型充当基本数据类型的可变变体:它们的值可以被更改,允许程序员重用对象并从垃圾收集器中消除压力。


7.6 Hadoop Writables

可以使用实现org.apache.hadoop.Writable接口的类型。 write()和readFields()方法中定义的序列化逻辑将用于序列化。

7.7 Special Types

可以使用特殊类型,包括Scala的Either,Option和Try

Java API有自己的自定义Either实现。 与Scala的Either类似,它代表两种可能类型的值,左或右。 两者都可用于错误处理或需要输出两种不同类型记录的运算符。

7.8 Type Erasure & Type Inference

仅适用于Java

Java编译器在编译后抛弃了大部分泛型类型信息。这在Java中称为类型擦除。这意味着在运行时,对象的实例不再知道其泛型类型。例如,DataStream 和DataStream 的实例于JVM看起来相同。


Flink在准备执行程序时(当调用程序的主要方法时)需要类型信息。 Flink Java API尝试重建以各种方式丢弃的类型信息,并将其显式存储在数据集和运算符中。您可以通过DataStream.getType()检索类型。该方法返回TypeInformation的一个实例,这是Flink表示类型的内部方式。


类型推断有其局限性,在某些情况下需要程序员的“合作”。这方面的示例是从集合创建数据集的方法,例如

ExecutionEnvironment.fromCollection()

可以在其中传递描述类型的参数。但是像MapFunction <I,O>这样的通用函数也可能需要额外的类型信息。

ResultTypeQueryable接口可以通过输入格式和函数实现,以明确告知API其返回类型。调用函数的输入类型通常可以通过先前操作的结果类型来推断。

参考

Apache Flink

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
6月前
|
SQL Java 关系型数据库
Flink DataSet API迁移到DataStream API实战
本文介绍了作者的Flink项目从DataSet API迁移到DataStream API的背景、方法和遇到的问题以及解决方案。
216 3
|
6月前
|
流计算 Windows
Flink窗口与状态编程开发(一)
Flink窗口与状态编程开发(一)
|
缓存 API 流计算
Flink--7、窗口(窗口的概念、分类、API、分配器、窗口函数)、触发器、移除器
Flink--7、窗口(窗口的概念、分类、API、分配器、窗口函数)、触发器、移除器
|
8天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
36 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
30天前
|
消息中间件 分布式计算 大数据
大数据-123 - Flink 并行度 相关概念 全局、作业、算子、Slot并行度 Flink并行度设置与测试
大数据-123 - Flink 并行度 相关概念 全局、作业、算子、Slot并行度 Flink并行度设置与测试
95 0
|
30天前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
108 0
|
3月前
|
大数据 API 数据处理
揭秘!Flink如何从默默无闻到大数据界的璀璨明星?起源、设计理念与实战秘籍大公开!
【8月更文挑战第24天】Apache Flink是一款源自Stratosphere项目的开源流处理框架,由柏林理工大学等机构于2010至2014年间开发,并于2014年捐赠给Apache软件基金会。Flink设计之初即聚焦于提供统一的数据处理模型,支持事件时间处理、精确一次状态一致性等特性,实现了流批一体化处理。其核心优势包括高吞吐量、低延迟及强大的容错机制。
59 1
|
3月前
|
API C# Shell
WPF与Windows Shell完美融合:深入解析文件系统操作技巧——从基本文件管理到高级Shell功能调用,全面掌握WPF中的文件处理艺术
【8月更文挑战第31天】Windows Presentation Foundation (WPF) 是 .NET Framework 的关键组件,用于构建 Windows 桌面应用程序。WPF 提供了丰富的功能来创建美观且功能强大的用户界面。本文通过问题解答的形式,探讨了如何在 WPF 应用中集成 Windows Shell 功能,并通过具体示例代码展示了文件系统的操作方法,包括列出目录下的所有文件、创建和删除文件、移动和复制文件以及打开文件夹或文件等。
71 0
|
6月前
|
传感器 存储 缓存
[尚硅谷flink学习笔记] 实战案例TopN 问题
这段内容是关于如何使用Apache Flink解决实时统计水位传感器数据中,在一定时间窗口内出现次数最多的水位问题,即&quot;Top N&quot;问题。首先,介绍了一个使用滑动窗口的简单实现,通过收集传感器数据,按照水位计数,然后排序并输出前两名。接着,提出了全窗口和优化方案,其中优化包括按键分区(按水位vc分组)、开窗操作(增量聚合计算count)和过程函数处理(聚合并排序输出Top N结果)。最后,给出了一个使用`KeyedProcessFunction`进行优化的示例代码,通过按键by窗口结束时间,确保每个窗口的所有数据到达后再进行处理,提高了效率。
178 1
|
6月前
|
分布式计算 监控 API
flink 入门编程day02
flink 入门编程day02