一、map、mappartitions 和 mappartitions with index 的区别
三、Mappartition with index
一、map、mappartitions 和 mappartitions with index 的区别
mappartitions和mappartitions with index 都是转换算子,他们和 map 算子区别很小。
区别一:map 针对于每一条数据进行处理,如果使用 rdd1.map,那么 rdd1里面的所有数据都会在经过这个算子以及这个函数,但是有一些情况下,效率比较低。
假如在这个处理的这个位置要进行一个数据库的访问,如果在这个 map 里面进行一个数据库访问的话,那么里面有上百万条数据,如果是这样的话,那么要访问1万次,建立1万次连接这样不合适。所以如果遇到这种时候,我们不需要让每一条数据都执行这个函数,而是对分区来执行这个函数,也就是说通过一个算子,这个算子去接受一整个分区的数据,那么有几个分区就会执行多少次,在内部进行数据库访问会好一些,分区相对于数据来说,会少很多然后再到算子里在进行数据库访问,相对来说会好一些,mappartitions 有俩个单词构成,一个是 map,一个是 partitions,也就是原始的粒度,它所针对的是每一条数据条目的一个粒度。mappartitionsMap 的粒度是每一个分区数。以上就是他们之间最大的区别。
编写代码来说明 mappartitions 和 mappartitions with index 的使用方法
进入到 IDEA 中,打开包,在包里面创建一个 scala class,命名为 transformation op,transformation op 当中定义第一个方法,叫做 mappartitions。定义出来之后,就可以编写具体代码,把所有算子放在 map 的 transformation 当中,所以底下所有算子都要用到 spark context。在整个根目录里面创建出所需的东西,那么需要创建俩个参数,第一个参数是 set master local。
第二个是设置一个 app name,即 transformation op。接下里创建出 sc 对象。创建完后,就需要对 mappartition 的算子进行演示,mappartition 大致是三个步骤,第一步是生成数据,第二步是数据处理,第三步是获取结果
数据的创建就不在去读取外部的数据源了,就直接创建出来这些是数据,整个代码将使用代码链式来编写。在seq当中传入六个数字,即1-6.接下来在第二个指定参数上当前分区数。接下里想看看 mappartition 如何使用,就需要调用mappartitons,在调用前先查看一下 mappartitions 的 api,进入后:
那么既然 mappartitions 是对一整个分区进行处理,那么是接受一整个函数,这个函数就接受了一个集合,这个集合用 interreator 来表示,假如想看内容,就调用 foreach,用 printin 打印出 item,在执行次操作时,会发现系统报错,因为次函数比较特殊,这个 interreator 接受以后,它要求返回另一个集合,所以打印之后还要把 iter 返回,接下来执行程序。查看当前地方打印出来的内容是否是需要的。
package cn.itcast.spark.rdd
Cimport org.apache.spark.(SparkConf, SparkContext]
cimport org.junit.Test
class Transformation0p
val conf= new SparkConf().setMaster("local[6]").setAppName("transformation_op")valsc = new SparkContext(conf)
// 2.算子使用
// 3. 获取结果
sc.parallelize(Seq(1, 2, 3, 4, 5, 6), numSlices=2)
,mapPartitions(iter => (
iter.foreach(item => println(item))
.collect() I
接下里想要使用 mappartitions 进行一些具体的事务,创建一个新的方法,叫 mappartitions2,刚刚只进行了打印操作,首先他内部传入了一个函数,要求传入一个集合,转换完集合里面的每一条数据,再通过 interreator 的形式返回过去,那么这一部分的数据是不是就会替换掉整个分区里的那一部分数据,接下来想要把数字都乘10,首先应该遍历iter 里的每一条数据,然后进行转换。转换完成后,返回整个 iter,也就是要对集合中每条数据进行转换。
转换完成再把整个集合返回上去,但是在操作之前有一个问题,这个是 spark 当中的类型还是 scala 当中的类型,其实是 scala 当中的类型。Iter 是 scala 当中的集合类型,所以针对于 it er 的操作也都是针对 scala 基础的操作,所以这个 interreator 里有一个方法叫做 map 的操作,那你注意这个 map 是 scala 的代码,是 scala 的算子,和 spark 没有关系,不是 rdd。
接下来这个 map 当中肯定是接收一个 item。Scala 当中的集合 map,有没有返回值?进入到 ma p中,可以看到 map 的返回值是另一个 interreator,写一个 val reasult 来接收返回值。
def mapPartitions():Unit=
// 1. 数据生成
// 2.算子使用
// 3.获取结果
sc.parallelize(Seq(1, 2, 3, 4, 5, 6), numSlices=2)
.mapPartitions(iter => [
iter.foreach(item => println(item))
def mapPartitions2(): Unit=
sc.parallelize(Seq(1, 2, 3, 4, 5, 6),numSlices=2)
.mapPartitions(iter => [
//遍历iter 其中每一条数据进行转换,转换完成以后,返回这个 iter// iter 是 scala 中的集合类型
valresult = iter.map(item => item
★ 10)
1) I
再去做一个拓展操作,如果去除掉一部分,符不符合语法,有没有问题?里面的 mappartition 的返回值是另一个interreator,访问完 interreator 之后,又作为整个函数当中的最后一行,所以最后一行的执行结果,就是整个的运行结果。
那么接下来运行这个案例,会发现没有打印内容,所以接下来就需要进行 foreach,那么这个 foreach 就需要打印item。运行结果:
三、Mappartition with index
前面第一行代码还是跟刚才一样的,生成一个数组,划分两个不同的分区。调用 mappartitions with index,调用前,要明白它和 mappartitions 的区别,多了一个分区的 index。那么可以看到算子的接受方式里面,是接受俩个参数,第一个是int型,还有一个是 intereator 型的,跟前面的 t 是有区别的。
里面的 iter 就是 index,进行打印。回到代码中,接受俩个参数 index 和 inter。接下来直接打印 index,或者打印inter。也打印出里面的每一条数据,打印完成,再把这个数据返回到集合当中去。
加上 ion 增加辨识度,增加编号,在打印集合当中的每一条数据,最终做一个转换。
def mapPartitions2(): Unit =(
// 1.数据生成
// 3.获取结果
sc.parallelize(Seq(1, 2, 3, 4, 5, 6),numSlices= 2)
.mapPartitions(iter => (
//遍历 iter 其中每一条数据进行转换,转换完成以后,返回这个 iter// iter 是 scala 中的集合类型
iter.map(item => item
★ 10)
.foreach(item => println(item))
def mapPartitionsWithIndex(): Unit=[
sc.parallelize(Seq(1, 2, 3, 4, 5, 6),numSlices=2)
.mapPartitionsWithIndex( (index, iter)=> (
println("index:"+ index)
iter.foreach(item => println(item))
对于上述结果,可能会困惑为什么结果不是:“index:1 1 2 3”
Mappartitions 和 map 算子是一样的,只是 map 是针对每一条数据来转换,但 mappartitions 是针对一整个分区来进行转换,所以 map 的方式始终参数是单条数据,而 mappartition 的 function 是多条数据,一个集合。这个集合是整个分区所有数据。
Map 的函数返回值是单挑数值,但 mappartition 是的 function 的返回值说是一个集合。因为传入的是一个集合,转换完成之后,返回回去之后也是一个集合。
Mappartitions 和 mappartitions with index 的区别,function 中多了一个参数是分区号。