开发者学堂课程【大数据Spark2020最新课程(知识精讲与实战演练)第二阶段:Rdd 算子_转换_mappartitions】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/689/detail/11967
Rdd 算子_转换_mappartitions
内容介绍:
一、map、mappartitions 和 mappartitions with index 的区别
二、Mappartition
三、Mappartition with index
四、总结
一、map、mappartitions 和 mappartitions with index 的区别
mappartitions和mappartitions with index 都是转换算子,他们和 map 算子区别很小。
区别一:map 针对于每一条数据进行处理,如果使用 rdd1.map,那么 rdd1里面的所有数据都会在经过这个算子以及这个函数,但是有一些情况下,效率比较低。
假如在这个处理的这个位置要进行一个数据库的访问,如果在这个 map 里面进行一个数据库访问的话,那么里面有上百万条数据,如果是这样的话,那么要访问1万次,建立1万次连接这样不合适。所以如果遇到这种时候,我们不需要让每一条数据都执行这个函数,而是对分区来执行这个函数,也就是说通过一个算子,这个算子去接受一整个分区的数据,那么有几个分区就会执行多少次,在内部进行数据库访问会好一些,分区相对于数据来说,会少很多然后再到算子里在进行数据库访问,相对来说会好一些,mappartitions 有俩个单词构成,一个是 map,一个是 partitions,也就是原始的粒度,它所针对的是每一条数据条目的一个粒度。mappartitionsMap 的粒度是每一个分区数。以上就是他们之间最大的区别。
编写代码来说明 mappartitions 和 mappartitions with index 的使用方法
进入到 IDEA 中,打开包,在包里面创建一个 scala class,命名为 transformation op,transformation op 当中定义第一个方法,叫做 mappartitions。定义出来之后,就可以编写具体代码,把所有算子放在 map 的 transformation 当中,所以底下所有算子都要用到 spark context。在整个根目录里面创建出所需的东西,那么需要创建俩个参数,第一个参数是 set master local。
第二个是设置一个 app name,即 transformation op。接下里创建出 sc 对象。创建完后,就需要对 mappartition 的算子进行演示,mappartition 大致是三个步骤,第一步是生成数据,第二步是数据处理,第三步是获取结果
数据的创建就不在去读取外部的数据源了,就直接创建出来这些是数据,整个代码将使用代码链式来编写。在seq当中传入六个数字,即1-6.接下来在第二个指定参数上当前分区数。接下里想看看 mappartition 如何使用,就需要调用mappartitons,在调用前先查看一下 mappartitions 的 api,进入后:
/★★
Return a new RDD by applying a function to each partition of this RDD.
preservesPartitioning indicates whether the input function preserves the partitioner, which should be false'unless this is a pair RDD and the input function doesn't modify the keys.
★/
def mapPartitions[U: ClassTag](
f: Iterator[T]=> Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]=withScope [
val cleanedF = sc.clean(f)
"erhtepartieionspoo(
prev = this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
preservesPartitioning)
/*
★
★[performance] Spark's internal mapPartitionsWithIndex method that skips closure cleaning.★It is a performance API to be used carefully only if we are sure that the RDD elements are★serializable and don't require closure cleaning.
eparam preservesPartitioning indicates whether the inputfunction preserves the partitioner, which should befalse unless thisis a pair RDD and the input function doesn't modify the keys.
有俩个参数,第一个是 f,第二个是 prev,参数的含义的是否保留参数的信息,一般是用不到,只需要保留 false 就可以。除非不去修改 key。这里面的 function,他要求你传入的函数是 interator 的继承类。代表一个数组,代表一个集合,因为现在这个 mappartitions 一次处理就处理一个分区的数据,如果一次处理不是只处理一条数据,所以处理前要用集合来进行包装,所以这个集合就代表整个分区的数据。
preservesPartitioning 这个参数是不需要制定的,当前默认指定的是 false,因此再次调用这个方法的时候,不需要重新指定参数的值。即默认参数。
那么既然 mappartitions 是对一整个分区进行处理,那么是接受一整个函数,这个函数就接受了一个集合,这个集合用 interreator 来表示,假如想看内容,就调用 foreach,用 printin 打印出 item,在执行次操作时,会发现系统报错,因为次函数比较特殊,这个 interreator 接受以后,它要求返回另一个集合,所以打印之后还要把 iter 返回,接下来执行程序。查看当前地方打印出来的内容是否是需要的。
package cn.itcast.spark.rdd
Cimport org.apache.spark.(SparkConf, SparkContext]
cimport org.junit.Test
class Transformation0p
【
val conf= new SparkConf().setMaster("local[6]").setAppName("transformation_op")valsc = new SparkContext(conf)
@Test
defmapPartitions():Unit=
//1.数据生成
// 2.算子使用
// 3. 获取结果
sc.parallelize(Seq(1, 2, 3, 4, 5, 6), numSlices=2)
,mapPartitions(iter => (
iter.foreach(item => println(item))
iter
])
.collect() I
运行上述代码,之后运行结果出来后,滑到底部如图:
可以看到结果已经出来了,得到了六个结果。
二、Mappartition
接下里想要使用 mappartitions 进行一些具体的事务,创建一个新的方法,叫 mappartitions2,刚刚只进行了打印操作,首先他内部传入了一个函数,要求传入一个集合,转换完集合里面的每一条数据,再通过 interreator 的形式返回过去,那么这一部分的数据是不是就会替换掉整个分区里的那一部分数据,接下来想要把数字都乘10,首先应该遍历iter 里的每一条数据,然后进行转换。转换完成后,返回整个 iter,也就是要对集合中每条数据进行转换。
转换完成再把整个集合返回上去,但是在操作之前有一个问题,这个是 spark 当中的类型还是 scala 当中的类型,其实是 scala 当中的类型。Iter 是 scala 当中的集合类型,所以针对于 it er 的操作也都是针对 scala 基础的操作,所以这个 interreator 里有一个方法叫做 map 的操作,那你注意这个 map 是 scala 的代码,是 scala 的算子,和 spark 没有关系,不是 rdd。
接下来这个 map 当中肯定是接收一个 item。Scala 当中的集合 map,有没有返回值?进入到 ma p中,可以看到 map 的返回值是另一个 interreator,写一个 val reasult 来接收返回值。
@Test
def mapPartitions():Unit=
// 1. 数据生成
// 2.算子使用
// 3.获取结果
sc.parallelize(Seq(1, 2, 3, 4, 5, 6), numSlices=2)
.mapPartitions(iter => [
iter.foreach(item => println(item))
iter
])
.collect()
@Test
def mapPartitions2(): Unit=
//1.数据生成
//2.算子使用
//3.获取结果
sc.parallelize(Seq(1, 2, 3, 4, 5, 6),numSlices=2)
.mapPartitions(iter => [
//遍历iter 其中每一条数据进行转换,转换完成以后,返回这个 iter// iter 是 scala 中的集合类型
valresult = iter.map(item => item
★ 10)
resull
1) I
.collect()
再去做一个拓展操作,如果去除掉一部分,符不符合语法,有没有问题?里面的 mappartition 的返回值是另一个interreator,访问完 interreator 之后,又作为整个函数当中的最后一行,所以最后一行的执行结果,就是整个的运行结果。
那么接下来运行这个案例,会发现没有打印内容,所以接下来就需要进行 foreach,那么这个 foreach 就需要打印item。运行结果:
可以看到结果已经出来了,没有问题,说明代码无误。
三、Mappartition with index
前面第一行代码还是跟刚才一样的,生成一个数组,划分两个不同的分区。调用 mappartitions with index,调用前,要明白它和 mappartitions 的区别,多了一个分区的 index。那么可以看到算子的接受方式里面,是接受俩个参数,第一个是int型,还有一个是 intereator 型的,跟前面的 t 是有区别的。
/**
★Return a new RDD by applying a function to each partition of this RDD, while tracking the in★ of the original partition.
preservesPartitioning indicates whether the input function preserves the partitioner, whic
★ should be false°unless this is a pair RDD and the input function doesn't modify the keys.★/
def mapPartitionsWithIndex[U
:ClassTag](
f: (Int, Iterator ]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
prev= this,
(context:TaskContext,index:Int,iter:Iterator[T])=>cleanedF(index,iter),preservesPartitioning)
/
★★
★ Zips this RDD with another one, returning key-value pairswiththefirstelementineachRDD★ second element in each RDD, etc. Assumes that thetwoRDDshavethe*samenumber of
* partitions* and the same number of elements in each partition (e.g. one was madethrough
★ a map on the other).
def zip[U: ClassTag](other:RDD[U]): RDD[(T, U)] = withScope
zipPartitions(other, preservesPartitioning = false) [ (thisIter, otherIter) =>
new Iterator[(T, U)] [
里面的 iter 就是 index,进行打印。回到代码中,接受俩个参数 index 和 inter。接下来直接打印 index,或者打印inter。也打印出里面的每一条数据,打印完成,再把这个数据返回到集合当中去。
加上 ion 增加辨识度,增加编号,在打印集合当中的每一条数据,最终做一个转换。
@Test
def mapPartitions2(): Unit =(
// 1.数据生成
//2.算子使用
// 3.获取结果
sc.parallelize(Seq(1, 2, 3, 4, 5, 6),numSlices= 2)
.mapPartitions(iter => (
//遍历 iter 其中每一条数据进行转换,转换完成以后,返回这个 iter// iter 是 scala 中的集合类型
iter.map(item => item
★ 10)
.collect()
.foreach(item => println(item))
@Test
def mapPartitionsWithIndex(): Unit=[
sc.parallelize(Seq(1, 2, 3, 4, 5, 6),numSlices=2)
.mapPartitionsWithIndex( (index, iter)=> (
println("index:"+ index)
iter.foreach(item => println(item))
Iterl
上述代码运行结果:
对于上述结果,可能会困惑为什么结果不是:“index:1 1 2 3”
因为这俩个分区的算子是同时执行的,并不是串行的。算子里面的操作是并行。
四、总结
Mappartitions 和 map 算子是一样的,只是 map 是针对每一条数据来转换,但 mappartitions 是针对一整个分区来进行转换,所以 map 的方式始终参数是单条数据,而 mappartition 的 function 是多条数据,一个集合。这个集合是整个分区所有数据。
Map 的函数返回值是单挑数值,但 mappartition 是的 function 的返回值说是一个集合。因为传入的是一个集合,转换完成之后,返回回去之后也是一个集合。
Mappartitions 和 mappartitions with index 的区别,function 中多了一个参数是分区号。