创建RDD
RDD是Spark编程核心,在进行Spark编程时,首要任务是创建一个初始的RDD,这样就相当于设置了Spark应用程序的输入源数据,然后在创建了初始的RDD之后,才可以通过Spark 提供的一些高阶函数,对这个RDD进行操作,来获取其它的RDD
Spark提供三种创建RDD方式:集合、本地文件、HDFS文件
- 使用程序中的集合创建RDD,主要用于进行测试,可以在实际部署到集群运行之前,自己使用集合构造一些测试数据,来测试后面的spark应用程序的流程。
- 使用本地文件创建RDD,主要用于临时性地处理一些存储了大量数据的文件
- 使用HDFS文件创建RDD,是最常用的生产环境的处理方式,主要可以针对HDFS上存储的数据,进行离线批处理操作
使用集合创建RDD
如果要通过集合来创建RDD,需要针对程序中的集合,调用SparkContext的parallelize()方法。Spark会将集合中的数据拷贝到集群上,形成一个分布式的数据集合,也就是一个RDD。
相当于,集合中的部分数据会到一个节点上,而另一部分数据会到其它节点上。然后就可以用并行的方式来操作这个分布式数据集合了。
调用parallelize()时,有一个重要的参数可以指定,就是将集合切分成多少个partition。
Spark会为每一个partition运行一个task来进行处理。
Spark默认会根据集群的配置来设置partition的数量。我们也可以在调用parallelize()方法时,传入第二个参数,来设置RDD的partition数量,例如:parallelize(arr, 5)
Scala版本代码如下
package com.bigdata.scala
import org.apache.spark.{SparkConf, SparkContext}
/**
* 需求:使用集合创建RDD
* Created by 颜
*/
object CreateRddByArrayScala {
def main(args: Array[String]): Unit = {
//创建SparkContext
val conf = new SparkConf()
conf.setAppName("CreateRddByArrayScala")
.setMaster("local")
val sc = new SparkContext(conf)
//创建集合
val arr = Array(1,2,3,4,5)
//基于集合创建RDD
val rdd = sc.parallelize(arr)
//对集合中的数据求和
val sum = rdd.reduce( + )
//注意:这行println代码是在driver进程中执行的
println(sum)
}
}
结果:15
注意:
val arr = Array(1,2,3,4,5)还有println(sum)代码是在driver进程中执行的,这些代码不会并行执行
parallelize还有reduce之类的操作是在worker节点中执行的
转换算子
Spark2.4.3官方文档:Overview - Spark 2.4.4 Documentation (apache.org)
选择RDDs,UserGuide
转换算子如下
常用算子介绍:
转换算子操作开发实战
前置需求分析操作如下
- map:对集合中每个元素乘以2
- filter:过滤出集合中的偶数
- flatMap:将行拆分为单词
- groupByKey:对每个大区的主播进行分组
- reduceByKey:统计每个大区的主播数量
- sortByKey:对主播的音浪收入排序
- join:打印每个主播的大区信息和音浪收入
- distinct:统计当天开播的大区信息
map:
def mapOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array(1,2,3,4,5))
dataRDD.map(_ * 2).foreach(println(_))
}
filter:
def filterOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array(1,2,3,4,5))
//满足条件的保留下来
dataRDD.filter( % 2 == 0).foreach(println())
}
flatMap:
def flatMapOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array("good good study","day day up"))
dataRDD.flatMap(.split(" ")).foreach(println())
}
groupByKey:一元情况
def groupByKeyOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array((150001,"US"),(150002,"CN"),(150003,"CN"),(150004,"IN")))
//需要使用map对tuple中的数据进行位置互换,因为我们需要把大区作为key进行分组操作
//此时的key就是tuple中的第一列,其实在这里就可以把这个tuple认为是一个key-value
//注意:在使用类似于groupByKey这种基于key的算子的时候,需要提前把RDD中的数据组装成tuple2这种形式
//此时map算子之后生成的新的数据格式是这样的:("US",150001)
//如果tuple中的数据列数超过了2列怎么办?看groupByKeyOp2
dataRDD.map(tup=>(tup.2,tup.1)).groupByKey().foreach(tup=>{
//获取大区信息
val area = tup._1
print(area+":")
//获取同一个大区对应的所有用户id
val it = tup._2
for(uid <- it){
print(uid+" ")
}
println()
})
}
groupByKey:tuple是二元的情况
def groupByKeyOp2(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array((150001, "US","male"), (150002, "CN","female"), (150003, "CN","male"), (150004, "IN","female")))
//如果tuple中的数据列数超过了2列怎么办?
//把需要作为key的那一列作为tuple2的第一列,剩下的可以再使用一个tuple2包装一下
//此时map算子之后生成的新的数据格式是这样的:("US",(150001,"male"))
//注意:如果你的数据结构比较负责,你可以在执行每一个算子之后都调用foreach打印一下,确认数据的格式
dataRDD.map(tup=>(tup.2,(tup.1,tup._3))).groupByKey().foreach(tup=>{
//获取大区信息
val area = tup._1
print(area+":")
//获取同一个大区对应的所有用户id和性别信息
val it = tup._2
for((uid,sex) <- it){
print("<"+uid+","+sex+"> ")
}
println()
})
}
reduceByKey:
def reduceByKeyOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array((150001,"US"),(150002,"CN"),(150003,"CN"),(150004,"IN")))
//由于这个需求只需要使用到大区信息,所以在mao操作的时候只保留大区信息即可
dataRDD.map(tup=>(tup.2,1)).reduceByKey( + ).foreach(println())
}
sortByKey:
def sortByKeyOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array((150001,400),(150002,200),(150003,300),(150004,100)))
//由于需要对音浪收入进行排序,所以需要把音浪收入作为key,在这里要进行位置和互换
/*dataRDD.map(tup=>(tup._2,tup._1))
.sortByKey(false)//默认是正序,第一个参数为true,想要倒序需要把这个参数设置为false
.foreach(println(_))*/
//sortBy的使用:可以动态指定排序字段,比较灵活
dataRDD.sortBy(.2,false).foreach(println(_))
dataRDD.sortByKey()
}
join:
def joinOp(sc: SparkContext): Unit = {
val dataRDD1 = sc.parallelize(Array((150001,"US"),(150002,"CN"),(150003,"CN"),(150004,"IN")))
val dataRDD2 = sc.parallelize(Array((150001,400),(150002,200),(150003,300),(150004,100)))
val joinRDD = dataRDD1.join(dataRDD2)
//joinRDD.foreach(println(_))
joinRDD.foreach(tup=>{
//用户id
val uid = tup._1
val area_gold = tup._2
//大区
val area = area_gold._1
//音浪收入
val gold = area_gold._2
println(uid+"\t"+area+"\t"+gold)
})
}
distinct:
def distinctOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array((150001,"US"),(150002,"CN"),(150003,"CN"),(150004,"IN")))
//由于是统计开播的大区信息,需要根据大区信息去重,所以只保留大区信息
dataRDD.map(.2).distinct().foreach(println(_))
}
获取配置:
def getSparkContext = {
val conf = new SparkConf()
conf.setAppName("WordCountScala")
.setMaster("local")
new SparkContext(conf)
}
主方法一览:
def main(args: Array[String]): Unit = {
val sc = getSparkContext
//map:对集合中每个元素乘以2
//mapOp(sc)
//filter:过滤出集合中的偶数
//filterOp(sc)
//flatMap:将行拆分为单词
//flatMapOp(sc)
//groupByKey:对每个大区的主播进行分组
//groupByKeyOp(sc)
//groupByKeyOp2(sc)
//reduceByKey:统计每个大区的主播数量
//reduceByKeyOp(sc)
//sortByKey:对主播的音浪收入排序
//sortByKeyOp(sc)
//join:打印每个主播的大区信息和音浪收入
//joinOp(sc)
//distinct:统计当天开播的大区信息
//distinctOp(sc)
sc.stop()
}
常用Action介绍
Action操作开发实战
下面针对常见的Action算子来写一些具体案例
- reduce:聚合计算
- collect:获取元素集合
- take(n):获取前n个元素
- count:获取元素总数
- saveAsTextFile:保存文件
- countByKey:统计相同的key出现多少次
- foreach:迭代遍历元素
获取配置:
def getSparkContext = {
val conf = new SparkConf()
conf.setAppName("ActionOpScala")
.setMaster("local")
new SparkContext(conf)
}
reduce:
def reduceOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array(1,2,3,4,5))
val num = dataRDD.reduce( + )
println(num)
}
collect:
def collectOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array(1,2,3,4,5))
//collect返回的是一个Array数组
//注意:如果RDD中数据量过大,不建议使用collect,因为最终的数据会返回给Driver进程所在的节点
//如果想要获取几条数据,查看一下数据格式,可以使用take(n)
val res = dataRDD.collect()
for(item <- res){
println(item)
}
}
take(n):
def takeOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array(1,2,3,4,5))
//从RDD中获取前2个元素
val res = dataRDD.take(2)
for(item <- res){
println(item)
}
}
count:
def countOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array(1,2,3,4,5))
val res = dataRDD.count()
println(res)
}
saveAsTextFile:
def saveAsTextFileOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array(1,2,3,4,5))
//指定HDFS的路径信息即可,需要指定一个不存在的目录
dataRDD.saveAsTextFile("hdfs://bigdata:9000/out001")
}
countByKey:
def countByKeyOp(sc: SparkContext): Unit = {
val daraRDD = sc.parallelize(Array(("A",1001),("B",1002),("A",1003),("C",1004)))
//返回的是一个map类型的数据
val res = daraRDD.countByKey()
for((k,v) <- res){
println(k+","+v)
}
}
foreach:
def foreachOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array(1,2,3,4,5))
//注意:foreach不仅限于执行println操作,这个只是在测试的时候使用的
//实际工作中如果需要把计算的结果保存到第三方的存储介质中,就需要使用foreach
//在里面实现具体向外部输出数据的代码
dataRDD.foreach(println(_))
}
main方法:
def main(args: Array[String]): Unit = {
val sc = getSparkContext
//reduce:聚合计算
//reduceOp(sc)
//collect:获取元素集合
//collectOp(sc)
//take(n):获取前n个元素
//takeOp(sc)
//count:获取元素总数
//countOp(sc)
//saveAsTextFile:保存文件
//saveAsTextFileOp(sc)
//countByKey:统计相同的key出现多少次
//countByKeyOp(sc)
//foreach:迭代遍历元素
//foreachOp(sc)
sc.stop()
}