Spark核心编程与项目案例详解(二)上

简介: 笔记

六、共享变量详解


Spark一个非常重要的特性就是共享变量。


默认情况下,如果在一个算子的函数中使用到了某个外部的变量,那么这个变量的值会被拷贝到每个task中。此时每个task只 能操作自己的那份变量副本。如果多个task想要共享某个变量,那么这种方式是做不到的。


Spark为此提供了两种共享变量,一种是Broadcast Variable (广播变量),另一种是Accumulator (累加变量)。Broadcast Variable会将使用到的变量,仅仅为每个节点拷贝一份,更大的用处是优化性能,减少网络传输以及内存消耗。Accumulator则可以让多个task共同操作一份变量,主要可以进行累加操作。


不使用共享变量:

1.png

默认情况下,算子的函数内使用到的外部变量时,会拷贝到执行这个函数的每一个task中。当变量比较大的时候,网络传输的量就会大,并且在每个节点上占用较多的内存空间。


使用共享变量:

2.png


如果把算子使用的变量设置成共享变量的话,那么变量只会拷贝一份到每一个worker节点上,节点上所有的task都会共享这个一份变量


(1)Broadcast Variable工作原理详解

Spark提供的Broadcast Variable,是只读的。并且在每个节点上只会有一份副本,而不会为每个task都拷贝一份副本。因此其最大作用,就是减少变量到各个节点的网络传输消耗,以及在各个节点上的内存消耗。此外,spark自己内部也使用了高效的广播算法来减少网络消耗。


可以通过调用SparkContext的broadcast()方法,来针对某个变量创建广播变量。然后在算子的函数内,使用到广播变量时,每个节点只会拷贝一份副本了。每个节点可以使用广播变量的value()方法获取值。记住,广播变量,是只读的。

val factor = 3
val factorBroadcast = sc.broadcast(factor)
val arr = Array(1, 2, 3, 4, 5)
val rdd = sc.parallelize(arr)
val values =rdd.map(x => (x * broast.value));
values.foreach(x => System.out.println(x));


(2)Accumulator工作原理详解

Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能。但是确给我们提供了多个task对一个变量并行操作的功能。但是task只能对Accumulator进行累加操作,不能读取它的值。只有Driver程序可以读取Accumulator的值。

val arr = Array(1, 2, 3, 4, 5)
val rdd = sc.parallelize(arr)
val accu = sc.longAccumulator;
rdd.foreach(x => accu.add(x));
System.out.println(accu)


(3)通过代码验证共享变量

使用Java实现

package com.kfk.spark.core;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;
import java.util.Arrays;
import java.util.List;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/27
 * @time : 7:52 下午
 */
public class VarJava {
    public static JavaSparkContext getsc(){
        SparkConf sparkConf = new SparkConf().setAppName("VarJava").setMaster("local");
        return new JavaSparkContext(sparkConf);
    }
    public static void main(String[] args) {
        List list = Arrays.asList(1,2,3,4,5);
        JavaSparkContext sc = getsc();
        // 广播变量
        final Broadcast broadcast = sc.broadcast(10);
        JavaRDD<Integer> javaRdd = sc.parallelize(list);
        JavaRDD mapValues = javaRdd.map(new Function<Integer, Integer>() {
            public Integer call(Integer value) throws Exception {
                return  value * (Integer) broadcast.getValue();
            }
        });
        mapValues.foreach(new VoidFunction() {
            public void call(Object o) throws Exception {
                System.out.println(o);
            }
        });
        // 累加变量
        final Accumulator accumulator = sc.accumulator(1);
        javaRdd.foreach(new VoidFunction<Integer>() {
            public void call(Integer integer) throws Exception {
                accumulator.add(integer);
            }
        });
        System.out.println(accumulator.value());
    }
}

运行结果:

广播:
10
20
30
40
50
累加:
16

使用Scala语言实现

package com.kfk.spark.core
import org.apache.spark.{SparkConf, SparkContext}
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/27
 * @time : 9:00 下午
 */
object VarScala {
    def getsc():SparkContext ={
        val sparkConf = new SparkConf().setAppName("VarScala").setMaster("local")
        return new SparkContext(sparkConf)
    }
    def main(args: Array[String]): Unit = {
        val list = Array(1,2,3,4,5)
        val sc = getsc()
        // 广播变量
        val broadcast = sc.broadcast(10)
        val rdd = sc.parallelize(list)
        val values = rdd.map(x => x * broadcast.value)
        values.foreach(x => System.out.println(x))
        // 累加变量
        val accumulator = sc.longAccumulator
        val mapValue = rdd.foreach(x => {
            accumulator.add(x)
        })
        System.out.println(accumulator.value)
    }
}


七、Spark之WordCount高级编程


针对前面的WordCount案例,我将程序更加完善,增加了按照value值的大小进行排序。

(1)使用Java语言实现

package com.kfk.spark.core;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/22
 * @time : 10:19 下午
 */
public class SortWordCountJava {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("wordCountApp").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        JavaRDD<String> lines = sc.textFile("hdfs://bigdata-pro-m04:9000/user/caizhengjie/datas/wordcount.txt");
        /**
         * java python hive     flatMap() -> java python hive hive java...
         * hive java ...
         */
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String,String>() {
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        /**
         * java python hive hive java...    mapToPair() -> (java,1)(hive,1)(java,1)(python,1)...
         */
        JavaPairRDD<String,Integer> word = words.mapToPair(new PairFunction<String,String,Integer>() {
            public Tuple2<String,Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word,1);
            }
        });
        /**
         * (java,1)(hive,1)(java,1)(python,1)...    reduceByKey() -> (java,2)(hive,1)(python,1)...
         */
        JavaPairRDD<String, Integer> wordcount = word.reduceByKey(new Function2<Integer,Integer,Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1+ v2;
            }
        });
        /**
         * (spark,1)(hive,3)(hadoop,3)...  mapToPair() -> (3,hadoop)(3,hive)...
         */
        JavaPairRDD<Integer,String> wordcountSortValue =  wordcount.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return new Tuple2<Integer, String>(stringIntegerTuple2._2,stringIntegerTuple2._1);
            }
        });
        /**
         * (3,hadoop)(3,hive)...    sortByKey(false) -> (3,hadoop)(3,hive)(2,java)(1,python)...
         */
        JavaPairRDD<Integer,String> sort = wordcountSortValue.sortByKey(false);
        /**
         * (3,hadoop)(3,hive)(2,java)(1,python)...      mapToPair() -> (hadoop,3)(hive,3)(java,2)(python,1)...
         */
        JavaPairRDD<String,Integer> wordcountSortValues = sort.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            public Tuple2<String, Integer> call(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                return new Tuple2<String, Integer>(integerStringTuple2._2,integerStringTuple2._1);
            }
        });
        /**
         * foreach()
         */
        wordcountSortValues.foreach(new VoidFunction<Tuple2<String,Integer>>() {
            public void call(Tuple2<String,Integer> o) throws Exception {
                System.out.println(o._1 + " : " + o._2);
            }
        });
    }
}

运行结果:

java : 5
hive : 3
hadoop : 3
flink : 2
spark : 1
python : 1
storm : 1
hbase : 1


(2)使用Scala语言实现

package com.kfk.spark.core
import org.apache.spark.{SparkConf, SparkContext}
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/22
 * @time : 10:48 下午
 */
object SortWordCountScala {
    def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("wordCountApp").setMaster("local")
        val sc = new SparkContext(sparkConf)
        val lines = sc.textFile("hdfs://bigdata-pro-m04:9000/user/caizhengjie/datas/wordcount.txt")
        // val wordcount = lines.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey((x,y) => (x+y)).foreach((_wordcount => println(_wordcount._1 + " : " + _wordcount._2))
        /**
         * java python hive     flatMap() -> java python hive hive java...
         * hive java ...
         */
        val words = lines.flatMap(line => line.split(" "))
        /**
         * java python hive hive java...    map() -> (java,1)(hive,1)(java,1)(python,1)...
         */
        val word = words.map(word => (word, 1))
        /**
         * (java,1)(hive,1)(java,1)(python,1)...    reduceByKey() -> (java,2)(hive,1)(python,1)...
         */
        val wordcount = word.reduceByKey((x, y) => x + y)
        /**
         * (spark,1)(hive,3)(hadoop,3)...  map() -> (3,hadoop)(3,hive)...
         */
        val wordcountSortValue = wordcount.map(x => (x._2,x._1))
        /**
         * (3,hadoop)(3,hive)...    sortByKey(false) -> (3,hadoop)(3,hive)(2,java)(1,python)...
         */
        val sort = wordcountSortValue.sortByKey(false)
        /**
         * (3,hadoop)(3,hive)(2,java)(1,python)...      map() -> (hadoop,3)(hive,3)(java,2)(python,1)...
         */
        val wordcountSortValues = sort.map(x => (x._2,x._1))
        /**
         * foreach()
         */
        wordcountSortValues.foreach(_wordcount => println(_wordcount._1 + " : " + _wordcount._2))
    }
}

运行结果:

java : 5
hive : 3
hadoop : 3
flink : 2
spark : 1
python : 1
storm : 1
hbase : 1


八、二次排序案例实战


二次排序定义:

  • 按照文件中的第一列排序
  • 如果第一列相同,则按照第二列排序

需求分析:

比如我们给定这样的数据模型,我们需要先按照班级排序,在按照班级里的分数排序

"class1 90"     class3 99
"class2 93“     class2 93
"class1 97"   ->  class1 97
"class1 89"     class1 90
"class3 99"     class1 89
...

实现思路:

1.自定义一个键值对的比较类来实现比较,要实现Ordered接口和Serializable接口,在key中实现自己对多个列的排序算法。
2.将包含文本的RDD映射成key为自定义SecondSortKey,value为原始数据的JavaPariRDD
3.使用sortByKey算子按照自定义的key进行排序
4.将排序过的value值打印输出

数据模型转换流程:

rdd(lines)           JavaPairRDD                            JavaPairRDD
"class1 90"        (SecondSortKey(class1,90),"class1 90")               (SecondSortKey(class3,99),"class3 99")             class3 99
"class2 93“        (SecondSortKey(class2,93),"class2 93")             (SecondSortKey(class2,93),"class2 93")             class2 93
"class1 97"   map() -> (SecondSortKey(class1,97),"class1 97")   sortByKey(false) -> (SecondSortKey(class1,97),"class1 97")    foreach(x,_2) -> class1 97
"class1 89"        (SecondSortKey(class1,89),"class1 89")             (SecondSortKey(class1,90),"class1 90")             class1 90
"class3 99"        (SecondSortKey(class3,99),"class3 99")             (SecondSortKey(class1,89),"class1 89")             class1 89

(1)使用Java语言实现

自定义SecondSortKey类

package com.kfk.spark.core;
import scala.math.Ordered;
import java.io.Serializable;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/28
 * @time : 11:36 上午
 */
public class SecondSortKey implements Ordered<SecondSortKey>, Serializable {
    private String first;
    private int second;
    public String getFirst() {
        return first;
    }
    public void setFirst(String first) {
        this.first = first;
    }
    public int getSecond() {
        return second;
    }
    public void setSecond(int second) {
        this.second = second;
    }
    public SecondSortKey(String first, int second) {
        this.first = first;
        this.second = second;
    }
    public int compare(SecondSortKey that) {
        int comp = this.getFirst().compareTo(that.getFirst());
        if (comp == 0){
            return Integer.valueOf(this.getSecond()).compareTo(that.getSecond());
        }
        return comp;
    }
    public boolean $less(SecondSortKey that) {
        return false;
    }
    public boolean $greater(SecondSortKey that) {
        return false;
    }
    public boolean $less$eq(SecondSortKey that) {
        return false;
    }
    public boolean $greater$eq(SecondSortKey that) {
        return false;
    }
    public int compareTo(SecondSortKey that) {
        int comp = this.getFirst().compareTo(that.getFirst());
        if (comp == 0){
            return Integer.valueOf(this.getSecond()).compareTo(that.getSecond());
        }
        return comp;
    }
}

实现代码:

package com.kfk.spark.core;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/28
 * @time : 11:28 上午
 */
public class SecondSortJava {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("SecondSortJava").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        /**
         * class1 90
         * class2 93
         * class1 97        ->  class1 (89,90,97...) class2(89,93...) class3(86,99...)
         * class1 89
         * class3 99
         * ...
         */
        List list = Arrays.asList("class1 90","class2 93","class1 97",
                "class1 89","class3 99","class3 78",
                "class1 79","class2 85","class2 89",
                "class2 96","class3 92","class3 86");
        JavaRDD rdd = sc.parallelize(list);
        JavaPairRDD<SecondSortKey,String> beginsortValues = rdd.mapToPair(new PairFunction<String, SecondSortKey, String>() {
            public Tuple2<SecondSortKey, String> call(String line) throws Exception {
                String first = line.split(" ")[0];
                int second = Integer.parseInt(line.split(" ")[1]);
                SecondSortKey secondSortKey = new SecondSortKey(first,second);
                return new Tuple2<SecondSortKey,String>(secondSortKey,line);
            }
        });
        JavaPairRDD<SecondSortKey,String> sortValues = beginsortValues.sortByKey(false);
        sortValues.foreach(new VoidFunction<Tuple2<SecondSortKey, String>>() {
            public void call(Tuple2<SecondSortKey, String> secondSortKeyStringTuple2) throws Exception {
                System.out.println(secondSortKeyStringTuple2._2);
            }
        });
    }
}

运行结果:

class3 99
class3 92
class3 86
class3 78
class2 96
class2 93
class2 89
class2 85
class1 97
class1 90
class1 89
class1 79

(2)使用Scala语言实现

自定义SecondSortKey类

package com.kfk.spark.core
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/28
 * @time : 1:27 下午
 */
class SecondSortKeyScala(val first : String,val second : Int) extends Ordered[SecondSortKeyScala] with Serializable {
    override def compare(that: SecondSortKeyScala): Int = {
        val comp = this.first.compareTo(that.first)
        if (comp == 0){
            return this.second.compareTo(that.second)
        }
        return comp;
    }
}

实现代码:

package com.kfk.spark.core
import org.apache.spark.{SparkConf, SparkContext}
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/28
 * @time : 1:21 下午
 */
object SecondSortScala {
    def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("wordCountApp").setMaster("local")
        val sc = new SparkContext(sparkConf)
        /**
         * class1 90
         * class2 93
         * class1 97        ->  class1 (89,90,97...) class2(89,93...) class3(86,99...)
         * class1 89
         * class3 99
         * ...
         */
        val list = Array("class1 90", "class2 93", "class1 97",
            "class1 89", "class3 99", "class3 78",
            "class1 79", "class2 85", "class2 89",
            "class2 96", "class3 92", "class3 86")
        val rdd = sc.parallelize(list)
        val beginSortValues = rdd.map(x => (new SecondSortKeyScala(x.split(" ")(0),x.split(" ")(1).toInt),x))
        val sortValues = beginSortValues.sortByKey(false)
        sortValues.foreach(x => System.out.println(x._2))
    }
}

运行结果:

class3 99
class3 92
class3 86
class3 78
class2 96
class2 93
class2 89
class2 85
class1 97
class1 90
class1 89
class1 79


相关文章
|
2月前
|
分布式计算 大数据 Java
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
66 5
|
2月前
|
存储 缓存 分布式计算
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
44 4
|
2月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
54 3
|
2月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
72 0
|
2月前
|
存储 缓存 分布式计算
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
47 4
|
2月前
|
分布式计算 大数据 Spark
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
44 1
|
2月前
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
42 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
2月前
|
设计模式 数据采集 分布式计算
企业spark案例 —出租车轨迹分析
企业spark案例 —出租车轨迹分析
102 0
|
2月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
59 0
|
2月前
|
SQL 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
49 0
下一篇
DataWorks