Spark核心编程与项目案例详解(一)下

简介: 笔记

四、Action算子


常见Action算子:

30.png

使用Java语言对每一种Action算子举例讲解:

package com.kfk.spark.core;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/26
 * @time : 10:42 下午
 */
public class ActionJava {
    public static JavaSparkContext getsc(){
        SparkConf sparkConf = new SparkConf().setAppName("ActionJava").setMaster("local");
        return new JavaSparkContext(sparkConf);
    }
    public static void main(String[] args) {
        reduce();
        collect();
        count();
        take();
        save();
        countByKey();
    }
    /**
     * <"class_1","alex">
     * <"class_2","jone">
     * <"class_1","lucy">                           <class_1,4>
     * <"class_1","lili">       countByKey() ->
     * <"class_2","ben">                            <class_2,3>
     * <"class_2","jack">
     * <"class_1","cherry">
     */
    private static void countByKey() {
        List list = Arrays.asList(new Tuple2<String,String>("class_1","alex"),
                new Tuple2<String,String>("class_2","jone"),
                new Tuple2<String,String>("class_1","lucy"),
                new Tuple2<String,String>("class_1","lili"),
                new Tuple2<String,String>("class_2","ben"),
                new Tuple2<String,String>("class_2","jack"),
                new Tuple2<String,String>("class_1","cherry"));
        JavaPairRDD javaPairRdd = getsc().parallelizePairs(list);
        Map<String,Integer> countByKeyValues = javaPairRdd.countByKey();
        for (Map.Entry obj : countByKeyValues.entrySet()){
            System.out.println(obj.getKey() + " : " + obj.getValue());
        }
    }
    /**
     * saveAsTextFile()
     */
    private static void save() {
        List<Integer> list = Arrays.asList(1,2,3,4,5);
        JavaRDD<Integer> javaRdd = getsc().parallelize(list);
        javaRdd.saveAsTextFile("hdfs://bigdata-pro-m04:9000/user/caizhengjie/datas/javaRdd");
    }
    /**
     * 1,2,3,4,5    take(3) -> [1,2,3]
     */
    private static void take() {
        List<Integer> list = Arrays.asList(1,2,3,4,5);
        JavaRDD<Integer> javaRdd = getsc().parallelize(list);
        List<Integer> listValues = javaRdd.take(3);
        for (int value : listValues){
            System.out.println(value);
        }
    }
    /**
     * 1,2,3,4,5    count() -> 5
     */
    private static void count() {
        List<Integer> list = Arrays.asList(1,2,3,4,5);
        JavaRDD<Integer> javaRdd = getsc().parallelize(list);
        System.out.println(javaRdd.count());
    }
    /**
     * 1,2,3,4,5    collect() -> [1,2,3,4,5]
     */
    private static void collect() {
        List<Integer> list = Arrays.asList(1,2,3,4,5);
        JavaRDD<Integer> javaRdd = getsc().parallelize(list);
        List<Integer> collectValues = javaRdd.collect();
        for (int value : collectValues){
            System.out.println(value);
        }
    }
    /**
     * 1,2,3,4,5    reduce() -> 15
     */
    private static void reduce() {
        List<Integer> list = Arrays.asList(1,2,3,4,5);
        JavaRDD<Integer> javaRdd = getsc().parallelize(list);
        Integer reduceValues = javaRdd.reduce(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });
        System.out.println(reduceValues);
    }
}

使用Scala语言对每一种Action算子举例讲解:

package com.kfk.spark.core
import org.apache.spark.{SparkConf, SparkContext}
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/26
 * @time : 10:42 下午
 */
object ActionScala {
    def getsc():SparkContext ={
        val sparkConf = new SparkConf().setAppName("TransformationScala").setMaster("local")
        return new SparkContext(sparkConf)
    }
    def main(args: Array[String]): Unit = {
        reduce()
        collect()
        count()
        take()
        save()
        countByKey()
    }
    /**
     * 1,2,3,4,5    reduce() -> 15
     */
    def reduce(): Unit = {
        val list = Array(1,2,3,4,5)
        val rdd = getsc().parallelize(list)
        val reduceValues = rdd.reduce((x,y) => x + y)
        System.out.println(reduceValues)
    }
    /**
     * 1,2,3,4,5    collect() -> [1,2,3,4,5]
     */
    def collect(): Unit ={
        val list = Array(1,2,3,4,5)
        val rdd = getsc().parallelize(list)
        val collectValue = rdd.collect()
        for (value <- collectValue){
            System.out.println(value)
        }
    }
    /**
     * 1,2,3,4,5    count() -> 5
     */
    def count(): Unit ={
        val list = Array(1,2,3,4,5)
        val rdd = getsc().parallelize(list)
        System.out.println(rdd.count())
    }
    /**
     * 1,2,3,4,5    take(3) -> [1,2,3]
     */
    def take(): Unit ={
        val list = Array(1,2,3,4,5)
        val rdd = getsc().parallelize(list)
        val takeValues = rdd.take(3)
        for (value <- takeValues){
            System.out.println(value)
        }
    }
    /**
     * saveAsTextFile()
     */
    def save(): Unit ={
        val list = Array(1,2,3,4,5)
        val rdd = getsc().parallelize(list)
        rdd.saveAsTextFile("hdfs://bigdata-pro-m04:9000/user/caizhengjie/datas/scalaRdd")
    }
    /**
     * <"class_1","alex">
     * <"class_2","jone">
     * <"class_1","lucy">                           <class_1,4>
     * <"class_1","lili">       countByKey() ->
     * <"class_2","ben">                            <class_2,3>
     * <"class_2","jack">
     * <"class_1","cherry">
     */
    def countByKey(): Unit ={
        val list = Array(
            Tuple2("class_1", "alex"),
            Tuple2("class_2", "jack"),
            Tuple2("class_1", "jone"),
            Tuple2("class_1", "lili"),
            Tuple2("class_2", "ben"),
            Tuple2("class_2", "lucy"),
            Tuple2("class_1", "cherry"))
        val rdd = getsc().parallelize(list)
        val countByKeyValues = rdd.countByKey()
        System.out.println(countByKeyValues)
    }
}


五、RDD持久化详解


(1)持久化原理

Spark非常重要的一个功能特性就是可以将RDD持久化在内存中。当对RDD执行持久化操作时,每个节点都会将自己操作的RDD的partition持久化到内存中,并且在之后对该RDD的反复使用中,直接使用内存缓存的partition。这样的话,对于针对一个RDD反复执行多个操作的场景,就只要对RDD计算一次即可,后面直接使用该RDD,而不需要反复计算多次该RDD。


巧妙使用RDD持久化,甚至在某些场景下,可以将spark应用程序的性能提升10倍。对于迭代式算法和快速交互式应用来说,RDD持久化,是非常重要的。


要持久化一个RDD,只要调用其cache()或者persist()方法即可。在该RDD第一次被计算出来时,就会直接缓存在每个节点中。而且Spark的持久化机制还是自动容错的,如果持久化的RDD的任何parition丢失了,那么Spark会自动通过其源RDD,使用transformation操作重新计算该partition。


不使用持久化

31.png

使用持久化

32.png


(2)cache和persist的区别


基于Spark 1.6.1 的源码,可以看到

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()

说明是cache()调用了persist(), 想要知道二者的不同还需要看一下persist函数:

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

可以看到persist()内部调用了persist(StorageLevel.MEMORY_ONLY),继续深入:

/**
 * Set this RDD's storage level to persist its values across operations after the first time
 * it is computed. This can only be used to assign a new storage level if the RDD does not
 * have a storage level set yet..
 */
def persist(newLevel: StorageLevel): this.type = {
  // TODO: Handle changes of StorageLevel
  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of an RDD after it was already assigned a level")
  }
  sc.persistRDD(this)
  // Register the RDD with the ContextCleaner for automatic GC-based cleanup
  sc.cleaner.foreach(_.registerRDDForCleanup(this))
  storageLevel = newLevel
  this
}

可以看出来persist有一个 StorageLevel 类型的参数,这个表示的是RDD的缓存级别。


至此便可得出cache和persist的区别了:cache只有一个默认的缓存级别MEMORY_ONLY ,而persist可以根据情况设置其它的缓存级别。


(3)持久化策略

RDD持久化是可以手动选择不同的策略的。比如可以将RDD持久化在内存中、持久化到磁盘上、使用序列化的方式持久化,多持久化的数据进行多路复用。只要在调用persist()时传入对应的StorogeLevel即可。

40.png顺便看一下RDD都有哪些缓存级别,查看 StorageLevel 类的源码:

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(false, false, true, false)
  ......
}

可以看到这里列出了12种缓存级别,但这些有什么区别呢?可以看到每个缓存级别后面都跟了一个StorageLevel的构造函数,里面包含了4个或5个参数,如下

val MEMORY_ONLY = new StorageLevel(false, true, false, true)

查看其构造函数

class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable {
  ......
  def useDisk: Boolean = _useDisk
  def useMemory: Boolean = _useMemory
  def useOffHeap: Boolean = _useOffHeap
  def deserialized: Boolean = _deserialized
  def replication: Int = _replication
  ......
}

可以看到StorageLevel类的主构造器包含了5个参数:


useDisk:使用硬盘(外存)

useMemory:使用内存

useOffHeap:使用堆外内存,这是Java虚拟机里面的概念,堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机)。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。

deserialized:反序列化,其逆过程序列化(Serialization)是java提供的一种机制,将对象表示成一连串的字节;而反序列化就表示将字节恢复为对象的过程。序列化是对象永久化的一种机制,可以将对象及其属性保存起来,并能在反序列化后直接恢复这个对象

replication:备份数(在多个节点上备份)

理解了这5个参数,StorageLevel 的12种缓存级别就不难理解了。


val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) 就表示使用这种缓存级别的RDD将存储在硬盘以及内存中,使用序列化(在硬盘中),并且在多个节点上备份2份(正常的RDD只有一份)


另外还注意到有一种特殊的缓存级别

val OFF_HEAP = new StorageLevel(false, false, true, false)

使用了堆外内存,StorageLevel 类的源码中有一段代码可以看出这个的特殊性,它不能和其它几个参数共存。

if (useOffHeap) {
  require(!useDisk, "Off-heap storage level does not support using disk")
  require(!useMemory, "Off-heap storage level does not support using heap memory")
  require(!deserialized, "Off-heap storage level does not support deserialized storage")
  require(replication == 1, "Off-heap storage level does not support multiple replication")
}

(4)如何选择RDD持久化策略

Spark提供的多种持久化级别,主要是为了在CPU和内存消耗之间进行取舍。下面是一些通用的持久化级别的选择建议:


1、优先使用MEMORY_ONLY,如果可以缓存所有数据的话,那么就使用这种策略。因为纯内存速度最快,而且没有序列化,不需要消耗CPU进行反序列化操作。

2、如果MEMORY_ ONLY策略,无法存储的下所有数据的话,那么使用MEMORY_ONLY_SER,将数据进行序列化进行存储,纯内存操作还是非常快,只是要消耗CPU进行反序列化。

3、如果需要进行快速的失败恢复,那么就选择带后缀为_2的策略,进行数据的备份,这样在失败时,就不需要重新计算了。

4、能不使用DISK相关的策略,就不用使用,有的时候,从磁盘读取数据,还不如重新计算一次。


(5)测试RDD持久化

下面我将用代码来验证一下RDD持久化的优势

package com.kfk.spark.core;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import javax.swing.*;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/27
 * @time : 2:35 下午
 */
public class persistJava {
    public static JavaSparkContext getsc(){
        SparkConf sparkConf = new SparkConf().setAppName("ActionJava").setMaster("local");
        return new JavaSparkContext(sparkConf);
    }
    public static void main(String[] args) {
        JavaRDD lines = getsc().textFile("hdfs://bigdata-pro-m04:9000/user/caizhengjie/datas/2015082818").cache();
        // 第一次开始时间
        long begin = System.currentTimeMillis();
        System.out.println("行数:"+lines.count());
        // 第一次总时间
        System.out.println("第一次总时间:"+(System.currentTimeMillis() - begin));
        // 第二次开始时间
        long begin1 = System.currentTimeMillis();
        System.out.println("行数:"+lines.count());
        // 第二次总时间
        System.out.println("第二次总时间:"+(System.currentTimeMillis() - begin1));
    }
}

不使用rdd持久化:

行数:64972
第一次总时间:1085


行数:64972
第二次总时间:191

使用rdd持久化:

行数:64972
第一次总时间:986

行数:64972
第二次总时间:23

测试的话尽量采用数据量大一些,不然测试结果会看不出效果,根据上面的测试结果,我们可以看出使用rdd持久化和不使用rdd持久化差距很明显。

相关文章
|
2月前
|
分布式计算 大数据 Java
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
75 5
|
2月前
|
存储 缓存 分布式计算
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
45 4
|
2月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
56 3
|
2月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
73 0
|
2月前
|
存储 缓存 分布式计算
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
49 4
|
2月前
|
分布式计算 大数据 Spark
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
49 1
|
2月前
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
47 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
2月前
|
设计模式 数据采集 分布式计算
企业spark案例 —出租车轨迹分析
企业spark案例 —出租车轨迹分析
111 0
|
2月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
61 0
|
2月前
|
SQL 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
55 0