Implementation Principle
Accumulators aggregate Executor-side variable state back to the Driver. For a variable defined in the Driver program, every Task on the Executor side receives its own fresh copy of that variable; after each task updates its copy, the copies are sent back to the Driver and merged.
package Acc

import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Accumulator pitfalls:
 * Under-count: if the accumulator is updated inside a transformation and no
 * action is ever called, the transformation never executes and nothing is added.
 * Over-count: if the transformation is executed more than once (e.g., the RDD
 * is recomputed by multiple actions), the accumulator is updated again each time.
 */
object Spark01_RDD_acc {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("rdd").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(sparkConf)

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

    //var sum : Int = 0
    // Create the accumulator (longAccumulator also registers it with the SparkContext)
    val sum: LongAccumulator = sc.longAccumulator("sum")

    rdd.foreach(
      // Code inside the operator runs on the Executors
      num => sum.add(num) // use the accumulator
    )

    // Driver-side code: read the merged value of the accumulator
    println(sum.value)

    sc.stop()
  }
}
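The under-count and over-count behaviors described in the comment above are easy to reproduce: move the accumulator update into a map transformation and trigger more than one action. The following is a minimal sketch of that failure mode (the object name Spark01_RDD_accPitfall is ours, not from the original):

package Acc

import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_accPitfall {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("rdd").setMaster("local[*]"))

    val sum: LongAccumulator = sc.longAccumulator("sum")

    val mapped: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2).map { num =>
      sum.add(num) // update inside a transformation
      num
    }

    println(sum.value) // 0 -- no action has run yet, so nothing was added (under-count)

    mapped.collect() // first action: each element is added once -> 10
    mapped.collect() // second action recomputes the map         -> added again -> 20
    println(sum.value) // 20 instead of 10 (over-count)

    sc.stop()
  }
}

Calling mapped.cache() before the first action, or updating accumulators only inside actions such as foreach, avoids the double count.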
package Acc

import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object Spark01_RDD_acc1 {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("rdd").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(sparkConf)

    val words: RDD[String] = sc.makeRDD(List("a", "a", "b", "c", "b", "a"), 2)

    // Steps for using a custom accumulator:
    // 1. Create the accumulator
    val wordCountAcc = new WordCountAcc
    // 2. Register it with the SparkContext
    sc.register(wordCountAcc, "wcAcc")
    // 3. Use it inside an action
    words.foreach(
      word => wordCountAcc.add(word)
    )
    // 4. Read the result on the Driver
    val result: mutable.Map[String, Int] = wordCountAcc.value
    println(result)

    sc.stop()
  }

  /**
   * Custom accumulator:
   * 1. Extend AccumulatorV2
   * 2. Specify the type parameters:
   *    IN:  String
   *    OUT: mutable.Map[String, Int]
   * 3. Implement the abstract methods
   */
  class WordCountAcc extends AccumulatorV2[String, mutable.Map[String, Int]] {

    private var resultMap = mutable.Map[String, Int]()

    // Whether the accumulator is in its zero (initial) state
    override def isZero: Boolean = resultMap.isEmpty

    // Create a copy of this accumulator, including the data accumulated so far
    override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = {
      val newAcc = new WordCountAcc
      newAcc.resultMap = resultMap.clone()
      newAcc
    }

    // Reset the accumulator to its zero state
    override def reset(): Unit = resultMap.clear()

    // Add one element to the accumulator
    override def add(word: String): Unit = {
      val oldCnt: Int = resultMap.getOrElse(word, 0)
      resultMap.put(word, oldCnt + 1)
    }

    // Merge another accumulator of the same type into this one (runs on the Driver)
    override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {
      val otherMap: mutable.Map[String, Int] = other.value
      for ((word, cnt) <- otherMap) {
        val oldCnt: Int = resultMap.getOrElse(word, 0)
        resultMap.put(word, oldCnt + cnt)
      }
    }

    // Return the current value of the accumulator
    override def value: mutable.Map[String, Int] = resultMap
  }
}
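A note on how this runs: each task works against its own copy of WordCountAcc on an Executor (Spark obtains per-task copies through copy() and reset()), and merge() is then called on the Driver to fold the partial maps together. For the six input words above, println(result) should therefore show the counts a -> 3, b -> 2, c -> 1 (the printed entry order of a mutable.Map may vary).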
Broadcast Variables
Implementation Principle
Broadcast variables are used to distribute large objects efficiently: a large read-only value is sent to every worker node for use in one or more Spark operations. For example, if your application needs to ship a sizable read-only lookup table to all nodes, a broadcast variable is a natural fit. By default, when the same variable is used in multiple parallel operations, Spark sends a separate copy of it to every task; a broadcast variable is instead sent to each node only once.
package bc

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object Spark01_RDD_bc {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("rdd").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(sparkConf)

    val rdd1: RDD[(String, Int)] = sc.makeRDD(
      List(
        ("a", 1), ("b", 1)
      )
    )

    val mapData = mutable.Map[String, Int](("a", 2), ("b", 3))

    // Wrap the map in a broadcast variable so it is shipped to each Executor once
    val bcMap: Broadcast[mutable.Map[String, Int]] = sc.broadcast(mapData)

    val result: RDD[(String, (Int, Int))] = rdd1.map {
      case (word, cnt) =>
        // Read the broadcast value on the Executor side
        val mapV: Int = bcMap.value.getOrElse(word, 0)
        (word, (cnt, mapV))
    }

    result.collect().foreach(println)

    // Equivalent join-based version (triggers a shuffle):
    // val rdd2: RDD[(String, Int)] = sc.makeRDD(
    //   List(
    //     ("a", 2), ("b", 3)
    //   )
    // )
    // // (a,(1,2)) (b,(1,3))
    // val result: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
    // result.collect().foreach(println)

    sc.stop()
  }
}
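Run as-is, this prints (a,(1,2)) and (b,(1,3)), the same pairs the commented-out join would produce. The difference is the data movement: join shuffles both RDDs across the cluster, while the broadcast map is shipped to each Executor once and every task there reads the same local copy through bcMap.value, which is why broadcasting a small lookup table is the usual way to avoid the shuffle.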