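Sorting really matters!
RDD.scala (source)
Sorting is not listed among the operators there, but that does not mean it is unimportant!
This post covers:
1. Basic sorting in practice
2. Secondary sorting in practice
3. More advanced sorting
4. Sorting internals demystified
1. Basic sorting in practice
Start the HDFS cluster
spark@SparkSingleNode:/usr/local/hadoop/hadoop-2.6.0$ sbin/start-dfs.sh
Start the Spark cluster
spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6$ sbin/start-all.sh
Start spark-shell
spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin$ ./spark-shell --master spark://SparkSingleNode:7077 --executor-memory 1g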
scala> sc.setLogLevel("WARN") // suppress INFO log noise
scala> sc.textFile("/README.md").flatMap(_.split(" ")).map(word =>(word,1)).reduceByKey(_+_,1).map(pair => (pair._2,pair._1)).sortByKey(false).map(pair => (pair._2,pair._1)).collect
res2: Array[(String, Int)] = Array(("",67), (the,21), (Spark,14), (to,14), (for,12), (a,10), (and,10), (##,8), (run,7), (is,6), (on,6), (can,6), (of,5), (also,5), (in,5), (if,4), (or,4), (Hadoop,4), (with,4), (you,4), (build,3), (including,3), (Please,3), (use,3), (particular,3), (documentation,3), (example,3), (an,3), (You,3), (building,3), (that,3), (guidance,3), (For,2), (This,2), (Hive,2), (To,2), (SparkPi,2), (refer,2), (Interactive,2), (be,2), (./bin/run-example,2), (1000:,2), (tests,2), (examples,2), (at,2), (using,2), (Shell,2), (class,2), (`examples`,2), (set,2), (Hadoop,,2), (cluster,2), (supports,2), (Python,2), (general,2), (locally,2), (following,2), (which,2), (should,2), ([project,2), (do,2), (how,2), (It,2), (Scala,2), (detailed,2), (return,2), (one,2), (Python,,2), (SQL...
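As the output shows, sortByKey(false) sorts by key in descending order. The pipeline first swaps each (word, count) pair to (count, word) so that the count becomes the key, sorts, and then swaps back.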
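To see what that operator chain computes without starting a cluster, here is a minimal sketch in plain Java (no Spark, Java 8+ assumed). The class and method names are made up for illustration, and a HashMap stands in for reduceByKey, so this only mirrors the result, not how Spark executes it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Spark: word count followed by a descending sort on the count.
final class WordCountSortSketch {

    // Splits on single spaces, counts each word, and returns (word, count) pairs by count, descending.
    static List<Map.Entry<String, Integer>> countAndSort(String text) {
        Map<String, Integer> counts = new HashMap<>();
        for (String word : text.split(" ")) {
            counts.merge(word, 1, Integer::sum); // plays the role of reduceByKey(_+_)
        }
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>(counts.entrySet());
        // Same effect as swapping to (count, word), calling sortByKey(false), and swapping back.
        pairs.sort((a, b) -> Integer.compare(b.getValue(), a.getValue()));
        return pairs;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, Integer> e : countAndSort("to be or not to be to")) {
            System.out.println(e.getKey() + " " + e.getValue());
        }
    }
}
```

Words with the same count come out in no particular order here, just as the Spark output above gives no guarantee about the order of ties.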
Source of sortByKey
/**
 * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
 * `collect` or `save` on the resulting RDD will return or output an ordered list of records
 * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
 * order of the keys).
 */
// TODO: this currently doesn't work on P other than Tuple2!
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
    : RDD[(K, V)] = self.withScope
{
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
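So every sort produces a ShuffledRDD: sortByKey builds a RangePartitioner and then shuffles the data with the key ordering set.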
Why does it not show up in mine?
And how does RangePartitioner arrange the data? (Its source is listed at the end of this post.)
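Going by the getPartition method in the RangePartitioner source quoted at the end of this post, the partitioner itself does not sort anything: it keeps an array of upper bounds and sends each key to the range it falls into. Below is a rough Java sketch of that lookup for int keys only. The class name, method name, and the bounds used in main are invented for illustration; the real class also switches to binary search when there are more than 128 bounds.

```java
// Hypothetical, simplified model of RangePartitioner.getPartition for int keys.
final class RangeLookupSketch {

    // bounds holds the sorted upper bounds of the first (numPartitions - 1) partitions.
    static int partitionFor(int key, int[] bounds, boolean ascending) {
        int partition = 0;
        // Linear scan: move right while the key is greater than the current upper bound.
        while (partition < bounds.length && key > bounds[partition]) {
            partition++;
        }
        // For a descending sort the partition index is mirrored.
        return ascending ? partition : bounds.length - partition;
    }

    public static void main(String[] args) {
        int[] bounds = {3, 7}; // three partitions: key <= 3, 3 < key <= 7, key > 7
        for (int key = 1; key <= 9; key++) {
            System.out.println(key + " -> partition " + partitionFor(key, bounds, true));
        }
    }
}
```

Once every key sits in the right range, ordering the records inside each partition is enough to get a globally ordered result, which is what the first sentence of the sortByKey doc comment describes.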
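That concludes basic sorting in practice.
2. Secondary sorting in practice
Secondary sorting means ordering records by two dimensions.
For example, sort by the first column in ascending order. When two rows have the same key in the first column, how are they ordered? By the second column, again ascending. That is a secondary sort.
Preparation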
[Input data file]
2 3
4 1
3 2
4 3
8 7
2 1
[Output]
2 1
2 3
3 2
4 1
4 3
8 7
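Before bringing Spark in, the expected output can be checked with a few lines of plain Java. This is only a sketch, assuming one "first second" pair per line and Java 8+; the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: sorts "first second" lines by the first column, then by the second.
final class TwoColumnSortSketch {

    static List<String> sortLines(List<String> lines) {
        Comparator<String> byBothColumns = Comparator
                .comparingInt((String line) -> Integer.parseInt(line.split(" ")[0]))
                .thenComparingInt(line -> Integer.parseInt(line.split(" ")[1]));
        return lines.stream().sorted(byBothColumns).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("2 3", "4 1", "3 2", "4 3", "8 7", "2 1");
        sortLines(input).forEach(System.out::println);
    }
}
```

The thenComparingInt call is the "second dimension": it is consulted only when the first column ties.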
If you are aiming for a large company, you need to be comfortable with 5 or even 8 sort dimensions, not just 2. Keep going! zhouls.
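Going from 2 dimensions to 5 or 8 does not need a new trick, only a key that compares its fields one after another. Here is one possible sketch in plain Java; MultiSortKey is a hypothetical class of my own, and whether to plug it into Spark as a Comparable or by extending Ordered is left open.

```java
import java.io.Serializable;
import java.util.Arrays;

// Hypothetical N-dimensional sort key: compares field by field, left to right.
final class MultiSortKey implements Comparable<MultiSortKey>, Serializable {
    private static final long serialVersionUID = 1L;

    private final int[] fields;

    MultiSortKey(int... fields) {
        this.fields = fields.clone();
    }

    @Override
    public int compareTo(MultiSortKey other) {
        int n = Math.min(fields.length, other.fields.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(fields[i], other.fields[i]);
            if (c != 0) {
                return c; // the first differing dimension decides
            }
        }
        // All shared dimensions are equal: the key with fewer fields sorts first.
        return Integer.compare(fields.length, other.fields.length);
    }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof MultiSortKey && Arrays.equals(fields, ((MultiSortKey) obj).fields);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(fields);
    }

    @Override
    public String toString() {
        return Arrays.toString(fields);
    }

    public static void main(String[] args) {
        MultiSortKey[] keys = {
            new MultiSortKey(1, 2, 3, 4, 6),
            new MultiSortKey(1, 2, 3, 4, 5),
            new MultiSortKey(0, 9, 9, 9, 9)
        };
        Arrays.sort(keys);
        System.out.println(Arrays.toString(keys));
    }
}
```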
Here we write the code in Scala IDE for Eclipse.
See also: Downloading and installing Scala IDE for Eclipse, and a first WordCount (local mode and cluster mode)
SecondarySortKey.java
package com.zhouls.spark.SparkApps.cores;

import java.io.Serializable;

import scala.math.Ordered;

public class SecondarySortKey implements Ordered<SecondarySortKey>, Serializable {
    private int first;
    private int second;

    @Override
    public boolean $greater(SecondarySortKey arg0) {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public boolean $greater$eq(SecondarySortKey arg0) {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public boolean $less(SecondarySortKey arg0) {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public boolean $less$eq(SecondarySortKey arg0) {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public int compare(SecondarySortKey arg0) {
        // TODO Auto-generated method stub
        return 0;
    }

    @Override
    public int compareTo(SecondarySortKey arg0) {
        // TODO Auto-generated method stub
        return 0;
    }
}
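A handy trick here: let the IDE generate the method stubs for the interface, as above.
Then fill them in with the logic we actually want.
The final SecondarySortKey.java: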
package com.zhouls.spark.SparkApps.cores;

import java.io.Serializable;

import scala.math.Ordered;

public class SecondarySortKey implements Ordered<SecondarySortKey>, Serializable {
    private int first;
    private int second;

    // Public constructor for the secondary sort key
    public SecondarySortKey(int first, int second) {
        this.first = first;
        this.second = second;
    }

    // this > other: compare first, then second on a tie
    public boolean $greater(SecondarySortKey other) {
        if (this.first > other.getFirst()) {
            return true;
        } else if (this.first == other.getFirst() && this.second > other.getSecond()) {
            return true;
        }
        return false;
    }

    // this >= other
    public boolean $greater$eq(SecondarySortKey other) {
        if (this.$greater(other)) {
            return true;
        } else if (this.first == other.getFirst() && this.second == other.getSecond()) {
            return true;
        }
        return false;
    }

    // this < other
    public boolean $less(SecondarySortKey other) {
        if (this.first < other.getFirst()) {
            return true;
        } else if (this.first == other.getFirst() && this.second < other.getSecond()) {
            return true;
        }
        return false;
    }

    // this <= other
    public boolean $less$eq(SecondarySortKey other) {
        if (this.$less(other)) {
            return true;
        } else if (this.first == other.getFirst() && this.second == other.getSecond()) {
            return true;
        }
        return false;
    }

    // Integer.compare is used instead of subtraction, which can overflow for large values
    public int compare(SecondarySortKey other) {
        if (this.first != other.getFirst()) {
            return Integer.compare(this.first, other.getFirst());
        } else {
            return Integer.compare(this.second, other.getSecond());
        }
    }

    public int compareTo(SecondarySortKey other) {
        return compare(other);
    }

    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + first;
        result = prime * result + second;
        return result;
    }

    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        SecondarySortKey other = (SecondarySortKey) obj;
        if (first != other.first)
            return false;
        if (second != other.second)
            return false;
        return true;
    }

    public int getFirst() {
        return first;
    }

    public void setFirst(int first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }
}
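One detail worth a closer look in any compare method for int fields: returning the difference of the two values is only correct while that difference fits in an int. The small sketch below (hypothetical class and method names, plain Java) shows the failure case next to Integer.compare, which has been in the JDK since Java 7.

```java
// Hypothetical demo: subtraction-based comparison versus Integer.compare.
final class CompareOverflowSketch {

    // Looks right, but a - b wraps around when the true difference does not fit in an int.
    static int bySubtraction(int a, int b) {
        return a - b;
    }

    // Never overflows; returns a negative, zero, or positive value.
    static int bySafeCompare(int a, int b) {
        return Integer.compare(a, b);
    }

    public static void main(String[] args) {
        int a = Integer.MIN_VALUE;
        int b = 1;
        // a is clearly smaller than b, so a correct comparison must be negative.
        System.out.println("subtraction:     " + bySubtraction(a, b));
        System.out.println("Integer.compare: " + bySafeCompare(a, b));
    }
}
```

For small values like the sample data in this post both variants agree, so the difference only matters once the keys can get close to the int limits.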
SecondarySortApp.java
package com.zhouls.spark.SparkApps.cores;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/*
 * Secondary sort, step by step:
 * Step 1: implement the Ordered and Serializable interfaces to define a custom sort key
 * Step 2: load the file to be sorted into an RDD of <key, value> pairs
 * Step 3: call sortByKey, which sorts on the custom key
 * Step 4: drop the sort key and keep only the sorted lines
 */
public class SecondarySortApp {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf); // under the hood this wraps Scala's SparkContext
        JavaRDD<String> lines = sc.textFile("D://SoftWare//spark-1.5.2-bin-hadoop2.6//helloSpark.txt");

        // Pair every line with a key built from its two columns
        JavaPairRDD<SecondarySortKey, String> pairs = lines.mapToPair(new PairFunction<String, SecondarySortKey, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<SecondarySortKey, String> call(String line) throws Exception {
                String[] splited = line.split(" ");
                SecondarySortKey key = new SecondarySortKey(Integer.valueOf(splited[0]), Integer.valueOf(splited[1]));
                return new Tuple2<SecondarySortKey, String>(key, line);
            }
        });

        JavaPairRDD<SecondarySortKey, String> sorted = pairs.sortByKey();

        // Drop the custom key after sorting and keep the sorted lines
        JavaRDD<String> secondarySorted = sorted.map(new Function<Tuple2<SecondarySortKey, String>, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public String call(Tuple2<SecondarySortKey, String> sortedContent) throws Exception {
                System.out.println("sortedContent._1 " + (sortedContent._1).toString());
                System.out.println("sortedContent._2 " + sortedContent._2);
                return sortedContent._2;
            }
        });

        secondarySorted.foreach(new VoidFunction<String>() {
            @Override
            public void call(String sorted) throws Exception {
                System.out.println(sorted);
            }
        });
    }
}
Scala
package com.zhouls.spark.cores

/**
  * Created by Administrator on 2016/9/30.
  */
class SecondarySortKey(val first: Int, val second: Int) extends Ordered[SecondarySortKey] with Serializable {
  // Compare on first, then on second; Integer.compare avoids the overflow that subtraction can cause
  def compare(that: SecondarySortKey): Int = {
    if (this.first != that.first) {
      Integer.compare(this.first, that.first)
    } else {
      Integer.compare(this.second, that.second)
    }
  }
}
package com.zhouls.spark.cores

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Administrator on 2016/9/30.
  * Secondary sort, step by step:
  * Step 1: implement Ordered and Serializable to define a custom sort key
  * Step 2: load the file to be sorted into an RDD of <key, value> pairs
  * Step 3: call sortByKey, which sorts on the custom key
  * Step 4: drop the sort key and keep only the sorted lines
  */
object SecondarySortApp {
  def main(args: Array[String]): Unit = {
    // Create the SparkConf that holds the application's configuration
    val conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local")
    // Create the SparkContext: the only entry point for creating the first RDD,
    // the heart of the Driver, and the only channel to the cluster
    val sc = new SparkContext(conf)
    // Read the input file
    val lines = sc.textFile("D:\\SoftWare\\spark-1.5.2-bin-hadoop2.6\\helloSpark.txt")

    // val results = lines.map(line => (new SecondarySortKey(line.split(" ")(0).toInt, line.split(" ")(1).toInt), line)).sortByKey().map(pair => pair._2)
    val pairWithSortKey = lines.map(line => (
      new SecondarySortKey(line.split(" ")(0).toInt, line.split(" ")(1).toInt), line
    ))
    val sorted = pairWithSortKey.sortByKey()
    val sortedResult = sorted.map(sortedLine => sortedLine._2)
    sortedResult.collect.foreach(println)
  }
}
Exercises:
1: Implement secondary sort in Scala
Complete code of SecondarySortKey.scala:
class SecondarySortKey(val first: Int, val second: Int) extends Ordered[SecondarySortKey] with Serializable {
  // Compare on first, then on second; Integer.compare avoids the overflow that subtraction can cause
  override def compare(that: SecondarySortKey): Int = {
    if (this.first != that.first) {
      Integer.compare(this.first, that.first)
    } else {
      Integer.compare(this.second, that.second)
    }
  }
}

// Companion object providing a factory method
object SecondarySortKey extends scala.AnyRef with Serializable {
  def apply(first: Int, second: Int): SecondarySortKey = {
    new SecondarySortKey(first, second)
  }
}
Complete code of SecondarySortApp.scala:
import org.apache.spark.{SparkConf, SparkContext}

object SecondarySortApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("D:\\SoftWare\\spark-1.5.2-bin-hadoop2.6\\helloSpark.txt")

    // val results = lines.map(line => (new SecondarySortKey(line.split(" ")(0).toInt, line.split(" ")(1).toInt), line)).sortByKey().map(pair => pair._2)
    // Build the key through the companion object's apply, sort on it, then keep only the original line
    val results = lines.map(line => (SecondarySortKey.apply(line.split(" ")(0).toInt, line.split(" ")(1).toInt), line)).sortByKey().map(pair => pair._2)
    results.collect.foreach(println)
  }
}
2: Read the RangePartitioner source:
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import java.io.{IOException, ObjectInputStream, ObjectOutputStream}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.reflect.{ClassTag, classTag}
import scala.util.hashing.byteswap32

import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.util.{CollectionsUtils, Utils}
import org.apache.spark.util.random.{XORShiftRandom, SamplingUtils}

/**
 * An object that defines how the elements in a key-value pair RDD are partitioned by key.
 * Maps each key to a partition ID, from 0 to `numPartitions - 1`.
 */
abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

object Partitioner {
  /**
   * Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
   *
   * If any of the RDDs already has a partitioner, choose that one.
   *
   * Otherwise, we use a default HashPartitioner. For the number of partitions, if
   * spark.default.parallelism is set, then we'll use the value from SparkContext
   * defaultParallelism, otherwise we'll use the max number of upstream partitions.
   *
   * Unless spark.default.parallelism is set, the number of partitions will be the
   * same as the number of partitions in the largest upstream RDD, as this should
   * be least likely to cause out-of-memory errors.
   *
   * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
   */
  def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
    val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
    for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
      return r.partitioner.get
    }
    if (rdd.context.conf.contains("spark.default.parallelism")) {
      new HashPartitioner(rdd.context.defaultParallelism)
    } else {
      new HashPartitioner(bySize.head.partitions.size)
    }
  }
}

/**
 * A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using
 * Java's `Object.hashCode`.
 *
 * Java arrays have hashCodes that are based on the arrays' identities rather than their contents,
 * so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will
 * produce an unexpected or incorrect result.
 */
class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

/**
 * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
 * equal ranges. The ranges are determined by sampling the content of the RDD passed in.
 *
 * Note that the actual number of partitions created by the RangePartitioner might not be the same
 * as the `partitions` parameter, in the case where the number of sampled records is less than
 * the value of `partitions`.
 */
class RangePartitioner[K : Ordering : ClassTag, V](
    @transient partitions: Int,
    @transient rdd: RDD[_ <: Product2[K, V]],
    private var ascending: Boolean = true)
  extends Partitioner {

  // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
  require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")

  private var ordering = implicitly[Ordering[K]]

  // An array of upper bounds for the first (partitions - 1) partitions
  private var rangeBounds: Array[K] = {
    if (partitions <= 1) {
      Array.empty
    } else {
      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
      val sampleSize = math.min(20.0 * partitions, 1e6)
      // Assume the input partitions are roughly balanced and over-sample a little bit.
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
      if (numItems == 0L) {
        Array.empty
      } else {
        // If a partition contains much more than the average number of items, we re-sample from it
        // to ensure that enough items are collected from that partition.
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        val candidates = ArrayBuffer.empty[(K, Float)]
        val imbalancedPartitions = mutable.Set.empty[Int]
        sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            imbalancedPartitions += idx
          } else {
            // The weight is 1 over the sampling probability.
            val weight = (n.toDouble / sample.size).toFloat
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }
        if (imbalancedPartitions.nonEmpty) {
          // Re-sample imbalanced partitions with the desired sampling probability.
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          val seed = byteswap32(-rdd.id - 1)
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }
        RangePartitioner.determineBounds(candidates, partitions)
      }
    }
  }

  def numPartitions: Int = rangeBounds.length + 1

  private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]

  def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[K]
    var partition = 0
    if (rangeBounds.length <= 128) {
      // If we have less than 128 partitions naive search
      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
        partition += 1
      }
    } else {
      // Determine which binary search method to use only once.
      partition = binarySearch(rangeBounds, k)
      // binarySearch either returns the match location or -[insertion point]-1
      if (partition < 0) {
        partition = -partition-1
      }
      if (partition > rangeBounds.length) {
        partition = rangeBounds.length
      }
    }
    if (ascending) {
      partition
    } else {
      rangeBounds.length - partition
    }
  }

  override def equals(other: Any): Boolean = other match {
    case r: RangePartitioner[_, _] =>
      r.rangeBounds.sameElements(rangeBounds) && r.ascending == ascending
    case _ =>
      false
  }

  override def hashCode(): Int = {
    val prime = 31
    var result = 1
    var i = 0
    while (i < rangeBounds.length) {
      result = prime * result + rangeBounds(i).hashCode
      i += 1
    }
    result = prime * result + ascending.hashCode
    result
  }

  @throws(classOf[IOException])
  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
    val sfactory = SparkEnv.get.serializer
    sfactory match {
      case js: JavaSerializer => out.defaultWriteObject()
      case _ =>
        out.writeBoolean(ascending)
        out.writeObject(ordering)
        out.writeObject(binarySearch)

        val ser = sfactory.newInstance()
        Utils.serializeViaNestedStream(out, ser) { stream =>
          stream.writeObject(scala.reflect.classTag[Array[K]])
          stream.writeObject(rangeBounds)
        }
    }
  }

  @throws(classOf[IOException])
  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
    val sfactory = SparkEnv.get.serializer
    sfactory match {
      case js: JavaSerializer => in.defaultReadObject()
      case _ =>
        ascending = in.readBoolean()
        ordering = in.readObject().asInstanceOf[Ordering[K]]
        binarySearch = in.readObject().asInstanceOf[(Array[K], K) => Int]

        val ser = sfactory.newInstance()
        Utils.deserializeViaNestedStream(in, ser) { ds =>
          implicit val classTag = ds.readObject[ClassTag[Array[K]]]()
          rangeBounds = ds.readObject[Array[K]]()
        }
    }
  }
}

private[spark] object RangePartitioner {

  /**
   * Sketches the input RDD via reservoir sampling on each partition.
   *
   * @param rdd the input RDD to sketch
   * @param sampleSizePerPartition max sample size per partition
   * @return (total number of items, an array of (partitionId, number of items, sample))
   */
  def sketch[K : ClassTag](
      rdd: RDD[K],
      sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {
    val shift = rdd.id
    // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
      val seed = byteswap32(idx ^ (shift << 16))
      val (sample, n) = SamplingUtils.reservoirSampleAndCount(
        iter, sampleSizePerPartition, seed)
      Iterator((idx, n, sample))
    }.collect()
    val numItems = sketched.map(_._2.toLong).sum
    (numItems, sketched)
  }

  /**
   * Determines the bounds for range partitioning from candidates with weights indicating how many
   * items each represents. Usually this is 1 over the probability used to sample this candidate.
   *
   * @param candidates unordered candidates with weights
   * @param partitions number of partitions
   * @return selected bounds
   */
  def determineBounds[K : Ordering : ClassTag](
      candidates: ArrayBuffer[(K, Float)],
      partitions: Int): Array[K] = {
    val ordering = implicitly[Ordering[K]]
    val ordered = candidates.sortBy(_._1)
    val numCandidates = ordered.size
    val sumWeights = ordered.map(_._2.toDouble).sum
    val step = sumWeights / partitions
    var cumWeight = 0.0
    var target = step
    val bounds = ArrayBuffer.empty[K]
    var i = 0
    var j = 0
    var previousBound = Option.empty[K]
    while ((i < numCandidates) && (j < partitions - 1)) {
      val (key, weight) = ordered(i)
      cumWeight += weight
      if (cumWeight > target) {
        // Skip duplicate values.
        if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
          bounds += key
          target += step
          j += 1
          previousBound = Some(key)
        }
      }
      i += 1
    }
    bounds.toArray
  }
}
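The part of this source that decides where one range ends and the next begins is determineBounds. To make it easier to trace by hand, here is a rough Java transcription restricted to int keys. BoundsSketch and Candidate are hypothetical names, and the sampling that produces the weighted candidates in Spark is left out entirely.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical, simplified transcription of RangePartitioner.determineBounds for int keys.
final class BoundsSketch {

    // A sampled key plus the number of records it stands for.
    static final class Candidate {
        final int key;
        final double weight;

        Candidate(int key, double weight) {
            this.key = key;
            this.weight = weight;
        }
    }

    static List<Integer> determineBounds(List<Candidate> candidates, int partitions) {
        List<Candidate> ordered = new ArrayList<>(candidates);
        ordered.sort(Comparator.comparingInt((Candidate c) -> c.key));

        double sumWeights = 0.0;
        for (Candidate c : ordered) {
            sumWeights += c.weight;
        }
        double step = sumWeights / partitions; // weight each partition should receive
        double cumWeight = 0.0;
        double target = step;

        List<Integer> bounds = new ArrayList<>();
        Integer previousBound = null;
        for (int i = 0; i < ordered.size() && bounds.size() < partitions - 1; i++) {
            Candidate c = ordered.get(i);
            cumWeight += c.weight;
            if (cumWeight > target) {
                // Skip duplicate values so that every bound is strictly larger than the last.
                if (previousBound == null || c.key > previousBound) {
                    bounds.add(c.key);
                    target += step;
                    previousBound = c.key;
                }
            }
        }
        return bounds;
    }

    public static void main(String[] args) {
        List<Candidate> candidates = new ArrayList<>();
        for (int key = 1; key <= 6; key++) {
            candidates.add(new Candidate(key, 1.0));
        }
        System.out.println(determineBounds(candidates, 3)); // prints [3, 5]
    }
}
```

With six equally weighted keys and three partitions, each partition should carry a weight of 2. The running weight first exceeds 2 at key 3 and first exceeds 4 at key 5, so those two keys become the upper bounds of the first two partitions.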
This article is reposted from the cnblogs blog 大数据躺过的坑. Original link: http://www.cnblogs.com/zlslch/p/5921572.html. To repost it, please contact the original author.