Structured_介绍_序列化更迭 | 学习笔记

简介: 快速学习 Structured_介绍_序列化更迭

开发者学堂课程【大数据Spark2020版(知识精讲与实战演练)第五阶段:Structured_介绍_序列化更迭】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/692/detail/12141


Structured_介绍_序列化更迭

内容介绍

一. 为什么要讲序列化

二. 序列化进化过程

三. 总结

 

一.为什么要讲序列化

整个编程模型对外提供接口,让我们进行使用,那么进行序列化也是非常重要的,序列化的进化过程关乎于整个编程的效率,数据处理的效率,在 Dataset 这个阶段序列化已经相对完美。

Spark 中的序列化过程决定了数据如何存储,是性能优化一个非常重要的着眼点,Spark 的进化并不只是针对编程模型提供的 API 在大数据处理中,也必须要考虑性能。

 

二. 序列化进化过程

1. 序列化和反序列化是什么?

在 Java 中,序列化的代码大概如下

public class JavaSerializable implements Serializable {

NonSerializable ns=new NonSerializable();

}

public class NonSerializable {

}

public static void main(String[] args) throws IOException {// 序列化

JavaSerializable serializable = new JavaSerializable();

ObiectOutputStream obiectOutputStream = new ObiectOutptStream(new FileOutputStream("/tmp/obj.ser"));

//这里会抛出一个"java.io.NotSerializableException: cn.itcast.NonSerializable”异常

objectOutputStream.writeObiect(serializable);

objectOutputStream.flush();

objectOutputStream.close();

//反序列化

FileInputStream fileInputStream = new FileInputStream("/tmp/obj.ser");

ObiectInputStream obiectOutputStream=new ObjectInputStream(fileInputStream);

JavaSerializable serializablel = objectOutputStream.readbject();

}

在 Java 当中,我们这有两个类,一个类叫做 Java serveralizable 它继承了,它重写了或者它实现了 serveralizable 接口,那么这个对象是可以系列化的。还有一个对象,它没有实现 serveralizable 这个接口,所以 NonSerializable 这个对象是不可序列化的,我们就可以把这个对象写到一个文件当中。

如果一个对象实现了 Serializable 这个接口,那么就可以把它这个对象转成二进制,然后放在文件当中或者在网络当中来进行传输。那么这个代码如下,

JavaSerializable serializable = new JavaSerializable();

ObiectOutputStream obiectOutputStream = new ObiectOutptStream(new FileOutputStream("/tmp/obj.ser"));

//这里会抛出一个"java.io.NotSerializableException: cn.itcast.NonSerializable”异常

objectOutputStream.writeObiect(serializable);

objectOutputStream.flush();

objectOutputStream.close();

首先我们可以创建出来这个对象,创建以后,我们可以 objectOutputStream ,就是一个对象的输出流。这个对象的输出流当中,传入了一个 FileOutputStream 最终会输出为 obj.ser 这样的一个文件。

但是在去序列化这个 JavaSerializable 的时候会报一个错叫做 NotSerializableException,报错原因是 NonSerializable 它是不可序列化的,因为它没有实现 Seralizable 这个接口。但是在 JavaSerializable 这个对象当中引用了一个 NonSerializable 属性,该属性是没有办法被序列化的,所以在这个地方会报错。即这个对象本身可以序列化不够,里面的所有属性及内容都要可以被序列化才行。然后就可以 write object 去写,然后 Flash close 关闭流。

反序列化的时候直接创建一个 FileInputStream 然后传入到叫做 ObjectInputStream ,即一个对象输入流,然后该对象输入流就可以通过 readObject 直接反序列化为一个对象。

(1)序列化是什么?

序列化就是把一个对象写到一个文件里或者写成二进制的这种形式的过程。

反序列化指的是将保存下来的二进制对象数据恢复成对象

(2)序列化对对象的要求:

对象必须实现 Serializable 接口

对象中的所有属性必须都要可以被序列化,如果出现无法被序列化的属性,则序列化失败

(3)限制:

对象序列化后生成的二进制文件当中包含了很多环境的信息,特别是 Java 的对象序列化里面又包含了对象头,又包含了属性字段、属性名字、属性的类型、方法等各种声明,所以比较浪费存储,数据量比较大。

因为数据量大,所以序列化和反序列化的过程比较慢。

(4)序列化的应用场景:

持久化对象数据

网络中不能传输 Java 对象,只能将其序列化后传输二进制数据。(分布式环境当中对象在网络当中传输需要序列化。直接传输一个 Java 对象到另一个机器里其实做不到的。无论 API 里面看得多像,但是其实是做不到的,在网络里面是不能传输对象的。因为对象是 JVM 虚拟机的概念,在网络当中传输的永远只是二进制的数据。把 Java 对象转成二进制数据的过程就叫做序列化,然后就可以在网络当中传输了。如果要保留一个对象数据缓存到某一个文件中的话,也要使用序列化)

2. 在 Spark 中什么地方要用到序列化和反序列化。

在 Spark 当中,序列化和反序列化的应用场景是什么?

(1)Task

image.png

首先第一点,最初一个 Spark 程序是先在 Master 运行出来一个 Driver 。

接下来 Task set 生成以后,会由 Driver 当中的 Task Scheduler 去分发到不同的 Worker 当中去执行。所以 Task 要在网络当中来进行传输,如果是这样, Task 也是一个对象,要在网络当中进行传输,那么必须要先序列化这个 Task 对象。

(2)RDD 缓存

val rdd1 = rdd.flatMap(_.split(""))

.map((_,1))

.reduceByKey(_+_)

rdd1.cache

rdd1.collect

既然都要存储数据,RDD 当中如果是一个对象,在 catch 的时候,这个对象也要缓存下来。要先给它序列化一下,然后再存成一个文件或者存到内存当中。

这就是第二个场, RDD 的缓存当中需要把对象进行序列化。对象是不能直接存文件的,必须要序列化为二进制才能存。

(3)广播变量

image.png

如果要把一个数据广播到不同的 Executor 当中,使用 Broadcast Receiver 去广播一个变量的时候,就要考虑是不是要先序列化才能广播,因为也要在网络中传输并分发给每一个 Executor 存到 Block manager 当中。

(4)shuffle 过程

image.png

shuffle 分为两端,一端叫做 Mapper 一端叫做 Reducer Reducer 是从 Mapper 当中把数据拉过来,这个过程当中也经过网络,所以 Reducer 当中和 Mapper 来进行交互的时候,这个对象也要先序列化成文件,然后在网络当中进行传输。

(5)Spark Streaming

image.png

Spark Streaming 当中的 Receiver 可以去读取 Kafka 某一个 Broker 里面的数据,读完以后它会先去暂存到 Block Manager 当中。

那么这个时候也要把序列化以后才能存储,这个过程它默认的存储级别是 Memory and Desk Server less 两份。

算子引入外部对象

class Unserializable(i:Int)

rdd.map(i=> new Unserializable(i))

.collect

.foreach(println)

在 Map 算子的函数中,传入了一个 Unserializable 的对象

Map 算子的函数是会在整个集群中运行的,那 Unserializable

对象就需要跟随 Map 算子的函数被传输到不同的节点上

如果 Unserializable 不能被序列化则会报错

分析一下这个错误

map 算子当中引用了 Unserializable 这个对象。这个 map 算子最终会生成会生成一组 Task 。这个 Task 它要分发到其他的节点当中去运行,分发到其他的 worker 当中的 executor。这时要注意,这个 task 要先被序列化以后才能分发,那么 task 在序列化以后, Unserializable 也是 task 这个对象当中的一个属性。

如果你要序列化一个对象,里面的所有属性也要能被序列化才行。但是 Unserializable 这个类它没有办法被序列化,所以这段代码会报错。也就是说你在这个麦算子当中引入了一个外部的变量,你就要确定这个外部的变量是可以被序列化的,并且序列化以后没有任何问题,才能去使用,这点非常重要。

3. RDD 的序列化和反序列化如何实现?(代表了过去时代的序列化和反序列化)

image.png

首先 RDD 的这个序列化只能使用 Java 的序列化器或者一个新的序列化器叫做 kyro。

只能使用 Java 或者 kyro 的原因是 RDD 当中在进行处理的时候,它无法感知到这个数据应该是什么格式,有什么样的结构,也不会留存这个结构信息。所以在序列化的时候是一定要针对于这个 RDD 当中一整个对象来进行序列化。

比如说一个范型 RDD[person] ,在进行序列化的时候要直接把 person 这个对象给序列化掉才可以。也可以在 RDD 当中使用 kyro 序列化器来进行序列化。

kyro 是一个比 Java 的序列化器要高级一点的序列化器,它也可以针对于一个对象来进行序列化,它里面用到了 ASM ,所以 Kyro 可以理解为一个第三方的序列化器,它的性能会比较高。在 Spark 当中,如果要使用 RDD ,会频繁使用 kyro 序列化器去替代这个 RDD 默认原生的 Java 序列化器,因为 kyro 的性能高。

如何在 RDD 当中使用 kyro ?

val conf = new SparkConf()

.setMaster("local[2]")

.setAppName("KyroTest")

conf.set("spark.serializer", "org.apache.spark.serializerKryoSerializer")

conf.registerKryoClasses(Array(classOf[Person]))

val sc = newSparkContext(conf)

rdd.map(arr=> Person(arr(0),arr(1),arr(2)))

在创建出来 conf 对象以后,可以往里面设置一个属性 spark.serializer",  指定为org.apache.spark.serializerKryoSerializer

第二步所有要涉及到序列化的对象类型全部都要去 register ,register 以后这个 kyro 才能够序列化。原因是 kyro 在序列化的时候不准备存这个类的信息,即 kyro 在存的时候,基本不存储元数据信息,所以要确定类型项。在反序列化的时候,它就会按照这个类型来进行反序列化,那么就可以去创建这个 SparkContext ,在接下来 Person 对象就可以使用 kyro 序列化器来进行序列化。

注意一点,即在只有 RDD 的时候,我们并不知道 RDD 的众多缺陷,但是自从 DataFrame 和 Dataset 出现以后,发现 RDD 的序列化性能稍微有点差。RDD 的序列化这么差是因为 RDD 当中无法感知数据的组成和数据的结构,只能以对象形式处理数据。所以这每一个数据对项目对于 RDD 来说都是黑盒子,它只能够全局地把所有的数据全都以对象的形式序列化。所以这是 RDD 映射, RDD 不知道这个数据是什么架构。

4. Dataset 的序列化和反序列化如何实现?(代表了新时代的序列化和反序列化)

(1)DataFrame 和 Dataset 的特点:

DataFrame 和 Dataset 是为结构化数据优化的

在 DataFrame 和 Dataset 中数据和数据的 Schema 是分开存

储的

spark.read SCALA

.csv("...")

.where($"name" =!= "")

.groupBy($"name")

.map(row:Row => row)

.show()

DataFrame 中没有数据对象这个概念,所有的数据都以行的形式

存在于 Row 对象中。Row 中记录了每行数据的结构。包括列名,类型等。

而 DataFrame 和 Dataset 知道数据是什么架构,所以 DataFrame 和 Dataset 不用 Java 的序列化器,也不用 kyro, 可以去配置,但是默认情况下是不用的。 DataFrame 存放的只有一个 row 对象, DataFrame 就会直接使用 row 对象来进行序列化。

在 Dataset 和 DataFrame 当中有 Schema 概念,所以它在序列化的时候,根本就无需去把整个对象的这个所有源信息全部序列化掉。因为有结构信息,在反序列化的时候,按照结构信息把二进制变成这个对应类型的对象,不需要保存这个对象信息,所以在 DataFrame 当中是没有数据对象这个概念的

image.png

首先 DataFrame 当中是一个二维的表格,第一维度就是 row 对象一个 row 代表一个行,第二个 row 代表第二个行。它还有一个维度的概念叫做列,每一个列都代表了某一个 row 当中的一个字段,所以 row 对象本身就有结构化的信息。

这个 row 1 里面第一个字段叫做 date ,然后也知道它是一个什么类型,第二个字段叫做 purpose ,也知道这个类型是什么,然后它也知道第三个字段叫做 sales 以及它的类型是什么,所以这些结构信息 DataFrame 是知道的。

DataFrame 就是 Dataset 的 row 类型, type DataFrame = Dataset[Row] ,所以 DataFrame 的 row 对象本身就是可以被序列化。

Dataset 它的上层可以提供有类型 API 所以用于操作数据。但是内部无论 Dataset 是什么类型,无论你的 Dataset 里面的类型是 person 还是 string ,在内部存 dataset 的时候,其实都是 RDD 里面只有一个泛型叫做 internal row 。

无论 Dataset 上层外层是什么样的一个对象类型,在内部全都以 internal row 去存储数据。

val dataset: Dataset[Person] = sparkread.csv(...).as[Person]

(2)优化点

优化点1:元信息独立

RDD 不保存数据的元信息,即这个对象是什么结构,它是不需要序列化的,因为它里面有 Schema 信息,那就可以直接把数据给序列化掉, Schema 就可以不序列化,所以只能使用 JavaSerializer 或者 KyroSerializer 保存整个对象。

DataFrame 和 Dataset 中保存了数据的元信息,所以可以把元信息独立出来分开保存

假如去序列化这样的一个 DataFrame 或者 Dataset

image.png

一个 DataFrame 或者一个 Dataset 中,元信息只需要保存一份,序列化的时候,元信息不需要参与

image.png

可以直接通过这个一个叫做 encoder 的组件,在 DataFrame 和 Dataset 当中都有一个 encoder , encoder 可以把这些对象转成 internal row 类型,去这个序列化 internal row 这个类型。反序列化的时候,即存储的时候 DataFrame 是 row , Dataset 是 internal row ,它们都是以一个单独的对象类型来存储的。

在反序列化( InternalRow→0bject )时加入 Schema 信息即可

image.png 

那么在反序列化 internal row 的时候,要给它反序列化为一个对象类型。那么这个时候它可以把 Schema 的信息 ( Schema 是独立存储的,只存一份)加进去。即它在反序列化的时候动态地加入 Schema 信息,从而生成一个原始的数据。所以这个过程当中,它也是使用独立的 Schema 信息,并不是直接把 Schema 信息直接序列化掉。

优化点2:使用堆外内存

DataFrame 和 Dataset 不再序列化元信息,所以内存使用大

大减少,同时新的序列化方式还将数据存入堆外内存中从而避免 GC 的开销

堆外内存又叫做 Unsafe ,之所以叫不安全的,因为不能使用 Java 的垃圾回收机制,需要自己负责对象的创建和回收,性能很好,但是不建议普通开发者使用,毕竟不安全 DataFrame 和 Dataset,在某一个版本加入了一个钨丝计划 Tungsten ,乌斯计划允许他们使用独立的堆外内存。一个叫做堆内内存,一个叫做堆外内存,堆内内存是交给 GVM 去管理的,所以堆内存里面的对象要经过 GC 即垃圾回收管理器来去进行回收和动态的分配,所以它的性能会受到一些的限制。

那么我们 DataFrame 和 Dataset 的第二个优化点就是不再直接使用堆内内存,所以它使用堆外内存来存储数据。

这个堆外内存是不安全的,不推荐一般的程序员使用。对于 Spark 这种高级的项目,那么他们的程序员及这个团队是有办法保证他在使用堆外内存的时候是安全的。所以 Spark 当中使用了堆外内存,使用堆外内存会显著提升这个内存的使用率,并且降低 GC 的开销。

 

三.总结

1.当需要将对象缓存下来的时候,或者在网络中传输的时候,要把对象转成二进制,在使用的时候再将二进制转为对象,这个过程叫做序列化和反序列化。

2.在 Spark 中有很多场景需要存储对象或者在网络中传输对象

(1)Task分发的时候,需要将任务序列化分发到不同的 Executor中执行

(2)缓存 RDD  的时候,需要保存 RDD 中的数据

(3)广播变量的时候需要将变量序列化在集群中广播

(4) RDD 的 Shuffle 过程中 Map 和 Reducer 之间需要交换数据

(5)算子中如果引入了外部的变量,这个外部的变量也需要被序列化

3. RDD 因为不保留数据的元信息所以必须要序列化整个对象,常见的方式是Java的序列化器和 Kyro 序列化器

4. Dataset 和 DataFrame 中保留数据的元信息,所以可以不再使用 Java 的序列化器和 Kyro 序列化器 使用 Spark 特有的序列化协议生成 UnsafeInternalRow 用以保存数据,这样不仅能减少数据量,也能减少序列化和反序列化的开销,其速度大概能达到 RDD 的序列化的 20 倍左右。

相关文章
|
Java Maven
JAVA反序列化学习笔记4.Commons Collections2分析
JAVA反序列化学习笔记4.Commons Collections2分析
|
安全 Java
JAVA反序列化学习笔记3.Commons Collections5分析
JAVA反序列化学习笔记3.Commons Collections5分析
|
安全 Java
JAVA反序列化学习笔记2.Commons Collections1分析
JAVA反序列化学习笔记2.Commons Collections1分析
java202303java学习笔记第三十七天序列化流1
java202303java学习笔记第三十七天序列化流1
32 0
|
消息中间件 JSON Java
RabbitTemplate 发送接受消息& amp 序列化机制|学习笔记
快速学习 RabbitTemplate 发送接受消息& amp 序列化机制
463 0
RabbitTemplate 发送接受消息& amp 序列化机制|学习笔记
|
JSON 网络协议 JavaScript
序列化 struct 时 tag 的使用 | 学习笔记
快速学习序列化 struct 时 tag 的使用
序列化 struct 时 tag 的使用 | 学习笔记
|
JSON 网络协议 测试技术
反序列化介绍和应用实例 | 学习笔记
快速学习反序列化介绍和应用实例
反序列化介绍和应用实例 | 学习笔记
|
XML JSON 前端开发
【Django学习笔记 - 17】:序列化和反序列化(restful接口小案例、DRF的工程搭建、序列化器与序列化、验证、保存)
【Django学习笔记 - 17】:序列化和反序列化(restful接口小案例、DRF的工程搭建、序列化器与序列化、验证、保存)
523 0
【Django学习笔记 - 17】:序列化和反序列化(restful接口小案例、DRF的工程搭建、序列化器与序列化、验证、保存)
|
存储 JSON 编解码
06、Netty学习笔记—(聊天业务优化:扩展序列化算法)
06、Netty学习笔记—(聊天业务优化:扩展序列化算法)
06、Netty学习笔记—(聊天业务优化:扩展序列化算法)
|
消息中间件 JSON 缓存
RedisTemplate&amp 序列化机制|学习笔记
快速学习 RedisTemplate&amp 序列化机制
153 0