九、TopN案例实战
TopN案例是将数据排序完之后,按照前几名输出,这种案例在实际应用中也是很常见,例如对一个列表中的数据先进行排序,然后按降序输出前3名
数据模型
34,54,32,12... map() -> (34,34)(54,54)(32,32)(12,12) sortByKey() -> (54,54)(34,34)(32,32)(12,12) map() -> 54,34,32,12 take(2) -> 54,34 for()
处理流程
对数据先进行map()操作,形成一个一个value对,然后进行sortByKey()操作,将数据按照key排序,然后在通过map()将一个一个value对转换成值,最后通过take()取出前几名并输出打印
首先将SparkConf分装在一个类中
Java:
package com.kfk.spark.common; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/11/28 * @time : 6:18 下午 */ public class CommSparkContext { public static JavaSparkContext getsc(){ SparkConf sparkConf = new SparkConf().setAppName("CommSparkContext").setMaster("local"); return new JavaSparkContext(sparkConf); } }
Scala:
package com.kfk.spark.common import org.apache.spark.{SparkConf, SparkContext} /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/11/28 * @time : 6:18 下午 */ object CommSparkContextScala { def getsc():SparkContext ={ val sparkConf = new SparkConf().setAppName("CommSparkContextScala").setMaster("local") return new SparkContext(sparkConf) } }
(1)使用Java语言实现
package com.kfk.spark.core; import com.kfk.spark.common.CommSparkContext; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; import java.util.Arrays; import java.util.List; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/11/28 * @time : 6:26 下午 */ public class TopnJava { public static void main(String[] args) { JavaSparkContext sc = CommSparkContext.getsc(); List list = Arrays.asList(34,54,32,12,67,25,84,58,39,18,39,81); /** * 数据模型 * 34,54,32,12... * map() -> (34,34)(54,54)(32,32)(12,12) * sortByKey() -> (54,54)(34,34)(32,32)(12,12) * map() -> 54,34,32,12 * take(2) -> 54,34 * for() */ JavaRDD rdd = sc.parallelize(list); JavaPairRDD<Integer,Integer> beginSortValue = rdd.mapToPair(new PairFunction<Integer,Integer,Integer>() { public Tuple2<Integer,Integer> call(Integer value) throws Exception { return new Tuple2<Integer, Integer>(value,value); } }); /** * sortByKey() -> (54,54)(34,34)(32,32)(12,12) */ JavaPairRDD<Integer,Integer> sortKey = beginSortValue.sortByKey(false); /** * map() -> 54,34,32,12 */ JavaRDD<Integer> sortMapKey = sortKey.map(new Function<Tuple2<Integer, Integer>, Integer>() { public Integer call(Tuple2<Integer, Integer> integerIntegerTuple2) throws Exception { return integerIntegerTuple2._2; } }); /** * take(2) -> 54,34 */ List<Integer> sortKeyList = sortMapKey.take(3); for (Integer value : sortKeyList){ System.out.println(value); } } }
运行结果:
84 81 67
(2)使用Scala语言实现
package com.kfk.spark.core import com.kfk.spark.common.CommSparkContextScala /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/11/28 * @time : 6:47 下午 */ object TopnScala { def main(args: Array[String]): Unit = { val sc = CommSparkContextScala.getsc() val list = Array(34, 54, 32, 12, 67, 25, 84, 58, 39, 18, 39, 81) /** * 数据模型 * 34,54,32,12... * map() -> (34,34)(54,54)(32,32)(12,12) * sortByKey() -> (54,54)(34,34)(32,32)(12,12) * map() -> 54,34,32,12 * take(2) -> 54,34 * for() */ val rdd = sc.parallelize(list) val beginSortValue = rdd.map(x => (x,x)) val sortKey = beginSortValue.sortByKey(false) val sortMapKey = sortKey.map(x => x._2) val sortKeyList = sortMapKey.take(3) for (elem <- sortKeyList) { System.out.println(elem) } } }
运行结果:
84 81 67
十、GroupTopN案例实战
GroupTopN案例也是在实际应用中也是很常见的一种,它和TopN类似,是将数据先进行分组在进行排序。
数据模型
"class2 93" (class2,93) "class1 97" map() -> (class1,97) groupByKey() -> <class1,(90,97,98,89,79,34,45,99)> map() -> <class1,(99,98,97,90,89,79,45,34)> ... "class1 89" (class1,89) ... ...
使用不同的语言,处理流程也不一样,大概的思想是一样的!
使用Java语言的处理流程
对数据先进行map()操作,形成一个一个value对,然后进行groupByKey()操作, 将数据按照key分组,value为Iterable集合,然后再通过map()将value值进行排序,形成一个新的key-value对。 排序通过top(n)算法,将数据写入到LinkedList列表中,只保留按大小排名的前三名元素。
top(n) 算法设计需求:
对于一组集合中的数据,求出它数据大小排名前n的数
90,97,98,89,79,34,45,99 []
top(n) 算法设计思路:
首先使用迭代器遍历集合中的每一个数据,定义一个LinkedList双向链表,如果LinkedList里面为空,那么也就是将遍历集合第一个的数据元素添加到链表中,即开始遍历集合第二个数据元素与链表中第一个元素对比,以此类推,这里会出现两种情况,如果集合中的数据元素比链表中的元素大,那么就会将集合中的添加到链表中元素的前一位,但是只保留链表中前n位,否则最后一位将被移除,另一种情况如果遍历集合中的数据元素小于链表中元素,并且链表的大小小于3,则添加在最后。
top(n) 算法演化过程:
top(3) null -> 90 97 -> 97,90 (add) 98 -> 98,97,90 (add) 89 -> 98,97,90 79 -> 98,97,90 34 -> 98,97,90 45 -> 98,97,90 99 -> 99,98,97 (remove -> 90) 97 -> 97,90 (add) 89 -> 97,90,89
top(n) 算法实现:
Iterator<Integer> iterator = groupValue._2.iterator(); LinkedList<Integer> linkedList = new LinkedList<Integer>(); while (iterator.hasNext()){ Integer value = (Integer) iterator.next(); if (linkedList.size() == 0){ linkedList.add(value); } else { for (int i = 0; i < linkedList.size(); i++){ if (value > linkedList.get(i)){ linkedList.add(i,value); if (linkedList.size() > 3){ linkedList.removeLast(); } break; } else { if (linkedList.size() < 3){ linkedList.add(value); break; } } } } }
(1)使用Java语言实现
package com.kfk.spark.core; import com.kfk.spark.common.CommSparkContext; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; import java.util.Arrays; import java.util.Iterator; import java.util.LinkedList; import java.util.List; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/11/28 * @time : 8:38 下午 */ public class GroupTopnJava { public static void main(String[] args) { JavaSparkContext sc = CommSparkContext.getsc(); List<String> list = Arrays.asList("class1 90", "class2 93", "class1 97", "class1 89", "class3 99", "class1 34", "class1 45", "class1 99", "class3 78", "class1 79", "class2 85", "class2 89", "class2 96", "class3 92", "class1 98", "class3 86"); JavaRDD<String> rdd = sc.parallelize(list); /** * class1 90 mapToPair() -> (class1,90) * class2 93 * class1 97 * class1 89 * ... */ JavaPairRDD<String,Integer> beginGroupValue = rdd.mapToPair(new PairFunction<String,String,Integer>() { public Tuple2<String, Integer> call(String line) throws Exception { String key = line.split(" ")[0]; Integer value = Integer.parseInt(line.split(" ")[1]); return new Tuple2<String, Integer>(key,value); } }); /** * <class1,(90,97,98,89,79,34,45,99)> * ... */ JavaPairRDD<String,Iterable<Integer>> groupValues = beginGroupValue.groupByKey(); /** * <class1,(90,97,98,89,79,34,45,99)> -> <class1,(99,98,97,90,89,79,45,34)> */ JavaPairRDD<String,Iterable<Integer>> groupTopValues = groupValues.mapToPair(new PairFunction<Tuple2<String, Iterable<Integer>>, String, Iterable<Integer>>() { public Tuple2<String, Iterable<Integer>> call(Tuple2<String, Iterable<Integer>> groupValue) throws Exception { Iterator<Integer> iterator = groupValue._2.iterator(); /** * top(n) 算法演化过程 * top(3) * null -> 90 * 97 -> 97,90 (add) * 98 -> 98,97,90 (add) * 89 -> 98,97,90 * 79 -> 98,97,90 * 34 -> 98,97,90 * 45 -> 98,97,90 * 99 -> 99,98,97 (remove -> 90) * * 97 -> 97,90 (add) * 89 -> 97,90,89 */ // 90,97,89,79,34,45,99 // [90,] LinkedList<Integer> linkedList = new LinkedList<Integer>(); while (iterator.hasNext()){ Integer value = (Integer) iterator.next(); if (linkedList.size() == 0){ linkedList.add(value); } else { for (int i = 0; i < linkedList.size(); i++){ if (value > linkedList.get(i)){ linkedList.add(i,value); if (linkedList.size() > 3){ linkedList.removeLast(); } break; } else { if (linkedList.size() < 3){ linkedList.add(value); break; } } } } } return new Tuple2<String, Iterable<Integer>>(groupValue._1,linkedList); } }); groupTopValues.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() { public void call(Tuple2<String, Iterable<Integer>> stringIterableTuple2) throws Exception { System.out.println(stringIterableTuple2._1); Iterator<Integer> iterator = stringIterableTuple2._2.iterator(); while (iterator.hasNext()){ System.out.println(iterator.next()); } } }); } }
运行结果:
class3 99 86 78 class1 99 98 97 class2 96 93 85
使用Scala语言的处理流程
对数据先进行map()操作,形成一个一个value对,然后进行groupByKey()操作, 将数据按照key分组,value为Iterable集合,然后再通过map()将value值进行排序,形成一个新的key-value对。 排序通过将value值转换成一个list列表,使用sortWith()方法排序,再通过take()只保留按大小排名的前三名元素。
(2)使用Scala语言实现
package com.kfk.spark.core import com.kfk.spark.common.CommSparkContextScala /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/11/28 * @time : 8:38 下午 */ object GroupTopnScala { def main(args: Array[String]): Unit = { val sc = CommSparkContextScala.getsc() val list = Array("class1 90", "class2 93", "class1 97", "class1 89", "class3 99", "class1 34", "class1 45", "class1 99", "class3 78", "class1 79", "class2 85", "class2 89", "class2 96", "class3 92", "class1 98", "class3 86") val rdd = sc.parallelize(list) /** * class1 90 mapToPair() -> (class1,90) * class2 93 * class1 97 * class1 89 * ... */ val beginGroupValue = rdd.map(x => { val key = x.split(" ")(0) val value = x.split(" ")(1).toInt (key,value) }) /** * <class1,(90,97,98,89,79,34,45,99)> * ... */ val groupValues = beginGroupValue.groupByKey() /** * <class1,(90,97,98,89,79,34,45,99)> map() -> <class1,(99,98,97,90,89,79,45,34)> */ val groupTopValues = groupValues.map(x => { val values = x._2.toList.sortWith((x,y) => x>y).take(3) (x._1,values) }) groupTopValues.foreach(x => { System.out.println(x._1) for (elem <- x._2) { System.out.println(elem) } }) } }
运行结果:
class3 99 92 86 class1 99 98 97 class2 96 93 89