Spark学习--day05、累加器

简介: Spark学习--day05、累加器

 实现原理

累加器用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge。 image.png image.png

package Acc
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}

/**
 *  累加器
 少加:转换算子中调用累加器,如果没有行动算子,那么不会执行
 *  多加:转换算子调用累加器,如果没有行动算子,那么不会执行
 */
object Spark01_RDD_acc {
  def main(args: Array[String]): Unit = {

    val sparkConf: SparkConf = new SparkConf().setAppName("rdd").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(sparkConf)

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

    //var sum : Int = 0

    //创建累加器
    val sum: LongAccumulator = sc.longAccumulator("sum")

    //注册累加器

    rdd.foreach(
      //算子内代码 -> Executor
      //使用累加器
      num => sum.add(num)
    )

    //Driver端的代码
    //获取累加器的值
    println(sum.value)

    sc.stop()

  }

}
package Acc
import org.apache.spark.rdd.RDD
import org.apache.spark.util.{AccumulatorV2, LongAccumulator}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable
object Spark01_RDD_acc1 {
  def main(args: Array[String]): Unit = {

    val sparkConf: SparkConf = new SparkConf().setAppName("rdd").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(sparkConf)

    val words: RDD[String] = sc.makeRDD(List("a", "a", "b", "c", "b", "a"), 2)

    //使用自定义累加器的步骤:
    //1. 创建累加器
    val wordCountAcc = new WordCountAcc

    //2. 注册累加器
    sc.register(wordCountAcc, "wcAcc")

    //3. 使用累加器
    words.foreach(
      word => wordCountAcc.add(word)
    )

    //4. 获取累加器的结果

    val result: mutable.Map[String, Int] = wordCountAcc.value
    println(result)


    sc.stop()

  }

  /**
   * 自定义累加器
   *   1. 继承AccumulatorV2类
   *      2. 指定泛型:
   *      IN:  String
   *
   * OUT: mutable.Map[String,Int]
   *
   * 3. 实现抽象方法
   *
   *
   */
  class WordCountAcc extends AccumulatorV2[String, mutable.Map[String, Int]] {

    private var resultMap = mutable.Map[String, Int]()

    // 是否是初始状态
    override def isZero: Boolean = resultMap.isEmpty

    override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = new WordCountAcc

    override def reset(): Unit = resultMap.clear()

    // 往累加器中添加元素
    override def add(word: String): Unit = {
      //将传入的单词加到resultMap中
      val oldCnt: Int = resultMap.getOrElse(word, 0)
      resultMap.put(word, oldCnt + 1)

    }

    // 合并多个累加器的值
    override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {
      val otherMap: mutable.Map[String, Int] = other.value
      for ((word, cnt) <- otherMap) {
        val oldcnt: Int = resultMap.getOrElse(word, 0)
        resultMap.put(word, oldcnt + cnt)
      }

    }

    //获取累加器的值
    override def value: mutable.Map[String, Int] = resultMap
  }

}

广播变量

实现原理

广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个Spark操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark会为每个任务分别发送。

package bc
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable
object Spark01_RDD_bc {
  def main(args: Array[String]): Unit = {

    val sparkConf: SparkConf = new SparkConf().setAppName("rdd").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(sparkConf)

    val rdd1: RDD[(String, Int)] = sc.makeRDD(
      List(
        ("a", 1), ("b", 1)
      )
    )

    var mapData = mutable.Map[String, Int](("a", 2), ("b", 3))

    //使用广播变量
    val bcMap: Broadcast[mutable.Map[String, Int]] = sc.broadcast(mapData)


    val result: RDD[(String, (Int, Int))] = rdd1.map {
      case (word, cnt) => {
        val mapV: Int = bcMap.value.getOrElse(word, 0)
        (word, (cnt, mapV))
      }
    }
    result.collect().foreach(println)



    //    val rdd2: RDD[(String, Int)] = sc.makeRDD(
    //      List(
    //        ("a", 2), ("b", 3)
    //      )
    //    )

    // (a,(1,2))   (b,(1,3))

    //val result: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
    //result.collect().foreach(println)


    sc.stop()

  }

}
相关文章
|
3月前
|
分布式计算 大数据 Java
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
77 5
|
3月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
58 3
|
3月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
78 0
|
3月前
|
分布式计算 算法 Spark
spark学习之 GraphX—预测社交圈子
spark学习之 GraphX—预测社交圈子
82 0
|
3月前
|
分布式计算 Scala Spark
educoder的spark算子学习
educoder的spark算子学习
25 0
|
3月前
|
SQL 分布式计算 大数据
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
55 0
|
4月前
|
分布式计算 Shell Scala
学习使用Spark
学习使用Spark
131 3
|
5月前
|
分布式计算 Shell Scala
如何开始学习使用Spark?
【8月更文挑战第31天】如何开始学习使用Spark?
141 2
|
8月前
|
SQL 分布式计算 Java
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
488 1
|
8月前
|
分布式计算 Spark 索引
Spark学习---day07、Spark内核(Shuffle、任务执行)
Spark学习---day07、Spark内核(源码提交流程、任务执行)
119 2