背景
接着上次的文章,文章中我们只是简单的提了一下按照官网的配置就能够兼容不同的hive元数据,这次我们从代码级别来分析一下spark是怎么做到实现不同版本的元数据的访问。
注意:正如官网所说的,该部分只是用于hive元数据的访问,spark sql内部编译的其他版本的hive用于来进行其他执行,如序列化和反序列化,UDF和UDAF等等
这里提到这一点是为了释疑一下在源码中看到一些低版本不存在的类,因为这部分spark sql内置了其他版本的hive用于除了hive元数据之外的其他交互,如:hive/hiveShim.scala中的SerializationUtilities 这个类在hive 1.2.1是不存在的,但是hive高版本2.3.7是存在的
我们以spark 3.1.1进行分析
分析
我们知道spark跟外部元数据的交互是类ExternalCatalog来进行响应的,对应到hive元数据就是HiveExternalCatalog,转到client代码:
/** * A Hive client used to interact with the metastore. */ lazy val client: HiveClient = { HiveUtils.newClientForMetadata(conf, hadoopConf) }
该client在就是进行元数据交互的最终执行者,且这里直接调用了HiveUtils的newClientForMetadata方法,直接跳到最终调用的方法:
protected[hive] def newClientForMetadata( conf: SparkConf, hadoopConf: Configuration, configurations: Map[String, String]): HiveClient = { val sqlConf = new SQLConf sqlConf.setConf(SQLContext.getSQLProperties(conf)) val hiveMetastoreVersion = HiveUtils.hiveMetastoreVersion(sqlConf) val hiveMetastoreJars = HiveUtils.hiveMetastoreJars(sqlConf) val hiveMetastoreSharedPrefixes = HiveUtils.hiveMetastoreSharedPrefixes(sqlConf) val hiveMetastoreBarrierPrefixes = HiveUtils.hiveMetastoreBarrierPrefixes(sqlConf) val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion) ... } else if (hiveMetastoreJars == "path") { // Convert to files and expand any directories. val jars = HiveUtils.hiveMetastoreJarsPath(sqlConf) .flatMap { case path if path.contains("\\") && Utils.isWindows => addLocalHiveJars(new File(path)) case path => DataSource.checkAndGlobPathIfNecessary( pathStrings = Seq(path), hadoopConf = hadoopConf, checkEmptyGlobPath = true, checkFilesExist = false, enableGlobbing = true ).map(_.toUri.toURL) } logInfo( s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " + s"using path: ${jars.mkString(";")}") new IsolatedClientLoader( version = metaVersion, sparkConf = conf, hadoopConf = hadoopConf, execJars = jars.toSeq, config = configurations, isolationOn = true, barrierPrefixes = hiveMetastoreBarrierPrefixes, sharedPrefixes = hiveMetastoreSharedPrefixes) ...
val hiveMetastoreVersion = HiveUtils.hiveMetastoreVersion(sqlConf) 这里直接获取配置的元数据的版本,也就是spark.sql.hive.metastore.version配置项
val hiveMetastoreJars = HiveUtils.hiveMetastoreJars(sqlConf) 这里配置hive元数据jar包的获取方式,默认是builtin内置,推荐使用path方式,因为一般线上环境是无网络环境
val hiveMetastoreSharedPrefixes = HiveUtils.hiveMetastoreSharedPrefixes(sqlConf)
val hiveMetastoreBarrierPrefixes = HiveUtils.hiveMetastoreBarrierPrefixes(sqlConf) 这两个跟classloader有关,也就是说什么类用哪种classloader加载,用来隔离class
val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion) 映射成spark内部的hive版本表示,用于进行元数据class的精细化操作
这里会根据配置的获取元数据jar包的方式而采用不同的初始化IsolatedClientLoader的方式。最终会调用isolatedLoader的createClient方法:
/** The isolated client interface to Hive. */ private[hive] def createClient(): HiveClient = synchronized { val warehouseDir = Option(hadoopConf.get(ConfVars.METASTOREWAREHOUSE.varname)) if (!isolationOn) { return new HiveClientImpl(version, warehouseDir, sparkConf, hadoopConf, config, baseClassLoader, this) } // Pre-reflective instantiation setup. logDebug("Initializing the logger to avoid disaster...") val origLoader = Thread.currentThread().getContextClassLoader Thread.currentThread.setContextClassLoader(classLoader) try { classLoader .loadClass(classOf[HiveClientImpl].getName) .getConstructors.head .newInstance(version, warehouseDir, sparkConf, hadoopConf, config, classLoader, this) .asInstanceOf[HiveClient] } catch { case e: InvocationTargetException => if (e.getCause().isInstanceOf[NoClassDefFoundError]) { val cnf = e.getCause().asInstanceOf[NoClassDefFoundError] throw new ClassNotFoundException( s"$cnf when creating Hive client using classpath: ${execJars.mkString(", ")}\n" + "Please make sure that jars for your version of hive and hadoop are included in the " + s"paths passed to ${HiveUtils.HIVE_METASTORE_JARS.key}.", e) } else { throw e } } finally { Thread.currentThread.setContextClassLoader(origLoader) } }
如果未开启隔离性,则直接返回HiveClientImpl,该client所有终端用户共享。如果开启了(默认值),则设置当前的contextClassLoader为classLoader:
该classLoader是自定义的:
... new URLClassLoader(allJars, rootClassLoader) { override def loadClass(name: String, resolve: Boolean): Class[_] = { val loaded = findLoadedClass(name) if (loaded == null) doLoadClass(name, resolve) else loaded } def doLoadClass(name: String, resolve: Boolean): Class[_] = { val classFileName = name.replaceAll("\\.", "/") + ".class" if (isBarrierClass(name)) { // For barrier classes, we construct a new copy of the class. val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName)) logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}") defineClass(name, bytes, 0, bytes.length) } else if (!isSharedClass(name)) { logDebug(s"hive class: $name - ${getResource(classToPath(name))}") super.loadClass(name, resolve) } else { // For shared classes, we delegate to baseClassLoader, but fall back in case the // class is not found. logDebug(s"shared class: $name") try { baseClassLoader.loadClass(name) } catch { case _: ClassNotFoundException => super.loadClass(name, resolve) } } } } } ...
直接重点,对于开启了隔离(默认值),则直接返回该classLoader,关于classloader的知识,可以参考这里,要是还有真不明白的,可以参考classLoader类的源码。
这里我们重点观察一下该自定义classloader的loadClass方法,该方法是实现类隔离的关键,
如果是BarrierClass,比如HiveClientImpl/Shim/ShimLoader,或者包含了自定义的前缀.则从当前的ContextClassLoader中复制一份class类,且生成对应的class
如果不是共享类,也不是BarrierClass,也就是hive类,则使用super.loadClass 来加载类,而最终还是会调用URLClassLoader的findClass方法来加载类
否则不是barrierClass,是共享类,则用当前contextclassloader来加载当前class
通过该classLoader加载的方式,对于跟hive元数据相关的class就是通过该自定义的classLoader加载的(注意子classloader能够看见父加载器加载的类)
之后通过该classloader加载对应的HiveClientImpl类,进行反射实例化HiveClientImpl对象,从而实现了在运行的时候,根据传入的元数据jar包进行动态加载.
重置当前线程的contextClassLoader。
重点:hive元数据的jar包的动态记载是通过自定义classloader实现的
至于真正的和hive元数据进行交互就是HiveClientImpl,该类引入了shim的机制,也就是说,通过该shim机制,对于hive元数据版本的升级都是通过该shim来进行控制,比如增加方法,就会在shim中增加对应的方法,从而达到hive元数据的向后兼容性。 其实从shim这个英文单词中我们也能看出一二,shim(垫片)是为了切合版本的升级而做的垫片。