spark2.1.0之源码分析——RPC配置TransportConf

简介: 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/80888076       在《Spark2.1.0之内置RPC框架》提到TransportContext中的TransportConf给Spark的RPC框架提供配置信息,它有两个成员属性——配置提供者conf和配置的模块名称module。
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/80888076

      在《Spark2.1.0之内置RPC框架》提到TransportContext中的TransportConf给Spark的RPC框架提供配置信息,它有两个成员属性——配置提供者conf和配置的模块名称module。这两个属性的定义如下:

  private final ConfigProvider conf;
  private final String module;

其中conf是真正的配置提供者,其类型ConfigProvider是一个抽象类,见代码清单1。

代码清单1  ConfigProvider的实现
public abstract class ConfigProvider {
  public abstract String get(String name);

  public String get(String name, String defaultValue) {
    try {
      return get(name);
    } catch (NoSuchElementException e) {
      return defaultValue;
    }
  }

  public int getInt(String name, int defaultValue) {
    return Integer.parseInt(get(name, Integer.toString(defaultValue)));
  }

  public long getLong(String name, long defaultValue) {
    return Long.parseLong(get(name, Long.toString(defaultValue)));
  }

  public double getDouble(String name, double defaultValue) {
    return Double.parseDouble(get(name, Double.toString(defaultValue)));
  }

  public boolean getBoolean(String name, boolean defaultValue) {
    return Boolean.parseBoolean(get(name, Boolean.toString(defaultValue)));
  }
}

从代码清单1,可以看到ConfigProvider中包括get、getInt、getLong、getDouble、getBoolean等方法,这些方法都是基于抽象方法get获取值,经过一次类型转换而实现。这个抽象的get方法将需要子类去实现。

         Spark通常使用SparkTransportConf创建TransportConf,其实现见代码清单2。

代码清单2  SparkTransportConf的实现
object SparkTransportConf {
  private val MAX_DEFAULT_NETTY_THREADS = 8
  def fromSparkConf(_conf: SparkConf, module: String, numUsableCores: Int = 0): TransportConf = {
    val conf = _conf.clone
    val numThreads = defaultNumThreads(numUsableCores)
    conf.setIfMissing(s"spark.$module.io.serverThreads", numThreads.toString)
    conf.setIfMissing(s"spark.$module.io.clientThreads", numThreads.toString)

    new TransportConf(module, new ConfigProvider {
      override def get(name: String): String = conf.get(name)
    })
  }
  private def defaultNumThreads(numUsableCores: Int): Int = {
    val availableCores =
      if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
    math.min(availableCores, MAX_DEFAULT_NETTY_THREADS)
  }
}

从代码清单2看到,可以使用SparkTransportConf的fromSparkConf方法来构造TransportConf。传递的三个参数分别为SparkConf、模块名module及可用的内核数numUsableCores。如果numUsableCores小于等于0,那么线程数是系统可用处理器的数量,不过系统的内核数不可能全部用于网络传输使用,所以这里还将分配给网络传输的内核数量最多限制在8个。最终确定的线程数将被用于设置客户端传输线程数(spark.$module.io.clientThreads属性)和服务端传输线程数(spark.$module.io.serverThreads属性)。fromSparkConf最终构造TransportConf对象时传递的ConfigProvider为实现了get方法的匿名的内部类,get的实现实际是代理了SparkConf的get方法。


关于《Spark内核设计的艺术 架构设计与实现

经过近一年的准备,基于Spark2.1.0版本的《 Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:


纸质版售卖链接如下:
相关文章
|
2月前
|
Java 应用服务中间件 API
干翻RPC系列之HesssionRPC:HesssionRPC的开发体验和源码分析
干翻RPC系列之HesssionRPC:HesssionRPC的开发体验和源码分析
|
2月前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之spark3.1.1通过resource目录下的conf文件配置,报错如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
2月前
|
分布式计算 Hadoop Scala
Spark【环境搭建 01】spark-3.0.0-without 单机版(安装+配置+测试案例)
【4月更文挑战第13天】Spark【环境搭建 01】spark-3.0.0-without 单机版(安装+配置+测试案例)
90 0
|
8月前
|
分布式计算 Java Scala
配置spark,并在idea中搭建项目
配置spark,并在idea中搭建项目
93 0
|
9月前
|
SQL 分布式计算 Hadoop
配置Hive使用Spark执行引擎
在Hive中,可以通过配置来指定使用不同的执行引擎。Hive执行引擎包括:默认MR、tez、spark。
187 0
|
分布式计算 大数据 Shell
Spark 环境搭建_配置 HistoryServer|学习笔记
快速学习 Spark 环境搭建_配置 HistoryServer
378 0
Spark 环境搭建_配置 HistoryServer|学习笔记
|
10月前
|
负载均衡 Dubbo Java
RPC框架-dubbo:架构及源码分析-初篇
在自学或面试dubbo时,相关的问题有很多,例如dubbo 的基本工作原理,这是使用过dubbo后应该知道的。包括dubbo的分层架构、长短链接选择、二进制协议支持;之后是使用方式(服务的注册、发现、调用方式),基础配置(超时时间、线程数),这些是最基本的。 在这些问题之后,就可以继续深入底层:关于连接方式,使用长连接还是短连接?为什么? dubbo的二进制协议支持哪些,之间有什么区别/优缺点等等,也可以考察在使用过程中遇到过哪些问题,是如何解决的。这些都需要深入理解,并且有真实、长时间使用经验。
190 0
|
SQL 分布式计算 Hadoop
Spark SQL CLI配置
Spark SQL CLI配置
Spark SQL CLI配置
|
分布式计算 大数据 调度
Spark 集群搭建_高可用配置|学习笔记
快速学习 Spark 集群搭建_高可用配置
222 0
Spark 集群搭建_高可用配置|学习笔记
|
分布式计算 Hadoop 大数据
分布式集群环境之Spark的安装与配置(Centos7)
分布式集群环境之Spark的安装与配置(Centos7)
304 0
分布式集群环境之Spark的安装与配置(Centos7)