【Spark】(五)Spark Transformation(转换算子) 和 Action(执行算子)2

简介: 【Spark】(五)Spark Transformation(转换算子) 和 Action(执行算子)2

五、sample、cartesian


5.1 sample


/**
     * sample用来从RDD中抽取样本。他有三个参数
     * withReplacement: Boolean,
     *       true: 有放回的抽样
     *       false: 无放回抽象
     * fraction: Double:
     *      抽取样本的比例
     * seed: Long:
     *      随机种子
     */
def sample(): Unit ={
    val list = 1 to 100
    val listRDD = sc.parallelize(list)
    listRDD.sample(false,0.1,0).foreach(num => print(num + " "))
  }


20200109162116919.png


5.2 cartesian


cartesian是用于求笛卡尔积的

def cartesian(): Unit ={
    val list1 = List("A","B")
    val list2 = List(1,2,3)
    val list1RDD = sc.parallelize(list1)
    val list2RDD = sc.parallelize(list2)
    list1RDD.cartesian(list2RDD).foreach(t => println(t._1 +"->"+t._2))
  }


20200109162253949.png


六、filter、distinct、intersection


过滤 出偶数


6.1 filter


def filter(): Unit ={
    val list = List(1,2,3,4,5,6,7,8,9,10)
    val listRDD = sc.parallelize(list)
    listRDD.filter(num => num % 2 ==0).foreach(print(_))
  }


20200109173139931.png


6.2 distinct


def distinct(): Unit ={
    val list = List(1,1,2,2,3,3,4,5)
    sc.parallelize(list).distinct().foreach(println(_))
  }


20200109173648724.png

6.3 intersection


def intersection(): Unit ={
    val list1 = List(1,2,3,4)
    val list2 = List(3,4,5,6)
    val list1RDD = sc.parallelize(list1)
    val list2RDD = sc.parallelize(list2)
    list1RDD.intersection(list2RDD).foreach(println(_))
  }


20200109173738283.png


七、coalesce、repartition、repartitionAndSortWithinPartitions


7.1 coalesce


分区数由多 -》 变少

def coalesce(): Unit = {
    val list = List(1,2,3,4,5,6,7,8,9)
    sc.parallelize(list,3).coalesce(1).foreach(println(_))
  }


20200109173903422.png

7.2 replication


进行重分区,解决的问题:本来分区数少 -》 增加分区数

def replication(): Unit ={
    val list = List(1,2,3,4)
    val listRDD = sc.parallelize(list,1)
    listRDD.repartition(2).foreach(println(_))
  }


20200109174009124.png


7.3 repartitionAndSortWithinPartitions


repartitionAndSortWithinPartitions函数是repartition函数的变种,与repartition函数不同的是,repartitionAndSortWithinPartitions在给定的partitioner内部进行排序,性能比repartition要高。

def repartitionAndSortWithinPartitions(): Unit ={
    val list = List(1, 4, 55, 66, 33, 48, 23)
    val listRDD = sc.parallelize(list,1)
    listRDD.map(num => (num,num))
      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
      .mapPartitionsWithIndex((index,iterator) => {
        val listBuffer: ListBuffer[String] = new ListBuffer
        while (iterator.hasNext) {
          listBuffer.append(index + "_" + iterator.next())
        }
        listBuffer.iterator
      },false)
      .foreach(println(_))
  }


20200109185020403.png


20200109185030756.png


八、cogroup、sortBykey、aggregateByKey


8.1 cogroup


对两个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合。与reduceByKey不同的是针对两个RDD中相同的key的元素进行合并。


def cogroup(): Unit ={
    val list1 = List((1, "www"), (2, "bbs"))
    val list2 = List((1, "cnblog"), (2, "cnblog"), (3, "very"))
    val list3 = List((1, "com"), (2, "com"), (3, "good"))
    val list1RDD = sc.parallelize(list1)
    val list2RDD = sc.parallelize(list2)
    val list3RDD = sc.parallelize(list3)
    list1RDD.cogroup(list2RDD,list3RDD).foreach(tuple =>
      println(tuple._1 + " " + tuple._2._1 + " " + tuple._2._2 + " " + tuple._2._3))
  }


20200109185330384.png


8.2 sortBykey


sortByKey函数作用于Key-Value形式的RDD,并对Key进行排序。它是在org.apache.spark.rdd.OrderedRDDFunctions中实现的,实现如下

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size)
    : RDD[(K, V)] =
{
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}


从函数的实现可以看出,它主要接受两个函数,含义和sortBy一样,这里就不进行解释了。该函数返回的RDD一定是ShuffledRDD类型的,因为对源RDD进行排序,必须进行Shuffle操作,而Shuffle操作的结果RDD就是ShuffledRDD。其实这个函数的实现很优雅,里面用到了RangePartitioner,它可以使得相应的范围Key数据分到同一个partition中,然后内部用到了mapPartitions对每个partition中的数据进行排序,而每个partition中数据的排序用到了标准的sort机制,避免了大量数据的shuffle。下面对sortByKey的使用进行说明:

def sortByKey(): Unit ={
    val list = List((99, "张三丰"), (96, "东方不败"), (66, "林平之"), (98, "聂风"))
    sc.parallelize(list).sortByKey(false).foreach(tuple => println(tuple._2 + "->" + tuple._1))
  }


20200109185509679.png


8.3 aggregateByKey


aggregateByKey函数对PairRDD中相同Key的值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。和aggregate函数类似,aggregateByKey返回值的类型不需要和RDD中value的类型一致。因为aggregateByKey是对相同Key中的值进行聚合操作,所以aggregateByKey函数最终返回的类型还是Pair RDD,对应的结果是Key和聚合好的值;而aggregate函数直接是返回非RDD的结果,这点需要注意。在实现过程中,定义了三个aggregateByKey函数原型,但最终调用的aggregateByKey函数都一致。

def aggregateByKey(): Unit ={
    val list = List("you,jump", "i,jump")
    sc.parallelize(list)
      .flatMap(_.split(","))
      .map((_, 1))
      .aggregateByKey(0)(_+_,_+_)
      .foreach(tuple =>println(tuple._1+"->"+tuple._2))
  }


20200109185730907.png


目录
相关文章
|
2月前
|
分布式计算 大数据 Spark
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
43 1
|
2月前
|
存储 SQL 分布式计算
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(一)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(一)
43 0
|
7月前
|
分布式计算 Java Scala
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
|
7月前
|
存储 分布式计算 API
adb spark的lakehouse api访问内表数据,还支持算子下推吗
【2月更文挑战第21天】adb spark的lakehouse api访问内表数据,还支持算子下推吗
150 2
|
7月前
|
分布式计算 Hadoop Java
Spark【基础知识 03】【RDD常用算子详解】(图片来源于网络)
【2月更文挑战第14天】Spark【基础知识 03】【RDD常用算子详解】(图片来源于网络)
118 1
|
7月前
|
存储 分布式计算 Scala
bigdata-36-Spark转换算子与动作算子
bigdata-36-Spark转换算子与动作算子
52 0
|
分布式计算 Spark Python
spark RDD transformation与action函数整理
1.创建RDD val lines = sc.parallelize(List("pandas","i like pandas")) 2.加载本地文件到RDD val linesRDD = sc.
1039 0
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
124 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
69 0
|
2月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
44 0