CentOS 6.4下安装配置Spark-0.9集群

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介:

Spark是一个快速、通用的计算集群框架,它的内核使用Scala语言编写,它提供了Scala、Java和Python编程语言high-level API,使用这些API能够非常容易地开发并行处理的应用程序。
下面,我们通过搭建Spark集群计算环境,并进行简单地验证,来体验一下使用Spark计算的特点。无论从安装运行环境还是从编写处理程序(用Scala,Spark默认提供的Shell环境可以直接输入Scala代码进行数据处理),我们都会觉得比Hadoop MapReduce计算框架要简单得多,而且,Spark可以很好地与HDFS进行交互(从HDFS读取数据,以及写数据到HDFS中)。

安装配置

  • 下载安装配置Scala
1 wget http://www.scala-lang.org/files/archive/scala-2.10.3.tgz
2 tar xvzf scala-2.10.3.tgz

在~/.bashrc中增加环境变量SCALA_HOME,并使之生效:

1 export SCALA_HOME=/usr/scala/scala-2.10.3
2 export PATH=$PATH:$SCALA_HOME/bin
  • 下载安装配置Spark

我们首先在主节点m1上配置Spark程序,然后将配置好的程序文件复制分发到集群的各个从结点上。下载解压缩:

1 wget http://d3kbcqa49mib13.cloudfront.net/spark-0.9.0-incubating-bin-hadoop1.tgz
2 tar xvzf spark-0.9.0-incubating-bin-hadoop1.tgz

在~/.bashrc中增加环境变量SPARK_HOME,并使之生效:

1 export SPARK_HOME=/home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1
2 export PATH=$PATH:$SPARK_HOME/bin

在m1上配置Spark,修改spark-env.sh配置文件:

1 cd /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/conf
2 cp spark-env.sh.template spark-env.sh

在该脚本文件中,同时将SCALA_HOME配置为Unix环境下实际指向路径,例如:

1 export SCALA_HOME=/usr/scala/scala-2.10.3

修改conf/slaves文件,将计算节点的主机名添加到该文件,一行一个,例如:

1 s1
2 s2
3 s3

最后,将Spark的程序文件和配置文件拷贝分发到从节点机器上:

1 scp -r ~/cloud/programs/spark-0.9.0-incubating-bin-hadoop1 shirdrn@s1:~/cloud/programs/
2 scp -r ~/cloud/programs/spark-0.9.0-incubating-bin-hadoop1 shirdrn@s2:~/cloud/programs/
3 scp -r ~/cloud/programs/spark-0.9.0-incubating-bin-hadoop1 shirdrn@s3:~/cloud/programs/

启动Spark集群

我们会使用HDFS集群上存储的数据作为计算的输入,所以首先要把Hadoop集群安装配置好,并成功启动,我这里使用的是Hadoop 1.2.1版本。启动Spark计算集群非常简单,执行如下命令即可:

1 cd /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/
2 sbin/start-all.sh

可以看到,在m1上启动了一个名称为Master的进程,在s1上启动了一个名称为Worker的进程,如下所示,我这里也启动了Hadoop集群:
主节点m1上:

1 54968 SecondaryNameNode
2 55651 Master
3 55087 JobTracker
4 54814 NameNode
5
6 从节点s1上:
7 33592 Worker
8 33442 TaskTracker
9 33336 DataNode

各个进程是否启动成功,也可以查看日志来诊断,例如:

1 主节点上:
2 tail -100f $SPARK_HOME/logs/spark-shirdrn-org.apache.spark.deploy.master.Master-1-m1.out
3 从节点上:
4 tail -100f $SPARK_HOME/logs/spark-shirdrn-org.apache.spark.deploy.worker.Worker-1-s1.out

Spark集群计算验证

我们使用我的网站的访问日志文件来演示,示例如下:

1 27.159.254.192 - - [21/Feb/2014:11:40:46 +0800] "GET /archives/526.html HTTP/1.1" 200 12080 "http://shiyanjun.cn/archives/526.html" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
2 120.43.4.206 - - [21/Feb/2014:10:37:37 +0800] "GET /archives/417.html HTTP/1.1" 200 11464 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"

统计该文件里面IP地址出现频率,来验证Spark集群能够正常计算。另外,我们需要从HDFS中读取这个日志文件,然后统计IP地址频率,最后将结果再保存到HDFS中的指定目录。
首先,需要启动用来提交计算任务的Spark Shell:

1 bin/spark-shell

在Spark Shell上只能使用Scala语言写代码来运行。
然后,执行统计IP地址频率,在Spark Shell中执行如下代码来实现:

1 val file = sc.textFile("hdfs://m1:9000/user/shirdrn/wwwlog20140222.log")
2 val result = file.flatMap(line => line.split("\\s+.*")).map(word => (word,1)).reduceByKey((a, b) => a + b)

上述的文件hdfs://m1:9000/user/shirdrn/wwwlog20140222.log是输入日志文件。处理过程的日志信息,示例如下所示:

01 14/03/06 21:59:22 INFO MemoryStore: ensureFreeSpace(784) called with curMem=43296, maxMem=311387750
02 14/03/06 21:59:22 INFO MemoryStore: Block broadcast_11 stored as values to memory (estimated size 784.0 B, free 296.9 MB)
03 14/03/06 21:59:22 INFO FileInputFormat: Total input paths to process : 1
04 14/03/06 21:59:22 INFO SparkContext: Starting job: collect at <console>:13
05 14/03/06 21:59:22 INFO DAGScheduler: Registering RDD 84 (reduceByKey at <console>:13)
06 14/03/06 21:59:22 INFO DAGScheduler: Got job 10 (collect at <console>:13) with 1 output partitions (allowLocal=false)
07 14/03/06 21:59:22 INFO DAGScheduler: Final stage: Stage 20 (collect at <console>:13)
08 14/03/06 21:59:22 INFO DAGScheduler: Parents of final stage: List(Stage 21)
09 14/03/06 21:59:22 INFO DAGScheduler: Missing parents: List(Stage 21)
10 14/03/06 21:59:22 INFO DAGScheduler: Submitting Stage 21 (MapPartitionsRDD[84] at reduceByKey at <console>:13), which has no missing parents
11 14/03/06 21:59:22 INFO DAGScheduler: Submitting 1 missing tasks from Stage 21 (MapPartitionsRDD[84] at reduceByKey at <console>:13)
12 14/03/06 21:59:22 INFO TaskSchedulerImpl: Adding task set 21.0 with 1 tasks
13 14/03/06 21:59:22 INFO TaskSetManager: Starting task 21.0:0 as TID 19 on executor localhost: localhost (PROCESS_LOCAL)
14 14/03/06 21:59:22 INFO TaskSetManager: Serialized task 21.0:0 as 1941 bytes in 0 ms
15 14/03/06 21:59:22 INFO Executor: Running task ID 19
16 14/03/06 21:59:22 INFO BlockManager: Found block broadcast_11 locally
17 14/03/06 21:59:22 INFO HadoopRDD: Input split:hdfs://m1:9000/user/shirdrn/wwwlog20140222.log:0+4179514
18 14/03/06 21:59:23 INFO Executor: Serialized size of result for 19 is 738
19 14/03/06 21:59:23 INFO Executor: Sending result for 19 directly to driver
20 14/03/06 21:59:23 INFO TaskSetManager: Finished TID 19 in 211 ms on localhost (progress: 0/1)
21 14/03/06 21:59:23 INFO TaskSchedulerImpl: Remove TaskSet 21.0 from pool
22 14/03/06 21:59:23 INFO DAGScheduler: Completed ShuffleMapTask(21, 0)
23 14/03/06 21:59:23 INFO DAGScheduler: Stage 21 (reduceByKey at <console>:13) finished in 0.211 s
24 14/03/06 21:59:23 INFO DAGScheduler: looking for newly runnable stages
25 14/03/06 21:59:23 INFO DAGScheduler: running: Set()
26 14/03/06 21:59:23 INFO DAGScheduler: waiting: Set(Stage 20)
27 14/03/06 21:59:23 INFO DAGScheduler: failed: Set()
28 14/03/06 21:59:23 INFO DAGScheduler: Missing parents for Stage 20: List()
29 14/03/06 21:59:23 INFO DAGScheduler: Submitting Stage 20 (MapPartitionsRDD[86] at reduceByKey at <console>:13), which is now runnable
30 14/03/06 21:59:23 INFO DAGScheduler: Submitting 1 missing tasks from Stage 20 (MapPartitionsRDD[86] at reduceByKey at <console>:13)
31 14/03/06 21:59:23 INFO TaskSchedulerImpl: Adding task set 20.0 with 1 tasks
32 14/03/06 21:59:23 INFO Executor: Finished task ID 19
33 14/03/06 21:59:23 INFO TaskSetManager: Starting task 20.0:0 as TID 20 on executor localhost: localhost (PROCESS_LOCAL)
34 14/03/06 21:59:23 INFO TaskSetManager: Serialized task 20.0:0 as 1803 bytes in 0 ms
35 14/03/06 21:59:23 INFO Executor: Running task ID 20
36 14/03/06 21:59:23 INFO BlockManager: Found block broadcast_11 locally
37 14/03/06 21:59:23 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 1 non-zero-bytes blocks out of 1 blocks
38 14/03/06 21:59:23 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote gets in 1 ms
39 14/03/06 21:59:23 INFO Executor: Serialized size of result for 20 is 19423
40 14/03/06 21:59:23 INFO Executor: Sending result for 20 directly to driver
41 14/03/06 21:59:23 INFO TaskSetManager: Finished TID 20 in 17 ms on localhost (progress: 0/1)
42 14/03/06 21:59:23 INFO TaskSchedulerImpl: Remove TaskSet 20.0 from pool
43 14/03/06 21:59:23 INFO DAGScheduler: Completed ResultTask(20, 0)
44 14/03/06 21:59:23 INFO DAGScheduler: Stage 20 (collect at <console>:13) finished in 0.016 s
45 14/03/06 21:59:23 INFO SparkContext: Job finished: collect at <console>:13, took 0.242136929 s
46 14/03/06 21:59:23 INFO Executor: Finished task ID 20
47 res14: Array[(String, Int)] = Array((27.159.254.192,28), (120.43.9.81,40), (120.43.4.206,16), (120.37.242.176,56), (64.31.25.60,2), (27.153.161.9,32), (202.43.145.163,24), (61.187.102.6,1), (117.26.195.116,12), (27.153.186.194,64), (123.125.71.91,1), (110.85.106.105,64), (110.86.184.182,36), (27.150.247.36,52), (110.86.166.52,60), (175.98.162.2,20), (61.136.166.16,1), (46.105.105.217,1), (27.150.223.49,52), (112.5.252.6,20), (121.205.242.4,76), (183.61.174.211,3), (27.153.230.35,36), (112.111.172.96,40), (112.5.234.157,3), (144.76.95.232,7), (31.204.154.144,28), (123.125.71.22,1), (80.82.64.118,3), (27.153.248.188,160), (112.5.252.187,40), (221.219.105.71,4), (74.82.169.79,19), (117.26.253.195,32), (120.33.244.205,152), (110.86.165.8,84), (117.26.86.172,136), (27.153.233.101,8), (123.12...

可以看到,输出了经过map和reduce计算后的部分结果。
最后,我们想要将结果保存到HDFS中,只要输入如下代码:

1 result.saveAsTextFile("hdfs://m1:9000/user/shirdrn/wwwlog20140222.log.result")

查看HDFS上的结果数据:

查看源代码打印帮助

1 [shirdrn@m1 ~]$ hadoop fs -cat /user/shirdrn/wwwlog20140222.log.result/part-00000 |head -5
2 (27.159.254.192,28)
3 (120.43.9.81,40)
4 (120.43.4.206,16)
5 (120.37.242.176,56)
6 (64.31.25.60,2)
相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
7天前
|
Java
CentOS7.8配置Adoptium-Java17运行环境
本指南介绍如何设置清华镜像源并安装 Temurin-17-JRE 运行环境。首先,编辑 `/etc/yum.repos.d/adoptium.repo` 文件,配置清华镜像源。接着,使用 `yum install -y temurin-17-jre` 命令安装 Temurin-17-JRE,并通过 `java --version` 验证安装成功。相关配置和操作界面截图附后。
22 8
|
6天前
|
网络协议 Java 应用服务中间件
centos7环境下tomcat8的安装与配置
本文介绍了在Linux环境下安装和配置Tomcat 8的详细步骤。首先,通过无网络条件下的文件交互软件(如Xftp 6或MobaXterm)下载并解压Tomcat安装包至指定路径,启动Tomcat服务并测试访问。接着,修改Tomcat端口号以避免冲突,并部署Java Web应用项目至Tomcat服务器。最后,调整Linux防火墙规则,确保外部可以正常访问部署的应用。关键步骤包括关闭或配置防火墙、添加必要的端口规则,确保Tomcat服务稳定运行。
|
2月前
|
关系型数据库 MySQL Linux
在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤,包括准备工作、下载源码、编译安装、配置 MySQL 服务、登录设置等。
本文介绍了在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤,包括准备工作、下载源码、编译安装、配置 MySQL 服务、登录设置等。同时,文章还对比了编译源码安装与使用 RPM 包安装的优缺点,帮助读者根据需求选择最合适的方法。通过具体案例,展示了编译源码安装的灵活性和定制性。
248 2
|
3月前
|
监控 安全 Linux
CentOS7下安装配置ntp服务的方法教程
通过以上步骤,您不仅能在CentOS 7系统中成功部署NTP服务,还能确保其配置合理、运行稳定,为系统时间的精确性提供保障。欲了解更多高级配置或遇到特定问题,提供了丰富的服务器管理和优化资源,可作为进一步学习和求助的平台。
383 1
|
3月前
|
Java jenkins 持续交付
Centos7下docker的jenkins下载并配置jdk与maven
通过上述步骤,您将成功在CentOS 7上的Docker容器中部署了Jenkins,并配置好了JDK与Maven,为持续集成和自动化构建打下了坚实基础。
167 1
|
3月前
|
Kubernetes Ubuntu Linux
Centos7 搭建 kubernetes集群
本文介绍了如何搭建一个三节点的Kubernetes集群,包括一个主节点和两个工作节点。各节点运行CentOS 7系统,最低配置为2核CPU、2GB内存和15GB硬盘。详细步骤包括环境配置、安装Docker、关闭防火墙和SELinux、禁用交换分区、安装kubeadm、kubelet、kubectl,以及初始化Kubernetes集群和安装网络插件Calico或Flannel。
272 4
|
3月前
|
存储 监控 Linux
在 CentOS 7 中如何对新硬盘进行分区、格式化、挂载及配置最佳实践
本文详细介绍了在 CentOS 7 中如何对新硬盘进行分区、格式化、挂载及配置最佳实践,包括使用 `fdisk` 创建分区、`mkfs` 格式化分区、创建挂载点、编辑 `/etc/fstab` 实现永久挂载等步骤,旨在有效管理服务器磁盘空间,提高系统稳定性和可维护性。
583 1
|
3月前
|
分布式计算 大数据 Spark
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
56 1
|
3月前
|
安全 Linux 数据库连接
CentOS 7环境下DM8数据库的安装与配置
【10月更文挑战第16天】本文介绍了在 CentOS 7 环境下安装与配置达梦数据库(DM8)的详细步骤,包括安装前准备、创建安装用户、上传安装文件、解压并运行安装程序、初始化数据库实例、配置环境变量、启动数据库服务、配置数据库连接和参数、备份与恢复、以及安装后的安全设置、性能优化和定期维护等内容。通过这些步骤,可以顺利完成 DM8 的安装与配置。
520 0
|
3月前
|
SQL 分布式计算 大数据
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(一)
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(一)
50 0