spark 对于hive metastore的兼容性随笔--通过classloader实现

简介: spark 对于hive metastore的兼容性随笔--通过classloader实现

背景


接着上次的文章,文章中我们只是简单的提了一下按照官网的配置就能够兼容不同的hive元数据,这次我们从代码级别来分析一下spark是怎么做到实现不同版本的元数据的访问。

注意:正如官网所说的,该部分只是用于hive元数据的访问,spark sql内部编译的其他版本的hive用于来进行其他执行,如序列化和反序列化,UDF和UDAF等等

这里提到这一点是为了释疑一下在源码中看到一些低版本不存在的类,因为这部分spark sql内置了其他版本的hive用于除了hive元数据之外的其他交互,如:hive/hiveShim.scala中的SerializationUtilities 这个类在hive 1.2.1是不存在的,但是hive高版本2.3.7是存在的

我们以spark 3.1.1进行分析


分析


我们知道spark跟外部元数据的交互是类ExternalCatalog来进行响应的,对应到hive元数据就是HiveExternalCatalog,转到client代码:

/**
   * A Hive client used to interact with the metastore.
   */
lazy val client: HiveClient = {
    HiveUtils.newClientForMetadata(conf, hadoopConf)
  }

该client在就是进行元数据交互的最终执行者,且这里直接调用了HiveUtils的newClientForMetadata方法,直接跳到最终调用的方法:

 protected[hive] def newClientForMetadata(
      conf: SparkConf,
      hadoopConf: Configuration,
      configurations: Map[String, String]): HiveClient = {
    val sqlConf = new SQLConf
    sqlConf.setConf(SQLContext.getSQLProperties(conf))
    val hiveMetastoreVersion = HiveUtils.hiveMetastoreVersion(sqlConf)
    val hiveMetastoreJars = HiveUtils.hiveMetastoreJars(sqlConf)
    val hiveMetastoreSharedPrefixes = HiveUtils.hiveMetastoreSharedPrefixes(sqlConf)
    val hiveMetastoreBarrierPrefixes = HiveUtils.hiveMetastoreBarrierPrefixes(sqlConf)
    val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion)
...
} else if (hiveMetastoreJars == "path") {
      // Convert to files and expand any directories.
      val jars =
        HiveUtils.hiveMetastoreJarsPath(sqlConf)
          .flatMap {
            case path if path.contains("\\") && Utils.isWindows =>
              addLocalHiveJars(new File(path))
            case path =>
              DataSource.checkAndGlobPathIfNecessary(
                pathStrings = Seq(path),
                hadoopConf = hadoopConf,
                checkEmptyGlobPath = true,
                checkFilesExist = false,
                enableGlobbing = true
              ).map(_.toUri.toURL)
          }
      logInfo(
        s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " +
          s"using path: ${jars.mkString(";")}")
      new IsolatedClientLoader(
        version = metaVersion,
        sparkConf = conf,
        hadoopConf = hadoopConf,
        execJars = jars.toSeq,
        config = configurations,
        isolationOn = true,
        barrierPrefixes = hiveMetastoreBarrierPrefixes,
        sharedPrefixes = hiveMetastoreSharedPrefixes)
...

val hiveMetastoreVersion = HiveUtils.hiveMetastoreVersion(sqlConf) 这里直接获取配置的元数据的版本,也就是spark.sql.hive.metastore.version配置项

val hiveMetastoreJars = HiveUtils.hiveMetastoreJars(sqlConf) 这里配置hive元数据jar包的获取方式,默认是builtin内置,推荐使用path方式,因为一般线上环境是无网络环境

val hiveMetastoreSharedPrefixes = HiveUtils.hiveMetastoreSharedPrefixes(sqlConf)

val hiveMetastoreBarrierPrefixes = HiveUtils.hiveMetastoreBarrierPrefixes(sqlConf) 这两个跟classloader有关,也就是说什么类用哪种classloader加载,用来隔离class

val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion) 映射成spark内部的hive版本表示,用于进行元数据class的精细化操作

这里会根据配置的获取元数据jar包的方式而采用不同的初始化IsolatedClientLoader的方式。最终会调用isolatedLoader的createClient方法:

/** The isolated client interface to Hive. */
private[hive] def createClient(): HiveClient = synchronized {
  val warehouseDir = Option(hadoopConf.get(ConfVars.METASTOREWAREHOUSE.varname))
  if (!isolationOn) {
    return new HiveClientImpl(version, warehouseDir, sparkConf, hadoopConf, config,
      baseClassLoader, this)
  }
  // Pre-reflective instantiation setup.
  logDebug("Initializing the logger to avoid disaster...")
  val origLoader = Thread.currentThread().getContextClassLoader
  Thread.currentThread.setContextClassLoader(classLoader)
  try {
    classLoader
      .loadClass(classOf[HiveClientImpl].getName)
      .getConstructors.head
      .newInstance(version, warehouseDir, sparkConf, hadoopConf, config, classLoader, this)
      .asInstanceOf[HiveClient]
  } catch {
    case e: InvocationTargetException =>
      if (e.getCause().isInstanceOf[NoClassDefFoundError]) {
        val cnf = e.getCause().asInstanceOf[NoClassDefFoundError]
        throw new ClassNotFoundException(
          s"$cnf when creating Hive client using classpath: ${execJars.mkString(", ")}\n" +
          "Please make sure that jars for your version of hive and hadoop are included in the " +
          s"paths passed to ${HiveUtils.HIVE_METASTORE_JARS.key}.", e)
      } else {
        throw e
      }
  } finally {
    Thread.currentThread.setContextClassLoader(origLoader)
  }
}

如果未开启隔离性,则直接返回HiveClientImpl,该client所有终端用户共享。如果开启了(默认值),则设置当前的contextClassLoader为classLoader:

该classLoader是自定义的:

...
new URLClassLoader(allJars, rootClassLoader) {
            override def loadClass(name: String, resolve: Boolean): Class[_] = {
              val loaded = findLoadedClass(name)
              if (loaded == null) doLoadClass(name, resolve) else loaded
            }
            def doLoadClass(name: String, resolve: Boolean): Class[_] = {
              val classFileName = name.replaceAll("\\.", "/") + ".class"
              if (isBarrierClass(name)) {
                // For barrier classes, we construct a new copy of the class.
                val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
                logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
                defineClass(name, bytes, 0, bytes.length)
              } else if (!isSharedClass(name)) {
                logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
                super.loadClass(name, resolve)
              } else {
                // For shared classes, we delegate to baseClassLoader, but fall back in case the
                // class is not found.
                logDebug(s"shared class: $name")
                try {
                  baseClassLoader.loadClass(name)
                } catch {
                  case _: ClassNotFoundException =>
                    super.loadClass(name, resolve)
                }
              }
            }
          }
        }
...

直接重点,对于开启了隔离(默认值),则直接返回该classLoader,关于classloader的知识,可以参考这里,要是还有真不明白的,可以参考classLoader类的源码。

这里我们重点观察一下该自定义classloader的loadClass方法,该方法是实现类隔离的关键,


如果是BarrierClass,比如HiveClientImpl/Shim/ShimLoader,或者包含了自定义的前缀.则从当前的ContextClassLoader中复制一份class类,且生成对应的class

如果不是共享类,也不是BarrierClass,也就是hive类,则使用super.loadClass 来加载类,而最终还是会调用URLClassLoader的findClass方法来加载类

否则不是barrierClass,是共享类,则用当前contextclassloader来加载当前class

通过该classLoader加载的方式,对于跟hive元数据相关的class就是通过该自定义的classLoader加载的(注意子classloader能够看见父加载器加载的类)

之后通过该classloader加载对应的HiveClientImpl类,进行反射实例化HiveClientImpl对象,从而实现了在运行的时候,根据传入的元数据jar包进行动态加载.

重置当前线程的contextClassLoader。


重点:hive元数据的jar包的动态记载是通过自定义classloader实现的


至于真正的和hive元数据进行交互就是HiveClientImpl,该类引入了shim的机制,也就是说,通过该shim机制,对于hive元数据版本的升级都是通过该shim来进行控制,比如增加方法,就会在shim中增加对应的方法,从而达到hive元数据的向后兼容性。 其实从shim这个英文单词中我们也能看出一二,shim(垫片)是为了切合版本的升级而做的垫片。


相关文章
|
10月前
|
SQL 分布式计算 IDE
如何在IDE中通过Spark操作Hive
通过以上方法和代码示例,你可以在IDE中成功通过Spark操作Hive,实现大规模数据处理和分析。确保理解每一步的实现细节,应用到实际项目中时能有效地处理各种复杂的数据场景。
549 28
|
10月前
|
SQL 分布式计算 关系型数据库
基于云服务器的数仓搭建-hive/spark安装
本文介绍了在本地安装和配置MySQL、Hive及Spark的过程。主要内容包括: - **MySQL本地安装**:详细描述了内存占用情况及安装步骤,涉及安装脚本的编写与执行,以及连接MySQL的方法。 - **Hive安装**:涵盖了从上传压缩包到配置环境变量的全过程,并解释了如何将Hive元数据存储配置到MySQL中。 - **Hive与Spark集成**:说明了如何安装Spark并将其与Hive集成,确保Hive任务由Spark执行,同时解决了依赖冲突问题。 - **常见问题及解决方法**:列举了安装过程中可能遇到的问题及其解决方案,如内存配置不足、节点间通信问题等。
基于云服务器的数仓搭建-hive/spark安装
|
10月前
|
SQL 分布式计算 资源调度
Dataphin功能Tips系列(48)-如何根据Hive SQL/Spark SQL的任务优先级指定YARN资源队列
如何根据Hive SQL/Spark SQL的任务优先级指定YARN资源队列
406 4
|
SQL 负载均衡 监控
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
298 0
|
SQL 存储 关系型数据库
|
SQL 分布式计算 HIVE
实时计算 Flink版产品使用问题之同步到Hudi的数据是否可以被Hive或Spark直接读取
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
SQL 分布式计算 NoSQL
使用Spark高效将数据从Hive写入Redis (功能最全)
使用Spark高效将数据从Hive写入Redis (功能最全)
934 1
|
7月前
|
人工智能 分布式计算 大数据
大数据≠大样本:基于Spark的特征降维实战(提升10倍训练效率)
本文探讨了大数据场景下降维的核心问题与解决方案,重点分析了“维度灾难”对模型性能的影响及特征冗余的陷阱。通过数学证明与实际案例,揭示高维空间中样本稀疏性问题,并提出基于Spark的分布式降维技术选型与优化策略。文章详细展示了PCA在亿级用户画像中的应用,包括数据准备、核心实现与效果评估,同时深入探讨了协方差矩阵计算与特征值分解的并行优化方法。此外,还介绍了动态维度调整、非线性特征处理及降维与其他AI技术的协同效应,为生产环境提供了最佳实践指南。最终总结出降维的本质与工程实践原则,展望未来发展方向。
404 0
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
1014 2
ClickHouse与大数据生态集成:Spark & Flink 实战