I am unable to execute the code below. It tries to run a Hive query against a Hive table using a SparkSession from inside SparkContext.runJob():
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.sql.{DataFrame, SparkSession}

val lines = sparkSession.sparkContext.parallelize(Seq("hello world"), 1)
sparkSession.sparkContext.runJob(lines, (t: TaskContext, it: Iterator[String]) => {
  // Attempt to build a second SparkSession inside the task and query Hive from it
  val conf = new SparkConf().setAppName("Testing")
  val session = SparkSession.builder().master("local[*]").config(conf).enableHiveSupport().getOrCreate()
  val df: DataFrame = session.sql("select count(*) from employee")
  println("Task attempt id => " + t.taskAttemptId() + " and count: " + df.first)
})
I can get the result when running locally from IntelliJ. However, when I submit the JAR in "yarn cluster" mode, I get the exception below:
18/12/17 12:18:27 ERROR Executor: Exception in task 0.3 in stage 0.0 (TID 3)
java.io.StreamCorruptedException: invalid type code: 00
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1377)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:74)
at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:298)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: java.util.NoSuchElementException: key not found: 0
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:147)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:198)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:84)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:91)
at org.apache.spark.sql.execution.exchange.ShuffleExchange$.prepareShuffleDependency(ShuffleExchange.scala:261)
at org.apache.spark.sql.execution.exchange.ShuffleExchange.prepareShuffleDependency(ShuffleExchange.scala:84)
at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:121)
at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:112)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
... 38 more
Caused by: java.util.NoSuchElementException: key not found: 0
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
at org.apache.spark.storage.BlockInfoManager.lockForReading(BlockInfoManager.scala:196)
at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:450)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
How can I fix this?
This is technically not possible; Spark SQL / Spark Core simply does not support it.
A Hive query in Spark SQL is translated into RDDs, and an RDD is itself a description of a distributed computation.
SparkContext.runJob triggers a distributed computation (which is itself a set of tasks) as a Spark job.
See the difference? The former is a description, while the latter is an execution. They are different things and will not work together.
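For reference, a common way around this is to execute the Hive query on the driver first and only capture its result (a plain, serializable value) inside the task closure, since the SparkSession and SparkContext live on the driver and cannot be used or recreated inside an executor task. Below is a minimal sketch of that approach, assuming the same employee table and a Hive-enabled session on the driver; the object name HiveCountThenRunJob is only for illustration.

import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession

object HiveCountThenRunJob {
  def main(args: Array[String]): Unit = {
    // Build the session once, on the driver. enableHiveSupport() assumes the
    // cluster has a working Hive metastore configuration.
    val spark = SparkSession.builder()
      .appName("Testing")
      .enableHiveSupport()
      .getOrCreate()

    // The Hive query is executed here, on the driver; count(*) returns a bigint,
    // so the single result row can be read back as a Long.
    val employeeCount: Long = spark.sql("select count(*) from employee").first().getLong(0)

    // Only the plain Long is captured by the closure below; no SparkSession or
    // SparkContext is shipped to the executors.
    val lines = spark.sparkContext.parallelize(Seq("hello world"), 1)
    spark.sparkContext.runJob(lines, (t: TaskContext, it: Iterator[String]) => {
      println("Task attempt id => " + t.taskAttemptId() + " and count: " + employeeCount)
    })

    spark.stop()
  }
}

Here the SQL query runs exactly once on the driver; the task closure only captures a Long, which serializes cleanly to the executors in yarn-cluster mode.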