Spark Day04:Spark Core
02-[了解]-今日课程内容提纲
主要讲解RDD函数,分为2类:Transformation转换函数和Action触发函数
RDD中函数: - 函数分类,不同类型函数功能 - 常见函数概述 - 5种类型RDD函数 实际项目中使用最多的,必须要掌握 - RDD 持久化函数 可以将RDD分布式集合数据进行缓存,比如缓存到Executor内存中,再次处理数据时,直接从内存读取 - RDD Checkpoint 将RDD数据保存到可靠文件系统中,比如HDFS
首先创建Maven Module模块,编写好代码模块,讲解某个知识点时,在编写核心代码
03-[掌握]-RDD 函数分类
RDD 的操作主要可以分为
Transformation
和Action
两种。
Transformation
转换,将1个RDD转换为另一个RDDAction
触发,当1个RDD调用函数以后,触发一个Job执行(调用Action函数以后,返回值不是RDD)
官方文档:http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-operations
RDD中2种类型操作函数:Transformation(lazy)和Action(eager)函数
- Transformation转换函数
- Action触发函数,触发一个Job执行
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-a1fQcH5e-1638793130130)(/img/image-20210422150349862.png)]
04-[了解]-RDD 中常见函数概述
RDD中包含很多函数,主要可以分为两类:Transformation转换函数和Action函数。
主要常见使用函数如下,每个函数通过演示范例讲解。
1、分区操作函数 对RDD中每个分区数据进行操作 2、重分区函数 调整RDD中分区数目,要么变大,要么变小 3、聚合函数 对RDD中数据进行聚合统计,比如使用reduce、redueBykey等 4、关联函数 对2个RDD进行JOIN操作,类似SQL中JOIN,分为:等值JOIN、左外连接和右外连接、全外连接fullOuterJoin
RDD函数练习:运行spark-shell命令行,在本地模式运行,执行函数使用
05-[掌握]-RDD 函数之基本函数使用
RDD中map、filter、flatMap及foreach等函数为最基本函数,都是对RDD中
每个元素进行操作,将元素传递到函数中进行转换
。
编写词频统计WordCount程序,使用基本函数
package cn.itcast.spark.func.basic import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * 演示RDD中基本函数使用 */ object _01SparkBasicTest { def main(args: Array[String]): Unit = { // 创建SparkContext实例对象,传递SparkConf对象,设置应用配置信息 val sc: SparkContext = { // a. 创建SparkConf对象 val sparkConf = new SparkConf() .setAppName(this.getClass.getSimpleName.stripSuffix("$")) .setMaster("local[2]") // b. 传递sparkConf对象,构建SparkContext实例 SparkContext.getOrCreate(sparkConf) } // step1. 读取数据 val inputRDD: RDD[String] = sc.textFile("datas/wordcount/input.data", minPartitions = 2) // step2. 处理数据 val resultRDD: RDD[(String, Int)] = inputRDD // 过滤数据 .filter(line => null != line && line.trim.length > 0) // 分割单词 .flatMap(line => line.trim.split("\\s+")) // 转换为二元组 .map(word => word -> 1) // 按照单词分组,对组内数据进行聚合求和 .reduceByKey((tmp, item) => tmp + item) // TODO: 隐式转换,将RDD对象抓好为PairRDDFunctions对象,调用方法 // step3. 输出数据 resultRDD.foreach(item => println(item)) // 应用结束,关闭资源 sc.stop() } }
06-[掌握]-RDD 函数之分区操作函数
每个RDD由多分区组成的,实际开发建议对每个分区数据的进行操作,
map函数使用mapPartitions代替
、foreach函数使用foreachPartition代替
。
针对分区数据进行操作时,函数的参数类型:迭代器Iterator
,封装分区中所有数据
针对词频统计WordCount代码进行修改,针对分区数据操作,范例代码如下:
package cn.itcast.spark.func.iter import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * 分区操作函数:mapPartitions和foreachPartition */ object _02SparkIterTest { def main(args: Array[String]): Unit = { // 创建SparkContext实例对象,传递SparkConf对象,设置应用配置信息 val sc: SparkContext = { // a. 创建SparkConf对象 val sparkConf = new SparkConf() .setAppName(this.getClass.getSimpleName.stripSuffix("$")) .setMaster("local[2]") // b. 传递sparkConf对象,构建SparkContext实例 SparkContext.getOrCreate(sparkConf) } // step1. 读取数据 val inputRDD: RDD[String] = sc.textFile("datas/wordcount.data", minPartitions = 2) // step2. 处理数据 val resultRDD: RDD[(String, Int)] = inputRDD // 过滤数据 .filter(line => line.trim.length != 0 ) // 对每行数据进行单词分割 .flatMap(line => line.trim.split("\\s+")) // 转换为二元组 //.map(word => word -> 1) /* def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false ): RDD[U] */ .mapPartitions(iter => iter.map(word => (word, 1))) // 分组聚合 .reduceByKey((tmp, item) => tmp + item) // step3. 输出数据 //resultRDD.foreach(item => println(item)) /* def foreachPartition(f: Iterator[T] => Unit): Unit */ resultRDD.foreachPartition(iter => iter.foreach(item => println(item))) // 应用结束,关闭资源 sc.stop() } }
为什么要对分区操作,而不是对每个数据操作,好处在哪里呢???
07-[掌握]-RDD 函数之重分区函数
如何对RDD中分区数目进行调整(增加分区或减少分区),在RDD函数中主要有如下三个函数。
上述2个函数最为关键: - 增加RDD分区数目:repartition - 减少RDD分区数目:coalesce,不产生Shuffle
package cn.itcast.spark.func.iter import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * 分区操作函数:mapPartitions和foreachPartition */ object _02SparkPartitionTest { def main(args: Array[String]): Unit = { // 创建SparkContext实例对象,传递SparkConf对象,设置应用配置信息 val sc: SparkContext = { // a. 创建SparkConf对象 val sparkConf = new SparkConf() .setAppName(this.getClass.getSimpleName.stripSuffix("$")) .setMaster("local[2]") // b. 传递sparkConf对象,构建SparkContext实例 SparkContext.getOrCreate(sparkConf) } // step1. 读取数据 val inputRDD: RDD[String] = sc.textFile("datas/wordcount.data", minPartitions = 2) println(s"raw rdd partitions = ${inputRDD.getNumPartitions}") // TODO: 增加RDD分区数目 val etlRDD: RDD[String] = inputRDD.repartition(3) println(s"etl rdd partitions = ${etlRDD.getNumPartitions}") // step2. 处理数据 val resultRDD: RDD[(String, Int)] = inputRDD // 过滤数据 .filter(line => line.trim.length != 0 ) // 对每行数据进行单词分割 .flatMap(line => line.trim.split("\\s+")) // 转换为二元组 //.map(word => word -> 1) /* def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false ): RDD[U] */ .mapPartitions(iter => iter.map(word => (word, 1))) // 分组聚合 .reduceByKey((tmp, item) => tmp + item) // step3. 输出数据 //resultRDD.foreach(item => println(item)) /* def foreachPartition(f: Iterator[T] => Unit): Unit */ // TODO: 降低结果RDD分区数目 val outputRDD: RDD[(String, Int)] = resultRDD.coalesce(1) println(s"output rdd partitions = ${outputRDD.getNumPartitions}") outputRDD.foreachPartition(iter => iter.foreach(item => println(item))) // 应用结束,关闭资源 sc.stop() } }
在实际开发中,
什么时候适当调整RDD的分区数目呢?让程序性能更好好呢????
08-[掌握]-RDD 函数之RDD 中聚合函数
回顾列表List中reduce聚合函数核心概念:
聚合的时候,往往需要聚合中间临时变量
。查看列表List中聚合函数reduce和fold源码如下:
通过代码,看看列表List中聚合函数使用:
运行截图如下所示:
fold聚合函数,比reduce聚合函数,多提供一个可以初始化聚合中间临时变量的值参数:
聚合操作时,往往聚合过程中需要中间临时变量(到底时几个变量,具体业务而定),如下案例:
在RDD中提供类似列表List中聚合函数reduce和fold
,查看如下:
案例演示:求列表List中元素之和,RDD中分区数目为2,核心业务代码如下:
运行结果解析如下:
查看RDD中高级聚合函数
aggregate
,函数声明如下:
业务需求:对RDD中数据进行求和sum。
// TODO:aggregate函数,累计求和 /* def aggregate[U: ClassTag] (zeroValue: U) ( seqOp: (U, T) => U, combOp: (U, U) => U ): U */ val aggSum: Int = datasRDD.aggregate(0)( // seqOp: (U, T) => U 分区内数据聚合 (tmp: Int, item: Int) => { println(s"seq -> p-${TaskContext.getPartitionId()}: tmp = ${tmp}, item = ${item}, sum = ${tmp + item}") tmp + item }, // combOp: (U, U) => U 分区间数据聚合 (tmp, item) => { println(s"comb -> p-${TaskContext.getPartitionId()}: tmp = ${tmp}, item = ${item}, sum = ${tmp + item}") tmp + item } ) println(s"aggSum = ${aggSum}")
09-[掌握]-RDD 函数之PairRDDFunctions 聚合函数
在Spark中有一个object对象
PairRDDFunctions
,主要针对RDD的数据类型是Key/Value对的数据提供函数
,方便数据分析处理。比如使用过的函数:reduceByKey、groupByKey等。
*ByKey函数
:将相同Key的Value进行聚合操作的,省去先分组再聚合。
- 第一类:分组函数
groupByKey
- 第二类:分组聚合函数
reduceByKey和foldByKey
- 第三类:分组聚合函数
aggregateByKey
在企业中如果对数据聚合使用,
不能使用reduceByKey完成时
,考虑使用aggregateByKey
函数,基本上都能完成任意聚合功能。
10-[掌握]-RDD 函数之关联JOIN函数
当两个RDD的数据类型为二元组Key/Value对时,可以依据Key进行关联Join。
RDD中关联JOIN函数都在PairRDDFunctions中,具体截图如下:
具体看一下join(等值连接)函数说明:
范例演示代码:
package cn.itcast.spark.func.join import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * RDD中关联函数Join,针对RDD中数据类型为Key/Value对 */ object _04SparkJoinTest { def main(args: Array[String]): Unit = { // 创建SparkContext实例对象,传递SparkConf对象,设置应用配置信息 val sc: SparkContext = { // a. 创建SparkConf对象 val sparkConf = new SparkConf() .setAppName(this.getClass.getSimpleName.stripSuffix("$")) .setMaster("local[2]") // b. 传递sparkConf对象,构建SparkContext实例 SparkContext.getOrCreate(sparkConf) } // 模拟数据集 val empRDD: RDD[(Int, String)] = sc.parallelize( Seq((1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhaoliu")) ) val deptRDD: RDD[(Int, String)] = sc.parallelize( Seq((1001, "sales"), (1002, "tech")) ) // TODO: 等值连接 // deptno empname deptname val joinRDD: RDD[(Int, (String, String))] = empRDD.join(deptRDD) joinRDD.foreach{case (deptno, (empname, deptname)) => println(s"deptno = ${deptno}, empname = ${empname}, deptname = ${deptname}") } println("======================================================") // TODO:左外连接 val leftRDD: RDD[(Int, (String, Option[String]))] = empRDD.leftOuterJoin(deptRDD) leftRDD.foreach{case (deptno, (empname, option)) => val deptname: String = option match { case Some(name) => name case None => "未知" } println(s"deptno = ${deptno}, empname = ${empname}, deptname = ${deptname}") } // 应用结束,关闭资源 sc.stop() } }
11-[掌握]-RDD 持久化
在实际开发中
某些RDD的计算或转换可能会比较耗费时间,如果这些RDD后续还会频繁的被使用到
,那么可以将这些RDD进行持久化/缓存,这样下次再使用到的时候就不用再重新计算了,提高了程序运行的效率。
- 缓存函数
可以将RDD数据直接缓存到内存中,函数声明如下:
但是实际项目中,不会直接使用上述的缓存函数,RDD数据量往往很多,内存放不下的。在实际的项目中缓存RDD数据时,往往使用如下函数,依据具体的业务和数据量,指定缓存的级别:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bxNtlFD7-1638793130145)(/img/image-20210422172215367.png)]
- 缓存级别
在Spark框架中对数据缓存可以指定不同的级别,对于开发来说至关重要,如下所示:
实际项目中缓存数据时,往往选择如下两种级别:
缓存函数与Transformation函数一样,都是Lazy操作,需要Action函数触发,通常使用count函数触发。
- 释放缓存
缓存的RDD数据,不再被使用时,考虑释资源,使用如下函数:
此函数属于eager,立即执行。
- 何时缓存数据
在实际项目开发中,什么时候缓存RDD数据,最好呢???
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-TKk5WJgJ-1638793130147)(img/image-20210422172821282.png)]
12-[了解]-RDD Checkpoint
RDD 数据可以持久化,但是持久化/缓存可以把数据放在内存中,虽然是快速的,但是也是最不可靠的;也可以把数据放在磁盘上,也不是完全可靠的!例如磁盘会损坏等。
在Spark Core中对RDD做checkpoint,可以切断做checkpoint RDD的依赖关系,将RDD数据保存到可靠存储(如HDFS)以便数据恢复;
案例演示代码如下:
package cn.itcast.spark.ckpt import org.apache.spark.{SparkConf, SparkContext} /** * RDD数据Checkpoint设置,案例演示 */ object _06SparkCkptTest { def main(args: Array[String]): Unit = { // 创建应用程序入口SparkContext实例对象 val sc: SparkContext = { // 1.a 创建SparkConf对象,设置应用的配置信息 val sparkConf: SparkConf = new SparkConf() .setAppName(this.getClass.getSimpleName.stripSuffix("$")) .setMaster("local[2]") // 1.b 传递SparkConf对象,构建Context实例 new SparkContext(sparkConf) } // TODO: 设置检查点目录,将RDD数据保存到那个目录 sc.setCheckpointDir("datas/ckpt/") // 读取文件数据 val datasRDD = sc.textFile("datas/wordcount.data") // TODO: 调用checkpoint函数,将RDD进行备份,需要RDD中Action函数触发 datasRDD.checkpoint() datasRDD.count() // TODO: 再次执行count函数, 此时从checkpoint读取数据 println(datasRDD.count()) // 应用程序运行结束,关闭资源 Thread.sleep(1000000000) sc.stop() } }
面试题:持久化和Checkpoint的区别: