Rdd 算子_转换_mappartitions | 学习笔记

简介: 快速学习 Rdd 算子_转换_mappartitions

开发者学堂课程【大数据Spark2020最新课程(知识精讲与实战演练)第二阶段Rdd 算子_转换_mappartitions学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/689/detail/11967


Rdd 算子_转换_mappartitions


内容介绍:

一、mapmappartitions mappartitions with index 的区别

二、Mappartition

三、Mappartition with index

四、总结

 

一、mapmappartitions mappartitions with index 的区别

mappartitions和mappartitions with index 都是转换算子,他们和 map 算子区别很小。

区别一:map 针对于每一条数据进行处理,如果使用 rdd1.map,那么 rdd1里面的所有数据都会在经过这个算子以及这个函数,但是有一些情况下,效率比较低。

假如在这个处理的这个位置要进行一个数据库的访问,如果在这个 map 里面进行一个数据库访问的话,那么里面有上百万条数据,如果是这样的话,那么要访问1万次,建立1万次连接这样不合适。所以如果遇到这种时候,我们不需要让每一条数据都执行这个函数,而是对分区来执行这个函数,也就是说通过一个算子,这个算子去接受一整个分区的数据,那么有几个分区就会执行多少次,在内部进行数据库访问会好一些,分区相对于数据来说,会少很多然后再到算子里在进行数据库访问,相对来说会好一些,mappartitions 有俩个单词构成,一个是 map,一个是 partitions,也就是原始的粒度,它所针对的是每一条数据条目的一个粒度。mappartitionsMap 的粒度是每一个分区数。以上就是他们之间最大的区别。

编写代码来说明 mappartitions mappartitions with index 的使用方法

进入到 IDEA 中,打开包,在包里面创建一个 scala class,命名为 transformation optransformation op 当中定义第一个方法,叫做 mappartitions。定义出来之后,就可以编写具体代码,把所有算子放在 map transformation 当中,所以底下所有算子都要用到 spark context。在整个根目录里面创建出所需的东西,那么需要创建俩个参数,第一个参数是 set master local

第二个是设置一个 app name,即 transformation op。接下里创建出 sc 对象。创建完后,就需要对 mappartition 的算子进行演示,mappartition 大致是三个步骤,第一步是生成数据,第二步是数据处理,第三步是获取结果

数据的创建就不在去读取外部的数据源了,就直接创建出来这些是数据,整个代码将使用代码链式来编写。在seq当中传入六个数字,即1-6.接下来在第二个指定参数上当前分区数。接下里想看看 mappartition 如何使用,就需要调用mappartitons,在调用前先查看一下 mappartitions api,进入后:

/★★

Return a new RDD by applying a function to each partition of this RDD.

preservesPartitioning indicates whether the input function preserves the partitioner, which should be false'unless this is a pair RDD and the input function doesn't modify the keys./

def mapPartitions[U: ClassTag](

f: Iterator[T]=> Iterator[U],

preservesPartitioning: Boolean = false): RDD[U]=withScope [

val cleanedF = sc.clean(f)

"erhtepartieionspoo(

prev = this,

(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),

preservesPartitioning)

/*

[performance] Spark's internal mapPartitionsWithIndex method that skips closure cleaning.It is a performance API to be used carefully only if we are sure that the RDD elements areserializable and don't require closure cleaning.

eparam preservesPartitioning indicates whether the inputfunction preserves the partitioner, which should befalse unless thisis a pair RDD and the input function doesn't modify the keys.

有俩个参数,第一个是 f,第二个是 prev,参数的含义的是否保留参数的信息,一般是用不到,只需要保留 false 就可以。除非不去修改 key。这里面的 function,他要求你传入的函数是 interator 的继承类。代表一个数组,代表一个集合,因为现在这个 mappartitions 一次处理就处理一个分区的数据,如果一次处理不是只处理一条数据,所以处理前要用集合来进行包装,所以这个集合就代表整个分区的数据。

preservesPartitioning 这个参数是不需要制定的,当前默认指定的是 false,因此再次调用这个方法的时候,不需要重新指定参数的值。即默认参数。

那么既然 mappartitions 是对一整个分区进行处理,那么是接受一整个函数,这个函数就接受了一个集合,这个集合用 interreator 来表示,假如想看内容,就调用 foreach,用 printin 打印出 item,在执行次操作时,会发现系统报错,因为次函数比较特殊,这个 interreator 接受以后,它要求返回另一个集合,所以打印之后还要把 iter 返回,接下来执行程序。查看当前地方打印出来的内容是否是需要的。

package cn.itcast.spark.rdd

Cimport org.apache.spark.(SparkConf, SparkContext]

cimport org.junit.Test

class Transformation0p

val conf= new SparkConf().setMaster("local[6]").setAppName("transformation_op")valsc = new SparkContext(conf)

@Test

defmapPartitions():Unit=

//1.数据生成

// 2.算子使用

// 3. 获取结果

sc.parallelize(Seq(1, 2, 3, 4, 5, 6), numSlices=2)

,mapPartitions(iter => (

iter.foreach(item => println(item))

iter

])

.collect() I

运行上述代码,之后运行结果出来后,滑到底部如图:

image.png

可以看到结果已经出来了,得到了六个结果。

 

二、Mappartition

接下里想要使用 mappartitions 进行一些具体的事务,创建一个新的方法,叫 mappartitions2,刚刚只进行了打印操作,首先他内部传入了一个函数,要求传入一个集合,转换完集合里面的每一条数据,再通过 interreator 的形式返回过去,那么这一部分的数据是不是就会替换掉整个分区里的那一部分数据,接下来想要把数字都乘10,首先应该遍历iter 里的每一条数据,然后进行转换。转换完成后,返回整个 iter,也就是要对集合中每条数据进行转换。

转换完成再把整个集合返回上去,但是在操作之前有一个问题,这个是 spark 当中的类型还是 scala 当中的类型,其实是 scala 当中的类型。Iter scala 当中的集合类型,所以针对于 it er 的操作也都是针对 scala 基础的操作,所以这个 interreator 里有一个方法叫做 map 的操作,那你注意这个 map scala 的代码,是 scala 的算子,和 spark 没有关系,不是 rdd

接下来这个 map 当中肯定是接收一个 itemScala 当中的集合 map,有没有返回值?进入到 ma p中,可以看到 map 的返回值是另一个 interreator,写一个 val reasult 来接收返回值。

@Test

def mapPartitions():Unit=

// 1. 数据生成

// 2.算子使用

// 3.获取结果

sc.parallelize(Seq(1, 2, 3, 4, 5, 6), numSlices=2)

.mapPartitions(iter => [

iter.foreach(item => println(item))

iter

])

.collect()

@Test

def mapPartitions2(): Unit=

//1.数据生成

//2.算子使用

//3.获取结果

sc.parallelize(Seq(1, 2, 3, 4, 5, 6),numSlices=2)

.mapPartitions(iter => [

//遍历iter 其中每一条数据进行转换,转换完成以后,返回这个 iter// iter scala 中的集合类型

valresult = iter.map(item => item 10)

resull

1) I

.collect()

再去做一个拓展操作,如果去除掉一部分,符不符合语法,有没有问题?里面的 mappartition 的返回值是另一个interreator,访问完 interreator 之后,又作为整个函数当中的最后一行,所以最后一行的执行结果,就是整个的运行结果。

那么接下来运行这个案例,会发现没有打印内容,所以接下来就需要进行 foreach,那么这个 foreach 就需要打印item。运行结果:

image.png

可以看到结果已经出来了,没有问题,说明代码无误。

 

三、Mappartition with index

前面第一行代码还是跟刚才一样的,生成一个数组,划分两个不同的分区。调用 mappartitions with index,调用前,要明白它和 mappartitions 的区别,多了一个分区的 index。那么可以看到算子的接受方式里面,是接受俩个参数,第一个是int型,还有一个是 intereator 型的,跟前面的 t 是有区别的。

/**

Return a new RDD by applying a function to each partition of this RDD, while tracking the in of the original partition.

preservesPartitioning indicates whether the input function preserves the partitioner, whic should be false°unless this is a pair RDD and the input function doesn't modify the keys./

def mapPartitionsWithIndex[UClassTag](

f: (Int, Iterator ]) => Iterator[U],

preservesPartitioning: Boolean = false): RDD[U] = withScope

val cleanedF = sc.clean(f)

new MapPartitionsRDD(

prev= this,(context:TaskContext,index:Int,iter:Iterator[T])=>cleanedF(index,iter),preservesPartitioning)

/★★

Zips this RDD with another one, returning key-value pairswiththefirstelementineachRDD second element in each RDD, etc. Assumes that thetwoRDDshavethe*samenumber of

* partitions* and the same number of elements in each partition (e.g. one was madethrough a map on the other).

def zip[U: ClassTag](other:RDD[U]): RDD[(T, U)] = withScope

zipPartitions(other, preservesPartitioning = false) [ (thisIter, otherIter) =>

new Iterator[(T, U)] [

里面的 iter 就是 index,进行打印。回到代码中,接受俩个参数 index inter。接下来直接打印 index,或者打印inter。也打印出里面的每一条数据,打印完成,再把这个数据返回到集合当中去。

加上 ion 增加辨识度,增加编号,在打印集合当中的每一条数据,最终做一个转换。

@Test

def mapPartitions2(): Unit =(

// 1.数据生成

//2.算子使用

// 3.获取结果

sc.parallelize(Seq(1, 2, 3, 4, 5, 6),numSlices= 2)

.mapPartitions(iter => (

//遍历 iter 其中每一条数据进行转换,转换完成以后,返回这个 iter// iter scala 中的集合类型

iter.map(item => item 10)

.collect()

.foreach(item => println(item))

@Test

def mapPartitionsWithIndex(): Unit=[

sc.parallelize(Seq(1, 2, 3, 4, 5, 6),numSlices=2)

.mapPartitionsWithIndex( (index, iter)=> (

println("index:"+ index)

iter.foreach(item => println(item))

Iterl

上述代码运行结果:

对于上述结果,可能会困惑为什么结果不是:“index1 1 2 3

因为这俩个分区的算子是同时执行的,并不是串行的。算子里面的操作是并行。


四、总结

Mappartitions map 算子是一样的,只是 map 是针对每一条数据来转换,但 mappartitions 是针对一整个分区来进行转换,所以 map 的方式始终参数是单条数据,而 mappartition function 是多条数据,一个集合。这个集合是整个分区所有数据。

Map 的函数返回值是单挑数值,但 mappartition 是的 function 的返回值说是一个集合。因为传入的是一个集合,转换完成之后,返回回去之后也是一个集合。

Mappartitions mappartitions with index 的区别,function 中多了一个参数是分区号。

目录
打赏
0
0
0
0
127
分享
相关文章
Spark学习---2、SparkCore(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---2、SparkCore(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
[Spark精进]必须掌握的4个RDD算子之mapPartitions算子
[Spark精进]必须掌握的4个RDD算子之mapPartitions算子
186 0
[Spark精进]必须掌握的4个RDD算子之map算子
[Spark精进]必须掌握的4个RDD算子之map算子
169 0
Spark学习---2、SparkCore(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
Spark学习---2、SparkCore(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
Spark RDD 机制理解吗?RDD 的五大属性,RDD、DataFrame、DataSet 三者的关系,RDD 和 DataFrame 的区别,Spark 有哪些分区器【重要】
Spark RDD 机制理解吗?RDD 的五大属性,RDD、DataFrame、DataSet 三者的关系,RDD 和 DataFrame 的区别,Spark 有哪些分区器【重要】
1354 0