Spark-快速上手-阿里云开发者社区

开发者社区> 开发与运维> 正文

Spark-快速上手

简介: 快速上手 Spark 的交互式 shell( 用 Python 或Scala) 介 绍 它的 API 。当演示如何在 Java, Scala 和 Python 写独立的程序 时 ,看 编 程指南里完整的参考。

快速上手

Spark 的交互式 shell( 用 Python 或Scala) 介 绍 它的 API 。当演示如何在 Java, Scala 和 Python 写独立的程序 时 ,看 编 程指南里完整的参考。依照 这 个指南,首先从 Spark 网站下 载 一个 Spark 发 行包。因 为 我 们 不会使用 HDFS ,你可以下 载 任何 Hadoop 版本的包。使用 Spark Shell。Spark 的 shell 作 为 一个强大的交互式数据分析工具,提供了一个 简单 的方式来学 习 API 。它可以使用 Scala( 在 Java 虚 拟 机上 运 行 现 有的 Java 库 的一个很好方式 ) 或 Python 。在 Spark目 录 里使用下面的方式开始 运 行:

./bin/spark-shell

Spark 最主要的抽象是叫Resilient Distributed Dataset(RDD) 的 弹 性分布式集合。RDDs 可以使用 Hadoop InputFormats(例如 HDFS 文件) 创 建,也可以从其他的 RDDs 转换 。 让 我 们 在Spark 源代 码 目 录 从 README 文本文件中 创 建一个新的 RDD。

scala> val textFile = sc.textFile("README.md")
textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3

RDD 的 actions 从 RDD 中返回 值 , transformations 可以 转换 成一个新 RDD 并返回它的引用。让我们开始使用几个操作:

scala> textFile.count() // RDD  的数据条数
res0: Long = 126
scala> textFile.first() // RDD  的第一行数据
res1: String = # Apache Spark

现在让我们使用一个 transformation ,我们将使用 filter 在这个文件里返回一个包含子数据集的新 RDD。

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09

我们可以把 actions 和 transformations 链接在一起:

scala> textFile.filter(line => line.contains("Spark")).count() //  有多少行包括  "Spark"
res3: Long = 15

RDD 操作

RDD actions 和 transformations 能被用在更多的复 杂计算中。比方 说 ,我 们 想要找到一行中最多的 单词 数量:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15

首先将行映射成一个整型数 值产 生一个新RDD 在这 个新的RDD上调用reduce找到行中最大的个数。map 和 reduce的参数是Scala的函数串 ( 闭 包 ) ,并且可以使用任何 语 言特性或者 Scala/Java 类库 。例如,我 们 可以很方便地 调 用其他的函数声明。 我 们 使用
Math.max() 函数 让 代 码 更容易理解:

scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15

Hadoop 流行的一个通用的数据流模式是 MapReduce 。 Spark 能很容易地 实现
MapReduce:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8

这里,我们结合 flatMap, map 和 reduceByKey 来计算文件里每个单词出现的数量,它的结果
是包含一组(String, Int) 键值对的 RDD。我们可以使用 [collect] 操作在我们的 shell 中收集单
词 的数量:

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2

缓 存

Spark 支持把数据集拉到集群内的内存 缓 存中。当要重复 访问时这 是非常有用的,例如当我 们在一个小的热(hot)数据集中查询,或者运行一个像网页搜索排序这样的重复算法。作为一个简单的例子,让我们把linesWithSpark数据集标记在缓存中:

scala> linesWithSpark.cache()
res7: spark.RDD[String] = spark.FilteredRDD@17e51082
scala> linesWithSpark.count()
res8: Long = 15
scala> linesWithSpark.count()
res9: Long = 15

缓 存100行的文本文件来研究Spark这 看起来很傻。真正让人感 兴 趣的部分是我 们 可以在非常大型的数据集中使用同 样 的函数,甚至在 10 个或者 100 个节 点中交叉 计 算。你同 样 可以使用 bin/spark-shell 连 接到一个 cluster 来替 换 掉 进 行交互操作。

独立 应 用程序

现 在假 设 我 们 想要使用 Spark API 写一个独立的 应 用程序。我 们 将通 过 使用 Scala( 用 SBT) ,Java( 用 Maven) 和 Python 写一个 简单 的 应 用程序来学 习 。
我 们 用 Scala 创 建一个非常 简单 的 Spark 应 用程序。

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" //  应该 是你系 统 上的某些文件
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}

这个程序仅仅是在 Spark README 中计算行里面包含 ‘a’ 和包含 ‘b’ 的次数。你需要注意将 YOUR_SPARK_HOME 替换成你已经安装 Spark 的路径。不像之前的 Spark Shell 例子,这里初始化了自己的 SparkContext ,我 们 把 SparkContext 初始化作 为 程序的一部分。我 们 通 过 SparkContext 的 构 造函数参入 SparkConf 对 象, 这 个 对 象包含了一些关于我 们 程序的信息。我们的程序依赖于 Spark API,所以我们需要包含一个 sbt 文件文件, simple.sbt 解释了Spark 是一个依赖。这个文件还要补充 Spark 依赖于一个 repository:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0"

要 让 sbt 正确工作,我 们 需要把 SimpleApp.scala 和 simple.sbt 按照 标 准的文件目 录结构布局。上面的做好之后,我 们 可以把程序的代 码创 建成一个 JAR 包。然后使用 spark-submit 来 运 行我 们 的程序。

$ find .
.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
$ sbt package
[info] Packaging {..}/{..}/target/scala-2.10/simple-project_2.10-1.0.jar

$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/scala-2.10/simple-project_2.10-1.0.jar

其他的 组 件例如: Spark Streaming

要 让 程序 运 行在集群 (cluster) 上,最后, Spark 在 examples 文件目 录 里包含了 Scala, Java 和 Python 的几个 简单 的例子,你可以直接 运 行它 们 :

./bin/run-example SparkPi

./bin/spark-submit examples/src/main/python/pi.py

宏 观 上 说 ,每个 Spark 应 用程序都由一个 驱动 程序 (driver programe) 构 成, 驱动 程序在集群上运 行用 户 的 main 函数来 执 行各种各 样 的并行操作 (parallel operations) 。 Spark 的主要抽象是提供一个 弹 性分布式数据集 (RDD resilient distributed dataset) , RDD 是指能横跨集群所有节 点 进 行并行计算的分区元素集合。 RDD可以从 Hadoop 文件系 统 中的一个文件中 创 建而来( 或其他 Hadoop 支持的文件系 统 ) ,或者从一个已有的 Scala 集合 转换 得到。用 户 可以要求Spark 将 RDD 持久化 (persist) 到内存中,来 让 它在并行 计 算中高效地重用。最后, RDD 能从节 点失 败 中自 动 地恢复 过 来。Spark 的第二个抽象是共享 变 量(shared variables),共享 变 量能被 运 行在并行 计 算中。默 认 情况下,当 Spark 运 行一个并行函数 时 , 这 个并行函数会作 为 一个任 务 集在不同的 节点上 运行,它会把函数里使用的每个 变量都复制搬 运 到每个任 务 中。有 时 ,一个 变 量需要被共享到交叉任 务 中或 驱动 程序和任 务 之 间 。Spark 支持 2 种 类 型的共享 变 量:广播 变 量(*broadcastvariables) ,用来在所有 节 点的内存中 缓 存一个 值 ;累加器*(accumulators) , 仅仅 只能 执 行 “ 添加(added)”操作,例如:记数器(counters)和求和(sums)。 Spark 支持的所有语言中演示它的每一个特征。可以非常简单地从一个 Spark交互式 shell 开始 -—— bin/spark-shell 开始一个 Scala shell,或 bin/pyspark 开始一个Python shell。
Spark 1.2.0 使用 Scala 2.10 写 应 用程序,你需要使用一个兼容的 Scala 版本 ( 例如:2.10.X) 。

写 Spark 应 用程序

你需要添加 Spark 的 Maven 依 赖 , Spark 可以通 过 Maven 中心 仓库
来 获 得:
groupId = org.apache.spark
artifactId = spark-core_2.10
version = 1.2.0
另 外,如果你希望 访问 HDFS 集群,你需要根据你的 HDFS 版本添加 hadoop-client 的依赖 。一些公共的 HDFS 版本 tags 在第三方 发 行 页 面中被groupId = org.apache.hadoop
artifactId = hadoop-client
version = 你需要导入一些 Spark 的类和隐式转换到你的程序,添加下面的行就可以了:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

初始化 Spark
Spark 编 程的第一步是需要 创 建一个 SparkContext 对 象,用来告 诉 Spark 如何 访问 集群。在创 建 SparkContext 之前,你需要 构 建一个 SparkConf 对 象, SparkConf 对 象包含了一些你应 用程序的信息。

val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)

appName 参数是你程序的名字,它会 显 示在 cluster UI 上。 master 是 Spark, Mesos 或YARN 集群的 URL,或 运 行在本地模式 时 ,使用 专 用字符串 “local”。在 实 践中,当 应 用程序运 行在一个集群上 时 ,你并不想要把 master 硬 编码 到你的程序中,你可以用 spark-submit启动 你的 应 用程序的 时 候 传递 它。然而,你可以在本地 测试 和 单 元 测试 中使用 “local” 运 行Spark 进 程。
使用 Shell在 Spark shell 中,有一个 专 有的 SparkContext 已 经为 你 创 建好。在 变 量中叫做 sc 。你自己创建的 SparkContext 将无法工作。可以用 –master 参数来设置 SparkContext 要连接的集群,用 –jars 来设置需要添加到 classpath 中的 JAR 包,如果有多个 JAR 包使用逗号分割符连接它们。例如:在一个拥有 4 核的环境上运行 bin/spark-shell ,使用:

$ ./bin/spark-shell --master local[4]
或在 classpath 中添加   code.jar  ,使用:
$ ./bin/spark-shell --master local[4] --jars code.jar

执行 spark-shell –help 获取完整的选项列表。在这之后,调用 spark-shell 会比 spark-submit 脚本更 为 普遍。

弹 性分布式数据集 (RDDs)

Spark 核心的概念是 Resilient Distributed Dataset (RDD) :一个可并行操作的有容 错 机制的数据集合。有 2 种方式 创 建 RDDs :第一种是在你的 驱动 程序中并行化一个已 经 存在的集合另 外一种是引用一个外部存 储 系 统 的数据集,例如共享的文件系 统 , HDFS , HBase 或其他Hadoop 数据格式的数据源。
Spark RDD并行集合 (Parallelized collections) 的 创 建是通 过 在一个已有的集合 (Scala Seq ) 上 调 用SparkContext 的 parallelize 方法 实现 的。集合中的元素被复制到一个可并行操作的分布式数据集中。例如, 这 里演示了如何在一个包含 1 到 5 的数 组 中 创 建并行集合:

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

一旦 创 建完成, 这 个分布式数据集( distData )就可以被并行操作。例如,我 们 可以 调 用

 distData.reduce((a, b) => a + b)    将 这 个数 组 中的元素相加。

我 们 以后再描述在分布式上的一些操作。并行集合一个很重要的参数是切片数(slices),表示一个数据集切分的份数。Spark 会在集群上为每一个切片运行一个任务。你可以在集群上为每个 CPU 设置 2-4 个切片 (slices) 。正常情
况下,Spark 会 试 着基于你的集群状况自 动 地 设 置切片的数目。然而,你也可以通 过 parallelize 的第二个参数手 动 地 设 置 ( 例如: sc.parallelize(data, 10) ) 。
外部数据集,Spark 可以从任何一个 Hadoop 支持的存 储 源 创 建分布式数据集,包括你的本地文件系 统 ,HDFS , Cassandra , HBase , Amazon S3 等。 Spark 支持文本文件 (textfiles) , SequenceFiles 和其他 Hadoop InputFormat 。文本文件 RDDs 可以使用 SparkContext 的 textFile 方法 创 建。 在 这 个方法里 传 入文件的URI ( 机器上的本地路径或 hdfs:// , s3n:// 等 ) ,然后它会将文件 读 取成一个行集合。 这 里是一个 调 用例子:

scala> val distFile = sc.textFile("data.txt")
distFile: RDD[String] = MappedRDD@1d4cee08

一旦 创 建完成, distFiile 就能做数据集操作。例如,我 们 可以用下面的方式使用 map 和reduce 操作将所有行的长度相加:

distFile.map(s => s.length).reduce((a, b) => a + b)  

注意,Spark 读文件时:
如果使用本地文件系统路径,文件必须能在 work 节点上用相同的路径访问到。要么复制文件到所有的 workers,要么使用网络的方式共享文件系统。
所有 Spark 的基于文件的方法,包括 textFile ,能很好地支持文件目录,压缩过的文件和通配符。例如,你可以使用 textFile(“/my/ 文件目 录 “) , textFile(“/my/ 文件目录 /.txt”) 和 textFile(“/my/ 文件目 录 /.gz”) 。textFile 方法也可以选择第二个可选参数来控制切片(slices)的数目。默认情况下,Spark 为每一个文件块(HDFS 默认文件块大小是 64M)创建一个切片(slice)。但是你也可以通 过 一个更大的 值 来 设 置一个更高的切片数目。注意,你不能 设 置一个小于文件 块 数
目的切片值。除了文本文件,Spark 的 Scala API 支持其他几种数据格式:
SparkContext.wholeTextFiles 让你读取一个包含多个小文本文件的文件目录并且返回每一个(filename, content)对。与 textFile 的差异是:它记录的是每个文件中的每一行。对 于 SequenceFiles,可以使用 SparkContext 的 sequenceFile[K, V] 方法 创 建,K 和 V分别对应的是 key 和 values 的类型。像 IntWritable 与 Text 一样,它们必须是 Hadoop的 Writable 接口的子类。另外,对于几种通用的 Writables,Spark 允许你指定原生类型来替代。例如: sequenceFile[Int, String] 将会自动读取 IntWritables 和 Text 。对于其他的 Hadoop InputFormats,你可以使用 SparkContext.hadoopRDD 方法,它可以指定任意的 JobConf , 输 入格式(InputFormat),key 类 型,values 类 型。你可以跟 设 置Hadoop job 一样的方法设置输入源。你还可以在新的 MapReduce 接口(org.apache.hadoop.mapreduce)基础上使用 SparkContext.newAPIHadoopRDD ,老的接SparkContext.newHadoopRDD )。RDD.saveAsObjectFile 和 SparkContext.objectFile 支持保存一个 RDD ,保存格式是一个 简单 的 Java 对 象序列化格式。 这 是一种效率不高的 专 有格式,如 Avro ,它提供了 简单 的方法来保存任何一个 RDD 。

RDD 操作

RDDs 支持 2 种 类 型的操作: 转换 (transformations) 从已 经 存在的数据集中 创 建一个新的数据集; 动 作 (actions) 在数据集上 进 行 计 算之后返回一个 值 到 驱动 程序。例如, map 是一个 转换操作,它将每一个数据集元素 传递给 一个函数并且返回一个新的 RDD 。 另 一方面, reduce 是一个 动 作,它使用相同的函数来聚合 RDD 的所有元素,并且将最 终 的 结 果返回到 驱动 程序( 不 过 也有一个并行 reduceByKey 能返回一个分布式数据集 ) 。在 Spark 中,所有的 转换 (transformations) 都是惰性 (lazy) 的,它 们 不会 马 上 计 算它 们 的 结
果。相反的,它 们仅仅记录转换 操作是 应 用到 哪 些基 础 数据集(例如一个文件)上的。 转换仅仅在 这 个 时 候 计 算:当 动 作 (action) 需要一个 结 果返回 给驱动 程序的 时 候。 这 个 设计 能 够让Spark 运 行得更加高效。例如,我 们 可以 实现 :通 过 map 创 建一个新数据集在 reduce 中使用,并且 仅仅 返回 reduce 的 结 果 给 driver,而不是整个大的映射 过 的数据集。默认情况下,每一个转换过的 RDD 会在每次执行动作 (action) 的时候重新计算一次。然而,你也可以使用 persist (或 cache )方法持久化( persist )一个 RDD 到内存中。在 这 个情况
下, Spark 会在集群上保存相关的元素,在你下次 查询 的 时 候会 变 得更快。在 这 里也同 样 支持持久化 RDD 到磁盘,或在多个节点间复制。
基 础为了说明 RDD 基本知识,考虑下面的简单程序:

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

第一行是定 义 来自于外部文件的 RDD。 这 个数据集并没有加 载 到内存或做其他的操
作: lines 仅仅是一个指向文件的指针。第二行是定义 lineLengths ,它是 map 转换
(transformation)的结果。同样, lineLengths 由于懒惰模式也没有立即计算。最后,我们执行 reduce ,它是一个动作 (action) 。在这个地方, Spark 把计算分成多个任务 (task) ,并且让它们运行在多个机器上。每台机器都运行自己的 map 部分和本地 reduce 部分。然后仅仅将结 果返回 给驱动 程序。如果我 们 想要再次使用 lineLengths ,我 们 可以添加:lineLengths.persist()在 reduce 之前,它会导致 lineLengths 在第一次计算完成之后保存到内存中传递 函数到 Spark
Spark 的 API 很大程度上依靠在 驱动 程序里 传递 函数到集群上 运 行。 这 里有 两 种推荐的方式:匿名函数 (Anonymous function syntax) ,可以在比 较 短的代 码 中使用。全局 单 例 对 象里的静 态 方法。例如,你可以定 义 object MyFunctions 然后 传递
MyFounctions.func1 ,像下面 这样 :

object MyFunctions {
  def func1(s: String): String = { ... }
}
myRdd.map(MyFunctions.func1)

注意,它可能 传递 的是一个 类实 例里的一个方法引用 ( 而不是一个 单 例 对 象 ) , 这 里必 须传 送包含方法的整个对象。例如:
class MyClass {
def func1(s: String): String = { … }
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
这 里,如果我 们创 建了一个 new MyClass 对 象,并且 调 用它的 doStuff , map 里面引用了这个 MyClass 实例中的 func1 方法,所以这个对象必须传送到集群上。类 rdd.map(x => this.func1(x)) 的方式,访问外部对象的字段将会引用整个对象:

class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}

相当于写成 rdd.map(x => this.field + x) ,引用了整个 this 对象。为了避免这个问题,
最简单的方式是复制 field 到一个本地变量而不是从外部访问它:

def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}

使用 键值对虽 然很多 Spark 操作工作在包含任意 类 型 对 象的 RDDs 上的,但是少数几个特殊操作 仅仅 在键值 (key-value) 对 RDDs 上可用。最常 见 的是分布式 “shuffle” 操作,例如根据一个 key 对 一组 数据 进 行分 组 和聚合。在 Scala 中, 这 些操作在包含二元 组 (Tuple2)( 在 语 言的内建元 组 中,通 过简单 的写 (a, b) 创
建 ) 的 RDD 上自 动 地 变 成可用的,只要在你的程序中 导 入 org.apache.spark.SparkContext._ 来 启 用 Spark 的 隐 式 转换 。在 PairRDDFunctions 的 类 里 键值对 操作是可以使用的,如果你导 入 隐 式 转换 它会自 动 地包装成元 组 RDD。例如,下面的代 码 在 键值对 上使用 reduceByKey 操作来 统计 在一个文件里每一行文本内容出
现 的次数:

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

我们也可以使用 counts.sortByKey() ,例如,将键值对按照字母进行排序,最后
counts.collect() 把它 们 作 为 一个 对 象数 组带 回到 驱动 程序。
注意:当使用一个自定 义对 象作 为 key 在使用 键值对 操作的 时 候,你需要确保自定 义
equals() 方法和 hashCode() 方法是匹配的。更加详细的内容,查看 Object.hashCode() 文档)中的契约概述。Transformations Spark 支持的一些常用 transformations 。 RDD API 文档
(Scala, Java, Python) 和 PairRDDFunctions 文档 (Scala, Java) 。

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:
开发与运维
使用钉钉扫一扫加入圈子
+ 订阅

集结各类场景实战经验,助你开发运维畅行无忧

其他文章