广播 变 量
广播 变 量允 许 程序 员缓 存一个只 读 的 变 量在每台机器上面,而不是每个任 务 保存一份拷 贝 。例如,利用广播 变 量,我 们 能 够 以一种更有效率的方式将一个大数据量 输 入集合的副本分配给 每个 节 点。(Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.)Spark也尝试 着利用有效的广播算法去分配广播 变 量,以 减 少通信的成本。一个广播变量可以通过调用 SparkContext.broadcast(v) 方法从一个初始变量v中创建。广播变量是v的一个包装 变 量,它的 值 可以通 过 value 方法 访问 ,下面的代 码说 明了 这 个 过 程:
<span style="font-size:24px;"> scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c) scala> broadcastVar.value res0: Array[Int] = Array(1, 2, 3)</span>
广播变量创建以后,我们就能够在集群的任何函数中使用它来代替变量 v ,这样我们就不需要再次传递变量v到每个节点上。另外,为了保证所有的节点得到广播变量具有相同的值,对象v不能在广播之后被修改。
累加器
顾 名思 义 ,累加器是一种只能通 过 关 联 操作 进 行“加”操作的 变 量,因此它能 够 高效的 应 用于并行操作中。它们能够用来实现 counters 和 sums 。Spark原生支持数值类型的累加器,开发者 可以自己添加支持的类型。 如果创建了一个具名的累加器,它可以在spark的UI中显示。这对 于理解 运 行 阶 段(running stages)的 过 程有很重要的作用。(注意: 这 在python中 还 不被支 持) 一个累加器可以通 过调 用 SparkContext.accumulator(v) 方法从一个初始 变 量 v 中 创 建。 运 行在 集群上的任 务 可以通 过 add 方法或者使用 += 操作来 给 它加 值 。然而,它 们 无法 读 取 这 个 值 。只有 驱动 程序可以使用 value 方法来 读 取累加器的 值 。 如下的代 码 ,展示了如何利用累 加器将一个数 组 里面的所有元素相加:
<span style="font-size:24px;">scala> val accum = sc.accumulator(0, "My Accumulator") accum: spark.Accumulator[Int] = 0 scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s scala> accum.value res2: Int = 10</span>
这 个例子利用了内置的整数 类 型累加器。开 发 者可以利用子 类 AccumulatorParam 创 建自己的 累加器 类 型。AccumulatorParam接口有 两 个方法: zero 方法 为 你的数据 类 型提供一个“0 值 ”(zero value); addInPlace 方法 计 算 两 个 值 的和。例如,假 设 我 们 有一个 Vector 类 代表 数学上的向量,我 们 能 够 如下定 义 累加器:
<span style="font-size:24px;">object VectorAccumulatorParam extends AccumulatorParam[Vector] { def zero(initialValue: Vector): Vector = { Vector.zeros(initialValue.size) } def addInPlace(v1: Vector, v2: Vector): Vector = { v1 += v2 } } // Then, create an Accumulator of this type: val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)</span>
在scala中,Spark支持用更一般的Accumulable接口来累积数据-结果类型和用于累加的元素类 型 不一 样 (例如通 过 收集的元素建立一个列表)。Spark也支持用 SparkContext.accumulableCollection 方法累加一般的scala集合类型。
从 spark 官方网站 查 看一些 spark 运 行例子。 另 外, Spark 的 example 目 录 包含几个 Spark 例子,你能 够 通 过 如下方式 运 行 Java 或者 scala 例子:./bin/run-example SparkPi
为 了 优 化你的 项 目, configuration 和 tuning 指南提高了最佳实 践的信息保 证你保存在内存 中的数据是有效的格式是非常重要的事情。