Spark Day03:Spark 基础环境
02-[了解]-今日课程内容提纲
主要讲解2个方面内容:Spark on YARN集群和RDD 是什么
1、Spark on YARN 将Spark应用程序,提交运行到YARN集群上,企业中绝大多数运行模式,必须掌握 - 如何配置 - 提交应用运行 - Spark应用运行在集群上2种Deploy-Mode - yarn-client模式 - yarn-cluster模式 2、RDD是什么 RDD,弹性分布式数据集,抽象概念,相当于集合,比如列表List,分布式集合,存储海量数据 引入RDD数据结构 RDD 官方定义,从文档和源码 RDD 5大特性(面试必问) 词频统计WordCount查看RDD有哪些 RDD创建方式,如何将数据封装到RDD集合中,2种方式 创建RDD时,如何处理小文件(面试)
03-[掌握]-Spark on YARN之属性配置和服务启动
将Spark Application提交运行到YARN集群上,至关重要,企业中大多数都是运行在YANR上
当Spark Application运行到YARN上时,在提交应用时指定master为yarn
即可,同时需要告知YARN集群配置信息
(比如ResourceManager地址信息),此外需要监控Spark Application,配置历史服务器相关属性
。
在实际项目中,只需要配置:6.1.1 至 6.1.4即可,由于在虚拟机上测试,所以配置6.1.5解除资源检查限制。
04-[掌握]-Spark on YARN之提交应用
先将圆周率PI程序提交运行在YARN上,命令如下:
SPARK_HOME=/export/server/spark ${SPARK_HOME}/bin/spark-submit \ --master yarn \ --class org.apache.spark.examples.SparkPi \ ${SPARK_HOME}/examples/jars/spark-examples_2.11-2.4.5.jar \ 10
运行完成在YARN 监控页面截图如下
设置资源信息,提交运行WordCount程序至YARN上,命令如下:
SPARK_HOME=/export/server/spark ${SPARK_HOME}/bin/spark-submit \ --master yarn \ --driver-memory 512m \ --executor-memory 512m \ --executor-cores 1 \ --num-executors 2 \ --queue default \ --class cn.itcast.spark.submit.SparkSubmit \ hdfs://node1.itcast.cn:8020/spark/apps/spark-day02_2.11-1.0.0.jar \ /datas/wordcount.data /datas/swcy-output
当WordCount应用运行YARN上完成以后,从8080 WEB UI页面点击应用历史服务连接,查看应用运行状态信息。
05-[掌握]-DeployMode两种模式区别
Spark Application提交运行时部署模式Deploy Mode,表示的是Driver Program运行的地方,要么是提交应用的
Client:client
,要么是集群中从节点(Standalone:Worker,YARN:NodeManager):cluster
。
- client 模式
默认DeployMode为
Client
,表示应用Driver Program运行在提交应用Client主机
上(启动JVM Process进程),示意图如下:
假设运行圆周率PI程序,采用client模式,命令如下:
SPARK_HOME=/export/server/spark ${SPARK_HOME}/bin/spark-submit \ --master spark://node1.itcast.cn:7077,node2.itcast.cn:7077 \ --deploy-mode client \ --driver-memory 512m \ --executor-memory 512m \ --executor-cores 1 \ --total-executor-cores 2 \ --class org.apache.spark.examples.SparkPi \ ${SPARK_HOME}/examples/jars/spark-examples_2.11-2.4.5.jar \ 10
- cluster 模式
如果采用cluster模式运行应用,应用Driver Program运行在集群从节点Worker某台机器上。
假设运行圆周率PI程序,采用cluster模式,命令如下:
SPARK_HOME=/export/server/spark ${SPARK_HOME}/bin/spark-submit \ --master spark://node1.itcast.cn:7077,node2.itcast.cn:7077 \ --deploy-mode cluster \ --supervise \ --driver-memory 512m \ --executor-memory 512m \ --executor-cores 1 \ --total-executor-cores 2 \ --class org.apache.spark.examples.SparkPi \ ${SPARK_HOME}/examples/jars/spark-examples_2.11-2.4.5.jar \ 10
06-[掌握]-Spark on YARN之YARN Client 模式
当应用运行YARN上时,有2部分组成:
- AppMaster,应用管理者,申请资源和调度Job执行
- Process,运行在NodeManager上进程,运行Task任务
Spark 应用运行集群上时,也有2部分组成:
- Driver Program,应用管理者,申请资源运行Executors和调度Job执行
- Executors,运行JVM进程,其中执行Task任务和缓存数据
- YARN
Client
模式
当Spark 运行在YARN集群时,采用client DeployMode时,有如下三个进程:
AppMaster
,申请资源,运行ExecutorsDriver Program
,调度Job执行和监控Executors
,运行JVM进程,其中执行Task任务和缓存数据
- YARN
Cluster
模式
当Spark 运行在YARN集群时,采用clusterDeployMode时,有如下2个进程:
Driver Program(AppMaster)
,既进行资源申请,又进行Job调度Executors
,运行JVM进程,其中执行Task任务和缓存数据
所以Spark Application运行在YARN上时,采用不同DeployMode时架构不一样,企业实际生产环境还是以cluster模式为主,client模式用于开发测试,两者的区别面试中常问。
在YARN Client模式下,Driver在任务提交的本地机器上运行,示意图如下:
采用yarn-client方式运行词频统计WordCount程序
/export/server/spark/bin/spark-submit \ --master yarn \ --deploy-mode client \ --driver-memory 512m \ --executor-memory 512m \ --executor-cores 1 \ --num-executors 2 \ --queue default \ --class cn.itcast.spark.submit.SparkSubmit \ hdfs://node1.itcast.cn:8020/spark/apps/spark-day02_2.11-1.0.0.jar \ /datas/wordcount.data /datas/swcy-client
07-[掌握]-Spark on YARN之YARN Cluster模式
在YARN Cluster模式下,Driver运行在NodeManager Contanier中,此时Driver与AppMaster合为一体,示意图如下:
以运行词频统计WordCount程序为例,提交命令如下:
/export/server/spark/bin/spark-submit \ --master yarn \ --deploy-mode cluster \ --driver-memory 512m \ --executor-memory 512m \ --executor-cores 1 \ --num-executors 2 \ --queue default \ --class cn.itcast.spark.submit.SparkSubmit \ hdfs://node1.itcast.cn:8020/spark/apps/spark-day02_2.11-1.0.0.jar \ /datas/wordcount.data /datas/swcy-cluster
08-[理解]-Spark 应用MAIN函数代码执行
Spark Application应用程序运行时,无论client还是cluster部署模式DeployMode,当DriverProgram和Executors启动完成以后,就要开始执行应用程序中MAIN函数的代码,以词频统计WordCount程序为例剖析讲解。
上述图片中,A、B都是在Executor中执行,原因在于对RDD数据操作的,针对C来说,如果没有返回值时,在Executor中执行,有返回值,比如调用count、first等函数时,在Driver中执行的。
09-[了解]-RDD 概念之引入说明
对于大量的数据,Spark 在内部保存计算的时候,都是用一种叫做弹性分布式数据集(ResilientDistributed Datasets,RDD)的数据结构来保存的,所有的运算以及操作都建立在 RDD 数据结构的基础之上
也就是说RDD设计的核心点为:
文档:http://spark.apache.org/docs/2.4.5/rdd-programming-guide.html
10-[掌握]-RDD 概念之官方定义
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。
拆分核心要点三个方面:
可以认为RDD是分布式的列表List或数组Array,抽象的数据结构,RDD是一个抽象类AbstractClass和泛型
Generic Type
:
RDD弹性分布式数据集核心点示意图如下:
11-[掌握]-RDD 概念之5大特性剖析
RDD 数据结构内部有五个特性(摘录RDD 源码):前3个特性,必须包含的;后2个特性,可选的。
- 第一个:
a list of partitions
- 第二个:
A function for computing each split
- 第三个:
A list of dependencies on other RDDs
在RDD类中,对应一个方法:
- 第四个:
Optionally, a Partitioner for key-value RDDs
- 第五个:
Optionally, a list of preferred locations to compute each split on
- 对RDD中每个分区数据进行计算时,找到
最佳位置
列表 - 对数据计算时,考虑数据本地行,数据在哪里,尽量将Task放在哪里,快速读取数据进行处理
RDD 是一个数据集的表示,不仅表示了数据集,还表示了这个数据集从哪来、如何计算,主要属性包括五个方面(必须牢记,通过编码加深理解,面试常问):
12-[掌握]-RDD 概念之词频统计WordCount中RDD
以词频统计WordCount程序为例,查看整个Job中各个RDD类型及依赖关系,WordCount程序代码如下:
运行程序结束后,查看WEB UI监控页面,此Job(RDD调用foreach触发)执行DAG图:
13-[掌握]-RDD 创建的两种方式
如何将数据封装到RDD集合中,主要有两种方式:
并行化本地集合
(Driver Program中)和引用加载外部存储系统
(如HDFS、Hive、HBase、Kafka、Elasticsearch等)数据集。
- 并行化集合:
- 由一个已经存在的 Scala 集合创建,集合并行化,集合必须时Seq本身或者子类对象。
- 只能将Scala中Seq对象或者子类对象,并行化RDD
package cn.itcast.spark.source import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * Spark 采用并行化的方式构建Scala集合Seq中的数据为RDD * - 将Scala集合转换为RDD * sc.parallelize(seq) * - 将RDD转换为Scala中集合 * rdd.collect() -> Array * rdd.collectAsMap() - Map,要求RDD数据类型为二元组 */ object _01SparkParallelizeTest { def main(args: Array[String]): Unit = { val sc: SparkContext = { // sparkConf对象 val sparkConf = new SparkConf() // _01SparkParallelizeTest$ ->(.stripSuffix("$")) -> _01SparkParallelizeTest .setAppName(this.getClass.getSimpleName.stripSuffix("$")) .setMaster("local[2]") // sc 实例对象 SparkContext.getOrCreate(sparkConf) } // TODO: 1、Scala中集合Seq序列存储数据 val linesSeq: Seq[String] = Seq( "hadoop scala hive spark scala sql sql", // "hadoop scala spark hdfs hive spark", // "spark hdfs spark hdfs scala hive spark" // ) // TODO: 2、并行化集合 val inputRDD: RDD[String] = sc.parallelize(linesSeq, numSlices = 2) // TODO: 3、词频统计 val resultRDD = inputRDD .flatMap(line => line.split("\\s+")) .map(word => (word, 1)) .reduceByKey((tmp, item) => tmp + item) // TODO: 4、输出结果 resultRDD.foreach(println) // 应用结束,关闭资源 sc.stop() } }
- 外部存储系统
实际使用最多的方法:textFile,读取HDFS或LocalFS上文本文件,指定文件路径和RDD分区数目。
实际项目中如果从HDFS读取海量数据,应用运行在YARN上,默认情况下,RDD分区数目等于HDFS上Block块数目。
14-[掌握]-创建RDD时小文件读取
在实际项目中,有时往往处理的数据文件属于小文件(每个文件数据数据量很小,比如KB,几十MB等),文件数量又很大,如果一个个文件读取为RDD的一个个分区,计算数据时很耗时性能低下,使用SparkContext中提供:
wholeTextFiles
类,专门读取小文件数据。
范例演示:读取100个小文件数据,每个文件大小小于1MB,设置RDD分区数目为2。
package cn.itcast.spark.source import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * 采用SparkContext#wholeTextFiles()方法读取小文件 */ object _02SparkWholeTextFileTest { def main(args: Array[String]): Unit = { val sc: SparkContext = { // sparkConf对象 val sparkConf = new SparkConf() // _01SparkParallelizeTest$ ->(.stripSuffix("$")) -> _01SparkParallelizeTest .setAppName(this.getClass.getSimpleName.stripSuffix("$")) .setMaster("local[2]") // sc 实例对象 SparkContext.getOrCreate(sparkConf) } /* def wholeTextFiles( path: String, minPartitions: Int = defaultMinPartitions ): RDD[(String, String)] Key: 每个小文件名称路径 Value:每个小文件的内容 */ val inputRDD: RDD[(String, String)] = sc.wholeTextFiles("datas/ratings100", minPartitions = 2) println(s"RDD 分区数目 = ${inputRDD.getNumPartitions}") inputRDD.take(2).foreach(tuple => println(tuple)) // 应用结束,关闭资源 sc.stop() } }