I. Spark Architecture
1. After the Driver process starts, it performs some initialization. As part of this, it sends a request to the Master to register the Spark application, telling the master node that a new Spark application is about to run.
2. After receiving the application's registration request, the Master sends requests to the Workers to schedule and allocate resources, that is, to allocate Executors.
3. After receiving the Master's request, each Worker starts Executors for the Spark application.
4. Once started, the Executors register themselves back with the Driver (reverse registration), so the Driver knows which Executors are serving it.
5. After some Executors have registered, the Driver can begin executing the Spark application. The first thing it does is read data from HDFS to create the initial RDD.
6. The HDFS file contents are read onto multiple worker nodes, forming an in-memory distributed dataset, i.e., the RDD partitions.
7. Based on the operations we define on the RDD, the Driver submits a large number of tasks to the Executors, as sketched below.
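From the application code's side, the steps above correspond to a driver program roughly like the following minimal sketch (not from the original article; the master URL spark://master:7077 and the HDFS path hdfs://namenode:8020/data.txt are hypothetical placeholders):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object DriverLifecycle {
  def main(args: Array[String]): Unit = {
    // Creating the SparkContext registers this application with the Master,
    // which asks Workers to launch Executors; the Executors then register
    // back with this Driver.
    val conf = new SparkConf()
      .setAppName("DriverLifecycle")
      .setMaster("spark://master:7077")   // hypothetical master URL
    val sc = new SparkContext(conf)

    // Reading an HDFS file creates the initial RDD, partitioned across the Executors.
    val lines = sc.textFile("hdfs://namenode:8020/data.txt") // hypothetical path

    // The action below makes the Driver submit tasks to the Executors.
    println(lines.count())

    sc.stop()
  }
}
```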
II. Ways to Create an RDD
When doing Spark Core programming, the very first thing to do is create an initial RDD. That RDD typically represents and contains the input data of the Spark application. Only after the initial RDD is created can you use the transformation operators provided by Spark Core to transform it into other RDDs.
Spark Core provides three ways to create an RDD:
Creating an RDD from a collection in the program
Creating an RDD from a local file
Creating an RDD from an HDFS file
Based on my own experience:
1. Creating an RDD from an in-program collection is mainly used for testing: before actually deploying to a cluster, you can build test data from a collection to exercise the rest of the Spark application's flow.
2. Creating an RDD from a local file is mainly used for ad-hoc processing of files that store large amounts of data.
3. Creating an RDD from an HDFS file should be the most common approach in production; it is mainly used for offline batch processing of big data stored on HDFS.
(1) Parallelized collections
To create an RDD by parallelizing a collection, call SparkContext's parallelize() method on a collection in your program. Spark copies the data in the collection to the cluster to form a distributed dataset, i.e., an RDD. In effect, part of the data ends up on one node and the rest on other nodes, and you can then operate on this distributed dataset (the RDD) in parallel.
```scala
// Example: sum the numbers 1 to 10
val arr = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val rdd = sc.parallelize(arr)
val sum = rdd.reduce(_ + _)
```
When calling parallelize(), an important optional parameter is the number of partitions to split the collection into. Spark runs one task for each partition. Spark's official recommendation is to create 2 to 4 partitions for each CPU in the cluster. By default, Spark sets the number of partitions based on the cluster, but you can also pass a second argument to parallelize() to set the RDD's partition count explicitly, e.g. parallelize(arr, 10).
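As a small illustrative sketch (assuming a SparkContext named sc already exists, as in the example above), the second argument controls how many partitions, and therefore how many tasks, the RDD will have:

```scala
// Split the collection into 10 partitions; Spark runs one task per partition.
val arr = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val rdd = sc.parallelize(arr, 10)
println(rdd.partitions.length)   // 10
val sum = rdd.reduce(_ + _)      // 55, computed across the 10 partitions
```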
(2) Local or HDFS files
Spark can create RDDs from files on any storage system supported by Hadoop, such as HDFS, Cassandra, HBase, and the local filesystem. By calling SparkContext's **textFile()** method, you can create an RDD from a local file or from an HDFS file.
A few points to note:
1. For local files: if you are testing locally on Windows, having the file on that Windows machine is enough; if you are running on a Spark cluster against a Linux local file, the file must be copied to every worker node.
2. Spark's textFile() method supports creating RDDs from directories, compressed files, and wildcards.
3. By default, Spark creates one partition for each block of an HDFS file, but you can also set the number of partitions manually through textFile()'s second argument; it can only be higher than the number of blocks, not lower (a short sketch of this parameter follows the example below).
```scala
// Example: count the total number of characters in a file
val rdd = sc.textFile("data.txt")
val wordCount = rdd.map(line => line.length).reduce(_ + _)
```
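As a minimal sketch of the second parameter (the HDFS path below is hypothetical; the value passed is treated as a minimum number of partitions):

```scala
// Request at least 20 partitions instead of the default of one per HDFS block.
val rdd = sc.textFile("hdfs://namenode:8020/user/data/data.txt", 20)
println(rdd.partitions.length)
val totalChars = rdd.map(line => line.length).reduce(_ + _)
```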
III. Transformation Operators
Spark supports two kinds of RDD operations: transformations and actions.
A transformation creates a new RDD from an existing one, while an action performs a final operation on an RDD, such as iterating over it, reducing it, or saving it to a file, and can return a result to the Driver program.
For example, map is a transformation: it passes each element of an existing RDD through a user-defined function, obtains a new element, and assembles all the new elements into a new RDD. reduce is an action: it aggregates all the elements of an RDD into a single final result, which is returned to the Driver program.
The defining characteristic of transformations is laziness. Laziness means that if a Spark application only defines transformations, those operations will not run even when the application is executed. In other words, transformations never trigger the execution of the Spark program by themselves; they merely record the operations applied to the RDD. Only when an action is executed after the transformations do all of those transformations actually run. Spark uses this laziness to optimize the underlying execution of the application and to avoid producing too many intermediate results.
Executing an action triggers a Spark job, which in turn triggers the execution of all the transformations that precede that action. That is the defining characteristic of actions, as illustrated by the sketch below.
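A minimal sketch of this laziness (assuming a SparkContext named sc): no job runs until the final action is called.

```scala
// Transformations only record the RDD lineage; nothing executes yet.
val numbers = sc.parallelize(1 to 10)
val doubled = numbers.map(_ * 2)          // lazy transformation
val evens   = doubled.filter(_ % 4 == 0)  // lazy transformation

// reduce() is an action: it triggers a Spark job that runs the map and
// filter above and returns the result to the Driver.
val total = evens.reduce(_ + _)
println(total) // 60
```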
Common transformation operators:
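The examples that follow (first in Java, then in Scala) demonstrate these operators:
map: pass each element through a function and collect the results into a new RDD
filter: keep only the elements for which the function returns true
flatMap: like map, but each input element can produce zero or more output elements
groupByKey: group the values of a key-value RDD into an iterable per key
reduceByKey: aggregate the values of each key with the given function
sortByKey: sort a key-value RDD by key
join: join two key-value RDDs on their keys
cogroup: for each key, collect the values from both RDDs into a pair of iterables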
Examples of each transformation operator in Java:
```java
package com.kfk.spark.core;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/11/25
 * @time : 7:10 PM
 */
public class TransformationJava {

    private static JavaSparkContext sparkContext;

    public static JavaSparkContext getsc(){
        // Reuse a single JavaSparkContext: only one SparkContext may be active per JVM.
        if (sparkContext == null) {
            SparkConf sparkConf = new SparkConf().setAppName("TransformationJava").setMaster("local");
            sparkContext = new JavaSparkContext(sparkConf);
        }
        return sparkContext;
    }

    public static void main(String[] args) {
        map();
        filter();
        flatmap();
        groupByKey();
        reduceByKey();
        sortByKey();
        join();
        cogroup();
    }

    /**
     * Dataset 1: (2,"lili")            cogroup() -> <2,<("lili"),(79,95,99)>>
     * Dataset 2: (2,79)(2,95)(2,99)
     */
    private static void cogroup() {
        List<Tuple2<Integer,String>> stuList = Arrays.asList(
                new Tuple2<Integer,String>(1,"alex"),
                new Tuple2<Integer,String>(2,"lili"),
                new Tuple2<Integer,String>(3,"cherry"),
                new Tuple2<Integer,String>(4,"jack"),
                new Tuple2<Integer,String>(5,"jone"),
                new Tuple2<Integer,String>(6,"lucy"),
                new Tuple2<Integer,String>(7,"aliy"));

        List<Tuple2<Integer,Integer>> scoreList = Arrays.asList(
                new Tuple2<Integer,Integer>(1,90),
                new Tuple2<Integer,Integer>(2,79),
                new Tuple2<Integer,Integer>(2,95),
                new Tuple2<Integer,Integer>(2,99),
                new Tuple2<Integer,Integer>(3,87),
                new Tuple2<Integer,Integer>(3,88),
                new Tuple2<Integer,Integer>(3,89),
                new Tuple2<Integer,Integer>(4,98),
                new Tuple2<Integer,Integer>(5,89),
                new Tuple2<Integer,Integer>(6,93),
                new Tuple2<Integer,Integer>(7,96));

        JavaSparkContext sc = getsc();
        JavaPairRDD<Integer,String> javaPairStuRdd = sc.parallelizePairs(stuList);
        JavaPairRDD<Integer,Integer> javaPairScoreRdd = sc.parallelizePairs(scoreList);

        JavaPairRDD<Integer,Tuple2<Iterable<String>,Iterable<Integer>>> cogroupValues =
                javaPairStuRdd.cogroup(javaPairScoreRdd);

        cogroupValues.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {
            public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> integerTuple2Tuple2) throws Exception {
                System.out.println(integerTuple2Tuple2._1);
                System.out.println(integerTuple2Tuple2._2._1 + " : " + integerTuple2Tuple2._2._2);
            }
        });
    }

    /**
     * Dataset 1: (1,"alex")   join() -> <1,<"alex",90>>
     * Dataset 2: (1,90)
     */
    private static void join() {
        List<Tuple2<Integer,String>> stuList = Arrays.asList(
                new Tuple2<Integer,String>(1,"alex"),
                new Tuple2<Integer,String>(2,"lili"),
                new Tuple2<Integer,String>(3,"cherry"),
                new Tuple2<Integer,String>(4,"jack"),
                new Tuple2<Integer,String>(5,"jone"),
                new Tuple2<Integer,String>(6,"lucy"),
                new Tuple2<Integer,String>(7,"aliy"));

        List<Tuple2<Integer,Integer>> scoreList = Arrays.asList(
                new Tuple2<Integer,Integer>(1,90),
                new Tuple2<Integer,Integer>(2,95),
                new Tuple2<Integer,Integer>(3,87),
                new Tuple2<Integer,Integer>(4,98),
                new Tuple2<Integer,Integer>(5,89),
                new Tuple2<Integer,Integer>(6,93),
                new Tuple2<Integer,Integer>(7,96));

        JavaSparkContext sc = getsc();
        JavaPairRDD<Integer,String> javaPairStuRdd = sc.parallelizePairs(stuList);
        JavaPairRDD<Integer,Integer> javaPairScoreRdd = sc.parallelizePairs(scoreList);

        JavaPairRDD<Integer,Tuple2<String,Integer>> joinValue = javaPairStuRdd.join(javaPairScoreRdd);

        joinValue.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
            public void call(Tuple2<Integer, Tuple2<String, Integer>> integerTuple2Tuple2) throws Exception {
                System.out.println(integerTuple2Tuple2._1);
                System.out.println(integerTuple2Tuple2._2._1 + " : " + integerTuple2Tuple2._2._2);
            }
        });
    }

    /**
     * <90,alex>
     * <95,lili>     sortByKey() -> <87,cherry> <90,alex> <95,lili>
     * <87,cherry>
     */
    private static void sortByKey() {
        List<Tuple2<Integer,String>> list = Arrays.asList(
                new Tuple2<Integer,String>(90,"alex"),
                new Tuple2<Integer,String>(95,"lili"),
                new Tuple2<Integer,String>(87,"cherry"),
                new Tuple2<Integer,String>(98,"jack"),
                new Tuple2<Integer,String>(89,"jone"),
                new Tuple2<Integer,String>(93,"lucy"),
                new Tuple2<Integer,String>(96,"aliy"));

        JavaPairRDD<Integer, String> javaPairRdd = getsc().parallelizePairs(list);
        JavaPairRDD<Integer, String> sortByKeyValues = javaPairRdd.sortByKey(true);

        sortByKeyValues.foreach(new VoidFunction<Tuple2<Integer, String>>() {
            public void call(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                System.out.println(integerStringTuple2._1 + " : " + integerStringTuple2._2);
            }
        });
    }

    /**
     * <class_1,(90,87,98,96)>   reduceByKey() -> <class_1,(90+87+98+96)>
     * <class_2,(95,89,93)>      reduceByKey() -> <class_2,(95+89+93)>
     */
    private static void reduceByKey() {
        List<Tuple2<String,Integer>> list = Arrays.asList(
                new Tuple2<String,Integer>("class_1",90),
                new Tuple2<String,Integer>("class_2",95),
                new Tuple2<String,Integer>("class_1",87),
                new Tuple2<String,Integer>("class_1",98),
                new Tuple2<String,Integer>("class_2",89),
                new Tuple2<String,Integer>("class_2",93),
                new Tuple2<String,Integer>("class_1",96));

        final JavaPairRDD<String, Integer> javaPairRdd = getsc().parallelizePairs(list);

        JavaPairRDD<String,Integer> reduceByKeyValues = javaPairRdd.reduceByKey(new Function2<Integer,Integer,Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        reduceByKeyValues.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                System.out.println(stringIntegerTuple2._1 + " : " + stringIntegerTuple2._2);
            }
        });
    }

    /**
     * class_1 90    groupByKey() -> <class_1,(90,87,98,96)> <class_2,(95,89,93)>
     * class_2 95
     * class_1 87
     * class_1 98
     * class_2 89
     * class_2 93
     * class_1 96
     */
    private static void groupByKey() {
        List<Tuple2<String,Integer>> list = Arrays.asList(
                new Tuple2<String,Integer>("class_1",90),
                new Tuple2<String,Integer>("class_2",95),
                new Tuple2<String,Integer>("class_1",87),
                new Tuple2<String,Integer>("class_1",98),
                new Tuple2<String,Integer>("class_2",89),
                new Tuple2<String,Integer>("class_2",93),
                new Tuple2<String,Integer>("class_1",96));

        JavaPairRDD<String,Integer> javaPairRdd = getsc().parallelizePairs(list);
        JavaPairRDD<String,Iterable<Integer>> groupByKeyValue = javaPairRdd.groupByKey();

        groupByKeyValue.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
            public void call(Tuple2<String,Iterable<Integer>> stringIteratorTuple2) throws Exception {
                System.out.println(stringIteratorTuple2._1);
                Iterator<Integer> iterator = stringIteratorTuple2._2.iterator();
                while (iterator.hasNext()){
                    System.out.println(iterator.next());
                }
            }
        });
    }

    /**
     * hbase hadoop hive
     * java python        flatmap() -> hbase hadoop hive java python java python
     * java python
     */
    private static void flatmap() {
        List<String> list = Arrays.asList("hbase hadoop hive","java python","java python");
        JavaRDD<String> javaRdd = getsc().parallelize(list);

        JavaRDD<String> flatMapValue = javaRdd.flatMap(new FlatMapFunction<String,String>() {
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });

        flatMapValue.foreach(new VoidFunction<String>() {
            public void call(String value) throws Exception {
                System.out.println(value);
            }
        });
    }

    /**
     * 1,2,3,4,5,6,7,8,9,10   filter() -> 2,4,6,8,10
     */
    private static void filter() {
        List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
        JavaRDD<Integer> javaRdd = getsc().parallelize(list);

        // keep only the even numbers
        JavaRDD<Integer> filterValue = javaRdd.filter(new Function<Integer,Boolean>() {
            public Boolean call(Integer value) throws Exception {
                return value % 2 == 0;
            }
        });

        filterValue.foreach(new VoidFunction<Integer>() {
            public void call(Integer o) throws Exception {
                System.out.println(o);
            }
        });
    }

    /**
     * 1,2,3,4,5   map() -> 10,20,30,40,50
     */
    public static void map(){
        List<Integer> list = Arrays.asList(1,2,3,4,5);
        JavaRDD<Integer> javaRdd = getsc().parallelize(list);

        JavaRDD<Integer> mapValue = javaRdd.map(new Function<Integer,Integer>() {
            public Integer call(Integer value) throws Exception {
                return value * 10;
            }
        });

        mapValue.foreach(new VoidFunction<Integer>() {
            public void call(Integer o) throws Exception {
                System.out.println(o);
            }
        });
    }
}
```
Examples of each transformation operator in Scala:
```scala
package com.kfk.spark.core

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/11/25
 * @time : 7:19 PM
 */
object TransformationScala {

    private var sparkContext: SparkContext = _

    def getsc(): SparkContext = {
        // Reuse a single SparkContext: only one may be active per JVM.
        if (sparkContext == null) {
            val sparkConf = new SparkConf().setAppName("TransformationScala").setMaster("local")
            sparkContext = new SparkContext(sparkConf)
        }
        sparkContext
    }

    def main(args: Array[String]): Unit = {
        map()
        filter()
        flatmap()
        groupByKey()
        reduceByKey()
        sortByKey()
        join()
        cogroup()
    }

    /**
     * 1,2,3,4,5   map() -> 10,20,30,40,50
     */
    def map(): Unit = {
        val list = Array(1, 2, 3, 4, 5)
        val rdd = getsc().parallelize(list)
        val mapValue = rdd.map(x => x * 10)
        mapValue.foreach(x => System.out.println(x))
    }

    /**
     * 1,2,3,4,5,6,7,8,9,10   filter() -> 2,4,6,8,10
     */
    def filter(): Unit = {
        val list = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        val rdd = getsc().parallelize(list)
        val filterValue = rdd.filter(x => x % 2 == 0)
        filterValue.foreach(x => System.out.println(x))
    }

    /**
     * hbase hadoop hive
     * java python        flatmap() -> hbase hadoop hive java python storm spark
     * storm spark
     */
    def flatmap(): Unit = {
        val list = Array("hbase hadoop hive", "java python", "storm spark")
        val rdd = getsc().parallelize(list)
        val flatMapValue = rdd.flatMap(x => x.split(" "))
        flatMapValue.foreach(x => System.out.println(x))
    }

    /**
     * class_1 90    groupByKey() -> <class_1,(90,87,98,96)> <class_2,(95,89,93)>
     * class_2 95
     * class_1 87
     * class_1 98
     * class_2 89
     * class_2 93
     * class_1 96
     */
    def groupByKey(): Unit = {
        val list = Array(
            Tuple2("class_1", 90), Tuple2("class_2", 95),
            Tuple2("class_1", 87), Tuple2("class_1", 98),
            Tuple2("class_2", 89), Tuple2("class_2", 93),
            Tuple2("class_1", 96))
        val rdd = getsc().parallelize(list)
        val groupByKeyValue = rdd.groupByKey()
        groupByKeyValue.foreach(x => {
            System.out.println(x._1)
            x._2.foreach(y => System.out.println(y))
        })
    }

    /**
     * <class_1,(90,87,98,96)>   reduceByKey() -> <class_1,(90+87+98+96)>
     * <class_2,(95,89,93)>      reduceByKey() -> <class_2,(95+89+93)>
     */
    def reduceByKey(): Unit = {
        val list = Array(
            Tuple2("class_1", 90), Tuple2("class_2", 95),
            Tuple2("class_1", 87), Tuple2("class_1", 98),
            Tuple2("class_2", 89), Tuple2("class_2", 93),
            Tuple2("class_1", 96))
        val rdd = getsc().parallelize(list)
        val reduceByKeyValues = rdd.reduceByKey((x, y) => x + y)
        reduceByKeyValues.foreach(x => {
            System.out.println(x._1 + " : " + x._2)
        })
    }

    /**
     * <90,alex>
     * <95,lili>     sortByKey() -> <87,cherry> <90,alex> <95,lili>
     * <87,cherry>
     */
    def sortByKey(): Unit = {
        val list = Array(
            Tuple2(90, "alex"), Tuple2(95, "lili"), Tuple2(87, "cherry"),
            Tuple2(98, "jack"), Tuple2(89, "jone"), Tuple2(93, "lucy"),
            Tuple2(96, "aliy"))
        val rdd = getsc().parallelize(list)
        val sortByKeyValues = rdd.sortByKey(true)
        sortByKeyValues.foreach(x => {
            System.out.println(x._1 + " : " + x._2)
        })
    }

    /**
     * Dataset 1: (1,"alex")   join() -> <1,<"alex",90>>
     * Dataset 2: (1,90)
     */
    def join(): Unit = {
        val stuList = Array(
            Tuple2(1, "alex"), Tuple2(2, "lili"), Tuple2(3, "cherry"),
            Tuple2(4, "jack"), Tuple2(5, "jone"), Tuple2(6, "lucy"),
            Tuple2(7, "aliy"))
        val scoreList = Array(
            Tuple2(1, 90), Tuple2(2, 95), Tuple2(3, 87), Tuple2(4, 98),
            Tuple2(5, 89), Tuple2(6, 93), Tuple2(7, 96))
        val sc = getsc()
        val stuRdd = sc.parallelize(stuList)
        val scoreRdd = sc.parallelize(scoreList)
        val joinValue = stuRdd.join(scoreRdd)
        joinValue.foreach(x => {
            System.out.println(x._1 + " > " + x._2._1 + " : " + x._2._2)
        })
    }

    /**
     * Dataset 1: (2,"lili")            cogroup() -> <2,<("lili"),(95,95,99)>>
     * Dataset 2: (2,95)(2,95)(2,99)
     */
    def cogroup(): Unit = {
        val stuList = Array(
            Tuple2(1, "alex"), Tuple2(2, "lili"), Tuple2(3, "cherry"),
            Tuple2(4, "jack"), Tuple2(5, "jone"), Tuple2(6, "lucy"),
            Tuple2(7, "aliy"))
        val scoreList = Array(
            Tuple2(1, 90), Tuple2(2, 95), Tuple2(2, 95), Tuple2(2, 99),
            Tuple2(3, 87), Tuple2(3, 88), Tuple2(3, 89), Tuple2(4, 98),
            Tuple2(5, 89), Tuple2(6, 93), Tuple2(7, 96))
        val sc = getsc()
        val stuRdd = sc.parallelize(stuList)
        val scoreRdd = sc.parallelize(scoreList)
        val cogroupValues = stuRdd.cogroup(scoreRdd)
        cogroupValues.foreach(x => {
            System.out.println(x._1 + " > " + x._2._1.toList + " : " + x._2._2.toList)
        })
    }
}
```