spark2.1.0之配置与源码分析

简介: 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/80468207       任何优秀的软件或服务都会提供一些配置参数,这些配置参数有些是内置的,有些则是可以由用户配置的。
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/80468207

      任何优秀的软件或服务都会提供一些配置参数,这些配置参数有些是内置的,有些则是可以由用户配置的。对于熟悉Java的开发人员来说,对JVM进行性能调优是一个经常需要面对的工作,这个过程常常伴随着各种JVM参数的调整与测试。之所以将这些参数交给具体的开发人员去调整,是因为软件或者服务的提供者也无法保证给定的默认参数是最符合用户应用场景与软硬件环境的。一个简单的例子:当用户的QPS发生变化时,对于Web服务的JVM来说也应当相应调整内存的大小或限制。

    Spark作为一款优秀的计算框架,也配备了各种各样的系统配置参数(例如:spark.master,spark.app.name,spark.driver.memory,spark.executor.memory等)。通过这些配置参数可以定义应用的名称、使用的部署模式、调度模式、executor数量、executor的内核数、driver或executor的内存大小、采用的内存模型等。

      SparkConf是Spark的配置类,这个类在Spark的历史版本中已经存在很久了,Spark中的每一个组件都直接或者间接的使用着它所存储的属性,这些属性都存储在如下的数据结构中:

  private val settings = new ConcurrentHashMap[String, String]()

由以上代码的泛型[1] 可以看出Spark的所有配置,无论是key还是value都是String类型。Spark的配置通过以下三种方式获取:

  • 来源于系统参数(即使用System.getProperties获取的属性)中以spark.作为前缀的那部分属性;
  • 使用SparkConf的API进行设置;
  • 从其它SparkConf中克隆。

下面将具体说明这三种方式的实现。

系统属性中的配置

      在SparkConf中有一个Boolean类型的构造器属性loadDefaults,当loadDefaults为true时将会从系统属性中加载Spark配置,代码如下:

  if (loadDefaults) {
    loadFromSystemProperties(false)
  }

  private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
    // Load any spark.* system properties
    for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
      set(key, value, silent)
    }
    this
  }

以上代码调用了Utils工具类[2] 的getSystemProperties方法,其作用为获取系统的键值对属性。loadFromSystemProperties方法在获取了系统属性后,使用Scala守卫过滤出其中以“spark.”字符串为前缀的key和value并且调用set方法(见代码清单3-1)最终设置到settings中。

代码清单3-1    SparkConf中set方法的实现

  private[spark] def set(key:String, value: String, silent: Boolean): SparkConf = {

    if (key == null) {

      throw newNullPointerException("nullkey")

    }

    if (value == null) {

      throw newNullPointerException("nullvalue for " + key)

    }

    if (!silent) {

     logDeprecationWarning(key)

    }

    settings.put(key,value)

    this

  }

使用SparkConf配置的API

      给SparkConf添加配置的一种常见方式是使用SparkConf中提供的API。其中有些API最终实际调用了set的重载方法,见代码清单3-2。

代码清单3-2    SparkConf中重载的set方法

  def set(key:String, value: String): SparkConf = {

    set(key,value, false)

  }

可以看到代码清单3-2中的set方法实际也是调用了代码清单3-1中的set方法。

SparkConf中的setMaster、setAppName、setJars、setExecutorEnv、setSparkHome、setAll等方法最终都是通过代码清单3-2中的set方法完成Spark配置的,本书以其中最为常用的setMaster和setAppName为例,用代码清单3-3和代码清单3-4来展示他们的实现。

代码清单3-3    设置Spark的部署模式的配置方法setMaster

  def setMaster(master: String): SparkConf = {

    set("spark.master", master)

  }

代码清单3-4    设置Spark的应用名称的配置方法setAppName

  def setAppName(name: String): SparkConf = {

    set("spark.app.name", name)

  }

克隆SparkConf配置

      有些情况下,同一个SparkConf实例中的配置信息需要被Spark中的多个组件共用,例如:组件A中存在一个SparkConf实例a,组件B中也很需要实例a中的配置信息,这时该如何处理?我们往往首先想到的方法是将SparkConf实例定义为全局变量或者通过参数传递给其它组件,但是这会引入并发问题。虽然settings是线程安全的ConcurrentHashMap类,而且ConcurrentHashMap也被证明是高并发下性能表现不错的数据结构,但是只要存在并发就一定会有性能的损失问题。我们可以新建一个SparkConf实例b,并将a中的配置信息全部拷贝到b中,这种方式显然不是最优雅的,复制代码会散落在程序的各个角落。现在是时候阅读下SparkConf的构造器了,代码如下所示:

class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging withSerializable {

  //省略无关代码

  def this() = this(true)

SparkConf继承了Cloneable特质并实现了clone方法,clone方法(见代码清单3-5)的实现跟我们所讨论的方式是一样的,并且通过Cloneable特质提高了代码的可复用性。

代码清单3-5    克隆SparkConf配置

  override def clone: SparkConf ={

    val cloned = new SparkConf(false)

    settings.entrySet().asScala.foreach { e =>

      cloned.set(e.getKey(),e.getValue(), true)

    }

    cloned

  }

这样我们就可以在任何想要使用SparkConf的地方使用克隆方式来优雅的编程了。



[1] Scala泛型的语法采用了方括号,而非Java中的尖括号。

[2] Utils是Spark中最常用的工具类,其每个方法实现的功能都比较单一,理解起来比较简单,所以本书只将相关的介绍放入附录A中单独进行介绍。如果不去阅读Utils中各个方法的实现,对阅读本书主干内容也不会有太多影响。如果是Spark的初学者或者是刚刚接触Scala语言的开发者还是建议阅读。

关于《Spark内核设计的艺术 架构设计与实现

经过近一年的准备,《 Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:


纸质版售卖链接如下:
相关文章
|
7月前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之spark3.1.1通过resource目录下的conf文件配置,报错如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
7月前
|
分布式计算 Hadoop Scala
Spark【环境搭建 01】spark-3.0.0-without 单机版(安装+配置+测试案例)
【4月更文挑战第13天】Spark【环境搭建 01】spark-3.0.0-without 单机版(安装+配置+测试案例)
293 0
|
分布式计算 大数据 Shell
Spark 环境搭建_配置 HistoryServer|学习笔记
快速学习 Spark 环境搭建_配置 HistoryServer
Spark 环境搭建_配置 HistoryServer|学习笔记
|
分布式计算 Java Scala
配置spark,并在idea中搭建项目
配置spark,并在idea中搭建项目
151 0
|
SQL 分布式计算 Hadoop
配置Hive使用Spark执行引擎
在Hive中,可以通过配置来指定使用不同的执行引擎。Hive执行引擎包括:默认MR、tez、spark。
290 0
|
SQL 分布式计算 Hadoop
Spark SQL CLI配置
Spark SQL CLI配置
Spark SQL CLI配置
|
分布式计算 大数据 调度
Spark 集群搭建_高可用配置|学习笔记
快速学习 Spark 集群搭建_高可用配置
Spark 集群搭建_高可用配置|学习笔记
|
分布式计算 Hadoop 大数据
分布式集群环境之Spark的安装与配置(Centos7)
分布式集群环境之Spark的安装与配置(Centos7)
353 0
分布式集群环境之Spark的安装与配置(Centos7)
|
分布式计算 Hadoop Linux
Spark集群搭建记录 | 云计算[CentOS7] | Spark配置
写在前面 step1 Spark下载 step2 修改环境变量 ~/.bashrc /etc/profile step3 配置Master-文件修改 slaves spark-env.sh step4 配置slave节点 step5 集群启动 step6 web浏览器状态查看 step7 配置开机启动(可选)
277 0
Spark集群搭建记录 | 云计算[CentOS7] | Spark配置
|
SQL 分布式计算 数据管理
spark SQL配置连接Hive Metastore 3.1.2
Hive Metastore作为元数据管理中心,支持多种计算引擎的读取操作,例如Flink、Presto、Spark等。本文讲述通过spark SQL配置连接Hive Metastore,并以3.1.2版本为例。
spark SQL配置连接Hive Metastore 3.1.2