一、RDD概述
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。
RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。
执行原理
从计算的角度来讲,数据处理过程中需要计算资源(内存 & CPU)和计算模型(逻辑)。执行时,需要将计算资源和计算模型进行协调和整合。
Spark 框架在执行时,先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的计算任务。然后将任务发到已经分配资源的计算节点上, 按照指定的计算模型进行数据计算。最后得到计算结果。
二、RDD创建
1.从文件系统中加载数据创建RDD
scala> val lines =sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt") scala> val lines =sc.textFile("/user/hadoop/word.txt") scala> val lines = sc.textFile("word.txt")
2.通过并行集合(数组)创建RDD
val array = Array(1,2,3,4,5) val rdd = sc.parallelize(array)
三、RDD操作
惰性机制
例子
1.转换操作
只记录转换的轨迹,不发生计算。
①filter
val linesWithSpark=lines.filter(line => line.contains("Spark")
③map
val rdd2 = rdd1.map(x=>x+10)
val words = lines.map(line => line.split(" "))
④flatmap
val words = lines.flatMap(line => line.split(" "))
⑤groupByKey
先汇总再相加。
⑥reduceByKey
先内部相加一次,然后再汇总再加一次。
2.动作操作
执行真正的计算。
四、RDD的持久化和分区Checkpoint
1.持久化
2.分区
根据key值的最后一位数字,写到不同的文件
作用1:增加程序的并行度实现分布式计算
语法格式
①partitionNum指定分区个数
sc.textFile(path,partitionNum)
②手动指定分区个数
scala> val array = Array(1,2,3,4,5) scala> val rdd = sc.parallelize(array,2)I/设置两个分区
③reparitition重新设置分区个数
完善 Spark 3: RDD概述
一、RDD概述
RDD(弹性分布式数据集)是Apache Spark中的核心数据抽象,它代表了一个可并行操作的不可变分布式数据集。RDD具有弹性、容错和可并行处理等特性,使其成为Spark的核心组件。
二、RDD创建
1. 从文件系统中加载数据创建RDD
你可以使用textFile
方法从文件系统中加载数据创建RDD。以下是一个示例:
from pyspark import SparkContext sc = SparkContext("local", "RDD Example") rdd = sc.textFile("file:///path/to/your/file.txt")
2. 通过并行集合(数组)创建RDD
你可以使用parallelize
方法通过并行集合(如Python列表)创建RDD。以下是一个示例:
from pyspark import SparkContext sc = SparkContext("local", "RDD Example") data = [1, 2, 3, 4, 5] rdd = sc.parallelize(data)
三、RDD操作
1. 转换操作
RDD支持多种转换操作,以下是一些常见的转换操作:
a. filter
filter
操作用于过滤RDD中的元素,返回满足条件的元素。
filtered_rdd = rdd.filter(lambda x: x % 2 == 0)
b. map
map
操作用于对RDD中的每个元素应用一个函数,并返回新的RDD。
mapped_rdd = rdd.map(lambda x: x * 2)
c. flatMap
flatMap
操作与map
类似,但可以返回多个元素。
flat_mapped_rdd = rdd.flatMap(lambda x: (x, x * 2))
d. groupByKey
groupByKey
操作用于将RDD中的元素按键进行分组。
key_value_rdd = sc.parallelize([(1, 'a'), (2, 'b'), (1, 'c')]) grouped_rdd = key_value_rdd.groupByKey()
e. reduceByKey
reduceByKey
操作用于对具有相同键的值进行归约操作。
key_value_rdd = sc.parallelize([(1, 2), (2, 3), (1, 4)]) reduced_rdd = key_value_rdd.reduceByKey(lambda x, y: x + y)
2. 动作操作
动作操作是对RDD执行计算并返回结果的操作。以下是一些常见的动作操作:
a. count
count
操作返回RDD中元素的数量。
count = rdd.count()
b. collect
collect
操作将RDD中的所有元素收集到一个列表中。
data = rdd.collect()
c. first
first
操作返回RDD中的第一个元素。
first_element = rdd.first()
四、RDD的持久化和分区Checkpoint
1. 持久化
持久化是将RDD的数据缓存在内存中,以加速重复操作。你可以使用persist
方法来指定持久化级别(内存、磁盘等)。
rdd.persist()
2. 分区
RDD可以分成多个分区,以便并行处理。Spark根据数据的大小自动确定分区数,但你也可以手动设置分区数。
rdd = rdd.repartition(4) # 设置分区数为4
RDD的分区和持久化是Spark中性能优化的关键概念,通过合理配置分区和持久化可以提高计算效率。
这些是RDD的概述、创建和操作的基本内容。RDD是Spark中非常重要的数据结构,它支持分布式计算和处理大规模数据集。你可以根据具体需求使用不同的转换和动作操作来处理RDD,以实现各种分布式计算任务。