阿里一面:Flink的类型与序列化怎么做的

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 阿里一面:Flink的类型与序列化怎么做的

一、DataStream类型系统

1.1 物理类型

640.png

1.2 逻辑类型

640.png

1.3 类型推断

1、类型提取的时机

#DataStream.java

640.png

在使用DataStream#map接口的时候,就会触发类型的提取。


2、自动类型推断


       Flink首先会自动进行类型推断,但是对于一些带有泛型的类型,Java泛型的类型擦除机制会导致Flink在处理Lambda表达式的类型推断时不能保证一定能提取到类型。


       Java泛型(Generic) 的引入加强了参数类型的安全性,减少了类型的转换,但有一点需要注意:Java的泛型机制是在编译级别实现的。编译器生成的字节码在运行期间并不包含泛型的类型信息。


       此时就需要为Flink的应用提供类型信息,使用TypeHint的匿名类来获取泛型的类型信息。


TypeExtractor.getMapReturnTypes(function(TypeInformation)TypeInformation.of(new TypeHint<Tuple3<String, String, String>>(){}));

上述代码使用匿名内部类来获取泛型信息,其中new TypeHint<Tuple3<String, String, String>>(){} 就是用来在类型擦除的情况下来获取泛型信息的。


3、Lambda函数的类型提取


       Flink 类型提取依赖于继承等机制,但Lambda函数比较特殊,其类型提取是匿名的,也没有与之相关的类,所以其类型信息较难获取。


1)Java类型擦除的原因


       a:避免JVM的重构。如果JVM将泛型类型延续到运行期,那么到运行期时JVM就需要进行大量的重构工作。


       b:版本兼容。在编译期擦除可以更好地支持原生类型(Raw Type)。


2)Java泛型类型擦除规则


       a:如果是继承基类而来的泛型,就用getGenericSuperclass(),转型为ParameterizedType来获得实际类型。


       b:如果是实现接口而来的泛型,就用getGenericSuperclass(),针对其中的元素转型为ParameterizedType来获得实际类型。


       c:Java泛型在字节码中会被擦除,并不总是擦除为Object类型,而是擦除到上限类型。

1.4 显示类型

Flink 提供了两层简化的类型使用方式:


  • 按照数据类型的快捷方式

       BasicTypeInfo这个类定义了基本类型的TypeInformation的快捷声明,如String、Boolean、Byte、Short\Integer、Long、Float、Double、Char等。


  • 通用的类型快捷方式

       Types类(org.apache.flink.api.common.typeinfo.Types)

二、SQL类型系统

2.1 SQL逻辑类型

640.png

在目前版本的Flink存在两套Row结构:


  • org.apache.flink.types.Row:在Flink Planner中使用,是1.9版本之前Flink SQL使用的Row结构,在SQL相关的算子、UDF函数、代码生成中都是使用该套Row结构。
  • org.apache.flink.table.dataformat.BaseRow及其子类:是在Blink Runtime和Blink Planner中使用的新的Row类型数据结构,在Blink算子、UDF函数和代码生成中使用此结构。

2.2 Blink Row

640.png

Blink 中的行式存储结构


  • BinaryRow:表数据的二进制行式存储,分为定长部分和不定长部分,定长部分只能在一个MemorySegment内。
  • NestedRow:与BinaryRow的内存结构一样,区别在于NestedRow的定长部分可以跨MemorySegment。
  • UpdatableRow:该类型的Row比较特别,其保存了该行所有字段的数据,更新字段数据的时候不修改原始数据,而是使用一个数组记录被修改字段的最新值。读取数据的时候,首先判断数据是否被更新过,如果更新过则读取最新值,如果没有则读取原始值。
  • ObjectArrayRow:使用对象数据保存数据,比二进制结构存储形式多了对象的序列化/反序列化,理论上来说成本更高。其中两个实现类GenericRow和BoxedWrapperRow。GenericRow中存储的数据类型是原始类型(如int等),BoxedWrapperRow中存储的数据类型是可序列化和可比较大小的对象类型。
  • JoinedRow:表示Join或者关联运算中的两行数据的逻辑结构,如Row1、Row2,两行数据并没有进行物理上的合并,物理合并成本高。但是从使用者的角度来说,看起来就是一行数据,无须关注底层。
  • ColumnarRow:是一种内存列式存储结构,每一列的抽象结构为ColumnVector。在当前的实现中,只支持堆上ColumnVector,对外的ColumnVector尚不被支持。堆上ColumnVector本质上是使用Java原始类型数据保存一列的数据。Orc类型的列式存储使用了ColumnarRow。

为了提升Flink SQL的性能,在1.9版本实现了BinaryRow,BinaryRow直接使用MemorySegment来存储和计算,计算过程中直接对二进制数据结构进行操作,避免了序列化/反序列化的开销。


BinaryRow 存储结构中包含两个部分:定长部分和变长部分。

  • 定长部分

       定长部分包含了3个内容:头信息区(Header)、空值索引(Null Bit Set)、字段值区(Field Values)。


       1)头信息区:占用一个字节。


       2)空值索引:用于标记行中Null值字段,在内存中使用8字节进行对齐。在实际的存储中,该区域的第一个字节就是行的头信息区,剩下的才是Null值字段标识位。


       3)字段值区:保存基本类型和8个字节长度以内的值,如果某个字段值超过了8个字节,则保存该字段的长度与offset偏移量。在目前的实现中,一般的Bool类型、数值类型和长度较短的时间类型、精度低一些的Decimal类型可以宝UC你在定长部分。

640.png

在目前的设计中,定长部分全部保存在1个MemorySegment中,以提升读写BinaryRow中字段的速度。在写入阶段,如果BinaryRow中定长部分超过单个MemorySegment的存储容量,确实有非常多的字段,建议增加MemorySegment的大小。


  • 变长部分

       变长部分用来保存超过8个字节长度的字段的值,可能会保存跨越多个MemorySegment的字段。


BinaryRow实际上是参照Spark的UnsageRow 来设计的,两者的区别在于Flink的BinaryRow不在保存在连续内存中的,如果不定长部分足够小,可以保存在一个固定长度的内存中。

三、数据序列化

3.1 数据序列化/反序列化

数据序列化、反序列化的概要过程:

640.png

对于嵌套类型的数据结构,从最内层的原子字段开始进行序列化,外层的TypeSerialize负责将内层的序列化结果组装到一起。

640.png

反序列化的时候,Tuple中的每个子序列化器能够自动识别应该读取到多少字节的数据,如对于int类型,读取32字节,对于String类型,则会首先读取长度部分,根据长度的数值计算出字符串的起始内存地址和应该读取的字节长度。

3.2 String的序列化过程

StringSerializer中实现了serialize和deserializer方法,调用StringValue.class实现了数据的序列化和反序列化。

#StringSerializer.java

640.png

最终的实际序列化动作交给了StringValue.class执行,写入String的长度和String的值到java.io.DataOutput,实际上就是写入MemorySegment中。

#StringValue.java#writeString

640.png

调用了DataOutputView接口,DataOutputView接口继承自DataOutput接口。

反序列hue的逻辑是相反的,将二进制数据流转换为UTF8编码的字符串。

#StringValue.class#readString

640.png

接下来Flink内存管理篇,如果对Flink感兴趣或者正在使用的小伙伴,可以加我入群一起探讨学习。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
5月前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之通过flink同步kafka数据进到doris,decimal数值类型的在kafka是正常显示数值,但是同步到doris表之后数据就变成了整数,该如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
SQL 消息中间件 分布式计算
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
66 5
|
2月前
|
JSON 缓存 NoSQL
redis序列化数据时,如何包含clsss类型信息?
通过配置 `com.fasterxml.jackson.databind.ObjectMapper` 的 `enableDefaultTyping` 方法,可以使序列化后的 JSON 包含类信息。
51 2
|
1月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
129 0
|
3月前
|
调度 流计算
Flink 新一代流计算和容错问题之Flink 中的数据可以分为什么类型
Flink 新一代流计算和容错问题之Flink 中的数据可以分为什么类型
|
3月前
|
分布式计算 Oracle 关系型数据库
实时计算 Flink版产品使用问题之获取Oracle的数据时无法获取clob类型的数据,该怎么办
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL JSON 数据库
实时计算 Flink版操作报错合集之写入Hudi时,遇到从 COW(Copy-On-Write)表类型转换为 MOR(Merge-On-Read)表类型时报字段错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
JSON 关系型数据库 MySQL
实时计算 Flink版产品使用问题之在使用CDAS语法同步MySQL数据到Hologres时,如果开启了字段类型宽容模式,MySQL中的JSON类型会被转换为什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL 数据采集 Oracle
实时计算 Flink版产品使用问题之如何读取oracle中的blob类型的数据
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
Java
springboot解决jackson序列化Long类型精度失效问题
springboot解决jackson序列化Long类型精度失效问题
133 0