Spark RDD高级应用(传参、闭包、共享变量)

简介: Spark RDD高级应用(传参、闭包、共享变量)

0x00 教程内容


  1. 向Spark传递参数的两种方式
  2. 闭包的概念及实操
  3. 共享变量的两种方式


0x01 向Spark传递参数的方式


Spark 中的大部分操作都依赖于用户传递的函数,主要有两种方式:


方式一:匿名函数

方式二:传入静态方法和传入方法的引用


1. 匿名函数

在前面的教程中,我们用到了很多次这种方式,如这句:


val wordRDD = textFileRDD.flatMap(line => line.split(" "))


=> 左边是参数,参数可以省略参数类型,右边是函数体。


关于匿名函数:


var inc = (x:Int) => x+1


也可以这样写,可能会更易懂一点


var inc = ((x:Int) => x+1)


或者:


var inc = {(x:Int) => x+1}


其实这句匿名函数,跟下面这种写法是一样的意思:


def add2 = new Function1[Int,Int]{  

   def apply(x:Int):Int = x+1;  

}


使用的时候,直接把 inc 当成函数名就可以了,Java 是一门面向对象的编程语言,一切皆对象;而 Scala 是一门面向函数的编程语言,一切皆函数!


使用方式:


inc(1)


或者:


var x = inc(7)-1


我们也可以在匿名函数中可以定义多个参数:


var mul = (x: Int, y: Int) => x*y


使用方式如下:


println(mul(3, 4))


我们也可以不给匿名函数设置参数:


var userDir = () => { System.getProperty("user.dir") }


使用方式如下:


println( userDir() )


2. 传入静态方法和传入方法的引用

我们可以传入全局单例对象的静态方法。比如:先定义 Object Functions,然后传递 Functions.func1。


val nums = sc.parallelize(List(1,2,3,4))
object Functions{
    def func1(x:Int):Int = {x+1}
}
nums.map(Functions.func1).collect()


注意:虽然可以在同一个类实例中传递方法的引用,但是需要发送包含该类的对象连同该方法集群,此过程会比仅传递静态方法到集群的开销更大。


0x02 闭包


1. 闭包的概念

闭包是一个函数,返回值依赖于声明在函数外部的一个或多个变量。

闭包可以简单的理解为:可以访问一个函数里面局部变量的另外一个函数。

2. 闭包实操

我们现在有个需求:计算所有元素的总和


val data = Array(1,2,3,4,5)
var counter = 0
var rdd = sc.parallelize(data)
rdd.foreach(x => counter+=x)
println("counter的值:" + counter)


image.png


此时发现了一个问题,计算结果竟然是0!原因是在执行的时候,Spark会将 RDD 拆分成多个 task,并且分发给不同的执行器(executor)执行,而分发到每个执行器的counter是复制过去的,执行器之间不能相互访问,导致执行器在执行foreach方法的时候只能修改自己执行器中的值,驱动程序中的counter值没有被修改,所以最终输出的counter值依然是 0。这种场景有点像函数中参数的调用,调用的时候,应该传入变量的地址,否则,当需要调用的函数执行完结果后,不会把执行后的结果返回给主函数。


在上面的代码中,我们的foreach() 函数就是 闭包 。在执行的过程中,引用了函数外部的counter变量。


那我们可以怎么去解决以上的问题呢?


此时,我们可以使用共享全局变量的方式来解决:


val data = Array(1,2,3,4,5)
// 使用全局共享变量的累加器(accumulator)
val counter = sc.accumulator(0)
var rdd = sc.parallelize(data)
rdd.foreach(x => counter+=x)
println("counter的值:" + counter)


image.png


发现结果是对的,此时已经将 counter变量设置为共享全局变量。


Spark 有两种共享变量的方式:广播变量(broadcast variable)与累加器(accumulator),此处我们使用的是累加器,下面还会详细讲解。


3. 打印 RDD 的元素

如果是 Spark 是单机模式,可以使用 rdd.foreach(println)或rdd.map(println)来打印 RDD 的元素。但是如果是集群模式,就需要考虑闭包问题了,因为此时,执行器输出写入的是执行器的stdout,而不是驱动程序的 stdout。所以如果那么要想在驱动程序中打印元素,可以怎么操作呢?

如果数据量不多的话可以使用 collect()方法:

rdd.collect().foreach(println);


如果数据量很大的话可以使用 take()方法:

rdd.take(num).foreach(println);


RDD 算子相关教程参考: Spark RDD的实操教程(二)


0x03 共享变量


Spark 为了能够让多节点之间共享变量,提供了两种方法:广播变量和累加器。

1. 广播变量

广播变量可以在每个节点上缓存一个只读的变量,可以提高数据的分配效率。

  1. 创建方式:


sc.broadcast(v)


可以在Spark Shell中执行:

// 创建广播变量
val broadcastVar = sc.broadcast(Array(1, 2, 3))
// 查看广播变量的值
broadcastVar.value


image.png


2. 累加器

累加器只能够通过加操作的变量,因此在并行操作中具备更高的效率,可以实现 counterssums

可以在Spark Shell中执行:

// 创建累加器
val accum = sc.accumulator(0, "Accumulator")
// 对RDD数据集的数值进行累加
sc.parallelize(Array(1, 2, 3)).foreach(x => accum += x)
// 查看累加器的结果
accum.value


image.png


0xFF 总结


  1. 本文介绍了向Spark传递参数的两种方式,其实一般只会用匿名函数这样的用法;闭包的概念其实也比较简单,需要理解;共享变量的两种方式必须要掌握,包括原理。
  2. 请继续学习相关课程,加油!



相关文章
|
1月前
|
分布式计算 大数据 数据处理
Apache Spark的应用与优势:解锁大数据处理的无限潜能
【8月更文挑战第23天】Apache Spark以其卓越的性能、易用性、通用性、弹性与可扩展性以及丰富的生态系统,在大数据处理领域展现出了强大的竞争力和广泛的应用前景。随着大数据技术的不断发展和普及,Spark必将成为企业实现数字化转型和业务创新的重要工具。未来,我们有理由相信,Spark将继续引领大数据处理技术的发展潮流,为企业创造更大的价值。
|
1月前
|
分布式计算 Serverless 数据处理
|
1月前
|
分布式计算 资源调度 测试技术
“Spark Streaming异常处理秘籍:揭秘如何驯服实时数据流的猛兽,守护你的应用稳如泰山,不容错过!”
【8月更文挑战第7天】Spark Streaming 是 Apache Spark 中的关键组件,用于实时数据流处理。部署时可能遭遇数据问题、资源限制或逻辑错误等异常。合理处理这些异常对于保持应用稳定性至关重要。基础在于理解其异常处理机制,通过 DSC 将数据流切分为 RDD。对于数据异常,可采用 try-catch 结构捕获并处理;资源层面异常需优化 Spark 配置,如调整内存分配;逻辑异常则需加强单元测试及集成测试。结合监控工具,可全面提升应用的健壮性和可靠性。
63 3
|
1月前
|
分布式计算 Hadoop 大数据
大数据处理框架在零售业的应用:Apache Hadoop与Apache Spark
【8月更文挑战第20天】Apache Hadoop和Apache Spark为处理海量零售户数据提供了强大的支持
39 0
|
2月前
|
分布式计算 大数据 Spark
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
《Spark大数据处理:技术、应用与性能优化》深入浅出介绍Spark核心,涵盖部署、实战与性能调优,适合初学者。作者基于微软和IBM经验,解析Spark工作机制,探讨BDAS生态,提供实践案例,助力快速掌握。书中亦讨论性能优化策略。[PDF下载链接](https://zhangfeidezhu.com/?p=347)。![Spark Web UI](https://img-blog.csdnimg.cn/direct/16aaadbb4e13410f8cb2727c3786cc9e.png#pic_center)
99 1
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
|
1月前
|
存储 分布式计算 监控
Spark中广播变量
【8月更文挑战第13天】
62 0
|
2月前
|
分布式计算 Hadoop Serverless
数据处理的艺术:EMR Serverless Spark实践及应用体验
阿里云EMR Serverless Spark是基于Spark的全托管大数据处理平台,融合云原生弹性与自动化,提供任务全生命周期管理,让数据工程师专注数据分析。它内置高性能Fusion Engine,性能比开源Spark提升200%,并有成本优化的Celeborn服务。支持计算存储分离、OSS-HDFS兼容、DLF元数据管理,实现一站式的开发体验和Serverless资源管理。适用于数据报表、科学项目等场景,简化开发与运维流程。用户可通过阿里云控制台快速配置和体验EMR Serverless Spark服务。
|
4月前
|
SQL 分布式计算 监控
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
本文演示了使用 EMR Serverless Spark 产品搭建一个日志分析应用的全流程,包括数据开发和生产调度以及交互式查询等场景。
56580 7
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
|
3月前
|
分布式计算 大数据 数据处理
Apache Spark在大数据处理中的应用
Apache Spark是大数据处理的热门工具,由AMPLab开发并捐赠给Apache软件基金会。它以内存计算和优化的执行引擎著称,提供比Hadoop更快的处理速度,支持批处理、交互式查询、流处理和机器学习。Spark架构包括Driver、Master、Worker Node和Executor,核心组件有RDD、DataFrame、Dataset、Spark SQL、Spark Streaming、MLlib和GraphX。文章通过代码示例展示了Spark在批处理、交互式查询和实时数据处理中的应用,并讨论了其优势(高性能、易用性、通用性和集成性)和挑战。【6月更文挑战第11天】
94 6
|
3月前
|
分布式计算 Spark 大数据
深入探究Apache Spark在大数据处理中的实践应用
【6月更文挑战第2天】Apache Spark是流行的开源大数据处理框架,以其内存计算速度和低延迟脱颖而出。本文涵盖Spark概述、核心组件(包括Spark Core、SQL、Streaming和MLlib)及其在数据预处理、批处理分析、交互式查询、实时处理和机器学习中的应用。通过理解Spark内部机制和实践应用,可提升大数据处理效率,发挥其在各行业的潜力。