最近在弄spark on k8s的时候,要集成同事的一些功能,其实这并没有什么,但是里面涉及到了hive的类问题(具体指org.apache.hadoop.hive这个包下的类)。之后发现hive类总是优先加载应用jar包里的类,而忽略掉spark自带的系统jars包,这给我带来了很大的困扰,大约花了一两周的时间,终于把这个问题排查清楚了。
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = { val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args) // Let the main class re-initialize the logging system once it starts. if (uninitLog) { Logging.uninitialize() } if (args.verbose) { logInfo(s"Main class:\n$childMainClass") logInfo(s"Arguments:\n${childArgs.mkString("\n")}") // sysProps may contain sensitive information, so redact before printing logInfo(s"Spark config:\n${Utils.redact(sparkConf.getAll.toMap).mkString("\n")}") logInfo(s"Classpath elements:\n${childClasspath.mkString("\n")}") logInfo("\n") } val loader = getSubmitClassLoader(sparkConf) for (jar <- childClasspath) { addJarToClasspath(jar, loader) } var mainClass: Class[_] = null try { mainClass = Utils.classForName(childMainClass) } catch { ....
/**
 * Creates the class loader that user jars are later added to and installs it as the
 * context class loader of the current thread.
 *
 * When `DRIVER_USER_CLASS_PATH_FIRST` is set in the configuration a
 * `ChildFirstURLClassLoader` is created, otherwise a plain `MutableURLClassLoader`.
 * In both cases the new loader starts with no URLs and uses the thread's current context
 * class loader as its parent.
 *
 * @param sparkConf configuration the `DRIVER_USER_CLASS_PATH_FIRST` entry is read from
 * @return the newly created loader, already set as the thread's context class loader
 */
private def getSubmitClassLoader(sparkConf: SparkConf): MutableURLClassLoader = {
  val userClassPathFirst = sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)
  val noUrls = new Array[URL](0)
  val parent = Thread.currentThread.getContextClassLoader

  val submitLoader: MutableURLClassLoader =
    if (userClassPathFirst) new ChildFirstURLClassLoader(noUrls, parent)
    else new MutableURLClassLoader(noUrls, parent)

  // Later class lookups that go through the context class loader will see this loader.
  Thread.currentThread.setContextClassLoader(submitLoader)
  submitLoader
}
就在百思不得其解的时候,突然想到了之前的随笔《spark 对于hive metastore的兼容性随笔–通过classloader实现》里的实现,这里就不得不分析一下IsolatedClientLoader这个类的细节:
/**
 * Chooses the class loader used for Hive classes.
 *
 *  - Isolation off: `baseClassLoader` is used as is.
 *  - Isolation on, no jars: `baseClassLoader` is used as is.
 *  - Isolation on, with jars: a `URLClassLoader` over `allJars` is created. Its parent is
 *    the platform class loader on Java 9+ and the boot class loader (`null`) before that.
 *    Barrier classes are re-defined from the bytes found through `baseClassLoader`, shared
 *    classes are delegated to `baseClassLoader` (falling back to the jars), and every other
 *    class is loaded from the jars.
 */
private[hive] val classLoader: MutableURLClassLoader = {
  val isolatedClassLoader = if (isolationOn) {
    if (allJars.isEmpty) {
      // See HiveUtils; this is the Java 9+ + builtin mode scenario
      baseClassLoader
    } else {
      // Derive the major Java version from the specification version, which is "1.8" on
      // Java 8 and "9", "11", "17", ... from Java 9 on. Taking the second dot-separated field
      // of `java.version` only works for "1.x": "11.0.2" yields 0 and a bare "17" has no
      // second field at all, so the Java 9+ branch below would never be taken.
      val javaMajorVersion: Int = {
        val spec = SystemUtils.JAVA_SPECIFICATION_VERSION
        val major = if (spec.startsWith("1.")) spec.substring(2) else spec
        major.takeWhile(_.isDigit).toInt
      }
      val rootClassLoader: ClassLoader =
        if (javaMajorVersion >= 9) {
          // In Java 9, the boot classloader can see few JDK classes. The intended parent
          // classloader for delegation is now the platform classloader.
          // See http://java9.wtf/class-loading/
          val platformCL = classOf[ClassLoader].getMethod("getPlatformClassLoader").
            invoke(null).asInstanceOf[ClassLoader]
          // Check to make sure that the root classloader does not know about Hive.
          assert(Try(platformCL.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure)
          platformCL
        } else {
          // The boot classloader is represented by null (the instance itself isn't accessible)
          // and before Java 9 can see all JDK classes
          null
        }
      new URLClassLoader(allJars, rootClassLoader) {
        override def loadClass(name: String, resolve: Boolean): Class[_] = {
          // Reuse a class this loader has already defined; otherwise apply the routing below.
          val loaded = findLoadedClass(name)
          if (loaded == null) doLoadClass(name, resolve) else loaded
        }
        def doLoadClass(name: String, resolve: Boolean): Class[_] = {
          val classFileName = name.replaceAll("\\.", "/") + ".class"
          if (isBarrierClass(name)) {
            // For barrier classes, we construct a new copy of the class.
            val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
            logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
            defineClass(name, bytes, 0, bytes.length)
          } else if (!isSharedClass(name)) {
            logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
            super.loadClass(name, resolve)
          } else {
            // For shared classes, we delegate to baseClassLoader, but fall back in case the
            // class is not found.
            logDebug(s"shared class: $name")
            try {
              baseClassLoader.loadClass(name)
            } catch {
              case _: ClassNotFoundException =>
                super.loadClass(name, resolve)
            }
          }
        }
      }
    }
  } else {
    baseClassLoader
  }
  // ... (the excerpt stops here; the remainder of this definition is not shown)
// Build the submit class loader, then register every child classpath entry with it.
val loader = getSubmitClassLoader(sparkConf)
childClasspath.foreach { jar =>
  addJarToClasspath(jar, loader)
}
注意childClasspath这个是怎么来的:我们在提交任务的时候(开启verbose时),是可以看到Classpath elements输出的,也就是会包括--jars指定的jar包和应用的jar包,所以在加载hive相关的类的时候,就会优先从childClasspath中加载对应的class,这是通过IsolatedClientLoader实现的。
但是如果应用中用到的hive相关的类和系统的类不一致的话,该怎么解决?可以用maven shade插件进行重命名(relocation),使应用的jar包使用重命名以后的类,而不会影响其他的类使用系统自带的hive相关的类。