5. sample, cartesian
5.1 sample
/**
 * sample draws a sample from an RDD. It takes three parameters:
 *   withReplacement: Boolean
 *     true:  sample with replacement
 *     false: sample without replacement
 *   fraction: Double
 *     the expected fraction of elements to sample
 *   seed: Long
 *     the random seed
 */
def sample(): Unit = {
  val list = 1 to 100
  val listRDD = sc.parallelize(list)
  listRDD.sample(false, 0.1, 0).foreach(num => print(num + " "))
}
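For comparison, here is a minimal sketch of sampling with replacement (the method name sampleWithReplacement is just illustrative). Note that when withReplacement is true, fraction is interpreted as the expected number of times each element is selected, so the same value may appear more than once in the result:

// Illustrative sketch: sampling with replacement. With withReplacement = true,
// fraction is the expected number of times each element is chosen, so
// duplicates are possible in the output.
def sampleWithReplacement(): Unit = {
  val listRDD = sc.parallelize(1 to 100)
  listRDD.sample(true, 0.1, 0).foreach(num => print(num + " "))
}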
5.2 cartesian
cartesian computes the Cartesian product of two RDDs, pairing every element of one RDD with every element of the other. Note that the output has |list1| × |list2| elements, so cartesian is expensive on large RDDs.
def cartesian(): Unit = {
  val list1 = List("A", "B")
  val list2 = List(1, 2, 3)
  val list1RDD = sc.parallelize(list1)
  val list2RDD = sc.parallelize(list2)
  list1RDD.cartesian(list2RDD).foreach(t => println(t._1 + "->" + t._2))
}
6. filter, distinct, intersection
6.1 filter

filter keeps only the elements that satisfy a predicate; the example below keeps the even numbers.
def filter(): Unit = {
  val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  val listRDD = sc.parallelize(list)
  listRDD.filter(num => num % 2 == 0).foreach(print(_))
}
6.2 distinct
def distinct(): Unit = {
  val list = List(1, 1, 2, 2, 3, 3, 4, 5)
  // distinct removes duplicate elements from the RDD
  sc.parallelize(list).distinct().foreach(println(_))
}
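As a sketch of what distinct does under the hood, it can be expressed with map plus reduceByKey, which is essentially how Spark implements it (the method name distinctViaReduceByKey is illustrative):

// Illustrative sketch: distinct expressed as map + reduceByKey + map.
// Each element becomes a key, duplicates collapse in reduceByKey, and
// the final map drops the dummy value again.
def distinctViaReduceByKey(): Unit = {
  val list = List(1, 1, 2, 2, 3, 3, 4, 5)
  sc.parallelize(list)
    .map(x => (x, null))
    .reduceByKey((x, _) => x)
    .map(_._1)
    .foreach(println(_))
}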
6.3 intersection
def intersection(): Unit = {
  val list1 = List(1, 2, 3, 4)
  val list2 = List(3, 4, 5, 6)
  val list1RDD = sc.parallelize(list1)
  val list2RDD = sc.parallelize(list2)
  // intersection returns the elements present in both RDDs (3 and 4 here)
  list1RDD.intersection(list2RDD).foreach(println(_))
}
7. coalesce, repartition, repartitionAndSortWithinPartitions
7.1 coalesce
coalesce reduces the number of partitions (many -> fewer).
def coalesce(): Unit = {
  val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9)
  sc.parallelize(list, 3).coalesce(1).foreach(println(_))
}
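By default coalesce does not shuffle, so it can only decrease the partition count; asking for more partitions without a shuffle is silently ignored. A minimal sketch verifying this with getNumPartitions (the method name coalescePartitionCounts is illustrative; the shuffle = true variant behaves like repartition):

// Illustrative sketch: without a shuffle, coalesce cannot grow the
// partition count; with shuffle = true it can.
def coalescePartitionCounts(): Unit = {
  val rdd = sc.parallelize(1 to 9, 3)
  println(rdd.coalesce(1).getNumPartitions)                 // 1
  println(rdd.coalesce(5).getNumPartitions)                 // still 3: no shuffle, cannot grow
  println(rdd.coalesce(5, shuffle = true).getNumPartitions) // 5
}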
7.2 repartition
repartition reshuffles the data and solves the opposite problem: increasing a small number of partitions (few -> more). Under the hood, repartition(n) is simply coalesce(n, shuffle = true).
def repartition(): Unit = {
  val list = List(1, 2, 3, 4)
  val listRDD = sc.parallelize(list, 1)
  listRDD.repartition(2).foreach(println(_))
}
7.3 repartitionAndSortWithinPartitions
repartitionAndSortWithinPartitions is a variant of repartition. The difference is that it sorts records by key within each partition of the given partitioner as part of the shuffle, which is more efficient than calling repartition and then sorting inside each partition separately.
import org.apache.spark.HashPartitioner
import scala.collection.mutable.ListBuffer

def repartitionAndSortWithinPartitions(): Unit = {
  val list = List(1, 4, 55, 66, 33, 48, 23)
  val listRDD = sc.parallelize(list, 1)
  listRDD.map(num => (num, num))
    .repartitionAndSortWithinPartitions(new HashPartitioner(2))
    .mapPartitionsWithIndex((index, iterator) => {
      // prefix each element with its partition index to make the
      // per-partition ordering visible in the output
      val listBuffer: ListBuffer[String] = new ListBuffer
      while (iterator.hasNext) {
        listBuffer.append(index + "_" + iterator.next())
      }
      listBuffer.iterator
    }, false)
    .foreach(println(_))
}
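For contrast, here is a sketch of the less efficient two-step equivalent the paragraph above alludes to: partition first, then sort each partition's data separately, so the sort happens after the shuffle instead of being folded into it (the method name repartitionThenSort is illustrative):

// Illustrative two-step equivalent: shuffle first via partitionBy, then
// sort each partition in a separate pass. repartitionAndSortWithinPartitions
// folds the sort into the shuffle itself, which is why it is faster.
def repartitionThenSort(): Unit = {
  val pairs = sc.parallelize(List(1, 4, 55, 66, 33, 48, 23), 1).map(n => (n, n))
  pairs.partitionBy(new HashPartitioner(2))
    .mapPartitions(iter => iter.toList.sortBy(_._1).iterator, preservesPartitioning = true)
    .foreach(println(_))
}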
8. cogroup, sortByKey, aggregateByKey
8.1 cogroup
Given two (or more) key-value RDDs, cogroup gathers the values for each key in each RDD into separate collections. Unlike reduceByKey, it combines elements with the same key across multiple RDDs rather than within a single one.
def cogroup(): Unit = {
  val list1 = List((1, "www"), (2, "bbs"))
  val list2 = List((1, "cnblog"), (2, "cnblog"), (3, "very"))
  val list3 = List((1, "com"), (2, "com"), (3, "good"))
  val list1RDD = sc.parallelize(list1)
  val list2RDD = sc.parallelize(list2)
  val list3RDD = sc.parallelize(list3)
  list1RDD.cogroup(list2RDD, list3RDD)
    .foreach(tuple => println(tuple._1 + " " + tuple._2._1 + " " + tuple._2._2 + " " + tuple._2._3))
}
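cogroup is also the building block behind the join family of operators. As a sketch, an inner join of two of the RDDs above can be expressed by cogrouping and then taking the cross product of the value collections for each key (the method name joinViaCogroup is illustrative):

// Illustrative sketch: inner join expressed via cogroup. For each key,
// pair every value from the left RDD with every value from the right RDD;
// keys missing on either side yield no output.
def joinViaCogroup(): Unit = {
  val left  = sc.parallelize(List((1, "www"), (2, "bbs")))
  val right = sc.parallelize(List((1, "cnblog"), (2, "cnblog"), (3, "very")))
  left.cogroup(right)
    .flatMap { case (key, (leftVals, rightVals)) =>
      for (l <- leftVals; r <- rightVals) yield (key, (l, r))
    }
    .foreach(println(_))
}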
8.2 sortByKey
sortByKey operates on key-value RDDs and sorts them by key. It is implemented in org.apache.spark.rdd.OrderedRDDFunctions, as follows:
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size)
    : RDD[(K, V)] = {
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
As the implementation shows, it takes two parameters with the same meaning as in sortBy, so they are not explained again here. The returned RDD is always a ShuffledRDD, because sorting the source RDD requires a shuffle, and the result of a shuffle is a ShuffledRDD. The implementation is quite elegant: it uses a RangePartitioner, which sends keys in the same range to the same partition, and then sorts the data inside each partition with the standard sort mechanism, so no further shuffle of large amounts of data is needed for the sort itself. The following demonstrates how sortByKey is used:
def sortByKey(): Unit = {
  val list = List((99, "张三丰"), (96, "东方不败"), (66, "林平之"), (98, "聂风"))
  sc.parallelize(list).sortByKey(false).foreach(tuple => println(tuple._2 + "->" + tuple._1))
}
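Since the paragraph above defers to sortBy for the parameter meanings, here is a minimal sortBy sketch for comparison: it works on any RDD (not just pair RDDs) and takes a key-extraction function in addition to the ascending flag and partition count:

// Illustrative sketch of sortBy on a plain (non-pair) RDD: the first
// argument extracts the sort key, the second is the ascending flag,
// the third the number of partitions of the result.
def sortByExample(): Unit = {
  val listRDD = sc.parallelize(List(3, 1, 90, 32, 5))
  listRDD.sortBy(num => num, ascending = false, numPartitions = 1).foreach(println(_))
}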
8.3 aggregateByKey
aggregateByKey aggregates the values that share a key in a pair RDD, using a neutral initial (zero) value during the aggregation, much like aggregate. As with aggregate, the type of the result value need not match the type of the values in the RDD. Because aggregateByKey aggregates values per key, it still returns a pair RDD whose records are a key and its aggregated value; aggregate, by contrast, returns a plain non-RDD result directly, which is worth noting. Three aggregateByKey overloads are defined, but they all ultimately call the same implementation.
def aggregateByKey(): Unit = {
  val list = List("you,jump", "i,jump")
  sc.parallelize(list)
    .flatMap(_.split(","))
    .map((_, 1))
    .aggregateByKey(0)(_ + _, _ + _)
    .foreach(tuple => println(tuple._1 + "->" + tuple._2))
}
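To make the contrast with aggregate concrete, a minimal sketch (the method name aggregateExample is illustrative): aggregate is an action that returns a plain value on the driver, whereas the aggregateByKey example above returns a new pair RDD:

// Illustrative sketch contrasting aggregate: it returns a plain value,
// not an RDD. Here it sums 1..10 with a zero value, a within-partition
// function, and a cross-partition combine function.
def aggregateExample(): Unit = {
  val sum = sc.parallelize(1 to 10, 2).aggregate(0)(_ + _, _ + _)
  println(sum) // 55
}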