4、 Spark 参数详解
4.1 spark-shell
spark-shell 是 Spark 自带的交互式 Shell 程序,方便用户进行交互式编程,用户可以在该命令行下可以用 scala 编写 spark 程序,适合学习测试时使用!
示例: spark-shell可以携带参数 spark-shell --master local[N] 数字N表示在本地模拟N个线程来运行当前任务 spark-shell --master local[*] *表示使用当前机器上所有可用的资源 默认不携带参数就是--master local[*] spark-shell --master spark://node01:7077,node02:7077 表示运行在集群上
4.2 spark-submit
spark-submit 命令用来提交 jar 包给 spark 集群/YARN spark-shell 交互式编程确实很方便我们进行学习测试,但是在实际中我们一般是使用 IDEA 开发 Spark 应用程序打成 jar 包交给 Spark 集群/YARN 去执行。spark-submit 命令是我们开发时常用的!!!
示例:计算π cd /export/servers/spark /export/servers/spark/bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master spark://node01:7077 \ --executor-memory 1g \ --total-executor-cores 2 \ /export/servers/spark/examples/jars/spark-examples_2.11-2.2.0.jar \ 10
4.3 参数总结
Master 参数形式
其他参数示例
--master spark://node01:7077 指定Master的地址 --name "appName" 指定程序运行的名称 --class 程序的main方法所在的类 --jars xx.jar 程序额外使用的jar包 --driver-memory 512m Driver运行所需要的内存, 默认1g --executor-memory 2g 指定每个executor可用内存为 2g, 默认1g --executor-cores 1 指定每一个executor可用的核数 --total-executor-cores 2 指定整个集群运行任务使用的cup 核数为2个 --queue default 指定任务的对列 --deploy-mode 指定运行模式(client/cluster)
注意:
如果worker节点的内存不足,那么在启动spark-submit的时候,就不能为executor分配超出worker可用的内存容量。 如果–executor-cores超过了每个worker可用的cores,任务处于等待状态。 如果–total-executor-cores即使超过可用的cores,默认使用所有的。以后当集群其他的资源释放之后,就会被该程序所使用。 如果内存或单个executor的cores不足,启动spark-submit就会报错,任务处于等待状态,不能正常执行。
三、SparkCore 入门详解
1、RDD 详解
1.1 什么是 RDD???
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集 ,是 Spark 中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合 。
单词拆解 Resilient :它是弹性的,RDD中的数据可以保存在内存中或者磁盘里面 Distributed :它里面的元素是分布式存储的,可以用于分布式计算 Dataset: 它是一个集合,可以存放很多元素
1.2 为什么要有 RDD?
在许多迭代式算法(比如机器学习、图算法等)和交互式数据挖掘中,不同计算阶段之间会重用中间结果,即一个阶段的输出结果会作为下一个阶段的输入。但是,之前的MapReduce框架采用非循环式的数据流模型,把中间结果写入到HDFS中,带来了大量的数据复制、磁盘IO和序列化开销。且这些框架只能支持一些特定的计算模式(map/reduce),并没有提供一种通用的数据抽象。 AMP实验室发表的一篇关于RDD的论文:《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》就是为了解决这些问题的 RDD提供了一个抽象的数据模型,让我们不必担心底层数据的分布式特性,只需将具体的应用逻辑表达为一系列转换操作(函数),不同RDD之间的转换操作之间还可以形成依赖关系,进而实现管道化,从而避免了中间结果的存储,大大降低了数据复制、磁盘IO和序列化开销,并且还提供了更多的API(map/reduec/filter/groupBy…)
1.3 RDD 的主要属性
1)A list of partitions : 一组分片(Partition)/一个分区(Partition)列表,即数据集的基本组成单位。 对于RDD来说,每个分片都会被一个计算任务处理,分片数决定并行度。 用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。 2)A function for computing each split : 一个函数会被作用在每一个分区。 Spark中RDD的计算是以分区为单位的,compute函数会被作用到每个分区上 3)A list of dependencies on other RDDs: 一个RDD会依赖于其他多个RDD。 RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。(Spark的容错机制) 4)Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned): Spark中的分区函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。 对于KV类型的RDD会有一个Partitioner函数,即RDD的分区函数(可选项) 只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数决定了RDD本身的分区数量,也决定了parent RDD Shuffle输出时的分区数量。 5)Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file): 可选项,一个列表,存储每个Partition的位置(preferred location)。 对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照"移动数据不如移动计算"的理念,Spark在进行任务调度的时候,会尽可能选择那些存有数据的worker节点来进行任务计算。
总结
RDD 是一个数据集,不仅表示了数据集,还表示了这个数据集从哪来,如何计算。
主要属性包括 1.多分区 2.计算函数 3.依赖关系 4.分区函数(默认是hash) 5.最佳位置
2、RDD-API
2.1 创建 RDD
1)由外部存储系统的数据集创建,包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等 val rdd1 = sc.textFile(“hdfs://node01:8020/wordcount/input/words.txt”) 2)通过已有的RDD经过算子转换生成新的RDD val rdd2=rdd1.flatMap(_.split(" ")) 3)由一个已经存在的Scala集合创建 val rdd3 = sc.parallelize(Array(1,2,3,4,5,6,7,8)) 或者 val rdd4 = sc.makeRDD(List(1,2,3,4,5,6,7,8)) makeRDD方法底层调用了parallelize方法
2.2 RDD 的方法/算子分类
2.2.1 分类
RDD 的算子分为两类:
1)Transformation转换操作:返回一个新的RDD 2)Action动作操作:返回值不是RDD(无返回值或返回其他的)
注意:
RDD不实际存储真正要计算的数据,而是记录了数据的位置在哪里,数据的转换关系(调用了什么方法,传入什么函数) RDD中的所有转换都是惰性求值/延迟执行的,也就是说并不会直接计算。只有当发生一个要求返回结果给Driver的 Action动作时,这些转换才会真正运行。
之所以使用惰性求值/延迟执行,是因为这样可以在Action时对RDD操作形成DAG有向无环图进行Stage的划分和并行优化,这种设计让Spark更加有效率地运行。
2.2.2 Transformation 转换算子
转换 | 含义 |
map(func) | 返回一个新的 RDD,该 RDD 由每一个输入元素经过 func 函数转换后组成 |
filter(func) | 返回一个新的 RDD,该 RDD 由经过 func 函数计算后返回值为 true 的输入元素组成 |
flatMap(func) | 类似于 map,但是每一个输入元素可以被映射为 0 或多个输出元素(所以 func 应该返回一个序列,而不是单一元素) |
mapPartitions(func) | 类似于 map,但独立地在 RDD 的每一个分片上运行,因此在类型为 T 的 RDD 上运行时,func 的函数类型必须是 Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) | 类似于 mapPartitions,但 func 带有一个整数参数表示分片的索引值,因此在类型为 T 的 RDD 上运行时,func 的函数类型必须是(Int, Interator[T]) => Iterator[U] |
sample(withReplacement, fraction, seed) | 根据 fraction 指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed 用于指定随机数生成器种子 |
union(otherDataset) | 对源 RDD 和参数 RDD 求并集后返回一个新的 RDD |
intersection(otherDataset) | 对源 RDD 和参数 RDD 求交集后返回一个新的 RDD |
distinct([numTasks])) | 对源 RDD 进行去重后返回一个新的 RDD |
groupByKey([numTasks]) | 在一个(K,V)的 RDD 上调用,返回一个(K, Iterator[V])的 RDD |
reduceByKey(func, [numTasks]) | 在一个(K,V)的 RDD 上调用,返回一个(K,V)的 RDD,使用指定的 reduce 函数,将相同 key 的值聚合到一起,与 groupByKey 类似,reduce 任务的个数可以通过第二个可选的参数来设置 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 分组操作。调用 groupByKey,常用。类似于 aggregate,操作的数据类型。 |
sortByKey([ascending], [numTasks]) | 在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口,返回一个按照 key 进行排序的(K,V)的 RDD |
sortBy(func,[ascending], [numTasks]) | 与 sortByKey 类似,但是更灵活 |
join(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素对在一起的(K,(V,W))的 RDD |
cogroup(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD |
cartesian(otherDataset) | 笛卡尔积 |
pipe(command, [envVars]) | 对 rdd 进行管道操作 |
coalesce(numPartitions) | 减少 RDD 的分区数到指定值。在过滤大量数据之后,可以执行此操作 |
repartition(numPartitions) | 重新给 RDD 分区 |
2.2.3 Action 动作算子
动作 | 含义 |
reduce(func) | 通过 func 函数聚集 RDD 中的所有元素,这个功能必须是可交换且可并联的 |
collect() | 在驱动程序中,以数组的形式返回数据集的所有元素 |
count() | 在驱动程序中,以数组的形式返回数据集的所有元素 |
first() | 返回 RDD 的第一个元素(类似于 take(1)) |
take(n) | 返回一个由数据集的前 n 个元素组成的数组 |
takeSample(withReplacement,num, [seed]) | 返回一个数组,该数组由从数据集中随机采样的 num 个元素组成,可以选择是否用随机数替换不足的部分,seed 用于指定随机数生成器种子 |
takeOrdered(n, [ordering]) | 返回自然顺序或者自定义顺序的前 n 个元素 |
saveAsTextFile(path) | 将数据集的元素以 textfile 的形式保存到 HDFS 文件系统或者其他支持的文件系统,对于每个元素,Spark 将会调用 toString 方法,将它装换为文件中的文本 |
saveAsSequenceFile(path) | 将数据集中的元素以 Hadoop sequencefile 的格式保存到指定的目录下,可以使 HDFS 或者其他 Hadoop 支持的文件系统。 |
saveAsObjectFile(path) | 将数据集的元素,以 Java 序列化的方式保存到指定的目录下 |
countByKey() | 针对(K,V)类型的 RDD,返回一个(K,Int)的 map,表示每一个 key 对应的元素个数。 |
foreach(func) | 在数据集的每一个元素上,运行函数 func 进行更新。 |
foreachPartition(func) | 在数据集的每一个分区上,运行函数 func |
统计操作
算子 | 含义 |
count | 个数 |
mean | 均值 |
sum | 求和 |
max | 最大值 |
min | 最小值 |
variance | 方差 |
sampleVariance | 从采样中计算方差 |
stdev | 标准差:衡量数据的离散程度 |
sampleStdev | 采样的标准差 |
stats | 查看统计结果 |
2.3 基础练习[快速演示]
2.3.1 准备工作
集群模式启动
启动Spark集群 /export/servers/spark/sbin/start-all.sh
启动spark-shell /export/servers/spark/bin/spark-shell \ --master spark://node01:7077 \ --executor-memory 1g \ --total-executor-cores 2
或本地模式启动
/export/servers/spark/bin/spark-shell
2.3.2 WordCount
val res = sc.textFile("hdfs://node01:8020/wordcount/input/words.txt") .flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
//上面的代码不会立即执行,因为都是 Transformation 转换操作 //下面的代码才会真正的提交并执行,因为是 Action 动作/行动操作
res.collect
2.3.3 创建 RDD
val rdd1 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)) val rdd2 = sc.makeRDD(List(5,6,4,7,3,8,2,9,1,10))
2.3.4 查看该 RDD 的分区数量
sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).partitions.length //没有指定分区数,默认值是2 sc.parallelize(List(5,6,4,7,3,8,2,9,1,10),3).partitions.length //指定了分区数为3 sc.textFile("hdfs://node01:8020/wordcount/input/words.txt").partitions.length //2
RDD 分区的数据取决于哪些因素?
RDD分区的原则是使得分区的个数尽量等于集群中的CPU核心(core)数目,这样可以充分利用CPU的计算资源,但是在实际中为了更加充分的压榨CPU的计算资源,会把并行度设置为cpu核数的2~3倍。RDD分区数和启动时指定的核数、调用方法时指定的分区数、如文件本身分区数 有关系
分区原则
1)启动的时候指定的CPU核数确定了一个参数值: spark.default.parallelism=指定的CPU核数(集群模式最小2) 2)对于Scala集合调用parallelize(集合,分区数)方法, 如果没有指定分区数,就使用spark.default.parallelism, 如果指定了就使用指定的分区数(不要指定大于spark.default.parallelism) 3)对于textFile(文件,分区数) defaultMinPartitions 如果没有指定分区数sc.defaultMinPartitions=min(defaultParallelism,2) 如果指定了就使用指定的分区数sc.defaultMinPartitions=指定的分区数
rdd 的分区数
对于本地文件: rdd的分区数 = max(本地file的分片数, sc.defaultMinPartitions) 对于HDFS文件: rdd的分区数 = max(hdfs文件的block数目, sc.defaultMinPartitions) 所以如果分配的核数为多个,且从文件中读取数据创建RDD,即使hdfs文件只有1个切片,最后的Spark的RDD的partition数也有可能是2
2.3.5 不同转换算子的意义以及应用
1)map 对RDD中的每一个元素进行操作并返回操作的结果 //通过并行化生成rdd val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10)) //对rdd1里的每一个元素 rdd1.map(_ * 2).collect //collect方法表示收集,是action操作 2)filter 注意:函数中返回True的被留下,返回False的被过滤掉 val rdd2 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10)) val rdd3 = rdd2.filter(_ >= 10) rdd3.collect //10 3)flatmap 对RDD中的每一个元素进行先map再压扁,最后返回操作的结果 val rdd1 = sc.parallelize(Array(“a b c”, “d e f”, “h i j”)) //将rdd1里面的每一个元素先切分再压平 val rdd2 = rdd1.flatMap(_.split(’ ')) rdd2.collect //Array[String] = Array(a, b, c, d, e, f, h, i, j) 4)sortBy val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10)) val rdd2 = rdd1.sortBy(x=>x,true) // x=>x 表示按照元素本身进行排序,True表示升序 rdd2.collect //1,2,3,… val rdd2 = rdd1.sortBy(x=>x+"",true) //x=>x+""表示按照x的字符串形式排序变成了字符串,结果为字典顺序 rdd2.collect //1,10,2,3… 5)交集、并集、差集、笛卡尔积 注意类型要一致 val rdd1 = sc.parallelize(List(5, 6, 4, 3)) val rdd2 = sc.parallelize(List(1, 2, 3, 4)) //union不会去重 val rdd3 = rdd1.union(rdd2) rdd3.collect //去重 rdd3.distinct.collect //求交集 val rdd4 = rdd1.intersection(rdd2) rdd4.collect //求差集 val rdd5 = rdd1.subtract(rdd2) rdd5.collect //笛卡尔积 val rdd1 = sc.parallelize(List(“jack”, “tom”))//学生 val rdd2 = sc.parallelize(List(“java”, “python”, “scala”))//课程 val rdd3 = rdd1.cartesian(rdd2)//表示所有学生的所有选课情况 rdd3.collect //Array[(String, String)] = Array((jack,java), (jack,python), (jack,scala), (tom,java), (tom,python), (tom,scala)) 6)join join(内连接)聚合具有相同key组成的value元组 val rdd1 = sc.parallelize(List((“tom”, 1), (“jerry”, 2), (“kitty”, 3))) val rdd2 = sc.parallelize(List((“jerry”, 9), (“tom”, 8), (“shuke”, 7), (“tom”, 2))) val rdd3 = rdd1.join(rdd2) rdd3.collect //Array[(String, (Int, Int))] = Array((tom,(1,8)), (tom,(1,2)), (jerry,(2,9)))
图解 1
val rdd4 = rdd1.leftOuterJoin(rdd2) //左外连接,左边的全留下,右边的满足条件的才留下 rdd4.collect //Array[(String, (Int, Option[Int]))] = Array((tom,(1,Some(2))), (tom,(1,Some(8))), (jerry,(2,Some(9))), (kitty,(3,None)))
图解 2
val rdd5 = rdd1.rightOuterJoin(rdd2) rdd5.collect //Array[(String, (Option[Int], Int))] = Array((tom,(Some(1),2)), (tom,(Some(1),8)), (jerry,(Some(2),9)), (shuke,(None,7))) val rdd6 = rdd1.union(rdd2) rdd6.collect //Array[(String, Int)] = Array((tom,1), (jerry,2), (kitty,3), (jerry,9), (tom,8), (shuke,7), (tom,2))
7)groupbykey groupByKey()的功能是,对具有相同键的值进行分组。 比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5), 采用groupByKey()后得到的结果是:(“spark”,(1,2))和(“hadoop”,(3,5))。 //按key进行分组 val rdd6 = sc.parallelize(Array((“tom”,1), (“jerry”,2), (“kitty”,3), (“jerry”,9), (“tom”,8), (“shuke”,7), (“tom”,2))) val rdd7=rdd6.groupByKey rdd7.collect //Array[(String, Iterable[Int])] = Array((tom,CompactBuffer(1, 8, 2)), (jerry,CompactBuffer(2, 9)), (shuke,CompactBuffer(7)), (kitty,CompactBuffer(3))) 8)cogroup[了解] cogroup是先RDD内部分组,在RDD之间分组 val rdd1 = sc.parallelize(List((“tom”, 1), (“tom”, 2), (“jerry”, 3), (“kitty”, 2))) val rdd2 = sc.parallelize(List((“jerry”, 2), (“tom”, 1), (“shuke”, 2))) val rdd3 = rdd1.cogroup(rdd2) rdd3.collect // Array((tom,(CompactBuffer(1, 2),CompactBuffer(1))), (jerry,(CompactBuffer(3),CompactBuffer(2))), (shuke,(CompactBuffer(),CompactBuffer(2))), (kitty,(CompactBuffer(2),CompactBuffer()))) 9)groupBy 根据指定的函数中的规则/key进行分组 val intRdd = sc.parallelize(List(1,2,3,4,5,6)) val result = intRdd.groupBy(x=>{if(x%2 == 0)“even” else “odd”}).collect //Array[(String, Iterable[Int])] = Array((even,CompactBuffer(2, 4, 6)), (odd,CompactBuffer(1, 3, 5))) 10)reduce val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5)) //reduce聚合 val result = rdd1.reduce(_ + ) // 第一 上次一个运算的结果,第二个_ 这一次进来的元素 ★面试题 reduceByKey是Transformation还是Action? --Transformation reduce是Transformation还是Action? --Action 11)reducebykey 注意reducebykey是转换算子 reduceByKey(func)的功能是,使用func函数合并具有相同键的值。 比如,reduceByKey((a,b) => a+b),有四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5) 对具有相同key的键值对进行合并后的结果就是:(“spark”,3)、(“hadoop”,8)。 可以看出,(a,b) => a+b这个Lamda表达式中,a和b都是指value, 比如,对于两个具有相同key的键值对(“spark”,1)、(“spark”,2),a就是1,b就是2。 val rdd1 = sc.parallelize(List((“tom”, 1), (“jerry”, 3), (“kitty”, 2), (“shuke”, 1))) val rdd2 = sc.parallelize(List((“jerry”, 2), (“tom”, 3), (“shuke”, 2), (“kitty”, 5))) val rdd3 = rdd1.union(rdd2) //并集 rdd3.collect //Array[(String, Int)] = Array((tom,1), (jerry,3), (kitty,2), (shuke,1), (jerry,2), (tom,3), (shuke,2), (kitty,5)) //按key进行聚合 val rdd4 = rdd3.reduceByKey(_ + _) rdd4.collect //Array[(String, Int)] = Array((tom,4), (jerry,5), (shuke,3), (kitty,7)) 12)repartition 改变分区数 val rdd1 = sc.parallelize(1 to 10,3) //指定3个分区 //利用repartition改变rdd1分区数 //减少分区 rdd1.repartition(2).partitions.length //新生成的rdd分区数为2 rdd1.partitions.length //3 //注意:原来的rdd分区数不变 //增加分区 rdd1.repartition(4).partitions.length //减少分区 rdd1.repartition(3).partitions.length //利用coalesce改变rdd1分区数 //减少分区 rdd1.coalesce(2).partitions.size rdd1.coalesce(4).partitions.size ★注意: repartition可以增加和减少rdd中的分区数, coalesce默认减少rdd分区数,增加rdd分区数不会生效。 不管增加还是减少分区数原rdd分区数不变,变的是新生成的rdd的分区数 ★应用场景: 在把处理结果保存到hdfs上之前可以减少分区数(合并小文件) sc.textFile(“hdfs://node01:8020/wordcount/input/words.txt”) .flatMap(.split(" ")).map((,1)).reduceByKey(+) .repartition(1) //在保存到HDFS之前进行重分区为1,那么保存在HDFS上的结果文件只有1个 .saveAsTextFile(“hdfs://node01:8020/wordcount/output5”) 13)collect val rdd1 = sc.parallelize(List(6,1,2,3,4,5), 2) rdd1.collect 14)count count统计集合中元素的个数 rdd1.count //6 求RDD中最外层集合里面的元素的个数 val rdd3 = sc.parallelize(List(List(“a b c”, “a b b”),List(“e f g”, “a f g”), List(“h i j”, “a a b”))) rdd3.count //3 15)distinct val rdd = sc.parallelize(Array(1,2,3,4,5,5,6,7,8,1,2,3,4), 3) rdd.distinct.collect 16)top //取出最大的前N个 val rdd1 = sc.parallelize(List(3,6,1,2,4,5)) rdd1.top(2) 17)take //按照原来的顺序取前N个 rdd1.take(2) //3 6 //需求:取出最小的2个 rdd1.sortBy(x=>x,true).take(2) 18)first //按照原来的顺序取前第一个 rdd1.first 19)keys、values val rdd1 = sc.parallelize(List(“dog”, “tiger”, “lion”, “cat”, “panther”, “eagle”), 2) val rdd2 = rdd1.map(x => (x.length, x)) rdd2.collect //Array[(Int, String)] = Array((3,dog), (5,tiger), (4,lion), (3,cat), (7,panther), (5,eagle)) rdd2.keys.collect //Array[Int] = Array(3, 5, 4, 3, 7, 5) rdd2.values.collect //Array[String] = Array(dog, tiger, lion, cat, panther, eagle) 20)mapValues mapValues表示对RDD中的元素进行操作,Key不变,Value变为操作之后 val rdd1 = sc.parallelize(List((1,10),(2,20),(3,30))) val rdd2 = rdd1.mapValues(_*2).collect //_表示每一个value ,key不变,将函数作用于value //(1,20),(2,40),(3,60) 21)collectAsMap 转换成Map val rdd = sc.parallelize(List((“a”, 1), (“b”, 2))) rdd.collectAsMap //scala.collection.Map[String,Int] = Map(b -> 2, a -> 1)
面试题:foreach 和 foreachPartition
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3) rdd1.foreach(x => println(x*100)) //x是每一个元素 rdd1.foreachPartition(x => println(x.reduce(_ + _))) //x是每个分区 注意:foreach和foreachPartition都是Action操作,但是以上代码在spark-shell中执行看不到输出结果, 原因是传给foreach和foreachPartition的计算函数是在各个分区执行的,即在集群中的各个Worker上执行的 应用场景: 比如在函数中要将RDD中的元素保存到数据库 foreach:会将函数作用到RDD中的每一条数据,那么有多少条数据,操作数据库连接的开启关闭就得执行多少次 foreachPartition:将函数作用到每一个分区,那么每一个分区执行一次数据库连接的开启关闭,有几个分区就会执行数据库连接开启关闭
import org.apache.spark.{SparkConf, SparkContext} object Test { def main(args: Array[String]): Unit = { val config = new SparkConf().setMaster("local[*]").setAppName("WordCount") val sc = new SparkContext(config) //设置日志输出级别 sc.setLogLevel("WARN") val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3) //Applies a function f to all elements of this RDD. //将函数f应用于此RDD的所有元素 rdd1.foreach(x => println(x*100)) //把函数传给各个分区,在分区内循环遍历该分区中的元素 //x每个元素,即一个一个的数字 println("==========================") //Applies a function f to each partition of this RDD. //将函数f应用于此RDD的每个分区 rdd1.foreachPartition(x => println(x.reduce(_ + _))) //把各个分区传递给函数执行 //x是每个分区 } }
面试题:map 和 mapPartitions
将每一个分区传递给函数 val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3) rdd1.mapPartitions(x=>x.map(y=>y*2)).collect //x是每一个分区,y是分区中的元素