开发者社区> 问答> 正文

spark UDF结果可以做'show',但不能做'filter''

UDF当我这样做时show(),spark会起作用,但是当我filter对UDF结果做出反应时它会给我错误 。

udf功能

def chkInterPunctuation(sent) :

for char in sent[1:-2] : 
    if char in ["\"", "'", ".", "!", "?"] :
        return True
return False

cip = udf(chkInterPunctuation, BooleanType())
show() 工作

df_punct = dfs.withColumn("in_length", length("input")).\
withColumn("out_length", length("output")).withColumn("cip", cip(col("input")))
df_punct.show()

但是当我这样做时,它给了我错误 filter

df_punct.where(col("cip") == True).show()
这些都是filter错误的


Py4JJavaError Traceback (most recent call last)
in ()
----> 1 df_punct.where(col("cip") == True).collect()

/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc in collect(self)

308         """
309         with SCCallSiteSync(self._sc) as css:

--> 310 port = self._jdf.collectToPython()

311         return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
312 

/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)

931         answer = self.gateway_client.send_command(command)
932         return_value = get_return_value(

--> 933 answer, self.gateway_client, self.target_id, self.name)

934 
935         for temp_arg in temp_args:

/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(a, *kw)

 61     def deco(*a, **kw):
 62         try:

---> 63 return f(a, *kw)

 64         except py4j.protocol.Py4JJavaError as e:
 65             s = e.java_exception.toString()

/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)

310                 raise Py4JJavaError(
311                     "An error occurred while calling {0}{1}{2}.\n".

--> 312 format(target_id, ".", name), value)

313             else:
314                 raise Py4JError(

Py4JJavaError: An error occurred while calling o3378.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 40 in stage 238.0 failed 1 times, most recent failure: Lost task 40.0 in stage 238.0 (TID 8862, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main

process()

File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process

serializer.dump_stream(func(split_index, iterator), outfile)

File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 106, in

func = lambda _, it: map(mapper, it)

File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 92, in

mapper = lambda a: udf(*a)

File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 70, in

return lambda *a: f(*a)

File "", line 5, in chkInterPunctuation
TypeError: 'NoneType' object has no attribute '__getitem__'

at org.apache.spark.api.python.PythonRunner

$$ anon$1.read(PythonRDD.scala:193) at org.apache.spark.api.python.PythonRunner $$

anon$1.(PythonRDD.scala:234)

at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec

$$ anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:124) at org.apache.spark.sql.execution.python.BatchEvalPythonExec $$

anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:68)

at org.apache.spark.rdd.RDD

$$ anonfun$mapPartitions$1 $$

anonfun$apply$23.apply(RDD.scala:766)

at org.apache.spark.rdd.RDD

$$ anonfun$mapPartitions$1 $$

anonfun$apply$23.apply(RDD.scala:766)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:103)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:

at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler

$$ failJobAndIndependentStages(DAGScheduler.scala:1450) at org.apache.spark.scheduler.DAGScheduler $$

anonfun$abortStage$1.apply(DAGScheduler.scala:1438)

at org.apache.spark.scheduler.DAGScheduler

$$ anonfun$abortStage$1.apply(DAGScheduler.scala:1437) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437) at org.apache.spark.scheduler.DAGScheduler $$

anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)

at org.apache.spark.scheduler.DAGScheduler

$$ anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607) at org.apache.spark.util.EventLoop $$

anon$1.run(EventLoop.scala:48)

at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911)
at org.apache.spark.rdd.RDD

$$ anonfun$collect$1.apply(RDD.scala:893) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) at org.apache.spark.rdd.RDD.collect(RDD.scala:892) at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453) at org.apache.spark.sql.Dataset $$

anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2513)

at org.apache.spark.sql.Dataset

$$ anonfun$collectToPython$1.apply(Dataset.scala:2513) at org.apache.spark.sql.Dataset $$

anonfun$collectToPython$1.apply(Dataset.scala:2513)

at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2512)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:211)
at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main

process()

File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process

serializer.dump_stream(func(split_index, iterator), outfile)

File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 106, in

func = lambda _, it: map(mapper, it)

File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 92, in

mapper = lambda a: udf(*a)

File "/home1/irteam/nmt_common/nexus/spark-2.0.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 70, in

return lambda *a: f(*a)

File "", line 5, in chkInterPunctuation
TypeError: 'NoneType' object has no attribute '__getitem__'

at org.apache.spark.api.python.PythonRunner

$$ anon$1.read(PythonRDD.scala:193) at org.apache.spark.api.python.PythonRunner $$

anon$1.(PythonRDD.scala:234)

at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec

$$ anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:124) at org.apache.spark.sql.execution.python.BatchEvalPythonExec $$

anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:68)

at org.apache.spark.rdd.RDD

$$ anonfun$mapPartitions$1 $$

anonfun$apply$23.apply(RDD.scala:766)

at org.apache.spark.rdd.RDD

$$ anonfun$mapPartitions$1 $$

anonfun$apply$23.apply(RDD.scala:766)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:103)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more

我的谷歌搜索表明,py4j当UDF函数没有返回正确的值或有错误时,通常会发生错误。但我UDF function总是回归真假。另外,当我显示时,spark查询返回正确的值。这对我来说没有意义。可能的原因是什么?

展开
收起
flink小助手 2018-12-11 17:02:31 2740 0
1 条回答
写回答
取消 提交回答
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。
    这是因为你没有纠正NULL存在。尝试:
    

    def chkInterPunctuation(sent) :

    if not sent: return   # In None return
    for char in sent[1:-2] : 
        if char in ["\"", "'", ".", "!", "?"] :
            return True
    return False
    
    2019-07-17 23:19:55
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Hybrid Cloud and Apache Spark 立即下载
Scalable Deep Learning on Spark 立即下载
Comparison of Spark SQL with Hive 立即下载