6. Shared Variables in Detail
Shared variables are a very important feature of Spark.
By default, when an external variable is used inside an operator's function, the value of that variable is copied into every task, and each task can only operate on its own copy. If multiple tasks need to share one variable, this mechanism cannot do it.
For this purpose Spark provides two kinds of shared variables: the Broadcast Variable and the Accumulator. A Broadcast Variable copies the variable only once per node; its main use is performance optimization, reducing network transfer and memory consumption. An Accumulator lets multiple tasks operate on the same variable, mainly by accumulating values into it.
Without a shared variable:
By default, any external variable used inside an operator's function is copied to every task that executes the function. When the variable is large, this means a lot of network traffic and a large memory footprint on each node.
With a shared variable:
If the variable used by the operator is turned into a shared variable, it is copied only once to each worker node, and all tasks on that node share that single copy.
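As a concrete illustration, here is a minimal Scala sketch of the difference (the lookup table, its contents, and the variable names are hypothetical, and sc is assumed to be an existing SparkContext):

// Hypothetical lookup table used inside map(); imagine it is large
val lookup = Map("a" -> 1, "b" -> 2, "c" -> 3)
val rdd = sc.parallelize(Array("a", "b", "c"))

// Without a shared variable: `lookup` is serialized into every task's closure
val withoutBroadcast = rdd.map(x => lookup.getOrElse(x, 0))

// With a broadcast variable: `lookup` is shipped to each executor only once
val lookupBc = sc.broadcast(lookup)
val withBroadcast = rdd.map(x => lookupBc.value.getOrElse(x, 0))

Both RDDs compute the same result; the difference is only how many copies of lookup travel over the network and sit in memory.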
(1) How Broadcast Variables work
Spark's Broadcast Variables are read-only, and each node holds only one copy instead of one copy per task. Their biggest benefit is therefore reducing the network cost of shipping the variable to the nodes, as well as the memory consumed on each node. In addition, Spark internally uses an efficient broadcast algorithm to further reduce network cost.
A broadcast variable is created by calling SparkContext's broadcast() method on a variable. When the broadcast variable is then used inside an operator's function, each node copies it only once. Each node reads the value through the broadcast variable's value() method. Remember: broadcast variables are read-only.
val factor = 3
val factorBroadcast = sc.broadcast(factor)

val arr = Array(1, 2, 3, 4, 5)
val rdd = sc.parallelize(arr)
val values = rdd.map(x => x * factorBroadcast.value)
values.foreach(x => System.out.println(x))
(2) How Accumulators work
Spark's Accumulator is mainly used when multiple nodes need to cooperatively update one shared variable. An Accumulator only supports accumulation, but it does give multiple tasks the ability to operate on one variable in parallel. Tasks can only add to an Accumulator; they cannot read its value. Only the driver program can read the Accumulator's value.
val arr = Array(1, 2, 3, 4, 5)
val rdd = sc.parallelize(arr)

val accu = sc.longAccumulator
rdd.foreach(x => accu.add(x))
System.out.println(accu.value)
(3) Verifying shared variables with code
Java implementation
package com.kfk.spark.core;

import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;

import java.util.Arrays;
import java.util.List;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/11/27
 * @time : 7:52 PM
 */
public class VarJava {

    public static JavaSparkContext getsc() {
        SparkConf sparkConf = new SparkConf().setAppName("VarJava").setMaster("local");
        return new JavaSparkContext(sparkConf);
    }

    public static void main(String[] args) {

        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
        JavaSparkContext sc = getsc();

        // Broadcast variable
        final Broadcast<Integer> broadcast = sc.broadcast(10);

        JavaRDD<Integer> javaRdd = sc.parallelize(list);
        JavaRDD<Integer> mapValues = javaRdd.map(new Function<Integer, Integer>() {
            public Integer call(Integer value) throws Exception {
                return value * broadcast.getValue();
            }
        });

        mapValues.foreach(new VoidFunction<Integer>() {
            public void call(Integer o) throws Exception {
                System.out.println(o);
            }
        });

        // Accumulator (note: initialized to 1)
        final Accumulator<Integer> accumulator = sc.accumulator(1);

        javaRdd.foreach(new VoidFunction<Integer>() {
            public void call(Integer integer) throws Exception {
                accumulator.add(integer);
            }
        });

        System.out.println(accumulator.value());
    }
}
Output:
Broadcast: 10 20 30 40 50
Accumulator: 16 (the accumulator is initialized to 1, so the total is 1 + 1 + 2 + 3 + 4 + 5 = 16)
Scala implementation
package com.kfk.spark.core

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/11/27
 * @time : 9:00 PM
 */
object VarScala {

    def getsc(): SparkContext = {
        val sparkConf = new SparkConf().setAppName("VarScala").setMaster("local")
        new SparkContext(sparkConf)
    }

    def main(args: Array[String]): Unit = {

        val list = Array(1, 2, 3, 4, 5)
        val sc = getsc()

        // Broadcast variable
        val broadcast = sc.broadcast(10)

        val rdd = sc.parallelize(list)
        val values = rdd.map(x => x * broadcast.value)
        values.foreach(x => System.out.println(x))

        // Accumulator
        val accumulator = sc.longAccumulator
        rdd.foreach(x => accumulator.add(x))

        System.out.println(accumulator.value)
    }
}
7. Advanced WordCount Programming in Spark
Building on the earlier WordCount example, I have extended the program to also sort the results by value (the word count).
(1) Java implementation
package com.kfk.spark.core;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/11/22
 * @time : 10:19 PM
 */
public class SortWordCountJava {

    public static void main(String[] args) {

        SparkConf sparkConf = new SparkConf().setAppName("wordCountApp").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        JavaRDD<String> lines = sc.textFile("hdfs://bigdata-pro-m04:9000/user/caizhengjie/datas/wordcount.txt");

        /**
         * java python hive      flatMap() -> java python hive hive java...
         * hive java ...
         */
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });

        /**
         * java python hive hive java...   mapToPair() -> (java,1)(hive,1)(java,1)(python,1)...
         */
        JavaPairRDD<String, Integer> word = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        /**
         * (java,1)(hive,1)(java,1)(python,1)...   reduceByKey() -> (java,2)(hive,1)(python,1)...
         */
        JavaPairRDD<String, Integer> wordcount = word.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        /**
         * (spark,1)(hive,3)(hadoop,3)...   mapToPair() -> (3,hadoop)(3,hive)...
         */
        JavaPairRDD<Integer, String> wordcountSortValue = wordcount.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return new Tuple2<Integer, String>(stringIntegerTuple2._2, stringIntegerTuple2._1);
            }
        });

        /**
         * (3,hadoop)(3,hive)...   sortByKey(false) -> (3,hadoop)(3,hive)(2,java)(1,python)...
         */
        JavaPairRDD<Integer, String> sort = wordcountSortValue.sortByKey(false);

        /**
         * (3,hadoop)(3,hive)(2,java)(1,python)...   mapToPair() -> (hadoop,3)(hive,3)(java,2)(python,1)...
         */
        JavaPairRDD<String, Integer> wordcountSortValues = sort.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            public Tuple2<String, Integer> call(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                return new Tuple2<String, Integer>(integerStringTuple2._2, integerStringTuple2._1);
            }
        });

        /**
         * foreach()
         */
        wordcountSortValues.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> o) throws Exception {
                System.out.println(o._1 + " : " + o._2);
            }
        });
    }
}
Output:
java : 5
hive : 3
hadoop : 3
flink : 2
spark : 1
python : 1
storm : 1
hbase : 1
(2) Scala implementation
package com.kfk.spark.core

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/11/22
 * @time : 10:48 PM
 */
object SortWordCountScala {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setAppName("wordCountApp").setMaster("local")
        val sc = new SparkContext(sparkConf)

        val lines = sc.textFile("hdfs://bigdata-pro-m04:9000/user/caizhengjie/datas/wordcount.txt")

        // One-line version of the basic word count:
        // lines.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((x, y) => x + y)
        //      .foreach(_wordcount => println(_wordcount._1 + " : " + _wordcount._2))

        /**
         * java python hive      flatMap() -> java python hive hive java...
         * hive java ...
         */
        val words = lines.flatMap(line => line.split(" "))

        /**
         * java python hive hive java...   map() -> (java,1)(hive,1)(java,1)(python,1)...
         */
        val word = words.map(word => (word, 1))

        /**
         * (java,1)(hive,1)(java,1)(python,1)...   reduceByKey() -> (java,2)(hive,1)(python,1)...
         */
        val wordcount = word.reduceByKey((x, y) => x + y)

        /**
         * (spark,1)(hive,3)(hadoop,3)...   map() -> (3,hadoop)(3,hive)...
         */
        val wordcountSortValue = wordcount.map(x => (x._2, x._1))

        /**
         * (3,hadoop)(3,hive)...   sortByKey(false) -> (3,hadoop)(3,hive)(2,java)(1,python)...
         */
        val sort = wordcountSortValue.sortByKey(false)

        /**
         * (3,hadoop)(3,hive)(2,java)(1,python)...   map() -> (hadoop,3)(hive,3)(java,2)(python,1)...
         */
        val wordcountSortValues = sort.map(x => (x._2, x._1))

        /**
         * foreach()
         */
        wordcountSortValues.foreach(_wordcount => println(_wordcount._1 + " : " + _wordcount._2))
    }
}
Output:
java : 5
hive : 3
hadoop : 3
flink : 2
spark : 1
python : 1
storm : 1
hbase : 1
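The implementations above sort by swapping (word, count) into (count, word), calling sortByKey, and swapping back. As a side note, RDD.sortBy can sort by the count directly; a minimal Scala sketch, assuming wordcount is the (word, count) pair RDD built in the code above:

// sortBy orders the pairs by their second element (the count), in descending order
val sorted = wordcount.sortBy(x => x._2, ascending = false)
sorted.foreach(x => println(x._1 + " : " + x._2))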
8. Secondary Sort in Practice
Definition of secondary sort:
- Sort by the first column of the file
- When the first column is the same, sort by the second column
Requirement analysis:
For example, given the data model below, we need to sort by class first and then by score within each class:
"class1 90" class3 99 "class2 93“ class2 93 "class1 97" -> class1 97 "class1 89" class1 90 "class3 99" class1 89 ...
Implementation approach:
1. Define a custom key class that implements the Ordered and Serializable interfaces, and put the multi-column comparison logic inside this key.
2. Map the RDD of text lines into a JavaPairRDD whose key is the custom SecondSortKey and whose value is the original line.
3. Sort by the custom key with the sortByKey operator.
4. Print the values of the sorted pairs.
Data model transformation flow:
rdd (lines):
    "class1 90", "class2 93", "class1 97", "class1 89", "class3 99"

map() -> JavaPairRDD<SecondSortKey, String>:
    (SecondSortKey(class1,90), "class1 90")
    (SecondSortKey(class2,93), "class2 93")
    (SecondSortKey(class1,97), "class1 97")
    (SecondSortKey(class1,89), "class1 89")
    (SecondSortKey(class3,99), "class3 99")

sortByKey(false) -> JavaPairRDD<SecondSortKey, String>:
    (SecondSortKey(class3,99), "class3 99")
    (SecondSortKey(class2,93), "class2 93")
    (SecondSortKey(class1,97), "class1 97")
    (SecondSortKey(class1,90), "class1 90")
    (SecondSortKey(class1,89), "class1 89")

foreach(x => x._2) -> output:
    class3 99
    class2 93
    class1 97
    class1 90
    class1 89
(1) Java implementation
Custom SecondSortKey class
package com.kfk.spark.core;

import scala.math.Ordered;

import java.io.Serializable;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/11/28
 * @time : 11:36 AM
 */
public class SecondSortKey implements Ordered<SecondSortKey>, Serializable {

    private String first;
    private int second;

    public String getFirst() {
        return first;
    }

    public void setFirst(String first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    public SecondSortKey(String first, int second) {
        this.first = first;
        this.second = second;
    }

    // Compare by the first column, then by the second column when the first is equal
    public int compare(SecondSortKey that) {
        int comp = this.getFirst().compareTo(that.getFirst());
        if (comp == 0) {
            return Integer.valueOf(this.getSecond()).compareTo(that.getSecond());
        }
        return comp;
    }

    // The relational operators required by Ordered all delegate to compare()
    public boolean $less(SecondSortKey that) {
        return this.compare(that) < 0;
    }

    public boolean $greater(SecondSortKey that) {
        return this.compare(that) > 0;
    }

    public boolean $less$eq(SecondSortKey that) {
        return this.compare(that) <= 0;
    }

    public boolean $greater$eq(SecondSortKey that) {
        return this.compare(that) >= 0;
    }

    public int compareTo(SecondSortKey that) {
        return this.compare(that);
    }
}
Implementation code:
package com.kfk.spark.core;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/11/28
 * @time : 11:28 AM
 */
public class SecondSortJava {

    public static void main(String[] args) {

        SparkConf sparkConf = new SparkConf().setAppName("SecondSortJava").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        /**
         * class1 90
         * class2 93
         * class1 97    -> class1 (89,90,97...) class2 (89,93...) class3 (86,99...)
         * class1 89
         * class3 99
         * ...
         */
        List<String> list = Arrays.asList("class1 90", "class2 93", "class1 97",
                "class1 89", "class3 99", "class3 78",
                "class1 79", "class2 85", "class2 89",
                "class2 96", "class3 92", "class3 86");

        JavaRDD<String> rdd = sc.parallelize(list);

        JavaPairRDD<SecondSortKey, String> beginsortValues = rdd.mapToPair(new PairFunction<String, SecondSortKey, String>() {
            public Tuple2<SecondSortKey, String> call(String line) throws Exception {
                String first = line.split(" ")[0];
                int second = Integer.parseInt(line.split(" ")[1]);
                SecondSortKey secondSortKey = new SecondSortKey(first, second);
                return new Tuple2<SecondSortKey, String>(secondSortKey, line);
            }
        });

        JavaPairRDD<SecondSortKey, String> sortValues = beginsortValues.sortByKey(false);

        sortValues.foreach(new VoidFunction<Tuple2<SecondSortKey, String>>() {
            public void call(Tuple2<SecondSortKey, String> secondSortKeyStringTuple2) throws Exception {
                System.out.println(secondSortKeyStringTuple2._2);
            }
        });
    }
}
Output:
class3 99
class3 92
class3 86
class3 78
class2 96
class2 93
class2 89
class2 85
class1 97
class1 90
class1 89
class1 79
(2) Scala implementation
Custom SecondSortKeyScala class
package com.kfk.spark.core

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/11/28
 * @time : 1:27 PM
 */
class SecondSortKeyScala(val first: String, val second: Int) extends Ordered[SecondSortKeyScala] with Serializable {

    // Compare by the first column, then by the second column when the first is equal
    override def compare(that: SecondSortKeyScala): Int = {
        val comp = this.first.compareTo(that.first)
        if (comp == 0) {
            this.second.compareTo(that.second)
        } else {
            comp
        }
    }
}
Implementation code:
package com.kfk.spark.core

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/11/28
 * @time : 1:21 PM
 */
object SecondSortScala {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setAppName("wordCountApp").setMaster("local")
        val sc = new SparkContext(sparkConf)

        /**
         * class1 90
         * class2 93
         * class1 97    -> class1 (89,90,97...) class2 (89,93...) class3 (86,99...)
         * class1 89
         * class3 99
         * ...
         */
        val list = Array("class1 90", "class2 93", "class1 97",
            "class1 89", "class3 99", "class3 78",
            "class1 79", "class2 85", "class2 89",
            "class2 96", "class3 92", "class3 86")

        val rdd = sc.parallelize(list)

        val beginSortValues = rdd.map(x => (new SecondSortKeyScala(x.split(" ")(0), x.split(" ")(1).toInt), x))

        val sortValues = beginSortValues.sortByKey(false)

        sortValues.foreach(x => System.out.println(x._2))
    }
}
Output:
class3 99
class3 92
class3 86
class3 78
class2 96
class2 93
class2 89
class2 85
class1 97
class1 90
class1 89
class1 79
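As a closing note, when the composite key is only two fields, Scala's built-in Ordering for tuples can produce the same secondary sort without a custom key class. A minimal sketch under the same input format (rdd here stands for the RDD[String] of "class score" lines built above); the custom Ordered key remains the more flexible approach when the comparison logic gets more involved:

// A (String, Int) key sorts by the first field and then the second via the
// implicit tuple Ordering, so sortByKey(false) yields the same result
val pairs = rdd.map { line =>
    val fields = line.split(" ")
    ((fields(0), fields(1).toInt), line)
}
pairs.sortByKey(false).foreach(x => println(x._2))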