Spark学习--day05、累加器

简介: Spark学习--day05、累加器

 实现原理

累加器用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge。 image.png image.png

package Acc
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}

/**
 *  累加器
 少加:转换算子中调用累加器,如果没有行动算子,那么不会执行
 *  多加:转换算子调用累加器,如果没有行动算子,那么不会执行
 */
object Spark01_RDD_acc {
  def main(args: Array[String]): Unit = {

    val sparkConf: SparkConf = new SparkConf().setAppName("rdd").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(sparkConf)

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

    //var sum : Int = 0

    //创建累加器
    val sum: LongAccumulator = sc.longAccumulator("sum")

    //注册累加器

    rdd.foreach(
      //算子内代码 -> Executor
      //使用累加器
      num => sum.add(num)
    )

    //Driver端的代码
    //获取累加器的值
    println(sum.value)

    sc.stop()

  }

}
package Acc
import org.apache.spark.rdd.RDD
import org.apache.spark.util.{AccumulatorV2, LongAccumulator}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable
object Spark01_RDD_acc1 {
  def main(args: Array[String]): Unit = {

    val sparkConf: SparkConf = new SparkConf().setAppName("rdd").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(sparkConf)

    val words: RDD[String] = sc.makeRDD(List("a", "a", "b", "c", "b", "a"), 2)

    //使用自定义累加器的步骤:
    //1. 创建累加器
    val wordCountAcc = new WordCountAcc

    //2. 注册累加器
    sc.register(wordCountAcc, "wcAcc")

    //3. 使用累加器
    words.foreach(
      word => wordCountAcc.add(word)
    )

    //4. 获取累加器的结果

    val result: mutable.Map[String, Int] = wordCountAcc.value
    println(result)


    sc.stop()

  }

  /**
   * 自定义累加器
   *   1. 继承AccumulatorV2类
   *      2. 指定泛型:
   *      IN:  String
   *
   * OUT: mutable.Map[String,Int]
   *
   * 3. 实现抽象方法
   *
   *
   */
  class WordCountAcc extends AccumulatorV2[String, mutable.Map[String, Int]] {

    private var resultMap = mutable.Map[String, Int]()

    // 是否是初始状态
    override def isZero: Boolean = resultMap.isEmpty

    override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = new WordCountAcc

    override def reset(): Unit = resultMap.clear()

    // 往累加器中添加元素
    override def add(word: String): Unit = {
      //将传入的单词加到resultMap中
      val oldCnt: Int = resultMap.getOrElse(word, 0)
      resultMap.put(word, oldCnt + 1)

    }

    // 合并多个累加器的值
    override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {
      val otherMap: mutable.Map[String, Int] = other.value
      for ((word, cnt) <- otherMap) {
        val oldcnt: Int = resultMap.getOrElse(word, 0)
        resultMap.put(word, oldcnt + cnt)
      }

    }

    //获取累加器的值
    override def value: mutable.Map[String, Int] = resultMap
  }

}

广播变量

实现原理

广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个Spark操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark会为每个任务分别发送。

package bc
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable
object Spark01_RDD_bc {
  def main(args: Array[String]): Unit = {

    val sparkConf: SparkConf = new SparkConf().setAppName("rdd").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(sparkConf)

    val rdd1: RDD[(String, Int)] = sc.makeRDD(
      List(
        ("a", 1), ("b", 1)
      )
    )

    var mapData = mutable.Map[String, Int](("a", 2), ("b", 3))

    //使用广播变量
    val bcMap: Broadcast[mutable.Map[String, Int]] = sc.broadcast(mapData)


    val result: RDD[(String, (Int, Int))] = rdd1.map {
      case (word, cnt) => {
        val mapV: Int = bcMap.value.getOrElse(word, 0)
        (word, (cnt, mapV))
      }
    }
    result.collect().foreach(println)



    //    val rdd2: RDD[(String, Int)] = sc.makeRDD(
    //      List(
    //        ("a", 2), ("b", 3)
    //      )
    //    )

    // (a,(1,2))   (b,(1,3))

    //val result: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
    //result.collect().foreach(println)


    sc.stop()

  }

}
相关文章
|
1月前
|
分布式计算 API Spark
Spark学习--day05、SparkCore电商网站实操、SparkCore-工程代码
Spark学习--day05、SparkCore电商网站实操、SparkCore-工程代码
66 11
|
1月前
|
分布式计算 并行计算 大数据
Spark学习---day02、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---day02、Spark核心编程 RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
75 1
|
1月前
|
分布式计算 Java Scala
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
41 1
|
1月前
|
SQL 分布式计算 Java
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
107 1
|
1月前
|
分布式计算 Spark 索引
Spark学习---day07、Spark内核(Shuffle、任务执行)
Spark学习---day07、Spark内核(源码提交流程、任务执行)
41 2
|
1月前
|
分布式计算 监控 Java
Spark学习---day06、Spark内核(源码提交流程、任务执行)
Spark学习---day06、Spark内核(源码提交流程、任务执行)
41 2
|
1月前
|
分布式计算 Spark
Spark【Spark学习大纲】简介+生态+RDD+安装+使用(xmind分享)
【2月更文挑战第14天】Spark【Spark学习大纲】简介+生态+RDD+安装+使用(xmind分享)
32 1
|
1月前
|
存储 缓存 分布式计算
Spark学习--day04、RDD依赖关系、RDD持久化、RDD分区器、RDD文件读取与保存
Spark学习--day04、RDD依赖关系、RDD持久化、RDD分区器、RDD文件读取与保存
39 1
|
2月前
|
分布式计算 并行计算 Hadoop
Spark学习---day02、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---day02、Spark核心编程 RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
42 1
|
3月前
|
机器学习/深度学习 SQL 分布式计算
Apache Spark 的基本概念和在大数据分析中的应用
介绍 Apache Spark 的基本概念和在大数据分析中的应用
161 0