大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化

简介: 大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(正在更新!)

章节内容

上节完成的内容如下:


RDD容错机制

RDD分区机制

RDD分区器

RDD自定义分区器

广播变量

基本介绍

有时候需要在多个任务之间共享变量,或者在任务(Task)和 Driver Program 之间共享变量。

为了满足这个需求,Spark提供了两种类型的变量。


广播变量(broadcast variable)

累加器(accumulators)

广播变量、累加器的主要作用是为了优化Spark程序。

广播变量将变量在节点的Executor之间进行共享(由Driver广播),广播变量用来高效分发较大的对象,向所有工作节点(Executor)发送一个较大的只读值,以供一个或多个操作使用。


使用广播变量的过程如下:


对一个类型T的对象调用SparkContext.broadcast创建一个Broadcast[T]对象,任何可序列化的类型都可以这么实现(在Driver端)

通过Value属性访问该对象的值(Executor中)

变量只会被分到各个Executor一次,作为只读值处理

广播变量的相关参数:

  • spark.broadcast.blockSize(缺省值: 4m)
  • spark.broadcast.checksum(缺省值:true)
  • spark.broadcast.compree(缺省值:true)

变量应用

普通JOIN

MapSideJoin

生成数据 test_spark_01.txt

1000;商品1
1001;商品2
1002;商品3
1003;商品4
1004;商品5
1005;商品6
1006;商品7
1007;商品8
1008;商品9

生成数据格式如下:

生成数据 test_spark_02.txt

10000;订单1;1000
10001;订单2;1001
10002;订单3;1002
10003;订单4;1003
10004;订单5;1004
10005;订单6;1005
10006;订单7;1006
10007;订单8;1007
10008;订单9;1008

生成的数据格式如下:

编写代码1

我们编写代码进行测试

package icu.wzk

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object JoinDemo {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("JoinDemo")
      .setMaster("local[*]")

    val sc = new SparkContext(conf)
    sc.hadoopConfiguration.setLong("fs.local.block.size", 128 * 1024 * 1024)

    val productRDD: RDD[(String, String)] = sc
      .textFile("data/test_spark_01.txt")
      .map {
        line => val fields = line.split(";")
          (fields(0), line)
      }

    val orderRDD: RDD[(String, String)] = sc
      .textFile("data/test_spark_02.txt", 8)
      .map {
        line => val fields = line.split(";")
          (fields(2), line)
      }

    val resultRDD = productRDD.join(orderRDD)
    println(resultRDD.count())
    Thread.sleep(100000)
    sc.stop()
  }

}

编译打包1

mvn clean package
• 1

并上传到服务器,准备运行

运行测试1

spark-submit --master local[*] --class icu.wzk.JoinDemo spark-wordcount-1.0-SNAPSHOT.jar
• 1

提交任务并执行,注意数据的路径,查看下图:

运行结果可以查看到,运行了: 2.203100 秒 (取决于你的数据量的多少)



运行结果可以查看到,运行了: 2.203100 秒 (取决于你的数据量的多少)

2024-07-19 10:35:08,808 INFO  [main] scheduler.DAGScheduler (Logging.scala:logInfo(54)) - Job 0 finished: count at JoinDemo.scala:32, took 2.203100 s
200

编写代码2

接下来,我们对比使用 MapSideJoin 的方式

package icu.wzk

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object MapSideJoin {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("MapSideJoin")
      .setMaster("local[*]")

    val sc = new SparkContext(conf)
    sc.hadoopConfiguration.setLong("fs.local.block.size", 128 * 1024 * 1024)

    val productRDD: RDD[(String, String)] = sc
      .textFile("data/test_spark_01.txt")
      .map {
        line => val fields = line.split(";")
          (fields(0), line)
      }

    val productBC = sc.broadcast(productRDD.collectAsMap())

    val orderRDD: RDD[(String, String)] = sc
      .textFile("data/test_spark_02.txt")
      .map {
        line => val fields = line.split(";")
          (fields(2), line)
      }

    val resultRDD = orderRDD
      .map {
        case (pid, orderInfo) =>
          val productInfo = productBC.value
          (pid, (orderInfo, productInfo.getOrElse(pid, null)))
      }
    println(resultRDD.count())

    sc.stop()
  }

}

编译打包2

mvn clean package
• 1

编译后上传到服务器准备执行:

运行测试2

spark-submit --master local[*] --class icu.wzk.MapSideJoin spark-wordcount-1.0-SNAPSHOT.jar

启动我们的程序,并观察结果

我们可以观察到,这次只用了 0.10078 秒就完成了任务:

累加器

基本介绍

累加器的作用:可以实现一个变量在不同的Executor端能保持状态的累加。

累加器在Driver端定义、读取,在Executor中完成累加。

累加器也是Lazy的,需要Action触发:Action触发一次,执行一次;触发多次,执行多次。


Spark内置了三种类型的累加器,分别是:


LongAccumulator 用来累加整数型

DoubleAccumulator 用来累加浮点型

CollectionAccumulator 用来累加集合元素

运行测试

我们可以在 SparkShell 中进行一些简单的测试,目前我在 h122 节点上,启动SparkShell

spark-shell --master local[*]

启动的主界面如下:

写入如下的内容进行测试:


val data = sc.makeRDD("hadoop spark hive hbase java scala hello world spark scala java hive".split("\\s+"))
val acc1 = sc.longAccumulator("totalNum1")
val acc2 = sc.doubleAccumulator("totalNum2")
val acc3 = sc.collectionAccumulator[String]("allwords")

我们进行测试的结果如下图所示:

继续编写一段进行测试:

val rdd = data.map{word => acc1.add(word.length); acc2.add(word.length); acc3.add(word); word}
rdd.count
rdd.collect

println(acc1.value)
println(acc2.value)
println(acc3.value)

我们进行测试的结果如下:

相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
目录
相关文章
|
8月前
|
人工智能 分布式计算 大数据
大数据≠大样本:基于Spark的特征降维实战(提升10倍训练效率)
本文探讨了大数据场景下降维的核心问题与解决方案,重点分析了“维度灾难”对模型性能的影响及特征冗余的陷阱。通过数学证明与实际案例,揭示高维空间中样本稀疏性问题,并提出基于Spark的分布式降维技术选型与优化策略。文章详细展示了PCA在亿级用户画像中的应用,包括数据准备、核心实现与效果评估,同时深入探讨了协方差矩阵计算与特征值分解的并行优化方法。此外,还介绍了动态维度调整、非线性特征处理及降维与其他AI技术的协同效应,为生产环境提供了最佳实践指南。最终总结出降维的本质与工程实践原则,展望未来发展方向。
431 0
|
11月前
|
存储 分布式计算 Hadoop
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
579 79
|
存储 分布式计算 并行计算
【赵渝强老师】Spark中的RDD
RDD(弹性分布式数据集)是Spark的核心数据模型,支持分布式并行计算。RDD由分区组成,每个分区由Spark Worker节点处理,具备自动容错、位置感知调度和缓存机制等特性。通过创建RDD,可以指定分区数量,并实现计算函数、依赖关系、分区器和优先位置列表等功能。视频讲解和示例代码进一步详细介绍了RDD的组成和特性。
253 0
|
分布式计算 Spark
【赵渝强老师】Spark RDD的依赖关系和任务阶段
Spark RDD之间的依赖关系分为窄依赖和宽依赖。窄依赖指父RDD的每个分区最多被一个子RDD分区使用,如map、filter操作;宽依赖则指父RDD的每个分区被多个子RDD分区使用,如分组和某些join操作。窄依赖任务可在同一阶段完成,而宽依赖因Shuffle的存在需划分不同阶段执行。借助Spark Web Console可查看任务的DAG图及阶段划分。
612 15
|
12月前
|
存储 缓存 分布式计算
【赵渝强老师】Spark RDD的缓存机制
Spark RDD通过`persist`或`cache`方法可将计算结果缓存,但并非立即生效,而是在触发action时才缓存到内存中供重用。`cache`方法实际调用了`persist(StorageLevel.MEMORY_ONLY)`。RDD缓存可能因内存不足被删除,建议结合检查点机制保证容错。示例中,读取大文件并多次调用`count`,使用缓存后执行效率显著提升,最后一次计算仅耗时98ms。
360 0
【赵渝强老师】Spark RDD的缓存机制
|
存储 缓存 分布式计算
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
219 4
|
存储 缓存 分布式计算
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
213 4
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
216 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
存储 分布式计算 并行计算
Spark学习---2、SparkCore(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---2、SparkCore(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
|
缓存 分布式计算 大数据
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)
279 0