Spark核心编程与项目案例详解(三)上

简介: 笔记

首先将SparkConf分装在一个类中

Java:

package com.kfk.spark.common;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/28
 * @time : 6:18 下午
 */
public class CommSparkContext {
    public static JavaSparkContext getsc(){
        SparkConf sparkConf = new SparkConf().setAppName("CommSparkContext").setMaster("local");
        return new JavaSparkContext(sparkConf);
    }
}

Scala:

package com.kfk.spark.common
import org.apache.spark.{SparkConf, SparkContext}
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/28
 * @time : 6:18 下午
 */
object CommSparkContextScala {
    def getsc():SparkContext ={
        val sparkConf = new SparkConf().setAppName("CommSparkContextScala").setMaster("local")
        return new SparkContext(sparkConf)
    }
}


十一、MapPartitons案例实战详解


MapPartitions类似于Map,不同之处在于: Map算子一次就处理一个partition中的一条数据,而

mapPartion算子一次处理一个partition中所有的数据。


MapPartion的使用场景:

如果RDD的数据量不是很大,那么建议采用mapPartition算子替代map算子。可以加快处理速度;但是如果RDD的数据量非常大,比如10亿,那么我们不建议用mapParition,可能会造成内存溢出。


(1)使用Java语言实现

package com.kfk.spark.core;
import com.kfk.spark.common.CommSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;
import java.util.*;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/29
 * @time : 4:18 下午
 */
public class MapPartitionJava {
    public static void main(String[] args) {
        JavaSparkContext sc = CommSparkContext.getsc();
        List list = Arrays.asList("alex","herry","lili","ben","jack","jone","cherry");
        // 将rdd分为2个partition
        JavaRDD rdd = sc.parallelize(list,2);
        final Map<String,Double> map = new HashMap<String,Double>();
        map.put("alex",98.6);
        map.put("herry",89.5);
        map.put("lili",87.3);
        map.put("ben",91.2);
        map.put("jack",78.9);
        map.put("jone",95.4);
        map.put("cherry",96.1);
        // 对每一个分区中的数据同时做处理
        JavaRDD<Double>  mapPartition = rdd.mapPartitions(new FlatMapFunction<Iterator<String>, Double>() {
            public Iterator<Double> call(Iterator iterator) throws Exception {
                List<Double> list = new ArrayList<Double>();
                while (iterator.hasNext()){
                    String userName = String.valueOf(iterator.next());
                    Double score = map.get(userName);
                    list.add(score);
                }
                return list.iterator();
            }
        });
        mapPartition.foreach(new VoidFunction<Double>() {
            public void call(Double o) throws Exception {
                System.out.println(o);
            }
        });
    }
}

运行结果

98.6
89.5
87.3
91.2
78.9
95.4
96.1

(2)使用Scala语言实现

package com.kfk.spark.core
import com.kfk.spark.common.CommSparkContextScala
import org.apache.spark.SparkContext
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/29
 * @time : 4:59 下午
 */
object MapPartitionScala {
    def main(args: Array[String]): Unit = {
        val sc = CommSparkContextScala.getsc()
        val list = Array("alex", "herry", "lili", "ben", "jack", "jone", "cherry")
        val map = Map("alex" -> 98.6,"herry" -> 89.5,"lili" -> 87.3,"ben" -> 91.2,"jack" -> 78.9,"jone" -> 95.4,"cherry" -> 96.1)
        // 将rdd分为2个partition
        val rdd = sc.parallelize(list,2)
        // 对每一个分区中的数据同时做处理
        val mapPartitionValues = rdd.mapPartitions(x => {
            var list = List[Double]()
            while (x.hasNext){
                val userName = x.next()
                val score = map.get(userName).get
                list .::= (score)
            }
            list.iterator
        })
        mapPartitionValues.foreach(x => System.out.println(x))
    }
}

运行结果

98.6
89.5
87.3
91.2
78.9
95.4
96.1


十二、MapPartitonsWithIndex案例实战详解


在parallelize(list, 2)并行集合中如果指定了numPartitions为2,就说明数据集被分到2个分区中进行执行,怎么分是由spark自己来判定的。

如果我们想知道数据集对应的分区,就需要mapPartitionsWithIndex这个算子来做。这个算子可以告诉我们每个partition的index是多少,并且对应到那些数据。


(1)使用Java语言实现

package com.kfk.spark.core;
import com.kfk.spark.common.CommSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/29
 * @time : 7:31 下午
 */
public class MapPartitionWithIndexJava {
    public static void main(String[] args) {
        JavaSparkContext sc = CommSparkContext.getsc();
        List list = Arrays.asList("alex","herry","lili","ben","jack","jone","cherry");
        // 将rdd分为3个partition
        JavaRDD rdd = sc.parallelize(list,3);
        // 查看每个数据对应的分区号
        JavaRDD<String> indexValues = rdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
            public Iterator<String> call(Integer index, Iterator iterator) throws Exception {
                List<String> list = new ArrayList<String>();
                while (iterator.hasNext()){
                    String indexStr = iterator.next() + " : " + (index+1);
                    list.add(indexStr);
                }
                return list.iterator();
            }
        },false);
        indexValues.foreach(new VoidFunction<String>() {
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });
    }
}

运行结果

alex : 1
herry : 1
lili : 2
ben : 2
jack : 3
jone : 3
cherry : 3

(2)使用Scala语言实现

package com.kfk.spark.core
import com.kfk.spark.common.CommSparkContextScala
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/29
 * @time : 8:40 下午
 */
object MapPartitionWithIndexScala {
    def main(args: Array[String]): Unit = {
        val sc = CommSparkContextScala.getsc()
        val list = Array("alex", "herry", "lili", "ben", "jack", "jone", "cherry")
        // 将rdd分为3个partition
        val rdd = sc.parallelize(list,3)
        // 查看每个数据对应的分区号
        val indexValues = rdd.mapPartitionsWithIndex((index,x) => {
            var list = List[String]()
            while (x.hasNext){
                val userNameIndex = x.next + " : " + (index + 1)
                list .::= (userNameIndex)
            }
            list.iterator
        })
        indexValues.foreach(x => System.out.println(x))
    }
}

运行结果

alex : 1
herry : 1
lili : 2
ben : 2
jack : 3
jone : 3
cherry : 3


十三、Cartesian案例实战详解


Cartesian:笛卡尔乘积

比如有两个RDD,那么通过cartesian算子可以将两个RDD的每一条数据相互join一次,最终组成一个笛卡尔乘积

3.png

(1)使用Java语言实现

package com.kfk.spark.core;
import com.kfk.spark.common.CommSparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.Arrays;
import java.util.List;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/29
 * @time : 8:52 下午
 */
public class CartesionJava {
    public static void main(String[] args) {
        JavaSparkContext sc = CommSparkContext.getsc();
        List list1 = Arrays.asList("衣服-1","衣服-2");
        List list2 = Arrays.asList("裤子-1","裤子-2");
        JavaRDD<String> rdd1 = sc.parallelize(list1);
        JavaRDD<String> rdd2 = sc.parallelize(list2);
        /**
         * (cherry,alex)(cherry,jack)(herry,alex)(herry,jack)
         */
        JavaPairRDD<String,String> carteValues = rdd1.cartesian(rdd2);
        for (Object obj : carteValues.collect()){
            System.out.println(obj);
        }
    }
}

运行结果

(衣服-1,裤子-1)
(衣服-1,裤子-2)
(衣服-2,裤子-1)
(衣服-2,裤子-2)

(2)使用Scala语言实现

package com.kfk.spark.core
import com.kfk.spark.common.CommSparkContextScala
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/29
 * @time : 9:00 下午
 */
object CartesionScala {
    def main(args: Array[String]): Unit = {
        val sc = CommSparkContextScala.getsc()
        val list1 = Array("衣服-1", "衣服-2")
        val list2 = Array("裤子-1", "裤子-2")
        val rdd1 = sc.parallelize(list1)
        val rdd2 = sc.parallelize(list2)
        /**
         * (cherry,alex)(cherry,jack)(herry,alex)(herry,jack)
         */
        val carteValues = rdd1.cartesian(rdd2)
        carteValues.foreach(x => System.out.println(x))
    }
}

运行结果

(衣服-1,裤子-1)
(衣服-1,裤子-2)
(衣服-2,裤子-1)
(衣服-2,裤子-2)


十四、Coalesce案例实战详解


Coalesce算子是将RDD的partition数量缩减,将一定量的数据压缩到更少放入partiton中。


一般与filter算子配合使用,使用filter算子过滤掉很多数据之后,就会出现很多partition种数据不均匀的情况,此时我们就可以使用coalesce算子来压缩RDD的partition的数量,从而让各个partition中的数据均匀,不浪费内存占用资源。


比如:公司部门的合并


(1)使用Java语言实现

package com.kfk.spark.core;
import com.kfk.spark.common.CommSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/29
 * @time : 9:40 下午
 */
public class CoalesceJava {
    public static void main(String[] args) {
        JavaSparkContext sc = CommSparkContext.getsc();
        List list = Arrays.asList("alex","herry","lili","ben","jack","jone","cherry","lucy","pony","leo");
        JavaRDD rdd = sc.parallelize(list,4);
        // 查看每个值对应每个分区
        JavaRDD<String> indexValues1 = rdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
            public Iterator<String> call(Integer index, Iterator iterator) throws Exception {
                List<String> list = new ArrayList<String>();
                while (iterator.hasNext()){
                    String indexStr = iterator.next() + " " + "以前分区" + " : " + (index+1);
                    list.add(indexStr);
                }
                return list.iterator();
            }
        },false);
        // 合并为两个分区
        JavaRDD<String>  coalesceValues = indexValues1.coalesce(2);
        // 合并分区之后重新查看每个值对应每个分区
        JavaRDD<String> indexValues2 = coalesceValues.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
            public Iterator<String> call(Integer index, Iterator iterator) throws Exception {
                List<String> list = new ArrayList<String>();
                while (iterator.hasNext()){
                    String indexStr = iterator.next() + " " + "现在分区" + " : " + (index+1);
                    list.add(indexStr);
                }
                return list.iterator();
            }
        },false);
        indexValues2.foreach(new VoidFunction<String>() {
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });
    }
}

运行结果

lili 以前分区 : 2 现在分区 : 1
ben 以前分区 : 2 现在分区 : 1
jack 以前分区 : 2 现在分区 : 1
alex 以前分区 : 1 现在分区 : 1
herry 以前分区 : 1 现在分区 : 1
lucy 以前分区 : 4 现在分区 : 2
pony 以前分区 : 4 现在分区 : 2
leo 以前分区 : 4 现在分区 : 2
jone 以前分区 : 3 现在分区 : 2
cherry 以前分区 : 3 现在分区 : 2

(2)使用Scala语言实现

package com.kfk.spark.core
import com.kfk.spark.common.CommSparkContextScala
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/29
 * @time : 9:56 下午
 */
object CoalesceScala {
    def main(args: Array[String]): Unit = {
        val sc = CommSparkContextScala.getsc()
        val list = Array("alex", "herry", "lili", "ben", "jack", "jone", "cherry", "lucy", "pony", "leo")
        val rdd = sc.parallelize(list,4)
        // 查看每个值对应每个分区
        val indexValues1 = rdd.mapPartitionsWithIndex((index,x) => {
            var list = List[String]()
            while (x.hasNext){
                val indexStr = x.next() + " " + "以前分区" + " : " + (index + 1)
                list .::= (indexStr)
            }
            list.iterator
        })
        // 合并为两个分区
        val coalesceValues = indexValues1.coalesce(2)
        // 合并分区之后重新查看每个值对应每个分区
        val indexValues2 = coalesceValues.mapPartitionsWithIndex((index,y) => {
            var list = List[String]()
            while (y.hasNext){
                val indexStr = y.next() + " " + "现在分区" + " : " + (index + 1)
                list .::= (indexStr)
            }
            list.iterator
        })
        indexValues2.foreach(x => System.out.println(x))
    }
}

运行结果

lili 以前分区 : 2 现在分区 : 1
ben 以前分区 : 2 现在分区 : 1
jack 以前分区 : 2 现在分区 : 1
alex 以前分区 : 1 现在分区 : 1
herry 以前分区 : 1 现在分区 : 1
lucy 以前分区 : 4 现在分区 : 2
pony 以前分区 : 4 现在分区 : 2
leo 以前分区 : 4 现在分区 : 2
jone 以前分区 : 3 现在分区 : 2
cherry 以前分区 : 3 现在分区 : 2




相关文章
|
1月前
|
分布式计算 并行计算 大数据
Spark学习---day02、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---day02、Spark核心编程 RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
113 1
|
1月前
|
SQL 分布式计算 大数据
【大数据技术Spark】DStream编程操作讲解实战(图文解释 附源码)
【大数据技术Spark】DStream编程操作讲解实战(图文解释 附源码)
77 0
|
1月前
|
SQL 分布式计算 API
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
|
1月前
|
存储 分布式计算 负载均衡
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
116 0
|
1月前
|
分布式计算 Java Scala
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
|
1月前
|
存储 分布式计算 Hadoop
Spark编程实验一:Spark和Hadoop的安装使用
Spark编程实验一:Spark和Hadoop的安装使用
47 4
|
16天前
|
分布式计算 数据可视化 大数据
基于spark的医疗大数据可视化大屏项目
基于spark的医疗大数据可视化大屏项目
|
1月前
|
分布式计算 关系型数据库 MySQL
Spark编程实验四:Spark Streaming编程
Spark编程实验四:Spark Streaming编程
57 2
|
1月前
|
SQL 分布式计算 关系型数据库
Spark编程实验三:Spark SQL编程
Spark编程实验三:Spark SQL编程
31 1
|
1月前
|
分布式计算 Shell 开发工具
Spark编程实验二:RDD编程初级实践
Spark编程实验二:RDD编程初级实践
34 1