spark hiveUDF 不要定义static成员变量

简介: spark hiveUDF 不要定义static成员变量

背景

最近在帮同事排查hive UDF的时候,发现了在udf中定义了静态成员变量引发的NullPointerException,具体报错如下:

java.lang.NullPointerException
  at java.lang.String.contains(String.java:2133)
  at org.apache.spark.sql.hive.HiveGenericUDF.eval(hiveUDFs.scala:181)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.subExpr_6$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
  at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:226)
  at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)

代码很简单:

public class GenericUDF extends GenericUDF {
...
private transient static final List<String> list = new ArrayList();
    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        super.init(stdFactory);
        list.add("1");
        list.add("2");
        ...
    }
    ...
    for (String i : list) {
            if (s.contains(i)) {
                result = i;
            }
        }
    }

分析

其实从正常的角度来说,把共有的成员变量定义为静态的是合理的,这样能够减少GC的频率,但是这次为什么不行了呢?

这个还得从spark 对hiveUDF的封装类HiveGenericUDF说起,如下:

    @transient
    lazy val function = funcWrapper.createFunction[GenericUDF]()
    @transient
    private lazy val returnInspector = {
      function.initializeAndFoldConstants(argumentInspectors.toArray)
    }
    @transient
    private lazy val unwrapper = unwrapperFor(returnInspector)
    override def eval(input: InternalRow): Any = {
    returnInspector // Make sure initialized.
    var i = 0
    val length = children.length
    while (i < length) {
      val idx = i
      deferredObjects(i).asInstanceOf[DeferredObjectAdapter]
        .set(() => children(idx).eval(input))
      i += 1
    }
    unwrapper(function.evaluate(deferredObjects))
  }

挑选了两个重点的方法,其中funcWrapper.createFunction的实现很简单,就是通过反射new出一个对象,这没什么说的,

但是对于initializeAndFoldConstants第二个方法,

public ObjectInspector initializeAndFoldConstants(ObjectInspector[] arguments)
      throws UDFArgumentException {
    ObjectInspector oi = initialize(arguments);
    ...

这里会调用用户自己实现的GenericUDF的 initialize,而initializeAndFoldConstants的初次调用会只有到了真正计算的时候才会触发,

我们知道在spark executor 的task是以线程的模式运行的,所以在executor有多个task并发的情况下,会存在多线程的问题,回到我们的例子上来,

我们在init方法中调用了add方法,而这个变量是整个JVM共享的,也就是说有可能一个task在运行的时候,另一个线程改变了list的内容,这样就会导致不一致的情况。


解决


解决的方法很多:


把静态的成员变量改成非静态的,如:

private transient final List<String> list = new ArrayList();

这样list变量就是对象级别的可见性了,就能避免多线程问题了

相关文章
|
7月前
|
SQL 分布式计算 Java
Note_Spark_Day08:Spark SQL(Dataset是什么、外部数据源、UDF定义和分布式SQL引擎)
Note_Spark_Day08:Spark SQL(Dataset是什么、外部数据源、UDF定义和分布式SQL引擎)
79 0
|
分布式计算 Spark
spark hiveUDF transient的重要性
spark hiveUDF transient的重要性
2011 0
|
26天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
78 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
63 0
|
2月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
42 0
|
2月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
93 0
|
27天前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
50 6
|
25天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
86 2
|
26天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
64 1
|
27天前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
54 1

热门文章

最新文章

下一篇
无影云桌面