5万字长文!搞定Spark方方面面(二)

简介: 5万字长文!搞定Spark方方面面

4、 Spark 参数详解

4.1 spark-shell

spark-shell 是 Spark 自带的交互式 Shell 程序,方便用户进行交互式编程,用户可以在该命令行下可以用 scala 编写 spark 程序,适合学习测试时使用!

示例:
spark-shell可以携带参数
spark-shell --master local[N] 数字N表示在本地模拟N个线程来运行当前任务
spark-shell --master local[*] *表示使用当前机器上所有可用的资源
默认不携带参数就是--master local[*]
spark-shell --master spark://node01:7077,node02:7077 表示运行在集群上
4.2 spark-submit

spark-submit 命令用来提交 jar 包给 spark 集群/YARN spark-shell 交互式编程确实很方便我们进行学习测试,但是在实际中我们一般是使用 IDEA 开发 Spark 应用程序打成 jar 包交给 Spark 集群/YARN 去执行。spark-submit 命令是我们开发时常用的!!!

示例:计算π
cd /export/servers/spark
/export/servers/spark/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://node01:7077  \
--executor-memory 1g \
--total-executor-cores 2 \
/export/servers/spark/examples/jars/spark-examples_2.11-2.2.0.jar \
10
4.3 参数总结

Master 参数形式

640.png

其他参数示例

--master spark://node01:7077    指定Master的地址
--name "appName"                 指定程序运行的名称
--class                           程序的main方法所在的类
--jars  xx.jar                    程序额外使用的jar包
--driver-memory 512m             Driver运行所需要的内存, 默认1g
--executor-memory 2g             指定每个executor可用内存为 2g, 默认1g
--executor-cores 1               指定每一个executor可用的核数
--total-executor-cores 2         指定整个集群运行任务使用的cup 核数为2个
 --queue default            指定任务的对列
--deploy-mode               指定运行模式(client/cluster)

注意:

如果worker节点的内存不足,那么在启动spark-submit的时候,就不能为executor分配超出worker可用的内存容量。
如果–executor-cores超过了每个worker可用的cores,任务处于等待状态。
如果–total-executor-cores即使超过可用的cores,默认使用所有的。以后当集群其他的资源释放之后,就会被该程序所使用。
如果内存或单个executor的cores不足,启动spark-submit就会报错,任务处于等待状态,不能正常执行。

三、SparkCore 入门详解

1、RDD 详解

1.1 什么是 RDD???

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集 ,是 Spark 中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合 。

单词拆解
Resilient :它是弹性的,RDD中的数据可以保存在内存中或者磁盘里面
Distributed :它里面的元素是分布式存储的,可以用于分布式计算
Dataset: 它是一个集合,可以存放很多元素
1.2 为什么要有 RDD?
在许多迭代式算法(比如机器学习、图算法等)和交互式数据挖掘中,不同计算阶段之间会重用中间结果,即一个阶段的输出结果会作为下一个阶段的输入。但是,之前的MapReduce框架采用非循环式的数据流模型,把中间结果写入到HDFS中,带来了大量的数据复制、磁盘IO和序列化开销。且这些框架只能支持一些特定的计算模式(map/reduce),并没有提供一种通用的数据抽象。
AMP实验室发表的一篇关于RDD的论文:《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》就是为了解决这些问题的
RDD提供了一个抽象的数据模型,让我们不必担心底层数据的分布式特性,只需将具体的应用逻辑表达为一系列转换操作(函数),不同RDD之间的转换操作之间还可以形成依赖关系,进而实现管道化,从而避免了中间结果的存储,大大降低了数据复制、磁盘IO和序列化开销,并且还提供了更多的API(map/reduec/filter/groupBy…)
1.3 RDD 的主要属性
1)A list of partitions :
一组分片(Partition)/一个分区(Partition)列表,即数据集的基本组成单位。
对于RDD来说,每个分片都会被一个计算任务处理,分片数决定并行度。
用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。
2)A function for computing each split :
一个函数会被作用在每一个分区。
Spark中RDD的计算是以分区为单位的,compute函数会被作用到每个分区上
3)A list of dependencies on other RDDs:
一个RDD会依赖于其他多个RDD。
RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。(Spark的容错机制)
4)Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned):
Spark中的分区函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。
对于KV类型的RDD会有一个Partitioner函数,即RDD的分区函数(可选项)
只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数决定了RDD本身的分区数量,也决定了parent RDD Shuffle输出时的分区数量。
5)Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file):
可选项,一个列表,存储每个Partition的位置(preferred location)。
对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照"移动数据不如移动计算"的理念,Spark在进行任务调度的时候,会尽可能选择那些存有数据的worker节点来进行任务计算。

总结

RDD 是一个数据集,不仅表示了数据集,还表示了这个数据集从哪来,如何计算。

主要属性包括
1.多分区
2.计算函数
3.依赖关系
4.分区函数(默认是hash)
5.最佳位置

2、RDD-API

2.1 创建 RDD
1)由外部存储系统的数据集创建,包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等
val rdd1 = sc.textFile(“hdfs://node01:8020/wordcount/input/words.txt”)
2)通过已有的RDD经过算子转换生成新的RDD
val rdd2=rdd1.flatMap(_.split(" "))
3)由一个已经存在的Scala集合创建
val rdd3 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
或者
val rdd4 = sc.makeRDD(List(1,2,3,4,5,6,7,8))
makeRDD方法底层调用了parallelize方法

640.png

2.2 RDD 的方法/算子分类

2.2.1 分类

RDD 的算子分为两类:

1)Transformation转换操作:返回一个新的RDD
2)Action动作操作:返回值不是RDD(无返回值或返回其他的)

640.png

注意:

RDD不实际存储真正要计算的数据,而是记录了数据的位置在哪里,数据的转换关系(调用了什么方法,传入什么函数)
RDD中的所有转换都是惰性求值/延迟执行的,也就是说并不会直接计算。只有当发生一个要求返回结果给Driver的 Action动作时,这些转换才会真正运行。
之所以使用惰性求值/延迟执行,是因为这样可以在Action时对RDD操作形成DAG有向无环图进行Stage的划分和并行优化,这种设计让Spark更加有效率地运行。

2.2.2 Transformation 转换算子

转换 含义
map(func) 返回一个新的 RDD,该 RDD 由每一个输入元素经过 func 函数转换后组成
filter(func) 返回一个新的 RDD,该 RDD 由经过 func 函数计算后返回值为 true 的输入元素组成
flatMap(func) 类似于 map,但是每一个输入元素可以被映射为 0 或多个输出元素(所以 func 应该返回一个序列,而不是单一元素)
mapPartitions(func) 类似于 map,但独立地在 RDD 的每一个分片上运行,因此在类型为 T 的 RDD 上运行时,func 的函数类型必须是 Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 类似于 mapPartitions,但 func 带有一个整数参数表示分片的索引值,因此在类型为 T 的 RDD 上运行时,func 的函数类型必须是(Int, Interator[T]) => Iterator[U]
sample(withReplacement, fraction, seed) 根据 fraction 指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed 用于指定随机数生成器种子
union(otherDataset) 对源 RDD 和参数 RDD 求并集后返回一个新的 RDD
intersection(otherDataset) 对源 RDD 和参数 RDD 求交集后返回一个新的 RDD
distinct([numTasks])) 对源 RDD 进行去重后返回一个新的 RDD
groupByKey([numTasks]) 在一个(K,V)的 RDD 上调用,返回一个(K, Iterator[V])的 RDD
reduceByKey(func, [numTasks]) 在一个(K,V)的 RDD 上调用,返回一个(K,V)的 RDD,使用指定的 reduce 函数,将相同 key 的值聚合到一起,与 groupByKey 类似,reduce 任务的个数可以通过第二个可选的参数来设置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 分组操作。调用 groupByKey,常用。类似于 aggregate,操作的数据类型。
sortByKey([ascending], [numTasks]) 在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口,返回一个按照 key 进行排序的(K,V)的 RDD
sortBy(func,[ascending], [numTasks]) 与 sortByKey 类似,但是更灵活
join(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素对在一起的(K,(V,W))的 RDD
cogroup(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD
cartesian(otherDataset) 笛卡尔积
pipe(command, [envVars]) 对 rdd 进行管道操作
coalesce(numPartitions) 减少 RDD 的分区数到指定值。在过滤大量数据之后,可以执行此操作
repartition(numPartitions) 重新给 RDD 分区

2.2.3 Action 动作算子

动作 含义
reduce(func) 通过 func 函数聚集 RDD 中的所有元素,这个功能必须是可交换且可并联的
collect() 在驱动程序中,以数组的形式返回数据集的所有元素
count() 在驱动程序中,以数组的形式返回数据集的所有元素
first() 返回 RDD 的第一个元素(类似于 take(1))
take(n) 返回一个由数据集的前 n 个元素组成的数组
takeSample(withReplacement,num, [seed]) 返回一个数组,该数组由从数据集中随机采样的 num 个元素组成,可以选择是否用随机数替换不足的部分,seed 用于指定随机数生成器种子
takeOrdered(n, [ordering]) 返回自然顺序或者自定义顺序的前 n 个元素
saveAsTextFile(path) 将数据集的元素以 textfile 的形式保存到 HDFS 文件系统或者其他支持的文件系统,对于每个元素,Spark 将会调用 toString 方法,将它装换为文件中的文本
saveAsSequenceFile(path) 将数据集中的元素以 Hadoop sequencefile 的格式保存到指定的目录下,可以使 HDFS 或者其他 Hadoop 支持的文件系统。
saveAsObjectFile(path) 将数据集的元素,以 Java 序列化的方式保存到指定的目录下
countByKey() 针对(K,V)类型的 RDD,返回一个(K,Int)的 map,表示每一个 key 对应的元素个数。
foreach(func) 在数据集的每一个元素上,运行函数 func 进行更新。
foreachPartition(func) 在数据集的每一个分区上,运行函数 func

统计操作

算子 含义
count 个数
mean 均值
sum 求和
max 最大值
min 最小值
variance 方差
sampleVariance 从采样中计算方差
stdev 标准差:衡量数据的离散程度
sampleStdev 采样的标准差
stats 查看统计结果
2.3 基础练习[快速演示]

2.3.1 准备工作

集群模式启动

启动Spark集群
/export/servers/spark/sbin/start-all.sh
启动spark-shell
/export/servers/spark/bin/spark-shell \
--master spark://node01:7077 \
--executor-memory 1g \
--total-executor-cores 2

或本地模式启动

/export/servers/spark/bin/spark-shell

2.3.2 WordCount

val res = sc.textFile("hdfs://node01:8020/wordcount/input/words.txt")
.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

//上面的代码不会立即执行,因为都是 Transformation 转换操作 //下面的代码才会真正的提交并执行,因为是 Action 动作/行动操作

res.collect

2.3.3 创建 RDD

val rdd1 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10))
val rdd2 = sc.makeRDD(List(5,6,4,7,3,8,2,9,1,10))

2.3.4 查看该 RDD 的分区数量

sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).partitions.length
//没有指定分区数,默认值是2
sc.parallelize(List(5,6,4,7,3,8,2,9,1,10),3).partitions.length
//指定了分区数为3
sc.textFile("hdfs://node01:8020/wordcount/input/words.txt").partitions.length
//2

RDD 分区的数据取决于哪些因素?

RDD分区的原则是使得分区的个数尽量等于集群中的CPU核心(core)数目,这样可以充分利用CPU的计算资源,但是在实际中为了更加充分的压榨CPU的计算资源,会把并行度设置为cpu核数的2~3倍。RDD分区数和启动时指定的核数、调用方法时指定的分区数、如文件本身分区数 有关系

分区原则

1)启动的时候指定的CPU核数确定了一个参数值:
spark.default.parallelism=指定的CPU核数(集群模式最小2)
2)对于Scala集合调用parallelize(集合,分区数)方法,
如果没有指定分区数,就使用spark.default.parallelism,
如果指定了就使用指定的分区数(不要指定大于spark.default.parallelism)
3)对于textFile(文件,分区数) defaultMinPartitions
如果没有指定分区数sc.defaultMinPartitions=min(defaultParallelism,2)
如果指定了就使用指定的分区数sc.defaultMinPartitions=指定的分区数

rdd 的分区数

对于本地文件:
rdd的分区数 = max(本地file的分片数, sc.defaultMinPartitions)
对于HDFS文件:
rdd的分区数 = max(hdfs文件的block数目, sc.defaultMinPartitions)
所以如果分配的核数为多个,且从文件中读取数据创建RDD,即使hdfs文件只有1个切片,最后的Spark的RDD的partition数也有可能是2

2.3.5 不同转换算子的意义以及应用

1)map
对RDD中的每一个元素进行操作并返回操作的结果
//通过并行化生成rdd
val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
//对rdd1里的每一个元素
rdd1.map(_ * 2).collect
//collect方法表示收集,是action操作
2)filter
注意:函数中返回True的被留下,返回False的被过滤掉
val rdd2 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
val rdd3 = rdd2.filter(_ >= 10)
rdd3.collect
//10
3)flatmap
对RDD中的每一个元素进行先map再压扁,最后返回操作的结果
val rdd1 = sc.parallelize(Array(“a b c”, “d e f”, “h i j”))
//将rdd1里面的每一个元素先切分再压平
val rdd2 = rdd1.flatMap(_.split(’ '))
rdd2.collect
//Array[String] = Array(a, b, c, d, e, f, h, i, j)
4)sortBy
val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
val rdd2 = rdd1.sortBy(x=>x,true)
// x=>x 表示按照元素本身进行排序,True表示升序
rdd2.collect
//1,2,3,…
val rdd2 = rdd1.sortBy(x=>x+"",true)
//x=>x+""表示按照x的字符串形式排序变成了字符串,结果为字典顺序
rdd2.collect
//1,10,2,3…
5)交集、并集、差集、笛卡尔积
注意类型要一致
val rdd1 = sc.parallelize(List(5, 6, 4, 3))
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
//union不会去重
val rdd3 = rdd1.union(rdd2)
rdd3.collect
//去重
rdd3.distinct.collect
//求交集
val rdd4 = rdd1.intersection(rdd2)
rdd4.collect
//求差集
val rdd5 = rdd1.subtract(rdd2)
rdd5.collect
//笛卡尔积
val rdd1 = sc.parallelize(List(“jack”, “tom”))//学生
val rdd2 = sc.parallelize(List(“java”, “python”, “scala”))//课程
val rdd3 = rdd1.cartesian(rdd2)//表示所有学生的所有选课情况
rdd3.collect
//Array[(String, String)] = Array((jack,java), (jack,python), (jack,scala), (tom,java), (tom,python), (tom,scala))
6)join
join(内连接)聚合具有相同key组成的value元组
val rdd1 = sc.parallelize(List((“tom”, 1), (“jerry”, 2), (“kitty”, 3)))
val rdd2 = sc.parallelize(List((“jerry”, 9), (“tom”, 8), (“shuke”, 7), (“tom”, 2)))
val rdd3 = rdd1.join(rdd2)
rdd3.collect
//Array[(String, (Int, Int))] = Array((tom,(1,8)), (tom,(1,2)), (jerry,(2,9)))

图解 1

640.png

val rdd4 = rdd1.leftOuterJoin(rdd2) //左外连接,左边的全留下,右边的满足条件的才留下
rdd4.collect
//Array[(String, (Int, Option[Int]))] = Array((tom,(1,Some(2))), (tom,(1,Some(8))), (jerry,(2,Some(9))), (kitty,(3,None)))

图解 2

640.png

val rdd5 = rdd1.rightOuterJoin(rdd2)
rdd5.collect
//Array[(String, (Option[Int], Int))] = Array((tom,(Some(1),2)), (tom,(Some(1),8)), (jerry,(Some(2),9)), (shuke,(None,7)))
val rdd6 = rdd1.union(rdd2)
rdd6.collect
//Array[(String, Int)] = Array((tom,1), (jerry,2), (kitty,3), (jerry,9), (tom,8), (shuke,7), (tom,2))
7)groupbykey
groupByKey()的功能是,对具有相同键的值进行分组。
比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5),
采用groupByKey()后得到的结果是:(“spark”,(1,2))和(“hadoop”,(3,5))。
//按key进行分组
val rdd6 = sc.parallelize(Array((“tom”,1), (“jerry”,2), (“kitty”,3), (“jerry”,9), (“tom”,8), (“shuke”,7), (“tom”,2)))
val rdd7=rdd6.groupByKey
rdd7.collect
//Array[(String, Iterable[Int])] = Array((tom,CompactBuffer(1, 8, 2)), (jerry,CompactBuffer(2, 9)), (shuke,CompactBuffer(7)), (kitty,CompactBuffer(3)))
8)cogroup[了解]
cogroup是先RDD内部分组,在RDD之间分组
val rdd1 = sc.parallelize(List((“tom”, 1), (“tom”, 2), (“jerry”, 3), (“kitty”, 2)))
val rdd2 = sc.parallelize(List((“jerry”, 2), (“tom”, 1), (“shuke”, 2)))
val rdd3 = rdd1.cogroup(rdd2)
rdd3.collect
// Array((tom,(CompactBuffer(1, 2),CompactBuffer(1))), (jerry,(CompactBuffer(3),CompactBuffer(2))), (shuke,(CompactBuffer(),CompactBuffer(2))), (kitty,(CompactBuffer(2),CompactBuffer())))
9)groupBy
根据指定的函数中的规则/key进行分组
val intRdd = sc.parallelize(List(1,2,3,4,5,6))
val result = intRdd.groupBy(x=>{if(x%2 == 0)“even” else “odd”}).collect
//Array[(String, Iterable[Int])] = Array((even,CompactBuffer(2, 4, 6)), (odd,CompactBuffer(1, 3, 5)))
10)reduce
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
//reduce聚合
val result = rdd1.reduce(_ + )
// 第一 上次一个运算的结果,第二个_ 这一次进来的元素
★面试题
reduceByKey是Transformation还是Action? --Transformation
reduce是Transformation还是Action? --Action
11)reducebykey
注意reducebykey是转换算子
reduceByKey(func)的功能是,使用func函数合并具有相同键的值。
比如,reduceByKey((a,b) => a+b),有四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)
对具有相同key的键值对进行合并后的结果就是:(“spark”,3)、(“hadoop”,8)。
可以看出,(a,b) => a+b这个Lamda表达式中,a和b都是指value,
比如,对于两个具有相同key的键值对(“spark”,1)、(“spark”,2),a就是1,b就是2。
val rdd1 = sc.parallelize(List((“tom”, 1), (“jerry”, 3), (“kitty”, 2), (“shuke”, 1)))
val rdd2 = sc.parallelize(List((“jerry”, 2), (“tom”, 3), (“shuke”, 2), (“kitty”, 5)))
val rdd3 = rdd1.union(rdd2) //并集
rdd3.collect
//Array[(String, Int)] = Array((tom,1), (jerry,3), (kitty,2), (shuke,1), (jerry,2), (tom,3), (shuke,2), (kitty,5))
//按key进行聚合
val rdd4 = rdd3.reduceByKey(_ + _)
rdd4.collect
//Array[(String, Int)] = Array((tom,4), (jerry,5), (shuke,3), (kitty,7))
12)repartition
改变分区数
val rdd1 = sc.parallelize(1 to 10,3) //指定3个分区
//利用repartition改变rdd1分区数
//减少分区
rdd1.repartition(2).partitions.length //新生成的rdd分区数为2
rdd1.partitions.length //3 //注意:原来的rdd分区数不变
//增加分区
rdd1.repartition(4).partitions.length
//减少分区
rdd1.repartition(3).partitions.length
//利用coalesce改变rdd1分区数
//减少分区
rdd1.coalesce(2).partitions.size
rdd1.coalesce(4).partitions.size
★注意:
repartition可以增加和减少rdd中的分区数,
coalesce默认减少rdd分区数,增加rdd分区数不会生效。
不管增加还是减少分区数原rdd分区数不变,变的是新生成的rdd的分区数
★应用场景:
在把处理结果保存到hdfs上之前可以减少分区数(合并小文件)
sc.textFile(“hdfs://node01:8020/wordcount/input/words.txt”)
.flatMap(.split(" ")).map((,1)).reduceByKey(+)
.repartition(1)
//在保存到HDFS之前进行重分区为1,那么保存在HDFS上的结果文件只有1个
.saveAsTextFile(“hdfs://node01:8020/wordcount/output5”)
13)collect
val rdd1 = sc.parallelize(List(6,1,2,3,4,5), 2)
rdd1.collect
14)count
count统计集合中元素的个数
rdd1.count //6
求RDD中最外层集合里面的元素的个数
val rdd3 = sc.parallelize(List(List(“a b c”, “a b b”),List(“e f g”, “a f g”), List(“h i j”, “a a b”)))
rdd3.count //3
15)distinct
val rdd = sc.parallelize(Array(1,2,3,4,5,5,6,7,8,1,2,3,4), 3)
rdd.distinct.collect
16)top
//取出最大的前N个
val rdd1 = sc.parallelize(List(3,6,1,2,4,5))
rdd1.top(2)
17)take
//按照原来的顺序取前N个
rdd1.take(2) //3 6
//需求:取出最小的2个
rdd1.sortBy(x=>x,true).take(2)
18)first
//按照原来的顺序取前第一个
rdd1.first
19)keys、values
val rdd1 = sc.parallelize(List(“dog”, “tiger”, “lion”, “cat”, “panther”, “eagle”), 2)
val rdd2 = rdd1.map(x => (x.length, x))
rdd2.collect
//Array[(Int, String)] = Array((3,dog), (5,tiger), (4,lion), (3,cat), (7,panther), (5,eagle))
rdd2.keys.collect
//Array[Int] = Array(3, 5, 4, 3, 7, 5)
rdd2.values.collect
//Array[String] = Array(dog, tiger, lion, cat, panther, eagle)
20)mapValues
mapValues表示对RDD中的元素进行操作,Key不变,Value变为操作之后
val rdd1 = sc.parallelize(List((1,10),(2,20),(3,30)))
val rdd2 = rdd1.mapValues(_*2).collect //_表示每一个value ,key不变,将函数作用于value
//(1,20),(2,40),(3,60)
21)collectAsMap
转换成Map
val rdd = sc.parallelize(List((“a”, 1), (“b”, 2)))
rdd.collectAsMap
//scala.collection.Map[String,Int] = Map(b -> 2, a -> 1)

面试题:foreach 和 foreachPartition

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
rdd1.foreach(x => println(x*100)) //x是每一个元素
rdd1.foreachPartition(x => println(x.reduce(_ + _))) //x是每个分区
注意:foreach和foreachPartition都是Action操作,但是以上代码在spark-shell中执行看不到输出结果,
原因是传给foreach和foreachPartition的计算函数是在各个分区执行的,即在集群中的各个Worker上执行的
应用场景:
比如在函数中要将RDD中的元素保存到数据库
foreach:会将函数作用到RDD中的每一条数据,那么有多少条数据,操作数据库连接的开启关闭就得执行多少次
foreachPartition:将函数作用到每一个分区,那么每一个分区执行一次数据库连接的开启关闭,有几个分区就会执行数据库连接开启关闭
import org.apache.spark.{SparkConf, SparkContext}
object Test {
  def main(args: Array[String]): Unit = {
    val config = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    val sc = new SparkContext(config)
    //设置日志输出级别
    sc.setLogLevel("WARN")
    val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
    //Applies a function f to all elements of this RDD.
    //将函数f应用于此RDD的所有元素
    rdd1.foreach(x => println(x*100))
 //把函数传给各个分区,在分区内循环遍历该分区中的元素
 //x每个元素,即一个一个的数字
    println("==========================")
    //Applies a function f to each partition of this RDD.
    //将函数f应用于此RDD的每个分区
    rdd1.foreachPartition(x => println(x.reduce(_ + _)))
 //把各个分区传递给函数执行
 //x是每个分区
  }
}

面试题:map 和 mapPartitions

将每一个分区传递给函数
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
rdd1.mapPartitions(x=>x.map(y=>y*2)).collect
//x是每一个分区,y是分区中的元素
相关文章
|
8月前
|
运维 前端开发 安全
万字长文搞懂产品模式和项目模式
万字长文搞懂产品模式和项目模式
97 0
|
11月前
|
信息无障碍 C++
万字长文,深度分析 c++的前景
万字长文,深度分析 c++的前景
105 0
|
存储 Java 编译器
【Java虚拟机】万字长文,搞定Java虚拟机方方面面!3
【Java虚拟机】万字长文,搞定Java虚拟机方方面面!
|
存储 JSON 安全
【Java虚拟机】万字长文,搞定Java虚拟机方方面面!1
【Java虚拟机】万字长文,搞定Java虚拟机方方面面!
|
存储 缓存 算法
【Java虚拟机】万字长文,搞定Java虚拟机方方面面!2
【Java虚拟机】万字长文,搞定Java虚拟机方方面面!
|
存储 前端开发 安全
【Java虚拟机】万字长文,搞定Java虚拟机方方面面!4
【Java虚拟机】万字长文,搞定Java虚拟机方方面面!
|
SQL JSON 分布式计算
5万字长文!搞定Spark方方面面(四)
5万字长文!搞定Spark方方面面
210 0
5万字长文!搞定Spark方方面面(四)
|
消息中间件 存储 分布式计算
5万字长文!搞定Spark方方面面(七)
5万字长文!搞定Spark方方面面
203 0
5万字长文!搞定Spark方方面面(七)
|
SQL 分布式计算 监控
5万字长文!搞定Spark方方面面(六)
5万字长文!搞定Spark方方面面
186 0
5万字长文!搞定Spark方方面面(六)
|
存储 缓存 分布式计算
5万字长文!搞定Spark方方面面(三)
5万字长文!搞定Spark方方面面
131 0
5万字长文!搞定Spark方方面面(三)