四、Action算子
常见Action算子:
使用Java语言对每一种Action算子举例讲解:
package com.kfk.spark.core; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; import scala.Tuple2; import java.util.Arrays; import java.util.List; import java.util.Map; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/11/26 * @time : 10:42 下午 */ public class ActionJava { public static JavaSparkContext getsc(){ SparkConf sparkConf = new SparkConf().setAppName("ActionJava").setMaster("local"); return new JavaSparkContext(sparkConf); } public static void main(String[] args) { reduce(); collect(); count(); take(); save(); countByKey(); } /** * <"class_1","alex"> * <"class_2","jone"> * <"class_1","lucy"> <class_1,4> * <"class_1","lili"> countByKey() -> * <"class_2","ben"> <class_2,3> * <"class_2","jack"> * <"class_1","cherry"> */ private static void countByKey() { List list = Arrays.asList(new Tuple2<String,String>("class_1","alex"), new Tuple2<String,String>("class_2","jone"), new Tuple2<String,String>("class_1","lucy"), new Tuple2<String,String>("class_1","lili"), new Tuple2<String,String>("class_2","ben"), new Tuple2<String,String>("class_2","jack"), new Tuple2<String,String>("class_1","cherry")); JavaPairRDD javaPairRdd = getsc().parallelizePairs(list); Map<String,Integer> countByKeyValues = javaPairRdd.countByKey(); for (Map.Entry obj : countByKeyValues.entrySet()){ System.out.println(obj.getKey() + " : " + obj.getValue()); } } /** * saveAsTextFile() */ private static void save() { List<Integer> list = Arrays.asList(1,2,3,4,5); JavaRDD<Integer> javaRdd = getsc().parallelize(list); javaRdd.saveAsTextFile("hdfs://bigdata-pro-m04:9000/user/caizhengjie/datas/javaRdd"); } /** * 1,2,3,4,5 take(3) -> [1,2,3] */ private static void take() { List<Integer> list = Arrays.asList(1,2,3,4,5); JavaRDD<Integer> javaRdd = getsc().parallelize(list); List<Integer> listValues = javaRdd.take(3); for (int value : listValues){ System.out.println(value); } } /** * 1,2,3,4,5 count() -> 5 */ private static void count() { List<Integer> list = Arrays.asList(1,2,3,4,5); JavaRDD<Integer> javaRdd = getsc().parallelize(list); System.out.println(javaRdd.count()); } /** * 1,2,3,4,5 collect() -> [1,2,3,4,5] */ private static void collect() { List<Integer> list = Arrays.asList(1,2,3,4,5); JavaRDD<Integer> javaRdd = getsc().parallelize(list); List<Integer> collectValues = javaRdd.collect(); for (int value : collectValues){ System.out.println(value); } } /** * 1,2,3,4,5 reduce() -> 15 */ private static void reduce() { List<Integer> list = Arrays.asList(1,2,3,4,5); JavaRDD<Integer> javaRdd = getsc().parallelize(list); Integer reduceValues = javaRdd.reduce(new Function2<Integer, Integer, Integer>() { public Integer call(Integer integer, Integer integer2) throws Exception { return integer + integer2; } }); System.out.println(reduceValues); } }
使用Scala语言对每一种Action算子举例讲解:
package com.kfk.spark.core import org.apache.spark.{SparkConf, SparkContext} /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/11/26 * @time : 10:42 下午 */ object ActionScala { def getsc():SparkContext ={ val sparkConf = new SparkConf().setAppName("TransformationScala").setMaster("local") return new SparkContext(sparkConf) } def main(args: Array[String]): Unit = { reduce() collect() count() take() save() countByKey() } /** * 1,2,3,4,5 reduce() -> 15 */ def reduce(): Unit = { val list = Array(1,2,3,4,5) val rdd = getsc().parallelize(list) val reduceValues = rdd.reduce((x,y) => x + y) System.out.println(reduceValues) } /** * 1,2,3,4,5 collect() -> [1,2,3,4,5] */ def collect(): Unit ={ val list = Array(1,2,3,4,5) val rdd = getsc().parallelize(list) val collectValue = rdd.collect() for (value <- collectValue){ System.out.println(value) } } /** * 1,2,3,4,5 count() -> 5 */ def count(): Unit ={ val list = Array(1,2,3,4,5) val rdd = getsc().parallelize(list) System.out.println(rdd.count()) } /** * 1,2,3,4,5 take(3) -> [1,2,3] */ def take(): Unit ={ val list = Array(1,2,3,4,5) val rdd = getsc().parallelize(list) val takeValues = rdd.take(3) for (value <- takeValues){ System.out.println(value) } } /** * saveAsTextFile() */ def save(): Unit ={ val list = Array(1,2,3,4,5) val rdd = getsc().parallelize(list) rdd.saveAsTextFile("hdfs://bigdata-pro-m04:9000/user/caizhengjie/datas/scalaRdd") } /** * <"class_1","alex"> * <"class_2","jone"> * <"class_1","lucy"> <class_1,4> * <"class_1","lili"> countByKey() -> * <"class_2","ben"> <class_2,3> * <"class_2","jack"> * <"class_1","cherry"> */ def countByKey(): Unit ={ val list = Array( Tuple2("class_1", "alex"), Tuple2("class_2", "jack"), Tuple2("class_1", "jone"), Tuple2("class_1", "lili"), Tuple2("class_2", "ben"), Tuple2("class_2", "lucy"), Tuple2("class_1", "cherry")) val rdd = getsc().parallelize(list) val countByKeyValues = rdd.countByKey() System.out.println(countByKeyValues) } }
五、RDD持久化详解
(1)持久化原理
Spark非常重要的一个功能特性就是可以将RDD持久化在内存中。当对RDD执行持久化操作时,每个节点都会将自己操作的RDD的partition持久化到内存中,并且在之后对该RDD的反复使用中,直接使用内存缓存的partition。这样的话,对于针对一个RDD反复执行多个操作的场景,就只要对RDD计算一次即可,后面直接使用该RDD,而不需要反复计算多次该RDD。
巧妙使用RDD持久化,甚至在某些场景下,可以将spark应用程序的性能提升10倍。对于迭代式算法和快速交互式应用来说,RDD持久化,是非常重要的。
要持久化一个RDD,只要调用其cache()或者persist()方法即可。在该RDD第一次被计算出来时,就会直接缓存在每个节点中。而且Spark的持久化机制还是自动容错的,如果持久化的RDD的任何parition丢失了,那么Spark会自动通过其源RDD,使用transformation操作重新计算该partition。
不使用持久化
使用持久化
(2)cache和persist的区别
基于Spark 1.6.1 的源码,可以看到
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def cache(): this.type = persist()
说明是cache()调用了persist(), 想要知道二者的不同还需要看一下persist函数:
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
可以看到persist()内部调用了persist(StorageLevel.MEMORY_ONLY),继续深入:
/** * Set this RDD's storage level to persist its values across operations after the first time * it is computed. This can only be used to assign a new storage level if the RDD does not * have a storage level set yet.. */ def persist(newLevel: StorageLevel): this.type = { // TODO: Handle changes of StorageLevel if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) { throw new UnsupportedOperationException( "Cannot change storage level of an RDD after it was already assigned a level") } sc.persistRDD(this) // Register the RDD with the ContextCleaner for automatic GC-based cleanup sc.cleaner.foreach(_.registerRDDForCleanup(this)) storageLevel = newLevel this }
可以看出来persist有一个 StorageLevel 类型的参数,这个表示的是RDD的缓存级别。
至此便可得出cache和persist的区别了:cache只有一个默认的缓存级别MEMORY_ONLY ,而persist可以根据情况设置其它的缓存级别。
(3)持久化策略
RDD持久化是可以手动选择不同的策略的。比如可以将RDD持久化在内存中、持久化到磁盘上、使用序列化的方式持久化,多持久化的数据进行多路复用。只要在调用persist()时传入对应的StorogeLevel即可。
顺便看一下RDD都有哪些缓存级别,查看 StorageLevel 类的源码:
object StorageLevel { val NONE = new StorageLevel(false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(false, false, true, false) ...... }
可以看到这里列出了12种缓存级别,但这些有什么区别呢?可以看到每个缓存级别后面都跟了一个StorageLevel的构造函数,里面包含了4个或5个参数,如下
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
查看其构造函数
class StorageLevel private( private var _useDisk: Boolean, private var _useMemory: Boolean, private var _useOffHeap: Boolean, private var _deserialized: Boolean, private var _replication: Int = 1) extends Externalizable { ...... def useDisk: Boolean = _useDisk def useMemory: Boolean = _useMemory def useOffHeap: Boolean = _useOffHeap def deserialized: Boolean = _deserialized def replication: Int = _replication ...... }
可以看到StorageLevel类的主构造器包含了5个参数:
useDisk:使用硬盘(外存)
useMemory:使用内存
useOffHeap:使用堆外内存,这是Java虚拟机里面的概念,堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机)。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。
deserialized:反序列化,其逆过程序列化(Serialization)是java提供的一种机制,将对象表示成一连串的字节;而反序列化就表示将字节恢复为对象的过程。序列化是对象永久化的一种机制,可以将对象及其属性保存起来,并能在反序列化后直接恢复这个对象
replication:备份数(在多个节点上备份)
理解了这5个参数,StorageLevel 的12种缓存级别就不难理解了。
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) 就表示使用这种缓存级别的RDD将存储在硬盘以及内存中,使用序列化(在硬盘中),并且在多个节点上备份2份(正常的RDD只有一份)
另外还注意到有一种特殊的缓存级别
val OFF_HEAP = new StorageLevel(false, false, true, false)
使用了堆外内存,StorageLevel 类的源码中有一段代码可以看出这个的特殊性,它不能和其它几个参数共存。
if (useOffHeap) { require(!useDisk, "Off-heap storage level does not support using disk") require(!useMemory, "Off-heap storage level does not support using heap memory") require(!deserialized, "Off-heap storage level does not support deserialized storage") require(replication == 1, "Off-heap storage level does not support multiple replication") }
(4)如何选择RDD持久化策略
Spark提供的多种持久化级别,主要是为了在CPU和内存消耗之间进行取舍。下面是一些通用的持久化级别的选择建议:
1、优先使用MEMORY_ONLY,如果可以缓存所有数据的话,那么就使用这种策略。因为纯内存速度最快,而且没有序列化,不需要消耗CPU进行反序列化操作。
2、如果MEMORY_ ONLY策略,无法存储的下所有数据的话,那么使用MEMORY_ONLY_SER,将数据进行序列化进行存储,纯内存操作还是非常快,只是要消耗CPU进行反序列化。
3、如果需要进行快速的失败恢复,那么就选择带后缀为_2的策略,进行数据的备份,这样在失败时,就不需要重新计算了。
4、能不使用DISK相关的策略,就不用使用,有的时候,从磁盘读取数据,还不如重新计算一次。
(5)测试RDD持久化
下面我将用代码来验证一下RDD持久化的优势
package com.kfk.spark.core; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import javax.swing.*; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/11/27 * @time : 2:35 下午 */ public class persistJava { public static JavaSparkContext getsc(){ SparkConf sparkConf = new SparkConf().setAppName("ActionJava").setMaster("local"); return new JavaSparkContext(sparkConf); } public static void main(String[] args) { JavaRDD lines = getsc().textFile("hdfs://bigdata-pro-m04:9000/user/caizhengjie/datas/2015082818").cache(); // 第一次开始时间 long begin = System.currentTimeMillis(); System.out.println("行数:"+lines.count()); // 第一次总时间 System.out.println("第一次总时间:"+(System.currentTimeMillis() - begin)); // 第二次开始时间 long begin1 = System.currentTimeMillis(); System.out.println("行数:"+lines.count()); // 第二次总时间 System.out.println("第二次总时间:"+(System.currentTimeMillis() - begin1)); } }
不使用rdd持久化:
行数:64972 第一次总时间:1085
行数:64972 第二次总时间:191
使用rdd持久化:
行数:64972 第一次总时间:986
行数:64972 第二次总时间:23
测试的话尽量采用数据量大一些,不然测试结果会看不出效果,根据上面的测试结果,我们可以看出使用rdd持久化和不使用rdd持久化差距很明显。