开发者社区> 问答> 正文

为什么这个Scala代码显然没有在Spark工作器上运行,而只在Spark驱动程序节点上运行?

我使用这里提到的代码在Scala中创建一个HashMap。为方便起见,下面复制粘贴:

def genList(xx: String) = {

Seq("one", "two", "three", "four")

}

val oriwords = Set("hello", "how", "are", "you")

val newMap = (Map[String, (String, Int)]() /: oriwords) (

(cmap, currentWord) => {
  val xv = 2

  genList(currentWord).foldLeft(cmap) {
    (acc, ps) => {
      val src = acc get ps

      if (src == None) {
        acc + (ps -> ((currentWord, xv)))
      }
      else {
        if (src.get._2 < xv) {
          acc + (ps -> ((currentWord, xv)))
        }
        else acc
      }

    }
  }
}

)

println(newMap)
注意:上面的代码适用于较小的oriwords,但是,当oriwords它很大时它不起作用。显然是因为计算发生在Spark驱动程序节点上。

当我运行时,我得到了内存不足异常,如下所示:

WARN HeartbeatReceiver:66 - Removing executor driver with no recent heartbeats: 159099 ms exceeds timeout 120000 ms
Exception in thread "dispatcher-event-loop-1"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "dispatcher-event-loop-1"
Exception in thread "refresh progress" java.lang.OutOfMemoryError: Java heap space
java.lang.OutOfMemoryError: Java heap space
如何强制计算在Spark集群上发生并将生成的HashMap保存在Spark集群本身而不是计算并保存在Spark驱动程序节点上?

展开
收起
社区小助手 2018-12-12 13:29:39 2634 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    事件需要在RDD,数据集,Dataframe等为spark 分布你的计算。基本上所有的事情都发生在驱动程序上,除了那些在HoFs中发生的事件,比如map和foreach。

    2019-07-17 23:20:07
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Spark Streaming At Bing Scale 立即下载
Apache Spark: Cloud and On-Prem 立即下载
JDK8新特性与生产-for“华东地区scala爱好者聚会” 立即下载