接上篇:https://developer.aliyun.com/article/1622537?spm=a2c6h.13148508.setting.20.27ab4f0eUI7v7p
分区器作用与分类
在PairRDD(key,value)中,很多操作都是基于Key的,系统会按照Key对数据进行重组,如 GroupByKey
数据重组需要规则,最常见的就是基于Hash的分区,此外还有一种复杂的基于抽样Range分区方法:
HashPartitioner
最简单、最常用,也是默认提供的分区器。
对于给定的Key,计算HashCode,并除以分区的个数取余,如果余数小于0,则用余数+分区的个数,最后返回的值就是这个Key所属的分区ID。
该分区方法可以保证Key相同的数据出现在同一个分区中。
用户可以通过 partitionBy主动使用分区器,通过 partitions参数指定想要分区的数量。
默认情况下的分区情况是:
val rdd1 = sc.makeRDD(1 to 100).map((_, 1)) rdd1.getNumPartitions
执行结果如下图所示:
执行结果如下图所示,分区已经让我们手动控制成10个了:
val rdd2 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(10)) rdd2.getNumPartitions rdd2.glom.collect.foreach(x => println(x.toBuffer))
RangePartitioner
简单来说就是将一定范围内的数映射到某个分区内,在实现中,分界的算法尤为重要,用到了水塘抽样算法。sortByKey会使用RangePartitioner。
进行代码的测试:
val rdd3 = rdd1.partitionBy(new org.apache.spark.RangePartitioner(10, rdd1)) rdd3.glom.collect.foreach(x => println(x.toBuffer))
执行结果如下图所示:
但是现在的问题是:在执行分区之前其实并不知道数据的分布情况,如果想知道数据的分区就需要对数据进行采样。
Spark中的RangePartitioner在对数据采样的过程中使用了 “水塘采样法”
水塘采样法是:在包含N个项目的集合S中选取K个样本,其中N为1或者很大的未知的数量,尤其适用于不能把所有N个项目都存放到主内存的情况。
在采样过程中执行了 collect() 操作,引发了 Action 操作。
自定义分区器
Spark允许用户通过自定义的Partitioner对象,灵活的来控制RDD的分区方式。
我们需要实现自定义分区器,按照以下的规则进行分区:
分区 0 < 100
100 <= 分区1 < 200
200 <= 分区2 < 300
300 <= 分区3 < 400
…
900 <= 分区9 < 1000
编写代码
package icu.wzk import org.apache.spark.rdd.RDD import org.apache.spark.{Partitioner, SparkConf, SparkContext} import scala.collection.immutable class MyPartitioner(n: Int) extends Partitioner { override def numPartitions: Int = n override def getPartition(key: Any): Int = { val k = key.toString.toInt k / 100 } } object UserDefinedPartitioner { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setAppName("UserDefinedPartitioner") .setMaster("local[*]") val sc = new SparkContext(conf) sc.setLogLevel("WARN") val random = scala.util.Random val arr: immutable.IndexedSeq[Int] = (1 to 100) .map(idx => random.nextInt(1000)) val rdd1: RDD[(Int, Int)] = sc.makeRDD(arr).map((_, 1)) rdd1.glom.collect.foreach(x => println(x.toBuffer)) println("=========================================") val rdd2 = rdd1.partitionBy(new MyPartitioner(10)) rdd2.glom.collect().foreach(x => println(x.toBuffer)) sc.stop() } }
打包上传
这里之前已经重复过多次,就跳过了
mvn clean package
运行测试
spark-submit --master local[*] --class icu.wzk.UserDefinedPartitioner spark-wordcount
可以看到如下的运行结果: