Background
Recently, while helping a colleague troubleshoot a Hive UDF, I found a NullPointerException triggered by a static member variable defined in the UDF. The error looks like this:
java.lang.NullPointerException
    at java.lang.String.contains(String.java:2133)
    at org.apache.spark.sql.hive.HiveGenericUDF.eval(hiveUDFs.scala:181)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.subExpr_6$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:226)
    at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
The offending code is very simple:
public class MyGenericUDF extends GenericUDF {  // renamed here; the original snippet had the class extending itself
    ...
    private transient static final List<String> list = new ArrayList<>();

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        super.init(stdFactory);
        list.add("1");
        list.add("2");
        ...
    }

    ...
    // inside evaluate():
    for (String i : list) {
        if (s.contains(i)) {
            result = i;
        }
    }
}
Analysis
Normally, making a shared member variable static is a perfectly reasonable choice: the object is allocated once per JVM instead of once per instance, which reduces GC frequency. So why did it go wrong this time?
To answer that we have to look at HiveGenericUDF, the class Spark uses to wrap Hive UDFs:
@transient
lazy val function = funcWrapper.createFunction[GenericUDF]()

@transient
private lazy val returnInspector = {
  function.initializeAndFoldConstants(argumentInspectors.toArray)
}

@transient
private lazy val unwrapper = unwrapperFor(returnInspector)

override def eval(input: InternalRow): Any = {
  returnInspector // Make sure initialized.
  var i = 0
  val length = children.length
  while (i < length) {
    val idx = i
    deferredObjects(i).asInstanceOf[DeferredObjectAdapter]
      .set(() => children(idx).eval(input))
    i += 1
  }
  unwrapper(function.evaluate(deferredObjects))
}
Two methods are worth singling out. funcWrapper.createFunction is straightforward: it simply news up an instance of the UDF via reflection, so there is nothing more to say about it. The second method, initializeAndFoldConstants, is more interesting:
public ObjectInspector initializeAndFoldConstants(ObjectInspector[] arguments) throws UDFArgumentException {
    ObjectInspector oi = initialize(arguments);
    ...
This calls the initialize of the user's own GenericUDF implementation. And since returnInspector is a transient lazy val, initializeAndFoldConstants is first invoked only on the first call to eval, i.e. not until the actual computation starts on the executor.
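As a rough illustration, here is a hypothetical Java analogue of that lazy pattern (simplified: it ignores the per-instance synchronization Scala's lazy val actually provides, and all names are made up):

    // initialize() does not run when the wrapper is constructed;
    // it runs on the first eval(), i.e. on the executor at compute time.
    class LazyUdfWrapper {
        private Object returnInspector; // computed on first use

        Object eval(Object input) {
            if (returnInspector == null) {                      // "Make sure initialized."
                returnInspector = initializeAndFoldConstants(); // user initialize() runs here
            }
            return input; // real evaluation elided
        }

        private Object initializeAndFoldConstants() {
            return new Object(); // stands in for calling the user's initialize()
        }
    }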
We know that tasks on a Spark executor run as threads inside a single JVM, so when an executor runs several tasks concurrently we are in multithreaded territory. Back to our example: each task gets its own UDF instance, but the static list belongs to the class and is therefore shared across the whole JVM. Our initialize calls add on that shared list, so while one task is iterating over the list, another task may be mutating it, leaving it in an inconsistent state. In particular, an unsynchronized ArrayList.add can briefly expose a null slot to a concurrent reader, and s.contains(null) throws exactly the NullPointerException inside String.contains that we saw in the stack trace.
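To make the race concrete, here is a minimal, self-contained sketch. The class and method names are hypothetical; it only mimics the shape of the UDF, not Spark's API. Two threads each create their own instance, just as Spark creates one UDF instance per task, yet they mutate and read the same static list. On most runs it dies with a ConcurrentModificationException or a NullPointerException; even on a clean run, the "constant" list ends up as [1, 2, 1, 2]:

    import java.util.ArrayList;
    import java.util.List;

    public class StaticListRace {
        // Static: one list per JVM, shared by every instance below.
        private static final List<String> list = new ArrayList<>();

        // Mimics the UDF's initialize(): mutates the shared static list.
        void init() {
            list.add("1");
            list.add("2");
        }

        // Mimics the UDF's evaluate(): iterates the shared list.
        String evaluate(String s) {
            String result = null;
            for (String i : list) {
                // A concurrent add() can expose a null slot, so s.contains(i)
                // throws NullPointerException, as in the stack trace above;
                // the iterator may also throw ConcurrentModificationException.
                if (s.contains(i)) {
                    result = i;
                }
            }
            return result;
        }

        public static void main(String[] args) throws InterruptedException {
            Runnable task = () -> {
                StaticListRace udf = new StaticListRace(); // fresh instance per "task"...
                udf.init();                                // ...but shared static state
                for (int n = 0; n < 100_000; n++) {
                    udf.evaluate("row-1");
                }
            };
            Thread t1 = new Thread(task);
            Thread t2 = new Thread(task);
            t1.start();
            t2.start();
            t1.join();
            t2.join();
            System.out.println(list); // [1, 2, 1, 2]: init() ran once per task
        }
    }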
Solution
There are many ways to fix this:
Make the static member variable non-static, for example:
private transient final List<String> list = new ArrayList<>();
This way list has instance-level scope: each task's UDF object owns its own copy, which avoids the multithreading problem.
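Another option in the same spirit (my own suggestion, not part of the original fix): if the list really is a constant, you can keep it static but build it exactly once at class-load time and make it immutable. The JVM guarantees that class initialization is thread-safe, and initialize() then never mutates shared state:

    // Requires java.util.Arrays, java.util.Collections and java.util.List.
    // Built once when the class is loaded, never mutated afterwards.
    private static final List<String> LIST =
            Collections.unmodifiableList(Arrays.asList("1", "2"));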