Spark核心编程与项目案例详解(一)上

简介: 笔记

一、Spark结构原理


1.Driver进程启动之后,会做一些初始化的操作。在这个操作中,就会发送请求到Master进行spark应用程序的注册;目的是告诉master主节点,有一个新的spark应用程序要运行。

2.Master在接受到spark应用程序的注册申请之后,会发送请求给Worker进行资源的调度和分配,也即是Executor的分配

3.Worker接受到Master的请求之后,会为spark应用程序启动Executor。

4.Executor启动之后,会向Driver进行反向注册。这样driver就知道有哪些Executor正在为它服务了

5.Driver注册了一些Executor之后,就可以开始正式执行我们的spark应用程序了它首先要做的,就是读取HDFS的数据来创建初始RDD。

6.HDFS文件内容被读取到多个worker节点上,形成内存中的分布式数据集,也即是RDD partition

7.Driver会根据我们对RDD定义的操作,提交一大堆Task到Executor。


二、创建RDD的几种方式


进行Spark核心编程时,首先要做的第一件事,就是创建一个初始的RDD。该RDD中,通常就代表和包含了Spark应用程序的输入源数据。然后在创建了初始的RDD之后,才可以通过Spark Core提供的transformation算子,对该RDD进行转换,来获取其他的RDD。


Spark Core提供了三种创建RDD的方式,包括:


使用程序中的集合创建RDD

使用本地文件创建RDD

使用HDFS文件创建RDD

个人经验认为:


1、使用程序中的集合创建RDD,主要用于进行测试,可以在实际部署到集群运行之前,自己使用集合构造测试数据,来测试后面的spark应用的流程。

2、使用本地文件创建RDD,主要用于临时性地处理一些存储了大量数据的文件。

3、使用HDFS文件创建RDD,应该是最常用的生产环境处理方式,主要可以针对HDFS上存储的大数据,进行离线批处理操作。


(1)Parallelize集合

如果要通过并行化集合来创建RDD,需要针对程序中的集合,调用SparkContext的parallelize()方法。 Spark会将集合中的数据拷贝到集群上去,形成一个分布式的数据集合,也就是一个RDD。相当于是,集合中的部分数据会到一个节点上,而另一部分数据会到其他节点上。然后就可以用并行的方式来操作这个分布式数据集合,即RDD。

// 案例: 1到10累加求和
val arr= Array(1,2, 3, 4, 5,6,7,8, 9, 10)
val rdd = sc.parallelize(arr)
val sum = rdd.reduce(_ + _)

调用parallelize()时,有一个重要的参数可以指定,就是要将集合切分成多少个partition。Spark会 为每一个parition运行一个task来进行处理。Spark官方的建议是,为集群中的每个CPU创建2~4个partition。Spark默认 会根据集群的情况来设置partition的数量。但是也可以在调用parallelize()方法时,传入第二个参数,来设置RDD的partition数量。比如pallelize(arr,10)


(2)本地或者HDFS文件

Spark是支持使用任何Hadoop支持的存储系统上的文件创建RDD的,比如说HDFS、Cassandra、 HBase以及 本地文件。通过调用SparkContext的**textFile()**方法,可以针对本地文件或HDFS文件创建RDD。

有几个事项是需要注意的:


1、如果是针对本地文件的话,如果是在windows上本地测试,windows上有一份文件即可;如果是在spark集群上针对linux本地文件,那么需要将文件拷贝到所有worker节点上。

2、Spark的textFile()方法支持针对目录、压缩文件以及通配符进行RDD创建。

3、Spark默认会为hdfs文件的每一个block创建一个partition,但是也可以通过textFile()的第二个参数手动设置分区数量,只能比block数量多,不能比block数量少。

//案例:文件字数统计
val rdd = sc.textFile("data.txt")
val wordCount = rdd.map(line => line.length).reduce(_ + _)


三、Transformation算子


Spark支持两种RDD操作: transformation和action


transformation操作会针对已有的RDD创建一个新的RDD;而action则主要是对RDD进行最后的操作,比如遍历、reduce、 保存到文件等,并可以返回结果给Driver程序。


例如,map就是一种transformnation操作,它用于将已有RDD的每个元素传入一个自定义的函数,并获取一个新的元素,然后将所有的新元素组成一个新的RDD。而reduce就是一种action操作,它用于对RDD中的所有元素进行聚合操作,并获取一个最终的结果,然后返回给Driver程序。


transformation的特点就是lazy特性。lazy特性 指的是,如果一个spark应用中只定义了transformation操作,那么即使你执行该应用,这些操作也不会执行。也就是说,transformation是 不会触发spark程序的执行的,它们只是记录了对RDD所做的操作,但是不会自发的执行。只有当transformation之后,接着执行了一个action操作,那么所有的transformation才会执行。Spark通过 这种lazy特性,来进行底层的spark应用执行的优化,避免产生过多中间结果。


action操作执行,会触发一个spark job的运行,从而触发这个action之前所有的transformation的执行。这是action的特性。


常见的Transformation算子:21.png

使用Java语言对每一种Transformation算子举例讲解:

package com.kfk.spark.core;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/25
 * @time : 7:10 下午
 */
public class TransformationJava {
    public static JavaSparkContext getsc(){
        SparkConf sparkConf = new SparkConf().setAppName("TransformationJava").setMaster("local");
        return new JavaSparkContext(sparkConf);
    }
    public static void main(String[] args) {
         map();
         filter();
         flatmap();
         groupByKey();
         reduceByKey();
         sortByKey();
         join();
         cogroup();
    }
    /**
     * 数据集一:(2,"lili")                cogroup() -> <2,<"lili",(90,95,99)>>
     * 数据集二:(2,90)(2,95)(2,99)
     */
    private static void cogroup() {
        List stuList = Arrays.asList(new Tuple2<Integer,String>(1,"alex"),
                new Tuple2<Integer,String>(2,"lili"),
                new Tuple2<Integer,String>(3,"cherry"),
                new Tuple2<Integer,String>(4,"jack"),
                new Tuple2<Integer,String>(5,"jone"),
                new Tuple2<Integer,String>(6,"lucy"),
                new Tuple2<Integer,String>(7,"aliy"));
        List scoreList = Arrays.asList(new Tuple2<Integer,Integer>(1,90),
                new Tuple2<Integer,Integer>(2,79),
                new Tuple2<Integer,Integer>(2,95),
                new Tuple2<Integer,Integer>(2,99),
                new Tuple2<Integer,Integer>(3,87),
                new Tuple2<Integer,Integer>(3,88),
                new Tuple2<Integer,Integer>(3,89),
                new Tuple2<Integer,Integer>(4,98),
                new Tuple2<Integer,Integer>(5,89),
                new Tuple2<Integer,Integer>(6,93),
                new Tuple2<Integer,Integer>(7,96));
        JavaSparkContext sc = getsc();
        JavaPairRDD javaPairStuRdd = sc.parallelizePairs(stuList);
        JavaPairRDD javaPairScoreRdd = sc.parallelizePairs(scoreList);
        JavaPairRDD<Integer,Tuple2<Iterable<Integer>,Iterable<Integer>>> cogroupValues = javaPairStuRdd.cogroup(javaPairScoreRdd);
        cogroupValues.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<Integer>, Iterable<Integer>>>>() {
            public void call(Tuple2<Integer, Tuple2<Iterable<Integer>, Iterable<Integer>>> integerTuple2Tuple2) throws Exception {
                System.out.println(integerTuple2Tuple2._1);
                System.out.println(integerTuple2Tuple2._2._1 + " : " + integerTuple2Tuple2._2._2);
            }
        });
    }
    /**
     * 数据集一:(1,"alex")   join() -> <1,<"alex",90>>
     * 数据集二:(1,90)
     */
    private static void join() {
        List stuList = Arrays.asList(new Tuple2<Integer,String>(1,"alex"),
                new Tuple2<Integer,String>(2,"lili"),
                new Tuple2<Integer,String>(3,"cherry"),
                new Tuple2<Integer,String>(4,"jack"),
                new Tuple2<Integer,String>(5,"jone"),
                new Tuple2<Integer,String>(6,"lucy"),
                new Tuple2<Integer,String>(7,"aliy"));
        List scoreList = Arrays.asList(new Tuple2<Integer,Integer>(1,90),
                new Tuple2<Integer,Integer>(2,95),
                new Tuple2<Integer,Integer>(3,87),
                new Tuple2<Integer,Integer>(4,98),
                new Tuple2<Integer,Integer>(5,89),
                new Tuple2<Integer,Integer>(6,93),
                new Tuple2<Integer,Integer>(7,96));
        JavaSparkContext sc = getsc();
        JavaPairRDD javaPairStuRdd = sc.parallelizePairs(stuList);
        JavaPairRDD javaPairScoreRdd = sc.parallelizePairs(scoreList);
        JavaPairRDD<Integer,Tuple2<String,Integer>> joinValue = javaPairStuRdd.join(javaPairScoreRdd);
        joinValue.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
            public void call(Tuple2<Integer, Tuple2<String, Integer>> integerTuple2Tuple2) throws Exception {
                System.out.println(integerTuple2Tuple2._1);
                System.out.println(integerTuple2Tuple2._2._1 + " : " + integerTuple2Tuple2._2._2);
            }
        });
    }
    /**
     * <90,alex>
     * <95,lili>    sortByKey() -> <87,cherry> <90,alex> <95,lili>
     * <87,cherry>
     */
    private static void sortByKey() {
        List list = Arrays.asList(new Tuple2<Integer,String>(90,"alex"),
                new Tuple2<Integer,String>(95,"lili"),
                new Tuple2<Integer,String>(87,"cherry"),
                new Tuple2<Integer,String>(98,"jack"),
                new Tuple2<Integer,String>(89,"jone"),
                new Tuple2<Integer,String>(93,"lucy"),
                new Tuple2<Integer,String>(96,"aliy"));
        JavaPairRDD<Integer, String> javaPairRdd = getsc().parallelizePairs(list);
        JavaPairRDD<Integer,String> sortByKeyValues = javaPairRdd.sortByKey(true);
        sortByKeyValues.foreach(new VoidFunction<Tuple2<Integer, String>>() {
            public void call(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                System.out.println(integerStringTuple2._1 + " : " + integerStringTuple2._2);
            }
        });
    }
    /**
     * <class_1,(90,87,98,96)>  reduceByKey()  -> <class_1,(90+87+98+96)>
     * <class_2,(95,89,93)>     reduceByKey()  -> <class_2,(95+89+93)>
     */
    private static void reduceByKey() {
        List list = Arrays.asList(new Tuple2<String,Integer>("class_1",90),
                new Tuple2<String,Integer>("class_2",95),
                new Tuple2<String,Integer>("class_1",87),
                new Tuple2<String,Integer>("class_1",98),
                new Tuple2<String,Integer>("class_2",89),
                new Tuple2<String,Integer>("class_2",93),
                new Tuple2<String,Integer>("class_1",96));
        final JavaPairRDD<String, Integer> javaPairRdd = getsc().parallelizePairs(list);
        JavaPairRDD<String,Integer> reduceByKeyValues = javaPairRdd.reduceByKey(new Function2<Integer,Integer,Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1+v2;
            }
        });
        reduceByKeyValues.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                System.out.println(stringIntegerTuple2._1 + " : " + stringIntegerTuple2._2);
            }
        });
    }
    /**
     * class_1 90       groupByKey()  -> <class_1,(90,87,98,96)>  <class_2,(95,89,93)>
     * class_2 95
     * class_1 87
     * class_1 98
     * class_2 89
     * class_2 93
     * class_1 96
     */
    private static void groupByKey() {
        List list = Arrays.asList(new Tuple2<String,Integer>("class_1",90),
                new Tuple2<String,Integer>("class_2",95),
                new Tuple2<String,Integer>("class_1",87),
                new Tuple2<String,Integer>("class_1",98),
                new Tuple2<String,Integer>("class_2",89),
                new Tuple2<String,Integer>("class_2",93),
                new Tuple2<String,Integer>("class_1",96));
        JavaPairRDD<String,Integer> javaPairRdd = getsc().parallelizePairs(list);
        JavaPairRDD<String,Iterable<Integer>> groupByKeyValue = javaPairRdd.groupByKey();
        groupByKeyValue.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
            public void call(Tuple2<String,Iterable<Integer>> stringIteratorTuple2) throws Exception {
                System.out.println(stringIteratorTuple2._1);
                Iterator<Integer> iterator = stringIteratorTuple2._2.iterator();
                while (iterator.hasNext()){
                    System.out.println(iterator.next());
                }
            }
        });
    }
    /**
     * hbase hadoop hive
     * java python          flatmap() -> hbase hadoop hive java python java python
     * java python
     */
    private static void flatmap() {
        List<String> list = Arrays.asList("hbase hadoop hive","java python","java python");
        JavaRDD<String> javaRdd = getsc().parallelize(list);
        JavaRDD<String> flatMapValue = javaRdd.flatMap(new FlatMapFunction<String,String>() {
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        flatMapValue.foreach(new VoidFunction<String>() {
            public void call(String value) throws Exception {
                System.out.println(value);
            }
        });
    }
    /**
     * 1,2,3,4,5,6,7,8,9,10     filter() -> 2,4,6,8,10
     */
    private static void filter() {
        List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
        JavaRDD<Integer> javaRdd = getsc().parallelize(list);
        // 取偶数
        JavaRDD<Integer> filterValue = javaRdd.filter(new Function<Integer,Boolean>() {
            public Boolean call(Integer value) throws Exception {
                return value % 2 == 0;
            }
        });
        filterValue.foreach(new VoidFunction<Integer>() {
            public void call(Integer o) throws Exception {
                System.out.println(o);
            }
        });
    }
    /**
     * 1,2,3,4,5    map() -> 10,20,30,40,50
     */
    public static void map(){
        List<Integer> list = Arrays.asList(1,2,3,4,5);
        JavaRDD<Integer> javaRdd = getsc().parallelize(list);
        JavaRDD<Integer> mapValue = javaRdd.map(new Function<Integer,Integer>() {
            public Integer call(Integer value) throws Exception {
                return value * 10;
            }
        });
        mapValue.foreach(new VoidFunction<Integer>() {
            public void call(Integer o) throws Exception {
                System.out.println(o);
            }
        });
    }
}

使用Scala语言对每一种Transformation算子举例讲解:

package com.kfk.spark.core
import org.apache.spark.{SparkConf, SparkContext}
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/25
 * @time : 7:19 下午
 */
object TransformationScala {
    def getsc():SparkContext ={
        val sparkConf = new SparkConf().setAppName("TransformationScala").setMaster("local")
        return new SparkContext(sparkConf)
    }
    def main(args: Array[String]): Unit = {
        map()
        filter()
        flatmap()
        groupByKey()
        reduceByKey()
        sortByKey()
        join()
        cogroup()
    }
    /**
     * 1,2,3,4,5    map() -> 10,20,30,40,50
     */
    def map(): Unit ={
        val list = Array(1,2,3,4,5)
        val rdd = getsc().parallelize(list)
        val mapValue = rdd.map(x => x * 10)
        mapValue.foreach(x => System.out.println(x))
    }
    /**
     * 1,2,3,4,5,6,7,8,9,10     filter() -> 2,4,6,8,10
     */
    def filter(): Unit ={
        val list = Array(1,2,3,4,5,6,7,8,9,10)
        val rdd = getsc().parallelize(list)
        val filterValue = rdd.filter(x => x % 2 == 0)
        filterValue.foreach(x => System.out.println(x))
    }
    /**
     * hbase hadoop hive
     * java python          flatmap() -> hbase hadoop hive java python java python
     * java python
     */
    def flatmap(): Unit = {
        val list = Array("hbase hadoop hive", "java python", "storm spark")
        val rdd = getsc().parallelize(list)
        val flatMapValue = rdd.flatMap(x => x.split(" "))
        flatMapValue.foreach(x => System.out.println(x))
    }
    /**
     * class_1 90       groupByKey()  -> <class_1,(90,87,98,96)>  <class_2,(95,89,93)>
     * class_2 95
     * class_1 87
     * class_1 98
     * class_2 89
     * class_2 93
     * class_1 96
     */
    def groupByKey(): Unit = {
        val list = Array(
            Tuple2("class_1", 90),
            Tuple2("class_2", 95),
            Tuple2("class_1", 87),
            Tuple2("class_1", 98),
            Tuple2("class_2", 89),
            Tuple2("class_2", 93),
            Tuple2("class_1", 96))
        val rdd = getsc().parallelize(list)
        val groupByKeyValue = rdd.groupByKey()
        groupByKeyValue.foreach(x => {
            System.out.println(x._1)
            x._2.foreach(y => System.out.println(y))
        })
    }
    /**
     * <class_1,(90,87,98,96)>  reduceByKey()  -> <class_1,(90+87+98+96)>
     * <class_2,(95,89,93)>     reduceByKey()  -> <class_2,(95+89+93)>
     */
    def reduceByKey(): Unit = {
        val list = Array(
            Tuple2("class_1", 90),
            Tuple2("class_2", 95),
            Tuple2("class_1", 87),
            Tuple2("class_1", 98),
            Tuple2("class_2", 89),
            Tuple2("class_2", 93),
            Tuple2("class_1", 96))
        val rdd = getsc().parallelize(list)
        val reduceByKeyValues = rdd.reduceByKey((x,y) => x+y)
        reduceByKeyValues.foreach(x => {
            System.out.println(x._1 + " : " + x._2)
        })
    }
    /**
     * <90,alex>
     * <95,lili>     -> <87,cherry> <90,alex> <95,lili>
     * <87,cherry>
     */
    def sortByKey(): Unit ={
        val list = Array(Tuple2(90, "alex"),
            Tuple2(95, "lili"),
            Tuple2(87, "cherry"),
            Tuple2(98, "jack"),
            Tuple2(89, "jone"),
            Tuple2(93, "lucy"),
            Tuple2(96, "aliy")
        )
        val rdd = getsc().parallelize(list)
        val sortByKeyValues = rdd.sortByKey(true)
        sortByKeyValues.foreach(x => {
            System.out.println(x._1 + " : " + x._2)
        })
    }
    /**
     * 数据集一:(1,"alex")   join() -> <1,<"alex",90>>
     * 数据集二:(1,90)
     */
    def join(): Unit ={
        val stuList = Array(Tuple2(1, "alex"),
            Tuple2(2, "lili"),
            Tuple2(3, "cherry"),
            Tuple2(4, "jack"),
            Tuple2(5, "jone"),
            Tuple2(6, "lucy"),
            Tuple2(7, "aliy"))
        val scoreList = Array(Tuple2(1, 90),
            Tuple2(2, 95),
            Tuple2(3, 87),
            Tuple2(4, 98),
            Tuple2(5, 89),
            Tuple2(6, 93),
            Tuple2(7, 96))
        val sc = getsc()
        val stuRdd = sc.parallelize(stuList)
        val scoreRdd = sc.parallelize(scoreList)
        val joinValue = stuRdd.join(scoreRdd)
        joinValue.foreach(x => {
            System.out.println(x._1 + " > " + x._2._1 + " : " + x._2._2)
        })
    }
    /**
     * 数据集一:(2,"lili")                cogroup() -> <2,<"lili",(90,95,99)>>
     * 数据集二:(2,90)(2,95)(2,99)
     */
    def cogroup(): Unit ={
        val stuList = Array(Tuple2(1, "alex"),
            Tuple2(2, "lili"),
            Tuple2(3, "cherry"),
            Tuple2(4, "jack"),
            Tuple2(5, "jone"),
            Tuple2(6, "lucy"),
            Tuple2(7, "aliy"))
        val scoreList = Array(Tuple2(1, 90),
            Tuple2(2, 95),
            Tuple2(2, 95),
            Tuple2(2, 99),
            Tuple2(3, 87),
            Tuple2(3, 88),
            Tuple2(3, 89),
            Tuple2(4, 98),
            Tuple2(5, 89),
            Tuple2(6, 93),
            Tuple2(7, 96))
        val sc = getsc()
        val stuRdd = sc.parallelize(stuList)
        val scoreRdd = sc.parallelize(scoreList)
        val cogroupValues = stuRdd.cogroup(scoreRdd)
        cogroupValues.foreach(x => {
            System.out.println(x._1 + " > " + x._2._1.toList + " : " + x._2._2.toList)
        })
    }
}



相关文章
|
2月前
|
分布式计算 大数据 Java
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
66 5
|
2月前
|
存储 缓存 分布式计算
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
44 4
|
2月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
54 3
|
2月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
72 0
|
2月前
|
存储 缓存 分布式计算
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
47 4
|
2月前
|
分布式计算 大数据 Spark
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
44 1
|
2月前
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
42 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
2月前
|
设计模式 数据采集 分布式计算
企业spark案例 —出租车轨迹分析
企业spark案例 —出租车轨迹分析
102 0
|
2月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
59 0
|
2月前
|
SQL 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
49 0
下一篇
DataWorks