spark hive类总是优先记载应用里面的jar包,跟spark.{driver/executor}.userClassPathFirst无关

简介: spark hive类总是优先记载应用里面的jar包,跟spark.{driver/executor}.userClassPathFirst无关

背景


最近在弄spark on k8s的时候,要集成同事的一些功能,其实这并没有什么,但是里面涉及到了hive的类问题(具体指这个org.apache.hadoop.hive.包下的类)。之后发现hive类总是优先加载应用jar包里的类,而忽略掉spark自带的系统jars包,这给我带了了很大的困扰,大约花了一两周的时间,终于把这个问题排查清楚了。


问题分析


直接分析:

我们知道在spark提交的时候,会获取URLClassLoader去加载类,如下:

private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
    val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
    // Let the main class re-initialize the logging system once it starts.
    if (uninitLog) {
      Logging.uninitialize()
    }
    if (args.verbose) {
      logInfo(s"Main class:\n$childMainClass")
      logInfo(s"Arguments:\n${childArgs.mkString("\n")}")
      // sysProps may contain sensitive information, so redact before printing
      logInfo(s"Spark config:\n${Utils.redact(sparkConf.getAll.toMap).mkString("\n")}")
      logInfo(s"Classpath elements:\n${childClasspath.mkString("\n")}")
      logInfo("\n")
    }
    val loader = getSubmitClassLoader(sparkConf)
    for (jar <- childClasspath) {
      addJarToClasspath(jar, loader)
    }
    var mainClass: Class[_] = null
    try {
      mainClass = Utils.classForName(childMainClass)
    } catch {
      ....

这里的getSubmitClassLoader方法就是获得URLClassloader:

private def getSubmitClassLoader(sparkConf: SparkConf): MutableURLClassLoader = {
    val loader =
      if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
        new ChildFirstURLClassLoader(new Array[URL](0),
          Thread.currentThread.getContextClassLoader)
      } else {
        new MutableURLClassLoader(new Array[URL](0),
          Thread.currentThread.getContextClassLoader)
      }
    Thread.currentThread.setContextClassLoader(loader)
    loader
  }

区别就是ChildFirstURLClassLoader自定义了类加载的顺序,也就是会优先加载应用jar包里的顺序,可是我们的应用的并没有设置spark.{driver/executor}.userClassPathFirst,所以该hive类是跟这个加载器无关的。

就在百思不得其解的时候,突然想到了spark 对于hive metastore的兼容性随笔–通过classloader实现,这里的实现,这里就不得不分析一下IsolatedClientLoader这个类的细节:

 private[hive] val classLoader: MutableURLClassLoader = {
    val isolatedClassLoader =
      if (isolationOn) {
        if (allJars.isEmpty) {
          // See HiveUtils; this is the Java 9+ + builtin mode scenario
          baseClassLoader
        } else {
          val rootClassLoader: ClassLoader =
            if (SystemUtils.JAVA_VERSION.split("\\.")(1).toInt>=9) {
              // In Java 9, the boot classloader can see few JDK classes. The intended parent
              // classloader for delegation is now the platform classloader.
              // See http://java9.wtf/class-loading/
              val platformCL =
              classOf[ClassLoader].getMethod("getPlatformClassLoader").
                invoke(null).asInstanceOf[ClassLoader]
              // Check to make sure that the root classloader does not know about Hive.
              assert(Try(platformCL.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure)
              platformCL
            } else {
              // The boot classloader is represented by null (the instance itself isn't accessible)
              // and before Java 9 can see all JDK classes
              null
            }
          new URLClassLoader(allJars, rootClassLoader) {
            override def loadClass(name: String, resolve: Boolean): Class[_] = {
              val loaded = findLoadedClass(name)
              if (loaded == null) doLoadClass(name, resolve) else loaded
            }
            def doLoadClass(name: String, resolve: Boolean): Class[_] = {
              val classFileName = name.replaceAll("\\.", "/") + ".class"
              if (isBarrierClass(name)) {
                // For barrier classes, we construct a new copy of the class.
                val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
                logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
                defineClass(name, bytes, 0, bytes.length)
              } else if (!isSharedClass(name)) {
                logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
                super.loadClass(name, resolve)
              } else {
                // For shared classes, we delegate to baseClassLoader, but fall back in case the
                // class is not found.
                logDebug(s"shared class: $name")
                try {
                  baseClassLoader.loadClass(name)
                } catch {
                  case _: ClassNotFoundException =>
                    super.loadClass(name, resolve)
                }
              }
            }
          }
        }
      } else {
        baseClassLoader
      }

如果是在JDK8的情况下rootClassLoader是为null的,这就导致了在加载hive相关的类的时候,super.loadClass方法就会直接执行URLClassLoader的findClass方法,进而从URL(也就是通过addURL方法加载进来的jar)查着相关的类

而在spark中,最终的任务提交是通过SparkSubmit的runMain方法来提交的,代码如第一块代码:

 val loader = getSubmitClassLoader(sparkConf)
    for (jar <- childClasspath) {
      addJarToClasspath(jar, loader)
    }

意childClasspath这个是怎么来着,我们在提交任务的时候,是可以看到Classpath elements输出的,也就是会包括–jars指定的jar包和因公的jar包,所以在加载hive相关的类的时候,就会优先从childClassPath中加载对应的class,这是通过IolatedClassLoader实现。


解决方法


但是如果应用中用到的hive相关的类和系统的类不一致的话,该怎么解决,可以用maven shade插件进行重命名,使应用的jar包使用重名以后的类,而不会影响其他的类使用系统自带的hive相关的类


结论


hive相关的class(在org.apache.hadoop.hive包下),跟spark.{driver/executor}.userClassPathFirst无关,跟IolatedClassLoader实现有关。具体想看哪些hive类是从哪个jar包加载进来的,可以开启debug日志


目录
打赏
0
0
0
0
9
分享
相关文章
如何在IDE中通过Spark操作Hive
通过以上方法和代码示例,你可以在IDE中成功通过Spark操作Hive,实现大规模数据处理和分析。确保理解每一步的实现细节,应用到实际项目中时能有效地处理各种复杂的数据场景。
237 28
基于云服务器的数仓搭建-hive/spark安装
本文介绍了在本地安装和配置MySQL、Hive及Spark的过程。主要内容包括: - **MySQL本地安装**:详细描述了内存占用情况及安装步骤,涉及安装脚本的编写与执行,以及连接MySQL的方法。 - **Hive安装**:涵盖了从上传压缩包到配置环境变量的全过程,并解释了如何将Hive元数据存储配置到MySQL中。 - **Hive与Spark集成**:说明了如何安装Spark并将其与Hive集成,确保Hive任务由Spark执行,同时解决了依赖冲突问题。 - **常见问题及解决方法**:列举了安装过程中可能遇到的问题及其解决方案,如内存配置不足、节点间通信问题等。
基于云服务器的数仓搭建-hive/spark安装
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
205 0
实时计算 Flink版产品使用问题之同步到Hudi的数据是否可以被Hive或Spark直接读取
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
大数据≠大样本:基于Spark的特征降维实战(提升10倍训练效率)
本文探讨了大数据场景下降维的核心问题与解决方案,重点分析了“维度灾难”对模型性能的影响及特征冗余的陷阱。通过数学证明与实际案例,揭示高维空间中样本稀疏性问题,并提出基于Spark的分布式降维技术选型与优化策略。文章详细展示了PCA在亿级用户画像中的应用,包括数据准备、核心实现与效果评估,同时深入探讨了协方差矩阵计算与特征值分解的并行优化方法。此外,还介绍了动态维度调整、非线性特征处理及降维与其他AI技术的协同效应,为生产环境提供了最佳实践指南。最终总结出降维的本质与工程实践原则,展望未来发展方向。
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
541 2
ClickHouse与大数据生态集成:Spark & Flink 实战
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
166 0
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
141 0
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
183 0

热门文章

最新文章

AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等