[Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class test.Main, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic test/Main.lambda$main$49bd2722$1:(Lcom/opencsv/CSVParser;Ljava/lang/String;)Ljava/lang/String;, instantiatedMethodType=(Ljava/lang/String;)Ljava/lang/String;, numCaptured=1])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class test.Main
$$ Lambda$19/429639728, test.Main $$
Lambda$19/429639728@72456279)
- field (class: org.apache.spark.api.java.JavaPairRDD
$$ anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function) - object (class org.apache.spark.api.java.JavaPairRDD $$
anonfun$toScalaFunction$1, )
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
... 12 more
似乎Spark尝试序列化lambda表达式,不知何故lamba表达式保持引用parser导致上述错误。
问题是:有没有办法避免该异常并在传递给Spark的lambda表达式中使用不可序列化的库?我真的不想实现自己的csv解析器。
Spark支持开箱即用的CSV文件
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Dataset;
Dataset df = spark.read().format("csv")
.option("sep", ";")
.option("header", "true") //or "false" if no headers
.load("filename.csv");
编辑(提升评论到主要答案)
如果你真的需要它,你可以从DataFrame获取RDD,df.javaRDD() 尽管最好使用DataSet / DataFrame API
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。