在给出异常的RDD上执行combineByKey转换。Spark转换-问答-阿里云开发者社区-阿里云

在RDD上执行combineByKey转换时抛出异常（Spark转换）

/**
 * Builds the initial combiner for the first record seen for a key.
 *
 * Reads t(5) as a Double and t(4) as an Int (in that order) and returns
 * (t5 / t4, t5 / t4, t4, t5); the quotient seeds both the running max and
 * the running min.
 *
 * Throws ArrayIndexOutOfBoundsException when t has fewer than 6 elements and
 * NumberFormatException when either field is not numeric.
 */
def createComb = (t: Array[String]) => {
  val lineTotal = t(5).toDouble
  val quantity = t(4).toInt
  val unitPrice = lineTotal / quantity
  (unitPrice, unitPrice, quantity, lineTotal)
}

/**
 * Folds one more record into a running per-key combiner.
 *
 * The combiner is a 4-tuple (max, min, quantity sum, total sum). From the
 * record, t(5) is parsed as a Double (`total`) and t(4) as an Int (`quan`),
 * the same two fields createComb reads.
 *
 * Fix: the value compared against the running max/min is now total / quan,
 * i.e. the incoming record's own ratio, matching what createComb stores.
 * The previous code divided by q, the quantity already accumulated in the
 * combiner, so the max/min were compared against an unrelated number.
 *
 * Throws ArrayIndexOutOfBoundsException when t has fewer than 6 elements and
 * NumberFormatException when either field is not numeric.
 */
def mergeValues: ((Double, Double, Int, Double), Array[String]) => (Double, Double, Int, Double) = {
  case ((mx, mn, q, tot), t) =>
    val total = t(5).toDouble
    val quan = t(4).toInt
    // Unit value of the incoming record only; q is the accumulator's quantity.
    val unitPrice = total / quan
    (scala.math.max(mx, unitPrice), scala.math.min(mn, unitPrice), quan + q, total + tot)
}

/**
 * Merges two combiners of the form (max, min, quantity sum, total sum).
 *
 * Takes the larger of the two maxima, the smaller of the two minima, and adds
 * the quantity and total components.
 */
def mergeComb: ((Double, Double, Int, Double), (Double, Double, Int, Double)) => (Double, Double, Int, Double) =
  (left, right) => {
    val (leftMax, leftMin, leftQty, leftTotal) = left
    val (rightMax, rightMin, rightQty, rightTotal) = right
    (
      scala.math.max(leftMax, rightMax),
      scala.math.min(leftMin, rightMin),
      leftQty + rightQty,
      leftTotal + rightTotal
    )
  }

// Aggregates productsTotalByKey per key with createComb / mergeValues / mergeComb,
// hash-partitioning the result into as many partitions as the input RDD has.
val statsOfCust = {
  val samePartitionCount = new org.apache.spark.HashPartitioner(productsTotalByKey.partitions.length)
  productsTotalByKey.combineByKey(createComb, mergeValues, mergeComb, samePartitionCount)
}

scala> statsOfCust.first
[Stage 22:> (0 + 1) / 2]18/11/17 21:26:31 WARN TaskSetManager: Lost task 0.0 in stage 22.0 (TID 26, wn01.itversity.com, executor 9): java.lang.ArrayIndexOutOfBoundsException: 5

at \$line80.\$read\$\$iw\$\$iw\$\$iw\$\$iw\$\$iw\$\$iw\$\$iw\$\$iw\$\$iw\$\$iw\$\$iw\$\$iw\$\$iw\$\$iw\$\$iw\$\$iw\$\$anonfun\$createComb\$1.apply(:23)

at org.apache.spark.util.collection.ExternalSorter\$\$anonfun\$5.apply(ExternalSorter.scala:189)
at org.apache.spark.util.collection.ExternalSorter\$\$anonfun\$5.apply(ExternalSorter.scala:188)
at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:144)
at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org\$apache\$spark\$scheduler\$DAGScheduler\$\$failJobAndIndependentStages(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGScheduler\$\$anonfun\$abortStage\$1.apply(DAGScheduler.scala:1587)
at org.apache.spark.scheduler.DAGScheduler

\$\$ anonfun\$abortStage\$1.apply(DAGScheduler.scala:1586) at scala.collection.mutable.ResizableArray\$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586) at org.apache.spark.scheduler.DAGScheduler \$\$

at org.apache.spark.scheduler.DAGScheduler

\$\$ anonfun\$handleTaskSetFailed\$1.apply(DAGScheduler.scala:831) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831) at at org.apache.spark.rdd.RDDOperationScope\$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope\$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:363) at org.apache.spark.rdd.RDD.take(RDD.scala:1331) at org.apache.spark.rdd.RDD \$\$

anonfun\$first\$1.apply(RDD.scala:1372)
at org.apache.spark.rdd.RDDOperationScope\$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope\$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.first(RDD.scala:1371)
... 49 elided
Caused by: java.lang.ArrayIndexOutOfBoundsException: 5
at \$anonfun\$createComb\$1.apply(:24)
at \$anonfun\$createComb\$1.apply(:23)
at org.apache.spark.util.collection.ExternalSorter\$\$anonfun\$5.apply(ExternalSorter.scala:189)
at org.apache.spark.util.collection.ExternalSorter\$\$anonfun\$5.apply(ExternalSorter.scala:188)
at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:144)
at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
at org.apache.spark.util.EventLoop\$\$anon\$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
at org.apache.spark.rdd.RDD\$\$anonfun\$take\$1.apply(RDD.scala:1358)

1 条回答

• 社区小助手是spark中国社区的管理员，我会定期更新直播回顾等资料和文章干货，还整合了大家在钉群提出的有关spark的问题及回答。

看起来问题出在createComb方法上：它假设t数组至少有6个元素。如果某条记录拆分后的字段不足6个，访问t(5)时就会抛出java.lang.ArrayIndexOutOfBoundsException: 5。

这只是一个初步的猜测。如果有帮助，请告诉我；如果没有，我会尝试进一步调查 :)

2019-07-17 23:20:11
赞同 展开评论 打赏

76
1
0
77
1
0
68
1
0
98
1
0
99
0
0
88
1
0
81
1
0
145
1
0
167
1
0
178
1
0

R AND SPARK

Spark Autotuning