1 Resilient Distributed Datasets(RDD)
弹性分布式数据集(RDD)是一个不可变的JVM对象的分布式集合,是Spark的基本抽象。
1.1 创建RDD
准备工作:
>>> import pyspark >>> from pyspark import SparkContext >>> from pyspark import SparkConf >>> conf = SparkConf().setAppName('project1').setMaster('local') >>> sc = SparkContext.getOrCreate(conf)
在PySpark里有两种方法创建RDD:
一是,.parallelize(…) 个collection集合 ( list or an array of some elements)。
>>> data = sc.parallelize([('amber',22),('alfred',23),('skye',4),('albert',12),('amber',9)])
二是,引用位于本地或HDFS上的某个文件(或多个文件)。
>>> data_from_file = sc.textFile('/home/qml/pyspark-ex/VS14MORT.txt.gz',4) # sc.textFile(...,n)中的最后一个参数指定数据集被分区的数量,经验是分成两个四分区
# sc.textFile(…,n)中的最后一个参数指定数据集被分区的数量,经验是分成两个四分区
Spark 支持多种数据格式:可以使用JDBC驱动程序读取文本,Parquet,JSON,Hive表和来自关系数据库的数据。请注意,Spark可以自动处理压缩的数据集(如Gzip压缩数据集)。
从文件读取的数据表示为MapPartitionsRDD,而不是像当我们.paralellize(…)一个集合的数据一样表示为ParallelCollectionRDD。
1.2 Schema
RDD是无模式的数据结构(不像DataFrames)。因此,在使用RDD时,并行化数据集对于Spark来说是完美的。
>>> data_heterogenous = sc.parallelize([('Ferrari','fast'),{'Porsche':100000},['Spain','visited',4504]]).collect()
所以,我们可以混合几乎任何东西:一个元组,一个字典,或一个列表。
一旦你.collect()数据集(即,运行一个动作将其返回给驱动程序),你可以像在Python中通常那样访问对象中的数据:
>>> data_heterogenous[1]['Porsche'] 100000
.collect()方法将RDD的所有元素返回到驱动程序,并将其作为列表序列化。
1.3 读取文件
从文本文件读取时,文件中的每一行形成RDD的一个元素。 可以创建一个元素列表,每行代表一个值列表。
>>> data_from_file.take(1) [u' 1 2101 M1087 432311 4M4 2014U7CN I64 238 070 24 0111I64 01 I64 01 11 100 601']
1.4 Lambda表达式
1.4.1 Transformations
.map(…)
该方法应用于RDD的每个元素。
In [1]: data_2014_2 = data_from_file_conv.map(lambda row: (row[16], int(row[16]))) data_2014_2.take(10) Out[2]: [('2014', 2014), ('2014', 2014), ('2014', 2014), ('2014', 2014), ('2014', 2014), ('2014', 2014), ('2014', 2014), ('2014', 2014), ('2014', 2014), ('-99', -99)]
.filter(…)
允许选择符合指定条件的数据集元素。
data_filtered = data_from_file_conv.filter(lambda row: row[5] == 'F' and row[21] == '0') data_filtered.count() 1 2 .flatMap(…) 与map()的工作方式类似,但返回的是平铺的结果而不是列表。 In [3]: data_2014_flat = data_from_file_conv.flatMap(lambda row: (row[16], int(row[16]) + 1)) data_2014_flat.take(10) Out[4]: ['2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015]
.distinct()
此方法返回指定列中不同值的列表。
In [5]: distinct_gender = data_from_file_conv.map(lambda row: row[5]).distinct().collect() distinct_gender Out[6]: ['-99', 'M', 'F']
.sample(…)
该方法返回数据集中的随机样本。
In [7]: fraction = 0.1 data_sample = data_from_file_conv.sample(False, fraction, 666) data_sample.take(1) Out[8]: [array(['1', ' ', '5', '1', '01', 'F', '1', '082', ' ', '42', '22', '10', ' ', '4', 'W', '5', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I251', '215', '063', ' ', '21', '02', '11I350 ', '21I251 ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', '02', 'I251 ', 'I350 ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', '28', ' ', ' ', '2', '4', '100', '8'], dtype='<U40')]
.leftOuterJoin(…)
左外连接就像SQL一样,根据两个数据集中的值加入两个RDD,并从左RDD中返回从右侧追加两个RDD匹配的记录。
>>> rd1 = sc.parallelize([('a',1),('b',4),('c',10)]) >>> rd2 = sc.parallelize([('a',4),('a',1),('b','6'),('d',15)]) >>> rd3 = rd1.leftOuterJoin(rd2) >>> print rd3.take(5) [('a', (1, 4)), ('a', (1, 1)), ('c', (10, None)), ('b', (4, '6'))]
如果我们使用.join(…)方法,那么当这两个值在这两个RDD之间相交时,我们只能得到’a’和’b’的值。
>>> rd4 = rd1.join(rd2) >>> print rd4.collect() [('a', (1, 4)), ('a', (1, 1)), ('b', (4, '6'))]
另一个有用的方法是.intersection(…),它返回两个RDD中相同的记录。
>>> rd5 = rd1.intersection(rd2) >>> print rd5.collect() [('a', 1)]
该方法从单个数据分区返回n个最高行。
In [9]: data_first = data_from_file_conv.take(1) data_first Out[10]: [array(['1', ' ', '2', '1', '01', 'M', '1', '087', ' ', '43', '23', '11', ' ', '4', 'M', '4', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I64 ', '238', '070', ' ', '24', '01', '11I64 ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', '01', 'I64 ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', '01', ' ', ' ', '1', '1', '100', '6'], dtype='<U40')]
.reduce(…)
>>> rd1.map(lambda row: row[1]).reduce(lambda x,y:x+y) 15
.reduceByKey(…)
>>> data_key = sc.parallelize([('a', 4),('b', 3),('c', 2),('a', 8),('d', 2),('b', 1),('d', 3)],4) >>> data_key.reduceByKey(lambda x, y: x + y).collect() [('b', 4), ('c', 2), ('a', 12), ('d', 5)]
.count()
统计RDD中元素的数量。
>>> data_reduce.count() 6
.countByKey()
如果你的数据集是键值的形式,则可以使用.countByKey()方法获取不同键的数量。
>>> data_key.countByKey().items() [('a', 2), ('b', 2), ('d', 2), ('c', 1)]
.saveAsTextFile(…)
将RDD保存到文本文件:每个分区保存到一个单独的文件。
>>> data_key.saveAsTextFile('/Users/drabast/Documents/PySpark_Data/data_key.txt')
.foreach(…)
一种将函数应用同到RDD每个元素的迭代法。
def f(x): print(x) data_key.foreach(f)