首先将SparkConf分装在一个类中
Java:
package com.kfk.spark.common; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/11/28 * @time : 6:18 下午 */ public class CommSparkContext { public static JavaSparkContext getsc(){ SparkConf sparkConf = new SparkConf().setAppName("CommSparkContext").setMaster("local"); return new JavaSparkContext(sparkConf); } }
Scala:
package com.kfk.spark.common import org.apache.spark.{SparkConf, SparkContext} /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/11/28 * @time : 6:18 下午 */ object CommSparkContextScala { def getsc():SparkContext ={ val sparkConf = new SparkConf().setAppName("CommSparkContextScala").setMaster("local") return new SparkContext(sparkConf) } }
十一、MapPartitons案例实战详解
MapPartitions类似于Map,不同之处在于: Map算子一次就处理一个partition中的一条数据,而
mapPartion算子一次处理一个partition中所有的数据。
MapPartion的使用场景:
如果RDD的数据量不是很大,那么建议采用mapPartition算子替代map算子。可以加快处理速度;但是如果RDD的数据量非常大,比如10亿,那么我们不建议用mapParition,可能会造成内存溢出。
(1)使用Java语言实现
package com.kfk.spark.core; import com.kfk.spark.common.CommSparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.VoidFunction; import java.util.*; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/11/29 * @time : 4:18 下午 */ public class MapPartitionJava { public static void main(String[] args) { JavaSparkContext sc = CommSparkContext.getsc(); List list = Arrays.asList("alex","herry","lili","ben","jack","jone","cherry"); // 将rdd分为2个partition JavaRDD rdd = sc.parallelize(list,2); final Map<String,Double> map = new HashMap<String,Double>(); map.put("alex",98.6); map.put("herry",89.5); map.put("lili",87.3); map.put("ben",91.2); map.put("jack",78.9); map.put("jone",95.4); map.put("cherry",96.1); // 对每一个分区中的数据同时做处理 JavaRDD<Double> mapPartition = rdd.mapPartitions(new FlatMapFunction<Iterator<String>, Double>() { public Iterator<Double> call(Iterator iterator) throws Exception { List<Double> list = new ArrayList<Double>(); while (iterator.hasNext()){ String userName = String.valueOf(iterator.next()); Double score = map.get(userName); list.add(score); } return list.iterator(); } }); mapPartition.foreach(new VoidFunction<Double>() { public void call(Double o) throws Exception { System.out.println(o); } }); } }
运行结果
98.6 89.5 87.3 91.2 78.9 95.4 96.1
(2)使用Scala语言实现
package com.kfk.spark.core import com.kfk.spark.common.CommSparkContextScala import org.apache.spark.SparkContext /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/11/29 * @time : 4:59 下午 */ object MapPartitionScala { def main(args: Array[String]): Unit = { val sc = CommSparkContextScala.getsc() val list = Array("alex", "herry", "lili", "ben", "jack", "jone", "cherry") val map = Map("alex" -> 98.6,"herry" -> 89.5,"lili" -> 87.3,"ben" -> 91.2,"jack" -> 78.9,"jone" -> 95.4,"cherry" -> 96.1) // 将rdd分为2个partition val rdd = sc.parallelize(list,2) // 对每一个分区中的数据同时做处理 val mapPartitionValues = rdd.mapPartitions(x => { var list = List[Double]() while (x.hasNext){ val userName = x.next() val score = map.get(userName).get list .::= (score) } list.iterator }) mapPartitionValues.foreach(x => System.out.println(x)) } }
运行结果
98.6 89.5 87.3 91.2 78.9 95.4 96.1
十二、MapPartitonsWithIndex案例实战详解
在parallelize(list, 2)并行集合中如果指定了numPartitions为2,就说明数据集被分到2个分区中进行执行,怎么分是由spark自己来判定的。
如果我们想知道数据集对应的分区,就需要mapPartitionsWithIndex这个算子来做。这个算子可以告诉我们每个partition的index是多少,并且对应到那些数据。
(1)使用Java语言实现
package com.kfk.spark.core; import com.kfk.spark.common.CommSparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.VoidFunction; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/11/29 * @time : 7:31 下午 */ public class MapPartitionWithIndexJava { public static void main(String[] args) { JavaSparkContext sc = CommSparkContext.getsc(); List list = Arrays.asList("alex","herry","lili","ben","jack","jone","cherry"); // 将rdd分为3个partition JavaRDD rdd = sc.parallelize(list,3); // 查看每个数据对应的分区号 JavaRDD<String> indexValues = rdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() { public Iterator<String> call(Integer index, Iterator iterator) throws Exception { List<String> list = new ArrayList<String>(); while (iterator.hasNext()){ String indexStr = iterator.next() + " : " + (index+1); list.add(indexStr); } return list.iterator(); } },false); indexValues.foreach(new VoidFunction<String>() { public void call(String s) throws Exception { System.out.println(s); } }); } }
运行结果
alex : 1 herry : 1 lili : 2 ben : 2 jack : 3 jone : 3 cherry : 3
(2)使用Scala语言实现
package com.kfk.spark.core import com.kfk.spark.common.CommSparkContextScala /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/11/29 * @time : 8:40 下午 */ object MapPartitionWithIndexScala { def main(args: Array[String]): Unit = { val sc = CommSparkContextScala.getsc() val list = Array("alex", "herry", "lili", "ben", "jack", "jone", "cherry") // 将rdd分为3个partition val rdd = sc.parallelize(list,3) // 查看每个数据对应的分区号 val indexValues = rdd.mapPartitionsWithIndex((index,x) => { var list = List[String]() while (x.hasNext){ val userNameIndex = x.next + " : " + (index + 1) list .::= (userNameIndex) } list.iterator }) indexValues.foreach(x => System.out.println(x)) } }
运行结果
alex : 1 herry : 1 lili : 2 ben : 2 jack : 3 jone : 3 cherry : 3
十三、Cartesian案例实战详解
Cartesian:笛卡尔乘积
比如有两个RDD,那么通过cartesian算子可以将两个RDD的每一条数据相互join一次,最终组成一个笛卡尔乘积
(1)使用Java语言实现
package com.kfk.spark.core; import com.kfk.spark.common.CommSparkContext; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import java.util.Arrays; import java.util.List; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/11/29 * @time : 8:52 下午 */ public class CartesionJava { public static void main(String[] args) { JavaSparkContext sc = CommSparkContext.getsc(); List list1 = Arrays.asList("衣服-1","衣服-2"); List list2 = Arrays.asList("裤子-1","裤子-2"); JavaRDD<String> rdd1 = sc.parallelize(list1); JavaRDD<String> rdd2 = sc.parallelize(list2); /** * (cherry,alex)(cherry,jack)(herry,alex)(herry,jack) */ JavaPairRDD<String,String> carteValues = rdd1.cartesian(rdd2); for (Object obj : carteValues.collect()){ System.out.println(obj); } } }
运行结果
(衣服-1,裤子-1) (衣服-1,裤子-2) (衣服-2,裤子-1) (衣服-2,裤子-2)
(2)使用Scala语言实现
package com.kfk.spark.core import com.kfk.spark.common.CommSparkContextScala /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/11/29 * @time : 9:00 下午 */ object CartesionScala { def main(args: Array[String]): Unit = { val sc = CommSparkContextScala.getsc() val list1 = Array("衣服-1", "衣服-2") val list2 = Array("裤子-1", "裤子-2") val rdd1 = sc.parallelize(list1) val rdd2 = sc.parallelize(list2) /** * (cherry,alex)(cherry,jack)(herry,alex)(herry,jack) */ val carteValues = rdd1.cartesian(rdd2) carteValues.foreach(x => System.out.println(x)) } }
运行结果
(衣服-1,裤子-1) (衣服-1,裤子-2) (衣服-2,裤子-1) (衣服-2,裤子-2)
十四、Coalesce案例实战详解
Coalesce算子是将RDD的partition数量缩减,将一定量的数据压缩到更少放入partiton中。
一般与filter算子配合使用,使用filter算子过滤掉很多数据之后,就会出现很多partition种数据不均匀的情况,此时我们就可以使用coalesce算子来压缩RDD的partition的数量,从而让各个partition中的数据均匀,不浪费内存占用资源。
比如:公司部门的合并
(1)使用Java语言实现
package com.kfk.spark.core; import com.kfk.spark.common.CommSparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.VoidFunction; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/11/29 * @time : 9:40 下午 */ public class CoalesceJava { public static void main(String[] args) { JavaSparkContext sc = CommSparkContext.getsc(); List list = Arrays.asList("alex","herry","lili","ben","jack","jone","cherry","lucy","pony","leo"); JavaRDD rdd = sc.parallelize(list,4); // 查看每个值对应每个分区 JavaRDD<String> indexValues1 = rdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() { public Iterator<String> call(Integer index, Iterator iterator) throws Exception { List<String> list = new ArrayList<String>(); while (iterator.hasNext()){ String indexStr = iterator.next() + " " + "以前分区" + " : " + (index+1); list.add(indexStr); } return list.iterator(); } },false); // 合并为两个分区 JavaRDD<String> coalesceValues = indexValues1.coalesce(2); // 合并分区之后重新查看每个值对应每个分区 JavaRDD<String> indexValues2 = coalesceValues.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() { public Iterator<String> call(Integer index, Iterator iterator) throws Exception { List<String> list = new ArrayList<String>(); while (iterator.hasNext()){ String indexStr = iterator.next() + " " + "现在分区" + " : " + (index+1); list.add(indexStr); } return list.iterator(); } },false); indexValues2.foreach(new VoidFunction<String>() { public void call(String s) throws Exception { System.out.println(s); } }); } }
运行结果
lili 以前分区 : 2 现在分区 : 1 ben 以前分区 : 2 现在分区 : 1 jack 以前分区 : 2 现在分区 : 1 alex 以前分区 : 1 现在分区 : 1 herry 以前分区 : 1 现在分区 : 1 lucy 以前分区 : 4 现在分区 : 2 pony 以前分区 : 4 现在分区 : 2 leo 以前分区 : 4 现在分区 : 2 jone 以前分区 : 3 现在分区 : 2 cherry 以前分区 : 3 现在分区 : 2
(2)使用Scala语言实现
package com.kfk.spark.core import com.kfk.spark.common.CommSparkContextScala /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/11/29 * @time : 9:56 下午 */ object CoalesceScala { def main(args: Array[String]): Unit = { val sc = CommSparkContextScala.getsc() val list = Array("alex", "herry", "lili", "ben", "jack", "jone", "cherry", "lucy", "pony", "leo") val rdd = sc.parallelize(list,4) // 查看每个值对应每个分区 val indexValues1 = rdd.mapPartitionsWithIndex((index,x) => { var list = List[String]() while (x.hasNext){ val indexStr = x.next() + " " + "以前分区" + " : " + (index + 1) list .::= (indexStr) } list.iterator }) // 合并为两个分区 val coalesceValues = indexValues1.coalesce(2) // 合并分区之后重新查看每个值对应每个分区 val indexValues2 = coalesceValues.mapPartitionsWithIndex((index,y) => { var list = List[String]() while (y.hasNext){ val indexStr = y.next() + " " + "现在分区" + " : " + (index + 1) list .::= (indexStr) } list.iterator }) indexValues2.foreach(x => System.out.println(x)) } }
运行结果
lili 以前分区 : 2 现在分区 : 1 ben 以前分区 : 2 现在分区 : 1 jack 以前分区 : 2 现在分区 : 1 alex 以前分区 : 1 现在分区 : 1 herry 以前分区 : 1 现在分区 : 1 lucy 以前分区 : 4 现在分区 : 2 pony 以前分区 : 4 现在分区 : 2 leo 以前分区 : 4 现在分区 : 2 jone 以前分区 : 3 现在分区 : 2 cherry 以前分区 : 3 现在分区 : 2