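Spark is a fast, general-purpose cluster computing framework. Its core is written in Scala, and it provides high-level APIs in Scala, Java, and Python that make it easy to develop parallel processing applications.
Below, we set up a Spark cluster and run a simple verification job to get a feel for computing with Spark. Both installing the runtime environment and writing the processing program (in Scala; the shell that ships with Spark accepts Scala code directly for data processing) feel much simpler than with the Hadoop MapReduce framework. Spark also works well with HDFS: it can read data from HDFS and write data back to it.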
Installation and Configuration
- Download, install, and configure Scala
    wget http://www.scala-lang.org/files/archive/scala-2.10.3.tgz
    tar xvzf scala-2.10.3.tgz
Add the SCALA_HOME environment variable to ~/.bashrc and make it take effect (for example, by running source ~/.bashrc):
    export SCALA_HOME=/usr/scala/scala-2.10.3
    export PATH=$PATH:$SCALA_HOME/bin
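- Download, install, and configure Spark
We first configure Spark on the master node m1, then copy the configured files to each slave node in the cluster. Download and extract the package: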
    wget http://d3kbcqa49mib13.cloudfront.net/spark-0.9.0-incubating-bin-hadoop1.tgz
    tar xvzf spark-0.9.0-incubating-bin-hadoop1.tgz
Add the SPARK_HOME environment variable to ~/.bashrc and make it take effect:
    export SPARK_HOME=/home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1
    export PATH=$PATH:$SPARK_HOME/bin
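Because the same export lines have to end up in ~/.bashrc on every machine, it can help to append them idempotently. The following is a minimal sketch, not part of Scala or Spark: the helper name append_once is hypothetical, and the paths in the commented usage are simply the ones used in this article.

```shell
# append_once FILE LINE
# Appends LINE to FILE unless an identical line is already present,
# so running the setup twice does not duplicate the exports.
append_once() {
  touch "$1"
  # -q: quiet, -x: match the whole line, -F: fixed string (no regex)
  grep -qxF -e "$2" "$1" || printf '%s\n' "$2" >> "$1"
}

# Usage with the values from this article (uncomment to apply):
# append_once ~/.bashrc 'export SCALA_HOME=/usr/scala/scala-2.10.3'
# append_once ~/.bashrc 'export PATH=$PATH:$SCALA_HOME/bin'
# append_once ~/.bashrc 'export SPARK_HOME=/home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1'
# append_once ~/.bashrc 'export PATH=$PATH:$SPARK_HOME/bin'
```

The single quotes in the usage lines keep $PATH and $SCALA_HOME unexpanded, so they are resolved when ~/.bashrc is sourced rather than when the line is appended.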
Configure Spark on m1. Create the spark-env.sh configuration file from its template so that it can be edited:
    cd /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/conf
    cp spark-env.sh.template spark-env.sh
In this script, also set SCALA_HOME to the actual path on your Unix system, for example:
    export SCALA_HOME=/usr/scala/scala-2.10.3
Edit the conf/slaves file and add the hostnames of the compute nodes, one per line, for example:
    s1
    s2
    s3
Finally, copy the Spark program files and configuration files to the slave nodes:
    scp -r ~/cloud/programs/spark-0.9.0-incubating-bin-hadoop1 shirdrn@s1:~/cloud/programs/
    scp -r ~/cloud/programs/spark-0.9.0-incubating-bin-hadoop1 shirdrn@s2:~/cloud/programs/
    scp -r ~/cloud/programs/spark-0.9.0-incubating-bin-hadoop1 shirdrn@s3:~/cloud/programs/
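With more than a few slave nodes, typing one scp command per host becomes tedious. As a rough sketch, the commands can be generated from the same conf/slaves file. The function name print_scp_commands is hypothetical, and it deliberately only prints the commands (a dry run) so they can be reviewed before anything is copied.

```shell
# print_scp_commands SLAVES_FILE SRC_DIR USER DEST_DIR
# Prints one scp command per host listed in SLAVES_FILE.
# Blank lines and lines starting with '#' are skipped.
print_scp_commands() (
  slaves_file="$1"; src="$2"; user="$3"; dest="$4"
  # The '|| [ -n "$host" ]' part handles a last line without a trailing newline.
  while IFS= read -r host || [ -n "$host" ]; do
    case "$host" in
      ''|'#'*) continue ;;
    esac
    printf 'scp -r %s %s@%s:%s\n' "$src" "$user" "$host" "$dest"
  done < "$slaves_file"
)

# Usage with the values from this article (prints the commands only):
# print_scp_commands "$SPARK_HOME/conf/slaves" \
#   ~/cloud/programs/spark-0.9.0-incubating-bin-hadoop1 shirdrn '~/cloud/programs/'
```

Once the printed commands look right, the output can be piped to sh to run them.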
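Starting the Spark Cluster
We will use data stored on an HDFS cluster as the input, so the Hadoop cluster must first be installed, configured, and started successfully; I use Hadoop 1.2.1 here. Starting the Spark cluster is very simple. Just run the following commands: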
    cd /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/
    sbin/start-all.sh
As shown below, a process named Master is started on m1 and a process named Worker is started on s1. I have also started the Hadoop cluster here:
On the master node m1:
    54968 SecondaryNameNode
    55651 Master
    55087 JobTracker
    54814 NameNode
On the slave node s1:
    33592 Worker
    33442 TaskTracker
    33336 DataNode
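Checking process lists by eye gets error-prone on a larger cluster. Here is a minimal sketch that reports which expected processes are absent from a listing in the "PID Name" format shown above. The function name missing_procs is hypothetical; in the commented usage the listing is assumed to come from the JDK's jps tool.

```shell
# missing_procs LISTING NAME...
# Prints every expected process NAME that does not appear in the
# second column of LISTING (one "PID Name" pair per line).
missing_procs() (
  listing="$1"; shift
  for name in "$@"; do
    # awk exits 0 if some line has NAME as its second field, 1 otherwise.
    printf '%s\n' "$listing" \
      | awk -v n="$name" '$2 == n { found = 1 } END { exit !found }' \
      || printf '%s\n' "$name"
  done
)

# Usage on the master node (prints nothing when all processes are running):
# missing_procs "$(jps)" Master NameNode SecondaryNameNode JobTracker
# Usage on a slave node:
# missing_procs "$(jps)" Worker DataNode TaskTracker
```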
You can also check the logs to diagnose whether each process started successfully, for example:
On the master node:
    tail -n 100 -f $SPARK_HOME/logs/spark-shirdrn-org.apache.spark.deploy.master.Master-1-m1.out
On a slave node:
    tail -n 100 -f $SPARK_HOME/logs/spark-shirdrn-org.apache.spark.deploy.worker.Worker-1-s1.out
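Instead of reading a whole log, it is often enough to pull out the lines that signal a problem. The sketch below is only a heuristic: the function name log_problems is hypothetical, and it assumes that failures show up as lines containing ERROR or Exception, which may not cover every failure mode.

```shell
# log_problems FILE
# Prints the lines of FILE that mention ERROR or an exception,
# each prefixed with its line number. Exit status is 1 if nothing matches.
log_problems() {
  grep -nE 'ERROR|Exception' "$1"
}

# Usage with the master log from this article:
# log_problems "$SPARK_HOME/logs/spark-shirdrn-org.apache.spark.deploy.master.Master-1-m1.out"
```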
Verifying Computation on the Spark Cluster
We use an access log file from my website for the demonstration. Sample lines:
    27.159.254.192 - - [21/Feb/2014:11:40:46 +0800] "GET /archives/526.html HTTP/1.1" 200 12080 "http://shiyanjun.cn/archives/526.html" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
    120.43.4.206 - - [21/Feb/2014:10:37:37 +0800] "GET /archives/417.html HTTP/1.1" 200 11464 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
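To verify that the Spark cluster computes correctly, we count how often each IP address appears in this file. The job reads the log file from HDFS, counts the IP address frequencies, and then saves the result to a specified directory in HDFS.
First, start the Spark Shell, which is used to submit computation jobs: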
    bin/spark-shell
Code run in the Spark Shell must be written in Scala.
Then, run the following code in the Spark Shell to count the IP address frequencies:
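    val file = sc.textFile("hdfs://m1:9000/user/shirdrn/wwwlog20140222.log")
    // "\\s+.*" matches from the first run of whitespace to the end of the line,
    // so split leaves only the leading field of each line: the client IP address.
    val result = file.flatMap(line => line.split("\\s+.*")).map(word => (word, 1)).reduceByKey((a, b) => a + b)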
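The file hdfs://m1:9000/user/shirdrn/wwwlog20140222.log above is the input log file. Sample log output from the processing is shown below; as the log lines indicate, it comes from a collect call on the result in the shell, which triggers the job and returns the counts to the console: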
    14/03/06 21:59:22 INFO MemoryStore: ensureFreeSpace(784) called with curMem=43296, maxMem=311387750
    14/03/06 21:59:22 INFO MemoryStore: Block broadcast_11 stored as values to memory (estimated size 784.0 B, free 296.9 MB)
    14/03/06 21:59:22 INFO FileInputFormat: Total input paths to process : 1
    14/03/06 21:59:22 INFO SparkContext: Starting job: collect at <console>:13
    14/03/06 21:59:22 INFO DAGScheduler: Registering RDD 84 (reduceByKey at <console>:13)
    14/03/06 21:59:22 INFO DAGScheduler: Got job 10 (collect at <console>:13) with 1 output partitions (allowLocal=false)
    14/03/06 21:59:22 INFO DAGScheduler: Final stage: Stage 20 (collect at <console>:13)
    14/03/06 21:59:22 INFO DAGScheduler: Parents of final stage: List(Stage 21)
    14/03/06 21:59:22 INFO DAGScheduler: Missing parents: List(Stage 21)
    14/03/06 21:59:22 INFO DAGScheduler: Submitting Stage 21 (MapPartitionsRDD[84] at reduceByKey at <console>:13), which has no missing parents
    14/03/06 21:59:22 INFO DAGScheduler: Submitting 1 missing tasks from Stage 21 (MapPartitionsRDD[84] at reduceByKey at <console>:13)
    14/03/06 21:59:22 INFO TaskSchedulerImpl: Adding task set 21.0 with 1 tasks
    14/03/06 21:59:22 INFO TaskSetManager: Starting task 21.0:0 as TID 19 on executor localhost: localhost (PROCESS_LOCAL)
    14/03/06 21:59:22 INFO TaskSetManager: Serialized task 21.0:0 as 1941 bytes in 0 ms
    14/03/06 21:59:22 INFO Executor: Running task ID 19
    14/03/06 21:59:22 INFO BlockManager: Found block broadcast_11 locally
    14/03/06 21:59:22 INFO HadoopRDD: Input split:hdfs://m1:9000/user/shirdrn/wwwlog20140222.log:0+4179514
    14/03/06 21:59:23 INFO Executor: Serialized size of result for 19 is 738
    14/03/06 21:59:23 INFO Executor: Sending result for 19 directly to driver
    14/03/06 21:59:23 INFO TaskSetManager: Finished TID 19 in 211 ms on localhost (progress: 0/1)
    14/03/06 21:59:23 INFO TaskSchedulerImpl: Remove TaskSet 21.0 from pool
    14/03/06 21:59:23 INFO DAGScheduler: Completed ShuffleMapTask(21, 0)
    14/03/06 21:59:23 INFO DAGScheduler: Stage 21 (reduceByKey at <console>:13) finished in 0.211 s
    14/03/06 21:59:23 INFO DAGScheduler: looking for newly runnable stages
    14/03/06 21:59:23 INFO DAGScheduler: running: Set()
    14/03/06 21:59:23 INFO DAGScheduler: waiting: Set(Stage 20)
    14/03/06 21:59:23 INFO DAGScheduler: failed: Set()
    14/03/06 21:59:23 INFO DAGScheduler: Missing parents for Stage 20: List()
    14/03/06 21:59:23 INFO DAGScheduler: Submitting Stage 20 (MapPartitionsRDD[86] at reduceByKey at <console>:13), which is now runnable
    14/03/06 21:59:23 INFO DAGScheduler: Submitting 1 missing tasks from Stage 20 (MapPartitionsRDD[86] at reduceByKey at <console>:13)
    14/03/06 21:59:23 INFO TaskSchedulerImpl: Adding task set 20.0 with 1 tasks
    14/03/06 21:59:23 INFO Executor: Finished task ID 19
    14/03/06 21:59:23 INFO TaskSetManager: Starting task 20.0:0 as TID 20 on executor localhost: localhost (PROCESS_LOCAL)
    14/03/06 21:59:23 INFO TaskSetManager: Serialized task 20.0:0 as 1803 bytes in 0 ms
    14/03/06 21:59:23 INFO Executor: Running task ID 20
    14/03/06 21:59:23 INFO BlockManager: Found block broadcast_11 locally
    14/03/06 21:59:23 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 1 non-zero-bytes blocks out of 1 blocks
    14/03/06 21:59:23 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote gets in 1 ms
    14/03/06 21:59:23 INFO Executor: Serialized size of result for 20 is 19423
    14/03/06 21:59:23 INFO Executor: Sending result for 20 directly to driver
    14/03/06 21:59:23 INFO TaskSetManager: Finished TID 20 in 17 ms on localhost (progress: 0/1)
    14/03/06 21:59:23 INFO TaskSchedulerImpl: Remove TaskSet 20.0 from pool
    14/03/06 21:59:23 INFO DAGScheduler: Completed ResultTask(20, 0)
    14/03/06 21:59:23 INFO DAGScheduler: Stage 20 (collect at <console>:13) finished in 0.016 s
    14/03/06 21:59:23 INFO SparkContext: Job finished: collect at <console>:13, took 0.242136929 s
    14/03/06 21:59:23 INFO Executor: Finished task ID 20
    res14: Array[(String, Int)] = Array((27.159.254.192,28), (120.43.9.81,40), (120.43.4.206,16), (120.37.242.176,56), (64.31.25.60,2), (27.153.161.9,32), (202.43.145.163,24), (61.187.102.6,1), (117.26.195.116,12), (27.153.186.194,64), (123.125.71.91,1), (110.85.106.105,64), (110.86.184.182,36), (27.150.247.36,52), (110.86.166.52,60), (175.98.162.2,20), (61.136.166.16,1), (46.105.105.217,1), (27.150.223.49,52), (112.5.252.6,20), (121.205.242.4,76), (183.61.174.211,3), (27.153.230.35,36), (112.111.172.96,40), (112.5.234.157,3), (144.76.95.232,7), (31.204.154.144,28), (123.125.71.22,1), (80.82.64.118,3), (27.153.248.188,160), (112.5.252.187,40), (221.219.105.71,4), (74.82.169.79,19), (117.26.253.195,32), (120.33.244.205,152), (110.86.165.8,84), (117.26.86.172,136), (27.153.233.101,8), (123.12...
As you can see, part of the result of the map and reduce computation is printed.
Finally, to save the result to HDFS, just enter the following code:
    result.saveAsTextFile("hdfs://m1:9000/user/shirdrn/wwwlog20140222.log.result")
View the result data on HDFS:
    [shirdrn@m1 ~]$ hadoop fs -cat /user/shirdrn/wwwlog20140222.log.result/part-00000 | head -5
    (27.159.254.192,28)
    (120.43.9.81,40)
    (120.43.4.206,16)
    (120.37.242.176,56)
    (64.31.25.60,2)
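One way to gain confidence in the saved counts is to recompute them locally and compare. The following is a minimal sketch, assuming a local copy of the log file is available; the function name ip_counts is hypothetical. It counts the first whitespace-separated field of each line with awk and prints the counts in the same (ip,count) form as the saved result. Unlike the Spark job, it skips blank lines.

```shell
# ip_counts FILE
# Counts how often each first field (the client IP address) occurs in FILE
# and prints one "(ip,count)" line per address, sorted for stable comparison.
ip_counts() {
  awk 'NF > 0 { count[$1]++ }
       END { for (ip in count) printf "(%s,%d)\n", ip, count[ip] }' "$1" | sort
}

# Usage (the local file name /tmp/wwwlog20140222.log is an assumption):
# hadoop fs -cat /user/shirdrn/wwwlog20140222.log > /tmp/wwwlog20140222.log
# ip_counts /tmp/wwwlog20140222.log | head -5
```

Sorting the HDFS output the same way (for example, by piping the hadoop fs -cat output through sort) should make the two listings directly comparable with diff.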