spark2.1.0之配置与源码分析

简介: 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/80468207       任何优秀的软件或服务都会提供一些配置参数,这些配置参数有些是内置的,有些则是可以由用户配置的。
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/80468207

      任何优秀的软件或服务都会提供一些配置参数,这些配置参数有些是内置的,有些则是可以由用户配置的。对于熟悉Java的开发人员来说,对JVM进行性能调优是一个经常需要面对的工作,这个过程常常伴随着各种JVM参数的调整与测试。之所以将这些参数交给具体的开发人员去调整,是因为软件或者服务的提供者也无法保证给定的默认参数是最符合用户应用场景与软硬件环境的。一个简单的例子:当用户的QPS发生变化时,对于Web服务的JVM来说也应当相应调整内存的大小或限制。

    Spark作为一款优秀的计算框架,也配备了各种各样的系统配置参数(例如:spark.master,spark.app.name,spark.driver.memory,spark.executor.memory等)。通过这些配置参数可以定义应用的名称、使用的部署模式、调度模式、executor数量、executor的内核数、driver或executor的内存大小、采用的内存模型等。

      SparkConf是Spark的配置类,这个类在Spark的历史版本中已经存在很久了,Spark中的每一个组件都直接或者间接的使用着它所存储的属性,这些属性都存储在如下的数据结构中:

  private val settings = new ConcurrentHashMap[String, String]()

由以上代码的泛型[1] 可以看出Spark的所有配置,无论是key还是value都是String类型。Spark的配置通过以下三种方式获取:

  • 来源于系统参数(即使用System.getProperties获取的属性)中以spark.作为前缀的那部分属性;
  • 使用SparkConf的API进行设置;
  • 从其它SparkConf中克隆。

下面将具体说明这三种方式的实现。

系统属性中的配置

      在SparkConf中有一个Boolean类型的构造器属性loadDefaults,当loadDefaults为true时将会从系统属性中加载Spark配置,代码如下:

  if (loadDefaults) {
    loadFromSystemProperties(false)
  }

  private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
    // Load any spark.* system properties
    for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
      set(key, value, silent)
    }
    this
  }

以上代码调用了Utils工具类[2] 的getSystemProperties方法,其作用为获取系统的键值对属性。loadFromSystemProperties方法在获取了系统属性后,使用Scala守卫过滤出其中以“spark.”字符串为前缀的key和value并且调用set方法(见代码清单3-1)最终设置到settings中。

代码清单3-1    SparkConf中set方法的实现

  private[spark] def set(key:String, value: String, silent: Boolean): SparkConf = {

    if (key == null) {

      throw newNullPointerException("nullkey")

    }

    if (value == null) {

      throw newNullPointerException("nullvalue for " + key)

    }

    if (!silent) {

     logDeprecationWarning(key)

    }

    settings.put(key,value)

    this

  }

使用SparkConf配置的API

      给SparkConf添加配置的一种常见方式是使用SparkConf中提供的API。其中有些API最终实际调用了set的重载方法,见代码清单3-2。

代码清单3-2    SparkConf中重载的set方法

  def set(key:String, value: String): SparkConf = {

    set(key,value, false)

  }

可以看到代码清单3-2中的set方法实际也是调用了代码清单3-1中的set方法。

SparkConf中的setMaster、setAppName、setJars、setExecutorEnv、setSparkHome、setAll等方法最终都是通过代码清单3-2中的set方法完成Spark配置的,本书以其中最为常用的setMaster和setAppName为例,用代码清单3-3和代码清单3-4来展示他们的实现。

代码清单3-3    设置Spark的部署模式的配置方法setMaster

  def setMaster(master: String): SparkConf = {

    set("spark.master", master)

  }

代码清单3-4    设置Spark的应用名称的配置方法setAppName

  def setAppName(name: String): SparkConf = {

    set("spark.app.name", name)

  }

克隆SparkConf配置

      有些情况下,同一个SparkConf实例中的配置信息需要被Spark中的多个组件共用,例如:组件A中存在一个SparkConf实例a,组件B中也很需要实例a中的配置信息,这时该如何处理?我们往往首先想到的方法是将SparkConf实例定义为全局变量或者通过参数传递给其它组件,但是这会引入并发问题。虽然settings是线程安全的ConcurrentHashMap类,而且ConcurrentHashMap也被证明是高并发下性能表现不错的数据结构,但是只要存在并发就一定会有性能的损失问题。我们可以新建一个SparkConf实例b,并将a中的配置信息全部拷贝到b中,这种方式显然不是最优雅的,复制代码会散落在程序的各个角落。现在是时候阅读下SparkConf的构造器了,代码如下所示:

class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging withSerializable {

  //省略无关代码

  def this() = this(true)

SparkConf继承了Cloneable特质并实现了clone方法,clone方法(见代码清单3-5)的实现跟我们所讨论的方式是一样的,并且通过Cloneable特质提高了代码的可复用性。

代码清单3-5    克隆SparkConf配置

  override def clone: SparkConf ={

    val cloned = new SparkConf(false)

    settings.entrySet().asScala.foreach { e =>

      cloned.set(e.getKey(),e.getValue(), true)

    }

    cloned

  }

这样我们就可以在任何想要使用SparkConf的地方使用克隆方式来优雅的编程了。



[1] Scala泛型的语法采用了方括号,而非Java中的尖括号。

[2] Utils是Spark中最常用的工具类,其每个方法实现的功能都比较单一,理解起来比较简单,所以本书只将相关的介绍放入附录A中单独进行介绍。如果不去阅读Utils中各个方法的实现,对阅读本书主干内容也不会有太多影响。如果是Spark的初学者或者是刚刚接触Scala语言的开发者还是建议阅读。

关于《Spark内核设计的艺术 架构设计与实现

经过近一年的准备,《 Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:


纸质版售卖链接如下:
相关文章
|
Apache 分布式计算 Spark
Apache Spark Delta Lake 事务日志实现源码分析
Apache Spark Delta Lake 事务日志实现源码分析 我们已经在这篇文章详细介绍了 Apache Spark Delta Lake 的事务日志是什么、主要用途以及如何工作的。那篇文章已经可以很好地给大家介绍 Delta Lake 的内部工作原理,原子性保证,本文为了学习的目的,带领大家从源码级别来看看 Delta Lake 事务日志的实现。
1968 0
|
分布式计算 Java Shell
Spark源码分析之Spark Shell(上)
终于开始看Spark源码了,先从最常用的spark-shell脚本开始吧。不要觉得一个启动脚本有什么东东,其实里面还是有很多知识点的。另外,从启动脚本入手,是寻找代码入口最简单的方法,很多开源框架,其实都可以通过这种方式来寻找源码入口。
902 0
|
分布式计算 Spark
Spark2.4.0源码分析之WorldCount 事件循环处理器(三)
理解DAG事件循环处理器处理事件流程
1023 0
|
分布式计算 Spark Hadoop
Spark MapOutputTracker源码分析
## 技能标签 - Spark ShuffleMapTask处理完成后,把MapStatus数据(BlockManagerId,[compressSize])发送给MapOutputTrackerMaster.
1666 0
|
分布式计算 搜索推荐 Spark
Spark 源码分析之ShuffleMapTask内存数据Spill和合并
- Spark ShuffleMapTask 内存中的数据Spill到临时文件 - 临时文件中的数据是如何定入的,如何按partition升序排序,再按Key升序排序写入(key,value)数据 - 每个临时文件,都存入对应的每个分区有多少个(key,value)对,有多少次流提交数组,数组中...
1783 0
|
分布式计算 Scala Spark
Spark源码分析之ResultTask处理
ResultTask 执行当前分区的计算,首先从ShuffleMapTask拿到当前分区的数据,会从所有的ShuffleMapTask都拿一遍当前的分区数据,然后调用reduceByKey自定义的函数进行计算,最后合并所有的ResultTask输出结果,进行输出
2278 0
|
分布式计算 Shell Scala
Spark源码分析之ShuffleMapTask处理
Spark源码分析之ShuffleMapTask处理,在map端对数据的处理源码分析
1672 0
|
分布式计算 Apache Spark
Spark Master启动源码分析
Spark Master启动源码分析 更多资源 github: https://github.com/opensourceteams/spark-scala-maven csdn(汇总视频在线看): https://blog.
950 0
|
分布式计算 Spark
Spark Worker启动源码分析
Spark Worker启动源码分析 更多资源 github: https://github.com/opensourceteams/spark-scala-maven csdn(汇总视频在线看): https://blog.
1106 0
|
分布式计算 Spark
Spark Executor启动源码分析
Spark CoarseGrainedExecutorBackend启动源码分析 更多资源 github: https://github.
1377 0