Spark入门(Python版)

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介:

Hadoop是对大数据集进行分布式计算的标准工具,这也是为什么当你穿过机场时能看到”大数据(Big Data)”广告的原因。它已经成为大数据的操作系统,提供了包括工具和技巧在内的丰富生态系统,允许使用相对便宜的商业硬件集群进行超级计算机级别的计算。2003和2004年,两个来自Google的观点使Hadoop成为可能:一个分布式存储框架(Google文件系统),在Hadoop中被实现为HDFS;一个分布式计算框架(MapReduce)。

这两个观点成为过去十年规模分析(scaling analytics)、大规模机器学习(machine learning),以及其他大数据应用出现的主要推动力!但是,从技术角度上讲,十年是一段非常长的时间,而且Hadoop还存在很多已知限制,尤其是MapReduce。对MapReduce编程明显是困难的。对大多数分析,你都必须用很多步骤将Map和Reduce任务串接起来。这造成类SQL的计算或机器学习需要专门的系统来进行。更糟的是,MapReduce要求每个步骤间的数据要序列化到磁盘,这意味着MapReduce作业的I/O成本很高,导致交互分析和迭代算法(iterative algorithms)开销很大;而事实是,几乎所有的最优化和机器学习都是迭代的。

为了解决这些问题,Hadoop一直在向一种更为通用的资源管理框架转变,即YARN(Yet Another Resource Negotiator, 又一个资源协调者)。YARN实现了下一代的MapReduce,但同时也允许应用利用分布式资源而不必采用MapReduce进行计算。通过将集群管理一般化,研究转到分布式计算的一般化上,来扩展了MapReduce的初衷。

Spark是第一个脱胎于该转变的快速、通用分布式计算范式,并且很快流行起来。Spark使用函数式编程范式扩展了MapReduce模型以支持更多计算类型,可以涵盖广泛的工作流,这些工作流之前被实现为Hadoop之上的特殊系统。Spark使用内存缓存来提升性能,因此进行交互式分析也足够快速(就如同使用Python解释器,与集群进行交互一样)。缓存同时提升了迭代算法的性能,这使得Spark非常适合数据理论任务,特别是机器学习。

本文中,我们将首先讨论如何在本地机器上或者EC2的集群上设置Spark进行简单分析。然后,我们在入门级水平探索Spark,了解Spark是什么以及它如何工作(希望可以激发更多探索)。最后两节我们开始通过命令行与Spark进行交互,然后演示如何用Python写Spark应用,并作为Spark作业提交到集群上。

设置Spark

在本机设置和运行Spark非常简单。你只需要下载一个预构建的包,只要你安装了Java 6+和Python 2.6+,就可以在Windows、Mac OS X和Linux上运行Spark。确保java程序在PATH环境变量中,或者设置了JAVA_HOME环境变量。类似的,python也要在PATH中。

假设你已经安装了Java和Python:

  1. 访问Spark下载页
  2. 选择Spark最新发布版(本文写作时是1.2.0),一个预构建的Hadoop 2.4包,直接下载。

现在,如何继续依赖于你的操作系统,靠你自己去探索了。Windows用户可以在评论区对如何设置的提示进行评论。

一般,我的建议是按照下面的步骤(在POSIX操作系统上):

1.解压Spark

1
~$ tar -xzf spark-1.2.0-bin-hadoop2.4.tgz

2.将解压目录移动到有效应用程序目录中(如Windows上的

1
~$ mv spark-1.2.0-bin-hadoop2.4 /srv/spark-1.2.0

3.创建指向该Spark版本的符号链接到<spark目录。这样你可以简单地下载新/旧版本的Spark,然后修改链接来管理Spark版本,而不用更改路径或环境变量。

1
~$ ln -s /srv/spark-1.2.0 /srv/spark

4.修改BASH配置,将Spark添加到PATH中,设置SPARK_HOME环境变量。这些小技巧在命令行上会帮到你。在Ubuntu上,只要编辑~/.bash_profile或~/.profile文件,将以下语句添加到文件中:

1
2
export SPARK_HOME=/srv/spark
export PATH=$SPARK_HOME/bin:$PATH

5.source这些配置(或者重启终端)之后,你就可以在本地运行一个pyspark解释器。执行pyspark命令,你会看到以下结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
~$ pyspark
Python 2.7.8 (default, Dec  2 2014, 12:45:58)
[GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.54)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Sparks default log4j profile: org/apache/spark/log4j-defaults.properties
[… snip …]
Welcome to
       ____              __
      / __/__  ___ _____/ /__
     _\ \/ _ \/ _ `/ __/  `_/
    /__ / .__/\_,_/_/ /_/\_\   version 1.2.0
       /_/
 
Using Python version 2.7.8 (default, Dec  2 2014 12:45:58)
SparkContext available as sc.
>>>

现在Spark已经安装完毕,可以在本机以”单机模式“(standalone mode)使用。你可以在本机开发应用并提交Spark作业,这些作业将以多进程/多线程模式运行的,或者,配置该机器作为一个集群的客户端(不推荐这样做,因为在Spark作业中,驱动程序(driver)是个很重要的角色,并且应该与集群的其他部分处于相同网络)。可能除了开发,你在本机使用Spark做得最多的就是利用spark-ec2脚本来配置Amazon云上的一个EC2 Spark集群了。

简略Spark输出

Spark(和PySpark)的执行可以特别详细,很多INFO日志消息都会打印到屏幕。开发过程中,这些非常恼人,因为可能丢失Python栈跟踪或者print的输出。为了减少Spark输出 – 你可以设置$SPARK_HOME/conf下的log4j。首先,拷贝一份$SPARK_HOME/conf/log4j.properties.template文件,去掉“.template”扩展名。

1
~$ cp $SPARK_HOME / conf / log4j.properties.template $SPARK_HOME / conf / log4j.properties

编辑新文件,用WARN替换代码中出现的INFO。你的log4j.properties文件类似:

1
2
3
4
5
6
7
8
9
10
11
# Set everything to be logged to the console
  log4j.rootCategory = WARN, console
  log4j.appender.console = org.apache.log4j.ConsoleAppender
  log4j.appender.console.target = System.err
  log4j.appender.console.layout = org.apache.log4j.PatternLayout
  log4j.appender.console.layout.ConversionPattern = % d{yy / MM / dd HH:mm:ss} % p % c{ 1 }: % m % n
# Settings to quiet third party logs that are too verbose
  log4j.logger.org.eclipse.jetty = WARN
  log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle = ERROR
  log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper = WARN
  log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter = WARN

现在运行PySpark,输出消息将会更简略!感谢@genomegeek在一次District Data Labs的研讨会中指出这一点。

在Spark中使用IPython Notebook

当搜索有用的Spark小技巧时,我发现了一些文章提到在PySpark中配置IPython notebook。IPython notebook对数据科学家来说是个交互地呈现科学和理论工作的必备工具,它集成了文本和Python代码。对很多数据科学家,IPython notebook是他们的Python入门,并且使用非常广泛,所以我想值得在本文中提及。

这里的大部分说明都来改编自IPython notebook: 在PySpark中设置IPython。但是,我们将聚焦在本机以单机模式将IPtyon shell连接到PySpark,而不是在EC2集群。如果你想在一个集群上使用PySpark/IPython,查看并评论下文的说明吧!

  1. 1.为Spark创建一个iPython notebook配置
1
2
3
4
~$ ipython profile create spark
[ProfileCreate] Generating default config file : u '$HOME/.ipython/profile_spark/ipython_config.py'
[ProfileCreate] Generating default config file : u '$HOME/.ipython/profile_spark/ipython_notebook_config.py'
[ProfileCreate] Generating default config file : u '$HOME/.ipython/profile_spark/ipython_nbconvert_config.py'

记住配置文件的位置,替换下文各步骤相应的路径:

2.创建文件$HOME/.ipython/profile_spark/startup/00-pyspark-setup.py,并添加如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
import  os
import  sys
 
# Configure the environment
if  'SPARK_HOME'  not  in  os.environ:
     os.environ[ 'SPARK_HOME' ] =  '/srv/spark'
 
# Create a variable for our root path
SPARK_HOME =  os.environ[ 'SPARK_HOME' ]
 
# Add the PySpark/py4j to the Python Path
sys.path.insert( 0 , os.path.join(SPARK_HOME, "python" , "build" ))
sys.path.insert( 0 , os.path.join(SPARK_HOME, "python" ))

3.使用我们刚刚创建的配置来启动IPython notebook。

1
~$ ipython notebook - - profile spark

4.在notebook中,你应该能看到我们刚刚创建的变量。

1
print  SPARK_HOME

5.在IPython notebook最上面,确保你添加了Spark context。

1
2
from  pyspark import   SparkContext
sc =  SparkContext( 'local' , 'pyspark' )

6.使用IPython做个简单的计算来测试Spark context。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def  isprime(n):
"""
check if integer n is a prime
"""
# make sure n is a positive integer
n =  abs ( int (n))
# 0 and 1 are not primes
if  n < 2 :
     return  False
# 2 is the only even prime number
if  n = =  2 :
     return  True
# all other even numbers are not primes
if  not  n & 1 :
     return  False
# range starts with 3 and only needs to go up the square root of n
# for all odd numbers
for  x in  range ( 3 , int (n * * 0.5 ) + 1 , 2 ):
     if  n %  x = =  0 :
         return  False
return  True
 
# Create an RDD of numbers from 0 to 1,000,000
nums =  sc.parallelize( xrange ( 1000000 ))
 
# Compute the number of primes in the RDD
print  nums. filter (isprime).count()

如果你能得到一个数字而且没有错误发生,那么你的context正确工作了!

编辑提示:上面配置了一个使用PySpark直接调用IPython notebook的IPython context。但是,你也可以使用PySpark按以下方式直接启动一个notebook: $ IPYTHON_OPTS=”notebook –pylab inline” pyspark

哪个方法好用取决于你使用PySpark和IPython的具体情景。前一个允许你更容易地使用IPython notebook连接到一个集群,因此是我喜欢的方法。

在EC2上使用Spark

在讲授使用Hadoop进行分布式计算时,我发现很多可以通过在本地伪分布式节点(pseudo-distributed node)或以单节点模式(single-node mode)讲授。但是为了了解真正发生了什么,就需要一个集群。当数据变得庞大,这些书面讲授的技能和真实计算需求间经常出现隔膜。如果你肯在学习详细使用Spark上花钱,我建议你设置一个快速Spark集群做做实验。 包含5个slave(和1个master)每周大概使用10小时的集群每月大概需要$45.18。

完整的讨论可以在Spark文档中找到:在EC2上运行Spark在你决定购买EC2集群前一定要通读这篇文档!我列出了一些关键点:

  1. 通过AWS Console获取AWS EC2 key对(访问key和密钥key)。
  2. 将key对导出到你的环境中。在shell中敲出以下命令,或者将它们添加到配置中。
1
2
export AWS_ACCESS_KEY_ID = myaccesskeyid
export AWS_SECRET_ACCESS_KEY = mysecretaccesskey

注意不同的工具使用不同的环境名称,确保你用的是Spark脚本所使用的名称。

3.启动集群:

1
2
~$ cd $SPARK_HOME / ec2
ec2$ . / spark - ec2 - k <keypair> - i <key - file > - s <num - slaves> launch <cluster - name>

4.SSH到集群来运行Spark作业。

1
ec2$ . / spark - ec2 - k <keypair> - i <key - file > login <cluster - name>

5.销毁集群

1
ec2$ . / spark - ec2 destroy &lt;cluster - name&gt;.

这些脚本会自动创建一个本地的HDFS集群来添加数据,copy-dir命令可以同步代码和数据到该集群。但是你最好使用S3来存储数据,创建使用s3://URI来加载数据的RDDs。

Spark是什么?

既然设置好了Spark,现在我们讨论下Spark是什么。Spark是个通用的集群计算框架,通过将大量数据集计算任务分配到多台计算机上,提供高效内存计算。如果你熟悉Hadoop,那么你知道分布式计算框架要解决两个问题:如何分发数据和如何分发计算。Hadoop使用HDFS来解决分布式数据问题,MapReduce计算范式提供有效的分布式计算。类似的,Spark拥有多种语言的函数式编程API,提供了除map和reduce之外更多的运算符,这些操作是通过一个称作弹性分布式数据集(resilient distributed datasets, RDDs)的分布式数据框架进行的。

本质上,RDD是种编程抽象,代表可以跨机器进行分割的只读对象集合。RDD可以从一个继承结构(lineage)重建(因此可以容错),通过并行操作访问,可以读写HDFS或S3这样的分布式存储,更重要的是,可以缓存到worker节点的内存中进行立即重用。由于RDD可以被缓存在内存中,Spark对迭代应用特别有效,因为这些应用中,数据是在整个算法运算过程中都可以被重用。大多数机器学习和最优化算法都是迭代的,使得Spark对数据科学来说是个非常有效的工具。另外,由于Spark非常快,可以通过类似Python REPL的命令行提示符交互式访问。

Spark库本身包含很多应用元素,这些元素可以用到大部分大数据应用中,其中包括对大数据进行类似SQL查询的支持,机器学习和图算法,甚至对实时流数据的支持。

核心组件如下:

  • Spark Core:包含Spark的基本功能;尤其是定义RDD的API、操作以及这两者上的动作。其他Spark的库都是构建在RDD和Spark Core之上的。
  • Spark SQL:提供通过Apache Hive的SQL变体Hive查询语言(HiveQL)与Spark进行交互的API。每个数据库表被当做一个RDD,Spark SQL查询被转换为Spark操作。对熟悉Hive和HiveQL的人,Spark可以拿来就用。
  • Spark Streaming:允许对实时数据流进行处理和控制。很多实时数据库(如Apache Store)可以处理实时数据。Spark Streaming允许程序能够像普通RDD一样处理实时数据。
  • MLlib:一个常用机器学习算法库,算法被实现为对RDD的Spark操作。这个库包含可扩展的学习算法,比如分类、回归等需要对大量数据集进行迭代的操作。之前可选的大数据机器学习库Mahout,将会转到Spark,并在未来实现。
  • GraphX:控制图、并行图操作和计算的一组算法和工具的集合。GraphX扩展了RDD API,包含控制图、创建子图、访问路径上所有顶点的操作。

由于这些组件满足了很多大数据需求,也满足了很多数据科学任务的算法和计算上的需要,Spark快速流行起来。不仅如此,Spark也提供了使用Scala、Java和Python编写的API;满足了不同团体的需求,允许更多数据科学家简便地采用Spark作为他们的大数据解决方案。

对Spark编程

编写Spark应用与之前实现在Hadoop上的其他数据流语言类似。代码写入一个惰性求值的驱动程序(driver program)中,通过一个动作(action),驱动代码被分发到集群上,由各个RDD分区上的worker来执行。然后结果会被发送回驱动程序进行聚合或编译。本质上,驱动程序创建一个或多个RDD,调用操作来转换RDD,然后调用动作处理被转换后的RDD。

这些步骤大体如下:

  1. 定义一个或多个RDD,可以通过获取存储在磁盘上的数据(HDFS,Cassandra,HBase,Local Disk),并行化内存中的某些集合,转换(transform)一个已存在的RDD,或者,缓存或保存。
  2. 通过传递一个闭包(函数)给RDD上的每个元素来调用RDD上的操作。Spark提供了除了Map和Reduce的80多种高级操作。
  3. 使用结果RDD的动作(action)(如count、collect、save等)。动作将会启动集群上的计算。

当Spark在一个worker上运行闭包时,闭包中用到的所有变量都会被拷贝到节点上,但是由闭包的局部作用域来维护。Spark提供了两种类型的共享变量,这些变量可以按照限定的方式被所有worker访问。广播变量会被分发给所有worker,但是是只读的。累加器这种变量,worker可以使用关联操作来“加”,通常用作计数器。

Spark应用本质上通过转换和动作来控制RDD。后续文章将会深入讨论,但是理解了这个就足以执行下面的例子了。

Spark的执行

简略描述下Spark的执行。本质上,Spark应用作为独立的进程运行,由驱动程序中的SparkContext协调。这个context将会连接到一些集群管理者(如YARN),这些管理者分配系统资源。集群上的每个worker由执行者(executor)管理,执行者反过来由SparkContext管理。执行者管理计算、存储,还有每台机器上的缓存。

重点要记住的是应用代码由驱动程序发送给执行者,执行者指定context和要运行的任务。执行者与驱动程序通信进行数据分享或者交互。驱动程序是Spark作业的主要参与者,因此需要与集群处于相同的网络。这与Hadoop代码不同,Hadoop中你可以在任意位置提交作业给JobTracker,JobTracker处理集群上的执行。

与Spark交互

使用Spark最简单的方式就是使用交互式命令行提示符。打开PySpark终端,在命令行中打出pyspark。

1
2
3
~$ pyspark
[… snip …]
>>>

PySpark将会自动使用本地Spark配置创建一个SparkContext。你可以通过sc变量来访问它。我们来创建第一个RDD。

1
2
3
>>> text =  sc.textFile( "shakespeare.txt" )
>>> print  text
shakespeare.txt MappedRDD[ 1 ] at textFile at NativeMethodAccessorImpl.java: - 2

textFile方法将莎士比亚全部作品加载到一个RDD命名文本。如果查看了RDD,你就可以看出它是个MappedRDD,文件路径是相对于当前工作目录的一个相对路径(记得传递磁盘上正确的shakespear.txt文件路径)。我们转换下这个RDD,来进行分布式计算的“hello world”:“字数统计”。

1
2
3
4
5
6
7
>>> from  operator import  add
>>> def  tokenize(text):
...     return  text.split()
...
>>> words =  text.flatMap(tokenize)
>>> print  words
PythonRDD[ 2 ] at RDD at PythonRDD.scala: 43

我们首先导入了add操作符,它是个命名函数,可以作为加法的闭包来使用。我们稍后再使用这个函数。首先我们要做的是把文本拆分为单词。我们创建了一个tokenize函数,参数是文本片段,返回根据空格拆分的单词列表。然后我们通过给flatMap操作符传递tokenize闭包对textRDD进行变换创建了一个wordsRDD。你会发现,words是个PythonRDD,但是执行本应该立即进行。显然,我们还没有把整个莎士比亚数据集拆分为单词列表。

如果你曾使用MapReduce做过Hadoop版的“字数统计”,你应该知道下一步是将每个单词映射到一个键值对,其中键是单词,值是1,然后使用reducer计算每个键的1总数。

首先,我们map一下。

1
2
3
4
5
>>> wc =  words. map ( lambda  x: (x, 1 ))
>>> print  wc.toDebugString()
( 2 ) PythonRDD[ 3 ] at RDD at PythonRDD.scala: 43
|  shakespeare.txt MappedRDD[ 1 ] at textFile at NativeMethodAccessorImpl.java: - 2
|  shakespeare.txt HadoopRDD[ 0 ] at textFile at NativeMethodAccessorImpl.java: - 2

我使用了一个匿名函数(用了Python中的lambda关键字)而不是命名函数。这行代码将会把lambda映射到每个单词。因此,每个x都是一个单词,每个单词都会被匿名闭包转换为元组(word, 1)。为了查看转换关系,我们使用toDebugString方法来查看PipelinedRDD是怎么被转换的。可以使用reduceByKey动作进行字数统计,然后把统计结果写到磁盘。

1
2
>>> counts =  wc.reduceByKey(add)
>>> counts.saveAsTextFile( "wc" )

一旦我们最终调用了saveAsTextFile动作,这个分布式作业就开始执行了,在作业“跨集群地”(或者你本机的很多进程)运行时,你应该可以看到很多INFO语句。如果退出解释器,你可以看到当前工作目录下有个“wc”目录。

1
2
$ ls wc /
_SUCCESS   part - 00000  part - 00001

每个part文件都代表你本机上的进程计算得到的被保持到磁盘上的最终RDD。如果对一个part文件进行head命令,你应该能看到字数统计元组。

1
2
3
4
5
6
7
8
9
10
11
$ head wc / part - 00000
(u 'fawn' , 14 )
(u 'Fame.' , 1 )
(u 'Fame,' , 2 )
(u 'kinghenryviii@7731' , 1 )
(u 'othello@36737' , 1 )
(u 'loveslabourslost@51678' , 1 )
(u '1kinghenryiv@54228' , 1 )
(u 'troilusandcressida@83747' , 1 )
(u 'fleeces' , 1 )
(u 'midsummersnightsdream@71681' , 1 )

注意这些键没有像Hadoop一样被排序(因为Hadoop中Map和Reduce任务中有个必要的打乱和排序阶段)。但是,能保证每个单词在所有文件中只出现一次,因为你使用了reduceByKey操作符。你还可以使用sort操作符确保在写入到磁盘之前所有的键都被排过序。

编写一个Spark应用

编写Spark应用与通过交互式控制台使用Spark类似。API是相同的。首先,你需要访问<SparkContext,它已经由<pyspark自动加载好了。

使用Spark编写Spark应用的一个基本模板如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
## Spark Application - execute with spark-submit
 
## Imports
from  pyspark import  SparkConf, SparkContext
 
## Module Constants
APP_NAME =  "My Spark Application"
 
## Closure Functions
 
## Main functionality
 
def  main(sc):
     pass
 
if  __name__ = =  "__main__" :
     # Configure Spark
     conf =  SparkConf().setAppName(APP_NAME)
     conf =  conf.setMaster( "local[*]" )
     sc   =  SparkContext(conf = conf)
 
     # Execute Main functionality
     main(sc)

这个模板列出了一个Spark应用所需的东西:导入Python库,模块常量,用于调试和Spark UI的可识别的应用名称,还有作为驱动程序运行的一些主要分析方法学。在ifmain中,我们创建了SparkContext,使用了配置好的context执行main。我们可以简单地导入驱动代码到pyspark而不用执行。注意这里Spark配置通过setMaster方法被硬编码到SparkConf,一般你应该允许这个值通过命令行来设置,所以你能看到这行做了占位符注释。

使用<sc.stop()或<sys.exit(0)来关闭或退出程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
## Spark Application - execute with spark-submit
 
## Imports
import  csv
import  matplotlib.pyplot as plt
 
from  StringIO import  StringIO
from  datetime import  datetime
from  collections import  namedtuple
from  operator import  add, itemgetter
from  pyspark import  SparkConf, SparkContext
 
## Module Constants
APP_NAME =  "Flight Delay Analysis"
DATE_FMT =  "%Y-%m-%d"
TIME_FMT =  "%H%M"
 
fields   =  ( 'date' , 'airline' , 'flightnum' , 'origin' , 'dest' , 'dep' ,
             'dep_delay' , 'arv' , 'arv_delay' , 'airtime' , 'distance' )
Flight   =  namedtuple( 'Flight' , fields)
 
## Closure Functions
def  parse(row):
     """
     Parses a row and returns a named tuple.
     """
 
     row[ 0 =  datetime.strptime(row[ 0 ], DATE_FMT).date()
     row[ 5 =  datetime.strptime(row[ 5 ], TIME_FMT).time()
     row[ 6 =  float (row[ 6 ])
     row[ 7 =  datetime.strptime(row[ 7 ], TIME_FMT).time()
     row[ 8 =  float (row[ 8 ])
     row[ 9 =  float (row[ 9 ])