Spark编程指南V1.4.0
· 简单介绍
· 接入Spark
· Spark初始化
· 使用Shell
· 在集群上部署代码
· 弹性分布式数据集
· 并行集合(Parallelized Collections)
· 其它数据集
· RDD的操作
· 基础操作
· 向Spark传递函数
· 处理键值对
· 转换
· 动作
· RDD的持久化
· 存储级别的选择
· 移除数据
· 共享变量
· 广播变量
· 累加器
· 部署到一个集群上
· 单元測试
· 从1.0之前版本号的Spark迁移
· 下一步该怎么做
简单介绍
总的来说,每个Spark的应用。都是由一个驱动程序(driver program)构成。它执行用户的main函数,在一个集群上执行各种各样的并行操作。Spark提出的最主要抽象概念是弹性分布式数据集 (resilientdistributed dataset,RDD)。它是一个元素集合,划分到集群的各个节点上,能够被并行操作。
RDDs的创建能够从HDFS(或者随意其它支持Hadoop文件系统)上的一个文件開始,或者通过转换驱动程序(driver program)中已存在的Scala集合而来。用户也能够让Spark保留一个RDD在内存中,使其能在并行操作中被有效的反复使用。最后,RDD能自己主动从节点故障中恢复。
Spark的第二个抽象概念是共享变量(shared variables),能够在并行操作中使用。在默认情况下,Spark通过不同节点上的一系列任务来执行一个函数,它将每个函数中用到的变量的拷贝传递到每个任务中。有时候,一个变量须要在任务之间,或任务与驱动程序之间被共享。
Spark支持两种类型的共享变量:广播变量,能够在内存的全部的结点上缓存变量;累加器:仅仅能用于做加法的变量。比如计数或求和。
本指南将用每一种Spark支持的语言来展示这些特性。
这都是非常easy来跟着做的假设你启动了Spark的交互式Shell或者Scala的bin/spark-shell或者Python的bin/pyspark。
接入Spark
Scala
Spark1.2.1须要和Scala2.10一起使用。假设你要用Scala来编写应用,你须要用一个对应版本号的Scala(比如2.10.X)。
要写一个Spark应用程序,你须要在加入Spark的Maven依赖。Spark能够通过Maven中心库来获得:
groupId = org.apache.spark artifactId = spark-core_2.10 version = 1.2.1
除此之外,假设你想訪问一个HDFS集群,你须要依据你的HDFS版本号。加入一个hadoop-client的依赖。一些通用的HDFS版本号标签在第三方发行版页面列出。
groupId = org.apache.hadoop artifactId = hadoop-client version = <your-hdfs-version>
最后,你须要将一些Spark的类和隐式转换导入到你的程序中。
通过例如以下语句:
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf
Java
Spark1.2.1须要执行在Java6及更高版本号上。假设你正在使用Java8,Spark支持使用Lambda表达式简洁地编写函数。或者你能够使用在org.apache.spark.api.java.function包中的类。
要使用Java编写Spark应用程序。你须要加入一个Spark的依赖。Spark能够通过Maven中心库获得:
groupId = org.apache.spark artifactId = spark-core_2.10 version = 1.2.1
此外,假设你想訪问一个HDFS集群。你须要依据你的HDFS版本号,加入一个hadoop-client的依赖。一些通用的HDFS版本号标签在第三方发行版页面列出。
groupId = org.apache.hadoop artifactId = hadoop-client version = <your-hdfs-version>
最后,你须要将Spark的类导入到你的程序中。加入例如以下行:
importorg.apache.spark.api.java.JavaSparkContext import org.apache.spark.api.java.JavaRDD import org.apache.spark.SparkConf
Python
Spark1.2.1须要和Python2.6或者更高的版本号(但不是Python3)一起使用。它使用标准的CPython解释器,因此像NumPy这类的C语言库能够用。
要用Python的方式执行Spark应用程序。能够使用在Spark文件夹下的bin/spark-submit脚本。
这个脚本会装载Spark的Java和Scala库并同意你将程序提交到集群。
你也能够使用bin/pyspark来启动一个交互式Python Shell。
假设你想要訪问HDFS数据,你须要依据你的HDFS版本号使用一个PySpark的构建。一些通用的HDFS版本号标签在第三方发行版页面列出。针对通用的HDFS版本号的预先构建的包在Spark主页上也是可获得。
最后,你须要导入一些Spark相关的类到你的程序中。加入例如以下的行:
from pyspark import SparkContext, SparkConf
初始化Spark
Scala
Spark程序须要做的第一件事情,就是创建一个SparkContext对象。它将告诉Spark怎样訪问一个集群。要创建一个SparkContext你首先须要建立一个SparkConf对象,这个对象包括你的程序的信息。
每一个JVM仅仅能有一个活动的SparkContext。在创建一个新的SparkContext之前你必须stop()活动的SparkContext。
val conf = newSparkConf().setAppName(appName).setMaster(master) new SparkContext(conf)
appName是你的应用的名称。将会在集群的Web监控UI中显示。master參数,是一个用于指定所连接的Spark,Mesos or Mesos 集群URL的字符串,也能够是一个如以下所描写叙述的用于在local模式执行的特殊字符串“local”。在实践中,当执行在一个集群上时,你不会想把master硬编码到程序中,而是启动spark-submit来接收它。然而。对于本地測试和单元測试。你能够通过“local”模式执行Spark。
Java
Spark程序须要做的第一件事情,就是创建一个JavaSparkContext对象,它将告诉Spark怎样訪问一个集群。要创建一个SparkContext你首先须要建立一个SparkConf对象,这个对象包括你的程序的信息。
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master); JavaSparkContext sc = newJavaSparkContext(conf);
appName是你的应用的名称。将会在集群的Web监控UI中显示。
master參数,是一个用于指定所连接的Spark,Mesos or Mesos 集群URL的字符串,也能够是一个如以下所描写叙述的用于在local模式执行的特殊字符串“local”。在实践中,当执行在一个集群上时,你不会想把master硬编码到程序中,而是启动spark-submit来接收它。
然而。对于本地測试和单元測试。你能够通过“local”模式执行Spark。
Python
Spark程序须要做的第一件事情,就是创建一个JavaSparkContext对象。它将告诉Spark怎样訪问一个集群。
要创建一个SparkContext你首先须要建立一个SparkConf对象,这个对象包括你的程序的信息。
conf =SparkConf().setAppName(appName).setMaster(master) sc = SparkContext(conf=conf)
appName是你的应用的名称,将会在集群的Web监控UI中显示。master參数,是一个用于指定所连接的Spark,Mesos or Mesos 集群URL的字符串,也能够是一个如以下所描写叙述的用于在local模式执行的特殊字符串“local”。
在实践中。当执行在一个集群上时,你不会想把master硬编码到程序中,而是启动spark-submit来接收它。然而,对于本地測试和单元測试。你能够通过“local”模式执行Spark。
使用Shell
Scala
在Spark shell中,一个特殊的解释器感知的SparkContext已经为你创建好了,变量名叫做sc。创建自己的SparkContext将不会生效。
你能够使用-master參数设置context连接到那个master,而且你能够使用-jars參数把用逗号分隔的一个jar包列表加入到classpath中。比如,假设在四核CPU上执行spark-shell,使用:
$ ./bin/spark-shell --master local[4]
或者。同一时候在classpath中增加code.jar,使用:
$ ./bin/spark-shell --master local[4] --jarscode.jar
想要获得完整的选项列表,执行spark-shell –help。
在背后,spark-shell调用更一般的spark-submit脚本。
Python
在PySpark shell中,一个特殊的解释器感知的SparkContext已经为你创建好了,变量名叫做sc。创建自己的SparkContext将不会生效。
你能够使用-master參数设置context连接到那个master,而且你能够使用—py-files參数把用逗号分隔的一个Python .zip,.egg或者.py文件列表加入到classpath中。比如。假设在四核CPU上执行bin/pyspark,使用:
$ ./bin/pyspark --master local[4]
或者。同一时候将code.py加入到搜索路径中(为了以后使用import code),使用:
$ ./bin/pyspark --master local[4] --py-filescode.py
想要获得完整的选项列表。执行pyspark –help。在背后。pyspark调用更一般的spark-submit脚本。
也能够在IPython中启动Pyspark shell。一个增强的Python解释器。PySpark要使用IPython1.0.0及其之后的版本号。要使用IPython,当执行bin/pyspark时要设置PYSPARK_DRIVER_PYTHON变量为ipython:
$ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark
你能够通过设置PYSPARK_DRIVER_PYTHON_OPTS參数来自己定义ipython命令。
比如,启动有PyLab支持的IPython Notebook支持:
$PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook--pylab inline" ./bin/pyspark
弹性分布式数据集(RDDs)
Spark环绕的概念是弹性分布式数据集(RDD)。是一个有容错机制并能够被并行操作的元素集合。眼下有两种创建RDDs的方法:并行化一个在你的驱动程序中已经存在的集合,或者引用在外部存储系统上的数据集。比如共享文件系统,HDFS,HBase。或者不论什么以Hadoop输入格式提供的数据源。
并行集合
Scala
并行集合是通过调用SparkContext的parallelize方法,在一个已经存在的集合上创建的(一个Scala Seq对象)。集合的对象将会被拷贝,创建出一个能够被并行操作的分布式数据集。比如,以下展示了如何创建一个含有数字1到5的并行集合:
val data = Array(1, 2, 3, 4, 5) val distData = sc.parallelize(data)
一旦创建了分布式数据集(distData),就能够对其运行并行操作。比如,我们能够调用distData.reduce((a,b)=>a+b)来累加数组的元素。兴许我们会进一步地描写叙述对分布式数据集的操作。
并行集合的一个重要參数是分区数(the number of partitions)。表示数据集切分的份数。
Spark将在集群上为每一个分区数据起一个任务。
典型情况下,你希望集群的每一个CPU分布2-4个分区(partitions)。通常,Spark会尝试基于集群状况自己主动设置分区数。然而,你也能够进行手动设置,通过将分区数作为第二个參数传递给parallelize方法来实现。
(比如:sc.parallelize(data,10))。
注意:代码中的一些地方使用属于“分片(分区的近义词)”来保持向后兼容。
Java
并行集合是通过对存在于驱动程序中的集合调用JavaSparkContext的parallelize方法来构建的。构建时会拷贝集合中的元素。创建一个能够被并行操作的分布式数据集。
比如,这里演示了怎样创建一个包括数字1到5的并行集合:
List<Integer> data = Arrays.asList(1, 2,3, 4, 5); JavaRDD<Integer> distData =sc.parallelize(data);
一旦创建了分布式数据集(distData)。就能够对其运行并行操作。
比如,我们能够调用distData.reduce((a,b)=>a+b)来累加数组的元素。
兴许我们会进一步地描写叙述对分布式数据集的操作。
注意:在本指南中,我们会常常使用简洁地Java8的lambda语法来指明Java函数,而在Java的旧版本号中,你能够实现org.apache.spark.api.java.function包中的接口。
以下我们将在把函数传递到Spark中描写叙述很多其它的细节。
并行集合的一个重要參数是分区数(the number of partitions)。表示数据集切分的份数。Spark将在集群上为每一个分区数据起一个任务。
典型情况下,你希望集群的每一个CPU分布2-4个分区(partitions)。
通常,Spark会尝试基于集群状况自己主动设置分区数。然而,你也能够进行手动设置。通过将分区数作为第二个參数传递给parallelize方法来实现。(比如:sc.parallelize(data,10))。注意:代码中的一些地方使用属于“分片(分区的近义词)”来保持向后兼容。
Python
并行集合是通过对存在于驱动程序中的迭代器(iterable)或集合(collection),调用SparkContext的parallelize方法来构建的。构建时会拷贝迭代器或集合中的元素,创建一个能够被并行操作的分布式数据集。比如,这里演示了怎样创建一个包括数字1到5的并行集合:
data = [1, 2, 3, 4, 5] distData = sc.parallelize(data)
一旦创建了分布式数据集(distData),就能够对其运行并行操作。
比如,我们能够调用distData.reduce(lambda a,b:a+b)来累加列表的元素。兴许我们会进一步地描写叙述对分布式数据集的操作。
并行集合的一个重要參数是分区数(the number of partitions),表示数据集切分的份数。
Spark将在集群上为每一个分区数据起一个任务。典型情况下,你希望集群的每一个CPU分布2-4个分区(partitions)。通常。Spark会尝试基于集群状况自己主动设置分区数。然而,你也能够进行手动设置,通过将分区数作为第二个參数传递给parallelize方法来实现。
(比如:sc.parallelize(data,10))。注意:代码中的一些地方使用属于“分片(分区的近义词)”来保持向后兼容。
外部数据集
Scala
Spark能够从Hadoop支持的不论什么存储源中构建出分布式数据集,包含你的本地文件系统,HDFS。Cassandre。HBase,Amazon S3等。
Spark支持text files。Sequence files,以及其它不论什么一种Hadoop InputFormat。
Text file RDDs的创建能够使用SparkContext的textFile方法。该方法接受一个文件的URI地址(或者是机器上的一个本地路径。或者是一个hdfs://,s3n://等URI)作为參数。并读取文件的每一行数据。放入集合中,以下是一个调用样例:
scala> val distFile =sc.textFile("data.txt") distFile: RDD[String] = MappedRDD@1d4cee08
一旦创建完毕。就能够在distFile上运行数据集操作。比如,要相对全部行的长度进行求和,我们能够通过例如以下的map和reduce操作来完毕:
distFile.map(s => s.length).reduce((a, b)=> a + b)
Spark读文件时的一些注意事项:
1. 假设文件使用本地文件系统上的路径。那么该文件必须在工作节点的同样路径下也能够訪问。
能够将文件复制到全部的worker节点上,或者使用network-mounted共享文件系统。
2. Spark的全部基于文件的输入方法。包含textFile。支持在文件夹上执行,压缩文件和通配符。比如。你能够使用textFile(”/my/directory”),textFile(“/my/directory/*.txt”),和textFile(“/my/directory/*.gz”)。
3. textFile方法也带有可选的第二个參数,用于控制文件的分区数。默认情况下,Spark会为文件的每个block创建一个分区。可是你也能够通过传入更大的值,来设置更高的分区数。注意,你设置的分区数不能比文件的块数小。
除了text文件,Spark的Scala API也支持其它几种数据格式:
1. SparkContext.wholeTextFiles能够让你读取包括多个小text文件的文件夹,而且每一个文件相应返回一个(filename,content)对。而相应的textFile方法,文件的每一行相应返回一条记录(record)。
2. 对于Sequence文件。使用SparkContext的sequenceFile[K,V]方法。当中K和V分别相应文件里key和values的类型。这些类型必须是Hadoop的Writable接口的子类,如IntWritable和Text。另外。Spark同意你使用一些常见的Writables的原生类型;比如,sequenceFile[Int,String]会自己主动的转换为类型IntWritables和Texts。
3. 对于其它的Hadoop InputFormats,你能够使用SparkContext.hadoopRDD方法。它能够接受一个随意类型的JobConf和输入格式类。key类和value类。像Hadoop Job设置输入源那样去设置这些參数就可以。对基于“新”的MapReduce API(org.apache.hadoop.mapreduce)的InputFormats,你也能够使用SparkContex.newHadoopRDD。
4. RDD.saveAsObjectFile和SparkContext.objectFile支持由序列化的Java对象组成的简单格式来保存RDD。
尽管这不是一种像Avro那样有效的序列化格式。可是她提供了一种能够存储不论什么RDD的简单方式。
Java
Spark能够从Hadoop支持的不论什么存储源中构建出分布式数据集,包含你的本地文件系统。HDFS,Cassandre。HBase,Amazon S3等。
Spark支持text files,Sequence files,以及其它不论什么一种Hadoop InputFormat。
Text file RDDs的创建能够使用SparkContext的textFile方法。该方法接受一个文件的URI地址(或者是机器上的一个本地路径。或者是一个hdfs://,s3n://等URI)作为參数,并读取文件的每一行数据,放入集合中,以下是一个调用样例:
JavaRDD<String> distFile =sc.textFile("data.txt");
一旦创建完毕,就能够在distFile上运行数据集操作。比如。要相对全部行的长度进行求和。我们能够通过例如以下的map和reduce操作来完毕:
distFile.map(s -> s.length()).reduce((a, b)-> a + b)
Spark读文件时的一些注意事项:
1. 假设文件使用本地文件系统上的路径,那么该文件必须在工作节点的同样路径下也能够訪问。能够将文件复制到全部的worker节点上,或者使用network-mounted共享文件系统。
2. Spark的全部基于文件的输入方法,包含textFile,支持在文件夹上执行,压缩文件和通配符。
比如,你能够使用textFile(”/my/directory”),textFile(“/my/directory/*.txt”)。和textFile(“/my/directory/*.gz”)。
3. textFile方法也带有可选的第二个參数,用于控制文件的分区数。默认情况下,Spark会为文件的每个block创建一个分区。可是你也能够通过传入更大的值,来设置更高的分区数。
注意。你设置的分区数不能比文件的块数小。
除了text文件,Spark的Java API也支持其它几种数据格式:
1. JavaSparkContext.wholeTextFiles能够让你读取包括多个小text文件的文件夹,而且每一个文件相应返回一个(filename,content)对。
而相应的textFile方法。文件的每一行相应返回一条记录(record)。
2. 对于Sequence文件,使用SparkContext的sequenceFile[K,V]方法,当中K和V分别相应文件里key和values的类型。这些类型必须是Hadoop的Writable接口的子类,如IntWritable和Text。另外,Spark同意你使用一些常见的Writables的原生类型;比如。sequenceFile[Int,String]会自己主动的转换为类型IntWritables和Texts。
3. 对于其它的Hadoop InputFormats,你能够使用JavaSparkContext.hadoopRDD方法。它能够接受一个随意类型的JobConf和输入格式类,key类和value类。
像Hadoop Job设置输入源那样去设置这些參数就可以。对基于“新”的MapReduce API(org.apache.hadoop.mapreduce)的InputFormats,你也能够使用JavaSparkContex.newHadoopRDD。
4. JavaRDD.saveAsObjectFile和JavaSparkContext.objectFile支持由序列化的Java对象组成的简单格式来保存RDD。尽管这不是一种像Avro那样有效的序列化格式,可是她提供了一种能够存储不论什么RDD的简单方式。
Python
PySpark能够从Hadoop支持的不论什么存储源中构建出分布式数据集。包含你的本地文件系统,HDFS,Cassandre。HBase,Amazon S3等。Spark支持text files,Sequence files,以及其它不论什么一种Hadoop InputFormat。
Text file RDDs的创建能够使用SparkContext的textFile方法。该方法接受一个文件的URI地址(或者是机器上的一个本地路径,或者是一个hdfs://。s3n://等URI)作为參数,并读取文件的每一行数据。放入集合中,以下是一个调用样例:
>>> distFile =sc.textFile("data.txt")
一旦创建完毕,就能够在distFile上运行数据集操作。比如。要相对全部行的长度进行求和,我们能够通过例如以下的map和reduce操作来完毕:
distFile.map(lambda s: len(s)).reduce(lambda a,b: a + b)
Spark读文件时的一些注意事项:
1. 假设文件使用本地文件系统上的路径。那么该文件必须在工作节点的同样路径下也能够訪问。
能够将文件复制到全部的worker节点上,或者使用network-mounted共享文件系统。
2. Spark的全部基于文件的输入方法。包含textFile,支持在文件夹上执行,压缩文件和通配符。
比如,你能够使用textFile(”/my/directory”),textFile(“/my/directory/*.txt”),和textFile(“/my/directory/*.gz”)。
3. textFile方法也带有可选的第二个參数。用于控制文件的分区数。
默认情况下,Spark会为文件的每个block创建一个分区,可是你也能够通过传入更大的值。来设置更高的分区数。注意,你设置的分区数不能比文件的块数小。
除了text文件。Spark的Python API也支持其它几种数据格式:
1. JavaSparkContext.wholeTextFiles能够让你读取包括多个小text文件的文件夹,而且每一个文件相应返回一个(filename,content)对。而相应的textFile方法。文件的每一行相应返回一条记录(record)。
2. RDD.saveAsPickleFile和SparkContext.pickleFile支持由pickled Python对象组成的简单格式保存RDD。使用批量的方式处理pickle模块的对象序列化。默认批处理大小为10.
3. SequenceFile和Hadoop输入/输出格式
注意。此功能当前标识为试验性的,是为高级用户而提供的。在将来的版本号中,可能会由于支持基于SparkSQL的读写而被代替。在这样的情况下。SparkSQL是首选的方法。
Writable支持
PySpark的SequenceFile支持载入Java中的键值(key-value)对RDD。能够将Writable转换为主要的Java类型。而且通过Pyrolite。在结果Java对象上运行pickles序列化操作。当将一个键值对的RDD保存为SequenceFIle时。PySpark会对其进行反操作。
它会unpickles Python的对象为Java对象,然后再将它们转换为Writables。
下表中的Writables会被自己主动地转换:
Writable Type |
Python Type |
Text |
unicode str |
IntWritable |
int |
FloatWritable |
float |
DoubleWritable |
float |
BooleanWritable |
bool |
BytesWritable |
bytearray |
NullWritable |
None |
MapWritable |
dict |
数组不支持开箱(out-of-the-box)处理。当读或写数组时,用户须要指定自己定义的ArrayWritable子类。当写数组时,用户也须要指定自己定义的转换器(converters),将数组转换为自己定义的ArrayWritable子类。当读数组时,默认的转换器会将自己定义的ArrayWritable子类转换为Java的Object[]。然后被pickled成Python的元组。假设要获取包括基本数据类型的数组,Python的array.array的话,用户须要为该数组指定自己定义的转换器。
保存和载入SequenFiles
类似于text files,SequenceFiles能够被保存和载入到指定的路径下。
能够指定key和value的类型,但对标准的Writables类型则不须要指定。
>>> rdd = sc.parallelize(range(1,4)).map(lambda x: (x, "a" * x )) >>>rdd.saveAsSequenceFile("path/to/file") >>> sorted(sc.sequenceFile("path/to/file").collect()) [(1, u'a'), (2, u'aa'), (3, u'aaa')]
保存和载入其它的Hadoop输入/输出格式
PySpark也能够读不论什么Hadoop InputFormat或者写不论什么Hadoop OutputFormat,包含“新”和“旧”两个Hadoop MapReduce APIs。假设须要的话,能够将传递进来的一个Hadoop配置当成一个Python字典。这里有一个使用了Elasticsearch ESInputFormat的例子:
SPARK_CLASSPATH=/path/to/elasticsearch-hadoop.jar./bin/pyspark >>> conf = {"es.resource" :"index/type"} # assumeElasticsearch is running on localhost defaults >>> rdd =sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\ "org.apache.hadoop.io.NullWritable","org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf) >>> rdd.first() # the result is a MapWritable that isconverted to a Python dict (u'Elasticsearch ID', {u'field1': True, u'field2': u'Some Text', u'field3': 12345})
注意。假设这个InputFormat仅仅是简单地依赖于Hadoop配置和/或输入路径。以及key和value的类型。它就能够非常easy地依据上面的表格进行转换,那么这样的方法应该能够非常好地处理这些情况。
假设你有一个定制序列化的二进制数据(比方载入自Cassandra/HBase的数据)。那么你首先要做的,是在Scala/Java側将数据转换为能够用Pyrolite的pickler处理的东西。
Converter特质提供了这一转换功能。简单地extend该特质。然后在convert方法中实现你自己的转换代码。
记住要确保该类,以及訪问你的InputFormat所需的依赖。都须要被打包到你的Spark作业的jar包,而且包括在PySpark的类路径中。
在Python例子和Converter例子上给出了带自己定义转换器的Cassandra/HBase的InputFormat和OutputFormat的使用例子。
RDD操作
RDDs支持两种操作:转换(transformations),能够从已有的数据集创建一个新的数据集;而动作(actions),在数据集上执行计算后,会向驱动程序返回一个值。
比如。map就是一种转换,它将数据集每个元素都传递给函数,并返回一个新的分布数据集来表示结果。
另一方面。reduce是一种动作。通过一些函数将全部的元素聚合起来,并将终于结果返回给驱动程序(只是另一个并行的reduceByKey,能返回一个分布式数据集)。
Spark中的全部转换都是惰性的,也就是说,它们并不会立即计算结果。相反的,它们仅仅是记住应用到基础数据集(比如一个文件)上的这些转换动作。仅仅有当发生一个要求返回结果给驱动程序的动作时。这些转换才会真正执行。这样的设计让Spark更加有效率地执行。
比如,我们对map操作创建的数据集进行reduce操作时,仅仅会向驱动返回reduce操作的结果,而不是返回更大的map操作创建的数据集。
默认情况下,每个转换过的RDD都会在你对它运行一个动作时被又一次计算。
只是,你也能够使用持久化或者缓存方法。把一个RDD持久化到内存中。在这样的情况下。Spark会在集群中保存相关元素。以便你下次查询这个RDD时。能更高速地訪问。
对于把RDDs持久化到磁盘上,或在集群中拷贝到多个节点也是支持的。
基础操作
Scala
为了描写叙述RDD的基础操作,能够考虑以下的简单程序:
val lines = sc.textFile("data.txt") val lineLengths = lines.map(s => s.length) val totalLength = lineLengths.reduce((a, b)=> a + b)
第一行通过一个外部文件定义了一个主要的RDD。这个数据集未被载入到内存。也未在上面执行操作:lines只指向这个文件。第二行定义了lineLengths作为map转换结果。此外,因为惰性,不会马上计算lineLengths。最后。我们执行reduce。这是一个动作。这时候,Spark才会将这个计算拆分成不同的task,并执行在独立的机器上,而且每台机器执行它自己的map部分和本地的reducatin。只返回它的结果给驱动程序。
假设我们希望以后能够复用lineLengths,能够加入:
lineLengths.persist()
在reduce之前,这将导致lineLengths在第一次被计算之后,被保存在内存中。
Java
为了描写叙述RDD的基础操作,能够考虑以下的简单程序:
JavaRDD<String> lines =sc.textFile("data.txt"); JavaRDD<Integer> lineLengths =lines.map(s -> s.length()); int totalLength = lineLengths.reduce((a, b)-> a + b);
第一行通过一个外部文件定义了一个主要的RDD。这个数据集未被载入到内存,也未在上面执行操作:lines只指向这个文件。
第二行定义了lineLengths作为map转换结果。
此外,因为惰性,不会马上计算lineLengths。最后,我们执行reduce,这是一个动作。
这时候。Spark才会将这个计算拆分成不同的task,并执行在独立的机器上。而且每台机器执行它自己的map部分和本地的reducatin,只返回它的结果给驱动程序。
假设我们希望以后能够复用lineLengths,能够加入:
lineLengths.persist();
在reduce之前,这将导致lineLengths在第一次被计算之后。被保存在内存中。
Python
为了描写叙述RDD的基础操作。能够考虑以下的简单程序:
lines = sc.textFile("data.txt") lineLengths = lines.map(lambda s: len(s)) totalLength = lineLengths.reduce(lambda a, b: a+ b)
第一行通过一个外部文件定义了一个主要的RDD。
这个数据集未被载入到内存。也未在上面执行操作:lines只指向这个文件。
第二行定义了lineLengths作为map转换结果。
此外,因为惰性。不会马上计算lineLengths。最后。我们执行reduce,这是一个动作。这时候,Spark才会将这个计算拆分成不同的task,并执行在独立的机器上,而且每台机器执行它自己的map部分和本地的reducatin,只返回它的结果给驱动程序。
假设我们希望以后能够复用lineLengths,能够加入:
lineLengths.persist()
在reduce之前。这将导致lineLengths在第一次被计算之后,被保存在内存中。
把函数传递到Spark
Scala
Spark的API,在非常大程度上依赖于把驱动程序中的函数传递到集群上执行。
这有两种推荐的实现方式:
●使用匿名函数的语法,这能够用来替换简短的代码。
●使用全局单例对象的静态方法。比方,你能够定义函数对象objectMyFunctions。然后传递该对象的方法MyFunction.func1,例如以下所看到的:
object MyFunctions { deffunc1(s: String): String = { ... } } myRdd.map(MyFunctions.func1)
注意:因为可能传递的是一个类实例方法的引用(而不是一个单例对象)。在传递方法的时候,应该同一时候传递包括该方法的对象。比方,考虑:
class MyClass { deffunc1(s: String): String = { ... } defdoStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) } }
这里。假设我们创建了一个类实例new MyClass,而且调用了实例的doStuff方法,该方法中的map处调用了这个MyClass实例的func1方法。所以须要将整个对象传递到集群中。类似于写成:rdd.map(x=>this.func1(x))。
类似地,訪问外部对象的字段时将引用整个对象:
class MyClass { valfield = "Hello" defdoStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) } }
等同于写成rdd.map(x=>this.field+x),引用了整个this。为了避免这样的问题,最简单的方式是把field复制到本地变量,而不是去外部訪问它:
def doStuff(rdd: RDD[String]): RDD[String] = { valfield_ = this.field rdd.map(x => field_ + x) }
Java
Spark的API,在非常大程度上依赖于把驱动程序中的函数传递到集群上执行。在Java中,函数由那些实现了org.apache.spark.api.java.function包中的接口的类表示。
有两种创建这种函数的方式:
●在你自己的类中实现Function接口,能够是匿名内部类,或者命名类。而且传递类的一个实例到Spark。
●在Java8中,使用lambda表达式来简明地定义函数的实现。
为了保持简洁性,本指南中大量使用了lambda语法,这在长格式中非常easy使用全部同样的APIs。
比方,我们能够把上面的代码写成:
JavaRDD<String> lines =sc.textFile("data.txt"); JavaRDD<Integer> lineLengths =lines.map(new Function<String, Integer>() { publicInteger call(String s) { return s.length(); } }); int totalLength = lineLengths.reduce(newFunction2<Integer, Integer, Integer>() { publicInteger call(Integer a, Integer b) { return a + b; } });
或者。假设不方便编写内联函数的话,能够写成:
class GetLength implements Function<String,Integer> { publicInteger call(String s) { return s.length(); } } class Sum implements Function2<Integer,Integer, Integer> { publicInteger call(Integer a, Integer b) { return a + b; } } JavaRDD<String> lines = sc.textFile("data.txt"); JavaRDD<Integer> lineLengths =lines.map(new GetLength()); int totalLength = lineLengths.reduce(newSum());
注意,Java中的匿名内部类也能够訪问封闭域中的变量,仅仅要这些变量标识为final就可以。Spark会像处理其它语言一样,将这些变量复制到每一个工作节点上。
Python
Spark的API,在非常大程度上依赖于把驱动程序中的函数传递到集群上执行。
有三种推荐方法能够使用:
●使用Lambda表达式来编写能够写成一个表达式的简单函数(Lambdas不支持没有返回值的多语句函数或表达式)。
●Spark调用的函数中的Local defs,能够用来取代更长的代码。
●模块中的顶级函数。
比如,假设想传递一个支持使用lambda表达式的更长的函数,能够考虑下面代码:
"""MyScript.py""" if __name__ == "__main__": defmyFunc(s): words = s.split(" ") return len(words) sc =SparkContext(...) sc.textFile("file.txt").map(myFunc)
注意:因为可能传递的是一个类实例方法的引用(而不是一个单例对象(singleton object)),在传递方法的时候,应该同一时候传递包括该方法的对象。比方,考虑:
class MyClass(object): deffunc(self, s): return s defdoStuff(self, rdd): return rdd.map(self.func)
这里。假设我们创建了一个类实例new MyClass,而且调用了实例的doStuff方法。该方法中的map处调用了这个MyClass实例的func1方法,所以须要将整个对象传递到集群中。
类似地,訪问外部对象的字段时将引用整个对象:
class MyClass(object): def__init__(self): self.field = "Hello" defdoStuff(self, rdd): return rdd.map(lambda s: self.field + x)
为了避免这样的问题。最简单的方式是把field复制到本地变量。而不是去外部訪问它:
def doStuff(self, rdd): field= self.field return rdd.map(lambda s: field + x)
理解闭包
关于Spark的一个更困难的问题是理解当在一个集群上运行代码的时候,变量和方法的范围以及生命周期。改动范围之外变量的RDD操作常常是造成混乱的源头。在以下的样例中我们看一下使用foreach()来添加一个计数器的代码,只是相同的问题也可能有其它的操作引起。
样例
考虑以下的单纯的RDD元素求和,依据是否执行在一个虚拟机上。它们的行为全然不同。一个寻常的样例是在local模式(--master=local[n])下执行Spark对照将Spark程序部署到一个集群上(比如通过spark-submit提交到YARN)。
Scala
var counter = 0 var rdd = sc.parallelize(data) // Wrong: Don't do this!! rdd.foreach(x => counter += x) println("Counter value: " + counter)
Java
int counter = 0; JavaRDD<Integer> rdd =sc.parallelize(data); // Wrong: Don't do this!! rdd.foreach(x -> counter += x); println("Counter value: " + counter);
Python
counter = 0 rdd = sc.parallelize(data) # Wrong: Don't do this!! rdd.foreach(lambda x: counter += x) print("Counter value: " + counter)
本地模式VS集群模式
基本的挑战是,上述代码的行为是没有定义的。在使用单个JVM的本地模式中。上面的代码会在RDD中计算值的总和并把它存储到计数器中。这是由于RDD和计数器变量在驱动节点的同一个内存空间中。
然而,在集群模式下。发生的事情更为复杂,上面的代码可能不会依照目的工作。
要运行作业,Spark将RDD操作分成任务——每个任务由一个运行器操作。在运行前。Spark计算闭包。闭包是指运行器要在RDD上进行计算时必须对运行节点可见的那些变量和方法(在这里是foreach())。这个闭包被序列化并发送到每个运行器。在local模式下,仅仅有一个运行器因此全部东西都分享同一个闭包。然而在其它的模式中,就不是这个情况了,运行在不同工作节点上的运行器有它们自己的闭包的一份拷贝。
这里发生的事情是闭包中的变量被发送到每一个运行器都是被拷贝的,因此,当计数器在foreach函数中引用时,它不再是驱动节点上的那个计数器了。在驱动节点的内存中仍然有一个计数器。但它对运行器来说不再是可见的了!
运行器仅仅能看到序列化闭包中的拷贝。因此,计数器终于的值仍然是0,由于全部在计数器上的操作都是引用的序列化闭包中的值。
在这样的情况下要确保一个良好定义的行为。应该使用累加器。Spark中的累加器是一个专门用来在运行被分散到一个集群中的各个工作节点上的情况下安全更新变量的机制。
本指南中的累加器部分会做具体讨论。
一般来说。闭包-构造像循环或者本地定义的方法。不应该用来改变一些全局状态。Spark未定义或者是保证改变在闭包之外引用的对象的行为。
一些这样做的代码可能会在local模式下起作用。但那不过个偶然,这种代码在分布式模式下是不会依照期望工作的。假设须要一些全局的參数,能够使用累加器。
打印RDD中的元素
还有一个常见的使用方法是使用rdd.foreach(println)方法或者rdd.map(println)方法试图打印出RDD中的元素。
在一台单一的机器上,这样会产生期望的输出并打印出RDD中的元素。然而。在集群模式中。被运行器调用输出到stdout的输出如今被写到了运行器的stdout,并非在驱动上的这一个,因此驱动上的stdout不会显示这些信息。要在驱动上打印全部的元素,能够使用collect()方法首先把RDD取回到驱动节点如:rdd.collect().foreach(println)。然而,这可能导致驱动内存溢出。由于collect()将整个RDD拿到了单台机器上;假设你仅仅须要打印非常少几个RDD的元素,一个更安全的方法是使用take()方法:rdd.take(100).foreach(println)。
键值对的使用
Scala
尽管,在包括随意类型的对象的RDDs中,能够使用大部分的Spark操作。但也有一些特殊的操作仅仅能在键值对的RDDs上使用。最常见的一个就是分布式的洗牌(shuffle)操作,诸如基于key值对元素进行分组或聚合的操作。
在Scala中。包括二元组(Tuple2)对象(能够通过简单地(a,b)代码,来构建内置于语言中的元组的RDDs支持这些操作)。仅仅要你在程序中导入了org.apache.spark.SparkContext._,就能进行隐式转换。
PairRDDFunction类支持键值对的操作,假设你导入了隐式转换。该类型就能自己主动地对元组RDD的元素进行转换。
比方。下列代码在键值对上使用了reduceByKey操作。来计算在一个文件里每行文本出现的总次数:
val lines = sc.textFile("data.txt") val pairs = lines.map(s => (s, 1)) val counts = pairs.reduceByKey((a, b) => a +b)
我们也能够使用counts.sortByKey(),比方,将键值对以字典序进行排序。最后使用counts.collect()转换成对象的数组形式,返回给驱动程序。
注意:在键值对操作中。假设使用了自己定义对象作为建,你必须确保该对象实现了自己定义的equals()和相应的hashCode()方法。很多其它详情请查看Object.hashCode()文档大纲中列出的规定。
Java
尽管。在包括随意类型的对象的RDDs中,能够使用大部分的Spark操作。但也有一些特殊的操作仅仅能在键值对的RDDs上使用。最常见的一个就是分布式的洗牌(shuffle)操作。诸如基于key值对元素进行分组或聚合的操作。
在java中,能够使用Scala标准库中的scala.Tuple2类来表示键值对,你能够简单地调用new Tuple2(a,b)来创建一个元组。然后使用tuple._1()和tuple._2()方法来訪问元组的字段。
使用JavaPairRDD来表示键值对RDDs。你能够使用指定版本号的map操作,从JavaRDDs构建JavaPairRDDs。比方mapToPair和flatMapToPair。
JavaPairRDD支持标准的RDD函数,也支持特殊的键值函数。
比如。以下的代码在键值(key-value)对上使用 reduceByKey操作来计算在一个文件里每行文本出现的总次数:
JavaRDD<String> lines =sc.textFile("data.txt"); JavaPairRDD<String, Integer> pairs =lines.mapToPair(s -> new Tuple2(s, 1)); JavaPairRDD<String, Integer> counts =pairs.reduceByKey((a, b) -> a + b);
我们也能够使用 counts.sortByKey(),比如。将键值对以字典序(alphabetically)进行排序。
后调用 counts.collect() 转换成对象的数组形式,返回给驱动程序(driverprogram)。
注意:在键值(key-value)对操作中。假设使用了自己定义对象作为键,你必须确保该对象实现了自己定义的 equals()和相应的 hashCode()方法。很多其它详情请查看Object.hashCode() documentation文档大纲中列出的规定。
Python
尽管在包括随意类型的对象的 RDDs中,能够使用大部分的 Spark操作。但也有一
些特殊的操作仅仅能在键值(key-value)对的 RDDs上使用。
最常见的一个就是分布式的洗牌("shuffle")操作,诸如基于 key值对元素进行分组或聚合的操作。
在 Python中, RDDs支持的操作包括 Python内置的元组(tuples)操作,比方 (1, 2)。你能够简单地创建这种元组,然后调用期望的操作。
比如。以下的代码在键值(key-value)对上使用 reduceByKey操作来计算在一个文件里每行文本出现的总次数:
lines = sc.textFile("data.txt") pairs = lines.map(lambda s: (s, 1)) counts = pairs.reduceByKey(lambda a, b: a + b)
我们也能够使用 counts.sortByKey()。比如,依照字典序(alphabetically)排序键值对。最后调用 counts.collect() 转换成对象的数组形式,返回给驱动程序(driver program)。
转换
下表中列出了 Spark支持的一些常见的转换 (Transformations)。详情请參考 RDDAPI文档 (Scala, Java, Python)和 pair RDD函数文档 (Scala, Java)。
Transformation |
Meaning |
map(func) |
返回一个新分布式数据集。由每个输入元素经过 func函数转换后组成。 |
filter(func) |
返回一个新数据集。由经过 func函数计算后返回值为 true的输入元素组成。 |
flatMap(func) |
类似于 map,可是每个输入元素能够被映射为 0或多个输出元素(因此 func应该返回一个序列(Seq),而不是单一元素)。 |
mapPartitions(func) |
类似于 map。但独立地在 RDD的每个分区(partition,相应块(block))上执行,当在类型为 T 的 RDD上执行时, func的函数类型必须是Iterator<T> => Iterator<U>。 |
mapPartitionsWithIndex(func) |
类似于 mapPartitions,但 func带有一个整数參数表示分区(partition)的索引值。 当在类型为 T的 RDD上执行时, func的函数类型必须是(Int, Iterator<T>) => Iterator<U>。 |
sample(withReplacement, fraction, seed) |
依据 fraction指定的比例。对数据进行採样。能够选择是否用随机数进行替换, seed用于指定随机数生成器种子。 |
union(otherDataset) |
返回一个新的数据集,新数据集由源数据集和參数数据集的元素联合(union)而成。 |
intersection(otherDataset) |
返回一个新的数据集,新数据集由源数据集和參数数据集的元素的交集(intersection)组成。 |
distinct([numTasks])) |
返回一个新的数据集。新数据集由源数据集过滤掉多余的反复元素仅仅保留一个而成。 |
groupByKey([numTasks]) |
在一个 (K, V)对的数据集上调用,返回一个 (K, Iterable<V>)对的数据集。 注意:假设你想在每一个key上分组运行聚合(如总和或平均值)操作,使用 reduceByKey或 combineByKey会产生更好的性能。 注意:默认情况下,输出的并行数依赖于父 RDD(parent RDD)的分区数(number of partitions)。你能够通过传递可选的第二个參数 numTasks来设置不同的任务数。 |
reduceByKey(func, [numTasks]) |
在一个 (K, V)对的数据集上调用时。返回一个 (K, V)对的数据集。使用指定的 reduce函数 func将同样 key的值聚合到一起,该函数的类型必须是(V,V) => V。类似 groupByKey。 reduce的任务个数是能够通过第二个可选參数来配置的。 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) |
在一个 (K, V)对的数据集上调用时,返回一个 (K, U)对的数据集,对每一个键的值使用给定的组合函数(combine functions)和一个中性的“零”值进行聚合。同意聚合后的值类型不同于输入的值类型。从而避免了不必要的内存分配。如同 groupByKey,能够通过设置第二个可选參数来配置reduce任务的个数。 |
sortByKey([ascending], [numTasks]) |
在一个 (K, V)对的数据集上调用,当中, K必须实现Ordered,返回一个 依照 Key进行排序的 (K, V)对数据集。升序或降序由布尔參数 ascending决定。 |
join(otherDataset, [numTasks]) |
在类型为 (K, V)和 (K, W)类型的数据集上调用时,返回一个同样 key 相应的全部元素对在一起的 (K, (V, W))对的数据集。也支持外联(Outer joins),通过使用 leftOuterJoin和 rightOuterJoin. |
cogroup(otherDataset, [numTasks]) |
在类型为 (K, V)和 (K, W)的数据集上调用,返回一个 (K, Iterable<V>, Iterable<W>)元组(tuples)的数据集。这个操作也能够称之为 groupWith。 |
cartesian(otherDataset) |
笛卡尔积。在类型为 T和 U类型的数据集上调用时,返回一个 (T, U)对的数据集(全部元素交互进行笛卡尔积)。 |
pipe(command, [envVars]) |
以管道(Pipe)方式将 RDD的各个分区(partition)传递到 shell命令,比方一个 Perl或 bash脚本中。 RDD的元素会被写入进程的标准输入(stdin),而且将作为字符串的 RDD(RDD of strings),在进程的标准输出(stdout)上输出一行行数据。 |
coalesce(numPartitions) |
把 RDD的分区数减少到指定的 numPartitions。 过滤掉一个大数据集 之后再运行操作会更加有效。 |
repartition(numPartitions) |
随机地对 RDD的数据又一次洗牌(Reshuffle),以便创建很多其它或更少的分区,对它们进行平衡。总是对网络上的全部数据进行洗牌(shuffles)。 |
repartitionAndSortWithinPartitions(partitioner) |
依据给定的分区器对RDD进行又一次分区。在每一个结果分区中。将记录依照key值进行排序。这在每一个分区中比先调用repartition再排序效率更高。由于它能够推动排序到分牌机器上。 |
动作
下表中列出了 Spark支持的一些常见的动作 (actions)。详情请參考 RDD API文档
(Scala,Java, Python) 和pair RDD函数文档(Scala, Java)。
Action |
Meaning |
reduce(func) |
通过函数 func (接受两个參数。返回一个參数),聚集数据集中的全部元素。 该函数应该是可交换和可结合的,以便它能够正确地并行计算。 |
collect() |
在驱动程序中。以数组的形式。返回数据集的全部元素。这一般会在使用filter或者其他操作,并返回一个足够小的数据子集后再使用会比較实用 |
count() |
返回数据集的元素的个数。 |
first() |
返回数据集的第一个元素。 (类似于 take(1)). |
take(n) |
返回一个由数据集的前 n个元素组成的数组。 注意,这个操作眼下不能并行运行。而是由驱动程序(driver program)计算全部的元素。 |
takeSample(withReplacement,num, [seed]) |
返回一个数组,由数据集中随机採样的 num个元素组成。能够选择是否用随机数替换不足的部分。能够指定可选參数seed。预先指定一个随机数生成器的种子。 |
takeOrdered(n, [ordering]) |
返回一个由数据集的前 n个元素,并使用自然顺序或定制顺序对这些元素进行排序。 |
saveAsTextFile(path) |
将数据集的元素,以 text file (或 text file的集合)的形式,保存到本地文件系统的指定文件夹, Spark会对每一个元素调用 toString方法,然后转换为文件里的文本行。 |
saveAsSequenceFile(path) |
将数据集的元素,以 Hadoop sequencefile的格式,保存到各种文件系统的指定路径下,包含本地系统。 HDFS或者不论什么其他 hadoop支持的文件系统。该方法仅仅能用于键值(key-value)对的 RDDs。或者实现了 Hadoop的Writable接口的情况下。在 Scala中。也能够用于支持隐式转换为 Writable的类型。(Spark包含了基本类型的转换。比如 Int, Double。 String,等等)。 |
saveAsObjectFile(path) |
以简单地 Java序列化方式将数据集的元素写入指定的路径,相应的能够用 SparkContext.objectFile()载入该文件。 |
countByKey() |
仅仅对 (K,V)类型的 RDD有效。返回一个 (K, Int)对的 hashmap,当中 (K,Int)对表示每个 key相应的元素个数。 |
foreach(func) |
在数据集的每个元素上,执行 func函数。这通经常使用于副作用(sideeffects)。比如更新一个累加器变量(accumulator variable)(參见下文)。或者和外部存储系统进行交互. |
洗牌操作
Spark触发一个事件后进行的一些操作成为洗牌。洗牌是Spark又一次分配数据的机制,这样它就能够跨分区分组。这通常涉及在运行器和机器之间复制数据,这就使得洗牌是一个复杂和高代价的操作。
背景
为了理解在洗牌的时候发生了什么,我们能够考虑reduceByKey操作的样例。
reduceByKey操作产生了一个新的RDD,在这个RDD中,全部的单个的值被组合成了一个元组,key和运行一个reduce函数后的结果中与这个key有关的全部值。面临的挑战是一个key的全部的值并不都是在同一个分区上的,甚至不是一台机器上的,可是他们必须是可连接的以计算结果。
在Spark中。数据通常是不会跨分区分布的,除非是在一个特殊的地方为了某种特定的目的。在计算过程中,单个任务将在单个分区上操作——因此,为了组织全部数据运行单个reduceByKey中的reduce任务,Spark须要运行一个all-to-all操作。
它必须读取全部分区,找到全部key的值,并跨分区把这些值放到一起来计算每一个key的终于结果——这就叫做洗牌。
虽然在每一个分区中新洗牌的元素集合是确定性的,分区本身的顺序也相同如此。这些元素的顺序就不一定是了。
假设期望在洗牌后获得可预測的有序的数据。能够使用:
mapPartitions 来排序每一个分区。比如使用.sorted
repartitionAndSortWithinPartitions 在又一次分区的同一时候有效地将分区排序
sortBy来创建一个全局排序的RDD
能够引起洗牌的操作有重分区比如repartition和coalesce,‘ByKey操作(除了计数)像groupByKey和reduceByKey,还有join操作比如cogroup和join。
性能影响
Shuffle是一个代价高昂的操作。由于它调用磁盘I/O。数据序列化和网络I/O。要组织shuffle的数据,Spark生成一个任务集合——map任务来组织数据,并使用一组reduce任务集合来聚合它。它的命名来自与MapReduce,但并不直接和Spark的map和reduce操作相关。
在内部,单个的map任务的结果被保存在内存中。直到他们在内存中存不下为止。
然后,他们基于目标分区进行排序。并写入到一个单个的文件里。
在reduce这边。任务读取相关的已经排序的块。
某些shuffle操作会消耗大量的堆内存。由于他们用在内存中的数据结构在转换操作之前和之后都要对数据进行组织。特别的。reduceByKey和aggregateByKey在map側创建这些结构,‘ByKey操作在reduce側生成这些结构。当数据在内存中存不下时。Spark会将他们存储到磁盘,造成额外的磁盘开销和添加垃圾收集。
Shuffle也会在磁盘上产生大量的中间文件。
在Spark1.3中。这些文件直到Spark停止执行时才会从Spark的暂时存储中清理掉,这意味着长时间执行Spark作业会消耗可观的磁盘空间。这些做了之后假设lineage又一次计算了。那shuffle不须要又一次计算了。
在配置Spark上下文时,暂时存储文件夹由spark.local.dir配置參数指定。
Shuffle的行为能够通过调整各种配置參数来调整。请看Spark配置指南中的Shuffle Behavior部分。
RDD持久化
Spark最重要的一个功能,就是在不同操作间,将一个数据集持久化(persisting) (或缓存(caching))到内存中。
当你持久化(persist)一个 RDD。每个节点都会把它计算的全部分区(partitions)存储在内存中,并在对数据集 (或者衍生出的数据集)运行其它动作(actioins)时重用。这将使得兴许动作(actions)的运行变得更加迅速(通常快 10 倍)。缓存(Caching)是用 Spark 构建迭代算法和高速地交互使用的关键。
你能够使用 persist()或 cache()方法来持久化一个 RDD。
在首次被一个动作(action)触发计算后。它将会被保存到节点的内存中。
Spark 的缓存是带有容错机制的,假设 RDD丢失不论什么一个分区的话,会自己主动地用原先构建它的转换(transformations)操作来又一次进行计算。
此外。每个被持久化的 RDD都能够用不同的存储级别(storage level)进行存储。比方,同意你持久化数据集到硬盘,以序列化的 Java对象(节省空间)存储到内存。跨节点复制,或者以off-heap的方式存储在 Tachyon。这些级别的选择,是通过将一个 StorageLevel对象 (Scala Java, Python)传递到 persist()方法中进行设置的。 cache()方法是使用默认存储级别的快捷方法,也就是 StorageLevel.MEMORY_ONLY (将反序列化 (deserialized)的对象存入内存)。
完整的可选存储级别例如以下:
Storage Level |
Meaning |
MEMORY_ONLY |
将 RDD以反序列化(deserialized)的Java对象存储到 JVM。假设 RDD不能被内存装下。一些分区将不会被缓存,而且在须要的时候被又一次计算。这是默认的级别。 |
MEMORY_AND_DISK |
将 RDD以反序列化(deserialized)的 Java对象存储到 JVM。 假设 RDD不能被内存装下,超出的分区将被保存在硬盘上,而且在须要时被读取。 |
MEMORY_ONLY_SER |
将 RDD以序列化(serialized)的 Java对象进行存储(每一分区占用一个字节数组)。 通常来说,这比将对象反序列化(deserialized)的空间利用率更高,尤其当使用高速序列化器(fast serializer),但在读取时会比較耗 CPU。 |
MEMORY_AND_DISK_SER |
类似于 MEMORY_ONLY_SER。可是把超出内存的分区将存储在硬盘上而不是在每次须要的时候又一次计算。 |
DISK_ONLY |
仅仅将 RDD分区存储在硬盘上。 |
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. |
与上述的存储级别一样,可是将每个分区都拷贝到两个集群节点上。 |
OFF_HEAP (experimental) |
以序列化的格式 (serialized format) 将 RDD存储到 Tachyon。相比于MEMORY_ONLY_SER。 OFF_HEAP 减少了垃圾收集(garbage collection)的开销。并使 executors变得更小并且共享内存池,这在大堆(heaps)和多应用并行的环境下是很吸引人的。并且,因为 RDDs驻留于 Tachyon中, executor的崩溃不会导致内存中的缓存丢失。在这样的模式下, Tachyon中的内存是可丢弃的。因此。 Tachyon不会尝试重建一个在内存中被清除的分块。 |
注意:在 Python中。存储对象时总是使用 Pickle库来序列化(serialized),而无论你是否选择了一个序列化的级别。
Spark也会自己主动地持久化一些洗牌(shuffle)操作(比方,reduceByKey )的中间数据,即使用户没有调用 persist。
这么做是为了避免在一个节点上的洗牌(shuffle)过程失败时,又一次计算整个输入。我们仍然建议用户在结果 RDD 上调用 persist。假设希望重用它的话。
怎样选择存储级别?
Spark 的存储级别旨在满足内存使用和CPU效率权衡上的不同需求。我们建议通过下面方法进行选择:
●假设你的 RDDs能够非常好的与默认的存储级别(MEMORY_ONLY)契合,就不须要做不论什么改动了。
这已经是 CPU使用效率最高的选项,它使得 RDDs的操作尽可能的快。
●假设不行,试着使用 MEMORY_ONLY_SER,而且选择一个高速序列化库使对象在有比較高的空间使用率(space-efficient)的情况下,依旧能够较快被訪问。
●尽可能不要存储到硬盘上。除非计算数据集的函数的计算量特别大。或者它们过滤了大量的数据。否则。又一次计算一个分区的速度,可能和从硬盘中读取差点儿相同快。
●假设你想有高速的故障恢复能力,使用复制存储级别(比如:用 Spark来响应 web应用的请求)。
全部的存储级别都有通过又一次计算丢失的数据来恢复错误的容错机制。可是复制的存储级别能够让你在 RDD 上持续地执行任务,而不须要等待丢失的分区被又一次计算。
●在大量的内存或多个应用程序的环境下。试验性的 OFF_HEAP模式具有下面几个长处:
o 同意多个 executors共享 Tachyon中同样的内存池。
o 极大地减少了垃圾收集器(garbage collection)的开销。
o 即使个别的 executors崩溃了,缓存的数据也不会丢失。
移除数据
Spark 会自己主动监控各个节点上的缓存使用情况,并使用近期最少使用算法(least-recently-used (LRU))删除老的数据分区。假设你想手动移除一个 RDD,而不是等它自己主动从缓存中清除。能够使用 RDD.unpersist()方法。
共享变量
一般来说。当一个函数被传递给一个在远程集群节点上执行的 Spark操作(比如 map或 reduce) 时,它操作的是这个函数用到的全部变量的独立拷贝。这些变量会被复制到每一台机器。并且在远程机器上对变量的全部更新都不会被传播回驱动程序。
通常看来。读-写任务间的共享变量显然不够高效。然而。Spark还是为两种常见的使用模式。提供了两种有限的共享变量:广播变量(broadcast variables)和累加器(accumulators)。
广播变量
广播变量同意程序猿保留一个仅仅读的变量。缓存在每一台机器上,而不是每一个任务保存一份拷贝。它们能够这样被使用,比如,以一种高效的方式给每一个节点一个大的输入数据集。Spark会尝试使用一种高效的广播算法来传播广播变量。从而降低通信的代价。
Spark动作的执行是通过一个阶段的集合,通过分布式的Shuffle操作分离。Spark自己主动广播在每一个阶段里任务须要的共同数据。以这样的方式广播的数据以序列化的形式缓存并在执行每一个任务之前进行反序列化。这意味着显式地创建广播变量仅仅在当多个阶段之间须要同样的数据或者是当用反序列化的形式缓存数据特别重要的时候。
广播变量是通过调用 SparkContext.broadcast(v)方法从变量 v创建的。广播变量是一个 v的封装器,它的值能够通过调用 value方法获得。
例如以下代码展示了这个:
Scala
scala>val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar:org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0) scala>broadcastVar.value res0:Array[Int] = Array(1, 2, 3)
Java
Broadcast<int[]>broadcastVar = sc.broadcast(new int[] {1, 2, 3}); broadcastVar.value(); // returns[1, 2, 3]
Python
>>>broadcastVar = sc.broadcast([1, 2, 3]) <pyspark.broadcast.Broadcastobject at 0x102789f10> >>>broadcastVar.value [1, 2, 3]
在广播变量被创建后。它应该在集群执行的不论什么函数中,取代 v值被调用,从而 v值不须要被再次传递到这些节点上。
另外,对象 v不能在广播后改动。这样能够保证全部节点具有同样的广播变量的值(比方,兴许假设变量被传递到一个新的节点)。
累加器
累加器是一种仅仅能通过具有结合性的操作(associative operation)进行“加(added)”的变量。因此能够高效地支持并行。它们能够用来实现计数器(如 MapReduce 中)和求和器。Spark原生就支持数值类型的累加器,开发人员能够自己加入新的支持类型。
假设创建了一个命名的累加器(accumulators)。这些累加器将会显示在 Spark UI 界面上。这对于了解当前执行阶段(stages)的进展情况是很实用的(注意:这在 Python 中尚未支持)。
一个累加器能够通过调用 SparkContext.accumulator(v)方法从一个初始值 v中创建。
执行在集群上的任务,能够通过使用 add方法或 +=操作符(在 Scala和 Python)来给它加值。然而。它们不能读取这个值。仅仅有驱动程序能够使用 value方法来读取累加器的值。
下面代码展示了怎样利用一个累加器,将一个数组里面的全部元素相加:
Scala
scala> val accum = sc.accumulator(0,"My Accumulator") accum: spark.Accumulator[Int] = 0 scala> sc.parallelize(Array(1, 2, 3,4)).foreach(x => accum += x) ... 10/09/29 18:41:08 INFO SparkContext: Tasksfinished in 0.317106 s scala> accum.value res2: Int = 10
Java
Accumulator<Integer> accum =sc.accumulator(0); sc.parallelize(Arrays.asList(1, 2, 3,4)).foreach(x -> accum.add(x)); // ... // 10/09/29 18:41:08 INFO SparkContext: Tasksfinished in 0.317106 s accum.value(); // returns 10
Python
>>> accum = sc.accumulator(0) Accumulator<id=0, value=0> >>> sc.parallelize([1, 2, 3,4]).foreach(lambda x: accum.add(x)) ... 10/09/29 18:41:08 INFO SparkContext: Tasksfinished in 0.317106 s scala> accum.value 10
尽管代码能够使用内置支持的 Int类型的累加器。但程序猿也能够通过子类化(subclassing) AccumulatorParam来创建自己的类型。 AccumulatorParam接口有两个方法: zero,为你的数据类型提供了一个“零值(zero value)”,以及 addInPlace提供了两个值相加的方法。比方,如果我们有一个表示数学上向量的 Vector类,我们能够这么写:
Scala
object VectorAccumulatorParam extendsAccumulatorParam[Vector] { defzero(initialValue: Vector): Vector = { Vector.zeros(initialValue.size) } defaddInPlace(v1: Vector, v2: Vector): Vector = { v1 +=v2 } }
// Then, create an Accumulator of this type: val vecAccum = sc.accumulator(newVector(...))(VectorAccumulatorParam)
在 Scala中。 Spark也支持更通用的 Accumulable接口去累加数据。其结果类型和累加的元素不同(比方。构建一个包括全部元素的列表)。而且SparkContext.accumulableCollection方法能够累加普通的 Scala集合(collection)类型。
Java
class VectorAccumulatorParam implementsAccumulatorParam<Vector> { publicVector zero(Vector initialValue) { return Vector.zeros(initialValue.size()); } publicVector addInPlace(Vector v1, Vector v2) { v1.addInPlace(v2); return v1; } }
// Then, create an Accumulator of this type: Accumulator<Vector> vecAccum =sc.accumulator(new Vector(...), new VectorAccumulatorParam());
在 Java中, Spark也支持更通用的 Accumulable接口去累加数据。其结果类型和累加的元素不同(比方,构建一个包括全部元素的列表)。
Python
class VectorAccumulatorParam(AccumulatorParam): defzero(self, initialValue): return Vector.zeros(initialValue.size) defaddInPlace(self, v1, v2): v1 += v2 return v1 # Then, create an Accumulator of this type: vecAccum = sc.accumulator(Vector(...),VectorAccumulatorParam())
由于累加器的更新仅仅在action中执行。Spark确保每一个任务对累加器的更新都仅仅会被应用一次,比如,重新启动任务将不会更新这个值。在转换中,用户应该清楚假设任务或者作业阶段是反复执行的,每一个任务的更新可能会应用不止一次。
累加器不会改变Spark的懒惰评价模型。
假设它们在一个RDD的操作中正在被更新。他们的值仅仅会被更新一次,RDD作为动作的一部分被计算。因此。累加器更新当在运行一个懒惰转换。比如map()时,并不保证被运行。
以下的代码段演示了这个属性:
Scala
val accum = sc.accumulator(0) data.map { x => accum += x; f(x) } // Here, accum is still 0 because no actionshave caused the `map` to be computed.
Java
Accumulator<Integer> accum =sc.accumulator(0); data.map(x -> { accum.add(x); return f(x);}); // Here, accum is still 0 because no actionshave caused the `map` to be computed.
Python
accum = sc.accumulator(0) def g(x): accum.add(x) returnf(x) data.map(g) # Here, accum is still 0 because no actionshave caused the `map` to be computed.
把代码部署到集群上
应用程序提交指南(application submission guide)描写叙述了怎样将应用程序提交到一个集群。简单地说,一旦你将你的应用程序打包成一个 JAR(对于 Java/Scala)或者一组的 .py或 .zip文件 (对于 Python), bin/spark-submit 脚本能够让你将它提交到支持的不论什么集群管理器中。
从Java/Scala中启动Spark作业
Org.apache.spark.launcher包中提供了相关类来启动Spark作业作为子线程的简单的Java API。
单元測试
Spark 对单元測试很友好。能够使用不论什么流行的单元測试框架。在你的測试中简单地创建一个 SparkContext。并将 master URL设置成 local。执行你的各种操作,然后调用SparkContext.stop()结束測试。确保在 finally块或測试框架的 tearDown方法中调用 context的 stop方法,由于 Spark不支持在一个程序中同一时候执行两个contexts。
Spark1.0之前版本号的迁移
Scala
Spark 1.0 冻结了 1.X系列的 Spark核心(Core) API,如今。当中的 API,除了标识为“试验性(experimental)”或“开发人员的(developer) API”的。在将来的版本号中都会被支持。
对 Scala用户而言,唯一的改变在于组操作(grouping operations),比方, groupByKey, cogroup 和 join,其返回值已经从 (Key, Seq[Value])对改动为(Key,Iterable[Value])。
迁移指南也能够从 Spark Streaming。 MLlib和 GraphX获取。
Java
Spark 1.0 冻结了 1.X系列的 Spark核心(Core) API,如今,当中的 API。仅仅要不是标识为“试验性(experimental)”或“开发人员的(developer) API”的。在将来的版本号中都会被支持。当中对 Java API做了一些改动:
•对于 org.apache.spark.api.java.function中的类函数(Function classes),在 1.0版本号中变成了接口,这意味着旧的代码中 extends Function应该须要为 implement Function。
•添加了 map转换(transformations)的新变体,如 mapToPair和 mapToDouble,用于创建指定数据类型的 RDDs。
•组操作(grouping operations),如 groupByKey。 cogroup 和 join的返回值也被改动了,从原先返回 (Key, List<Value>)对改为 (Key,Iterable<Value>)。
迁移指南也能够从 Spark Streaming, MLlib和 GraphX获取。
Python
Spark 1.0 冻结了 1.X系列的 Spark核心(Core) API。如今。当中的 API,仅仅要不是
标识为“试验性(experimental)”或“开发人员的(developer) API”的,在将来的版本号中
都会被支持。
对 Python用户而言,唯一的改动在于分组操作(grouping operations),比
如groupByKey,cogroup和join,其返回值从 (key, list of values)对改动为 (key,
iterableof values)。
迁移指南也能够从 Spark Streaming。 MLlib和 GraphX获取。
下一步
你能够在 Spark的站点上看到 spark程序的例子。另外。Spark在 examples文件夹 (Scala, Java, Python。R)中也包括了一些例子。你能够通过将类名传递给 spark的 bin/run-example脚本来执行 Java和 Scala的例子。比如:
<span lang="EN-US" style="font-size: 10.5pt; font-family: Helvetica, sans-serif; color: rgb(87, 87, 87); background-image: initial; background-attachment: initial; background-size: initial; background-origin: initial; background-clip: initial; background-position: initial; background-repeat: initial;">./bin/run-example SparkPi</span>
对于 Python例子,要使用 spark-submit:
./bin/spark-submitexamples/src/main/python/pi.py
对于R例子,使用spark-submit:
./bin/spark-submitexamples/src/main/r/dataframe.R
为了帮助优化你的程序,在配置(configuration)和调优(tuning)的指南上提供了最佳实践信息。它们在确保将你的数据用一个有效的格式存储在内存上,是很重要的。
对于部署的帮助信息,能够查看集群模式概述(cluster mode overview),描写叙述了分布式操作以及支持集群管理器所涉及的组件。
最后。完整的 API文档能够查看 Scala, Java。Python和R。
本文转自mfrbuaa博客园博客,原文链接:http://www.cnblogs.com/mfrbuaa/p/5386688.html,如需转载请自行联系原作者