15. Repartition in Practice
The repartition operator changes the number of partitions of any RDD, either increasing or decreasing it. The difference from coalesce is that coalesce (without enabling a shuffle) can only reduce the number of partitions, whereas repartition can change the partition count freely in either direction.
Recommended use case:
After Spark SQL loads data from a Hive table, the number of partitions it assigns automatically (derived from the number of HDFS blocks backing the table's files) may be too small, which slows down the operators that run afterwards. In that case, once Spark SQL has loaded the Hive data, we can set the number of partitions by hand to speed up the downstream operators. A sketch of this scenario follows.
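A minimal Scala sketch of this use case. It assumes a Hive-enabled Spark build; the table name my_db.user_log and the target partition count of 200 are made-up placeholders, not values from the original example:

import org.apache.spark.sql.SparkSession

object HiveRepartitionSketch {
  def main(args: Array[String]): Unit = {
    // Requires a Spark build with Hive support; my_db.user_log is a hypothetical table.
    val spark = SparkSession.builder()
      .appName("HiveRepartitionSketch")
      .enableHiveSupport()
      .getOrCreate()

    // The initial partition count follows the HDFS blocks backing the table's files
    // and may be too low to keep the cluster busy.
    val logs = spark.sql("select * from my_db.user_log")
    println("partitions after load: " + logs.rdd.getNumPartitions)

    // Manually raise the partition count so downstream operators run with more parallelism.
    val repartitioned = logs.repartition(200)
    println("partitions after repartition: " + repartitioned.rdd.getNumPartitions)

    spark.stop()
  }
}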
(1) Java implementation
package com.kfk.spark.core;

import com.kfk.spark.common.CommSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/11/29
 * @time : 10:12 PM
 */
public class RePartitionJava {
    public static void main(String[] args) {
        JavaSparkContext sc = CommSparkContext.getsc();
        List<String> list = Arrays.asList("alex","herry","lili","ben","jack","jone","cherry","lucy","pony","leo");
        JavaRDD<String> rdd = sc.parallelize(list, 2);

        // Tag each value with the partition it lives in before repartitioning
        JavaRDD<String> indexValues1 = rdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
            public Iterator<String> call(Integer index, Iterator<String> iterator) throws Exception {
                List<String> result = new ArrayList<String>();
                while (iterator.hasNext()) {
                    String indexStr = iterator.next() + " old partition : " + (index + 1);
                    result.add(indexStr);
                }
                return result.iterator();
            }
        }, false);

        // Increase the RDD to four partitions
        JavaRDD<String> repartitionValues = indexValues1.repartition(4);

        // Tag each value with the partition it lives in after repartitioning
        JavaRDD<String> indexValues2 = repartitionValues.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
            public Iterator<String> call(Integer index, Iterator<String> iterator) throws Exception {
                List<String> result = new ArrayList<String>();
                while (iterator.hasNext()) {
                    String indexStr = iterator.next() + " new partition : " + (index + 1);
                    result.add(indexStr);
                }
                return result.iterator();
            }
        }, false);

        indexValues2.foreach(new VoidFunction<String>() {
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });
    }
}
Output:
herry old partition : 1 new partition : 1
pony old partition : 2 new partition : 1
lili old partition : 1 new partition : 2
jone old partition : 2 new partition : 2
leo old partition : 2 new partition : 2
ben old partition : 1 new partition : 3
cherry old partition : 2 new partition : 3
alex old partition : 1 new partition : 4
jack old partition : 1 new partition : 4
lucy old partition : 2 new partition : 4
(2) Scala implementation
package com.kfk.spark.core

import com.kfk.spark.common.CommSparkContextScala

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/11/29
 * @time : 10:20 PM
 */
object RePartitionScala {
  def main(args: Array[String]): Unit = {
    val sc = CommSparkContextScala.getsc()
    val list = Array("alex", "herry", "lili", "ben", "jack", "jone", "cherry", "lucy", "pony", "leo")
    val rdd = sc.parallelize(list, 2)

    // Tag each value with the partition it lives in before repartitioning
    val indexValues1 = rdd.mapPartitionsWithIndex((index, x) => {
      var result = List[String]()
      while (x.hasNext) {
        val indexStr = x.next() + " old partition : " + (index + 1)
        result = indexStr :: result
      }
      result.iterator
    })

    // Increase the RDD to four partitions
    val repartitionValues = indexValues1.repartition(4)

    // Tag each value with the partition it lives in after repartitioning
    val indexValues2 = repartitionValues.mapPartitionsWithIndex((index, y) => {
      var result = List[String]()
      while (y.hasNext) {
        val indexStr = y.next() + " new partition : " + (index + 1)
        result = indexStr :: result
      }
      result.iterator
    })

    indexValues2.foreach(x => System.out.println(x))
  }
}
Output:
herry old partition : 1 new partition : 1
pony old partition : 2 new partition : 1
lili old partition : 1 new partition : 2
jone old partition : 2 new partition : 2
leo old partition : 2 new partition : 2
ben old partition : 1 new partition : 3
cherry old partition : 2 new partition : 3
alex old partition : 1 new partition : 4
jack old partition : 1 new partition : 4
lucy old partition : 2 new partition : 4
16. TakeSample in Practice
The takeSample operator differs from sample in two ways (a short sketch contrasting them follows this list):
takeSample is an action, while sample is a transformation.
takeSample cannot draw a sample by fraction; it only draws a fixed, user-specified number of elements.
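A brief Scala sketch of the contrast, assuming an existing SparkContext named sc (for example in spark-shell); the numbers are arbitrary:

// Assumes an existing SparkContext sc, e.g. in spark-shell.
val rdd = sc.parallelize(1 to 100, 4)

// sample is a transformation: it returns a new RDD and nothing is computed yet.
// Arguments: withReplacement, fraction (an approximate proportion), optional seed.
val sampledRdd = rdd.sample(withReplacement = false, fraction = 0.1, seed = 42L)

// takeSample is an action: it runs a job and returns an Array to the driver.
// Arguments: withReplacement, num (an exact element count), optional seed.
val sampledArray = rdd.takeSample(withReplacement = false, num = 10, seed = 42L)

println(sampledRdd.count())   // roughly 10 elements, not guaranteed to be exact
println(sampledArray.length)  // exactly 10 elements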
(1) Java implementation
package com.kfk.spark.core;

import com.kfk.spark.common.CommSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/11/29
 * @time : 10:22 PM
 */
public class TakeSampleJava {
    public static void main(String[] args) {
        JavaSparkContext sc = CommSparkContext.getsc();
        List<String> list = Arrays.asList("alex","herry","lili","ben","jack","jone","cherry","lucy","pony","leo");
        JavaRDD<String> rdd = sc.parallelize(list, 4);

        // Randomly draw 3 elements (without replacement)
        List<String> takeSampleList = rdd.takeSample(false, 3);
        for (String obj : takeSampleList) {
            System.out.println(obj);
        }
    }
}
Output:
alex
lucy
lili
(2) Scala implementation
package com.kfk.spark.core

import com.kfk.spark.common.CommSparkContextScala

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/11/29
 * @time : 10:26 PM
 */
object TakeSampleScala {
  def main(args: Array[String]): Unit = {
    val sc = CommSparkContextScala.getsc()
    val list = Array("alex", "herry", "lili", "ben", "jack", "jone", "cherry", "lucy", "pony", "leo")
    val rdd = sc.parallelize(list, 2)

    // Randomly draw 3 elements (without replacement)
    val takeSampleList = rdd.takeSample(false, 3)
    for (elem <- takeSampleList) {
      System.out.println(elem)
    }
  }
}
Output:
cherry
jone
ben
17. Sample in Practice
The sample operator randomly draws a sample from an RDD. The sample size is given as a fraction between 0 and 1 (for example, 0.5 draws roughly 50% of the data), and the number of returned elements is approximate rather than exact.
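A quick Scala sketch of those semantics, again assuming an existing SparkContext sc; the fraction and seed values are arbitrary:

// Assumes an existing SparkContext sc, e.g. in spark-shell.
val nums = sc.parallelize(1 to 1000)

// fraction is an expected proportion, not an exact count:
// repeated runs may return slightly different numbers of elements.
println(nums.sample(withReplacement = false, fraction = 0.5).count())

// Passing a seed makes the sample reproducible across runs.
println(nums.sample(withReplacement = false, fraction = 0.5, seed = 7L).count())

// With withReplacement = true the same element may be drawn more than once.
nums.sample(withReplacement = true, fraction = 0.01, seed = 7L).collect().foreach(println)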
(1) Java implementation
package com.kfk.spark.core;

import com.kfk.spark.common.CommSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/11/30
 * @time : 9:37 AM
 */
public class SampleJava {
    public static void main(String[] args) {
        JavaSparkContext sc = CommSparkContext.getsc();
        List<String> list = Arrays.asList("alex","herry","lili","ben","jack","jone","cherry","lucy","pony","leo");
        JavaRDD<String> rdd = sc.parallelize(list);

        // Randomly sample about 50% of the data (without replacement)
        JavaRDD<String> sampleValues = rdd.sample(false, 0.5);
        for (String obj : sampleValues.collect()) {
            System.out.println(obj);
        }
    }
}
Output:
herry
lili
ben
jone
cherry
(2) Scala implementation
package com.kfk.spark.core

import com.kfk.spark.common.CommSparkContextScala

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/11/30
 * @time : 9:42 AM
 */
object SampleScala {
  def main(args: Array[String]): Unit = {
    val sc = CommSparkContextScala.getsc()
    val list = Array("alex", "herry", "lili", "ben", "jack", "jone", "cherry")
    val rdd = sc.parallelize(list)

    // Randomly sample about 50% of the data (without replacement)
    val sampleValues = rdd.sample(false, 0.5)
    for (elem <- sampleValues.collect()) {
      System.out.println(elem)
    }
  }
}
Output:
alex
herry
lili
jone
18. Union in Practice
The union operator merges the data of two RDDs into a single RDD. Unlike SQL's UNION, it does not remove duplicates: every element of both inputs is kept.
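A tiny Scala sketch of that behavior, assuming an existing SparkContext sc; the sample data is made up:

// Assumes an existing SparkContext sc, e.g. in spark-shell.
val a = sc.parallelize(Seq("alex", "lili", "ben"))
val b = sc.parallelize(Seq("lili", "leo"))

// union simply concatenates the partitions of both RDDs and keeps duplicates.
a.union(b).collect().foreach(println)            // alex, lili, ben, lili, leo

// Chain distinct() afterwards if SQL-UNION-style deduplication is wanted.
a.union(b).distinct().collect().foreach(println) // each name appears once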
(1) Java implementation
package com.kfk.spark.core;

import com.kfk.spark.common.CommSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/11/30
 * @time : 9:51 AM
 */
public class UnionJava {
    public static void main(String[] args) {
        JavaSparkContext sc = CommSparkContext.getsc();
        List<String> list1 = Arrays.asList("alex","herry","lili","ben","jack");
        List<String> list2 = Arrays.asList("jone","cherry","lucy","pony","leo");
        JavaRDD<String> rdd1 = sc.parallelize(list1);
        JavaRDD<String> rdd2 = sc.parallelize(list2);

        // Merge the data of the two RDDs into a single RDD
        JavaRDD<String> unionValues = rdd1.union(rdd2);
        for (String obj : unionValues.collect()) {
            System.out.println(obj);
        }
    }
}
Output:
alex
herry
lili
ben
jack
jone
cherry
lucy
pony
leo
(2) Scala implementation
package com.kfk.spark.core

import com.kfk.spark.common.CommSparkContextScala

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/11/30
 * @time : 9:55 AM
 */
object UnionScala {
  def main(args: Array[String]): Unit = {
    val sc = CommSparkContextScala.getsc()
    val list1 = Array("alex", "herry", "lili", "ben", "jack")
    val list2 = Array("jone", "cherry", "lucy", "pony", "leo")
    val rdd1 = sc.parallelize(list1)
    val rdd2 = sc.parallelize(list2)

    // Merge the data of the two RDDs into a single RDD
    val unionValues = rdd1.union(rdd2)
    for (elem <- unionValues.collect()) {
      System.out.println(elem)
    }
  }
}
Output:
alex
herry
lili
ben
jack
jone
cherry
lucy
pony
leo
19. Intersection in Practice
The intersection operator returns the elements that two RDDs have in common.
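A tiny Scala sketch, assuming an existing SparkContext sc; note that the result is deduplicated, so each common element appears only once:

// Assumes an existing SparkContext sc, e.g. in spark-shell.
val x = sc.parallelize(Seq("alex", "alex", "lili", "ben"))
val y = sc.parallelize(Seq("alex", "lili", "leo"))

// intersection keeps only the elements present in both RDDs and removes duplicates,
// so "alex" shows up once even though it appears twice in x. This involves a shuffle.
x.intersection(y).collect().foreach(println)     // alex, lili (in some order)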
(1) Java implementation
package com.kfk.spark.core;

import com.kfk.spark.common.CommSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/11/30
 * @time : 9:59 AM
 */
public class IntersectionJava {
    public static void main(String[] args) {
        JavaSparkContext sc = CommSparkContext.getsc();
        List<String> list1 = Arrays.asList("alex","herry","lili","ben","jack");
        List<String> list2 = Arrays.asList("jone","alex","lili","pony","leo");
        JavaRDD<String> rdd1 = sc.parallelize(list1);
        JavaRDD<String> rdd2 = sc.parallelize(list2);

        // Get the elements that both RDDs have in common
        JavaRDD<String> intersectionValues = rdd1.intersection(rdd2);
        for (String obj : intersectionValues.collect()) {
            System.out.println(obj);
        }
    }
}
Output:
alex
lili
(2) Scala implementation
package com.kfk.spark.core

import com.kfk.spark.common.CommSparkContextScala

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/11/30
 * @time : 10:01 AM
 */
object IntersectionScala {
  def main(args: Array[String]): Unit = {
    val sc = CommSparkContextScala.getsc()
    val list1 = Array("alex", "herry", "lili", "ben", "jack")
    val list2 = Array("jone", "alex", "lili", "pony", "leo")
    val rdd1 = sc.parallelize(list1)
    val rdd2 = sc.parallelize(list2)

    // Get the elements that both RDDs have in common
    val intersectionValues = rdd1.intersection(rdd2)
    for (elem <- intersectionValues.collect()) {
      System.out.println(elem)
    }
  }
}
Output:
alex
lili
20. Distinct in Practice
The distinct operator removes duplicate elements from an RDD.
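The same deduplication can be expressed by hand with map plus reduceByKey, which is a useful way to see why distinct causes a shuffle. A small Scala sketch of that equivalent logic (the idea behind distinct, not a claim about the library's exact source), assuming an existing SparkContext sc:

// Assumes an existing SparkContext sc, e.g. in spark-shell.
val data = sc.parallelize(Seq("alex", "herry", "lili", "alex", "herry"))

// The built-in operator: deduplicates across all partitions (triggers a shuffle).
data.distinct().collect().foreach(println)

// Conceptually the same result via map + reduceByKey:
// pair each element with a dummy value, keep one pair per key, then drop the dummy.
data.map(word => (word, null))
  .reduceByKey((first, _) => first)
  .map(_._1)
  .collect()
  .foreach(println)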
(1) Java implementation
package com.kfk.spark.core;

import com.kfk.spark.common.CommSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/11/30
 * @time : 10:03 AM
 */
public class DistinctJava {
    public static void main(String[] args) {
        JavaSparkContext sc = CommSparkContext.getsc();
        List<String> list = Arrays.asList("alex","herry","lili","ben","jack","alex","herry");
        JavaRDD<String> rdd = sc.parallelize(list);

        // Remove duplicate elements from the RDD
        JavaRDD<String> distinctValues = rdd.distinct();
        for (String obj : distinctValues.collect()) {
            System.out.println(obj);
        }
    }
}
Output:
alex
ben
jack
lili
herry
(2) Scala implementation
package com.kfk.spark.core

import com.kfk.spark.common.CommSparkContextScala

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/11/30
 * @time : 10:06 AM
 */
object DistinctScala {
  def main(args: Array[String]): Unit = {
    val sc = CommSparkContextScala.getsc()
    val list = Array("alex", "herry", "lili", "ben", "jack", "alex", "herry")
    val rdd = sc.parallelize(list)

    // Remove duplicate elements from the RDD
    val distinctValues = rdd.distinct()
    for (elem <- distinctValues.collect()) {
      System.out.println(elem)
    }
  }
}
Output:
alex
ben
jack
lili
herry
21. AggregateByKey in Practice
reduceByKey can be viewed as a simplified version of aggregateByKey. The key difference is that aggregateByKey additionally takes an initial (zero) value and splits the aggregation into two functions: a seqOp that combines values within each partition (a combiner-style local aggregation), and a combOp that then merges the per-partition results across partitions. This gives you separate control over the local and the global aggregation, and the two functions may even use different logic.
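To make the seqOp/combOp split concrete, here is a small Scala sketch, assuming an existing SparkContext sc, in which the two functions deliberately differ: within each partition we track the per-key maximum, and across partitions we add those maxima together. The data and partition count are made up for illustration:

// Assumes an existing SparkContext sc, e.g. in spark-shell.
val pairs = sc.parallelize(
  Seq(("a", 1), ("a", 4), ("b", 2), ("a", 3), ("b", 5), ("b", 1)),
  numSlices = 2)

val maxPerPartitionThenSum = pairs.aggregateByKey(0)(
  (localMax, value) => math.max(localMax, value), // seqOp: combine within one partition
  (max1, max2) => max1 + max2                     // combOp: merge per-partition results
)

maxPerPartitionThenSum.collect().foreach(println)
// With two partitions holding ("a",1),("a",4),("b",2) and ("a",3),("b",5),("b",1),
// the per-partition maxima are a->4, b->2 and a->3, b->5, so the result is (a,7) and (b,7).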
(1) Java implementation
package com.kfk.spark.core;

import com.kfk.spark.common.CommSparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/11/30
 * @time : 10:57 AM
 */
public class AggregateByKeyJava {
    public static void main(String[] args) {
        JavaSparkContext sc = CommSparkContext.getsc();
        List<String> list = Arrays.asList("alex herry", "lili ben", "jack alex");
        JavaRDD<String> lines = sc.parallelize(list);

        /**
         * "java python hive", "hive java" ...  flatMap()  ->  java python hive hive java ...
         */
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });

        /**
         * java python hive hive java ...  mapToPair()  ->  (java,1)(hive,1)(java,1)(python,1) ...
         */
        JavaPairRDD<String, Integer> word = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        /**
         * (java,1)(hive,1)(java,1)(python,1) ...  aggregateByKey()  ->  (java,2)(hive,1)(python,1) ...
         * Zero value 0; the first function combines values within a partition,
         * the second merges the per-partition results.
         */
        JavaPairRDD<String, Integer> wordcount = word.aggregateByKey(0, new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }, new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        /**
         * (spark,1)(hive,3)(hadoop,3) ...  mapToPair()  ->  (3,hadoop)(3,hive) ...
         */
        JavaPairRDD<Integer, String> wordcountSortValue = wordcount.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return new Tuple2<Integer, String>(stringIntegerTuple2._2, stringIntegerTuple2._1);
            }
        });

        /**
         * (3,hadoop)(3,hive) ...  sortByKey(false)  ->  (3,hadoop)(3,hive)(2,java)(1,python) ...
         */
        JavaPairRDD<Integer, String> sort = wordcountSortValue.sortByKey(false);

        /**
         * (3,hadoop)(3,hive)(2,java)(1,python) ...  mapToPair()  ->  (hadoop,3)(hive,3)(java,2)(python,1) ...
         */
        JavaPairRDD<String, Integer> wordcountSortValues = sort.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            public Tuple2<String, Integer> call(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                return new Tuple2<String, Integer>(integerStringTuple2._2, integerStringTuple2._1);
            }
        });

        /**
         * foreach()
         */
        wordcountSortValues.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> o) throws Exception {
                System.out.println(o._1 + " : " + o._2);
            }
        });
    }
}
Output:
alex : 2
ben : 1
jack : 1
lili : 1
herry : 1
(2) Scala implementation
package com.kfk.spark.core

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/11/30
 * @time : 11:05 AM
 */
object AggregateByKeyScala {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("wordCountApp").setMaster("local")
    val sc = new SparkContext(sparkConf)

    val list = Array("alex herry", "lili ben", "jack alex")
    val lines = sc.parallelize(list)

    /**
     * "java python hive", "hive java" ...  flatMap()  ->  java python hive hive java ...
     */
    val words = lines.flatMap(line => line.split(" "))

    /**
     * java python hive hive java ...  map()  ->  (java,1)(hive,1)(java,1)(python,1) ...
     */
    val word = words.map(word => (word, 1))

    /**
     * (java,1)(hive,1)(java,1)(python,1) ...  aggregateByKey()  ->  (java,2)(hive,1)(python,1) ...
     * Zero value 0; the first function combines values within a partition,
     * the second merges the per-partition results.
     */
    val wordcount = word.aggregateByKey(0)((x1, y1) => x1 + y1, (x2, y2) => x2 + y2)
    // val wordcount = word.aggregateByKey(0)(_ + _, _ + _)

    /**
     * (spark,1)(hive,3)(hadoop,3) ...  map()  ->  (3,hadoop)(3,hive) ...
     */
    val wordcountSortValue = wordcount.map(x => (x._2, x._1))

    /**
     * (3,hadoop)(3,hive) ...  sortByKey(false)  ->  (3,hadoop)(3,hive)(2,java)(1,python) ...
     */
    val sort = wordcountSortValue.sortByKey(false)

    /**
     * (3,hadoop)(3,hive)(2,java)(1,python) ...  map()  ->  (hadoop,3)(hive,3)(java,2)(python,1) ...
     */
    val wordcountSortValues = sort.map(x => (x._2, x._1))

    /**
     * foreach()
     */
    wordcountSortValues.foreach(wc => println(wc._1 + " : " + wc._2))
  }
}
Output:
alex : 2
ben : 1
jack : 1
lili : 1
herry : 1