1、WordCount案例实操
导入项目依赖
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.3.0</version> </dependency> </dependencies>
1.1 本地调试
本地Spark程序调试需要使用Local提交模式,即将本机当做运行环境,Master和Worker都为本机。运行时直接加断点调试即可。
1、准备测试文件word.txt
hello world hello zhm hello future
2、代码实现
package com.zhm.spark.wordcount; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; import java.util.Arrays; import java.util.Iterator; /** * @ClassName WordCountLocal * @Description TODO * @Author Zouhuiming * @Date 2023/6/28 14:15 * @Version 1.0 */ public class WordCountLocal { public static void main(String[] args) { //1、创建配置对象 SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("WordCountLocal"); //2、创建sparkContext JavaSparkContext sparkContext = new JavaSparkContext(conf); //3、Todo 获取RDD JavaRDD<String> javaRDD = sparkContext.textFile("input/word.txt"); //4、对每行数据根据分隔符进行拆分 JavaRDD<String> stringJavaRDD = javaRDD.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String s) throws Exception { return Arrays.stream(s.split(" ")).iterator(); } }); //5、给每个元素加上一个1 JavaPairRDD<String, Integer> javaPairRDD = stringJavaRDD.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<>(s, 1); } }); //6、利用ReduceByKey对相同key的数据进行累加 JavaPairRDD<String, Integer> result = javaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); //7、收集结果输出 result.collect().forEach(System.out::println); //x 关闭 sparkContext sparkContext.stop(); } }
运行结果:
1.2 集群运行
1、修改代码
package com.zhm.spark.wordcount; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; import java.util.Arrays; import java.util.Iterator; /** * @ClassName WordCountYarn * @Description TODO * @Author Zouhuiming * @Date 2023/6/28 14:25 * @Version 1.0 */ public class WordCountYarn { public static void main(String[] args) { //1、创建配置对象 SparkConf conf = new SparkConf().setMaster("yarn").setAppName("WordCountYarn"); //2、创建sparkContext JavaSparkContext sparkContext = new JavaSparkContext(conf); //3、Todo 获取RDD JavaRDD<String> javaRDD = sparkContext.textFile(args[0]); //4、按行读取然后按分隔符切分字符串 JavaRDD<String> stringJavaRDD = javaRDD.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String s) throws Exception { return Arrays.stream(s.split(" ")).iterator(); } }); //5、将每个单词转换为(word,1) JavaPairRDD<String, Integer> pairRDD = stringJavaRDD.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<>(s, 1); } }); //6、累加相同key的值 JavaPairRDD<String, Integer> result = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); //7、将数据储存到文件上 result.saveAsTextFile(args[1]); //x 关闭 sparkContext sparkContext.stop(); } }
2、打包到集群测试
(1)点击package打包,然后,查看打完后的jar包
(2)将WordCount.jar上传到/opt/module/spark-yarn目录
(3)在HDFS上创建,存储输入文件的路径/input
(4)创建test_data并上传word.txt文件到/opt/module/spark-yarn/test_data/目录下,在上传到HDFS的/input路径下
(5)执行任务
bin/spark-submit \ --class com.atguigu.spark.WordCount \ --master yarn \ ./WordCount.jar \ /input \ /output ##注意:input和ouput都是HDFS上的集群路径
(6)查询运行结果
hadoop fs -cat /output/*
2、RDD序列化
在实际开发中我们往往需要自己定义一些对于RDD的操作,那么此时需要注意的是:
(1)初始化工作(与计算无关的操作)是在Driver端进行的
(2)而实际运行程序(数据计算操作)是在Executor端进行的
这就涉及到了跨进程通信,是需要序列化的。
2.1 序列化测试
1、 创建包名:com.zhm.spark.operator.serializable
2、创建使用的javaBean:User
3、创建类:Test_user测试序列化:将RDD中元素包装为User进行测试
package com.zhm.spark.operator.serializable; /** * @ClassName User * @Description TODO * @Author Zouhuiming * @Date 2023/6/29 11:32 * @Version 1.0 */ public class User { private String name; private int age; public User() { } public User(String name, int age) { this.name = name; this.age = age; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } @Override public String toString() { return "User{" + "name='" + name + '\'' + ", age=" + age + '}'; } }
package com.zhm.spark.operator.serializable; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.VoidFunction; import java.util.Arrays; /** * @ClassName Test_User * @Description TODO * @Author Zouhuiming * @Date 2023/6/29 11:33 * @Version 1.0 */ public class Test_User { public static void main(String[] args) { //1、创建配置对象 SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test_User"); //2、创建sparkContext JavaSparkContext sparkContext = new JavaSparkContext(conf); //3、创建RDD数据集 JavaRDD<User> javaRDD = sparkContext.parallelize(Arrays.asList(new User("zhm", 24), new User("zhm1", 25))); javaRDD.foreach(new VoidFunction<User>() { @Override public void call(User user) throws Exception { System.out.println(user); } }); //x 关闭 sparkContext sparkContext.stop(); } }
运行结果:
对javaBean:User类进行修改
package com.zhm.spark.operator.serializable; import scala.Serializable; /** * @ClassName User * @Description TODO * @Author Zouhuiming * @Date 2023/6/29 11:32 * @Version 1.0 */ public class User implements Serializable { private String name; private int age; public User() { } public User(String name, int age) { this.name = name; this.age = age; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } @Override public String toString() { return "User{" + "name='" + name + '\'' + ", age=" + age + '}'; } }
再次运行结果:
2.2 Kryo序列化框架
Java的序列化能够序列化任何的类。但是比较重,序列化后对象的体积也比较大。
Spark出于性能的考虑,Spark2.0开始支持另外一种Kryo序列化机制。Kryo速度是Serializable的10倍。当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用Kryo来序列化。
使用Kryo序列化框架的步骤
// 1.创建配置对象 SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore") // 替换默认的序列化机制 .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // 注册需要使用kryo序列化的自定义类 .registerKryoClasses(new Class[]{Class.forName("com.atguigu.bean.User")});
3、RDD依赖关系
3.1 查看血缘关系
RDD只支持粗粒度转换,每一个转换操作都是对上游RDD的元素执行函数f得到一个新的RDD,所以RDD之间就会形成类似流水线的前后依赖关系。
将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算丢失的RDD的数据分区所依赖的父RDD分区数据以实现恢复,这样就避免了从头再次开始计算了。
1、创建包名com.zhm.spark.operator.dependency
2、代码实现
package com.zhm.spark.operator.dependency; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; import java.util.Arrays; import java.util.Iterator; /** * @ClassName Test01 * @Description TODO * @Author Zouhuiming * @Date 2023/6/29 11:41 * @Version 1.0 */ public class Test01 { /** * 不Shuffle的转换算子都是MapPartitionsRDD * 窄依赖:表示每一个父RDD的Partition最多被子RDD的一个Partition使用(独生子女) * * 宽依赖:表示同一个父RDD的Partition被多个子RDD的Partition依赖(超生) * --sort、reduceByKey、groupByKey、join和调用rePartition函数 一般都是要Shuffle的算子 * * * Stage任务划分 * 1、DAG有向无环图 * * RDD任务切分 * 分为:Application、Job、Stage和Task * Application:初始化一个SparkContext即生成一个 * Job:应该Action算子就会生成一个Job * Stage:等于宽依赖的个数加1 * Task:应该Stage阶段中,最后一个RDD的分区个数就是Task的个数 * */ public static void main(String[] args) { //1、创建配置对象 SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test01"); //2、创建sparkContext JavaSparkContext sparkContext = new JavaSparkContext(conf); //3、创建RDD数据集 JavaRDD<String> sourceRDD = sparkContext.textFile("./input/word.txt"); //4、打印sourceRDD的血缘 System.out.println("--------------sourceRDD的血缘------------"); System.out.println(sourceRDD.toDebugString()); //5 炸裂RDD(flatMap) JavaRDD<String> flatmapRDD = sourceRDD.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String s) throws Exception { return Arrays.stream(s.split(" ")).iterator(); } }); //6、打印flatmapRDD的血缘 System.out.println("--------------flatmapRDD的血缘------------"); System.out.println(flatmapRDD.toDebugString()); //7、转换为--->(word,1) mapToPair JavaPairRDD<String, Integer> mapToPairRDD = flatmapRDD.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<>(s, 1); } }); //8、打印mapToPairRDD的血缘 System.out.println("--------------mapToPairRDD的血缘------------"); System.out.println(mapToPairRDD.toDebugString()); //9、统计每个单词的个数 JavaPairRDD<String, Integer> reduceByKeyRDD = mapToPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); //8、打印reduceByKeyRDD的血缘 System.out.println("--------------reduceByKeyRDD的血缘--------------"); System.out.println(reduceByKeyRDD.toDebugString()); //9、收集打印 System.out.println("打印结果:\n"); reduceByKeyRDD.collect().forEach(System.out::println); //x 关闭 sparkContext sparkContext.stop(); } }
运行结果:
3.2 依赖关系
1、窄依赖:
(1)表示每一个父RDD的Partition最多被子RDD的一个Partition使用(一对一or多对一)
(2)窄依赖可以形象的比喻为独生子女
2、宽依赖
(1)表示同一个父RDD的Partition被多个子RDD的Partition依赖(只能是一对多),会引起Shuffle
(2)宽依赖可以形象的比喻为超生
3、总结
(1)具有宽依赖的transformations包括:sort、reduceByKey、groupByKey、join和调用rePartition函数的任何操作
(2)宽依赖对Spark去评估一个transformatioins有更加重要的影响,比如对性能的影响。
(3)在不影响业务要求的情况下,要避免使用具有宽依赖的转换算子,因为宽依赖一定会走Shuffle,影响性能。
3.3 Stage任务划分
1、DAG有向无环图
DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。
如下,DAG记录了RDD的转换过程和任务的阶段。
2、任务运行的整体流程
3、RDD任务切分
RDD的任务切分中分为:Application、Job、Stage和Task。
(1)Application:初始化一个SparkContext即生成一个
(2)Job:一个Action算子就会生成一个
(3)Stage:Stage等于宽依赖的个数+1
(4)Task:一个Stage中,最后一个RDD的分区个数就是Task的个数。
4、执行任务
再次运行Test01_dependency程序,添加上线程睡眠,方可查看job信息
##额外添加两个Action算子 reduceByKeyRDD.collect().forEach(System.out::println); reduceByKeyRDD.collect().forEach(System.out::println);
5、查看Job个数
查看http://localhost:4040/jobs/,发现Job有三个。
6、查看Stage个数
查看Job0的Stage。由于只有1个Shuffle阶段,所以Stage个数为2。
job1的
job2是和job1一样的
7、Task个数
都是两个
注意:如果存在shuffle过程,系统会自动进行缓存,UI界面显示skipped的部分。