一、spark安装
本次是在educoder这个平台上使用的,所以对于spark的安装方式是local本地模式,平台上有完整的安装步骤,在这里就不在继续叙述了,感谢理解
二、pyspark的算子学习
2.1、Transformation - map
# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ == "__main__": #********** Begin **********# # 1.初始化 SparkContext,该对象是 Spark 程序的入口 sc =SparkContext("local","Simple App") # 2.创建一个1到5的列表List data=[1,2,3,4,5] # 3.通过 SparkContext 并行化创建 rdd rdd = sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素。 print(rdd.collect()) """ 使用 map 算子,将 rdd 的数据 (1, 2, 3, 4, 5) 按照下面的规则进行转换操作,规则如下: 需求: 偶数转换成该数的平方 奇数转换成该数的立方 """ # 5.使用 map 算子完成以上需求 rdd_map = rdd.map(lambda x: x**2 if x%2==0 else x**3) # 6.使用rdd.collect() 收集完成 map 转换的元素 print(rdd_map.collect()) # 7.停止 SparkContext sc.stop() #********** End **********#
2.2、Transformation - mapPartitions
# -*- coding: UTF-8 -*- from pyspark import SparkContext #********** Begin **********# def f(iterator): list = [] for x in iterator: list.append((x,len(x))) return list #********** End **********# if __name__ == "__main__": #********** Begin **********# # 1.初始化 SparkContext,该对象是 Spark 程序的入口 sc = SparkContext("local", "Simple App") data = ["dog", "salmon", "salmon", "rat", "elephant"] rdd = sc.parallelize(data) print(rdd.collect()) partitions = rdd.mapPartitions(f) print(partitions.collect()) sc.stop()
2.3、Transformation - filter
filter 函数功能是对元素进行过滤,对每个元素应用f函数,返回值为 true的元素在RDD中保留,返回值为false的元素将被过滤掉。内部实现相当于生成。
# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ == "__main__": #********** Begin **********# # 1.初始化 SparkContext,该对象是 Spark 程序的入口 sc = SparkContext("local", "Simple App") data = [1, 2, 3, 4, 5, 6, 7, 8] rdd = sc.parallelize(data) print(rdd.collect()) rdd_filter = rdd.filter(lambda x: x % 2 == 0) print(rdd_filter.collect()) sc.stop()
2.4、Transformation - flatMap
flatMap扁平化操作
将原来RDD中的每个元素通过函数f转换为新的元素,并将生成的RDD中每个集合的元素合并为一个集合,内部创建:
# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ == "__main__": #********** Begin **********# # 1.初始化 SparkContext,该对象是 Spark 程序的入口 sc = SparkContext("local", "Simple App") # 2.创建一个[[1, 2, 3], [4, 5, 6], [7, 8, 9]] 的列表List data = [[1, 2, 3], [4, 5, 6], [7, 8, 9]] # 3.通过 SparkContext 并行化创建 rdd rdd = sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素。 print(rdd.collect()) """ 使用 flatMap 算子,将 rdd 的数据 ([1, 2, 3], [4, 5, 6], [7, 8, 9]) 按照下面的规则进行转换操作,规则如下: 需求: 合并RDD的元素,例如: ([1,2,3],[4,5,6]) --> (1,2,3,4,5,6) ([2,3],[4,5],[6]) --> (1,2,3,4,5,6) """ # 5.使用 filter 算子完成以上需求 faltmap= rdd.flatMap(lambda x: x) # 6.使用rdd.collect() 收集完成 filter 转换的元素 print(faltmap.collect()) # 7.停止 SparkContextsc.stio sc.stop() #********** End **********#
2.5、Transformation - distinct
distinct 将 RDD 中的元素进行去重操作。
# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ == "__main__": #********** Begin **********# # 1.初始化 SparkContext,该对象是 Spark 程序的入口 sc = SparkContext("local", "Simple App") # 2.创建一个内容为(1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1)的列表List data = [1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1] # 3.通过 SparkContext 并行化创建 rdd rdd = sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素 print(rdd.collect()) """ 使用 distinct 算子,将 rdd 的数据 (1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1) 按照下面的规则进行转换操作,规则如下: 需求: 元素去重,例如: 1,2,3,3,2,1 --> 1,2,3 1,1,1,1, --> 1 """ # 5.使用 distinct 算子完成以上需求 distinct = rdd.distinct() # 6.使用rdd.collect() 收集完成 distinct 转换的元素 print(distinct.collect()) # 7.停止 SparkContext sc.stop() #********** End **********#
2.6 、Transformation - sortBy
该函数最多可以传三个参数:
- 第一个参数是一个函数,排序规则;
- 第二个参数是 ascending ,从字面的意思大家应该可以猜到,是的,这参数决定排序后 RDD 中的元素是升序还是降序,默认是 true ,也就是升序;
- 第三个参数是 numPartitions ,该参数决定排序后的 RDD 的分区个数,默认排序后的分区个数和排序之前的个数相等,即为this.partitions.size。
从sortBy函数的实现可以看出,第一个参数是必须传入的,而后面的两个参数可以不传入
# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ == "__main__": # ********** Begin **********# # 1.初始化 SparkContext,该对象是 Spark 程序的入口} sc = SparkContext("local", "Simple App") # 2.创建一个内容为(1, 3, 5, 7, 9, 8, 6, 4, 2)的列表List data =[1, 3, 5, 7, 9, 8, 6, 4, 2] # 3.通过 SparkContext 并行化创建 rdd rdd = sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素 print(rdd.collect()) """ 使用 sortBy 算子,将 rdd 的数据 (1, 3, 5, 7, 9, 8, 6, 4, 2) 按照下面的规则进行转换操作,规则如下: 需求: 元素排序,例如: 5,4,3,1,2 --> 1,2,3,4,5 """ # 5.使用 sortBy 算子完成以上需求 by = rdd.sortBy(lambda x: x) # 6.使用rdd.collect() 收集完成 sortBy 转换的元素 print(by.collect()) # 7.停止 SparkContext sc.stop() #********** End **********#
2.7、Transformation - sortByKey
ascending参数是指排序(升序还是降序),默认是升序。numPartitions参数是重新分区,默认与上一个RDD保持一致。keyfunc参数是排序规则。
# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ == "__main__": # ********** Begin **********# # 1.初始化 SparkContext,该对象是 Spark 程序的入口 sc = SparkContext("local", "Simple App") # 2.创建一个内容为[(B',1),('A',2),('C',3)]的列表List data = [('B', 1), ('A', 2), ('C', 3)] # 3.通过 SparkContext 并行化创建 rdd rdd = sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素 print(rdd.collect()) """ 使用 sortByKey 算子,将 rdd 的数据 ('B', 1), ('A', 2), ('C', 3) 按照下面的规则进行转换操作,规则如下: 需求: 元素排序,例如: [(3,3),(2,2),(1,1)] --> [(1,1),(2,2),(3,3)] """ # 5.使用 sortByKey 算子完成以上需求 key = rdd.sortByKey() # 6.使用rdd.collect() 收集完成 sortByKey 转换的元素 print(key.collect()) # 7.停止 SparkContext sc.stop() # ********** End **********#
2.8、Transformation - mapValues
mapValues :针对(Key, Value)型数据中的 Value 进行 Map 操作,而不对 Key 进行处理。
# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ == "__main__": # ********** Begin **********# # 1.初始化 SparkContext,该对象是 Spark 程序的入口 sc = SparkContext("local", "Simple App") # 2.创建一个内容为[("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]的列表List data = [("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)] # 3.通过 SparkContext 并行化创建 rdd rdd = sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素 print(rdd.collect()) """ 使用 mapValues 算子,将 rdd 的数据 ("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5) 按照下面的规则进行转换操作,规则如下: 需求: 元素(key,value)的value进行以下操作: 偶数转换成该数的平方 奇数转换成该数的立方 """ # 5.使用 mapValues 算子完成以上需求 values = rdd.mapValues(lambda x: x**2 if x%2==0 else x**3) # 6.使用rdd.collect() 收集完成 mapValues 转换的元素 print(values.collect()) # 7.停止 SparkContext sc.stop() # ********** End **********#
2.9、Transformations - reduceByKey
reduceByKey 算子,只是两个值合并成一个值,比如叠加。
# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ == "__main__": # ********** Begin **********# # 1.初始化 SparkContext,该对象是 Spark 程序的入口 sc = SparkContext("local", "Simple App") # 2.创建一个内容为[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]的列表List data = [("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)] # 3.通过 SparkContext 并行化创建 rdd rdd = sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素 print(rdd.collect()) """ 使用 reduceByKey 算子,将 rdd 的数据[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)] 按照下面的规则进行转换操作,规则如下: 需求: 元素(key-value)的value累加操作,例如: (1,1),(1,1),(1,2) --> (1,4) (1,1),(1,1),(2,2),(2,2) --> (1,2),(2,4) """ # 5.使用 reduceByKey 算子完成以上需求 print(rdd.reduceByKey(lambda x,y:x+y).collect()) # 6.使用rdd.collect() 收集完成 reduceByKey 转换的元素 sc.stop()
2.10、Actions -pyspark常用算子
count
count():返回 RDD 的元素个数。
示例:
sc = SparkContext("local", "Simple App") data = ["python", "python", "python", "java", "java"] rdd = sc.parallelize(data) print(rdd.count())
输出:
5
first
first():返回 RDD 的第一个元素(类似于take(1))。
示例:
sc = SparkContext("local", "Simple App") data = ["python", "python", "python", "java", "java"] rdd = sc.parallelize(data) print(rdd.first())
输出:
python
take
take(n):返回一个由数据集的前 n 个元素组成的数组。
示例:
sc = SparkContext("local", "Simple App") data = ["python", "python", "python", "java", "java"] rdd = sc.parallelize(data) print(rdd.take(2)) •
输出:
[‘python’, ‘python’]
reduce
reduce():通过func函数聚集 RDD 中的所有元素,该函数应该是可交换的和关联的,以便可以并行正确计算。
示例:
sc = SparkContext("local", "Simple App") data = [1,1,1,1] rdd = sc.parallelize(data) print(rdd.reduce(lambda x,y:x+y))
输出:
4
collect
collect():在驱动程序中,以数组的形式返回数据集的所有元素。
示例:
sc = SparkContext("local", "Simple App") data = [1,1,1,1] rdd = sc.parallelize(data) print(rdd.collect())
输出:
[1,1,1,1]
代码答案:
# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ == "__main__": # ********** Begin **********# # 1.初始化 SparkContext,该对象是 Spark 程序的入口 sc = SparkContext("local", "Simple App") # 2.创建一个内容为[1, 3, 5, 7, 9, 8, 6, 4, 2]的列表List data = [1, 3, 5, 7, 9, 8, 6, 4, 2] # 3.通过 SparkContext 并行化创建 rdd rdd = sc.parallelize(data) # 4.收集rdd的所有元素并print输出 rdd = sc.parallelize(data) print(rdd.collect()) print(rdd.count()) # 5.统计rdd的元素个数并print输出 print(rdd.first()) # 6.获取rdd的第一个元素并print输出 print(rdd.take(3)) # 7.获取rdd的前3个元素并print输出 print(rdd.reduce(lambda x,y:x+y)) # 8.聚合rdd的所有元素并print输出 # print(rdd.collect()) # 9.停止 SparkContext sc.stop() # ********** End **********#
三、Spark RDD编程初级实践(scala)
基于上面的pyspark几个算子基本上已经理解了,在scala中算子的概念是一样的
3.1、数据去重
import org.apache.spark.SparkContext import org.apache.spark.SparkConf import org.apache.spark.HashPartitioner import org.apache.spark.SparkContext import org.apache.spark.SparkConf import org.apache.spark.HashPartitioner object RemDup { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("RemDup").setMaster("local") val sc = new SparkContext(conf) //输入文件fileA.txt和fileB.txt已保存在本地文件系统/root/step1_files目录中 val dataFile = "file:///root/step1_files" val data = sc.textFile(dataFile, 2) /********** Begin **********/ //第一步:执行过滤操作,把空行丢弃。 val rdd1 = data.filter(_.trim().length > 0) //第二步:执行map操作,取出RDD中每个元素,去除尾部空格并生成一个(key, value)键值对。 val rdd2 = rdd1.map(line => (line.trim, "")) //第三步:执行groupByKey操作,把所有key相同的value都组织成一个value-list。 val rdd3 = rdd2.groupByKey() //第四步:对RDD进行重新分区,变成一个分区, //在分布式环境下只有把所有分区合并成一个分区,才能让所有元素排序后总体有序。 val rdd4 = rdd3.partitionBy(new HashPartitioner(1)) //第五步:执行sortByKey操作,对RDD中所有元素都按照key的升序排序。 val rdd5 = rdd4.sortByKey() //第六步:执行keys操作,将键值对RDD中所有元素的key返回,形成一个新的RDD。 val rdd6 = rdd5.keys //第七步:执行collect操作,以数组的形式返回RDD中所有元素。 val rdd7 = rdd6.collect() //第八步:执行foreach操作,并使用println打印出数组中每个元素的值。 println("") //注意:此行不要修改,否则会影响测试结果,在此行之后继续完成第八步的代码。 rdd7.foreach(println) /********** End **********/ } }
3.2、整合排序
import org.apache.spark.SparkContext import org.apache.spark.SparkConf import org.apache.spark.HashPartitioner object FileSort { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("FileSort").setMaster("local") val sc = new SparkContext(conf) //输入文件file1.txt、file2.txt和file3.txt已保存在本地文件系统/root/step2_files目录中 val dataFile = "file:///root/step2_files" val data = sc.textFile(dataFile, 3) /********** Begin **********/ //第一步:执行过滤操作,把空行丢弃。 val rdd1 = data.filter(_.trim().length > 0) //第二步:执行map操作,取出RDD中每个元素,去除尾部空格并转换成整数,生成一个(key, value)键值对。 val rdd2 = rdd1.map(line => (line.trim.toInt, "")) //第三步:对RDD进行重新分区,变成一个分区, //在分布式环境下只有把所有分区合并成一个分区,才能让所有元素排序后总体有序。 val rdd3 = rdd2.partitionBy(new HashPartitioner(1)) //第四步:执行sortByKey操作,对RDD中所有元素都按照key的升序排序。 val rdd4 = rdd3.sortByKey() //第五步:执行keys操作,将键值对RDD中所有元素的key返回,形成一个新的RDD。 val rdd5 = rdd4.keys //第六步:执行map操作,取出RDD中每个元素,生成一个(key, value)键值对, //其中key是整数的排序位次,value是原待排序的整数。 var index = 0 val rdd6 = rdd5.map(t => { index = index + 1 (index, t) }) //第七步:执行collect操作,以数组的形式返回RDD中所有元素。 val rdd7 = rdd6.collect() //第八步:执行foreach操作,依次遍历数组中每个元素,分别取出(key, value)键值对中key和value, //按如下格式输出:key value println("") //注意:此行不要修改,否则会影响测试结果,在此行之后继续完成第八步的代码。 rdd7.foreach(t => println(t._1 + " " + t._2)) /********** End **********/ } }
3.3、求平均值
import org.apache.spark.SparkContext import org.apache.spark.SparkConf object AvgScore { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("FileSort").setMaster("local") val sc = new SparkContext(conf) //输入文件AlgorithmScore.txt、DataBaseScore.txt和PythonScore.txt已保存在本地文件系统/root/step3_files目录中 val dataFile = "file:///root/step3_files" val data = sc.textFile(dataFile) /********** Begin **********/ //第一步:执行过滤操作,把空行丢弃。 val rdd1 = data.filter(_.trim().length > 0) //第二步:执行map操作,取出RDD中每个元素(即一行文本),以空格作为分隔符将一行文本拆分成两个字符串, //拆分后得到的字符串封装在一个数组对象中,成为新的RDD中一个元素。 var rdd2 = rdd1.map(line => line.split(" ")) //第三步:执行map操作,取出RDD中每个元素(即字符串数组),取字符串数组中第一个元素去除尾部空格, //取字符串数组中第二个元素去除尾部空格并转换成整数,并由这两部分构建一个(key, value)键值对。 val rdd3 = rdd2.map(t => (t(0).trim, t(1).trim.toInt)) //第四步:执行mapValues操作,取出键值对RDD中每个元素的value,使用x=>(x,1)这个匿名函数进行转换。 val rdd4 = rdd3.mapValues(x => (x, 1)) //第五步:执行reduceByKey操作,计算出每个学生所有课程的总分数和总课程门数。 val rdd5 = rdd4.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)) //第六步:执行mapValues操作,计算出每个学生的平均成绩。 val rdd6 = rdd5.mapValues(x => (x._1.toDouble / x._2)) //第七步:执行collect操作,以数组的形式返回RDD中所有元素。 val rdd7 = rdd6.collect() //第八步:执行foreach操作,按如下格式打印出每个学生的平均成绩:姓名 成绩,其中成绩要求保留两位小数。 println("") //注意:此行不要修改,否则会影响测试结果,在此行之后继续完成第八步的代码。 rdd7.foreach(t => { val x = t._2 println(t._1 + " " + f"$x%1.2f") }) /********** End **********/ } }
最后,感谢阅读,如有帮助,一键三连哈。