Spark Core
Spark是大数据领域最活跃的开源项目,甚至比Hadoop还要热门。如第1章所述,它被认为是Hadoop的继任者。Spark的使用率大幅增长。很多组织正在用Spark取代Hadoop。
从概念上看,Spark类似于Hadoop,它们都用于处理大数据。它们都能用商用硬件以很低的成本处理大数据。然而,相比于Hadoop,Spark有很多的优势,这些将在本章进行介绍。
本章主要介绍Spark Core,这也是Spark生态系统的基础。我们首先概述Spark Core,然后介绍Spark的总体架构和应用程序运行时的情况。Spark Core的编程接口也会一并介绍。
3.1 概述
Spark是一个基于内存的用于处理、分析大数据的集群计算框架。它提供了一套简单的编程接口,从而使得应用程序开发者方便使用集群节点的CPU、内存、存储资源来处理大数据。
3.1.1 主要特点
Spark的主要特点如下:
使用方便
快速
通用
可扩展
容错
使用方便
Spark提供了比MapReduce更简单的编程模型。使用Spark开发分布式的数据处理应用程序比用MapReduce简单多了。
Spark针对开发大数据应用程序提供了丰富的API。它提供了80多个用于处理数据的操作符。而且,Spark提供了比Hadoop MapReduce更易读的API。相比之下,Hadoop MapReduce只有两个操作符,map和reduce。Hadoop要求任何问题都必须能够分解为一系列的map作业和reduce作业。然而,有些算法却难以只用map和reduce来描述。相比于Hadoop MapReduce,使用Spark提供的操作符来处理复杂的数据显得更加简单。
而且,使用Spark可以写出比用Hadoop MapReduce更简洁的代码。用Hadoop Map-Reduce需要写大量的模块代码。同样的数据处理算法,用Hadoop MapReduce实现需要50行,而用Spark只需要10不到。有了丰富易读的API,消除了模块代码,开发者的生产力大幅提升。相对于使用Hadoop,使用Spark开发者的生产力会有5~10倍的提升。
快速
Spark要比Hadoop快上若干个数量级。如果数据都加载在内存中,它能快上数百倍,哪怕数据无法完全载入内存,Spark也能快上数十倍。
尤其是在处理大数据集的时候,速度显得至关重要。如果一个处理数据的作业要花费数天或小时,那么它将拖慢决策的速度,从而降低数据的价值。反之,如果同样的处理能提速十倍乃至百倍,它将会创造更多的机会。它甚至可能开创出前所未有的新数据驱动应用程序。
Spark比Hadoop快的原因有两方面。一方面,它可以使用基于内存的集群计算。另一方面,它实现了更先进的执行引擎。
得益于基于内存的集群计算,Spark的性能有了数量级的提升。相比于从硬盘读取数据,采用从内存读取数据的方式,获得的顺序读取吞吐量要大100倍。换句话说,从内存读取数据要比从硬盘快100倍。当应用程序只读取和处理少量数据时,内存和硬盘之间读取速度的差距并不太明显。然而,一旦数据量达到太字节级别,I/O延迟(数据从硬盘载入内存所花费的时间)就会显著影响作业执行时间。
Spark允许应用程序利用内存缓存数据。这能减少磁盘I/O。一个基于MapReduce的数据处理流水线可能包含多个作业。每个作业都需要从硬盘载入数据,处理它,而后再写入硬盘中。而且,一个使用MapReduce实现的复杂数据处理应用程序可能需要反复从硬盘读取数据,写入数据。由于Spark允许利用内存缓存数据,因此使用Spark实现的同样的应用程序只需要从硬盘读取数据一次即可。一旦数据缓存在内存中,接下来的每一个操作都可以直接操作缓存的数据。就像前面说的一样,Spark可以减少I/O延迟,这样就能显著减少作业总的执行时间。
需要注意的是,Spark不会自动将输入数据缓存在内存中。一个普遍的误解是,一旦无法把输入数据完全载入内存,那么Spark将无法使用。这并不正确。Spark可以在集群上处理太字节级的数据,哪怕集群的总内存只有仅仅100GB。在数据处理流水线上何时缓存和缓存哪部分数据完全由应用程序决定。实际上,如果数据处理应用程序只使用一次数据,那么它完全不需要缓存数据。
Spark比Hadoop MapReduce快的第二个原因是它拥有更先进的作业执行引擎。Spark和Hadoop一样都将一个作业转化为由若干个阶段构成的有向无环图(DAG)。如果你不熟悉图论,这里简单介绍下。图是一个由顶点构成的集合,这些顶点由边相连。有向图指的是那些边有方向的图。无环图指的是不存在环路的图。DAG指的就是不存在环路的有向图。换句话说,在DAG中不存在一条起点和终点都是同一个顶点的通路。第11章将对图进行更详细的介绍。
Hadoop MapReduce对任意一个作业都会创建由map和Reduce两个阶段构成的有向无环图。如果一个复杂的数据处理算法用MapReduce实现,可能需要划分成多个作业,而后按顺序执行。这种设计导致Hadoop MapReduce无法做任何的优化。
与之相反,Spark并没有迫使开发者在实现数据处理算法的时候将其划分成多个作业。Spark中的DAG可以包含任意个阶段。一个简单的作业可能只有一个阶段,而一个复杂的作业可能会有多个阶段。这使得Spark可以做些Hadoop无法实现的优化。Spark可以一次执行一个包含多阶段的复杂作业。因为它拥有所有阶段的信息,所以可以进行优化。举例来说,它可以减少磁盘I/O和数据shuffle操作的时间。数据的shuffle操作通常会涉及网络间的数据传输,并且会增加应用程序的执行时间。
通用
Spark为各种类型的数据处理作业提供一个统一的集成平台。它可以用于批处理、交互分析、流处理、机器学习和图计算。相比之比,Hadoop MapReduce只适合批处理。因此一个使用Hadoop MapReduce的开发者为了能做流处理和图计算只能使用其他的框架。
对于不同类型的数据处理作业使用不同的框架,带来了很多问题。首先,开发者不得不学习各种框架,每种框架的接口都不相同。这降低了开发者的生产力。其次,每种框架都相对独立。因此,数据也必须复制多份,存放在不同的地方。类似地,代码也必须重复多份,存放在多个地方。比如,你想使用Hadoop MapReduce处理历史数据,同时使用Storm(一个流处理框架)处理流式数据,二者采用同样的算法,那么你不得不维护两份相同的代码,一份是Hadoop MapReduce的,一份是Storm的。最后,同时使用多个框架带来了运维上的麻烦。你得为每一个框架创建并维护一个单独的集群。要知道维护多个集群可比维护一个困难多了。
Spark自带了一系列的库,用于批处理、交互分析、流处理、机器学习和图计算。使用Spark,可以使用单一框架来创建一个包含多个不同类型任务的数据处理流水线。从而,再也没有必要为了多个不同类型的数据处理任务而学习不同框架或者部署单独的集群了。使用Spark有助于降低运维的困难度,减少代码和数据的重复。
有意思的是,越来越多流行的应用和库开始集成到Spark中或添加了对Spark的支持,而它们一开始是使用Hadoop作为其执行引擎的。比如Apache Mahout(一个构建于Hadoop之上的机器学习库)正在集成到Spark中。到了2014年4月,Mahout的开发者已经放弃了Hadoop并且不再添加新的基于MapReduce的机器学习算法了。
同样地,Hive(见第1章)的开发者也正在开发一个运行在Spark上的版本。Pig(一个可以用脚本语言来创建数据处理流水线的数据分析平台)同样支持Spark作为它的执行引擎。Cascading(一个用于开发Hadoop数据应用程序的应用开发平台)也添加了对Spark的支持。
可拓展
Spark是可扩展的。Spark集群的数据处理能力可以通过增加更多集群节点的方式得以提升。你可以从一个小集群开始,随着数据量的增加,逐渐增加更多的计算能力。这相当经济。
而且,Spark的这个特性对于应用程序来说是透明的。当你往Spark集群增加节点的时候无须改动任何代码。
容错
Spark是可容错的。一个由数百个节点构成的集群中,每个节点在任何一天故障的可能性都很高。硬盘损坏或其他硬件问题都有可能导致节点不可用。Spark能自动处理集群中的节点故障。一个节点故障可能会导致性能下降但不会导致应用无法运行。
既然Spark能自动处理节点故障,应用程序的开发者就不必在应用中处理这样的异常情况了,这简化了应用程序的代码。
3.1.2 理想的应用程序
就像前面讨论的那样,Spark是一个通用框架,它用于各种大数据应用中。然而,对于一个理想的大数据应用程序而言,速度是相当重要的。使用迭代数据处理算法的应用和交互分析都是这样的典型应用。
迭代算法
迭代算法是指那些在同样数据上迭代多次的数据处理算法。使用这类算法的应用包括机器学习和图处理应用。这些应用都在同样的数据上迭代数十次乃至数百次算法。对于这类应用,Spark是理想的选择。
Spark内存计算的特性使得在Spark上面执行这些迭代算法比较快。由于Spark允许应用在内存中缓存数据,因此一个迭代算法哪怕需要迭代100次,也只需要在第一次迭代的时候从硬盘读取数据,接下来的迭代都从内存中读取。而从内存中读取数据比从硬盘要快100倍,所以在Spark上运行这些应用能快上一个数量级。
交互分析
交互式数据分析涉及交互式地探索数据。举例来说,对于一个巨型数据集,在触发一个可能需要花费数小时的长时间运行的批处理作业之前,先进行汇总分析是很有用的。类似地,一个商业分析师可能想要使用BI或数据可视化工具来进行交互分析。在这种场景下,用户会在同一个数据集上执行多个查询。Spark就提供了这样一个用于大数据交互分析的理想平台。
Spark适用于交互分析的理由还是它的内存计算特性。应用程序可以缓存数据,从而使得数据能够在内存中进行交互分析。第一个查询请求从硬盘读取数据,但是接下来的一连串请求都从内存中读取缓存数据。查询内存中的数据要比硬盘中的数据快上一个数量级。当数据缓存在内存中的时候,一个查询请求可能只需要花费数秒,而在硬盘中则需要不止一个小时。
3.2 总体架构
一个Spark应用包括5个重要部分:驱动程序、集群管理员、worker、执行者、任务(见图3-1)。
图3-1 高层Spark架构
3.2.1 worker
worker为Spark应用提供CPU、内存和存储资源。worker把Spark应用当成分布式进程在集群节点上执行。
3.2.2 集群管理员
Spark使用集群管理员来获得执行作业所需要的集群资源。顾名思义,集群管理员管理集群中worker节点的计算资源。它能跨应用从底层调度集群资源。它可以让多个应用分享集群资源并且运行在同一个worker节点上。
Spark目前支持三种集群管理员:单独模式、Mesos模式、YARN模式。Mesos模式和YARN模式都允许在同一个worker节点上同时运行Spark应用和Hadoop应用。第10章将详细介绍集群管理员。
3.2.3 驱动程序
驱动程序是一个把Spark当成库使用的应用。它提供数据处理的代码,Spark将在worker节点上执行这些代码。一个驱动程序可以在Spark集群上启动一个或多个作业。
3.2.4 执行者
执行者是一个JVM进程,对于一个应用由Spark在每一个worker上创建。它可以多线程的方式并发执行应用代码。它也可以把数据缓存在内存或硬盘中。
执行者的生命周期和创建它的应用一样。一旦Spark应用结束,那么为它创建的执行者也将寿终正寝。
3.2.5 任务
任务是Spark发送给执行者的最小工作单元。它运行在worker节点上执行者的一个线程中。每一个任务都执行一些计算,然后将结果返回给驱动程序,或者分区以用于shuffle操作。
Spark为每一个数据分区创建一个任务。一个执行者可以并发执行一个或多个任务。任务数量由分区的数量决定。更多的分区意味着将有更多的任务并行处理数据。
3.3 应用运行
本节主要描述数据处理代码是怎么在Spark集群中执行的。
3.3.1 术语
先来看看几个术语的定义。
shuffle操作。shuffle操作是指在集群节点上对数据进行重新分配。这是一个耗时操作,因为它涉及在网络间传输数据。需要注意的是,shuffle操作不是对数据进行随机重新分配,它按照某些标准将数据分成不同的集合。每一个集合就是一个新的分区。
作业。作业是一系列计算的集合,Spark执行这些计算并将结果返回给驱动程序。作业本质上就是在Spark集群上运行数据处理算法。一个应用程序可以发起多个作业。本章稍后将会介绍作业是怎么执行的。
阶段。一个阶段由若干个任务构成。Spark将一个作业分解为一个由若干个阶段构成的DAG,每一个阶段依赖于其他阶段。举个例子,把一个作业分解为阶段0和阶段1两个阶段。只有当阶段0完成之后,才可以开始阶段1。Spark利用shuffle边界将任务分成不同的阶段。不要求shuffle操作的任务属于同一阶段。只有在开始一个新阶段时,任务才需要输入数据是经过shuffle操作的。
3.3.2 应用运行过程
有了上面的这些定义,我们就可以描述一个Spark应用在集群节点上并行处理数据的过程。当一个Spark应用开始运行的时候,Spark会连接集群管理员,获取在worker节点上的执行者资源。就像前面所说的,Spark应用把一个数据处理算法当成一个作业提交。Spark将这个作业分解成由若干个阶段构成的DAG。然后,Spark在执行者上调度这些阶段的运行,调度操作由集群管理员提供的底层调度器实现。执行者并行地运行Spark提交的任务。
每一个Spark应用都有一组其自己的位于worker节点上的执行者。这样的设计有诸多好处。首先,不同应用中的任务由于运行在不同JVM之上,使得它们之间互相隔离。一个应用程序中的错误任务并不会让其他应用崩溃。其次,调度任务变得轻而易举。Spark一次只需要调度归属于同一应用的任务。它不用处理这样一种复杂情况,其中调度的多个任务属于多个并发执行的不同应用。
然而,这种设计也有不足之处。由于不同应用在不同的JVM进程中运行,因此它们之间就不太方便共享数据。即使它们可能在同一个worker节点上运行,它们也只能通过读写磁盘的方式共享数据。就像前面所说的,读写磁盘是耗时的操作。因此,应用间通过磁盘共享数据,将会遇到性能问题。
3.4 数据源
Spark本质上是一个使用集群节点进行大数据集处理的计算框架。与数据库不同,它并没有存储系统,但是它可以搭配外部存储系统使用。Spark一般都配合能存储大量数据的分布式存储系统使用。
Spark支持多种数据源。Spark应用程序可以使用的数据来源包括HDFS、HBase、Cassandra、Amazon S3,或者其他支持Hadoop的数据源。任何Hadoop支持的数据源都可以被Spark Core使用。Spark上的库Spark SQL还支持更多数据源。第7章将会介绍Spark-SQL。
兼容支持Hadoop的数据源是相当重要的。许多组织都已经在Hadoop上面投入了大量的精力。在HDFS或其他支持Hadoop的数据存储系统上都存储着大量的数据。使用Spark并不需要将这些数据迁移到其他存储系统。而且,将Hadoop MapReduce替换成Spark并不需要另起炉灶,这是比较轻松的。如果现有的Hadoop集群正在执行MapReduce作业,也可以同时在上面运行Spark应用。可以把现有的MapReduce作业转化成Spark作业。或者,也可以保留现有的MapReduce应用程序,不做更改,使用Spark运行新的应用程序。
由于Spark Core原生支持Hadoop兼容的存储系统,因此额外的数据源都能很方便地添加进来。比如,人们已经为Spark编写好了各种数据源的连接器,包括Cassandra、MongoDB、CouchDB和其他流行的数据源。
Spark也支持本地文件系统。Spark应用程序可以读写本地文件系统上的数据。如果数据可以从本地文件读取并在单机上处理,那么没必要使用Spark。尽管如此,Spark的这个特性使得它便于开发应用和调试,并且易学。
3.5 API
应用可以通过使用Spark提供的库获得Spark集群计算的能力。这些库都是用Scala编写的。但是Spark提供了各种语言的API。在本书编写之际,Spark API提供了如下语言的支持:Scala、Java、Python和R。可以使用上面的任何语言来开发Spark应用。也有其他语言(比如Clojure)的非官方支持。
Spark API主要由两个抽象部件SparkContext和弹性分布式数据集(RDD)构成。应用程序通过这两个部件和Spark进行交互。应用程序可以连接到Spark集群并使用相关资源。接下来会介绍这两个抽象部件,然后详细介绍RDD。
3.5.1 SparkContext
SparkContext是一个在Spark库中定义的类。它是Spark库的入口点。它表示与Spark集群的一个连接。使用Spark API创建的其他一些重要对象都依赖于它。
每个Spark应用程序都必须创建一个SparkContext类实例。目前,每个Spark应用程序只能拥有一个激活的SparkContext类实例。如果要创建一个新的实例,那么在此之前必须让当前激活的类实例失活。
SparkContext有多个构造函数。最简单的一个不需要任何参数。一个SparkContext类实例可以用如下代码创建。
在这种情况下,SparkContext的配置信息都从系统属性中获取,比如Spark master的地址、应用名称等。也可以创建一个SparkConf类实例,然后把它作为SparkContext的参数从而设定配置信息。SparkConf 是Spark库中定义的一个类。通过这种方式可以像下面这样设置各种Spark配置信息。
SparkConf为设置诸如Spark master这样的常用配置信息都提供了对应的显式方法。此外,它还提供了一个通用的方法用于设置配置信息,它使用键-值对进行设置。SparkContext和SparkConf可以使用的参数将在第4章进行详细介绍。
在本章接下来的例子中会继续使用上面创建的变量sc。
3.5.2 RDD
弹性分布式数据集(RDD)表示一个关于分区数据元素的集合,可以在其上进行并行操作。它是Spark的主要数据抽象概念。它是Spark库中定义的一个抽象类。
从概念上看,除了可以用于表示分布式数据集和支持惰性操作的特性外,RDD类似于Spark的集合。惰性操作将在本章稍后部分详细介绍。
下面分别简要描述RDD的特点。
不可变性
RDD是一种不可变的数据结构。一旦创建,它将不可以在原地修改。基本上,一个修改RDD的操作都会返回一个新的RDD。
分片
RDD表示的是一组数据的分区。这些分区分布在多个集群节点上。然而,当Spark在单个节点运行时,所有的分区数据都会在当前节点上。
Spark存储RDD的分区和数据集物理分区之间关系的映射关系。RDD是各个分布式数据源之中数据的一个抽象,它通常表示分布在多个集群节点上的分区数据。比如HDFS将数据分片或分块分散存储在集群中。默认情况下,一个RDD分区对应一个HDFS文件分片。其他的分布式数据源(比如Cassandra)同样也将数据分片分散存储在集群多个节点上。然而,一个RDD对应多个Cassandra分片。
容错性
RDD为可容错的。RDD代表了分散在集群中多个节点的数据,但是任何一个节点都有可能出故障。诚如之前所说的,一个节点出故障的可能性和集群节点数量成正比。集群越大,在任何一个节点它出故障的可能性就越高。
RDD会自动处理节点出故障的情况。当一个节点出故障时,该节点上存储的数据将无法被访问。此时,Spark会在其他节点上重建丢失的RDD分区数据。Spark存储每一个RDD的血统信息。通过这些血统信息,Spark可以恢复RDD的部分信息,当节点出故障的时候,它甚至可以恢复整个RDD。
接口
需要着重指出的是,RDD是一个处理数据的接口。在Spark库中它定义为一个抽象类。RDD为多种数据源提供了一个处理数据的统一接口,包括HDFS、HBase、Cassandra等。这个接口同样可以用于处理存储于多个节点内存中的数据。
Spark为不同数据源提供了各自具体的实现类,比如HadoopRDD、ParallelCollection-RDD、JdbcRDD和CassandraRDD。它们都支持基础的RDD接口。
强类型
RDD类有一个参数用于表示类型,这使得RDD可以表示不同类型的数据。RDD可以表示同一类型数据的分布式集合,包括Integer、Long、Float、String或者应用开发者自己定义的类型。而且,一个应用总会使用某种类型的RDD,包括Integer、Long、Float、Double、String或自定义类型。
驻留在内存中
之前已经提及了Spark的内存集群计算特性。RDD类提供一套支持内存计算的API。Spark允许RDD在内存中缓存或长期驻留。就像之前所说的,对一个缓存在内存中的RDD进行操作比操作没缓存的RDD要快很多。
3.5.3 创建RDD
由于RDD是一个抽象类,因此无法直接创建一个RDD的类实例。SparkContext类提供了一个工厂方法用来创建RDD实现类的类实例。RDD也可以通过由其他RDD执行转换操作得到。就像之前所说的,RDD是不可变的。任何一个对RDD的修改操作都将返回一个代表修改后数据的新RDD。
本节总结了几种创建RDD的常见方法。在下面的示例代码中,sc是一个SparkContext的类实例。之前的章节已经介绍了怎么创建它。
parallelize
这个方法用于从本地Scala集合创建RDD实例。它会对Scala集合中的数据重新分区、重新分布,然后返回一个代表这些数据的RDD。这个方法很少用在生产上,但是使用它有助于学习Spark。
textFile
textFile方法用于从文本文件创建RDD实例。它可以从多种来源读取数据,包括单个文件、本地同一目录下的多个文件、HDFS、Amazon S3,或其他Hadoop支持的存储系统。这个方法返回一个RDD,这个RDD代表的数据集每个元素都是一个字符串,每一个字符串代表输入文件中的一行。
上面的代码表示从存储于HDFS上的一个文件或者目录创建RDD实例。
textFile方法也可以读取压缩文件中的数据。而且,它的参数中可以存在通配符,用于从一个目录中读取多个文件。下面是一个例子。
textFile的第二个参数是一个可选参数,它用于指定分区的个数。默认情况下,Spark为每一个文件分块创建一个分区。可以设置成一个更大的数字从而提高并行化程度,但是设置成一个小于文件分块数的数字是不可以的。
wholeTextFiles
这个方法读取目录下的所有文本文件,然后返回一个由键值型RDD。返回RDD中的每一个键值对对应一个文件。键为文件路径,对应的值为该文件的内容。这个方法可以从多种来源读取数据,包括本地文件系统、HDFS、Amazon S3,或者其他Hadoop支持的存储系统。
sequenceFile
sequenceFile方法从SequenceFile文件中获取键值对数据,这些SequenceFile文件可以存储于本地文件系统、HDFS或者其他Hadoop支持的存储系统。这个方法返回一个键值对型RDD实例。当使用这个方法的时候,不仅需要提供文件名,还需要提供文件中数据键和值各自的类型。
3.5.4 RDD操作
Spark应用使用RDD类或其继承类中定义的方法来处理数据。这些方法也称为操作。既然Scala中可以把一个方法当成操作符使用,那么RDD中的方法有时也称为操作符。
Spark的美好之处就在于同样一个RDD方法既可以处理几字节的数据也可以处理PB级的数据。而且Spark应用可以使用同样的方法去处理数据,无论它是存储于本地还是存储于一个分布式存储系统。这样的灵活性使得开发者可以在单机上开发、调试、测试Spark应用,然后不用改动任何代码就可以将它部署到一个大集群上。
RDD操作可以归为两类:转换和行动。转换将会创建一个新的RDD实例。行动则会将结果返回给驱动程序。
转换
转换指的是在原RDD实例上进行计算,而后创建一个新的RDD实例。本节将介绍一些常见的转换操作。
从概念上看,RDD转换操作的类似于Scala集合上的方法。主要的区别在于Scala集合方法操作的数据是在单机内存中的,而RDD的转换操作可以处理分布在集群各个节点上的数据。另外一个重要的区别是,RDD转换操作是惰性的,而Scala集合方法不是。本章余下部分会详细介绍这些内容。
map
map方法是一个高阶方法,它把一个函数作为它的参数,并把这个函数作用在原RDD的每个元素上,从而创建一个新RDD实例。这个作为参数的函数拥有一个参数并返回一个值。
filter
filter方法是一个高阶方法,它把一个布尔函数作为它的参数,并把这个函数作用在原RDD的每个元素上,从而创建一个新RDD实例。一个布尔函数只有一个参数作为输入,返回true或false。filter方法返回一个新的RDD实例,这个RDD实例代表的数据集由布尔函数返回true的元素构成。因此,新RDD实例代表的数据集是原RDD的子集。
flatMap
flatMap方法是一个高阶方法,它把一个函数作为它的参数,这个函数处理原RDD中每个元素返回一个序列。扁平化这个序列的集合得到一个数据集,flatMap方法返回的RDD就代表这个数据集。
mapPartitions
mapPartitions是一个高阶方法,它使你可以以分区的粒度来处理数据。相比于一次处理一个元素,mapPartitions一次处理处理一个分区,每个分区被当成一个迭代器。mapPartitions方法的函数参数把迭代器作为输入,返回另外一个迭代器作为输出。map-Partitions将自定义函数参数作用于每一个分区上,从而返回一个新RDD实例。
union
union方法把一个RDD实例作为输入,返回一个新RDD实例,这个新RDD实例的数据集是原RDD和输入RDD的合集。
intersection
intersection方法把一个RDD实例作为输入,返回一个新RDD实例,这个新RDD实例代表的数据集是原RDD和输入RDD的交集。
这是另外一个例子。
subtract
subtract方法把一个RDD实例作为输入,返回一个新RDD实例,这个新RDD实例代表的数据集由那些存在于原RDD实例中但不在输入RDD实例中的元素构成。
这是另外一个例子。
distinct
RDD实例上的distinct方法返回一个新RDD实例,这个新RDD实例的数据集由原RDD的数据集去重后得到。
cartesian
cartesian方法把一个RDD实例作为输入,返回一个新RDD实例,这个新RDD实例的数据集由原RDD和输入RDD的所有元素的笛卡儿积构成。返回的RDD实例的每一个元素都是一个有序二元组,每一个有序二元组的第一个元素来自原RDD,第二个元素来自输入RDD。元素的个数等于原RDD的元素个数乘以输入RDD的元素个数。
这个方法类似于SQL中的join操作。
zip
zip方法把一个RDD实例作为输入,返回一个新RDD实例,这个新RDD实例的每一个元素是一个二元组,二元组的第一个元素来自原RDD,第二个元素来自输入RDD。和cartesian方法不同的是,zip方法返回的RDD的元素个数于原RDD的元素个数。原RDD的元素个数和输入RDD的相同。进一步地说,原RDD和输入RDD不仅有相同的分区数,每个分区还有相同的元素个数。
zipWithIndex
zipWithIndex方法返回一个新RDD实例,这个新RDD实例的每个元素都是由原RDD元素及其下标构成的二元组。
groupBy
groupBy是一个高阶方法,它将原RDD中的元素按照用户定义的标准分组从而组成一个RDD。它把一个函数作为它的参数,这个函数为原RDD中的每一个元素生成一个键。groupBy把这个函数作用在原RDD的每一个元素上,然后返回一个由二元组构成的新RDD实例,每个二元组的第一个元素是函数生成的键,第二个元素是对应这个键的所有原RDD元素的集合。其中,键和原RDD元素的对应关系由那个作为参数的函数决定。
需要注意的是,groupBy是一个费时操作,因为它可能需要对数据做shuffle操作。
假设有一个CSV文件,文件的内容为公司客户的姓名、年龄、性别和邮编。下面的示例代码演示了按照邮编将客户分组。
keyBy
keyBy方法与groupBy方法相类似。它是一个高阶方法,把一个函数作为参数,这个函数为原RDD中的每一个元素生成一个键。keyBy方法把这个函数作用在原RDD的每一个元素上,然后返回一个由二元组构成的新RDD实例,每个二元组的第一个元素是函数生成的键,第二个元素是对应这个键的原RDD元素。其中,键和原RDD元素的对应关系由那个作为参数的函数决定。返回的RDD实例的元素个数和原RDD的相同。
groupBy和KeyBy的区别在于返回RDD实例的元素上。虽然都是二元组,但是 groupBy返回的二元组中的第二个元素是一个集合,而keyBy的是单个值。
sortBy
sortBy是一个高阶方法,它将原RDD中的元素进行排序后组成一个新的RDD实例返回。它拥有两个参数。第一个参数是一个函数,这个函数将为原RDD的每一个元素生成一个键。第二个参数用来指明是升序还是降序排列。
下面是另一个示例。
pipe
pipe方法可以让你创建子进程来运行一段外部程序,然后捕获它的输出作为字符串,用这些字符串构成RDD实例返回。
randomSplit
randomSplit方法将原RDD分解成一个RDD数组。它的参数是分解的权重。
coalesce
coalesce方法用于减少RDD的分区数量。它把分区数作为参数,返回分区数等于这个参数的RDD实例。
使用coalesce方法时需要小心,因为减少了RDD的分区数也就意味着降低了Spark的并行能力。它通常用于合并小分区。举例来说,在执行filter操作之后,RDD可能会有很多小分区。在这种情况下,减少分区数能提升性能。
repartition
repartition方法把一个整数作为参数,返回分区数等于这个参数的RDD实例。它有助于提高Spark的并行能力。它会重新分布数据,因此它是一个耗时操作。
coalesce和repartition方法看起来一样,但是前者用于减少RDD中的分区,后者用于增加RDD中的分区。
sample
sample方法返回原RDD数据集的一个抽样子集。它拥有三个参数。第一个参数指定是有放回抽样还是无放回抽样。第二个参数指定抽样比例。第三个参数是可选的,指定抽样的随机数种子。
键值对型RDD的转换
除了上面介绍的RDD转换之外,针对键值对型RDD还支持其他的一些转换。下面将介绍只能作用于键值对型RDD的常用转换操作。
keys
keys方法返回只由原RDD中的键构成的RDD。
values
values方法返回只由原RDD中的值构成的RDD。
mapValues
mapValues是一个高阶方法,它把一个函数作为它的参数,并把这个函数作用在原RDD的每个值上。它返回一个由键值对构成的RDD。它和map方法类似,不同点在于它把作为参数的函数作用在原RDD的值上,所以原RDD的键都没有变。返回的RDD和原RDD拥有相同的键。
join
join方法把一个键值对型RDD作为参数输入,而后在原RDD和输入RDD上做内连接操作。它返回一个由二元组构成的RDD。二元组的第一个元素是原RDD和输入RDD都有的键,第二个元素是一个元组,这个元组由原RDD和输入RDD中键对应的值构成。
leftOuterJoin
leftOuterJoin方法把一个键值对型RDD作为参数输入,而后在原RDD和输入RDD之间做左连接操作。它返回一个由键值对构成的RDD。键值对的第一个元素是原RDD中的键,第二个元素是一个元组,这个元组由原RDD中键对应的值和输入RDD中的可选值构成。可选值用Option类型表示。
rightOuterJoin
rightOuterJoin方法把一个键值对型RDD作为参数输入,而后在原RDD和输入RDD之间做右连接操作。它返回一个由键值对构成的RDD。键值对的第一个元素是输入RDD中的键,第二个元素是一个元组,这个元组由原RDD中的可选值和输入RDD中键对应的值构成。可选值用Option类型表示。
fullOuterJoin
fullOuterJoin方法把一个键值对型RDD作为参数输入,而后在原RDD和输入RDD之间做全连接操作。它返回一个由键值对构成的RDD。
sampleByKey
sampleByKey通过在键上抽样返回原RDD的一个子集。它把对每个键的抽样比例作为输入参数,返回原RDD的一个抽样。
subtractByKey
subtractByKey方法把一个键值对型RDD作为输入参数,返回一个键值对RDD,这个键值对RDD的键都是只存在原RDD中但是不存在于输入RDD中。
groupByKey
groupByKey方法返回一个由二元组构成的RDD,二元组的第一个元素是原RDD的键,第二个元素是一个集合,集合由该键对应的所有值构成。它类似于上面介绍过的group-By方法。二者的区别在于groupBy是一个高阶方法,它的参数是一个函数,这个函数为原RDD的每一个元素生成一个键。groupByKey方法作用于RDD的每一个键值对上,故不需要一个生成键的函数作为输入参数。
应当尽量避免使用groupByKey。它是一个耗时操作,因为它可能会对数据进行shuffle操作。在大多数情况下,都有不使用groupByKey的更好的替代方案。
reduceByKey
reduceByKey是一个高阶方法,它把一个满足结合律的二元操作符当作输入参数。它把这个操作符作用于有相同键的值上。
一个二元操作符把两个值当作输入参数,返回一个值。一个满足结合律的二元操作符返回同样的结果,但是它不关心操作数的分组情况。
reduceByKey方法可以用于对同一键对应的值进行汇总操作。比如它可以用于对同一键对应的值进行求和,求乘积,求最小值,求最大值。
对于基于键的汇总操作、合并操作,reduceByKey比groupByKey更合适。
操作
操作指的是那些返回值给驱动程序的RDD方法。本节介绍一些RDD中常用的操作。
collect
collect方法返回一个数组,这个数组由原RDD中的元素构成。在使用这个方法的时候需要小心,因为它把在worker节点的数据移给了驱动程序。如果操作一个有大数据集的RDD,它有可能会导致驱动程序崩溃。
count
count方法返回原RDD中元素的个数。
countByValue
countByValue方法返回原RDD中每个元素的个数。它返回是一个map类实例,其中,键为元素的值,值为该元素的个数。
first
first方法返回原RDD中的第一个元素。
max
max方法返回RDD中最大的元素。
min
min方法返回RDD中最小的元素。
take
take方法的输入参数为一个整数N,它返回一个由原RDD中前N个元素构成的RDD。
takeOrdered
takeOrdered方法的输入参数为一个整数N,它返回一个由原RDD中前N小的元素构成的RDD。
top
top方法的输入参数为一个整数N,它返回一个由原RDD中前N大的元素构成的RDD。
fold
fold是一个高阶方法,用于对原RDD的元素做汇总操作,汇总的时候使用一个自定义的初值和一个满足结合律的二元操作符。它首先在每一个RDD的分区中进行汇总,然后再汇总这些结果。
初值的取值取决于RDD中的元素类型和汇总操作的目的。比如,给定一个元素为整数的RDD,为了计算这个RDD中所有元素的和,初值取为0。相反,给定一个元素为整数的RDD,为了计算这个RDD中所有元素的乘积,初值则应取为1。
reduce
reduce是一个高阶方法,用于对原RDD的元素做汇总操作,汇总的时候使用一个满足结合律和交换律的二元操作符。它类似于fold方法,然而,它并不需要初值。
键值对型RDD上的操作
键值对RDD上有一些额外的操作,我们在下面进行介绍。
countByKey
countByKey方法用于统计原RDD每个键的个数。它返回一个map类实例,其中,键为原RDD中的键,值为个数。
lookup
lookup方法的输入参数为一个键,返回一个序列,这个序列的元素为原RDD中这个键对应的值。
数值型RDD上的操作
如果RDD的元素类型为Integer、Long、Float或Double,则这样的RDD为数值型RDD。这类RDD还有一些对于统计分析十分有用的额外操作,下面将介绍一些常用的行动。
mean
mean方法返回原RDD中元素的平均值。
stdev
stdev方法返回原RDD中元素的标准差。
sum
sum方法返回原RDD中所有元素的和。
variance
variance方法返回原RDD中元素的方差。
3.5.5 保存RDD
一般来说,数据处理完毕后,结果会保存在硬盘上。Spark允许开发者将RDD保存在任何Hadoop支持的存储系统中。保存在硬盘上的RDD可以被其他Spark应用或Hadoop应用使用。
本节介绍将RDD保存成文件的常用方法。
saveAsTextFile
saveAsTextFile方法将原RDD中的元素保存在指定目录中,这个目录位于任何Hadoop支持的存储系统中。每一个RDD中的元素都用字符串表示并另存为文本中的一行。
saveAsObjectFile
saveAsObjectFile方法将原RDD中的元素序列化成Java对象,存储在指定目录中。
saveAsSequenceFile
saveAsSequenceFile方法将键值对型RDD以SequenceFile的格式保存。键值对型RDD也可以以文本的格式保存,只须使用saveAsTextFile方法即可。
需要注意的是,上面的方法都把一个目录的名字作为输入参数,然后在这个目录为每个RDD分区创建一个文件。这种设计不仅高效而且可容错。因为每一个分区被存成一个文件,所以Spark在保存RDD的时候可以启动多个任务,并行执行,将数据写入文件系统中。这样也保证了写入数据的过程是可容错的。一旦有一个将分区写入文件的任务失败了,Spark可以再启动一个任务,重写刚才失败任务创建的文件。
3.6 惰性操作
RDD的创建和转换方法都是惰性操作。当应用调用一个返回RDD的方法的时候,Spark并不会立即执行运算。比如,当你使用SparkContext的textFile方法从HDFS中读取文件时,Spark并不会马上从硬盘中读取文件。类似地,RDD转换操作(它会返回新RDD)也是惰性的。Spark会记录作用于RDD上的转换操作。
让我们考虑如下示例代码。
上面三行代码看起来很快就会执行完,哪怕textFile方法读取的是一个包含了10TB数据的文件。这其中的原因是当你调用textFile方法时,它并没有真正读取文件。类似地,filter方法也没有立即遍历原RDD中的每一个元素。
Spark仅仅记录了这个RDD是怎么创建的,在它上面做转换操作会创建怎样的子RDD等信息。Spark为每一个RDD维护其各自的血统信息。在需要的时候,Spark利用这些信息创建RDD或重建RDD。
如果RDD的创建和转换都是惰性操作,那么Spark什么时候才真正读取数据和做转换操作的计算呢?下面将会解答这个问题。
触发计算的操作
当Spark应用调用操作方法或者保存RDD至存储系统的时候,RDD的转换计算才真正执行。保存RDD至存储系统也被视为一种操作,尽管它并没有向驱动程序返回值。
当Spark应用调用RDD的操作方法或者保存RDD的时候,它触发了Spark中的连锁反应。当调用操作方法的时候,Spark会尝试创建作为调用者的RDD。如果这个RDD是从文件中创建的,那么Spark会在worker节点上读取文件至内存中。如果这个RDD是通过其他RDD的转换得到的子RDD,Spark会尝试创建其父RDD。这个过程会一直持续下去,直到Spark找到根RDD。然后Spark就会真正执行这些生成RDD所必需的转换计算,从而生成作为调用者的RDD。最后,执行操作方法所需的计算,将生成的结果返回给驱动程序。
惰性转换使得Spark可以高效地执行RDD计算。直到Spark应用需要操作结果时才进行计算,Spark可以利用这一点优化RDD的操作。这使得操作流水线化,而且还避免了在网络间不必要的数据传输。
3.7 缓存
除了将数据驻留在内存中以外,缓存在RDD中也扮演了另外一个重要的角色。就像之前所说的,创建RDD有两种方式,从存储系统中读取数据或者应用其他现存RDD的转换操作。默认情况下,当一个RDD的操作方法被调用时,Spark会根据它的父RDD来创建这个RDD,这有可能导致父RDD的创建。如此往复,这个过程一直持续到Spark找到根RDD,而后Spark通过从过存储系统读取数据的方式创建根RDD。操作方法被调用一次,上面说的过程就会执行一遍。每次调用操作方法,Spark都会遍历这个调用者RDD的血统树,执行所有的转换操作来创建它。
考虑下面的例子。
尽管上面的代码只调用了一次textFile方法,但是日志文件会被从硬盘中读取两次。这是因为调用了两次操作方法count。在调用errorLogs.count时,日志文件第一次被读取,调用warningLogs.count时,日志文件被再次读取。这只是个简单的例子,现实世界中的应用会有更多的各种转换和操作。
如果一个RDD缓存了,Spark会执行到目前为止的所有转换操作并为这个RDD创建一个检查点。具体来说,这只会在第一次在一个缓存的RDD上调用某操作的时候发生。类似于转换方法,缓存方法也是惰性的。
如果一个应用缓存了RDD,Spark并不是立即执行计算并把它存储在内存中。Spark只有在第一次在缓存的RDD上调用某操作的时候才会将RDD物化在内存中。而且这第一次操作并不会从中受益,后续的操作才会从缓存中受益。因为它们不需要再执行从存储系统中读取数据开始的一系列操作。它们通常都运行得快多了。还有,那些只使用一次数据的应用使用缓存也不会有任何好处。只有那些需要对同样数据做多次迭代的应用才能从缓存中受益。
如果一个应用把RDD缓存在内存中,Spark实际上是把它存储在每个worker节点上执行者的内存中了。每个执行者把它所计算的RDD分区缓存在内存中。
3.7.1 RDD的缓存方法
RDD类提供了两种缓存方法:cache和persist。
cache
cache方法把RDD存储在集群中执行者的内存中。它实际上是将RDD物化在内存中。
下面的例子展示了怎么利用缓存优化上面的例子。
persist
persist是一个通用版的cache方法。它把RDD存储在内存中或者硬盘上或者二者皆有。它的输入参数是存储等级,这是一个可选参数。如果调用persist方法而没有提供参数,那么它的行为类似于cache方法。
persist方法支持下列常见的存储选项。
MEMORY_ONLY:当一个应用把 MEMORY_ONLY作为参数调用persist方法时,Spark会将RDD分区采用反序列化Java对象的方式存储在worker节点的内存中。如果一个RDD分区无法完全载入worker节点的内存中,那么它将在需要时才计算。
DISK_ONLY:如果把DISK_ONLY作为参数调用persist方法,Spark会物化RDD分区,把它们存储在每一个worker节点的本地文件系统中。这个参数可以用于缓存中间的RDD,这样接下来的一系列操作就没必要从根RDD开始计算了。
MEMORY_AND_DISK:这种情况下,Spark会尽可能地把RDD分区存储在内存中,如果有剩余,就把剩余的分区存储在硬盘上。
MEMORY_ONLY_SER:这种情况下,Spark会采用序列化Java对象的方式将RDD分区存储在内存中。一个序列化的Java对象会消耗更少的内存,但是读取是CPU密集型的操作。这个参数是在内存消耗和CPU使用之间做的一个妥协。
MEMORY_AND_DISK_SER:Spark会尽可能地以序列化Java对象的方式将RDD分区存储在内存中。如果有剩余,则剩余的分区会存储在硬盘上。
3.7.2 RDD缓存是可容错的
在分布式环境中可容错性是相当重要的。之前我们就已经知道了当节点出故障的时候Spark是怎么自动把计算作业转移到其他节点的。Spark的RDD机制同样也是可容错的。
即使一个缓存RDD的节点出故障了,Spark应用也不会崩溃。Spark会在另外节点上自动重新创建、缓存出故障的节点中存储的分区。Spark利用RDD的血统信息来重新计算丢失的缓存分区。
3.7.3 缓存内存管理
Spark采用LRU算法来自动管理缓存占用的内存。只有在必要时,Spark才会从缓存占用的内存中移除老的RDD分区。而且,RDD还提供了名为unpersist的方法。应用可以调用这个方法来从缓存占用的内存中手动移除RDD分区。
3.8 Spark作业
RDD上的转换、操作和缓存方法构成了Spark应用的基础。从本质上说,RDD描述了Spark编程模型。既然我们介绍过了编程模型,那么接下来我们介绍在Spark应用中这些是怎么结合在一起的。
作业指的是Spark将要执行的一些计算,它们将操作的结果返回给驱动程序。一个应用可以发起一个或多个作业。通过调用RDD的操作方法可以发起一个作业。也就是说,一个操作方法会触发一个作业。如果一个操作是从未缓存的RDD或未缓存RDD的后代RDD发起的,Spark将会从存储系统中读取数据,从此开始作业。如果一个操作是从缓存过的RDD或者缓存过的RDD的后代RDD发起的,那么Spark就会从那个缓存过的RDD开始作业。接下来,Spark会按照操作方法的要求执行必要的转换操作来创建RDD。最后,执行操作所需的计算,一旦结果出来后,便将它返回给驱动程序。
当一个应用调用RDD的操作方法时,Spark会创建由若干个阶段构成的DAG。Spark根据shuffle边界来将不同任务划分成不同的阶段。不需要shuffle操作的任务被划分到同一个阶段。那些输入数据是已经做过shuffle操作的任务将开始一个新的阶段。
一个阶段可以由一个或者多个任务构成。Spark把任务提交给执行者,执行者将并行执行任务。在节点间调度任务的依据是数据分布情况。如果一个节点在处理任务时失效了,Spark会把这个任务提交给其他节点。
3.9 共享变量
Spark使用的架构是无共享的。数据分布在集群的各个节点上,每个节点都有自己的CPU、内存和存储资源。没有全局的内存空间用于任务间共享。驱动程序和任务之间通过消息共享数据。
举例来说,如果一个RDD操作的函数参数是驱动程序中变量的引用,Spark会将这个变量的副本以及任务一起发送给执行者。每个任务都有一份变量的副本并把它当成只读变量使用。任何对这个变量的更新都只存在任务的内部,改动并不会回传给驱动程序。而且Spark会把这个变量在每一个阶段的开始发送给worker节点。
对于一些应用而言,这种默认行为是低效的。在一个实际的使用场景中,驱动程序在作业的任务间共享了一个巨大的查找表。而这个作业由多个阶段构成。默认情况下,Spark会自动将这个变量及其相关任务发送给每个执行者。然而,Spark会在每个阶段做这件事。如果这个查找表存储了100MB的数据,并且这个作业涉及10个阶段,那么Spark就会给每个worker节点发送10次100MB的相同数据。
另外一个使用场景是在每个运行在不同节点上的任务中需要更新全局变量。默认情况下,任务中对变量的更新是不会回传给驱动程序的。
Spark通过共享变量的概念来满足这些使用场景的需求。
3.9.1 广播变量
广播变量的使用使得Spark应用可以有效地在驱动程序和执行作业的任务之间共享数据。Spark只会给worker节点发送一次广播变量,并且将它反序列化成只读变量存储在执行者的内存中。而且,Spark采用一种更高效的算法来发布广播变量。
注意,如果一个作业由多个阶段构成,且阶段中的任务使用同一个驱动程序的变量,那么使用广播变量是十分有用的。如果你不想在开始执行每个任务之前反序列化变量,使用广播变量也是有益的。默认情况下,Spark会将传输过来的变量以序列化的形式缓存在执行者的内存中,在开始执行任务之前再反序列化它。
SparkContext 类提供了一个叫作broadcast的方法用于创建广播变量。它把一个待广播的变量作为参数,返回一个Broadcast类实例。一个任务必须使用Broadcast对象的value方法才可以获取广播变量的值。
考虑这样一个应用,它根据电商交易信息生成交易详情。在现实世界的应用中会有一张顾客表、一张商品表和一张交易表。为了简化起见,我们直接用一些简单的数据结构来代替这些表作为输入数据。
使用广播变量使得我们可以高效地实现顾客数据、商品数据和交易数据之间的连接。我们可以通过使用RDD API来实现连接操作,但是这会在网络间对顾客数据、商品数据和交易数据做shuffle操作。使用广播变量,我们使得Spark只将顾客数据和商品数据发送给每个节点一次,并且用简单的map操作来代替耗时的join操作。
3.9.2 累加器
累加器是只增变量,它可以被运行在不同节点上的任务更改并且被驱动程序读取。它可以用于计数器和聚合操作。Spark提供了数值类型的累加器,也支持创建自定义类型的累加器。
SparkContext类提供了一个叫作accumulator的方法用于创建累加器变量。它有两个参数。第一个参数是累加器的初值,第二个是在Spark UI中显示的名字,这是一个可选参数。它返回一个Accumulator类实例。这个类实例为操作累加器变量提供操作符。任务只能采用add方法或者+=操作符来增加累加器变量的值。只有驱动程序可以通过value方法来获取累加器的值。
考虑这样一个应用,它需要从顾客表中过滤出不合法的顾客并计数。在现实世界的应用中,我们会从硬盘中读取数据并将过滤后的数据写入到硬盘中的另外一个文件。为简化起见,我们跳过读写硬盘的部分。
在使用累加器的时候需要注意,转换操作期间对累加器的更新无法保证恰好只有一次。如果一个任务或一个阶段重复执行,每一个任务的更新操作就会多次执行。
而且,对累加器的更新操作并不是在RDD的操作方法被调用时才执行的。RDD的转换操作是惰性的,转换操作中对累加器的更新并不会立即执行。因此,如果驱动程序在操作方法被调用之前就使用累加器的值,那么它将得到一个错误的值。
3.10 总结
Spark是一个快速、可扩展、可容错且基于内存的集群计算框架。一个Spark应用可以比Hadoop应用快上100倍。
Spark不但快速而且它能很方便地使用mapReduce。通过不同语言(包括Java、Python、Scala和R)的易读的API,它可以方便地开发分布式大数据应用。使用Spark开发者的生产力可以有5~10倍的提升。
而且Spark为各种数据处理任务提供了统一的平台。它是一个通用的框架,可以被各种大数据应用使用。对于迭代式数据分析或者使用迭代算法的应用而言,它是一个理想的平台。
Spark的编程模型基于一个叫作RDD的抽象概念。从概念上看,RDD类似于Scala中的集合。它表示的数据就是一组分区的集合,这些分区分布在集群的节点上。它还为处理数据提供一些函数式的方法。