【Python】PySpark 入门

简介: 【Python】PySpark 入门

1 Resilient Distributed Datasets(RDD)


弹性分布式数据集(RDD)是一个不可变的JVM对象的分布式集合,是Spark的基本抽象。


1.1 创建RDD


准备工作:

>>> import pyspark
>>> from pyspark import SparkContext
>>> from pyspark import SparkConf
>>> conf = SparkConf().setAppName('project1').setMaster('local')
>>> sc = SparkContext.getOrCreate(conf)


在PySpark里有两种方法创建RDD:

一是,.parallelize(…) 个collection集合 ( list or an array of some elements)。

>>> data = sc.parallelize([('amber',22),('alfred',23),('skye',4),('albert',12),('amber',9)])


二是,引用位于本地或HDFS上的某个文件(或多个文件)。

>>> data_from_file = sc.textFile('/home/qml/pyspark-ex/VS14MORT.txt.gz',4)
# sc.textFile(...,n)中的最后一个参数指定数据集被分区的数量,经验是分成两个四分区


# sc.textFile(…,n)中的最后一个参数指定数据集被分区的数量,经验是分成两个四分区


Spark 支持多种数据格式:可以使用JDBC驱动程序读取文本,Parquet,JSON,Hive表和来自关系数据库的数据。请注意,Spark可以自动处理压缩的数据集(如Gzip压缩数据集)。


从文件读取的数据表示为MapPartitionsRDD,而不是像当我们.paralellize(…)一个集合的数据一样表示为ParallelCollectionRDD。


1.2 Schema


RDD是无模式的数据结构(不像DataFrames)。因此,在使用RDD时,并行化数据集对于Spark来说是完美的。

>>> data_heterogenous = sc.parallelize([('Ferrari','fast'),{'Porsche':100000},['Spain','visited',4504]]).collect()


所以,我们可以混合几乎任何东西:一个元组,一个字典,或一个列表。


一旦你.collect()数据集(即,运行一个动作将其返回给驱动程序),你可以像在Python中通常那样访问对象中的数据:

>>> data_heterogenous[1]['Porsche']
100000


.collect()方法将RDD的所有元素返回到驱动程序,并将其作为列表序列化。


1.3 读取文件


从文本文件读取时,文件中的每一行形成RDD的一个元素。 可以创建一个元素列表,每行代表一个值列表。

>>> data_from_file.take(1)
[u'                   1                                          2101  M1087 432311  4M4                2014U7CN                                    I64 238 070   24 0111I64                                                                                                                                                                           01 I64                                                                                                  01  11                                 100 601']


1.4 Lambda表达式


1.4.1 Transformations


.map(…)


该方法应用于RDD的每个元素。

In [1]:
data_2014_2 = data_from_file_conv.map(lambda row: (row[16], int(row[16])))
data_2014_2.take(10)
Out[2]:
[('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('-99', -99)]


.filter(…)


允许选择符合指定条件的数据集元素。

data_filtered = data_from_file_conv.filter(lambda row: row[5] == 'F' and row[21] == '0')
data_filtered.count()
1
2
.flatMap(…)
与map()的工作方式类似,但返回的是平铺的结果而不是列表。
In [3]:
data_2014_flat = data_from_file_conv.flatMap(lambda row: (row[16], int(row[16]) + 1))
data_2014_flat.take(10)
Out[4]:
['2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015]


.distinct()


此方法返回指定列中不同值的列表。

In [5]:
distinct_gender = data_from_file_conv.map(lambda row: row[5]).distinct().collect()
distinct_gender
Out[6]:
['-99', 'M', 'F']


.sample(…)


该方法返回数据集中的随机样本。

In [7]:
fraction = 0.1
data_sample = data_from_file_conv.sample(False, fraction, 666)
data_sample.take(1)
Out[8]:
[array(['1', '  ', '5', '1', '01', 'F', '1', '082', ' ', '42', '22', '10',
        '  ', '4', 'W', '5', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I251',
        '215', '063', '   ', '21', '02', '11I350 ', '21I251 ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '02',
        'I251 ', 'I350 ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '28', ' ',
        ' ', '2', '4', '100', '8'], 
       dtype='<U40')]


.leftOuterJoin(…)


左外连接就像SQL一样,根据两个数据集中的值加入两个RDD,并从左RDD中返回从右侧追加两个RDD匹配的记录。

>>> rd1 = sc.parallelize([('a',1),('b',4),('c',10)])
>>> rd2 = sc.parallelize([('a',4),('a',1),('b','6'),('d',15)])
>>> rd3 = rd1.leftOuterJoin(rd2)
>>> print rd3.take(5)
[('a', (1, 4)), ('a', (1, 1)), ('c', (10, None)), ('b', (4, '6'))]


如果我们使用.join(…)方法,那么当这两个值在这两个RDD之间相交时,我们只能得到’a’和’b’的值。

>>> rd4 = rd1.join(rd2)
>>> print rd4.collect()
[('a', (1, 4)), ('a', (1, 1)), ('b', (4, '6'))]


另一个有用的方法是.intersection(…),它返回两个RDD中相同的记录。

>>> rd5 = rd1.intersection(rd2)
>>> print rd5.collect()
[('a', 1)]


该方法从单个数据分区返回n个最高行。

In [9]:
data_first = data_from_file_conv.take(1)
data_first
Out[10]:
[array(['1', '  ', '2', '1', '01', 'M', '1', '087', ' ', '43', '23', '11',
        '  ', '4', 'M', '4', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I64 ',
        '238', '070', '   ', '24', '01', '11I64  ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '01',
        'I64  ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',
        ' ', '1', '1', '100', '6'], 
       dtype='<U40')]


.reduce(…)

>>> rd1.map(lambda row: row[1]).reduce(lambda x,y:x+y)
15


.reduceByKey(…)

>>> data_key = sc.parallelize([('a', 4),('b', 3),('c', 2),('a', 8),('d', 2),('b', 1),('d', 3)],4)
>>> data_key.reduceByKey(lambda x, y: x + y).collect()
[('b', 4), ('c', 2), ('a', 12), ('d', 5)]


.count()


统计RDD中元素的数量。

>>> data_reduce.count()
6


.countByKey()


如果你的数据集是键值的形式,则可以使用.countByKey()方法获取不同键的数量。

>>> data_key.countByKey().items()
[('a', 2), ('b', 2), ('d', 2), ('c', 1)]


.saveAsTextFile(…)


将RDD保存到文本文件:每个分区保存到一个单独的文件。

>>> data_key.saveAsTextFile('/Users/drabast/Documents/PySpark_Data/data_key.txt')


.foreach(…)


一种将函数应用同到RDD每个元素的迭代法。

def f(x): 
    print(x)
data_key.foreach(f)


目录
相关文章
|
2月前
|
存储 数据采集 人工智能
Python编程入门:从零基础到实战应用
本文是一篇面向初学者的Python编程教程,旨在帮助读者从零开始学习Python编程语言。文章首先介绍了Python的基本概念和特点,然后通过一个简单的例子展示了如何编写Python代码。接下来,文章详细介绍了Python的数据类型、变量、运算符、控制结构、函数等基本语法知识。最后,文章通过一个实战项目——制作一个简单的计算器程序,帮助读者巩固所学知识并提高编程技能。
|
2月前
|
机器学习/深度学习 数据可视化 数据挖掘
使用Python进行数据分析的入门指南
本文将引导读者了解如何使用Python进行数据分析,从安装必要的库到执行基础的数据操作和可视化。通过本文的学习,你将能够开始自己的数据分析之旅,并掌握如何利用Python来揭示数据背后的故事。
|
1天前
|
开发者 Python
Python入门:8.Python中的函数
### 引言 在编写程序时,函数是一种强大的工具。它们可以将代码逻辑模块化,减少重复代码的编写,并提高程序的可读性和可维护性。无论是初学者还是资深开发者,深入理解函数的使用和设计都是编写高质量代码的基础。本文将从基础概念开始,逐步讲解 Python 中的函数及其高级特性。
Python入门:8.Python中的函数
|
1天前
|
存储 索引 Python
Python入门:6.深入解析Python中的序列
在 Python 中,**序列**是一种有序的数据结构,广泛应用于数据存储、操作和处理。序列的一个显著特点是支持通过**索引**访问数据。常见的序列类型包括字符串(`str`)、列表(`list`)和元组(`tuple`)。这些序列各有特点,既可以存储简单的字符,也可以存储复杂的对象。 为了帮助初学者掌握 Python 中的序列操作,本文将围绕**字符串**、**列表**和**元组**这三种序列类型,详细介绍其定义、常用方法和具体示例。
Python入门:6.深入解析Python中的序列
|
1天前
|
程序员 UED Python
Python入门:3.Python的输入和输出格式化
在 Python 编程中,输入与输出是程序与用户交互的核心部分。而输出格式化更是对程序表达能力的极大增强,可以让结果以清晰、美观且易读的方式呈现给用户。本文将深入探讨 Python 的输入与输出操作,特别是如何使用格式化方法来提升代码质量和可读性。
Python入门:3.Python的输入和输出格式化
|
1天前
|
机器学习/深度学习 人工智能 算法框架/工具
Python入门:1.Python介绍
Python是一种功能强大、易于学习和运行的解释型高级语言。由**Guido van Rossum**于1991年创建,Python以其简洁、易读和十分工程化的设计而带来了庞大的用户群体和丰富的应用场景。这个语言在全球范围内都被认为是**创新和效率的重要工具**。
Python入门:1.Python介绍
|
1天前
|
缓存 算法 数据处理
Python入门:9.递归函数和高阶函数
在 Python 编程中,函数是核心组成部分之一。递归函数和高阶函数是 Python 中两个非常重要的特性。递归函数帮助我们以更直观的方式处理重复性问题,而高阶函数通过函数作为参数或返回值,为代码增添了极大的灵活性和优雅性。无论是实现复杂的算法还是处理数据流,这些工具都在开发者的工具箱中扮演着重要角色。本文将从概念入手,逐步带你掌握递归函数、匿名函数(lambda)以及高阶函数的核心要领和应用技巧。
Python入门:9.递归函数和高阶函数
|
1天前
|
存储 SQL 索引
Python入门:7.Pythond的内置容器
Python 提供了强大的内置容器(container)类型,用于存储和操作数据。容器是 Python 数据结构的核心部分,理解它们对于写出高效、可读的代码至关重要。在这篇博客中,我们将详细介绍 Python 的五种主要内置容器:字符串(str)、列表(list)、元组(tuple)、字典(dict)和集合(set)。
Python入门:7.Pythond的内置容器
|
1天前
|
存储 Linux iOS开发
Python入门:2.注释与变量的全面解析
在学习Python编程的过程中,注释和变量是必须掌握的两个基础概念。注释帮助我们理解代码的意图,而变量则是用于存储和操作数据的核心工具。熟练掌握这两者,不仅能提高代码的可读性和维护性,还能为后续学习复杂编程概念打下坚实的基础。
Python入门:2.注释与变量的全面解析
|
1天前
|
知识图谱 Python
Python入门:4.Python中的运算符
Python是一间强大而且便捷的编程语言,支持多种类型的运算符。在Python中,运算符被分为算术运算符、赋值运算符、复合赋值运算符、比较运算符和逻辑运算符等。本文将从基础到进阶进行分析,并通过一个综合案例展示其实际应用。

热门文章

最新文章

推荐镜像

更多