Spark入门(Python版)

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介:

Hadoop是对大数据集进行分布式计算的标准工具,这也是为什么当你穿过机场时能看到”大数据(Big Data)”广告的原因。它已经成为大数据的操作系统,提供了包括工具和技巧在内的丰富生态系统,允许使用相对便宜的商业硬件集群进行超级计算机级别的计算。2003和2004年,两个来自Google的观点使Hadoop成为可能:一个分布式存储框架(Google文件系统),在Hadoop中被实现为HDFS;一个分布式计算框架(MapReduce)。

这两个观点成为过去十年规模分析(scaling analytics)、大规模机器学习(machine learning),以及其他大数据应用出现的主要推动力!但是,从技术角度上讲,十年是一段非常长的时间,而且Hadoop还存在很多已知限制,尤其是MapReduce。对MapReduce编程明显是困难的。对大多数分析,你都必须用很多步骤将Map和Reduce任务串接起来。这造成类SQL的计算或机器学习需要专门的系统来进行。更糟的是,MapReduce要求每个步骤间的数据要序列化到磁盘,这意味着MapReduce作业的I/O成本很高,导致交互分析和迭代算法(iterative algorithms)开销很大;而事实是,几乎所有的最优化和机器学习都是迭代的。

为了解决这些问题,Hadoop一直在向一种更为通用的资源管理框架转变,即YARN(Yet Another Resource Negotiator, 又一个资源协调者)。YARN实现了下一代的MapReduce,但同时也允许应用利用分布式资源而不必采用MapReduce进行计算。通过将集群管理一般化,研究转到分布式计算的一般化上,来扩展了MapReduce的初衷。

Spark是第一个脱胎于该转变的快速、通用分布式计算范式,并且很快流行起来。Spark使用函数式编程范式扩展了MapReduce模型以支持更多计算类型,可以涵盖广泛的工作流,这些工作流之前被实现为Hadoop之上的特殊系统。Spark使用内存缓存来提升性能,因此进行交互式分析也足够快速(就如同使用Python解释器,与集群进行交互一样)。缓存同时提升了迭代算法的性能,这使得Spark非常适合数据理论任务,特别是机器学习。

本文中,我们将首先讨论如何在本地机器上或者EC2的集群上设置Spark进行简单分析。然后,我们在入门级水平探索Spark,了解Spark是什么以及它如何工作(希望可以激发更多探索)。最后两节我们开始通过命令行与Spark进行交互,然后演示如何用Python写Spark应用,并作为Spark作业提交到集群上。

设置Spark

在本机设置和运行Spark非常简单。你只需要下载一个预构建的包,只要你安装了Java 6+和Python 2.6+,就可以在Windows、Mac OS X和Linux上运行Spark。确保java程序在PATH环境变量中,或者设置了JAVA_HOME环境变量。类似的,python也要在PATH中。

假设你已经安装了Java和Python:

  1. 访问Spark下载页
  2. 选择Spark最新发布版(本文写作时是1.2.0),一个预构建的Hadoop 2.4包,直接下载。

现在,如何继续依赖于你的操作系统,靠你自己去探索了。Windows用户可以在评论区对如何设置的提示进行评论。

一般,我的建议是按照下面的步骤(在POSIX操作系统上):

1.解压Spark

1
~$ tar -xzf spark-1.2.0-bin-hadoop2.4.tgz

2.将解压目录移动到有效应用程序目录中(如Windows上的

1
~$ mv spark-1.2.0-bin-hadoop2.4 /srv/spark-1.2.0

3.创建指向该Spark版本的符号链接到<spark目录。这样你可以简单地下载新/旧版本的Spark,然后修改链接来管理Spark版本,而不用更改路径或环境变量。

1
~$ ln -s /srv/spark-1.2.0 /srv/spark

4.修改BASH配置,将Spark添加到PATH中,设置SPARK_HOME环境变量。这些小技巧在命令行上会帮到你。在Ubuntu上,只要编辑~/.bash_profile或~/.profile文件,将以下语句添加到文件中:

1
2
export SPARK_HOME=/srv/spark
export PATH=$SPARK_HOME/bin:$PATH

5.source这些配置(或者重启终端)之后,你就可以在本地运行一个pyspark解释器。执行pyspark命令,你会看到以下结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
~$ pyspark
Python 2.7.8 (default, Dec  2 2014, 12:45:58)
[GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.54)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Sparks default log4j profile: org/apache/spark/log4j-defaults.properties
[… snip …]
Welcome to
       ____              __
      / __/__  ___ _____/ /__
     _\ \/ _ \/ _ `/ __/  `_/
    /__ / .__/\_,_/_/ /_/\_\   version 1.2.0
       /_/
 
Using Python version 2.7.8 (default, Dec  2 2014 12:45:58)
SparkContext available as sc.
>>>

现在Spark已经安装完毕,可以在本机以”单机模式“(standalone mode)使用。你可以在本机开发应用并提交Spark作业,这些作业将以多进程/多线程模式运行的,或者,配置该机器作为一个集群的客户端(不推荐这样做,因为在Spark作业中,驱动程序(driver)是个很重要的角色,并且应该与集群的其他部分处于相同网络)。可能除了开发,你在本机使用Spark做得最多的就是利用spark-ec2脚本来配置Amazon云上的一个EC2 Spark集群了。

简略Spark输出

Spark(和PySpark)的执行可以特别详细,很多INFO日志消息都会打印到屏幕。开发过程中,这些非常恼人,因为可能丢失Python栈跟踪或者print的输出。为了减少Spark输出 – 你可以设置$SPARK_HOME/conf下的log4j。首先,拷贝一份$SPARK_HOME/conf/log4j.properties.template文件,去掉“.template”扩展名。

1
~$ cp $SPARK_HOME / conf / log4j.properties.template $SPARK_HOME / conf / log4j.properties

编辑新文件,用WARN替换代码中出现的INFO。你的log4j.properties文件类似:

1
2
3
4
5
6
7
8
9
10
11
# Set everything to be logged to the console
  log4j.rootCategory = WARN, console
  log4j.appender.console = org.apache.log4j.ConsoleAppender
  log4j.appender.console.target = System.err
  log4j.appender.console.layout = org.apache.log4j.PatternLayout
  log4j.appender.console.layout.ConversionPattern = % d{yy / MM / dd HH:mm:ss} % p % c{ 1 }: % m % n
# Settings to quiet third party logs that are too verbose
  log4j.logger.org.eclipse.jetty = WARN
  log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle = ERROR
  log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper = WARN
  log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter = WARN

现在运行PySpark,输出消息将会更简略!感谢@genomegeek在一次District Data Labs的研讨会中指出这一点。

在Spark中使用IPython Notebook

当搜索有用的Spark小技巧时,我发现了一些文章提到在PySpark中配置IPython notebook。IPython notebook对数据科学家来说是个交互地呈现科学和理论工作的必备工具,它集成了文本和Python代码。对很多数据科学家,IPython notebook是他们的Python入门,并且使用非常广泛,所以我想值得在本文中提及。

这里的大部分说明都来改编自IPython notebook: 在PySpark中设置IPython。但是,我们将聚焦在本机以单机模式将IPtyon shell连接到PySpark,而不是在EC2集群。如果你想在一个集群上使用PySpark/IPython,查看并评论下文的说明吧!

  1. 1.为Spark创建一个iPython notebook配置
1
2
3
4
~$ ipython profile create spark
[ProfileCreate] Generating default config file : u '$HOME/.ipython/profile_spark/ipython_config.py'
[ProfileCreate] Generating default config file : u '$HOME/.ipython/profile_spark/ipython_notebook_config.py'
[ProfileCreate] Generating default config file : u '$HOME/.ipython/profile_spark/ipython_nbconvert_config.py'

记住配置文件的位置,替换下文各步骤相应的路径:

2.创建文件$HOME/.ipython/profile_spark/startup/00-pyspark-setup.py,并添加如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
import  os
import  sys
 
# Configure the environment
if  'SPARK_HOME'  not  in  os.environ:
     os.environ[ 'SPARK_HOME' ] =  '/srv/spark'
 
# Create a variable for our root path
SPARK_HOME =  os.environ[ 'SPARK_HOME' ]
 
# Add the PySpark/py4j to the Python Path
sys.path.insert( 0 , os.path.join(SPARK_HOME, "python" , "build" ))
sys.path.insert( 0 , os.path.join(SPARK_HOME, "python" ))

3.使用我们刚刚创建的配置来启动IPython notebook。

1
~$ ipython notebook - - profile spark

4.在notebook中,你应该能看到我们刚刚创建的变量。

1
print  SPARK_HOME

5.在IPython notebook最上面,确保你添加了Spark context。

1
2
from  pyspark import   SparkContext
sc =  SparkContext( 'local' , 'pyspark' )

6.使用IPython做个简单的计算来测试Spark context。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def  isprime(n):
"""
check if integer n is a prime
"""
# make sure n is a positive integer
n =  abs ( int (n))
# 0 and 1 are not primes
if  n < 2 :
     return  False
# 2 is the only even prime number
if  n = =  2 :
     return  True
# all other even numbers are not primes
if  not  n & 1 :
     return  False
# range starts with 3 and only needs to go up the square root of n
# for all odd numbers
for  x in  range ( 3 , int (n * * 0.5 ) + 1 , 2 ):
     if  n %  x = =  0 :
         return  False
return  True
 
# Create an RDD of numbers from 0 to 1,000,000
nums =  sc.parallelize( xrange ( 1000000 ))
 
# Compute the number of primes in the RDD
print  nums. filter (isprime).count()

如果你能得到一个数字而且没有错误发生,那么你的context正确工作了!

编辑提示:上面配置了一个使用PySpark直接调用IPython notebook的IPython context。但是,你也可以使用PySpark按以下方式直接启动一个notebook: $ IPYTHON_OPTS=”notebook –pylab inline” pyspark

哪个方法好用取决于你使用PySpark和IPython的具体情景。前一个允许你更容易地使用IPython notebook连接到一个集群,因此是我喜欢的方法。

在EC2上使用Spark

在讲授使用Hadoop进行分布式计算时,我发现很多可以通过在本地伪分布式节点(pseudo-distributed node)或以单节点模式(single-node mode)讲授。但是为了了解真正发生了什么,就需要一个集群。当数据变得庞大,这些书面讲授的技能和真实计算需求间经常出现隔膜。如果你肯在学习详细使用Spark上花钱,我建议你设置一个快速Spark集群做做实验。 包含5个slave(和1个master)每周大概使用10小时的集群每月大概需要$45.18。

完整的讨论可以在Spark文档中找到:在EC2上运行Spark在你决定购买EC2集群前一定要通读这篇文档!我列出了一些关键点:

  1. 通过AWS Console获取AWS EC2 key对(访问key和密钥key)。
  2. 将key对导出到你的环境中。在shell中敲出以下命令,或者将它们添加到配置中。
1
2
export AWS_ACCESS_KEY_ID = myaccesskeyid
export AWS_SECRET_ACCESS_KEY = mysecretaccesskey

注意不同的工具使用不同的环境名称,确保你用的是Spark脚本所使用的名称。

3.启动集群:

1
2
~$ cd $SPARK_HOME / ec2
ec2$ . / spark - ec2 - k <keypair> - i <key - file > - s <num - slaves> launch <cluster - name>

4.SSH到集群来运行Spark作业。

1
ec2$ . / spark - ec2 - k <keypair> - i <key - file > login <cluster - name>

5.销毁集群

1
ec2$ . / spark - ec2 destroy &lt;cluster - name&gt;.

这些脚本会自动创建一个本地的HDFS集群来添加数据,copy-dir命令可以同步代码和数据到该集群。但是你最好使用S3来存储数据,创建使用s3://URI来加载数据的RDDs。

Spark是什么?

既然设置好了Spark,现在我们讨论下Spark是什么。Spark是个通用的集群计算框架,通过将大量数据集计算任务分配到多台计算机上,提供高效内存计算。如果你熟悉Hadoop,那么你知道分布式计算框架要解决两个问题:如何分发数据和如何分发计算。Hadoop使用HDFS来解决分布式数据问题,MapReduce计算范式提供有效的分布式计算。类似的,Spark拥有多种语言的函数式编程API,提供了除map和reduce之外更多的运算符,这些操作是通过一个称作弹性分布式数据集(resilient distributed datasets, RDDs)的分布式数据框架进行的。

本质上,RDD是种编程抽象,代表可以跨机器进行分割的只读对象集合。RDD可以从一个继承结构(lineage)重建(因此可以容错),通过并行操作访问,可以读写HDFS或S3这样的分布式存储,更重要的是,可以缓存到worker节点的内存中进行立即重用。由于RDD可以被缓存在内存中,Spark对迭代应用特别有效,因为这些应用中,数据是在整个算法运算过程中都可以被重用。大多数机器学习和最优化算法都是迭代的,使得Spark对数据科学来说是个非常有效的工具。另外,由于Spark非常快,可以通过类似Python REPL的命令行提示符交互式访问。

Spark库本身包含很多应用元素,这些元素可以用到大部分大数据应用中,其中包括对大数据进行类似SQL查询的支持,机器学习和图算法,甚至对实时流数据的支持。

核心组件如下:

  • Spark Core:包含Spark的基本功能;尤其是定义RDD的API、操作以及这两者上的动作。其他Spark的库都是构建在RDD和Spark Core之上的。
  • Spark SQL:提供通过Apache Hive的SQL变体Hive查询语言(HiveQL)与Spark进行交互的API。每个数据库表被当做一个RDD,Spark SQL查询被转换为Spark操作。对熟悉Hive和HiveQL的人,Spark可以拿来就用。
  • Spark Streaming:允许对实时数据流进行处理和控制。很多实时数据库(如Apache Store)可以处理实时数据。Spark Streaming允许程序能够像普通RDD一样处理实时数据。
  • MLlib:一个常用机器学习算法库,算法被实现为对RDD的Spark操作。这个库包含可扩展的学习算法,比如分类、回归等需要对大量数据集进行迭代的操作。之前可选的大数据机器学习库Mahout,将会转到Spark,并在未来实现。
  • GraphX:控制图、并行图操作和计算的一组算法和工具的集合。GraphX扩展了RDD API,包含控制图、创建子图、访问路径上所有顶点的操作。

由于这些组件满足了很多大数据需求,也满足了很多数据科学任务的算法和计算上的需要,Spark快速流行起来。不仅如此,Spark也提供了使用Scala、Java和Python编写的API;满足了不同团体的需求,允许更多数据科学家简便地采用Spark作为他们的大数据解决方案。

对Spark编程

编写Spark应用与之前实现在Hadoop上的其他数据流语言类似。代码写入一个惰性求值的驱动程序(driver program)中,通过一个动作(action),驱动代码被分发到集群上,由各个RDD分区上的worker来执行。然后结果会被发送回驱动程序进行聚合或编译。本质上,驱动程序创建一个或多个RDD,调用操作来转换RDD,然后调用动作处理被转换后的RDD。

这些步骤大体如下:

  1. 定义一个或多个RDD,可以通过获取存储在磁盘上的数据(HDFS,Cassandra,HBase,Local Disk),并行化内存中的某些集合,转换(transform)一个已存在的RDD,或者,缓存或保存。
  2. 通过传递一个闭包(函数)给RDD上的每个元素来调用RDD上的操作。Spark提供了除了Map和Reduce的80多种高级操作。
  3. 使用结果RDD的动作(action)(如count、collect、save等)。动作将会启动集群上的计算。

当Spark在一个worker上运行闭包时,闭包中用到的所有变量都会被拷贝到节点上,但是由闭包的局部作用域来维护。Spark提供了两种类型的共享变量,这些变量可以按照限定的方式被所有worker访问。广播变量会被分发给所有worker,但是是只读的。累加器这种变量,worker可以使用关联操作来“加”,通常用作计数器。

Spark应用本质上通过转换和动作来控制RDD。后续文章将会深入讨论,但是理解了这个就足以执行下面的例子了。

Spark的执行

简略描述下Spark的执行。本质上,Spark应用作为独立的进程运行,由驱动程序中的SparkContext协调。这个context将会连接到一些集群管理者(如YARN),这些管理者分配系统资源。集群上的每个worker由执行者(executor)管理,执行者反过来由SparkContext管理。执行者管理计算、存储,还有每台机器上的缓存。

重点要记住的是应用代码由驱动程序发送给执行者,执行者指定context和要运行的任务。执行者与驱动程序通信进行数据分享或者交互。驱动程序是Spark作业的主要参与者,因此需要与集群处于相同的网络。这与Hadoop代码不同,Hadoop中你可以在任意位置提交作业给JobTracker,JobTracker处理集群上的执行。

与Spark交互

使用Spark最简单的方式就是使用交互式命令行提示符。打开PySpark终端,在命令行中打出pyspark。

1
2
3
~$ pyspark
[… snip …]
>>>

PySpark将会自动使用本地Spark配置创建一个SparkContext。你可以通过sc变量来访问它。我们来创建第一个RDD。

1
2
3
>>> text =  sc.textFile( "shakespeare.txt" )
>>> print  text
shakespeare.txt MappedRDD[ 1 ] at textFile at NativeMethodAccessorImpl.java: - 2

textFile方法将莎士比亚全部作品加载到一个RDD命名文本。如果查看了RDD,你就可以看出它是个MappedRDD,文件路径是相对于当前工作目录的一个相对路径(记得传递磁盘上正确的shakespear.txt文件路径)。我们转换下这个RDD,来进行分布式计算的“hello world”:“字数统计”。

1
2
3
4
5
6
7
>>> from  operator import  add
>>> def  tokenize(text):
...     return  text.split()
...
>>> words =  text.flatMap(tokenize)
>>> print  words
PythonRDD[ 2 ] at RDD at PythonRDD.scala: 43

我们首先导入了add操作符,它是个命名函数,可以作为加法的闭包来使用。我们稍后再使用这个函数。首先我们要做的是把文本拆分为单词。我们创建了一个tokenize函数,参数是文本片段,返回根据空格拆分的单词列表。然后我们通过给flatMap操作符传递tokenize闭包对textRDD进行变换创建了一个wordsRDD。你会发现,words是个PythonRDD,但是执行本应该立即进行。显然,我们还没有把整个莎士比亚数据集拆分为单词列表。

如果你曾使用MapReduce做过Hadoop版的“字数统计”,你应该知道下一步是将每个单词映射到一个键值对,其中键是单词,值是1,然后使用reducer计算每个键的1总数。

首先,我们map一下。

1
2
3
4
5
>>> wc =  words. map ( lambda  x: (x, 1 ))
>>> print  wc.toDebugString()
( 2 ) PythonRDD[ 3 ] at RDD at PythonRDD.scala: 43
|  shakespeare.txt MappedRDD[ 1 ] at textFile at NativeMethodAccessorImpl.java: - 2
|  shakespeare.txt HadoopRDD[ 0 ] at textFile at NativeMethodAccessorImpl.java: - 2

我使用了一个匿名函数(用了Python中的lambda关键字)而不是命名函数。这行代码将会把lambda映射到每个单词。因此,每个x都是一个单词,每个单词都会被匿名闭包转换为元组(word, 1)。为了查看转换关系,我们使用toDebugString方法来查看PipelinedRDD是怎么被转换的。可以使用reduceByKey动作进行字数统计,然后把统计结果写到磁盘。

1
2
>>> counts =  wc.reduceByKey(add)
>>> counts.saveAsTextFile( "wc" )

一旦我们最终调用了saveAsTextFile动作,这个分布式作业就开始执行了,在作业“跨集群地”(或者你本机的很多进程)运行时,你应该可以看到很多INFO语句。如果退出解释器,你可以看到当前工作目录下有个“wc”目录。

1
2
$ ls wc /
_SUCCESS   part - 00000  part - 00001

每个part文件都代表你本机上的进程计算得到的被保持到磁盘上的最终RDD。如果对一个part文件进行head命令,你应该能看到字数统计元组。

1
2
3
4
5
6
7
8
9
10
11
$ head wc / part - 00000
(u 'fawn' , 14 )
(u 'Fame.' , 1 )
(u 'Fame,' , 2 )
(u 'kinghenryviii@7731' , 1 )
(u 'othello@36737' , 1 )
(u 'loveslabourslost@51678' , 1 )
(u '1kinghenryiv@54228' , 1 )
(u 'troilusandcressida@83747' , 1 )
(u 'fleeces' , 1 )
(u 'midsummersnightsdream@71681' , 1 )

注意这些键没有像Hadoop一样被排序(因为Hadoop中Map和Reduce任务中有个必要的打乱和排序阶段)。但是,能保证每个单词在所有文件中只出现一次,因为你使用了reduceByKey操作符。你还可以使用sort操作符确保在写入到磁盘之前所有的键都被排过序。

编写一个Spark应用

编写Spark应用与通过交互式控制台使用Spark类似。API是相同的。首先,你需要访问<SparkContext,它已经由<pyspark自动加载好了。

使用Spark编写Spark应用的一个基本模板如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
## Spark Application - execute with spark-submit
 
## Imports
from  pyspark import  SparkConf, SparkContext
 
## Module Constants
APP_NAME =  "My Spark Application"
 
## Closure Functions
 
## Main functionality
 
def  main(sc):
     pass
 
if  __name__ = =  "__main__" :
     # Configure Spark
     conf =  SparkConf().setAppName(APP_NAME)
     conf =  conf.setMaster( "local[*]" )
     sc   =  SparkContext(conf = conf)
 
     # Execute Main functionality
     main(sc)

这个模板列出了一个Spark应用所需的东西:导入Python库,模块常量,用于调试和Spark UI的可识别的应用名称,还有作为驱动程序运行的一些主要分析方法学。在ifmain中,我们创建了SparkContext,使用了配置好的context执行main。我们可以简单地导入驱动代码到pyspark而不用执行。注意这里Spark配置通过setMaster方法被硬编码到SparkConf,一般你应该允许这个值通过命令行来设置,所以你能看到这行做了占位符注释。

使用<sc.stop()或<sys.exit(0)来关闭或退出程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
## Spark Application - execute with spark-submit
 
## Imports
import  csv
import  matplotlib.pyplot as plt
 
from  StringIO import  StringIO
from  datetime import  datetime
from  collections import  namedtuple
from  operator import  add, itemgetter
from  pyspark import  SparkConf, SparkContext
 
## Module Constants
APP_NAME =  "Flight Delay Analysis"
DATE_FMT =  "%Y-%m-%d"
TIME_FMT =  "%H%M"
 
fields   =  ( 'date' , 'airline' , 'flightnum' , 'origin' , 'dest' , 'dep' ,
             'dep_delay' , 'arv' , 'arv_delay' , 'airtime' , 'distance' )
Flight   =  namedtuple( 'Flight' , fields)
 
## Closure Functions
def  parse(row):
     """
     Parses a row and returns a named tuple.
     """
 
     row[ 0 =  datetime.strptime(row[ 0 ], DATE_FMT).date()
     row[ 5 =  datetime.strptime(row[ 5 ], TIME_FMT).time()
     row[ 6 =  float (row[ 6 ])
     row[ 7 =  datetime.strptime(row[ 7 ], TIME_FMT).time()
     row[ 8 =  float (row[ 8 ])
     row[ 9 =  float (row[ 9 ])
     row[ 10 ] =  float (row[ 10 ])
     return  Flight( * row[: 11 ])
 
def  split(line):
     """
     Operator function for splitting a line with csv module
     """
     reader =  csv.reader(StringIO(line))
     return  reader. next ()
 
def  plot(delays):
     """
     Show a bar chart of the total delay per airline
     """
     airlines =  [d[ 0 ] for  d in  delays]
     minutes  =  [d[ 1 ] for  d in  delays]
     index    =  list ( xrange ( len (airlines)))
 
     fig, axe =  plt.subplots()
     bars =  axe.barh(index, minutes)
 
     # Add the total minutes to the right
     for  idx, air, min  in  zip (index, airlines, minutes):
         if  min  > 0 :
             bars[idx].set_color( '#d9230f' )
             axe.annotate( " %0.0f min"  %  min , xy = ( min + 1 , idx + 0.5 ), va = 'center' )
         else :
             bars[idx].set_color( '#469408' )
             axe.annotate( " %0.0f min"  %  min , xy = ( 10 , idx + 0.5 ), va = 'center' )
 
     # Set the ticks
     ticks =  plt.yticks([idx +  0.5  for  idx in  index], airlines)
     xt =  plt.xticks()[ 0 ]
     plt.xticks(xt, [ ' ' ] *  len (xt))
 
     # minimize chart junk
     plt.grid(axis =  'x' , color = 'white' , linestyle = '-' )
 
     plt.title( 'Total Minutes Delayed per Airline' )
     plt.show()
 
## Main functionality
def  main(sc):
 
     # Load the airlines lookup dictionary
     airlines =  dict (sc.textFile( "ontime/airlines.csv" ). map (split).collect())
 
     # Broadcast the lookup dictionary to the cluster
     airline_lookup =  sc.broadcast(airlines)
 
     # Read the CSV Data into an RDD
     flights =  sc.textFile( "ontime/flights.csv" ). map (split). map (parse)
 
     # Map the total delay to the airline (joined using the broadcast value)
     delays  =  flights. map ( lambda  f: (airline_lookup.value[f.airline],
                                      add(f.dep_delay, f.arv_delay)))
 
     # Reduce the total delay for the month to the airline
     delays  =  delays.reduceByKey(add).collect()
     delays  =  sorted (delays, key = itemgetter( 1 ))
 
     # Provide output from the driver
     for  d in  delays:
         print  "%0.0f minutes delayed\t%s"  %  (d[ 1 ], d[ 0 ])
 
     # Show a bar chart of the delays
     plot(delays)
 
if  __name__ = =  "__main__" :
     # Configure Spark
     conf =  SparkConf().setMaster( "local[*]" )
     conf =  conf.setAppName(APP_NAME)
     sc   =  SparkContext(conf = conf)
 
     # Execute Main functionality
     main(sc)

使用<spark-submit命令来运行这段代码(假设你已有ontime目录,目录中有两个CSV文件):

1
~$ spark - submit app.py

这个Spark作业使用本机作为master,并搜索app.py同目录下的ontime目录下的2个CSV文件。最终结果显示,4月的总延误时间(单位分钟),既有早点的(如果你从美国大陆飞往夏威夷或者阿拉斯加),但对大部分大型航空公司都是延误的。注意,我们在app.py中使用matplotlib直接将结果可视化出来了:

这段代码做了什么呢?我们特别注意下与Spark最直接相关的main函数。首先,我们加载CSV文件到RDD,然后把split函数映射给它。split函数使用csv模块解析文本的每一行,并返回代表每行的元组。最后,我们将collect动作传给RDD,这个动作把数据以Python列表的形式从RDD传回驱动程序。本例中,airlines.csv是个小型的跳转表(jump table),可以将航空公司代码与全名对应起来。我们将转移表存储为Python字典,然后使用sc.broadcast广播给集群上的每个节点。

接着,main函数加载了数据量更大的flights.csv([译者注]作者笔误写成fights.csv,此处更正)。拆分CSV行完成之后,我们将parse函数映射给CSV行,此函数会把日期和时间转成Python的日期和时间,并对浮点数进行合适的类型转换。每行作为一个NamedTuple保存,名为Flight,以便高效简便地使用。

有了Flight对象的RDD,我们映射一个匿名函数,这个函数将RDD转换为一些列的键值对,其中键是航空公司的名字,值是到达和出发的延误时间总和。使用reduceByKey动作和add操作符可以得到每个航空公司的延误时间总和,然后RDD被传递给驱动程序(数据中航空公司的数目相对较少)。最终延误时间按照升序排列,输出打印到了控制台,并且使用matplotlib进行了可视化。

这个例子稍长,但是希望能演示出集群和驱动程序之间的相互作用(发送数据进行分析,结果取回给驱动程序),以及Python代码在Spark应用中的角色。

结论

尽管算不上一个完整的Spark入门,我们希望你能更好地了解Spark是什么,如何使用进行快速、内存分布式计算。至少,你应该能将Spark运行起来,并开始在本机或Amazon EC2上探索数据。你应该可以配置好iPython notebook来运行Spark。

Spark不能解决分布式存储问题(通常Spark从HDFS中获取数据),但是它为分布式计算提供了丰富的函数式编程API。这个框架建立在伸缩分布式数据集(RDD)之上。RDD是种编程抽象,代表被分区的对象集合,允许进行分布式操作。RDD有容错能力(可伸缩的部分),更重要的时,可以存储到节点上的worker内存里进行立即重用。内存存储提供了快速和简单表示的迭代算法,以及实时交互分析。

由于Spark库提供了Python、Scale、Java编写的API,以及内建的机器学习、流数据、图算法、类SQL查询等模块;Spark迅速成为当今最重要的分布式计算框架之一。与YARN结合,Spark提供了增量,而不是替代已存在的Hadoop集群,它将成为未来大数据重要的一部分,为数据科学探索铺设了一条康庄大道。

有用的链接

希望你喜欢这篇博文!写作并不是凭空而来的,以下是一些曾帮助我写作的有用链接;查看这些链接,可能对进一步探索Spark有帮助。注意,有些图书链接是推广链接,意味着如果你点击并购买了这些图书,你将会支持District Data Labs!

这篇更多是篇入门文章,而不是District Data Labs的典型文章,有些与此入门相关的数据和代码你可以在这里找到:

Spark论文

Spark与Hadoop一样,有一些基础论文,我认为那些需要对大数据集进行分布式计算的严谨数据科学家一定要读。首先是HotOS(“操作系统热门话题”的简写)的一篇研讨会论文,简单易懂地描述了Spark。第二个是偏理论的论文,具体描述了RDD。

  1. M. Zaharia, M. Chowdhury, M. J. Franklin, S. Shenker, and I. Stoica, “Spark: cluster computing with working sets,” in Proceedings of the 2nd USENIX conference on Hot topics in cloud computing, 2010, pp. 10–10.
  2. M. Zaharia, M. Chowdhury, T. Das, A. Dave, J. Ma, M. McCauley, M. J. Franklin, S. Shenker, and I. Stoica, “Resilient distributed datasets: A fault-tolerant abstraction for in-memory cluster computing,” in Proceedings of the 9th USENIX conference on Networked Systems Design and Implementation, 2012, pp. 2–2.

本文转自博客园知识天地的博客,原文链接:Spark入门(Python版),如需转载请自行联系原博主。


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
6天前
|
存储 数据采集 人工智能
Python编程入门:从零基础到实战应用
本文是一篇面向初学者的Python编程教程,旨在帮助读者从零开始学习Python编程语言。文章首先介绍了Python的基本概念和特点,然后通过一个简单的例子展示了如何编写Python代码。接下来,文章详细介绍了Python的数据类型、变量、运算符、控制结构、函数等基本语法知识。最后,文章通过一个实战项目——制作一个简单的计算器程序,帮助读者巩固所学知识并提高编程技能。
|
11天前
|
机器学习/深度学习 数据可视化 数据挖掘
使用Python进行数据分析的入门指南
本文将引导读者了解如何使用Python进行数据分析,从安装必要的库到执行基础的数据操作和可视化。通过本文的学习,你将能够开始自己的数据分析之旅,并掌握如何利用Python来揭示数据背后的故事。
|
7天前
|
IDE 程序员 开发工具
Python编程入门:打造你的第一个程序
迈出编程的第一步,就像在未知的海洋中航行。本文是你启航的指南针,带你了解Python这门语言的魅力所在,并手把手教你构建第一个属于自己的程序。从安装环境到编写代码,我们将一步步走过这段旅程。准备好了吗?让我们开始吧!
|
7天前
|
测试技术 开发者 Python
探索Python中的装饰器:从入门到实践
装饰器,在Python中是一块强大的语法糖,它允许我们在不修改原函数代码的情况下增加额外的功能。本文将通过简单易懂的语言和实例,带你一步步了解装饰器的基本概念、使用方法以及如何自定义装饰器。我们还将探讨装饰器在实战中的应用,让你能够在实际编程中灵活运用这一技术。
23 7
|
8天前
|
开发者 Python
Python中的装饰器:从入门到实践
本文将深入探讨Python的装饰器,这一强大工具允许开发者在不修改现有函数代码的情况下增加额外的功能。我们将通过实例学习如何创建和应用装饰器,并探索它们背后的原理和高级用法。
24 5
|
7天前
|
机器学习/深度学习 人工智能 算法
深度学习入门:用Python构建你的第一个神经网络
在人工智能的海洋中,深度学习是那艘能够带你远航的船。本文将作为你的航标,引导你搭建第一个神经网络模型,让你领略深度学习的魅力。通过简单直观的语言和实例,我们将一起探索隐藏在数据背后的模式,体验从零开始创造智能系统的快感。准备好了吗?让我们启航吧!
26 3
|
11天前
|
Python
Python编程入门:从零开始的代码旅程
本文是一篇针对Python编程初学者的入门指南,将介绍Python的基本语法、数据类型、控制结构以及函数等概念。文章旨在帮助读者快速掌握Python编程的基础知识,并能够编写简单的Python程序。通过本文的学习,读者将能够理解Python代码的基本结构和逻辑,为进一步深入学习打下坚实的基础。
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
109 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
68 0