Spark学习--3、WordCount案例、RDD序列化、RDD依赖关系、RDD持久化(一)

简介: Spark学习--3、WordCount案例、RDD序列化、RDD依赖关系、RDD持久化(一)

1、WordCount案例实操

导入项目依赖

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.3.0</version>
    </dependency>
</dependencies>

1.1 本地调试

本地Spark程序调试需要使用Local提交模式,即将本机当做运行环境,Master和Worker都为本机。运行时直接加断点调试即可。

1、准备测试文件word.txt

hello world
hello zhm
hello future

2、代码实现

package com.zhm.spark.wordcount;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
/**
 * @ClassName WordCountLocal
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/28 14:15
 * @Version 1.0
 */
public class WordCountLocal {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("WordCountLocal");
        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        //3、Todo  获取RDD
        JavaRDD<String> javaRDD = sparkContext.textFile("input/word.txt");
        //4、对每行数据根据分隔符进行拆分
        JavaRDD<String> stringJavaRDD = javaRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.stream(s.split(" ")).iterator();
            }
        });
        //5、给每个元素加上一个1
        JavaPairRDD<String, Integer> javaPairRDD = stringJavaRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        });
        //6、利用ReduceByKey对相同key的数据进行累加
        JavaPairRDD<String, Integer> result = javaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        //7、收集结果输出
        result.collect().forEach(System.out::println);
        //x 关闭 sparkContext
        sparkContext.stop();
    }
}

运行结果:


1.2 集群运行

1、修改代码

package com.zhm.spark.wordcount;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
/**
 * @ClassName WordCountYarn
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/28 14:25
 * @Version 1.0
 */
public class WordCountYarn {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("yarn").setAppName("WordCountYarn");
        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        //3、Todo 获取RDD
        JavaRDD<String> javaRDD = sparkContext.textFile(args[0]);
        //4、按行读取然后按分隔符切分字符串
        JavaRDD<String> stringJavaRDD = javaRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.stream(s.split(" ")).iterator();
            }
        });
        //5、将每个单词转换为(word,1)
        JavaPairRDD<String, Integer> pairRDD = stringJavaRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        });
        //6、累加相同key的值
        JavaPairRDD<String, Integer> result = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        //7、将数据储存到文件上
        result.saveAsTextFile(args[1]);
        //x 关闭 sparkContext
        sparkContext.stop();
    }
}

2、打包到集群测试

(1)点击package打包,然后,查看打完后的jar包

(2)将WordCount.jar上传到/opt/module/spark-yarn目录

(3)在HDFS上创建,存储输入文件的路径/input

(4)创建test_data并上传word.txt文件到/opt/module/spark-yarn/test_data/目录下,在上传到HDFS的/input路径下

(5)执行任务

bin/spark-submit \
--class com.atguigu.spark.WordCount \
--master yarn \
./WordCount.jar \
/input \
/output
##注意:input和ouput都是HDFS上的集群路径

(6)查询运行结果

hadoop fs -cat /output/*

2、RDD序列化

在实际开发中我们往往需要自己定义一些对于RDD的操作,那么此时需要注意的是:

(1)初始化工作(与计算无关的操作)是在Driver端进行的

(2)而实际运行程序(数据计算操作)是在Executor端进行的

这就涉及到了跨进程通信,是需要序列化的。

2.1 序列化测试

1、 创建包名:com.zhm.spark.operator.serializable

2、创建使用的javaBean:User

3、创建类:Test_user测试序列化:将RDD中元素包装为User进行测试

package com.zhm.spark.operator.serializable;
/**
 * @ClassName User
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/29 11:32
 * @Version 1.0
 */
public class User  {
    private String name;
    private int age;
    public User() {
    }
    public User(String name, int age) {
        this.name = name;
        this.age = age;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }
    @Override
    public String toString() {
        return "User{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}
package com.zhm.spark.operator.serializable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import java.util.Arrays;
/**
 * @ClassName Test_User
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/29 11:33
 * @Version 1.0
 */
public class Test_User {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test_User");
        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        //3、创建RDD数据集
        JavaRDD<User> javaRDD = sparkContext.parallelize(Arrays.asList(new User("zhm", 24), new User("zhm1", 25)));
        javaRDD.foreach(new VoidFunction<User>() {
            @Override
            public void call(User user) throws Exception {
                System.out.println(user);
            }
        });
        //x 关闭 sparkContext
        sparkContext.stop();
    }
}

运行结果:


对javaBean:User类进行修改

package com.zhm.spark.operator.serializable;
import scala.Serializable;
/**
 * @ClassName User
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/29 11:32
 * @Version 1.0
 */
public class User implements Serializable {
    private String name;
    private int age;
    public User() {
    }
    public User(String name, int age) {
        this.name = name;
        this.age = age;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }
    @Override
    public String toString() {
        return "User{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}

再次运行结果:

2.2 Kryo序列化框架

Java的序列化能够序列化任何的类。但是比较重,序列化后对象的体积也比较大。

Spark出于性能的考虑,Spark2.0开始支持另外一种Kryo序列化机制。Kryo速度是Serializable的10倍。当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用Kryo来序列化。

使用Kryo序列化框架的步骤

// 1.创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore")
                // 替换默认的序列化机制
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                // 注册需要使用kryo序列化的自定义类
                .registerKryoClasses(new Class[]{Class.forName("com.atguigu.bean.User")});

3、RDD依赖关系

3.1 查看血缘关系

RDD只支持粗粒度转换,每一个转换操作都是对上游RDD的元素执行函数f得到一个新的RDD,所以RDD之间就会形成类似流水线的前后依赖关系。

将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算丢失的RDD的数据分区所依赖的父RDD分区数据以实现恢复,这样就避免了从头再次开始计算了。

1、创建包名com.zhm.spark.operator.dependency

2、代码实现

package com.zhm.spark.operator.dependency;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
/**
 * @ClassName Test01
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/29 11:41
 * @Version 1.0
 */
public class Test01 {
    /**
     * 不Shuffle的转换算子都是MapPartitionsRDD
     * 窄依赖:表示每一个父RDD的Partition最多被子RDD的一个Partition使用(独生子女)
     *
     * 宽依赖:表示同一个父RDD的Partition被多个子RDD的Partition依赖(超生)
     * --sort、reduceByKey、groupByKey、join和调用rePartition函数     一般都是要Shuffle的算子
     *
     *
     * Stage任务划分
     * 1、DAG有向无环图
     *
     * RDD任务切分
     * 分为:Application、Job、Stage和Task
     * Application:初始化一个SparkContext即生成一个
     * Job:应该Action算子就会生成一个Job
     * Stage:等于宽依赖的个数加1
     * Task:应该Stage阶段中,最后一个RDD的分区个数就是Task的个数
     *
     */
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test01");
        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        //3、创建RDD数据集
        JavaRDD<String> sourceRDD = sparkContext.textFile("./input/word.txt");
        //4、打印sourceRDD的血缘
        System.out.println("--------------sourceRDD的血缘------------");
        System.out.println(sourceRDD.toDebugString());
        //5 炸裂RDD(flatMap)
        JavaRDD<String> flatmapRDD = sourceRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.stream(s.split(" ")).iterator();
            }
        });
        //6、打印flatmapRDD的血缘
        System.out.println("--------------flatmapRDD的血缘------------");
        System.out.println(flatmapRDD.toDebugString());
        //7、转换为--->(word,1) mapToPair
        JavaPairRDD<String, Integer> mapToPairRDD = flatmapRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        });
        //8、打印mapToPairRDD的血缘
        System.out.println("--------------mapToPairRDD的血缘------------");
        System.out.println(mapToPairRDD.toDebugString());
        //9、统计每个单词的个数
        JavaPairRDD<String, Integer> reduceByKeyRDD = mapToPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        //8、打印reduceByKeyRDD的血缘
        System.out.println("--------------reduceByKeyRDD的血缘--------------");
        System.out.println(reduceByKeyRDD.toDebugString());
        //9、收集打印
        System.out.println("打印结果:\n");
        reduceByKeyRDD.collect().forEach(System.out::println);
        //x 关闭 sparkContext
        sparkContext.stop();
    }
}

运行结果:

3.2 依赖关系

1、窄依赖:

(1)表示每一个父RDD的Partition最多被子RDD的一个Partition使用(一对一or多对一)

(2)窄依赖可以形象的比喻为独生子女

2、宽依赖

(1)表示同一个父RDD的Partition被多个子RDD的Partition依赖(只能是一对多),会引起Shuffle

(2)宽依赖可以形象的比喻为超生

3、总结

(1)具有宽依赖的transformations包括:sort、reduceByKey、groupByKey、join和调用rePartition函数的任何操作

(2)宽依赖对Spark去评估一个transformatioins有更加重要的影响,比如对性能的影响。

(3)在不影响业务要求的情况下,要避免使用具有宽依赖的转换算子,因为宽依赖一定会走Shuffle,影响性能。

3.3 Stage任务划分

1、DAG有向无环图

DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。

如下,DAG记录了RDD的转换过程和任务的阶段。

2、任务运行的整体流程

3、RDD任务切分

RDD的任务切分中分为:Application、Job、Stage和Task。

(1)Application:初始化一个SparkContext即生成一个

(2)Job:一个Action算子就会生成一个

(3)Stage:Stage等于宽依赖的个数+1

(4)Task:一个Stage中,最后一个RDD的分区个数就是Task的个数。

4、执行任务

再次运行Test01_dependency程序,添加上线程睡眠,方可查看job信息

##额外添加两个Action算子
     reduceByKeyRDD.collect().forEach(System.out::println);
        reduceByKeyRDD.collect().forEach(System.out::println);

5、查看Job个数

查看http://localhost:4040/jobs/,发现Job有三个。

6、查看Stage个数

查看Job0的Stage。由于只有1个Shuffle阶段,所以Stage个数为2。

job1的

job2是和job1一样的

7、Task个数

都是两个

注意:如果存在shuffle过程,系统会自动进行缓存,UI界面显示skipped的部分。

相关文章
|
7天前
|
存储 分布式计算 并行计算
【赵渝强老师】Spark中的RDD
RDD(弹性分布式数据集)是Spark的核心数据模型,支持分布式并行计算。RDD由分区组成,每个分区由Spark Worker节点处理,具备自动容错、位置感知调度和缓存机制等特性。通过创建RDD,可以指定分区数量,并实现计算函数、依赖关系、分区器和优先位置列表等功能。视频讲解和示例代码进一步详细介绍了RDD的组成和特性。
|
1月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
59 0
|
1月前
|
分布式计算 算法 Spark
spark学习之 GraphX—预测社交圈子
spark学习之 GraphX—预测社交圈子
27 0
|
1月前
|
设计模式 数据采集 分布式计算
企业spark案例 —出租车轨迹分析
企业spark案例 —出租车轨迹分析
62 0
|
1月前
|
分布式计算 Scala Spark
educoder的spark算子学习
educoder的spark算子学习
16 0
|
1月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
55 0
|
1月前
|
SQL 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
29 0
|
1月前
|
存储 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(二)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(二)
43 0
|
1月前
|
存储 缓存 分布式计算
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
36 4
|
1月前
|
存储 缓存 分布式计算
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
42 4