Spark工作原理及基础概念(超详细!)下

简介: 笔记

三、Spark 运行模式及集群角色


(1)Spark运行模式

8.png

(2)Spark集群角色

下图是spark的集群角色图,主要有集群管理节点cluster manager,工作节点worker,执行器executor,驱动器driver和应用程序application 五部分组成,下面详细说明每部分的特点。


10.png

(1)Cluster Manager

集群管理器,它存在于Master进程中,主要用来对应用程序申请的资源进行管理,根据其部署模式的不同,可以分为local,standalone,yarn,mesos等模式。


(2)worker

worker是spark的工作节点,用于执行任务的提交,主要工作职责有下面四点:


worker节点通过注册机向cluster manager汇报自身的cpu,内存等信息。

worker 节点在spark master作用下创建并启用executor,executor是真正的计算单元。

spark master将任务Task分配给worker节点上的executor并执行运用。

worker节点同步资源信息和executor状态信息给cluster manager。

11.png

在yarn 模式下运行worker节点一般指的是NodeManager节点,standalone模式下运行一般指的是slave节点。


(3)executor

executor 是真正执行计算任务的组件,它是application运行在worker上的一个进程。这个进程负责Task的运行,它能够将数据保存在内存或磁盘存储中,也能够将结果数据返回给Driver。


(4)Application

application是Spark API 编程的应用程序,它包括实现Driver功能的代码和在程序中各个executor上要执行的代码,一个application由多个job组成。其中应用程序的入口为用户所定义的main方法。


(5)Driver

驱动器节点,它是一个运行Application中main函数并创建SparkContext的进程。application通过Driver 和Cluster Manager及executor进行通讯。它可以运行在application节点上,也可以由application提交给Cluster Manager,再由Cluster Manager安排worker进行运行。


Driver节点也负责提交Job,并将Job转化为Task,在各个Executor进程间协调Task的调度。


(6)sparkContext

sparkContext是整个spark应用程序最关键的一个对象,是Spark所有功能的主要入口点。核心作用是初始化spark应用程序所需要的组件,同时还负责向master程序进行注册等。


(3)Spark其他核心概念

(1)RDD

它是Spark中最重要的一个概念,是弹性分布式数据集,是一种容错的、可以被并行操作的元素集合,是Spark对所有数据处理的一种基本抽象。可以通过一系列的算子对rdd进行操作,主要分为Transformation和Action两种操作。


‍‍‍‍‍Transformation(转换):是对已有的RDD进行换行生成新的RDD,对于转换过程采用惰性计算机制,不会立即计算出结果。常用的方法有map,filter,flatmap等。

Action(执行):对已有对RDD对数据执行计算产生结果,并将结果返回Driver或者写入到外部存储中。常用到方法有reduce,collect,saveAsTextFile等。

12.png


(2)DAG

DAG是一个有向无环图,在Spark中, 使用 DAG 来描述我们的计算逻辑。主要分为DAG Scheduler 和Task Scheduler。

13.png

(3)DAG Scheduler

DAG Scheduler 是面向stage的高层级的调度器,DAG Scheduler把DAG拆分为多个Task,每组Task都是一个stage,解析时是以shuffle为边界进行反向构建的,每当遇见一个shuffle,spark就会产生一个新的stage,接着以TaskSet的形式提交给底层的调度器(task scheduler),每个stage封装成一个TaskSet。DAG Scheduler需要记录RDD被存入磁盘物化等动作,同时会需要Task寻找最优等调度逻辑,以及监控因shuffle跨节点输出导致的失败。

14.png


(4)Task Scheduler

Task Scheduler 负责每一个具体任务的执行。它的主要职责包括


任务集的调度管理

状态结果跟踪

物理资源调度管理

任务执行

获取结果

(5)Job

job是有多个stage构建的并行的计算任务,job是由spark的action操作来触发的,在spark中一个job包含多个RDD以及作用在RDD的各种操作算子。

15.png


(6)stage

DAG Scheduler会把DAG切割成多个相互依赖的Stage,划分Stage的一个依据是RDD间的宽窄依赖。


在对Job中的所有操作划分Stage时,一般会按照倒序进行,即从Action开始,遇到窄依赖操作,则划分到同一个执行阶段,遇到宽依赖操作,则划分一个新的执行阶段,且新的阶段为之前阶段的parent,然后依次类推递归执行。


child Stage需要等待所有的parent Stage执行完之后才可以执行,这时Stage之间根据依赖关系构成了一个大粒度的DAG。在一个Stage内,所有的操作以串行的Pipeline的方式,由一组Task完成计算。


(7)TaskSet Task

TaskSet 可以理解为一种任务,对应一个stage,是Task组成的任务集。一个TaskSet中的所有Task没有shuffle依赖可以并行计算。


Task是spark中最独立的计算单元,由Driver Manager发送到executer执行,通常情况一个task处理spark RDD一个partition。Task分为ShuffleMapTask和ResultTask两种,位于最后一个Stage的Task为ResultTask,其他阶段的属于ShuffleMapTask。


四、Spark作业运行流程


(1)Spark作业运行流程

spark应用程序以进程集合为单位在分布式集群上运行,通过driver程序的main方法创建sparkContext的对象与集群进行交互。具体运行流程如下:


sparkContext向cluster Manager申请CPU,内存等计算资源。

cluster Manager分配应用程序执行所需要的资源,在worker节点创建executor。

sparkContext将程序代码和task任务发送到executor上进行执行,代码可以是编译成的jar包或者python文件等。接着sparkContext会收集结果到Driver端。


16.png

(2) Spark RDD迭代过程

sparkContext创建RDD对象,计算RDD间的依赖关系,并组成一个DAG有向无环图。

DAGScheduler将DAG划分为多个stage,并将stage对应的TaskSet提交到集群的管理中心,stage的划分依据是RDD中的宽窄依赖,spark遇见宽依赖就会划分为一个stage,每个stage中包含来一个或多个task任务,避免多个stage之间消息传递产生的系统开销。

taskScheduler 通过集群管理中心为每一个task申请资源并将task提交到worker的节点上进行执行。

worker上的executor执行具体的任务。


17.png

(3)Yarn资源管理器介绍

spark 程序一般是运行在集群上的,spark on yarn是工作或生产上用的非常多的一种运行模式。


没有yarn模式前,每个分布式框架都要跑在一个集群上面,比如说Hadoop要跑在一个集群上,Spark用集群的时候跑在standalone上。这样的话整个集群的资源的利用率低,且管理起来比较麻烦。


yarn是分布式资源管理和任务管理管理,主要由ResourceManager,NodeManager和ApplicationMaster三个模块组成。


18.png

ResourceManager 主要负责集群的资源管理,监控和分配。对于所有的应用它有绝对的控制权和资源管理权限。


NodeManager 负责节点的维护,执行和监控task运行状况。会通过心跳的方式向ResourceManager汇报自己的资源使用情况。


yarn资源管理器的每个节点都运行着一个NodeManager,是ResourceManager的代理。如果主节点的ResourceManager宕机后,会连接ResourceManager的备用节点。


ApplicationMaster 负责具体应用程序的调度和资源的协调,它会与ResourceManager协商进行资源申请。ResourceManager以container容器的形式将资源分配给application进行运行。同时负责任务的启停。


container 是资源的抽象,它封装着每个节点上的资源信息(cpu,内存,磁盘,网络等),yarn将任务分配到container上运行,同时该任务只能使用container描述的资源,达到各个任务间资源的隔离。


(4)Spark程序在Yarn上执行流程

spark on yarn分为两种模式yarn-client模式,和yarn—cluster模式,一般线上采用的是yarn-cluster模式。


(1)yarn-client模式

driver在客户端本地执行,这种模式可以使得spark application和客户端进行交互,因为driver在客户端可以通过webUI访问driver的状态。同时Driver会与yarn集群中的Executor进行大量的通信,会造成客户机网卡流量的大量增加。


(2)yarn-cluster模式

Yarn-Cluster主要用于生产环境中,因为Driver运行在Yarn集群中某一台NodeManager中,每次提交任务的Driver所在的机器都是随机的,不会产生某一台机器网卡流量激增的现象,缺点是任务提交后不能看到日志。只能通过yarn查看日志。


下图是yarn-cluster运行模式:

19.png


client 向yarn提交应用程序,包含ApplicationMaster程序、启动ApplicationMaster的命令、需要在Executor中运行的程序等。

ApplicationMaster程序启动ApplicationMaster的命令、需要在Executor中运行的程序等。

ApplicationMaster向ResourceManager注册,这样用户可以直接通过ResourceManage查看应用程序的运行状态。

ApplicationMaster申请到资源(也就是Container)后,便与对应的NodeManager通信,启动Task。

Task向ApplicationMaster汇报运行的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务。

应用程序运行完成后,ApplicationMaster向ResourceManager申请注销并关闭自己。


五、基于WordCount程序开发


下面使用Java和Scala语言编写WordCount程序

20.png

(1)Java开发WordCount程序

package com.kfk.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/22
 * @time : 10:19 下午
 */
public class WordCountJava {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("wordCountApp").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        JavaRDD lines = sc.textFile("hdfs://bigdata-pro-m04:9000/user/caizhengjie/datas/wordcount.txt");
        JavaRDD words = lines.flatMap(new FlatMapFunction<String,String>() {
            public Iterator call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        JavaPairRDD word = words.mapToPair(new PairFunction<String,String,Integer>() {
            public Tuple2 call(String word) throws Exception {
                return new Tuple2(word,1);
            }
        });
        JavaPairRDD wordcount = word.reduceByKey(new Function2<Integer,Integer,Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1+ v2;
            }
        });
        wordcount.foreach(new VoidFunction<Tuple2<String,Integer>>() {
            public void call(Tuple2<String,Integer> o) throws Exception {
                System.out.println(o._1 + " : " + o._2);
            }
        });
    }
}

运行结果:

spark : 1
hive : 3
hadoop : 3
python : 1
flink : 2
java : 5
storm : 1
hbase : 1

(2)Scala开发WordCount程序

package com.kfk.spark
import org.apache.spark.{SparkConf, SparkContext}
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/22
 * @time : 10:48 下午
 */
object WordCountScala {
    def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("wordCountApp").setMaster("local")
        val sc = new SparkContext(sparkConf)
        val lines = sc.textFile("hdfs://bigdata-pro-m04:9000/user/caizhengjie/datas/wordcount.txt")
        // val wordcount = lines.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey((x,y) => (x+y)).foreach((_wordcount => println(_wordcount._1 + " : " + _wordcount._2))
        val words = lines.flatMap(line => line.split(" "))
        val word = words.map(word => (word,1))
        val wordcount = word.reduceByKey((x,y) => (x+y))
        // println(wordcount.collect().toList)
        wordcount.foreach(_wordcount => println(_wordcount._1 + " : " + _wordcount._2))
    }
}

运行结果:

spark : 1
hive : 3
hadoop : 3
python : 1
flink : 2
java : 5
storm : 1
hbase : 1

(3)程序打包提交Spark集群测试

bin/spark-submit --class com.kfk.spark.WordCountJava --master local[2] /opt/jars/wordcountjava.jar 

运行结果:

spark : 1
hive : 3
hadoop : 3
python : 1
flink : 2
java : 5
storm : 1
hbase : 1


相关文章
|
6月前
|
移动开发 分布式计算 Spark
Spark的几种去重的原理分析
Spark的几种去重的原理分析
132 0
|
6月前
|
机器学习/深度学习 SQL 分布式计算
Spark核心原理与应用场景解析:面试经验与必备知识点解析
本文深入探讨Spark核心原理(RDD、DAG、内存计算、容错机制)和生态系统(Spark SQL、MLlib、Streaming),并分析其在大规模数据处理、机器学习及实时流处理中的应用。通过代码示例展示DataFrame操作,帮助读者准备面试,同时强调结合个人经验、行业趋势和技术发展以展现全面的技术实力。
562 0
|
6月前
|
存储 分布式计算 数据处理
bigdata-35-Spark工作原理
bigdata-35-Spark工作原理
43 0
|
1月前
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
39 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
1月前
|
存储 分布式计算 算法
大数据-105 Spark GraphX 基本概述 与 架构基础 概念详解 核心数据结构
大数据-105 Spark GraphX 基本概述 与 架构基础 概念详解 核心数据结构
50 0
|
1月前
|
消息中间件 分布式计算 Kafka
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
40 0
|
1月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
45 0
|
1月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
80 0
|
3月前
|
存储 分布式计算 监控
|
6月前
|
SQL 存储 分布式计算
spark执行sql的原理是什么
spark执行sql的原理是什么
126 1
下一篇
无影云桌面