【Spark】(五)Spark Transformation(转换算子) 和 Action(执行算子)1

简介: 【Spark】(五)Spark Transformation(转换算子) 和 Action(执行算子)1

文章目录


一、Transformation 和 Action

1、转换操作

2、行动操作

二、map、flatMap、mapParations、mapPartitionsWithIndex

2.1 map

2.2 flatMap

3.3 mapPartitions

3.4 mapPartitionsWithIndex

三、reduce、reduceByKey

3.1 reduce

3.2 reduceByKey

四、union,join和groupByKey

4.1 union

4.2 groupByKey

4.3 join

五、sample、cartesian

5.1 sample

5.2 cartesian

六、filter、distinct、intersection

6.1 filter

6.2 distinct

6.3 intersection

七、coalesce、repartition、repartitionAndSortWithinPartitions

7.1 coalesce

7.2 replication

7.3 repartitionAndSortWithinPartitions

八、cogroup、sortBykey、aggregateByKey

8.1 cogroup

8.2 sortBykey

8.3 aggregateByKey


一、Transformation 和 Action


RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。


常用的Transformation:


1、转换操作


1、def map[U: ClassTag](f: T => U): RDD[U] 将函数应用于RDD的每一元素,并返回一个新的RDD


2、def filter(f: T => Boolean): RDD[T] 通过提供的产生boolean条件的表达式来返回符合结果为True新的RDD


3、def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] 将函数应用于RDD中的每一项,对于每一项都产生一个集合,并将集合中的元素压扁成一个集合。


4、def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] 将函数应用于RDD的每一个分区,每一个分区运行一次,函数需要能够接受Iterator类型,然后返回Iterator。


5、def mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] 将函数应用于RDD中的每一个分区,每一个分区运行一次,函数能够接受 一个分区的索引值 和一个代表分区内所有数据的Iterator类型,需要返回Iterator类型。


6、def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T] 在RDD中移seed为种子返回大致上有fraction比例个数据样本RDD,withReplacement表示是否采用放回式抽样。


7、def union(other: RDD[T]): RDD[T] 将两个RDD中的元素进行合并,返回一个新的RDD


8、def intersection(other: RDD[T]): RDD[T] 将两个RDD做交集,返回一个新的RDD


9、def distinct(): RDD[T] 将当前RDD进行去重后,返回一个新的RDD


10、def partitionBy(partitioner: Partitioner): RDD[(K, V)] 根据设置的分区器重新将RDD进行分区,返回新的RDD。


11、def reduceByKey(func: (V, V) => V): RDD[(K, V)] 根据Key值将相同Key的元组的值用func进行计算,返回新的RDD


12、def groupByKey(): RDD[(K, Iterable[V])] 将相同Key的值进行聚集,输出一个(K, Iterable[V])类型的RDD


13、def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)] 根据key分别使用CreateCombiner和mergeValue进行相同key的数值聚集,通过mergeCombiners将各个分区最终的结果进行聚集。


14、def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)] 通过seqOp函数将每一个分区里面的数据和初始值迭代带入函数返回最终值,comOp将每一个分区返回的最终值根据key进行合并操作。


15、def foldByKey(zeroValue: V,partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] aggregateByKey的简化操作,seqop和combop相同,


16、def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)] 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD


17、def sortBy[K](f: (T) => K,ascending: Boolean = true,numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] 底层实现还是使用sortByKey,只不过使用fun生成的新key进行排序。


18、def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD,但是需要注意的是,他只会返回key在两个RDD中都存在的情况。


19、def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))] 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD,注意,如果V和W的类型相同,也不放在一块,还是单独存放。


20、def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] 做两个RDD的笛卡尔积,返回对偶的RDD


21、def pipe(command: String): RDD[String] 对于每个分区,都执行一个perl或者shell脚本,返回输出的RDD,注意,如果你是本地文件系统中,需要将脚本放置到每个节点上。


22、def coalesce(numPartitions: Int, shuffle: Boolean = false,

partitionCoalescer: Option[PartitionCoalescer] = Option.empty)

(implicit ord: Ordering[T] = null): RDD[T] 缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。


23、def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] 根据你传入的分区数重新通过网络分区所有数据,重型操作。


24、def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] 性能要比repartition要高。在给定的partitioner内部进行排序


25、def glom(): RDD[Array[T]] 将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]


26、def mapValues[U](f: V => U): RDD[(K, U)] 将函数应用于(k,v)结果中的v,返回新的RDD


27、def subtract(other: RDD[T]): RDD[T] 计算差的一种函数去除两个RDD中相同的元素,不同的RDD将保留下来。


2、行动操作


1、def takeSample( withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T] 抽样但是返回一个scala集合。


2、def reduce(f: (T, T) => T): T 通过func函数聚集RDD中的所有元素


3、def collect(): Array[T] 在驱动程序中,以数组的形式返回数据集的所有元素


4、def count(): Long 返回RDD中的元素个数


5、def first(): T 返回RDD中的第一个元素


6、def take(num: Int): Array[T] 返回RDD中的前n个元素


7、def takeOrdered(num: Int)(implicit ord: Ordering[T]) 返回前几个的排序


8、def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U aggregate函数将每个分区里面的元素通过seqOp和初始值进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。


9、def fold(zeroValue: T)(op: (T, T) => T): T 折叠操作,aggregate的简化操作,seqop和combop一样。


10、def saveAsTextFile(path: String): Unit 将RDD以文本文件的方式保存到本地或者HDFS中


11、def saveAsObjectFile(path: String): Unit 将RDD中的元素以序列化后对象形式保存到本地或者HDFS中。


12、def countByKey(): Map[K, Long] 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。


13、def foreach(f: T => Unit): Unit 在数据集的每一个元素上,运行函数func进行更新。


注意:当你在RDD中使用到了class的方法或者属性的时候,该class需要继承java.io.Serializable接口,或者可以将属性赋值为本地变量来防止整个对象的传输。


二、map、flatMap、mapParations、mapPartitionsWithIndex


基本的初始化

private val conf: SparkConf = new SparkConf().setAppName("TestTransformation").setMaster("local")
private val sparkContext = new SparkContext(conf)


2.1 map


map十分容易理解,他是将源JavaRDD的一个一个元素的传入call方法,并经过算法后一个一个的返回从而生成一个新的JavaRDD。

def map(): Unit ={
    val list = List("张无忌", "赵敏", "周芷若")
    val listRDD = sc.parallelize(list)
    val nameRDD = listRDD.map(name => "Hello " + name)
    nameRDD.foreach(name => println(name))
  }


20200109155544558.png


可以看出,对于map算子,源JavaRDD的每个元素都会进行计算,由于是依次进行传参,所以他是有序的,新RDD的元素顺序与源RDD是相同的。而由有序又引出接下来的flatMap。


2.2 flatMap


flatMap与map一样,是将RDD中的元素依次的传入call方法,他比map多的功能是能在任何一个传入call方法的元素后面添加任意多元素,而能达到这一点,正是因为其进行传参是依次进行的。


20200109155920368.png


flatMap的特性决定了这个算子在对需要随时增加元素的时候十分好用,比如在对源RDD查漏补缺时。


map和flatMap都是依次进行参数传递的,但有时候需要RDD中的两个元素进行相应操作时(例如:算存款所得时,下一个月所得的利息是要原本金加上上一个月所得的本金的),这两个算子便无法达到目的了,这是便需要mapPartitions算子,他传参的方式是将整个RDD传入,然后将一个迭代器传出生成一个新的RDD,由于整个RDD都传入了,所以便能完成前面说的业务。


3.3 mapPartitions


/

def mapParations(): Unit ={
    val list = List(1,2,3,4,5,6)
    val listRDD = sc.parallelize(list,2)
    listRDD.mapPartitions(iterator => {
      val newList: ListBuffer[String] = ListBuffer()
      while (iterator.hasNext){
        newList.append("hello " + iterator.next())
      }
      newList.toIterator
    }).foreach(name => println(name))
  }


20200109160054733.png


3.4 mapPartitionsWithIndex


每次获取和处理的就是一个分区的数据,并且知道处理的分区的分区号是啥?

def mapPartitionsWithIndex(): Unit ={
    val list = List(1,2,3,4,5,6,7,8)
    sc.parallelize(list).mapPartitionsWithIndex((index,iterator) => {
      val listBuffer:ListBuffer[String] = new ListBuffer
      while (iterator.hasNext){
        listBuffer.append(index+"_"+iterator.next())
      }
      listBuffer.iterator
    },true)
      .foreach(println(_))
  }


20200109160315297.png


三、reduce、reduceByKey


3.1 reduce


reduce其实是讲RDD中的所有元素进行合并,当运行call方法时,会传入两个参数,在call方法中将两个参数合并后返回,而这个返回值回合一个新的RDD中的元素再次传入call方法中,继续合并,直到合并到只剩下一个元素时。

def reduce(): Unit ={
    val list = List(1,2,3,4,5,6)
    val listRDD = sc.parallelize(list)
    val result = listRDD.reduce((x,y) => x+y)
    println(result)
  }


20200109160537719.png


3.2 reduceByKey


reduceByKey仅将RDD中所有K,V对中K值相同的V进行合并。

def reduceByKey(): Unit ={
    val list = List(("武当", 99), ("少林", 97), ("武当", 89), ("少林", 77))
    val mapRDD = sc.parallelize(list)
    val resultRDD = mapRDD.reduceByKey(_+_)
    resultRDD.foreach(tuple => println("门派: " + tuple._1 + "->" + tuple._2))
  }


20200109160628933.png


四、union,join和groupByKey


4.1 union


当要将两个RDD合并时,便要用到union和join,其中union只是简单的将两个RDD累加起来,可以看做List的addAll方法。就想List中一样,当使用union及join时,必须保证两个RDD的泛型是一致的。

def union(): Unit ={
    val list1 = List(1,2,3,4)
    val list2 = List(3,4,5,6)
    val rdd1 = sc.parallelize(list1)
    val rdd2 = sc.parallelize(list2)
    rdd1.union(rdd2).foreach(println(_))
  }


20200109160848825.png


4.2 groupByKey


union只是将两个RDD简单的累加在一起,而join则不一样,join类似于hadoop中的combin操作,只是少了排序这一段,再说join之前说说groupByKey,因为join可以理解为union与groupByKey的结合:groupBy是将RDD中的元素进行分组,组名是call方法中的返回值,而顾名思义groupByKey是将PairRDD中拥有相同key值得元素归为一组。即:

def groupByKey(): Unit ={
    val list = List(("武当", "张三丰"), ("峨眉", "灭绝师太"), ("武当", "宋青书"), ("峨眉", "周芷若"))
    val listRDD = sc.parallelize(list)
    val groupByKeyRDD = listRDD.groupByKey()
    groupByKeyRDD.foreach(t => {
      val menpai = t._1
      val iterator = t._2.iterator
      var people = ""
      while (iterator.hasNext) people = people + iterator.next + " "
      println("门派:" + menpai + "人员:" + people)
    })
  }


20200109161220785.png


4.3 join


join是将两个PairRDD合并,并将有相同key的元素分为一组,可以理解为groupByKey和Union的结合

def join(): Unit = {
    val list1 = List((1, "东方不败"), (2, "令狐冲"), (3, "林平之"))
    val list2 = List((1, 99), (2, 98), (3, 97))
    val list1RDD = sc.parallelize(list1)
    val list2RDD = sc.parallelize(list2)
    val joinRDD = list1RDD.join(list2RDD)
    joinRDD.foreach(t => println("学号:" + t._1 + " 姓名:" + t._2._1 + " 成绩:" + t._2._2))
  }


20200109161824988.png



目录
相关文章
|
2月前
|
分布式计算 大数据 Spark
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
43 1
|
2月前
|
存储 SQL 分布式计算
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(一)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(一)
43 0
|
7月前
|
分布式计算 Java Scala
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
|
7月前
|
存储 分布式计算 API
adb spark的lakehouse api访问内表数据,还支持算子下推吗
【2月更文挑战第21天】adb spark的lakehouse api访问内表数据,还支持算子下推吗
150 2
|
7月前
|
分布式计算 Hadoop Java
Spark【基础知识 03】【RDD常用算子详解】(图片来源于网络)
【2月更文挑战第14天】Spark【基础知识 03】【RDD常用算子详解】(图片来源于网络)
118 1
|
7月前
|
存储 分布式计算 Scala
bigdata-36-Spark转换算子与动作算子
bigdata-36-Spark转换算子与动作算子
52 0
|
分布式计算 大数据 数据库
Spark 算子操作剖析 3
快速学习 Spark 算子操作剖析 3
109 0
Spark 算子操作剖析 3
|
分布式计算 大数据 Spark
Spark 算子操作剖析4
快速学习 Spark 算子操作剖析4
116 0
|
分布式计算 大数据 Spark
Spark 算子操作剖析2
快速学习 Spark 算子操作剖析2
118 0
|
分布式计算 大数据 Spark
Spark 算子操作剖析 1
快速学习 Spark 算子操作剖析 1
110 0