Spark中RDD操作

简介:

Transformations(转换)
Transformation 说明
map(func) 根据传入的函数处理原有的RDD对象中每一个元素,每一个新元素处理完成后返回一个对象,这些新对象组装得到一个新的RDD,新的RDD和旧的RDD元素都是一一对应的
filter(func) 根据传入的函数来过滤RDD中每一个元素,通过过滤条件的的元素组成一个新的RDD
flatMap(func) 先进行map操作,然后把map操作得到的结果合并为一个对象,假如map操作返回的是Array[Array[String]],那flatMap操作得到的应该是Array[String],自动将多个字符串数组合并为一个,另外一个意义是一个旧的RDD元素可以生成多个新元素,一对多的关系
mapPartitions(func) 可以将其看成map,但是他处理的是每个单独分区中的数据,然后把各个分区的值合并
mapPartitionsWithIndex(func) 把分区的索引值(index)交给输入函数处理
sample(withReplacement, fraction, seed) 取样函数,分放回和不放回,由withReplacement参数决定,fraction:抽样率
union(otherDataset) 两个RDD合并,不去重
intersection(otherDataset) 两个RDD交集且去重
distinct([numTasks])) 去重
groupByKey([numTasks]) 根据key来分组,同一个key的值放在一个集合中
reduceByKey(func, [numTasks]) 将key对应的值交给传入的函数处理
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 对key对应的值做聚合计算,返回的还是Pair RDD对象
sortByKey([ascending], [numTasks]) PairRDD以key值进行排序
join(otherDataset, [numTasks]) sql语句中内关联
cogroup(otherDataset, [numTasks]) SQL中的全外关联full outer join
cartesian(otherDataset) 两个RDD进行笛卡尔集的操作,返回CartesianRDD
pipe(command, [envVars]) 将RDD的每个数据分片都接到shell-command的标准输入上。经过shell-command的输出数据会重新生成新的RDD,新RDD是string类型的RDD
coalesce(numPartitions) 合并分区,参数执行合并后的分区大小
repartition(numPartitions) 进行shuffle的coalesce操作
repartitionAndSortWithinPartitions(partitioner) 该方法依据partitioner对RDD进行分区,并且在每个结果分区中按key进行排序;通过对比sortByKey发现,这种方式比先分区,然后在每个分区中进行排序效率高,这是因为它可以将排序融入到shuffle阶段

Action(动作) Action 说明
reduce(func) 根据映射函数f,对RDD中的元素进行二元计算,返回计算结果。
collect() 将RDD转换为数组
count() RDD的元素数量
first() 返回RDD第一个元素
take(n) 将RDD的前n个元素转换为数组返回
takeSample(withReplacement, num, [seed]) 随机取出num个元素转换为数组返回
takeOrdered(n, [ordering]) 取n个元素,用某个比较器排序后返回
saveAsTextFile(path) RDD保存到文件
saveAsSequenceFile(path) 保存为hadoop SequenceFile格式文件
saveAsObjectFile(path) 用于将RDD中的元素序列化成对象,存储到文件中
countByKey() PairRDD,计算Key的数量
foreach 无返回的,用于遍历RDD,将函数f应用于每一个元素。

例子

val rdd1 = sc.makeRDD(Array(("1","Spark"),("2","Hadoop"),("3","Scala"),("4","Java")),2) 
//建立一个行业薪水的键值对RDD,包含ID和薪水,其中ID为1、2、3、5 
val rdd2 = sc.makeRDD(Array(("1","30K"),("2","15K"),("3","25K"),("5","10K")),2)

println("//下面做Join操作,预期要得到(1,×)、(2,×)、(3,×)")  
val joinRDD=rdd1.join(rdd2).collect.foreach(println) 

输出 
(2,(Hadoop,15K)) 
(3,(Scala,25K)) 
(1,(Spark,30K))

转自 http://blog.csdn.net/itfootball/article/details/52769302


本文转自whk66668888 51CTO博客,原文链接:http://blog.51cto.com/12597095/2057733

相关文章
|
SQL 消息中间件 分布式计算
大数据-84 Spark 集群 RDD创建 RDD-Transformation操作算子 详解(一)
大数据-84 Spark 集群 RDD创建 RDD-Transformation操作算子 详解(一)
216 5
|
分布式计算 大数据 数据处理
大数据-84 Spark 集群 RDD创建 RDD-Transformation操作算子 详解(二)
大数据-84 Spark 集群 RDD创建 RDD-Transformation操作算子 详解(二)
191 4
|
存储 缓存 分布式计算
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
214 4
|
存储 分布式计算 并行计算
【赵渝强老师】Spark中的RDD
RDD(弹性分布式数据集)是Spark的核心数据模型,支持分布式并行计算。RDD由分区组成,每个分区由Spark Worker节点处理,具备自动容错、位置感知调度和缓存机制等特性。通过创建RDD,可以指定分区数量,并实现计算函数、依赖关系、分区器和优先位置列表等功能。视频讲解和示例代码进一步详细介绍了RDD的组成和特性。
253 0
|
分布式计算 Spark
【赵渝强老师】Spark RDD的依赖关系和任务阶段
Spark RDD之间的依赖关系分为窄依赖和宽依赖。窄依赖指父RDD的每个分区最多被一个子RDD分区使用,如map、filter操作;宽依赖则指父RDD的每个分区被多个子RDD分区使用,如分组和某些join操作。窄依赖任务可在同一阶段完成,而宽依赖因Shuffle的存在需划分不同阶段执行。借助Spark Web Console可查看任务的DAG图及阶段划分。
611 15
|
12月前
|
存储 缓存 分布式计算
【赵渝强老师】Spark RDD的缓存机制
Spark RDD通过`persist`或`cache`方法可将计算结果缓存,但并非立即生效,而是在触发action时才缓存到内存中供重用。`cache`方法实际调用了`persist(StorageLevel.MEMORY_ONLY)`。RDD缓存可能因内存不足被删除,建议结合检查点机制保证容错。示例中,读取大文件并多次调用`count`,使用缓存后执行效率显著提升,最后一次计算仅耗时98ms。
355 0
【赵渝强老师】Spark RDD的缓存机制
|
存储 缓存 分布式计算
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
210 4
|
JSON 分布式计算 大数据
大数据-85 Spark 集群 RDD创建 RDD-Action Key-Value RDD详解 RDD的文件输入输出
大数据-85 Spark 集群 RDD创建 RDD-Action Key-Value RDD详解 RDD的文件输入输出
191 1
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
214 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
分布式计算 Serverless 数据处理