Spark编程模型(博主推荐)

简介:

一、Spark编程模型(上)

   从Hadoop MR到Spark

    回顾hadoop—mapreduce计算过程

 

 

 

 

 

 

   MR   VS   Spark

          

 

 

 

 

Spark编程模型

  核心概念

      

注意:对比mr里的概念来学习。

   这些概念,大家一定要好好理解!!!

 

 

 

  Spark Application的组成

          

   其中,Driver Program是运行main函数并且新建SparkContext的程序

        Cluster Manager是在集群上获取资源的外部服务(例如: standalonde、Mesos、Yarn)

        Worker Node是集群中任何运行应用代码的节点

       Executor是在一个worker node上为某应用启动的一个进程,该进程负责运行任务,并且负责将数据存在内存或磁盘上。每个应用都有各自独立的executors。

        Task是被送到某个executor上的工作单元

 

 

 

 

 

  Spark应用程序的组成

    (1)Driver

    (2)Executor

  注意:对照helloworld来思考

 

 

 

 

  Spark Application基本概念

    

 

 

 

 

 

 

 

Spark Application编程模型

  Spark 应用程序编程模型

– Driver Program ( SparkContext )

– Executor ( RDD 操作)

  (1)输入Base-> RDD

  (2)Transformation RDD->RDD

  (3)Action RDD->driver or Base

  (4)缓存 Persist or cache()

– 共享变量

  (1)broadcast variables(广播变量)

  (2)accumulators(累加器)

 

 

 

  回顾Spark Hello World

    

 

 

 

 

 

 

初识RDD

  什么是RDD

定义:Resilient distributed datasets (RDD), an efficient, general-purpose and fault-tolerant abstraction for sharing data in cluster applications.

复制代码
RDD 是只读的。
RDD 是分区记录的集合。
RDD 是容错的。--- lineage
RDD 是高效的。
RDD 不需要物化。---物化:进行实际的变换并最终写入稳定的存储器上
RDD 可以缓存的。---课指定缓存级别
复制代码

 

  RDD是spark的核心,也是整个spark的架构基础,RDD是弹性分布式集合(Resilient Distributed Datasets)的简称,是分布式只读且已分区集合对象。这些集合是弹性的,如果数据集一部分丢失,则可以对它们进行重建。

 

 

  RDD接口

    

 

 

 

 

 

  RDD的本质特征

      

 

 

 

  RDD--partitions

Spark中将1~100的数组转换为rdd

      

 通过第15行的size获得rdd的partition的个数,此处创建rdd显式指定定分区个数2,默认数值是这个程序所分配到的资源的cpu核的个数

 

 

 

   RDD-preferredLocations

返回此RDD的一个partition的数据块信息,如果一个数据块(block)有多个备份在返回所有备份的location地址信息

主机ip或域名

作用:spark在进行任务调度室尽可能根据block的地址做到本地计算

 

 

  RDD-dependencies

RDD之间的依赖关系分为两类:

  (1)窄依赖

    每个父RDD的分区都至多被一个子RDD的分区使用,即为OneToOneDependecies;

  (2)宽依赖

    多个子RDD的分区依赖一个父RDD的分区,即为ShuffleDependency 。例如,map操作是一种窄依赖,而join操作是一种宽依赖(除非父RDD已经基于Hash策略被划分过了,co-partitioned)

        

 

 

  (1)窄依赖相比宽依赖更高效资源消耗更少

  (2)允许在单个集群节点上流水线式执行,这个节点可以计算所有父级分区。例如,可以逐个元素地依次执行filter操作和map操作。

  (3)相反,宽依赖需要所有的父RDD数据可用并且数据已经通过类MapReduce的操作shuffle完成。

  (4) 在窄依赖中,节点失败后的恢复更加高效。因为只有丢失的父级分区需要重新计算,并且这些丢失的父级分区可以并行地在不同节点上重新计算。

  (5)与此相反,在宽依赖的继承关系中,单个失败的节点可能导致一个RDD的所有先祖RDD中的一些分区丢失,导致计算的重新执行。

 

 

 

  RDD-compute

  分区计算

    Spark对RDD的计算是以partition为最小单位的,并且都是对迭代器进行复合,不需要保存每次的计算结果

 

  RDD- partitioner

  分区函数:目前spark中提供两种分区函数:

    (1)HashPatitioner(哈希分区)

    (2)RangePatitioner(区域分区)

  且partitioner只存在于(K,V)类型的RDD中,rdd本身决定了分区的数量。

 

 

 

  RDD- lineage

复制代码
val lines = sc.textFile("hdfs://...")
// transformed RDDs
val errors = lines.filter(_.startsWith("ERROR"))
val messages = errors.map(_.split("\t")).map(r => r(1))
messages.cache()
// action 1
messages.filter(_.contains("mysql")).count()
// action 2
messages.filter(_.contains("php")).count()
复制代码

 

 

 

  RDD经过trans或action后产生一个新的RDD,RDD之间的通过lineage来表达依赖关系,lineage是rdd容错的重要机制,rdd转换后的分区可能在转换前分区的节点内存中

      

 

 

  典型RDD的特征

    

 

 

 

 

   不同角度看RDD

         

 

  

 

   Scheduler Optimizations

               

 

 

 

 

 

 

 

 

 

 

 

 Spark编程模型(中)

 创建RDD

  方式一:从集合创建RDD

  (1)makeRDD

  (2)Parallelize

注意:makeRDD可以指定每个分区perferredLocations参数,而parallelize则没有。

 

  方式二:读取外部存储创建RDD

  Spark与Hadoop完全兼容,所以对Hadoop所支持的文件类型或者数据库类型,Spark同样支持。

  (1)多文件格式支持:

      

 

   (2)多文件系统支持:

      1)本地文件系统

      2)S3

      3)HDFS

 

  (3)数据库

      1)JdbcRDD

      2)spark-cassandra-connector(datastax/spark-cassandra-connector)

      3)org.apache.hadoop.hbase.mapreduce.TableInputFormat(SparkContext.newAPIHadoopRDD)

      4)Elasticsearch-Hadoop

 

 

 

transformation操作

  惰性求值

    (1)RDD 的转化操作都是惰性求值的。这意味着在被调用行动操作之前Spark 不会开始计算

    (2)读取数据到RDD的操作也是惰性的

    (3)惰性求值的好处:

      a.Spark 使用惰性求值可以把一些操作合并到一起来减少计算数据的步骤。在类似 Hadoop MapReduce 的系统中,开发者常常花费大量时间考虑如何把操作组合到一起,以减少MapReduce 的周期数。

      b.而在Spark 中,写出一个非常复杂的映射并不见得能比使用很多简单的连续操作获得好很多的性能。因此,用户可以用更小的操作来组织他们的程序,这样也使这些操作更容易管理。

 

 

  转换操作

    RDD 的转化操作是返回新RDD 的操作

    我们不应该把RDD 看作存放着特定数据的数据集,而最好把每个RDD 当作我们通过转化操作构建出来的、记录如何计算数据的指令列表。

    基本转换操作1

    

 

     基本转换操作2

 

 

 

   控制操作

    (1)persist操作,可以将RDD持久化到不同层次的存储介质,以便后续操作重复使用。

      1)cache:RDD[T]

      2)persist:RDD[T]

      3)Persist(level:StorageLevel):RDD[T]

    (2)checkpoint

      将RDD持久化到HDFS中,与persist操作不同的是checkpoint会切断此RDD之前的依赖关系,而persist依然保留RDD的依赖关系。

   注意:控制操作的细节会在后续博客专门讲解

 

 

 

     action操作

         

 

 

 

 

 

 

Spark编程模型(下)

  什么是Pair RDD

    (1)包含键值对类型的RDD被称作Pair RDD

    (2)Pair RDD通常用来进行聚合计算

    (3)Pair RDD通常由普通RDD做ETL转换而来

 

 

 

  创建Pair RDD

Python语言
pairs = lines.map(lambda x: (x.split(" ")[0], x))

 

 scala语言

val pairs = lines.map(x => (x.split(" ")(0), x))

 

Java语言

复制代码
PairFunction keyData =
    new PairFunction() {
    public Tuple2 call(String x) {
        return new Tuple2(x.split(" ")[0], x);
    }
};
JavaPairRDD pairs = lines.mapToPair(keyData);
复制代码

 

  

  

 

 

Pair RDD的transformation操作

  Pair RDD转换操作1

  Pair RDD 可以使用所有标准RDD 上转化操作,还提供了特有的转换操作。

          

 

 

 

  Pair RDD转换操作2

 

 

 

 

 

 

Pair RDD的action操作

  Pair RDD转换操作1

  所有基础RDD 支持的行动操作也都在pair RDD 上可用

 

 

 

   

 

 

 

Pair RDD的分区控制

  Pair RDD的分区控制

  (1) Spark 中所有的键值对RDD 都可以进行分区控制---自定义分区

  (2)自定义分区的好处:

     1)避免数据倾斜

    2)控制task并行度

 

  自定义分区方式

复制代码
class DomainNamePartitioner(numParts: Int) extends Partitioner {
    override def numPartitions: Int = numParts
    override def getPartition(key: Any): Int = {
        val domain = new Java.net.URL(key.toString).getHost()
        val code = (domain.hashCode % numPartitions)
        if(code < 0) {
            code + numPartitions // 使其非负
        }else{
            code
        }
    }
    // 用来让Spark区分分区函数对象的Java equals方法
    override def equals(other: Any): Boolean = other match {
        case dnp: DomainNamePartitioner =>
            dnp.numPartitions == numPartitions
        case _ =>
            false
    }

相关文章
|
2月前
|
SQL 分布式计算 大数据
【大数据技术Spark】DStream编程操作讲解实战(图文解释 附源码)
【大数据技术Spark】DStream编程操作讲解实战(图文解释 附源码)
28 0
|
23天前
|
SQL 分布式计算 API
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
35 2
|
2月前
|
存储 分布式计算 负载均衡
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
40 0
|
3天前
|
分布式计算 并行计算 Hadoop
Spark学习---day02、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---day02、Spark核心编程 RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
19 1
|
1月前
|
存储 分布式计算 Apache
Spark编程范例:Word Count示例解析
Spark编程范例:Word Count示例解析
|
6月前
|
存储 分布式计算 并行计算
Spark学习---2、SparkCore(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---2、SparkCore(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
|
2月前
|
分布式计算 数据处理 Spark
Spark【RDD编程(四)综合案例】
Spark【RDD编程(四)综合案例】
|
2月前
|
分布式计算 Hadoop 数据处理
Spark【RDD编程(三)键值对RDD】
Spark【RDD编程(三)键值对RDD】
|
2月前
|
缓存 分布式计算 Java
Spark【RDD编程(二)RDD编程基础】
Spark【RDD编程(二)RDD编程基础】
|
2月前
|
存储 分布式计算 Hadoop
Spark 【RDD编程(一)RDD编程基础】
Spark 【RDD编程(一)RDD编程基础】

相关产品

  • 云迁移中心