Spark RDD高级应用(传参、闭包、共享变量)

简介: Spark RDD高级应用(传参、闭包、共享变量)

0x00 教程内容


  1. 向Spark传递参数的两种方式
  2. 闭包的概念及实操
  3. 共享变量的两种方式


0x01 向Spark传递参数的方式


Spark 中的大部分操作都依赖于用户传递的函数,主要有两种方式:


方式一:匿名函数

方式二:传入静态方法和传入方法的引用


1. 匿名函数

在前面的教程中,我们用到了很多次这种方式,如这句:


val wordRDD = textFileRDD.flatMap(line => line.split(" "))


=> 左边是参数,参数可以省略参数类型,右边是函数体。


关于匿名函数:


var inc = (x:Int) => x+1


也可以这样写,可能会更易懂一点


var inc = ((x:Int) => x+1)


或者:


var inc = {(x:Int) => x+1}


其实这句匿名函数,跟下面这种写法是一样的意思:


def add2 = new Function1[Int,Int]{  

   def apply(x:Int):Int = x+1;  

}


使用的时候,直接把 inc 当成函数名就可以了,Java 是一门面向对象的编程语言,一切皆对象;而 Scala 是一门面向函数的编程语言,一切皆函数!


使用方式:


inc(1)


或者:


var x = inc(7)-1


我们也可以在匿名函数中可以定义多个参数:


var mul = (x: Int, y: Int) => x*y


使用方式如下:


println(mul(3, 4))


我们也可以不给匿名函数设置参数:


var userDir = () => { System.getProperty("user.dir") }


使用方式如下:


println( userDir() )


2. 传入静态方法和传入方法的引用

我们可以传入全局单例对象的静态方法。比如:先定义 Object Functions,然后传递 Functions.func1。


val nums = sc.parallelize(List(1,2,3,4))
object Functions{
    def func1(x:Int):Int = {x+1}
}
nums.map(Functions.func1).collect()


注意:虽然可以在同一个类实例中传递方法的引用,但是需要发送包含该类的对象连同该方法集群,此过程会比仅传递静态方法到集群的开销更大。


0x02 闭包


1. 闭包的概念

闭包是一个函数,返回值依赖于声明在函数外部的一个或多个变量。

闭包可以简单的理解为:可以访问一个函数里面局部变量的另外一个函数。

2. 闭包实操

我们现在有个需求:计算所有元素的总和


val data = Array(1,2,3,4,5)
var counter = 0
var rdd = sc.parallelize(data)
rdd.foreach(x => counter+=x)
println("counter的值:" + counter)


image.png


此时发现了一个问题,计算结果竟然是0!原因是在执行的时候,Spark会将 RDD 拆分成多个 task,并且分发给不同的执行器(executor)执行,而分发到每个执行器的counter是复制过去的,执行器之间不能相互访问,导致执行器在执行foreach方法的时候只能修改自己执行器中的值,驱动程序中的counter值没有被修改,所以最终输出的counter值依然是 0。这种场景有点像函数中参数的调用,调用的时候,应该传入变量的地址,否则,当需要调用的函数执行完结果后,不会把执行后的结果返回给主函数。


在上面的代码中,我们的foreach() 函数就是 闭包 。在执行的过程中,引用了函数外部的counter变量。


那我们可以怎么去解决以上的问题呢?


此时,我们可以使用共享全局变量的方式来解决:


val data = Array(1,2,3,4,5)
// 使用全局共享变量的累加器(accumulator)
val counter = sc.accumulator(0)
var rdd = sc.parallelize(data)
rdd.foreach(x => counter+=x)
println("counter的值:" + counter)


image.png


发现结果是对的,此时已经将 counter变量设置为共享全局变量。


Spark 有两种共享变量的方式:广播变量(broadcast variable)与累加器(accumulator),此处我们使用的是累加器,下面还会详细讲解。


3. 打印 RDD 的元素

如果是 Spark 是单机模式,可以使用 rdd.foreach(println)或rdd.map(println)来打印 RDD 的元素。但是如果是集群模式,就需要考虑闭包问题了,因为此时,执行器输出写入的是执行器的stdout,而不是驱动程序的 stdout。所以如果那么要想在驱动程序中打印元素,可以怎么操作呢?

如果数据量不多的话可以使用 collect()方法:

rdd.collect().foreach(println);


如果数据量很大的话可以使用 take()方法:

rdd.take(num).foreach(println);


RDD 算子相关教程参考: Spark RDD的实操教程(二)


0x03 共享变量


Spark 为了能够让多节点之间共享变量,提供了两种方法:广播变量和累加器。

1. 广播变量

广播变量可以在每个节点上缓存一个只读的变量,可以提高数据的分配效率。

  1. 创建方式:


sc.broadcast(v)


可以在Spark Shell中执行:

// 创建广播变量
val broadcastVar = sc.broadcast(Array(1, 2, 3))
// 查看广播变量的值
broadcastVar.value


image.png


2. 累加器

累加器只能够通过加操作的变量,因此在并行操作中具备更高的效率,可以实现 counterssums

可以在Spark Shell中执行:

// 创建累加器
val accum = sc.accumulator(0, "Accumulator")
// 对RDD数据集的数值进行累加
sc.parallelize(Array(1, 2, 3)).foreach(x => accum += x)
// 查看累加器的结果
accum.value


image.png


0xFF 总结


  1. 本文介绍了向Spark传递参数的两种方式,其实一般只会用匿名函数这样的用法;闭包的概念其实也比较简单,需要理解;共享变量的两种方式必须要掌握,包括原理。
  2. 请继续学习相关课程,加油!



相关文章
|
4月前
|
分布式计算 数据处理 Apache
Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
【10月更文挑战第10天】Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
482 1
|
6天前
|
存储 缓存 分布式计算
【赵渝强老师】Spark RDD的缓存机制
Spark RDD通过`persist`或`cache`方法可将计算结果缓存,但并非立即生效,而是在触发action时才缓存到内存中供重用。`cache`方法实际调用了`persist(StorageLevel.MEMORY_ONLY)`。RDD缓存可能因内存不足被删除,建议结合检查点机制保证容错。示例中,读取大文件并多次调用`count`,使用缓存后执行效率显著提升,最后一次计算仅耗时98ms。
【赵渝强老师】Spark RDD的缓存机制
|
15天前
|
机器学习/深度学习 分布式计算 大数据
阿里云 EMR Serverless Spark 在微财机器学习场景下的应用
面对机器学习场景下的训练瓶颈,微财选择基于阿里云 EMR Serverless Spark 建立数据平台。通过 EMR Serverless Spark,微财突破了单机训练使用的数据规模瓶颈,大幅提升了训练效率,解决了存算分离架构下 Shuffle 稳定性和性能困扰,为智能风控等业务提供了强有力的技术支撑。
135 15
|
3月前
|
存储 分布式计算 并行计算
【赵渝强老师】Spark中的RDD
RDD(弹性分布式数据集)是Spark的核心数据模型,支持分布式并行计算。RDD由分区组成,每个分区由Spark Worker节点处理,具备自动容错、位置感知调度和缓存机制等特性。通过创建RDD,可以指定分区数量,并实现计算函数、依赖关系、分区器和优先位置列表等功能。视频讲解和示例代码进一步详细介绍了RDD的组成和特性。
|
24天前
|
分布式计算 Spark
【赵渝强老师】Spark RDD的依赖关系和任务阶段
Spark RDD之间的依赖关系分为窄依赖和宽依赖。窄依赖指父RDD的每个分区最多被一个子RDD分区使用,如map、filter操作;宽依赖则指父RDD的每个分区被多个子RDD分区使用,如分组和某些join操作。窄依赖任务可在同一阶段完成,而宽依赖因Shuffle的存在需划分不同阶段执行。借助Spark Web Console可查看任务的DAG图及阶段划分。
65 15
|
4月前
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
64 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
4月前
|
消息中间件 分布式计算 Kafka
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
56 0
|
4月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
141 0
|
3月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
253 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
4月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
101 0