Spark核心编程与项目案例详解(二)下

简介: 笔记

九、TopN案例实战


TopN案例是将数据排序完之后,按照前几名输出,这种案例在实际应用中也是很常见,例如对一个列表中的数据先进行排序,然后按降序输出前3名

数据模型

34,54,32,12...
map()        -> (34,34)(54,54)(32,32)(12,12)
sortByKey()  -> (54,54)(34,34)(32,32)(12,12)
map()        -> 54,34,32,12
take(2)      -> 54,34
for()

处理流程

对数据先进行map()操作,形成一个一个value对,然后进行sortByKey()操作,将数据按照key排序,然后在通过map()将一个一个value对转换成值,最后通过take()取出前几名并输出打印 

首先将SparkConf分装在一个类中

Java:

package com.kfk.spark.common;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/28
 * @time : 6:18 下午
 */
public class CommSparkContext {
    public static JavaSparkContext getsc(){
        SparkConf sparkConf = new SparkConf().setAppName("CommSparkContext").setMaster("local");
        return new JavaSparkContext(sparkConf);
    }
}

Scala:

package com.kfk.spark.common
import org.apache.spark.{SparkConf, SparkContext}
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/28
 * @time : 6:18 下午
 */
object CommSparkContextScala {
    def getsc():SparkContext ={
        val sparkConf = new SparkConf().setAppName("CommSparkContextScala").setMaster("local")
        return new SparkContext(sparkConf)
    }
}

(1)使用Java语言实现

package com.kfk.spark.core;
import com.kfk.spark.common.CommSparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/28
 * @time : 6:26 下午
 */
public class TopnJava {
    public static void main(String[] args) {
        JavaSparkContext sc = CommSparkContext.getsc();
        List list = Arrays.asList(34,54,32,12,67,25,84,58,39,18,39,81);
        /**
         * 数据模型
         * 34,54,32,12...
         * map()        -> (34,34)(54,54)(32,32)(12,12)
         * sortByKey()  -> (54,54)(34,34)(32,32)(12,12)
         * map()        -> 54,34,32,12
         * take(2)      -> 54,34
         * for()
         */
        JavaRDD rdd = sc.parallelize(list);
        JavaPairRDD<Integer,Integer> beginSortValue = rdd.mapToPair(new PairFunction<Integer,Integer,Integer>() {
            public Tuple2<Integer,Integer> call(Integer value) throws Exception {
                return new Tuple2<Integer, Integer>(value,value);
            }
        });
        /**
         * sortByKey()  -> (54,54)(34,34)(32,32)(12,12)
         */
        JavaPairRDD<Integer,Integer> sortKey = beginSortValue.sortByKey(false);
        /**
         * map()        -> 54,34,32,12
         */
        JavaRDD<Integer> sortMapKey = sortKey.map(new Function<Tuple2<Integer, Integer>, Integer>() {
            public Integer call(Tuple2<Integer, Integer> integerIntegerTuple2) throws Exception {
                return integerIntegerTuple2._2;
            }
        });
        /**
         * take(2)      -> 54,34
         */
        List<Integer> sortKeyList = sortMapKey.take(3);
        for (Integer value : sortKeyList){
            System.out.println(value);
        }
    }
}

运行结果:

84
81
67


(2)使用Scala语言实现

package com.kfk.spark.core
import com.kfk.spark.common.CommSparkContextScala
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/28
 * @time : 6:47 下午
 */
object TopnScala {
    def main(args: Array[String]): Unit = {
        val sc = CommSparkContextScala.getsc()
        val list = Array(34, 54, 32, 12, 67, 25, 84, 58, 39, 18, 39, 81)
        /**
         * 数据模型
         * 34,54,32,12...
         * map()        -> (34,34)(54,54)(32,32)(12,12)
         * sortByKey()  -> (54,54)(34,34)(32,32)(12,12)
         * map()        -> 54,34,32,12
         * take(2)      -> 54,34
         * for()
         */
        val rdd = sc.parallelize(list)
        val beginSortValue = rdd.map(x => (x,x))
        val sortKey = beginSortValue.sortByKey(false)
        val sortMapKey = sortKey.map(x => x._2)
        val sortKeyList = sortMapKey.take(3)
        for (elem <- sortKeyList) {
            System.out.println(elem)
        }
    }
}

运行结果:

84
81
67



十、GroupTopN案例实战


GroupTopN案例也是在实际应用中也是很常见的一种,它和TopN类似,是将数据先进行分组在进行排序。

数据模型

"class2 93"         (class2,93)
"class1 97"   map() ->  (class1,97)   groupByKey() -> <class1,(90,97,98,89,79,34,45,99)>    map() ->  <class1,(99,98,97,90,89,79,45,34)>
                              ...
"class1 89"         (class1,89)
...             ...

使用不同的语言,处理流程也不一样,大概的思想是一样的!

使用Java语言的处理流程

对数据先进行map()操作,形成一个一个value对,然后进行groupByKey()操作,
将数据按照key分组,value为Iterable集合,然后再通过map()将value值进行排序,形成一个新的key-value对。
排序通过top(n)算法,将数据写入到LinkedList列表中,只保留按大小排名的前三名元素。

top(n) 算法设计需求:

对于一组集合中的数据,求出它数据大小排名前n的数

90,97,98,89,79,34,45,99
[]

top(n) 算法设计思路:

首先使用迭代器遍历集合中的每一个数据,定义一个LinkedList双向链表,如果LinkedList里面为空,那么也就是将遍历集合第一个的数据元素添加到链表中,即开始遍历集合第二个数据元素与链表中第一个元素对比,以此类推,这里会出现两种情况,如果集合中的数据元素比链表中的元素大,那么就会将集合中的添加到链表中元素的前一位,但是只保留链表中前n位,否则最后一位将被移除,另一种情况如果遍历集合中的数据元素小于链表中元素,并且链表的大小小于3,则添加在最后。


top(n) 算法演化过程:

top(3)
null -> 90
97   -> 97,90 (add)
98   -> 98,97,90 (add)
89   -> 98,97,90
79   -> 98,97,90
34   -> 98,97,90
45   -> 98,97,90
99   -> 99,98,97 (remove -> 90)
97   -> 97,90 (add)
89   -> 97,90,89

top(n) 算法实现:

Iterator<Integer> iterator = groupValue._2.iterator();
LinkedList<Integer> linkedList = new LinkedList<Integer>();
while (iterator.hasNext()){
    Integer value = (Integer) iterator.next();
    if (linkedList.size() == 0){
        linkedList.add(value);
    } else {
        for (int i = 0; i < linkedList.size(); i++){
            if (value > linkedList.get(i)){
                linkedList.add(i,value);
                if (linkedList.size() > 3){
                    linkedList.removeLast();
                }
                break;
            } else {
                if (linkedList.size() < 3){
                    linkedList.add(value);
                    break;
                }
            }
        }
    }
}

(1)使用Java语言实现

package com.kfk.spark.core;
import com.kfk.spark.common.CommSparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/28
 * @time : 8:38 下午
 */
public class GroupTopnJava {
    public static void main(String[] args) {
        JavaSparkContext sc = CommSparkContext.getsc();
        List<String> list = Arrays.asList("class1 90",
                "class2 93",
                "class1 97",
                "class1 89",
                "class3 99",
                "class1 34",
                "class1 45",
                "class1 99",
                "class3 78",
                "class1 79",
                "class2 85",
                "class2 89",
                "class2 96",
                "class3 92",
                "class1 98",
                "class3 86");
        JavaRDD<String> rdd = sc.parallelize(list);
        /**
         *  class1 90       mapToPair() -> (class1,90)
         *  class2 93
         *  class1 97
         *  class1 89
         *  ...
         */
        JavaPairRDD<String,Integer> beginGroupValue = rdd.mapToPair(new PairFunction<String,String,Integer>() {
            public Tuple2<String, Integer> call(String line) throws Exception {
                String key = line.split(" ")[0];
                Integer value = Integer.parseInt(line.split(" ")[1]);
                return new Tuple2<String, Integer>(key,value);
            }
        });
        /**
         * <class1,(90,97,98,89,79,34,45,99)>
         * ...
         */
        JavaPairRDD<String,Iterable<Integer>> groupValues = beginGroupValue.groupByKey();
        /**
         * <class1,(90,97,98,89,79,34,45,99)>      -> <class1,(99,98,97,90,89,79,45,34)>
         */
        JavaPairRDD<String,Iterable<Integer>> groupTopValues = groupValues.mapToPair(new PairFunction<Tuple2<String, Iterable<Integer>>, String, Iterable<Integer>>() {
            public Tuple2<String, Iterable<Integer>> call(Tuple2<String, Iterable<Integer>> groupValue) throws Exception {
                Iterator<Integer> iterator = groupValue._2.iterator();
                /**
                 * top(n) 算法演化过程
                 * top(3)
                 * null -> 90
                 * 97   -> 97,90 (add)
                 * 98   -> 98,97,90 (add)
                 * 89   -> 98,97,90
                 * 79   -> 98,97,90
                 * 34   -> 98,97,90
                 * 45   -> 98,97,90
                 * 99   -> 99,98,97 (remove -> 90)
                 *
                 * 97   -> 97,90 (add)
                 * 89   -> 97,90,89
                 */
                // 90,97,89,79,34,45,99
                // [90,]
                LinkedList<Integer> linkedList = new LinkedList<Integer>();
                while (iterator.hasNext()){
                    Integer value = (Integer) iterator.next();
                    if (linkedList.size() == 0){
                        linkedList.add(value);
                    } else {
                        for (int i = 0; i < linkedList.size(); i++){
                            if (value > linkedList.get(i)){
                                linkedList.add(i,value);
                                if (linkedList.size() > 3){
                                    linkedList.removeLast();
                                }
                                break;
                            } else {
                                if (linkedList.size() < 3){
                                    linkedList.add(value);
                                    break;
                                }
                            }
                        }
                    }
                }
                return new Tuple2<String, Iterable<Integer>>(groupValue._1,linkedList);
            }
        });
        groupTopValues.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
            public void call(Tuple2<String, Iterable<Integer>> stringIterableTuple2) throws Exception {
                System.out.println(stringIterableTuple2._1);
                Iterator<Integer> iterator = stringIterableTuple2._2.iterator();
                while (iterator.hasNext()){
                    System.out.println(iterator.next());
                }
            }
        });
    }
}

运行结果:

class3
99
86
78
class1
99
98
97
class2
96
93
85

使用Scala语言的处理流程

对数据先进行map()操作,形成一个一个value对,然后进行groupByKey()操作,
将数据按照key分组,value为Iterable集合,然后再通过map()将value值进行排序,形成一个新的key-value对。
排序通过将value值转换成一个list列表,使用sortWith()方法排序,再通过take()只保留按大小排名的前三名元素。


(2)使用Scala语言实现

package com.kfk.spark.core
import com.kfk.spark.common.CommSparkContextScala
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/28
 * @time : 8:38 下午
 */
object GroupTopnScala {
    def main(args: Array[String]): Unit = {
        val sc = CommSparkContextScala.getsc()
        val list = Array("class1 90",
            "class2 93",
            "class1 97",
            "class1 89",
            "class3 99",
            "class1 34",
            "class1 45",
            "class1 99",
            "class3 78",
            "class1 79",
            "class2 85",
            "class2 89",
            "class2 96",
            "class3 92",
            "class1 98",
            "class3 86")
        val rdd = sc.parallelize(list)
        /**
         * class1 90       mapToPair() -> (class1,90)
         * class2 93
         * class1 97
         * class1 89
         * ...
         */
        val beginGroupValue = rdd.map(x => {
            val key = x.split(" ")(0)
            val value = x.split(" ")(1).toInt
            (key,value)
        })
        /**
         * <class1,(90,97,98,89,79,34,45,99)>
         * ...
         */
        val groupValues = beginGroupValue.groupByKey()
        /**
         * <class1,(90,97,98,89,79,34,45,99)>      map() -> <class1,(99,98,97,90,89,79,45,34)>
         */
        val groupTopValues = groupValues.map(x => {
            val values = x._2.toList.sortWith((x,y) => x>y).take(3)
            (x._1,values)
        })
        groupTopValues.foreach(x => {
            System.out.println(x._1)
            for (elem <- x._2) {
                System.out.println(elem)
            }
        })
    }
}

运行结果:

class3
99
92
86
class1
99
98
97
class2
96
93
89
相关文章
|
1月前
|
分布式计算 大数据 Java
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
49 5
|
1月前
|
存储 缓存 分布式计算
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
37 4
|
1月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
50 3
|
1月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
61 0
|
1月前
|
存储 缓存 分布式计算
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
42 4
|
1月前
|
分布式计算 大数据 Spark
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
40 1
|
1月前
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
38 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
1月前
|
设计模式 数据采集 分布式计算
企业spark案例 —出租车轨迹分析
企业spark案例 —出租车轨迹分析
69 0
|
1月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
56 0
|
1月前
|
SQL 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
29 0
下一篇
无影云桌面