
AttributeError: 'NoneType' object has no attribute '_jvm' - PySpark UDF

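I have data on magazine subscriptions and their creation dates, plus a column holding an array of all the subscription expiration dates associated with the given user: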

user_id created_date expiration_dates_for_user
202394 '2018-05-04' ['2019-1-03', '2018-10-06', '2018-07-05']
202394 '2017-01-04' ['2019-1-03', '2018-10-06', '2018-07-05']
202394 '2016-05-04' ['2019-1-03', '2018-10-06', '2018-07-05']

I'm trying to create a new column containing an array of all the expiration dates that fall within 45 days of created_date, like this:

user_id created_date expiration_dates_for_user near_expiration_dates
202394 '2018-05-04' ['2019-1-03', '2018-10-06', '2020-07-05'] []
202394 '2019-01-04' ['2019-1-03', '2018-10-06', '2020-07-05'] ['2019-1-03']
202394 '2016-05-04' ['2019-1-03', '2018-10-06', '2020-07-05'] []
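
The filtering rule behind the desired output can be checked in plain Python before involving Spark. This is a minimal sketch, not the asker's code: parse_date and near_expiration_dates are hypothetical helper names, and it assumes the dates are 'YYYY-M-D' strings as in the sample rows. It mirrors the conditions in the UDF below: a difference strictly between -45 and 45 days, excluding an exact match.

```python
from datetime import datetime


def parse_date(s):
    # %m and %d also accept non-zero-padded values such as '2019-1-03'
    return datetime.strptime(s, "%Y-%m-%d").date()


def near_expiration_dates(created, expirations, window_days=45):
    """Expirations strictly within window_days of created, excluding the same day."""
    if created is None or not expirations:
        return []
    created_d = parse_date(created)
    result = []
    for exp in expirations:
        # Same sign convention as Spark's datediff(created, exp): created minus exp, in days
        diff = (created_d - parse_date(exp)).days
        if -window_days < diff < window_days and diff != 0:
            result.append(exp)
    return result


expirations = ['2019-1-03', '2018-10-06', '2020-07-05']
for created in ['2018-05-04', '2019-01-04', '2016-05-04']:
    print(created, near_expiration_dates(created, expirations))
```

On the three sample rows this yields [], ['2019-1-03'] and [], matching the near_expiration_dates column of the desired output.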

Here is the code I'm using:

from pyspark.sql.functions import udf, datediff
from pyspark.sql.types import ArrayType, TimestampType

def check_if_sub_connected(created_at, expiration_array):
    if not expiration_array:
        return []

    if created_at is None:
        return []
    else:
        # keep expirations less than 45 days before or after created_at, excluding an exact match
        close_to_array = []
        for i in expiration_array:
            if datediff(created_at, i) < 45:
                if created_at != i:
                    if datediff(created_at, i) > -45:
                        close_to_array.append(i)
        return close_to_array

check_if_sub_connected = udf(check_if_sub_connected, ArrayType(TimestampType()))
But when I apply the function to create a column...

df = df.withColumn('near_expiration_dates', check_if_sub_connected(df.created_date, df.expiration_dates_for_user))
I get this crazy error:

AttributeError: 'NoneType' object has no attribute '_jvm'

at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:317)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:83)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:66)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:271)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage17.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:620)
at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:49)
at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:126)
at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:125)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:112)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:384)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1747)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1735)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1734)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1734)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:962)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:962)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:962)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1970)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1918)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1906)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2141)
at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:237)
at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:247)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:64)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:70)
at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:497)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollectResult(limit.scala:48)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectResult(Dataset.scala:2775)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3350)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2504)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2504)
at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3334)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:89)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:175)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:84)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:126)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3333)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2504)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2718)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:259)
at sun.reflect.GeneratedMethodAccessor472.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 262, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 257, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/databricks/spark/python/pyspark/worker.py", line 183, in
    func = lambda _, it: map(mapper, it)
  File "", line 1, in
  File "/databricks/spark/python/pyspark/worker.py", line 77, in
    return lambda *a: toInternal(f(*a))
  File "/databricks/spark/python/pyspark/util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "", line 9, in check_if_sub_connected
  File "/databricks/spark/python/pyspark/sql/functions.py", line 1045, in datediff
    return Column(sc._jvm.functions.datediff(_to_java_column(end), _to_java_column(start)))
AttributeError: 'NoneType' object has no attribute '_jvm'

at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:317)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:83)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:66)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:271)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage17.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:620)
at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:49)
at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:126)
at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:125)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:112)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:384)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Is datediff not allowed inside a UDF? Or is this some kind of import error? I'm running Spark on Databricks, using the latest version.

Posted by 社区小助手 (Community Assistant) on 2018-12-21 13:50:52
1 answer
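  • 社区小助手 is an administrator of the Spark China community who regularly posts live-stream recaps and in-depth articles, and compiles the Spark questions and answers raised in the DingTalk groups.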

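    import pandas as pd
    from pyspark.sql.functions import udf
    from pyspark.sql.types import ArrayType, TimestampType

    def check_if_sub_connected(created_at, expiration_array):
        # Return early for null inputs: Spark passes SQL NULL to a UDF as None
        if not expiration_array or created_at is None:
            return []

        close_to_array = []
        for i in expiration_array:
            # Plain Python date arithmetic instead of pyspark.sql.functions.datediff;
            # created_at and i must be date/datetime objects, not strings
            if created_at - i < pd.Timedelta(days=45):
                if created_at - i > pd.Timedelta(days=-45):
                    close_to_array.append(i)
        return close_to_array

    check_if_sub_connected = udf(check_if_sub_connected, ArrayType(TimestampType()))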
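    Functions from pyspark.sql.functions don't work inside a UDF. They build Column expressions through the driver's active SparkContext (sc._jvm), but the UDF body runs in a Python worker on the executors, where there is no active SparkContext, so sc is None and you get 'NoneType' object has no attribute '_jvm'. That is why the solution above replaces datediff with plain date arithmetic.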
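
If created_date and the array elements are Spark DateType values (PySpark hands these to a UDF as datetime.date objects), the same check can be written with the standard library's datetime.timedelta instead of pd.Timedelta, dropping the pandas dependency. This is a minimal sketch under that assumption; within_45_days is a hypothetical name, and unlike the answer it also skips null array elements. Like the answer, it keeps an expiration that falls on the created date itself.

```python
from datetime import date, timedelta

# Half-width of the window around created_at
WINDOW = timedelta(days=45)


def within_45_days(created_at, expiration_array):
    """Return the expirations strictly less than 45 days before or after created_at."""
    # Spark passes SQL NULL as None
    if created_at is None or not expiration_array:
        return []
    # date - date yields a timedelta; abs(delta) < WINDOW is equivalent to the
    # answer's pair of checks (delta < +45 days and delta > -45 days)
    return [d for d in expiration_array
            if d is not None and abs(created_at - d) < WINDOW]


sample = [date(2019, 1, 3), date(2018, 10, 6), date(2020, 7, 5)]
print(within_45_days(date(2019, 1, 4), sample))
```

Because this returns date objects rather than timestamps, registering it would presumably use udf(within_45_days, ArrayType(DateType())) instead of ArrayType(TimestampType()).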

    2019-07-17 23:23:25