How to execute a Hive query inside sparkContext.parallelize(...).map()? - Q&A - Alibaba Cloud Developer Community


How can a Hive query be executed inside sparkContext.parallelize(...).map()?

2018-12-19 17:08:05

I cannot get the code below to work. It tries to execute a Hive query against a Hive table, using a SparkSession created inside SparkContext's runJob() method.

import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.sql.{DataFrame, SparkSession}

val lines = sparkSession.sparkContext.parallelize(Seq("hello world"), 1)
sparkSession.sparkContext.runJob(lines, (t: TaskContext, it: Iterator[String]) => {
  // Attempt to build a SparkSession inside the task and query Hive from there.
  val conf = new SparkConf().setAppName("Testing")
  val session = SparkSession.builder().master("local[*]").config(conf).enableHiveSupport().getOrCreate()

  val df: DataFrame = session.sql(s"select count(*) from employee")
  println("Task attempt id => " + t.taskAttemptId() + " and count: " + df.first)
})

I get the expected result when running locally in IntelliJ. However, when I run the JAR in "yarn cluster" mode, I get the exception below:

18/12/17 12:18:27 ERROR Executor: Exception in task 0.3 in stage 0.0 (TID 3)
java.io.StreamCorruptedException: invalid type code: 00
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1377)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:74)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
    at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:298)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: java.util.NoSuchElementException: key not found: 0
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:147)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:198)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:285)
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:84)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:91)
    at org.apache.spark.sql.execution.exchange.ShuffleExchange$.prepareShuffleDependency(ShuffleExchange.scala:261)
    at org.apache.spark.sql.execution.exchange.ShuffleExchange.prepareShuffleDependency(ShuffleExchange.scala:84)
    at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:121)
    at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:112)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    ... 38 more
Caused by: java.util.NoSuchElementException: key not found: 0
    at scala.collection.MapLike$class.default(MapLike.scala:228)
    at scala.collection.AbstractMap.default(Map.scala:59)
    at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
    at org.apache.spark.storage.BlockInfoManager.lockForReading(BlockInfoManager.scala:196)
    at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:450)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)

How can this be resolved?

All answers (1)
  • 社区小助手
    2019-07-17 23:23:04

    This is technically impossible; Spark SQL / Spark Core does not support it.

    A Hive query in Spark SQL is translated into an RDD, and an RDD is itself a description of a distributed computation.
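    This laziness is visible directly in the API. A minimal sketch (using the `employee` table from the question; `spark` stands for an already-created driver-side SparkSession):

```scala
// spark.sql only builds a logical plan -- a description of the computation.
// Nothing has touched Hive yet at this point.
val df = spark.sql("select count(*) from employee")

// Only an action such as first() or collect() submits a Spark job and
// actually executes the query on the cluster.
val count = df.first().getLong(0)
```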

    SparkContext.runJob, on the other hand, triggers a distributed computation as a Spark job (itself a set of tasks).

    See the difference? The former is a description, while the latter is an execution. They are different things and do not work together: you cannot run the description (a query) from inside the execution (a task).
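    A common workaround (not part of the original answer, just a sketch) is to execute the Hive query on the driver first and ship only the plain result into the task closure; here `sparkSession` is the driver-side session from the question:

```scala
import org.apache.spark.TaskContext

// Run the Hive query on the driver, where SparkSession is valid.
val employeeCount = sparkSession.sql("select count(*) from employee").first().getLong(0)

val lines = sparkSession.sparkContext.parallelize(Seq("hello world"), 1)
sparkSession.sparkContext.runJob(lines, (t: TaskContext, it: Iterator[String]) => {
  // The closure now captures only a Long, which serializes cleanly to executors.
  println("Task attempt id => " + t.taskAttemptId() + " and count: " + employeeCount)
})
```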
