spark 对于hive metastore的兼容性随笔--通过classloader实现

简介: spark 对于hive metastore的兼容性随笔--通过classloader实现

背景


接着上次的文章,文章中我们只是简单的提了一下按照官网的配置就能够兼容不同的hive元数据,这次我们从代码级别来分析一下spark是怎么做到实现不同版本的元数据的访问。

注意:正如官网所说的,该部分只是用于hive元数据的访问,spark sql内部编译的其他版本的hive用于来进行其他执行,如序列化和反序列化,UDF和UDAF等等

这里提到这一点是为了释疑一下在源码中看到一些低版本不存在的类,因为这部分spark sql内置了其他版本的hive用于除了hive元数据之外的其他交互,如:hive/hiveShim.scala中的SerializationUtilities 这个类在hive 1.2.1是不存在的,但是hive高版本2.3.7是存在的

我们以spark 3.1.1进行分析


分析


我们知道spark跟外部元数据的交互是类ExternalCatalog来进行响应的,对应到hive元数据就是HiveExternalCatalog,转到client代码:

/**
   * A Hive client used to interact with the metastore.
   */
lazy val client: HiveClient = {
    HiveUtils.newClientForMetadata(conf, hadoopConf)
  }

该client在就是进行元数据交互的最终执行者,且这里直接调用了HiveUtils的newClientForMetadata方法,直接跳到最终调用的方法:

 protected[hive] def newClientForMetadata(
      conf: SparkConf,
      hadoopConf: Configuration,
      configurations: Map[String, String]): HiveClient = {
    val sqlConf = new SQLConf
    sqlConf.setConf(SQLContext.getSQLProperties(conf))
    val hiveMetastoreVersion = HiveUtils.hiveMetastoreVersion(sqlConf)
    val hiveMetastoreJars = HiveUtils.hiveMetastoreJars(sqlConf)
    val hiveMetastoreSharedPrefixes = HiveUtils.hiveMetastoreSharedPrefixes(sqlConf)
    val hiveMetastoreBarrierPrefixes = HiveUtils.hiveMetastoreBarrierPrefixes(sqlConf)
    val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion)
...
} else if (hiveMetastoreJars == "path") {
      // Convert to files and expand any directories.
      val jars =
        HiveUtils.hiveMetastoreJarsPath(sqlConf)
          .flatMap {
            case path if path.contains("\\") && Utils.isWindows =>
              addLocalHiveJars(new File(path))
            case path =>
              DataSource.checkAndGlobPathIfNecessary(
                pathStrings = Seq(path),
                hadoopConf = hadoopConf,
                checkEmptyGlobPath = true,
                checkFilesExist = false,
                enableGlobbing = true
              ).map(_.toUri.toURL)
          }
      logInfo(
        s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " +
          s"using path: ${jars.mkString(";")}")
      new IsolatedClientLoader(
        version = metaVersion,
        sparkConf = conf,
        hadoopConf = hadoopConf,
        execJars = jars.toSeq,
        config = configurations,
        isolationOn = true,
        barrierPrefixes = hiveMetastoreBarrierPrefixes,
        sharedPrefixes = hiveMetastoreSharedPrefixes)
...

val hiveMetastoreVersion = HiveUtils.hiveMetastoreVersion(sqlConf) 这里直接获取配置的元数据的版本,也就是spark.sql.hive.metastore.version配置项

val hiveMetastoreJars = HiveUtils.hiveMetastoreJars(sqlConf) 这里配置hive元数据jar包的获取方式,默认是builtin内置,推荐使用path方式,因为一般线上环境是无网络环境

val hiveMetastoreSharedPrefixes = HiveUtils.hiveMetastoreSharedPrefixes(sqlConf)

val hiveMetastoreBarrierPrefixes = HiveUtils.hiveMetastoreBarrierPrefixes(sqlConf) 这两个跟classloader有关,也就是说什么类用哪种classloader加载,用来隔离class

val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion) 映射成spark内部的hive版本表示,用于进行元数据class的精细化操作

这里会根据配置的获取元数据jar包的方式而采用不同的初始化IsolatedClientLoader的方式。最终会调用isolatedLoader的createClient方法:

/** The isolated client interface to Hive. */
private[hive] def createClient(): HiveClient = synchronized {
  val warehouseDir = Option(hadoopConf.get(ConfVars.METASTOREWAREHOUSE.varname))
  if (!isolationOn) {
    return new HiveClientImpl(version, warehouseDir, sparkConf, hadoopConf, config,
      baseClassLoader, this)
  }
  // Pre-reflective instantiation setup.
  logDebug("Initializing the logger to avoid disaster...")
  val origLoader = Thread.currentThread().getContextClassLoader
  Thread.currentThread.setContextClassLoader(classLoader)
  try {
    classLoader
      .loadClass(classOf[HiveClientImpl].getName)
      .getConstructors.head
      .newInstance(version, warehouseDir, sparkConf, hadoopConf, config, classLoader, this)
      .asInstanceOf[HiveClient]
  } catch {
    case e: InvocationTargetException =>
      if (e.getCause().isInstanceOf[NoClassDefFoundError]) {
        val cnf = e.getCause().asInstanceOf[NoClassDefFoundError]
        throw new ClassNotFoundException(
          s"$cnf when creating Hive client using classpath: ${execJars.mkString(", ")}\n" +
          "Please make sure that jars for your version of hive and hadoop are included in the " +
          s"paths passed to ${HiveUtils.HIVE_METASTORE_JARS.key}.", e)
      } else {
        throw e
      }
  } finally {
    Thread.currentThread.setContextClassLoader(origLoader)
  }
}

如果未开启隔离性,则直接返回HiveClientImpl,该client所有终端用户共享。如果开启了(默认值),则设置当前的contextClassLoader为classLoader:

该classLoader是自定义的:

...
new URLClassLoader(allJars, rootClassLoader) {
            override def loadClass(name: String, resolve: Boolean): Class[_] = {
              val loaded = findLoadedClass(name)
              if (loaded == null) doLoadClass(name, resolve) else loaded
            }
            def doLoadClass(name: String, resolve: Boolean): Class[_] = {
              val classFileName = name.replaceAll("\\.", "/") + ".class"
              if (isBarrierClass(name)) {
                // For barrier classes, we construct a new copy of the class.
                val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
                logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
                defineClass(name, bytes, 0, bytes.length)
              } else if (!isSharedClass(name)) {
                logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
                super.loadClass(name, resolve)
              } else {
                // For shared classes, we delegate to baseClassLoader, but fall back in case the
                // class is not found.
                logDebug(s"shared class: $name")
                try {
                  baseClassLoader.loadClass(name)
                } catch {
                  case _: ClassNotFoundException =>
                    super.loadClass(name, resolve)
                }
              }
            }
          }
        }
...

直接重点,对于开启了隔离(默认值),则直接返回该classLoader,关于classloader的知识,可以参考这里,要是还有真不明白的,可以参考classLoader类的源码。

这里我们重点观察一下该自定义classloader的loadClass方法,该方法是实现类隔离的关键,


如果是BarrierClass,比如HiveClientImpl/Shim/ShimLoader,或者包含了自定义的前缀.则从当前的ContextClassLoader中复制一份class类,且生成对应的class

如果不是共享类,也不是BarrierClass,也就是hive类,则使用super.loadClass 来加载类,而最终还是会调用URLClassLoader的findClass方法来加载类

否则不是barrierClass,是共享类,则用当前contextclassloader来加载当前class

通过该classLoader加载的方式,对于跟hive元数据相关的class就是通过该自定义的classLoader加载的(注意子classloader能够看见父加载器加载的类)

之后通过该classloader加载对应的HiveClientImpl类,进行反射实例化HiveClientImpl对象,从而实现了在运行的时候,根据传入的元数据jar包进行动态加载.

重置当前线程的contextClassLoader。


重点:hive元数据的jar包的动态记载是通过自定义classloader实现的


至于真正的和hive元数据进行交互就是HiveClientImpl,该类引入了shim的机制,也就是说,通过该shim机制,对于hive元数据版本的升级都是通过该shim来进行控制,比如增加方法,就会在shim中增加对应的方法,从而达到hive元数据的向后兼容性。 其实从shim这个英文单词中我们也能看出一二,shim(垫片)是为了切合版本的升级而做的垫片。


相关文章
|
1月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
35 0
|
3月前
|
SQL 存储 关系型数据库
|
5月前
|
SQL 分布式计算 HIVE
实时计算 Flink版产品使用问题之同步到Hudi的数据是否可以被Hive或Spark直接读取
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL 分布式计算 NoSQL
使用Spark高效将数据从Hive写入Redis (功能最全)
使用Spark高效将数据从Hive写入Redis (功能最全)
377 1
|
6月前
|
SQL 分布式计算 关系型数据库
使用 Spark 抽取 MySQL 数据到 Hive 时某列字段值出现异常(字段错位)
在 MySQL 的 `order_info` 表中,包含 `order_id` 等5个字段,主要存储订单信息。执行按 `create_time` 降序的查询,显示了部分结果。在 Hive 中复制此表结构时,所有字段除 `order_id` 外设为 `string` 类型,并添加了 `etl_date` 分区字段。然而,由于使用逗号作为字段分隔符,当 `address` 字段含逗号时,数据写入 Hive 出现错位,导致 `create_time` 值变为中文字符串。问题解决方法包括更换字段分隔符或使用 Hive 默认分隔符 `\u0001`。此案例提醒在建表时需谨慎选择字段分隔符。
115 6
|
6月前
|
SQL 分布式计算 Java
Spark 为什么比 Hive 快
Spark与Hive在数据处理上有显著区别。Spark以其内存计算和线程级并行提供更快的速度,但稳定性受内存限制。相比之下,Hive虽较慢,因使用MapReduce,其稳定性更高,对内存需求较小。在Shuffle方式上,Spark的内存 Shuffle 比Hive的磁盘 Shuffle 更高效。综上,Spark在处理速度和Shuffle上占优,Hive则在稳定性和资源管理上更胜一筹。
169 0
|
6月前
|
SQL 分布式计算 Hadoop
[AIGC ~大数据] 深入理解Hadoop、HDFS、Hive和Spark:Java大师的大数据研究之旅
[AIGC ~大数据] 深入理解Hadoop、HDFS、Hive和Spark:Java大师的大数据研究之旅
182 0
|
6月前
|
SQL 存储 分布式计算
Spark与Hive的集成与互操作
Spark与Hive的集成与互操作
|
6月前
|
SQL 数据采集 数据挖掘
大数据行业应用之Hive数据分析航班线路相关的各项指标
大数据行业应用之Hive数据分析航班线路相关的各项指标
191 1
|
6月前
|
SQL 分布式计算 数据库
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
251 0