我一直在谷歌或Stackoverflow上搜索一个星期,仍然无法找到一个好的答案。
我有一个化合物数据集,我需要使用第三方Jar来读取SDF中的这些化合物(类似JSON的数据格式)。然后我必须计算不同化合物之间的相似性。读取和计算需要非常复杂的化学细节,所以我不能自己重现这个功能。也就是说,我必须使用第三方Jar在Spark上的映射函数中运行该函数。Jar文件名为JCompoundMapper。它使用DFS算法迭代地读取原子键,看起来非常复杂。无论如何,这个线程不是关于阅读化合物。但是关于如何在Spark上映射第三方jar。当我尝试这样做时,我遇到的任务不是可序列化的问题:
mport de.zbit.jcmapper.distance.DistanceTanimoto
import de.zbit.jcmapper.distance.IDistanceMeasure
import de.zbit.jcmapper.fingerprinters.EncodingFingerprint
import de.zbit.jcmapper.fingerprinters.features.FeatureMap
import de.zbit.jcmapper.fingerprinters.features.IFeature
import de.zbit.jcmapper.fingerprinters.topological.Encoding2DAllShortestPath
import de.zbit.jcmapper.fingerprinters.topological.Encoding2DCATS
import de.zbit.jcmapper.fingerprinters.topological.Encoding2DECFP
import de.zbit.jcmapper.io.reader.RandomAccessMDLReader
import de.zbit.jcmapper.io.writer.ExporterFullFingerprintCSV
import de.zbit.jcmapper.io.writer.ExporterFullFingerprintTABUnfolded
import de.zbit.jcmapper.io.writer.ExporterLinear
import de.zbit.jcmapper.io.writer.ExporterSDFProperty
import java.io.FileWriter
import java.util.List
import java.io.File
val similarity: IDistanceMeasure = new DistanceTanimoto()
val fingerprinter: Encoding2DAllShortestPath = new Encoding2DAllShortestPath()
val rawFeatures2: List[IFeature] = fingerprinter.getFingerprint(reader.getMol(0))
val rawFeatures: List[IFeature] = fingerprinter.getFingerprint(reader.getMol(1))
def getSimilarity( id1:Int, id2:Int ) : Double = {
val featureMaps: List[FeatureMap] = new ArrayList[FeatureMap]()
featureMaps.add(new FeatureMap(rawFeatures))
featureMaps.add(new FeatureMap(rawFeatures2))
val temp: Double = similarity.getSimilarity(featureMaps.get(0), featureMaps.get(1))
return temp
val func = combinations.map(x => {
getSimilarity(0, 1)
}).take(5)
Name: org.apache.spark.SparkException
Message: Task not serializable
StackTrace: at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2292)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:371)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.map(RDD.scala:370)
... 48 elided
Caused by: java.io.NotSerializableException: de.zbit.jcmapper.io.reader.RandomAccessMDLReader`
我读了其他线程并理解我必须将变量和函数放在一个对象中以使其可序列化。但是,如果我这样做,我遇到了空指针异常错误:
object Holder{
val reader:RandomAccessMDLReader = new RandomAccessMDLReader(new File("datasets/internal.sdf"))
val similarity: IDistanceMeasure = new DistanceTanimoto()
val fingerprinter: Encoding2DAllShortestPath = new Encoding2DAllShortestPath()
val rawFeatures2: List[IFeature] = fingerprinter.getFingerprint(reader.getMol(0))
val rawFeatures: List[IFeature] = fingerprinter.getFingerprint(reader.getMol(1))
def getSimilarity( id1:Int, id2:Int ) : Double = {
val featureMaps: List[FeatureMap] = new ArrayList[FeatureMap]()
featureMaps.add(new FeatureMap(rawFeatures))
featureMaps.add(new FeatureMap(rawFeatures2))
val temp: Double = similarity.getSimilarity(featureMaps.get(0), featureMaps.get(1))
return temp
}
val func = combinations.map(x => {
Holder.getSimilarity(0, 1)
}).take(5)
Name: org.apache.spark.SparkException
Message: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, ip-10-245-2-223.ec2.internal, executor 1): java.lang.NullPointerException
at de.zbit.jcmapper.io.reader.RandomAccessMDLReader.setRanges(Unknown Source)
at de.zbit.jcmapper.io.reader.RandomAccessMDLReader.<init>(Unknown Source)
at $line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Holder$.<init>(<console>:78)
at $line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.Holder$lzycompute(<console>:77)
at $line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.Holder(<console>:77)
at $line57.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:85)
at $line57.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:84)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)`
对于阅读部分,我可以使用巨大的LinkedHashMap并存储所有化合物。但是,我必须使用getSimilarity()函数来使用第三方jar计算相似度。因此,即使我只使用getSimilarity()函数,如果我把它放在一个对象中,我也有空指针异常。如果我把它放在对象之外,那我的任务就不是可序列化的问题。因此,我有几个问题,我无法找到一个好的答案:
Spark是否支持将第三方Jar映射到每个执行程序?在读者文件中,Spark是否将读取器类分发到每个执行器中并单独读取文件或作为整体读取文件,然后将文件分发到每个执行器上的小块中?
为什么它显示空指针异常问题?似乎该对象确实解决了序列化问题,但没有解决空指针异常。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。