大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(正在更新!)

章节内容

上节完成的内容如下:


RDD容错机制

RDD分区机制

RDD分区器

RDD自定义分区器

广播变量

基本介绍

有时候需要在多个任务之间共享变量,或者在任务(Task)和 Driver Program 之间共享变量。

为了满足这个需求,Spark提供了两种类型的变量。


广播变量(broadcast variable)

累加器(accumulators)

广播变量、累加器的主要作用是为了优化Spark程序。

广播变量将变量在节点的Executor之间进行共享(由Driver广播),广播变量用来高效分发较大的对象,向所有工作节点(Executor)发送一个较大的只读值,以供一个或多个操作使用。


使用广播变量的过程如下:


对一个类型T的对象调用SparkContext.broadcast创建一个Broadcast[T]对象,任何可序列化的类型都可以这么实现(在Driver端)

通过Value属性访问该对象的值(Executor中)

变量只会被分到各个Executor一次,作为只读值处理

广播变量的相关参数:

  • spark.broadcast.blockSize(缺省值: 4m)
  • spark.broadcast.checksum(缺省值:true)
  • spark.broadcast.compree(缺省值:true)

变量应用

普通JOIN

MapSideJoin

生成数据 test_spark_01.txt

1000;商品1
1001;商品2
1002;商品3
1003;商品4
1004;商品5
1005;商品6
1006;商品7
1007;商品8
1008;商品9

生成数据格式如下:

生成数据 test_spark_02.txt

10000;订单1;1000
10001;订单2;1001
10002;订单3;1002
10003;订单4;1003
10004;订单5;1004
10005;订单6;1005
10006;订单7;1006
10007;订单8;1007
10008;订单9;1008

生成的数据格式如下:

编写代码1

我们编写代码进行测试

package icu.wzk

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object JoinDemo {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("JoinDemo")
      .setMaster("local[*]")

    val sc = new SparkContext(conf)
    sc.hadoopConfiguration.setLong("fs.local.block.size", 128 * 1024 * 1024)

    val productRDD: RDD[(String, String)] = sc
      .textFile("data/test_spark_01.txt")
      .map {
        line => val fields = line.split(";")
          (fields(0), line)
      }

    val orderRDD: RDD[(String, String)] = sc
      .textFile("data/test_spark_02.txt", 8)
      .map {
        line => val fields = line.split(";")
          (fields(2), line)
      }

    val resultRDD = productRDD.join(orderRDD)
    println(resultRDD.count())
    Thread.sleep(100000)
    sc.stop()
  }

}

编译打包1

mvn clean package
• 1

并上传到服务器,准备运行

运行测试1

spark-submit --master local[*] --class icu.wzk.JoinDemo spark-wordcount-1.0-SNAPSHOT.jar
• 1

提交任务并执行,注意数据的路径,查看下图:

运行结果可以查看到,运行了: 2.203100 秒 (取决于你的数据量的多少)



运行结果可以查看到,运行了: 2.203100 秒 (取决于你的数据量的多少)

2024-07-19 10:35:08,808 INFO  [main] scheduler.DAGScheduler (Logging.scala:logInfo(54)) - Job 0 finished: count at JoinDemo.scala:32, took 2.203100 s
200

编写代码2

接下来,我们对比使用 MapSideJoin 的方式

package icu.wzk

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object MapSideJoin {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("MapSideJoin")
      .setMaster("local[*]")

    val sc = new SparkContext(conf)
    sc.hadoopConfiguration.setLong("fs.local.block.size", 128 * 1024 * 1024)

    val productRDD: RDD[(String, String)] = sc
      .textFile("data/test_spark_01.txt")
      .map {
        line => val fields = line.split(";")
          (fields(0), line)
      }

    val productBC = sc.broadcast(productRDD.collectAsMap())

    val orderRDD: RDD[(String, String)] = sc
      .textFile("data/test_spark_02.txt")
      .map {
        line => val fields = line.split(";")
          (fields(2), line)
      }

    val resultRDD = orderRDD
      .map {
        case (pid, orderInfo) =>
          val productInfo = productBC.value
          (pid, (orderInfo, productInfo.getOrElse(pid, null)))
      }
    println(resultRDD.count())

    sc.stop()
  }

}

编译打包2

mvn clean package
• 1

编译后上传到服务器准备执行:

运行测试2

spark-submit --master local[*] --class icu.wzk.MapSideJoin spark-wordcount-1.0-SNAPSHOT.jar

启动我们的程序,并观察结果

我们可以观察到,这次只用了 0.10078 秒就完成了任务:

累加器

基本介绍

累加器的作用:可以实现一个变量在不同的Executor端能保持状态的累加。

累加器在Driver端定义、读取,在Executor中完成累加。

累加器也是Lazy的,需要Action触发:Action触发一次,执行一次;触发多次,执行多次。


Spark内置了三种类型的累加器,分别是:


LongAccumulator 用来累加整数型

DoubleAccumulator 用来累加浮点型

CollectionAccumulator 用来累加集合元素

运行测试

我们可以在 SparkShell 中进行一些简单的测试,目前我在 h122 节点上,启动SparkShell

spark-shell --master local[*]

启动的主界面如下:

写入如下的内容进行测试:


val data = sc.makeRDD("hadoop spark hive hbase java scala hello world spark scala java hive".split("\\s+"))
val acc1 = sc.longAccumulator("totalNum1")
val acc2 = sc.doubleAccumulator("totalNum2")
val acc3 = sc.collectionAccumulator[String]("allwords")

我们进行测试的结果如下图所示:

继续编写一段进行测试:

val rdd = data.map{word => acc1.add(word.length); acc2.add(word.length); acc3.add(word); word}
rdd.count
rdd.collect

println(acc1.value)
println(acc2.value)
println(acc3.value)

我们进行测试的结果如下:

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
1月前
|
消息中间件 分布式计算 大数据
大数据-113 Flink DataStreamAPI 程序输入源 自定义输入源 非并行源与并行源
大数据-113 Flink DataStreamAPI 程序输入源 自定义输入源 非并行源与并行源
43 0
|
3天前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
13 2
|
1月前
|
分布式计算 监控 大数据
大数据-114 Flink DataStreamAPI 程序输入源 自定义输入源 Rich并行源 RichParallelSourceFunction
大数据-114 Flink DataStreamAPI 程序输入源 自定义输入源 Rich并行源 RichParallelSourceFunction
45 0
|
13天前
|
存储 NoSQL 大数据
大数据 数据存储优化
【10月更文挑战第25天】
46 2
|
15天前
|
分布式计算 监控 大数据
如何优化Spark中的shuffle操作?
【10月更文挑战第18天】
|
1月前
|
SQL 分布式计算 NoSQL
大数据-164 Apache Kylin Cube优化 案例1 定义衍生维度与对比 超详细
大数据-164 Apache Kylin Cube优化 案例1 定义衍生维度与对比 超详细
28 1
大数据-164 Apache Kylin Cube优化 案例1 定义衍生维度与对比 超详细
|
1月前
|
存储 分布式计算 监控
Spark如何优化?需要注意哪些方面?
【10月更文挑战第10天】Spark如何优化?需要注意哪些方面?
39 6
|
1月前
|
存储 大数据 分布式数据库
大数据-165 Apache Kylin Cube优化 案例 2 定义衍生维度及对比 & 聚合组 & RowKeys
大数据-165 Apache Kylin Cube优化 案例 2 定义衍生维度及对比 & 聚合组 & RowKeys
33 1
|
1月前
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
38 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
1月前
|
SQL 存储 监控
大数据-161 Apache Kylin 构建Cube 按照日期、区域、产品、渠道 与 Cube 优化
大数据-161 Apache Kylin 构建Cube 按照日期、区域、产品、渠道 与 Cube 优化
48 0

热门文章

最新文章