开发者学堂课程【大数据 Spark 2020版(知识精讲与实战演练)第三阶段:阶段练习_代码编写】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/689/detail/11988
阶段练习_代码编写
内容介绍
一、学前须知
二、代码编写
一、学前须知
接下来进行代码编写,在代码编写步骤:首先,第一步,拷贝数据集到工程,然后创建类。创建类以后,代码就有容器,就可以编写代码,编写代码以后去运行测试。这是整个的四个步骤。
二、代码编写
第一步,如何拷贝数据集。
数据集位置如下图所示:spsrk 下 file 目录,点开 Dateset。并复制选中的 csv 文件。
默认情况下 CSV 文件当中有一个 header,第一行是头,这个头应该去掉。如果要使用代码来去掉稍微有一点繁琐。容易思路上开小差,所以要把第一个 header 去掉,然后生成一个新的数据。使用这个 header 的数据就可以了。
将 csv 文件复制到工程当中。放入 dateset 目录。
创建StagePractice类,Stage阶段的类。
编写代码如下:
package cn.itcast.spark.rdd
import org.apache.spark.{SparkConf,SparkContext}
import org.junit.Test
class StagePractice{
@Test //导入 notation 包
def pmProcess(():Unit={
//创建方法 pmProcess。
// 1.创建 sc 对象
val conf =new SparkConf().setMaster("local[6]").setAppName("stage practice"
)
//new 一个 SparkConf(),传递2个参数:设置Master,本地位置为6,另外设置 AppName ,名称为:stage practice
val sc=newSparkContext(conf)
//创建 Spark 对象,把 conf 放到 SparkContext 中。
// 2.读取文件
val source =sc.textFile( path="dataset/BeijingPM20100101_20151231_noheader.csv")
//以 textfile 的形式读取文件。
CSV 文件和普通的文件最大的区别是,它也是分行的,这一点和普通文件一样。最大的区别是 CSV 文件以逗号作为分隔符,每一行当中都有固定数目的列数,列之间使用逗号分隔。文件在 dataset 下,复制地址。同时注意命名规范。
//3.通过算子处理数据
NA 表示没有值,需要过滤清洗。然后按照年月进行 Reduce by key,再去 sort 排序一下总和,出来结果。
步骤为:
// 1.抽取数据,年,月,PM,返回结果:((年,月),PM)
// 2.清洗 ,过滤掉空的字符串,过滤掉NA。
// 3.聚合 ,
// 4.排序
source.map( item => ((item.split( regex = ",")(1)
, //调用 map 方法 ,并且接收字符串。
item.split( regex = ",")(2)),
//使用逗号 split 出来。
item.split( regex = ",")(6))
//把年,月同时当做 key ,下标为一的项是年,下标为2的项是月,在 value 部分选下标为6的项为 PM 。元组统一当做 key 处理。.filter(item=>StringUtils.isNotEmpty(item._2)&&!item._2.equalsIgnoreCase(anotherString="NA"))
//先进行判空,判断 isNotEmpty,空就收录结果,item._2指前边的数据整个作为 item 的第二项数据传输 。
IgnoreCase 指忽略两个大小写的判断两个字符串大小相等。相等就收录的逻辑不正确,故前面加 !。
.map(item =>(item._1,item._2.toInt))
//第二项 NA 是字符串类型,故需要转换,则第一项没变,第二项转为 int .
.reduceByKey((curr,agg)=> curr+agg)
//因为是 ByKey 。故先按照 key 来进行分组,然后把每一组里面的 value 聚合起来。直接 curr+agg 聚合起来。
.sortBy(item=> item._2)
//将元组的第二项返回,就会按照第二项 pm 进行排序
// 4.获取结果
resultRDD.take(num=10).foreach(item=> println(item))
//foreach 打印所有的数据,因为太对,故只取10项。拿到里面每一项数据后 ,println 进行打印。
// 5.运行测试
sc.stop()
//在运行结束后关掉 sc
}
运行结果如下:
第一项是 pm 值最高的2015年,36451,第二项是37612,注意到 topten 有一点点反,故在.sortBy(item=> item._2) 改为
.sortBy( item => item. 2, ascending = false),
再次运行后结果没有问题。