1、RDD概述
1.1 什么是RDD
RDD(Resilient Distributed Dataset)叫弹性分布式数据集,是Spark中对于分布式数据集的抽象。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
1.2 RDD五大特性
1、一组分区,即是数据集的基本组成单位,标记数据是哪个分区的
2、一个计算每个分区的函数
3、RDD之间的依赖关系
4、一个Partitioner,即RDD的分片函数:控制分区的数据流向(键值对)
5、一个列表,储存存取每个Partition的优先位置(prefered Location)。如果节点和分区个数不对应优先把分区设置在那个节点。移动数据不如移动计算,除非资源不够。
2、RDD编程
2.1 RDD的创建
在Spark中创建RDD的创建方式可以分为三种:
1、从集合中创建
2、从外部储存创建
3、从其他RDD创建
2.1.1 IDEA环境准备
1、创建一个maven工程,工程名称叫SparkCore
2、在pom文件中添加spark-core的依赖和scala的编译插件
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.3.0</version> </dependency> </dependencies>
3、如果不想运行时打印大量日志,可以在resources文件夹中添加log4j2.properties文件,并添加日志配置信息
log4j.rootCategory=ERROR, console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n # Set the default spark-shell log level to ERROR. When running the spark-shell, the # log level for this class is used to overwrite the root logger's log level, so that # the user can have different defaults for the shell and regular Spark apps. log4j.logger.org.apache.spark.repl.Main=ERROR # Settings to quiet third party logs that are too verbose log4j.logger.org.spark_project.jetty=ERROR log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR log4j.logger.org.apache.parquet=ERROR log4j.logger.parquet=ERROR # SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
2.1.2 创建IDEA快捷键
创建SparkContext和SparkConf时存在的模板代码,我们可以设置idea快捷键一键生成。
1、点击File->Settings…->Editor->Live Templates->output->Live Template
//第八步的代码 // 1.创建配置对象 SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore"); // 2. 创建sparkContext JavaSparkContext sc = new JavaSparkContext(conf); // TODO. 编写代码 // x. 关闭sc sc.stop();
设置自动导包
2.1.3 从集合中创建
1、创建包com.zhm.spark
2、创建类Test01_createRDDWithList
public class Test01_createRDDWithList { public static void main(String[] args) { //1、创建配置对象 SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore"); //2、创建sparkContext JavaSparkContext sparkContext = new JavaSparkContext(conf); //3、Todo 编写代码--由字符串数组创建RDD JavaRDD<String> stringRDD = sparkContext.parallelize(Arrays.asList("hello", "zhm")); //4、收集RDD List<String> result = stringRDD.collect(); //5、遍历打印输出结果 result.forEach(System.out::println); //6、 关闭 sparkContext sparkContext.stop(); } }
运行结果:
2.1.4从外部储存系统的数据集创建
外部存储系统的数据集创建RDD如:本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、HBase等。
1、数据准备
在新建的SparkCore项目名称上右键–>新建input文件夹–>在input文件夹上右键–>新建word.txt。编辑如下内容
hello world hello zhm hello future
2、创建RDD
public class Test02_createRDDWithFile { public static void main(String[] args) { //1、创建配置对象 SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore"); //2、创建SparkContext JavaSparkContext javaSparkContext = new JavaSparkContext(conf); //3、编写代码--读取路径./input下的文件,并创建RDD JavaRDD<String> fileRDD = javaSparkContext.textFile("./input/word.txt"); //4、收集RDD List<String> result = fileRDD.collect(); //5、遍历打印输出结果 result.forEach(System.out::println); //6、关闭 sparkContext javaSparkContext.stop(); } }
运行结果:
2.2 分区规则
2.2.1 从集合创建RDD
1、创建一个包名:com.zhm.spark.partition
2、代码验证
package com.zhm.spark.partition; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import java.util.Arrays; /** * @ClassName Test01_ListPartition * @Description TODO * @Author Zouhuiming * @Date 2023/6/27 11:42 * @Version 1.0 */ public class Test01_ListPartition { public static void main(String[] args) { //1、创建配置对象 SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore"); //2、创建sparkContext JavaSparkContext javaSparkContext = new JavaSparkContext(conf); //3、编写代码 // 0*(5/2)=0 1*(5/2)=2.5 (0,2.5] 左开右闭 1,2 //1*(5/2)=2.5 2*(5/2)=5 (2.5,5]左开右闭 3,4,5 JavaRDD<Integer> integerRDD = javaSparkContext.parallelize(Arrays.asList(11, 12, 36, 14, 05), 2); // 4、将RDD储存问文件观察文件判断分区 integerRDD.saveAsTextFile("output"); // JavaRDD<String> stringRDD = javaSparkContext.parallelize(Arrays.asList("1", "2", "3", "4", "5"),2); // stringRDD.saveAsTextFile("output"); //5、关闭javaSparkContext javaSparkContext.stop(); } }
运行结果:
2.2.2 从文件创建RDD
package com.zhm.spark.partition; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; /** * @ClassName Test02_FilePartition * @Description TODO * @Author Zouhuiming * @Date 2023/6/27 13:54 * @Version 1.0 */ public class Test02_FilePartition { public static void main(String[] args) { //1、创建配置对象 SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore"); //2、创建sparkContext对象 JavaSparkContext javaSparkContext = new JavaSparkContext(conf); //3、编写代码 JavaRDD<String> stringJavaRDD = javaSparkContext.textFile("input/1.txt",3); //4、将stringJavaRDD储存到文件中 stringJavaRDD.saveAsTextFile("output"); //5、关闭资源 javaSparkContext.stop(); } }
运行结果:
、分区规则
(1)分区数量的计算方式:
如果: JavaRDD stringJavaRDD = javaSparkContext.textFile(“input/1.txt”,3);
totalSize = 10 // totalSize指的是文件中的真实长度,这里需要确认你的文件换行符,不同的换行符是不一样的
goalSize = 10 / 3 = 3(byte) //表示每个分区存储3字节的数据
分区数= totalSize/ goalSize = 10 /3 => 3,3,4
由于第三个分区的4子节大于3子节的1.1倍,符合hadoop切片1.1倍的策略,因此会多创建一个分区,即3,3,3,1
(2)Spark读取文件,采用的是hadoop的方式读取,所以一行一行读取,跟字节数没有关系
(3)数据读取位置计算是以偏移量为单位来进行计算的。
(4)数据分区的偏移量范围的计算
2.3 Transformation 转换算子
2.3.1 Value类型
创建包名com.zhm.spark.operator.value
2.3.1.1 map()映射
1、用法:给定映射函数f,map(f)以元素为粒度对RDD做数据转换
2、映射函数f:
(1)映射函数f可以带有明确签名函数,也可以是匿名内部函数
(2)映射函数f的参数类型必须与RDD的元素类型保持一致,而输出类型则任由开发者自行决定。
3、解释说明:
函数f是一个函数可以写作匿名子类,它可以接受一个参数。当某个RDD执行map方法时,会遍历该RDD中的每一个数据项,并一次应用f函数,从而产生一个新的RDD。即这个新RDD中的每一个元素都是原来RDD中每一个元素依次应用f函数而得到的。
4、需求:将Lover.txt文件中的每行结尾拼接“Thank you”
5、具体实现
package com.zhm.spark.operator.value; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; /** * @ClassName StudyMap * @Description 对单个元素进行操作 * @Author Zouhuiming * @Date 2023/6/27 14:04 * @Version 1.0 */ public class StudyMap { public static void main(String[] args) { //1、创建配置对象 SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore"); //2、创建sparkContext JavaSparkContext sparkContext = new JavaSparkContext(conf); //3、编写代码 对元素进行操作 JavaRDD<String> stringJavaRDD = sparkContext.textFile("input/Lover.txt"); JavaRDD<String> mapRDD = stringJavaRDD.map(s -> s + " Thank you"); JavaRDD<String> mapRDD1 = stringJavaRDD.map(new Function<String, String>() { @Override public String call(String s) throws Exception { return s + "Thank you"; } }); //4、遍历打印输出结果 mapRDD.collect().forEach(System.out::println); System.out.println("++++++++++++++++++++++++"); mapRDD1.collect().forEach(System.out::println); //5 关闭 sparkContext sparkContext.stop(); } }
运行结果
2.3.1.2 flatMap()扁平化
flatMap其实和map算子一样,flatMap也是用来做数据映射的。
1、用法:flatMap(f),以元素为粒度,对RDD进行数据转换。
2、特点:
不同于map映射函数f的类型是(元素)->(元素)
flatMap的映射函数类型是(元素)->(集合)
3、过程:
(1)以元素为单位,创建集合
(2)去掉集合“外包装”,提前集合元素
4、功能说明
与map操作类似,将RDD中的每一个元素通过应用f函数依次转换为新的元素,并封装到RDD中。
区别:在flatMap操作中,f函数的返回值是一个集合,并且会将每一个该集合中的元素拆分出来放到新的RDD中。
5、案例说明:创建一个集合,集合里面存储的还是子集合,把所有子集合中数据取出放入到一个大的集合中。
6、具体实现
package com.zhm.spark.operator.value; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; /** * @ClassName StudyFLatMap * @Description 炸裂 * @Author Zouhuiming * @Date 2023/6/27 14:09 * @Version 1.0 */ public class StudyFLatMap { public static void main(String[] args) { //1、创建配置对象 SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore"); //2、创建sparkContext JavaSparkContext sparkContext = new JavaSparkContext(conf); //3、编写逻辑代码--创建列表arrayList,其中每个元素的类型是字符串列表 ArrayList<List<String>> arrayList = new ArrayList<>(); arrayList.add(Arrays.asList("1","2","3")); arrayList.add(Arrays.asList("4","5","6")); arrayList.add(Arrays.asList("7","8","9")); //4、根据arraylist创建RDD JavaRDD<List<String>> listJavaRDD = sparkContext.parallelize(arrayList); //5、使用flatMap将RDD中每个元素进行转换打散,泛型为打散之后的数据 JavaRDD<String> stringJavaRDD = listJavaRDD.flatMap(new FlatMapFunction<List<String>, String>() { @Override public Iterator<String> call(List<String> strings) throws Exception { return strings.iterator(); } }); //6、收集RDD,并打印输出 System.out.println("---------输出集合构建的RDD之flatMap测试------------"); stringJavaRDD.collect().forEach(System.out::println); //Todo 从文件读取数据的话要自己实现将元素转换为集合 //7、读取文件中的数据 JavaRDD<String> javaRDD = sparkContext.textFile("input/word.txt"); //8、将每行数据按空格切分之后,转换为一个list数组再将String数组转换为list集合返回list集合的迭代器 JavaRDD<String> stringJavaRDD1 = javaRDD.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String s) throws Exception { String[] split = s.split(" "); return Arrays.asList(split).iterator(); } }); //9、收集RDD,并打印输出 System.out.println("-----输出文件系统构建的RDD之flatMap测试---"); stringJavaRDD1.collect().forEach(s -> { System.out.println(s); }); //10 关闭 sparkContext sparkContext.stop(); } }
运行结果
2.3.1.3 filter()过滤
1、用法:filter(f),以元素为粒度对RDD执行判定函数f
2、判定函数
(1)f,指的是类型为(RDD元素类型)=> (Boolean)的函数
(2)判定函数f的形参类型,必须与RDD的元素类型保持一致,而f的返回结果,只能是True或者False。
3、功能说明
(1)在任何一个RDD上调用filter(f)方法时,会对该RDD中每一个元素应用f函数
(2)作用是保留RDD中满足f(即f返回值为True)的数据,过滤掉不满足f(即f返回值为false)的数据。
4、需求说明:创建一个RDD,过滤出对2取余等于0的数据
5、代码实现
package com.zhm.spark.operator.value; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import java.util.Arrays; /** * @ClassName StudyFilter * @Description 过滤元素 * @Author Zouhuiming * @Date 2023/6/27 14:26 * @Version 1.0 */ public class StudyFilter { public static void main(String[] args) { //1、创建配置对象 SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore"); //2、创建sparkContext JavaSparkContext sparkContext = new JavaSparkContext(conf); //3、Todo 根据集合创建RDD JavaRDD<Integer> javaRDD = sparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2); // 根据数据与2取模,过滤掉余数不是0的数据元素 JavaRDD<Integer> filterRDD = javaRDD.filter(new Function<Integer, Boolean>() { @Override public Boolean call(Integer integer) throws Exception { return integer % 2 == 0; } }); System.out.println("------filter算子测试------"); filterRDD.collect().forEach(System.out::println); //x 关闭 sparkContext sparkContext.stop(); } }
运行结果: