Spark RDD编程基础(Scala版)

简介: Spark RDD编程基础(Scala版)

RDD :弹性分布式数据集(分布式的元素集合)

Spark中,对数据的所有操作不外乎创建RDD,转化已有的RDD以及调用RDD操作进行求值,Spark会自动将RDD中的数据分发到集群上,并将操作并行化执行。

1、创建RDD

Spark有两种创建RDD的方式,一种是读取外部数据集,一种是在驱动器中对一个集合进行并行化。

最简单的创建RDD的方法就是讲程序中已有的一个集合传给SparkContext的parallelize()方法,这种方法可以快速的创建出RDD并对RDD进行操作,但是这种方法并不常用,方法如下:

val lines = sc.parallelize(List("pandas","i like pandas"))

常用的方法是从外部存储中读取数据来创建RDD,方法如下:

val lines = sc.textFile("/path/to/README.md")

2、RDD操作


RDD支持两种操作:转化操作和行动操作。

转化操作是会返回一个新的RDD的操作,比如map(),filter(),行动操作是向驱动器程序返回结果或者把结果写入外部系统的操作,会触发实际的计算,比如count(),first()。如果在实际操作中分不清自己的操作是什么操作,可以查看当前操作的返回值类型,转化操作返回的是RDD,行动操作返回的是其他类型。

转化操作   (是一种lazy操作)举例:filter()转化    下面程序展示了将log.txt中的error过滤出来。

val inputRDD = sc.textFile("log.txt")
val errorRDD = inputRDD.filter(line => line.contains("error"))

行动操作   会把最终的结果返回到驱动程序,或者写入外部存储系统中,由于行动操作需要生成实际的输出,他们会强制执行那些求值必须用到的RDD的转化操作。


例:在scala中使用行动操作对错误进行计数

println("Input had " + badLinesRDD.count() + "concerning lines")
println("Here are 10 examples:")
badLinesRDD.take(10).foreach(println)

3、常见的转化操作和行动操作


基本RDD


最常用的转化操作是map()和filter()


map()接收一个函数,把这个函数用于RDD中的每一个元素,将函数的返回结果作为结果RDD中对应元素的值。


filter()接收一个函数,并将RDD中满足该函数的元素放入新的RDD中返回。


例:  inputRDD  =  {1,2,3,4}

       Mapped RDD (x*x)  = {1,4,9,16}

       Filtered RDD (x!=1)  = {2,3,4}

代码举例  (计算平方)

val input = sc.parallelize(List(1,2,3,4))
val result = input.map(x => x*x)
println(result.collect().mkString(","))

如果我们希望对每个输出元素生成多个输出元素,实现该功能的操作叫做flatMap()

使用flatMap将行数据切分为单词

val lines = sc.parallelize(List("hello world","hi"))
val words = lines.flatMap(line => line.split(" "))
words.first()   //"返回hello"

map 和 flatmap的区别举例


RDD1 = {"coffee panda","happy panda","happiest panda party"}

tokenize("coffee panda") = List("coffee","panda")

RDD1.map(tokenize) ={["coffee","panda"],["happy","panda"],["happiest","panda","party"]}

RDD1.flatMap(tokenize) ={"coffee","panda","happy","panda","happiest","panda","party"}


伪集合操作


RDD中最常缺失的集合属性是元素的唯一性,因为常常有重复的元素。如果只要唯一的元素,我们可以使用RDD.distinct()转化操作来生成一个只包含不同元素的新RDD。


行动操作


最常见的行动操作reduce()。接收一个函数作为参数,这个函数要操作两个相同元素类型的RDD数据并返回一个同样类型的新元素。举一个简历(函数+)

val sum = rdd.reduce((x,y) => x + y) 

fold()函数的作用和reduce函数相同,但是需要提供一个初始值

rdd.fold(0)((x,y) => x+y)

4、持久化(缓存)


为了避免多次计算同一个RDD,可以让Spark对数据进行持久化,持久化存储一个RDD时,计算出RDD的节点会分别保存他们所求出的分区数据。如果一个有持久化的节点发生故障,Spark会在需要用到缓存的数据时重算丢失的数据分区。如果希望节点故障的情况下不会拖累我们的执行速度,也可以把数据备份到多个节点上。


使用persist()进行持久化操作

import org.apache.spark.storage.StorageLevel
val result = input.map(x => x * x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))

还有一个方法unpersit()可以手动把持久化的RDD从缓存中移除。

相关文章
|
存储 分布式计算 并行计算
【赵渝强老师】Spark中的RDD
RDD(弹性分布式数据集)是Spark的核心数据模型,支持分布式并行计算。RDD由分区组成,每个分区由Spark Worker节点处理,具备自动容错、位置感知调度和缓存机制等特性。通过创建RDD,可以指定分区数量,并实现计算函数、依赖关系、分区器和优先位置列表等功能。视频讲解和示例代码进一步详细介绍了RDD的组成和特性。
225 0
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
246 0
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
294 0
|
10月前
|
分布式计算 Spark
【赵渝强老师】Spark RDD的依赖关系和任务阶段
Spark RDD之间的依赖关系分为窄依赖和宽依赖。窄依赖指父RDD的每个分区最多被一个子RDD分区使用,如map、filter操作;宽依赖则指父RDD的每个分区被多个子RDD分区使用,如分组和某些join操作。窄依赖任务可在同一阶段完成,而宽依赖因Shuffle的存在需划分不同阶段执行。借助Spark Web Console可查看任务的DAG图及阶段划分。
462 15
|
10月前
|
存储 缓存 分布式计算
【赵渝强老师】Spark RDD的缓存机制
Spark RDD通过`persist`或`cache`方法可将计算结果缓存,但并非立即生效,而是在触发action时才缓存到内存中供重用。`cache`方法实际调用了`persist(StorageLevel.MEMORY_ONLY)`。RDD缓存可能因内存不足被删除,建议结合检查点机制保证容错。示例中,读取大文件并多次调用`count`,使用缓存后执行效率显著提升,最后一次计算仅耗时98ms。
293 0
【赵渝强老师】Spark RDD的缓存机制
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
182 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
消息中间件 分布式计算 Kafka
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
192 0
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
281 0
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
356 0
|
SQL 分布式计算 大数据
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
176 0