Spark学习---2、SparkCore(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)

简介: Spark学习---2、SparkCore(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)

1、RDD概述

1.1 什么是RDD

RDD(Resilient Distributed Dataset)叫弹性分布式数据集,是Spark中对于分布式数据集的抽象。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

1.2 RDD五大特性

1、一组分区,即是数据集的基本组成单位,标记数据是哪个分区的

2、一个计算每个分区的函数

3、RDD之间的依赖关系

4、一个Partitioner,即RDD的分片函数:控制分区的数据流向(键值对)

5、一个列表,储存存取每个Partition的优先位置(prefered Location)。如果节点和分区个数不对应优先把分区设置在那个节点。移动数据不如移动计算,除非资源不够。

2、RDD编程

2.1 RDD的创建

在Spark中创建RDD的创建方式可以分为三种:

1、从集合中创建

2、从外部储存创建

3、从其他RDD创建

2.1.1 IDEA环境准备

1、创建一个maven工程,工程名称叫SparkCore

2、在pom文件中添加spark-core的依赖和scala的编译插件

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.3.0</version>
    </dependency>
</dependencies>

3、如果不想运行时打印大量日志,可以在resources文件夹中添加log4j2.properties文件,并添加日志配置信息

log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Set the default spark-shell log level to ERROR. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=ERROR
# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=ERROR
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
2.1.2 创建IDEA快捷键

创建SparkContext和SparkConf时存在的模板代码,我们可以设置idea快捷键一键生成。

1、点击File->Settings…->Editor->Live Templates->output->Live Template

bcdfb0f56be44dc69cd1894f874af392.png

6ef6d0c6f4b54016a9a65e2d58986884.png

f716f71111b44101ac6d909c8764eebd.png

//第八步的代码
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// TODO. 编写代码
// x. 关闭sc
sc.stop();

设置自动导包

5679b9df63aa42458dad1272fce08110.png

2.1.3 从集合中创建

1、创建包com.zhm.spark

2、创建类Test01_createRDDWithList

public class Test01_createRDDWithList {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        //3、Todo 编写代码--由字符串数组创建RDD
        JavaRDD<String> stringRDD = sparkContext.parallelize(Arrays.asList("hello", "zhm"));
        //4、收集RDD
        List<String> result = stringRDD.collect();
        //5、遍历打印输出结果
        result.forEach(System.out::println);
        //6、 关闭 sparkContext
        sparkContext.stop();
    }
}

运行结果:

7571d319df8c4c51952b160f479aedb1.png

2.1.4从外部储存系统的数据集创建

外部存储系统的数据集创建RDD如:本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、HBase等。

1、数据准备

在新建的SparkCore项目名称上右键–>新建input文件夹–>在input文件夹上右键–>新建word.txt。编辑如下内容

hello world
hello zhm
hello future

2、创建RDD

public class Test02_createRDDWithFile {
    public static void main(String[] args) {
       //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
        //2、创建SparkContext
        JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
        //3、编写代码--读取路径./input下的文件,并创建RDD
        JavaRDD<String> fileRDD = javaSparkContext.textFile("./input/word.txt");
        //4、收集RDD
        List<String> result = fileRDD.collect();
        //5、遍历打印输出结果
        result.forEach(System.out::println);
        //6、关闭 sparkContext
        javaSparkContext.stop();
    }
}

运行结果:


14d1a3866fde4984a563edd5357baaba.png

2.2 分区规则

2.2.1 从集合创建RDD

1、创建一个包名:com.zhm.spark.partition

2、代码验证

package com.zhm.spark.partition;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.Arrays;
/**
 * @ClassName Test01_ListPartition
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/27 11:42
 * @Version 1.0
 */
public class Test01_ListPartition {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
        //2、创建sparkContext
        JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
        //3、编写代码
        // 0*(5/2)=0   1*(5/2)=2.5   (0,2.5] 左开右闭 1,2
        //1*(5/2)=2.5   2*(5/2)=5     (2.5,5]左开右闭 3,4,5
        JavaRDD<Integer> integerRDD = javaSparkContext.parallelize(Arrays.asList(11, 12, 36, 14, 05), 2);
//        4、将RDD储存问文件观察文件判断分区
        integerRDD.saveAsTextFile("output");
//        JavaRDD<String> stringRDD = javaSparkContext.parallelize(Arrays.asList("1", "2", "3", "4", "5"),2);
//        stringRDD.saveAsTextFile("output");
        //5、关闭javaSparkContext
        javaSparkContext.stop();
    }
}

运行结果:

2fcfca88d22347ecb479b1ee69385d2d.png

7f7e8f14f046402282b4b85c212a6818.png

2.2.2 从文件创建RDD

package com.zhm.spark.partition;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
/**
 * @ClassName Test02_FilePartition
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/27 13:54
 * @Version 1.0
 */
public class Test02_FilePartition {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
        //2、创建sparkContext对象
        JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
        //3、编写代码
        JavaRDD<String> stringJavaRDD = javaSparkContext.textFile("input/1.txt",3);
        //4、将stringJavaRDD储存到文件中
        stringJavaRDD.saveAsTextFile("output");
        //5、关闭资源
        javaSparkContext.stop();
    }
}

运行结果:

703d97317c9540ccbffa8b05bbe75daf.png


7e53b8733ac84159bf897378f4fb5385.png

dc90214355764b1a95fe09568a0f8701.png

、分区规则

(1)分区数量的计算方式:

如果: JavaRDD stringJavaRDD = javaSparkContext.textFile(“input/1.txt”,3);

a332de95075f47a4b8adb4b22a9816d4.png

totalSize = 10 // totalSize指的是文件中的真实长度,这里需要确认你的文件换行符,不同的换行符是不一样的

goalSize = 10 / 3 = 3(byte) //表示每个分区存储3字节的数据

分区数= totalSize/ goalSize = 10 /3 => 3,3,4

由于第三个分区的4子节大于3子节的1.1倍,符合hadoop切片1.1倍的策略,因此会多创建一个分区,即3,3,3,1

(2)Spark读取文件,采用的是hadoop的方式读取,所以一行一行读取,跟字节数没有关系

(3)数据读取位置计算是以偏移量为单位来进行计算的。

(4)数据分区的偏移量范围的计算

f3f5368baec24e688dc5c1dadaf4a297.png

2.3 Transformation 转换算子

2.3.1 Value类型

创建包名com.zhm.spark.operator.value

2.3.1.1 map()映射

1、用法:给定映射函数f,map(f)以元素为粒度对RDD做数据转换

2、映射函数f:

(1)映射函数f可以带有明确签名函数,也可以是匿名内部函数

(2)映射函数f的参数类型必须与RDD的元素类型保持一致,而输出类型则任由开发者自行决定。

3、解释说明:

函数f是一个函数可以写作匿名子类,它可以接受一个参数。当某个RDD执行map方法时,会遍历该RDD中的每一个数据项,并一次应用f函数,从而产生一个新的RDD。即这个新RDD中的每一个元素都是原来RDD中每一个元素依次应用f函数而得到的。

4、需求:将Lover.txt文件中的每行结尾拼接“Thank you”

5、具体实现

package com.zhm.spark.operator.value;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
/**
 * @ClassName StudyMap
 * @Description 对单个元素进行操作
 * @Author Zouhuiming
 * @Date 2023/6/27 14:04
 * @Version 1.0
 */
public class StudyMap {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        //3、编写代码  对元素进行操作
        JavaRDD<String> stringJavaRDD = sparkContext.textFile("input/Lover.txt");
        JavaRDD<String> mapRDD = stringJavaRDD.map(s -> s + " Thank you");
        JavaRDD<String> mapRDD1 = stringJavaRDD.map(new Function<String, String>() {
            @Override
            public String call(String s) throws Exception {
                return s + "Thank you";
            }
        });
        //4、遍历打印输出结果
        mapRDD.collect().forEach(System.out::println);
        System.out.println("++++++++++++++++++++++++");
        mapRDD1.collect().forEach(System.out::println);
        //5 关闭 sparkContext
        sparkContext.stop();
    }
}

运行结果

991690b424e442a38d4a770f92b58299.png

2.3.1.2 flatMap()扁平化

flatMap其实和map算子一样,flatMap也是用来做数据映射的。

1、用法:flatMap(f),以元素为粒度,对RDD进行数据转换。

2、特点:

不同于map映射函数f的类型是(元素)->(元素)

flatMap的映射函数类型是(元素)->(集合)

3、过程:

(1)以元素为单位,创建集合

(2)去掉集合“外包装”,提前集合元素

4、功能说明

与map操作类似,将RDD中的每一个元素通过应用f函数依次转换为新的元素,并封装到RDD中。

区别:在flatMap操作中,f函数的返回值是一个集合,并且会将每一个该集合中的元素拆分出来放到新的RDD中。

5、案例说明:创建一个集合,集合里面存储的还是子集合,把所有子集合中数据取出放入到一个大的集合中。

6、具体实现

package com.zhm.spark.operator.value;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/**
 * @ClassName StudyFLatMap
 * @Description 炸裂
 * @Author Zouhuiming
 * @Date 2023/6/27 14:09
 * @Version 1.0
 */
public class StudyFLatMap {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        //3、编写逻辑代码--创建列表arrayList,其中每个元素的类型是字符串列表
        ArrayList<List<String>> arrayList = new ArrayList<>();
        arrayList.add(Arrays.asList("1","2","3"));
        arrayList.add(Arrays.asList("4","5","6"));
        arrayList.add(Arrays.asList("7","8","9"));
        //4、根据arraylist创建RDD
        JavaRDD<List<String>> listJavaRDD = sparkContext.parallelize(arrayList);
        //5、使用flatMap将RDD中每个元素进行转换打散,泛型为打散之后的数据
        JavaRDD<String> stringJavaRDD = listJavaRDD.flatMap(new FlatMapFunction<List<String>, String>() {
            @Override
            public Iterator<String> call(List<String> strings) throws Exception {
                return strings.iterator();
            }
        });
        //6、收集RDD,并打印输出
        System.out.println("---------输出集合构建的RDD之flatMap测试------------");
        stringJavaRDD.collect().forEach(System.out::println);
        //Todo 从文件读取数据的话要自己实现将元素转换为集合
        //7、读取文件中的数据
        JavaRDD<String> javaRDD = sparkContext.textFile("input/word.txt");
        //8、将每行数据按空格切分之后,转换为一个list数组再将String数组转换为list集合返回list集合的迭代器
        JavaRDD<String> stringJavaRDD1 = javaRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                String[] split = s.split(" ");
                return Arrays.asList(split).iterator();
            }
        });
        //9、收集RDD,并打印输出
        System.out.println("-----输出文件系统构建的RDD之flatMap测试---");
        stringJavaRDD1.collect().forEach(s -> {
            System.out.println(s);
        });
        //10 关闭 sparkContext
        sparkContext.stop();
    }
}

运行结果

0c4021864d5f42c7b80e423f0078507b.png

2.3.1.3 filter()过滤

1、用法:filter(f),以元素为粒度对RDD执行判定函数f

2、判定函数

(1)f,指的是类型为(RDD元素类型)=> (Boolean)的函数

(2)判定函数f的形参类型,必须与RDD的元素类型保持一致,而f的返回结果,只能是True或者False。

3、功能说明

(1)在任何一个RDD上调用filter(f)方法时,会对该RDD中每一个元素应用f函数

(2)作用是保留RDD中满足f(即f返回值为True)的数据,过滤掉不满足f(即f返回值为false)的数据。

4、需求说明:创建一个RDD,过滤出对2取余等于0的数据

26cafc5439e04b4db6068e62327b11fb.png

5、代码实现

package com.zhm.spark.operator.value;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import java.util.Arrays;
/**
 * @ClassName StudyFilter
 * @Description 过滤元素
 * @Author Zouhuiming
 * @Date 2023/6/27 14:26
 * @Version 1.0
 */
public class StudyFilter {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
        //2、创建sparkContext
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        //3、Todo  根据集合创建RDD
        JavaRDD<Integer> javaRDD = sparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
        // 根据数据与2取模,过滤掉余数不是0的数据元素
        JavaRDD<Integer> filterRDD = javaRDD.filter(new Function<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) throws Exception {
                return integer % 2 == 0;
            }
        });
        System.out.println("------filter算子测试------");
        filterRDD.collect().forEach(System.out::println);
        //x 关闭 sparkContext
        sparkContext.stop();
    }
}

运行结果:


0ac8374526e042a49cfa236e728794c9.png

相关文章
|
2月前
|
存储 分布式计算 并行计算
【赵渝强老师】Spark中的RDD
RDD(弹性分布式数据集)是Spark的核心数据模型,支持分布式并行计算。RDD由分区组成,每个分区由Spark Worker节点处理,具备自动容错、位置感知调度和缓存机制等特性。通过创建RDD,可以指定分区数量,并实现计算函数、依赖关系、分区器和优先位置列表等功能。视频讲解和示例代码进一步详细介绍了RDD的组成和特性。
|
3月前
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
51 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
3月前
|
消息中间件 分布式计算 Kafka
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
43 0
|
3月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
97 0
|
3月前
|
SQL 分布式计算 大数据
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
58 0
|
3月前
|
缓存 分布式计算 大数据
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)
73 0
|
3月前
|
分布式计算 算法 大数据
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(二)
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(二)
69 0
|
分布式计算 Java Spark
Spark学习之编程进阶——累加器与广播(5)
Spark学习之编程进阶——累加器与广播(5) 1. Spark中两种类型的共享变量:累加器(accumulator)与广播变量(broadcast variable)。累加器对信息进行聚合,而广播变量用来高效分发较大的对象。 2. 共享变量是一种可以在Spark任务中使用的特殊类型的变量。 3. 累加器的用法: 通过在驱动器中调用SparkContex
1845 0
|
2月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
180 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
3月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
82 0