我使用这里提到的代码在Scala中创建一个HashMap。为方便起见,下面复制粘贴:
def genList(xx: String) = {
Seq("one", "two", "three", "four")
}
val oriwords = Set("hello", "how", "are", "you")
val newMap = (Map[String, (String, Int)]() /: oriwords) (
(cmap, currentWord) => {
val xv = 2
genList(currentWord).foldLeft(cmap) {
(acc, ps) => {
val src = acc get ps
if (src == None) {
acc + (ps -> ((currentWord, xv)))
}
else {
if (src.get._2 < xv) {
acc + (ps -> ((currentWord, xv)))
}
else acc
}
}
}
}
)
println(newMap)
注意:上面的代码适用于较小的oriwords,但是,当oriwords它很大时它不起作用。显然是因为计算发生在Spark驱动程序节点上。
当我运行时,我得到了内存不足异常,如下所示:
WARN HeartbeatReceiver:66 - Removing executor driver with no recent heartbeats: 159099 ms exceeds timeout 120000 ms
Exception in thread "dispatcher-event-loop-1"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "dispatcher-event-loop-1"
Exception in thread "refresh progress" java.lang.OutOfMemoryError: Java heap space
java.lang.OutOfMemoryError: Java heap space
如何强制计算在Spark集群上发生并将生成的HashMap保存在Spark集群本身而不是计算并保存在Spark驱动程序节点上?
事件需要在RDD,数据集,Dataframe等为spark 分布你的计算。基本上所有的事情都发生在驱动程序上,除了那些在HoFs中发生的事件,比如map和foreach。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。