【Python】PySpark 入门

简介: 【Python】PySpark 入门

1 Resilient Distributed Datasets(RDD)


弹性分布式数据集(RDD)是一个不可变的JVM对象的分布式集合,是Spark的基本抽象。


1.1 创建RDD


准备工作:

>>> import pyspark
>>> from pyspark import SparkContext
>>> from pyspark import SparkConf
>>> conf = SparkConf().setAppName('project1').setMaster('local')
>>> sc = SparkContext.getOrCreate(conf)


在PySpark里有两种方法创建RDD:

一是,.parallelize(…) 个collection集合 ( list or an array of some elements)。

>>> data = sc.parallelize([('amber',22),('alfred',23),('skye',4),('albert',12),('amber',9)])


二是,引用位于本地或HDFS上的某个文件(或多个文件)。

>>> data_from_file = sc.textFile('/home/qml/pyspark-ex/VS14MORT.txt.gz',4)
# sc.textFile(...,n)中的最后一个参数指定数据集被分区的数量,经验是分成两个四分区


# sc.textFile(…,n)中的最后一个参数指定数据集被分区的数量,经验是分成两个四分区


Spark 支持多种数据格式:可以使用JDBC驱动程序读取文本,Parquet,JSON,Hive表和来自关系数据库的数据。请注意,Spark可以自动处理压缩的数据集(如Gzip压缩数据集)。


从文件读取的数据表示为MapPartitionsRDD,而不是像当我们.paralellize(…)一个集合的数据一样表示为ParallelCollectionRDD。


1.2 Schema


RDD是无模式的数据结构(不像DataFrames)。因此,在使用RDD时,并行化数据集对于Spark来说是完美的。

>>> data_heterogenous = sc.parallelize([('Ferrari','fast'),{'Porsche':100000},['Spain','visited',4504]]).collect()


所以,我们可以混合几乎任何东西:一个元组,一个字典,或一个列表。


一旦你.collect()数据集(即,运行一个动作将其返回给驱动程序),你可以像在Python中通常那样访问对象中的数据:

>>> data_heterogenous[1]['Porsche']
100000


.collect()方法将RDD的所有元素返回到驱动程序,并将其作为列表序列化。


1.3 读取文件


从文本文件读取时,文件中的每一行形成RDD的一个元素。 可以创建一个元素列表,每行代表一个值列表。

>>> data_from_file.take(1)
[u'                   1                                          2101  M1087 432311  4M4                2014U7CN                                    I64 238 070   24 0111I64                                                                                                                                                                           01 I64                                                                                                  01  11                                 100 601']


1.4 Lambda表达式


1.4.1 Transformations


.map(…)


该方法应用于RDD的每个元素。

In [1]:
data_2014_2 = data_from_file_conv.map(lambda row: (row[16], int(row[16])))
data_2014_2.take(10)
Out[2]:
[('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('-99', -99)]


.filter(…)


允许选择符合指定条件的数据集元素。

data_filtered = data_from_file_conv.filter(lambda row: row[5] == 'F' and row[21] == '0')
data_filtered.count()
1
2
.flatMap(…)
与map()的工作方式类似,但返回的是平铺的结果而不是列表。
In [3]:
data_2014_flat = data_from_file_conv.flatMap(lambda row: (row[16], int(row[16]) + 1))
data_2014_flat.take(10)
Out[4]:
['2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015]


.distinct()


此方法返回指定列中不同值的列表。

In [5]:
distinct_gender = data_from_file_conv.map(lambda row: row[5]).distinct().collect()
distinct_gender
Out[6]:
['-99', 'M', 'F']


.sample(…)


该方法返回数据集中的随机样本。

In [7]:
fraction = 0.1
data_sample = data_from_file_conv.sample(False, fraction, 666)
data_sample.take(1)
Out[8]:
[array(['1', '  ', '5', '1', '01', 'F', '1', '082', ' ', '42', '22', '10',
        '  ', '4', 'W', '5', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I251',
        '215', '063', '   ', '21', '02', '11I350 ', '21I251 ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '02',
        'I251 ', 'I350 ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '28', ' ',
        ' ', '2', '4', '100', '8'], 
       dtype='<U40')]


.leftOuterJoin(…)


左外连接就像SQL一样,根据两个数据集中的值加入两个RDD,并从左RDD中返回从右侧追加两个RDD匹配的记录。

>>> rd1 = sc.parallelize([('a',1),('b',4),('c',10)])
>>> rd2 = sc.parallelize([('a',4),('a',1),('b','6'),('d',15)])
>>> rd3 = rd1.leftOuterJoin(rd2)
>>> print rd3.take(5)
[('a', (1, 4)), ('a', (1, 1)), ('c', (10, None)), ('b', (4, '6'))]


如果我们使用.join(…)方法,那么当这两个值在这两个RDD之间相交时,我们只能得到’a’和’b’的值。

>>> rd4 = rd1.join(rd2)
>>> print rd4.collect()
[('a', (1, 4)), ('a', (1, 1)), ('b', (4, '6'))]


另一个有用的方法是.intersection(…),它返回两个RDD中相同的记录。

>>> rd5 = rd1.intersection(rd2)
>>> print rd5.collect()
[('a', 1)]


该方法从单个数据分区返回n个最高行。

In [9]:
data_first = data_from_file_conv.take(1)
data_first
Out[10]:
[array(['1', '  ', '2', '1', '01', 'M', '1', '087', ' ', '43', '23', '11',
        '  ', '4', 'M', '4', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I64 ',
        '238', '070', '   ', '24', '01', '11I64  ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '01',
        'I64  ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',
        ' ', '1', '1', '100', '6'], 
       dtype='<U40')]


.reduce(…)

>>> rd1.map(lambda row: row[1]).reduce(lambda x,y:x+y)
15


.reduceByKey(…)

>>> data_key = sc.parallelize([('a', 4),('b', 3),('c', 2),('a', 8),('d', 2),('b', 1),('d', 3)],4)
>>> data_key.reduceByKey(lambda x, y: x + y).collect()
[('b', 4), ('c', 2), ('a', 12), ('d', 5)]


.count()


统计RDD中元素的数量。

>>> data_reduce.count()
6


.countByKey()


如果你的数据集是键值的形式,则可以使用.countByKey()方法获取不同键的数量。

>>> data_key.countByKey().items()
[('a', 2), ('b', 2), ('d', 2), ('c', 1)]


.saveAsTextFile(…)


将RDD保存到文本文件:每个分区保存到一个单独的文件。

>>> data_key.saveAsTextFile('/Users/drabast/Documents/PySpark_Data/data_key.txt')


.foreach(…)


一种将函数应用同到RDD每个元素的迭代法。

def f(x): 
    print(x)
data_key.foreach(f)


目录
相关文章
|
15天前
|
安全 数据处理 开发者
Python中的多线程编程:从入门到精通
本文将深入探讨Python中的多线程编程,包括其基本原理、应用场景、实现方法以及常见问题和解决方案。通过本文的学习,读者将对Python多线程编程有一个全面的认识,能够在实际项目中灵活运用。
|
2天前
|
Java 测试技术 持续交付
【入门思路】基于Python+Unittest+Appium+Excel+BeautifulReport的App/移动端UI自动化测试框架搭建思路
本文重点讲解如何搭建App自动化测试框架的思路,而非完整源码。主要内容包括实现目的、框架设计、环境依赖和框架的主要组成部分。适用于初学者,旨在帮助其快速掌握App自动化测试的基本技能。文中详细介绍了从需求分析到技术栈选择,再到具体模块的封装与实现,包括登录、截图、日志、测试报告和邮件服务等。同时提供了运行效果的展示,便于理解和实践。
17 4
【入门思路】基于Python+Unittest+Appium+Excel+BeautifulReport的App/移动端UI自动化测试框架搭建思路
|
3天前
|
Python
探索Python装饰器:从入门到实践
【10月更文挑战第32天】在编程世界中,装饰器是一种特殊的函数,它允许我们在不改变原有函数代码的情况下,增加额外的功能。本文将通过简单易懂的语言和实际案例,带你了解Python中装饰器的基础知识、应用以及如何自定义装饰器,让你的代码更加灵活和强大。
11 2
|
3天前
|
监控 Python
探索Python中的装饰器:从入门到实践
【10月更文挑战第31天】在Python的世界里,装饰器是那些隐藏在幕后的魔法师,它们拥有着改变函数行为的能力。本文将带你走进装饰器的世界,从基础概念到实际应用,一步步揭开它的神秘面纱。你将学会如何用几行代码增强你的函数功能,以及如何避免常见的陷阱。让我们一起来发现装饰器的魔力吧!
|
11天前
|
数据采集 机器学习/深度学习 人工智能
Python编程入门:从基础到实战
【10月更文挑战第24天】本文将带你进入Python的世界,从最基础的语法开始,逐步深入到实际的项目应用。我们将一起探索Python的强大功能和灵活性,无论你是编程新手还是有经验的开发者,都能在这篇文章中找到有价值的内容。让我们一起开启Python的奇妙之旅吧!
|
13天前
|
数据采集 存储 数据库
Python中实现简单爬虫的入门指南
【10月更文挑战第22天】本文将带你进入Python爬虫的世界,从基础概念到实战操作,一步步指导你如何使用Python编写一个简单的网络爬虫。我们将不展示代码示例,而是通过详细的步骤描述和逻辑讲解,帮助你理解爬虫的工作原理和开发过程。无论你是编程新手还是有一定经验的开发者,这篇文章都将为你打开一扇通往数据收集新世界的大门。
|
11天前
|
测试技术 开发者 Python
探索Python中的装饰器:从入门到实践
【10月更文挑战第24天】 在Python的世界里,装饰器是一个既神秘又强大的工具。它们就像是程序的“隐形斗篷”,能在不改变原有代码结构的情况下,增加新的功能。本篇文章将带你走进装饰器的世界,从基础概念出发,通过实际例子,逐步深入到装饰器的高级应用,让你的代码更加优雅和高效。无论你是初学者还是有一定经验的开发者,这篇文章都将为你打开一扇通往高效编程的大门。
|
13天前
|
存储 人工智能 数据挖掘
Python编程入门:构建你的第一个程序
【10月更文挑战第22天】编程,这个听起来高深莫测的词汇,实际上就像搭积木一样简单有趣。本文将带你走进Python的世界,用最浅显的语言和实例,让你轻松掌握编写第一个Python程序的方法。无论你是编程新手还是希望了解Python的爱好者,这篇文章都将是你的理想起点。让我们一起开始这段奇妙的编程之旅吧!
18 3
|
12天前
|
机器学习/深度学习 人工智能 算法
机器学习基础:使用Python和Scikit-learn入门
机器学习基础:使用Python和Scikit-learn入门
23 1
|
14天前
|
存储 程序员 开发者
Python编程入门:从零开始掌握基础语法
【10月更文挑战第21天】本文将带你走进Python的世界,通过浅显易懂的语言和实例,让你快速了解并掌握Python的基础语法。无论你是编程新手还是想学习一门新的编程语言,这篇文章都将是你的不二之选。我们将一起探索变量、数据类型、运算符、控制结构、函数等基本概念,并通过实际代码示例加深理解。准备好了吗?让我们开始吧!

热门文章

最新文章