RDD
Author:萌狼蓝天
【哔哩哔哩】萌狼蓝天
【博客】https://mllt.cc
【博客园】萌狼蓝天 - 博客园
【微信公众号】mllt9920
【学习交流QQ群】238948804
目录
- map(func)
- filter 过滤
- flatMap(func) 分割单词
- sortBy 排序
- distinct 去重复
- union 合并
- intersection 交集
- subtract 差集
- cartesian
- take(num)
- mapValues
- join按键内连接
- leftOuterJoin和rightOuterJoin和fullOuterJoin
- zip
- CombineByKey
- groupByKey([numPartitions])
- reduceByKey(func, [numPartitions])
@萌狼蓝天
【!】启动spark集群
【!】启动
spark-shell
spark2.0将spark context 和hive context集成到了spark session
spark也可以作为程序入口
spark用scala编程
特点
它是集群节点上的不可改变的、已分区的集合对象;
- 通过并行转换的方式来创建如(map、filter、join等);
- 失败自动重建;
- 可以控制存储级别(内存、磁盘等)来进行重用;
- 必须是可序列化的;在内存不足时可自动降级为磁盘存储,把RDD存储于磁盘上,这时性能有大的下降但不会差于现在的MapReduce;
- 对于丢失部分数据分区只需要根据它的lineage就可重新计算出来,而不需要做特定的checkpoint;
创建
从内存中创建RDD
启动spark-shell
val list = List(1,2,3) var rdd = sc.parallelize(list) rdd.partitions.size
从外部存储创建RDD
1.创建本地文件
cd /home mkdir data touch a.txt
- 不一定非要在家目录创建
- 可以使用vim在a.txt中添加一些内容
2.启动spark-shell
3.从本地文件系统中读取
val localrdd = sc.textFile("file:///home/用户名/data/a.txt")
路径前面加
file://
表示从本地文件系统读取
localrdd.collect//返回RDD中所有的元素
注意:若在完全分布式spark-shell模式下,该文件需要在所有节点的相同位置保存才可以被读取,否则会报错“文件不存在”
从HDFS创建RDD
1.在HDFS根目录下创建目录(姓名学号)
hdfs dfs -mkdir /zwj25 hdfs dfs -ls /
访问 http://[IP]:50070
2.上传本地文件到HDFS
hdfs dfs -put file.txt /zwj25
3.进入spark4-shell
var hdfsrdd=sc.textFile("/zwj25/file.txt") hdfsrdd.collect hdfsrdd.partitions hdfsrdd.partitions.size
sc.defaultMinPartitions=min(sc.defaultParallelism,2)
rdd分区数=max(hdfs文件的block数目,sc.defaultMinPartitions)
从其他RDD创建
算子
map(func)
类型:Transformation类型算子
map: 将原来RDD的每个数据项通过map中的用户自定义函数f转换成一个新的RDD,map操作不会改变RDD的分区数目
filter 过滤
filter(func)
Transformation类型算子
保留通过函数func,返回值为true的元素,组成新的RDD
eg:过滤掉data RDD中元素小于或等于2的元素
val data =sc.parallelize(List(1,2,3,4)) val result = data.filter(x=>x>2) result.collect
flatMap(func) 分割单词
类型:Transformation类型算子
flatMap:对集合中的每个元素进行map操作再扁平化
val data = sc.parallelize(List("I am Meng Lang Lan Tian","my wechat is mllt9920")) data.map(x=>x.split(" ")).collect data.flatMap(x=>x.split(" ")).collect
sortBy 排序
sortBy(f:(T) => K, ascending, numPartitions)
类型:Transformation类型算子
作用:对标准RDD进行排序
sortBy()
可接受三个参数:
f:(T) => K
:左边是要被排序对象中的每一个元素,右边返回的值是元素中要进行排序的值。
ascending
:决定排序后RDD中的元素是升序还是降序,默认是true,也就是升序,false为降序排序。
numPartitions
:该参数决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的个数相等。
eg:按照每个元素的第二个值进行降序排序,将得到的结果存放到RDD "data2" 中
val data1 = sc.parallelize(List((1,3),(2,4),(5,7),(6,8))) val data2 = data1.sortBy(x=>x._2,false,1) val data3 = data1.sortBy(x=>x._1,false,1)
distinct 去重复
distinct([numPartitions]))
类型:Transformation类型算子
作用:去重。针对RDD中重复的元素,只保留一个元素
eg:
val data1 = sc.parallelize(List(1,2,3,3,3,4,4)) data1.collect data1.distinct.collect data1.collect
union 合并
union(otherDataset)
作用:合并RDD,需要保证两个RDD元素类型一致
eg:合并rdd1和rdd2
val rdd1 = sc.parallelize(List(1,2,3)) val rdd2 = sc.parallelize(List(4,5,6)) rdd1.union(rdd2).collect
注意:union两个RDD元素类型要一致
intersection 交集
intersection(otherDataset)
作用:找出两个RDD的共同元素,也就是找出两个RDD的交集
eg:找出c_rdd1和c_rdd2中相同的元素
val c_rdd1 = sc.parallelize(List(('a',1),('b',2),('a',1),('c',1))) val c_rdd2 = sc.parallelize(List(('a',1),('b',1),('d',1),('e',1))) c_rdd1.intersection(c_rdd2).collect
subtract 差集
subtract (otherDataset)
作用:获取两个RDD之间的差集
eg:找出rdd1与rdd2之间的差集
val rdd1 = sc.parallelize(Array("A","B","C","D")) val rdd2 = sc.parallelize(Array("C","D","E","F")) val subtractRDD = rdd1.subtract(rdd2) subtractRDD.collect
cartesian
cartesian(otherDataset)
名称:笛卡尔积
作用:将两个集合的元素两两组合成一组
eg:
val rdd01 = sc.makeRDD(List(1,3,5,3)) val rdd02 = sc.makeRDD(List(2,4,5,1)) rdd01.cartesian(rdd02).collect
take(num)
返回RDD前面num条记录
val data = sc.parallelize(List(1,2,3,4)) data.take(2)