Spark核心编程与项目案例详解(三)下

简介: 笔记

十五、Repartition案例实战详解


Repartition算子是将任意RDD的partition数量增大或者减小,与coalesce不同的是,coalease只能将rdd的partition数量减少。而repatition对rdd的partition数量做到自由改变。


建议使用的场景:

Spark SQL加载hive的数据之后,自动分配(这里是按照hive数据对应到HDFS文件的block数量)的partition数量比较少,影响算子的运行速度。此时,在Spark SQL加载hive数据后,我们可以手动的去设置partition的数量来提供算子的运行速度。


(1)使用Java语言实现

package com.kfk.spark.core;
import com.kfk.spark.common.CommSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/29
 * @time : 10:12 下午
 */
public class RePartitionJava {
    public static void main(String[] args) {
        JavaSparkContext sc = CommSparkContext.getsc();
        List list = Arrays.asList("alex","herry","lili","ben","jack","jone","cherry","lucy","pony","leo");
        JavaRDD rdd = sc.parallelize(list,2);
        // 查看每个值对应每个分区
        JavaRDD<String> indexValues1 = rdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
            public Iterator<String> call(Integer index, Iterator iterator) throws Exception {
                List<String> list = new ArrayList<String>();
                while (iterator.hasNext()){
                    String indexStr = iterator.next() + " " + "以前分区" + " : " + (index+1);
                    list.add(indexStr);
                }
                return list.iterator();
            }
        },false);
        // 增加为四个分区
        JavaRDD<String>  repartitionValues = indexValues1.repartition(4);
        // 增加四个分区之后查看每个值对应每个分区
        JavaRDD<String> indexValues2 = repartitionValues.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
            public Iterator<String> call(Integer index, Iterator iterator) throws Exception {
                List<String> list = new ArrayList<String>();
                while (iterator.hasNext()){
                    String indexStr = iterator.next() + " " + "现在分区" + " : " + (index+1);
                    list.add(indexStr);
                }
                return list.iterator();
            }
        },false);
        indexValues2.foreach(new VoidFunction<String>() {
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });
    }
}

运行结果

herry 以前分区 : 1 现在分区 : 1
pony 以前分区 : 2 现在分区 : 1
lili 以前分区 : 1 现在分区 : 2
jone 以前分区 : 2 现在分区 : 2
leo 以前分区 : 2 现在分区 : 2
ben 以前分区 : 1 现在分区 : 3
cherry 以前分区 : 2 现在分区 : 3
alex 以前分区 : 1 现在分区 : 4
jack 以前分区 : 1 现在分区 : 4
lucy 以前分区 : 2 现在分区 : 4

(2)使用Scala语言实现

package com.kfk.spark.core
import com.kfk.spark.common.CommSparkContextScala
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/29
 * @time : 10:20 下午
 */
object RePartitionScala {
    def main(args: Array[String]): Unit = {
        val sc = CommSparkContextScala.getsc()
        val list = Array("alex", "herry", "lili", "ben", "jack", "jone", "cherry", "lucy", "pony", "leo")
        val rdd = sc.parallelize(list,2)
        // 查看每个值对应每个分区
        val indexValues1 = rdd.mapPartitionsWithIndex((index,x) => {
            var list = List[String]()
            while (x.hasNext){
                val indexStr = x.next() + " " + "以前分区" + " : " + (index + 1)
                list .::= (indexStr)
            }
            list.iterator
        })
        // 增加为四个分区
        val repartitionValues = indexValues1.repartition(4)
        // 增加四个分区之后查看每个值对应每个分区
        val indexValues2 = repartitionValues.mapPartitionsWithIndex((index,y) => {
            var list = List[String]()
            while (y.hasNext){
                val indexStr = y.next() + " " + "现在分区" + " : " + (index + 1)
                list .::= (indexStr)
            }
            list.iterator
        })
        indexValues2.foreach(x => System.out.println(x))
    }
}

运行结果

herry 以前分区 : 1 现在分区 : 1
pony 以前分区 : 2 现在分区 : 1
lili 以前分区 : 1 现在分区 : 2
jone 以前分区 : 2 现在分区 : 2
leo 以前分区 : 2 现在分区 : 2
ben 以前分区 : 1 现在分区 : 3
cherry 以前分区 : 2 现在分区 : 3
alex 以前分区 : 1 现在分区 : 4
jack 以前分区 : 1 现在分区 : 4
lucy 以前分区 : 2 现在分区 : 4


十六、TakeSample案例实战详解


taskSample算子与sample不同之处有两点:


taskSample算子 是action操作,sample 是transformation操作

taskSapple不能按照比例来抽取,只能按照设定个数来抽取

(1)使用Java语言实现

package com.kfk.spark.core;
import com.kfk.spark.common.CommSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.Arrays;
import java.util.List;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/29
 * @time : 10:22 下午
 */
public class TakeSampleJava {
    public static void main(String[] args) {
        JavaSparkContext sc = CommSparkContext.getsc();
        List list = Arrays.asList("alex","herry","lili","ben","jack","jone","cherry","lucy","pony","leo");
        JavaRDD rdd = sc.parallelize(list,4);
        // 随机抽取3个数据
        List takeSampleList = rdd.takeSample(false,3);
        for (Object obj : takeSampleList){
            System.out.println(obj);
        }
    }
}

运行结果

alex
lucy
lili


(2)使用Scala语言实现

package com.kfk.spark.core
import com.kfk.spark.common.CommSparkContextScala
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/29
 * @time : 10:26 下午
 */
object TakeSampleScala {
    def main(args: Array[String]): Unit = {
        val sc = CommSparkContextScala.getsc()
        val list = Array("alex", "herry", "lili", "ben", "jack", "jone", "cherry", "lucy", "pony", "leo")
        val rdd = sc.parallelize(list,2)
        // 随机抽取3个数据
        val takeSampleList = rdd.takeSample(false,3)
        for (elem <- takeSampleList) {
            System.out.println(elem)
        }
    }
}

运行结果

cherry
jone
ben


十七、Sample案例实战详解

Sample算子是从RDD中随机抽取10%-90%的数据。

(1)使用Java语言实现

package com.kfk.spark.core;
import com.kfk.spark.common.CommSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.Arrays;
import java.util.List;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/30
 * @time : 9:37 上午
 */
public class SampleJava {
    public static void main(String[] args) {
        JavaSparkContext sc = CommSparkContext.getsc();
        List list = Arrays.asList("alex","herry","lili","ben","jack","jone","cherry","lucy","pony","leo");
        JavaRDD rdd = sc.parallelize(list);
        // 随机抽取50%
        JavaRDD<String> sampleValues = rdd.sample(false,0.5);
        for (Object obj : sampleValues.collect()){
            System.out.println(obj);
        }
    }
}

运行结果

herry
lili
ben
jone
cherry

(2)使用Scala语言实现

package com.kfk.spark.core
import com.kfk.spark.common.CommSparkContextScala
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/30
 * @time : 9:42 上午
 */
object SampleScala {
    def main(args: Array[String]): Unit = {
        val sc = CommSparkContextScala.getsc()
        val list = Array("alex", "herry", "lili", "ben", "jack", "jone", "cherry")
        val rdd = sc.parallelize(list)
        // 随机抽取50%
        val sampleValues  =rdd.sample(false,0.5)
        for (elem <- sampleValues.collect()) {
            System.out.println(elem)
        }
    }
}

运行结果

alex
herry
lili
jone


十八、Union案例实战详解

Union算子是将两个RDD的数据合并为一个RDD。

(1)使用Java语言实现

package com.kfk.spark.core;
import com.kfk.spark.common.CommSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.Arrays;
import java.util.List;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/30
 * @time : 9:51 上午
 */
public class UnionJava {
    public static void main(String[] args) {
        JavaSparkContext sc = CommSparkContext.getsc();
        List list1 = Arrays.asList("alex","herry","lili","ben","jack");
        List list2 = Arrays.asList("jone","cherry","lucy","pony","leo");
        JavaRDD rdd1 = sc.parallelize(list1);
        JavaRDD rdd2 = sc.parallelize(list2);
        // 将两个RDD的数据合并为一个RDD
        JavaRDD<String> unionValues = rdd1.union(rdd2);
        for (Object obj : unionValues.collect()){
            System.out.println(obj);
        }
    }
}

运行结果

alex
herry
lili
ben
jack
jone
cherry
lucy
pony
leo

(2)使用Scala语言实现

package com.kfk.spark.core
import com.kfk.spark.common.CommSparkContextScala
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/30
 * @time : 9:55 上午
 */
object UnionScala {
    def main(args: Array[String]): Unit = {
        val sc = CommSparkContextScala.getsc()
        val list1 = Array("alex", "herry", "lili", "ben", "jack")
        val list2 = Array("jone", "cherry", "lucy", "pony", "leo")
        val rdd1 = sc.parallelize(list1)
        val rdd2 = sc.parallelize(list2)
        // 将两个RDD的数据合并为一个RDD
        val sampleValues = rdd1.union(rdd2)
        for (elem <- sampleValues.collect()) {
            System.out.println(elem)
        }
    }
}

运行结果

alex
herry
lili
ben
jack
jone
cherry
lucy
pony
leo


十九、Intersection案例实战详解


Intersection算子获取两个RDD相同的数据。

(1)使用Java语言实现

package com.kfk.spark.core;
import com.kfk.spark.common.CommSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.Arrays;
import java.util.List;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/30
 * @time : 9:59 上午
 */
public class IntersectionJava {
    public static void main(String[] args) {
        JavaSparkContext sc = CommSparkContext.getsc();
        List list1 = Arrays.asList("alex","herry","lili","ben","jack");
        List list2 = Arrays.asList("jone","alex","lili","pony","leo");
        JavaRDD rdd1 = sc.parallelize(list1);
        JavaRDD rdd2 = sc.parallelize(list2);
        // 获取两个RDD相同的数据
        JavaRDD<String> intersectionValues = rdd1.intersection(rdd2);
        for (Object obj : intersectionValues.collect()){
            System.out.println(obj);
        }
    }
}

运行结果

alex
lili

(2)使用Scala语言实现

package com.kfk.spark.core
import com.kfk.spark.common.CommSparkContextScala
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/30
 * @time : 10:01 上午
 */
object IntersectionScala {
    def main(args: Array[String]): Unit = {
        val sc = CommSparkContextScala.getsc()
        val list1 = Array("alex", "herry", "lili", "ben", "jack")
        val list2 = Array("jone", "alex", "lili", "pony", "leo")
        val rdd1 = sc.parallelize(list1)
        val rdd2 = sc.parallelize(list2)
        // 获取两个RDD相同的数据
        val intersectionValues = rdd1.intersection(rdd2)
        for (elem <- intersectionValues.collect()) {
            System.out.println(elem)
        }
    }
}

运行结果

alex
lili


二十、Distinct案例实战详解


Distinct算子是对RDD中的数据进行去重。

(1)使用Java语言实现

package com.kfk.spark.core;
import com.kfk.spark.common.CommSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.Arrays;
import java.util.List;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/30
 * @time : 10:03 上午
 */
public class DistinctJava {
    public static void main(String[] args) {
        JavaSparkContext sc = CommSparkContext.getsc();
        List list = Arrays.asList("alex","herry","lili","ben","jack","alex","herry");
        JavaRDD rdd = sc.parallelize(list);
        // 对RDD中的数据进行去重
        JavaRDD<String> distinctValues = rdd.distinct();
        for (Object obj : distinctValues.collect()){
            System.out.println(obj);
        }
    }
}

运行结果

alex
ben
jack
lili
herry

(2)使用Scala语言实现

package com.kfk.spark.core
import com.kfk.spark.common.CommSparkContextScala
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/30
 * @time : 10:06 上午
 */
object DistinctScala {
    def main(args: Array[String]): Unit = {
        val sc = CommSparkContextScala.getsc()
        val list = Array("alex", "herry", "lili", "ben", "jack", "alex", "herry")
        val rdd = sc.parallelize(list)
        // 对RDD中的数据进行去重
        val distinctValues = rdd.distinct()
        for (elem <- distinctValues.collect()) {
            System.out.println(elem)
        }
    }
}

运行结果

alex
ben
jack
lili
herry


二十一、AggregateByKey案例实战详解


reduceByKey我们可以认为是aggregateByKey算子的简化版,aggregateByKey最重要的一 点是多提供了一个函数(SeqFunction)。就是说自已可以控制如何对每个parition中的数据进行先聚合(combine),然后对所有partition中的数据进行全局聚合。

9.png

(1)使用Java语言实现

package com.kfk.spark.core;
import com.kfk.spark.common.CommSparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/30
 * @time : 10:57 上午
 */
public class AggregateByKeyJava {
    public static void main(String[] args) {
        JavaSparkContext sc = CommSparkContext.getsc();
        List list = Arrays.asList("alex herry","lili ben","jack alex");
        JavaRDD lines = sc.parallelize(list);
        /**
         * java python hive     flatMap() -> java python hive hive java...
         * hive java ...
         */
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String,String>() {
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        /**
         * java python hive hive java...    mapToPair() -> (java,1)(hive,1)(java,1)(python,1)...
         */
        JavaPairRDD<String,Integer> word = words.mapToPair(new PairFunction<String,String,Integer>() {
            public Tuple2<String,Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word,1);
            }
        });
        /**
         * (java,1)(hive,1)(java,1)(python,1)...    aggregateByKey() -> (java,2)(hive,1)(python,1)...
         */
        JavaPairRDD<String, Integer> wordcount = word.aggregateByKey(0, new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }, new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        /**
         * (spark,1)(hive,3)(hadoop,3)...  mapToPair() -> (3,hadoop)(3,hive)...
         */
        JavaPairRDD<Integer,String> wordcountSortValue =  wordcount.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return new Tuple2<Integer, String>(stringIntegerTuple2._2,stringIntegerTuple2._1);
            }
        });
        /**
         * (3,hadoop)(3,hive)...    sortByKey(false) -> (3,hadoop)(3,hive)(2,java)(1,python)...
         */
        JavaPairRDD<Integer,String> sort = wordcountSortValue.sortByKey(false);
        /**
         * (3,hadoop)(3,hive)(2,java)(1,python)...      mapToPair() -> (hadoop,3)(hive,3)(java,2)(python,1)...
         */
        JavaPairRDD<String,Integer> wordcountSortValues = sort.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            public Tuple2<String, Integer> call(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                return new Tuple2<String, Integer>(integerStringTuple2._2,integerStringTuple2._1);
            }
        });
        /**
         * foreach()
         */
        wordcountSortValues.foreach(new VoidFunction<Tuple2<String,Integer>>() {
            public void call(Tuple2<String,Integer> o) throws Exception {
                System.out.println(o._1 + " : " + o._2);
            }
        });
    }
}

运行结果

alex : 2
ben : 1
jack : 1
lili : 1
herry : 1

(2)使用Scala语言实现

package com.kfk.spark.core
import org.apache.spark.{SparkConf, SparkContext}
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/30
 * @time : 11:05 上午
 */
object AggregateByKeyScala {
    def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("wordCountApp").setMaster("local")
        val sc = new SparkContext(sparkConf)
        val list = Array("alex herry", "lili ben", "jack alex")
        val lines = sc.parallelize(list)
        /**
         * java python hive     flatMap() -> java python hive hive java...
         * hive java ...
         */
        val words = lines.flatMap(line => line.split(" "))
        /**
         * java python hive hive java...    map() -> (java,1)(hive,1)(java,1)(python,1)...
         */
        val word = words.map(word => (word, 1))
        /**
         * (java,1)(hive,1)(java,1)(python,1)...    aggregateByKey() -> (java,2)(hive,1)(python,1)...
         */
        val wordcount = word.aggregateByKey(0)((x1,y1) => x1 + y1,(x2,y2) => x2 + y2)
        // val wordcount = word.aggregateByKey(0)(_ + _,_ + _)
        /**
         * (spark,1)(hive,3)(hadoop,3)...  map() -> (3,hadoop)(3,hive)...
         */
        val wordcountSortValue = wordcount.map(x => (x._2,x._1))
        /**
         * (3,hadoop)(3,hive)...    sortByKey(false) -> (3,hadoop)(3,hive)(2,java)(1,python)...
         */
        val sort = wordcountSortValue.sortByKey(false)
        /**
         * (3,hadoop)(3,hive)(2,java)(1,python)...      map() -> (hadoop,3)(hive,3)(java,2)(python,1)...
         */
        val wordcountSortValues = sort.map(x => (x._2,x._1))
        /**
         * foreach()
         */
        wordcountSortValues.foreach(_wordcount => println(_wordcount._1 + " : " + _wordcount._2))
    }
}

运行结果

alex : 2
ben : 1
jack : 1
lili : 1
herry : 1


相关文章
|
5月前
|
分布式计算 大数据 Java
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
104 5
|
5月前
|
存储 缓存 分布式计算
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
75 4
|
5月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
77 3
|
5月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
108 0
|
5月前
|
存储 缓存 分布式计算
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
79 4
|
5月前
|
分布式计算 大数据 Spark
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
75 1
|
5月前
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
66 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
5月前
|
设计模式 数据采集 分布式计算
企业spark案例 —出租车轨迹分析
企业spark案例 —出租车轨迹分析
154 0
|
5月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
90 0
|
5月前
|
SQL 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
79 0