Spark本地模式运行

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介:

Spark的安装分为几种模式,其中一种是本地运行模式,只需要在单节点上解压即可运行,这种模式不需要依赖Hadoop 环境。在本地运行模式中,master和worker都运行在一个jvm进程中,通过该模式,可以快速的测试Spark的功能。

下载 Spark

下载地址为http://spark.apache.org/downloads.html,根据页面提示选择一个合适的版本下载,这里我下载的是 spark-1.3.0-bin-cdh4.tgz。下载之后解压:

 cd ~
 wget http://mirror.bit.edu.cn/apache/spark/spark-1.3.0/spark-1.3.0-bin-cdh4.tgz
 tar -xf spark-1.3.0-bin-cdh4.tgz
 cd spark-1.3.0-bin-cdh4

下载之后的目录为:

⇒  tree -L 1
.
├── CHANGES.txt
├── LICENSE
├── NOTICE
├── README.md
├── RELEASE
├── bin
├── conf
├── data
├── ec2
├── examples
├── lib
├── python
└── sbin

运行 spark-shell

本地模式运行spark-shell非常简单,只要运行以下命令即可,假设当前目录是$SPARK_HOME

$ MASTER=local 
$ bin/spark-shell

MASTER=local就是表明当前运行在单机模式。如果一切顺利,将看到下面的提示信息:

Created spark context..
Spark context available as sc.

这表明spark-shell中已经内置了Spark context的变量,名称为sc,我们可以直接使用该变量进行后续的操作。

spark-shell 后面设置 master 参数,可以支持更多的模式,请参考 http://spark.apache.org/docs/latest/submitting-applications.html#master-urls

我们在sparkshell中运行一下最简单的例子,统计在README.md中含有Spark的行数有多少,在spark-shell中输入如下代码:

scala>sc.textFile("README.md").filter(_.contains("Spark")).count

如果你觉得输出的日志太多,你可以从模板文件创建 conf/log4j.properties :

$ mv conf/log4j.properties.template conf/log4j.properties

然后修改日志输出级别为WARN

log4j.rootCategory=WARN, console

如果你设置的 log4j 日志等级为 INFO,则你可以看到这样的一行日志 INFO SparkUI: Started SparkUI at http://10.9.4.165:4040,意思是 Spark 启动了一个 web 服务器,你可以通过浏览器访问http://10.9.4.165:4040来查看 Spark 的任务运行状态等信息。

pyspark

运行 bin/pyspark 的输出为:

$ bin/pyspark
Python 2.7.6 (default, Sep  9 2014, 15:04:36)
[GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.39)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
15/03/30 15:19:07 WARN Utils: Your hostname, june-mac resolves to a loopback address: 127.0.0.1; using 10.9.4.165 instead (on interface utun0)
15/03/30 15:19:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/03/30 15:19:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ / __/  _/
   /__ / .__/\_,_/_/ /_/\_\   version 1.3.0
      /_/

Using Python version 2.7.6 (default, Sep  9 2014 15:04:36)
SparkContext available as sc, HiveContext available as sqlCtx.

你也可以使用 IPython 来运行 Spark:

IPYTHON=1  ./bin/pyspark

如果要使用 IPython NoteBook,则运行:

IPYTHON_OPTS="notebook"  ./bin/pyspark

从日志可以看到,不管是 bin/pyspark 还是 bin/spark-shell,他们都有两个内置的变量:sc 和 sqlCtx。

SparkContext available as sc, HiveContext available as sqlCtx

sc 代表着 Spark 的上下文,通过该变量可以执行 Spark 的一些操作,而 sqlCtx 代表着 HiveContext 的上下文。

spark-submit

在Spark1.0之后提供了一个统一的脚本spark-submit来提交任务。

对于 python 程序,我们可以直接使用 spark-submit:

$ mkdir -p /usr/lib/spark/examples/python
$ tar zxvf /usr/lib/spark/lib/python.tar.gz -C /usr/lib/spark/examples/python

$ ./bin/spark-submit examples/python/pi.py 10

对于 Java 程序,我们需要先编译代码然后打包运行:

$ spark-submit --class "SimpleApp" --master local[4] simple-project-1.0.jar

测试 RDD

在 Spark 中,我们操作的集合被称为 RDD,他们被并行拷贝到集群各个节点上。我们可以通过 sc 来创建 RDD 。

创建 RDD 有两种方式:

  • sc.parallelize()
  • sc.textFile()

使用 Scala 对 RDD 的一些操作:

val rdd1=sc.parallelize(List(1,2,3,3))
val rdd2=sc.parallelize(List(3,4,5))

//转换操作
rdd1.map(2*).collect //等同于:rdd1.map(t=>2*t).collect
//Array[Int] = Array(2, 4, 6, 6)

rdd1.filter(_>2).collect
//Array[Int] = Array(3, 3)

rdd1.flatMap(_ to 4).collect
//Array[Int] = Array(1, 2, 3, 4, 2, 3, 4, 3, 4, 3, 4)

rdd1.sample(false, 0.3, 4).collect
//Array[Int] = Array(3, 3)

rdd1.sample(true, 0.3, 4).collect
//Array[Int] = Array(3)

rdd1.union(rdd2).collect
//Array[Int] = Array(1, 2, 3, 3, 3, 4, 5)

rdd1.distinct().collect
//Array[Int] = Array(1, 2, 3)

rdd1.map(i=>(i,i)).groupByKey.collect
//Array[(Int, Iterable[Int])] = Array((1,CompactBuffer(1)), (2,CompactBuffer(2)), (3,CompactBuffer(3, 3)))

rdd1.map(i=>(i,i)).reduceByKey(_ + _).collect
//Array[(Int, Int)] = Array((1,1), (2,2), (3,6))

rdd1.map(i=>(i,i)).sortByKey(false).collect
//Array[(Int, Int)] = Array((3,3), (3,3), (2,2), (1,1))

rdd1.map(i=>(i,i)).join(rdd2.map(i=>(i,i))).collect
//Array[(Int, (Int, Int))] = Array((3,(3,3)), (3,(3,3)))

rdd1.map(i=>(i,i)).cogroup(rdd2.map(i=>(i,i))).collect
//Array[(Int, (Iterable[Int], Iterable[Int]))] = Array((4,(CompactBuffer(),CompactBuffer(4))), (1,(CompactBuffer(1),CompactBuffer())), (5,(CompactBuffer(),CompactBuffer(5))), (2,(CompactBuffer(2),CompactBuffer())), (3,(CompactBuffer(3, 3),CompactBuffer(3))))

rdd1.cartesian(rdd2).collect()
//Array[(Int, Int)] = Array((1,3), (1,4), (1,5), (2,3), (2,4), (2,5), (3,3), (3,4), (3,5), (3,3), (3,4), (3,5))

rdd1.pipe("head -n 1").collect
//Array[String] = Array(1, 2, 3, 3)

//动作操作
rdd1.reduce(_ + _)
//Int = 9

rdd1.collect
//Array[Int] = Array(1, 2, 3, 3)

rdd1.first()
//Int = 1

rdd1.take(2)
//Array[Int] = Array(1, 2)

rdd1.top(2)
//Array[Int] = Array(3, 3)

rdd1.takeOrdered(2)
//Array[Int] = Array(1, 2)

rdd1.map(i=>(i,i)).countByKey()
//scala.collection.Map[Int,Long] = Map(1 -> 1, 2 -> 1, 3 -> 2)

rdd1.countByValue()
//scala.collection.Map[Int,Long] = Map(1 -> 1, 2 -> 1, 3 -> 2)

rdd1.intersection(rdd2).collect()
//Array[Int] = Array(3)

rdd1.subtract(rdd2).collect()
//Array[Int] = Array(1, 2)

rdd1.foreach(println)
//3
//2
//3
//1

rdd1.foreachPartition(x => println(x.reduce(_ + _)))

更多例子,参考http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
3月前
|
分布式计算 Kubernetes Hadoop
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
201 6
|
3月前
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
88 2
|
5月前
|
分布式计算 资源调度 大数据
【决战大数据之巅】:Spark Standalone VS YARN —— 揭秘两大部署模式的恩怨情仇与终极对决!
【8月更文挑战第7天】随着大数据需求的增长,Apache Spark 成为关键框架。本文对比了常见的 Spark Standalone 与 YARN 部署模式。Standalone 作为自带的轻量级集群管理服务,易于设置,适用于小规模或独立部署;而 YARN 作为 Hadoop 的资源管理系统,支持资源的统一管理和调度,更适合大规模生产环境及多框架集成。我们将通过示例代码展示如何在这两种模式下运行 Spark 应用程序。
285 3
|
3月前
|
分布式计算 大数据 Java
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
47 1
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
|
6月前
|
SQL 弹性计算 资源调度
云服务器 ECS产品使用问题之bin/spark-sql --master yarn如何进行集群模式运行
云服务器ECS(Elastic Compute Service)是各大云服务商阿里云提供的一种基础云计算服务,它允许用户租用云端计算资源来部署和运行各种应用程序。以下是一个关于如何使用ECS产品的综合指南。
|
6月前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之 Spark Local模式启动报错,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
7月前
|
存储 分布式计算 监控
Spark Standalone模式是一种集群部署方式
【6月更文挑战第17天】Spark Standalone模式是一种集群部署方式
95 7
|
7月前
|
分布式计算 DataWorks 网络安全
DataWorks操作报错合集之还未运行,spark节点一直报错,如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
8月前
|
分布式计算 资源调度 调度
利用SparkLauncher实现Spark Cluster模式下的远端交互
利用SparkLauncher实现Spark Cluster模式下的远端交互
146 0
|
8月前
|
机器学习/深度学习 分布式计算 并行计算
Spark 3.0 中的屏障执行模式_Spark的MPI时代来了
Spark 3.0 中的屏障执行模式_Spark的MPI时代来了
48 0