开发者社区> 问答> 正文

ExternalSorter的insertAll方法中定义的aggregator执行过程是什么?

ExternalSorter的insertAll方法中定义的aggregator执行过程是什么?

展开
收起
游客fbdr25iajcjto 2021-12-06 21:30:57 640 0
1 条回答
写回答
取消 提交回答
  • 1) 由于设置了聚合函数aggregator,则从聚合函数获取mergeValue(word count例子中为Function2)、createCombiner(word count例子中为PairFunction)等函数。

    2) 定义update偏函数,此函数用于操作mergeValue和createCombiner。

    3) 迭代之前创建的iterator,每读取一条Product2[K, V],将每行字符串按照空格划分,并且给每个文本设置1,比如(#,1)、(Apache,1)、(Spark,1)...。

    4) 以(分区索引,Produce2[K,V]._1)为参数调用SizeTrackingAppendOnlyMap的changeValue函数,与update函数配合,按照key值叠加value。

    5) 调用mapbeSpillCollection方法,来处理SizeTrackingAppendOnlyMap溢出(当SizeTrackingAppendOnlyMap的大小超过myMemoryThreshold时,将集合中的数据写入磁盘并新建SizeTrackingAppendOnlyMap)。这样做是为了防止内存溢出,解决了Spark早期版本shuffle的内存撑爆问题。

    2021-12-06 21:31:27
    赞同 展开评论 打赏
问答地址:
问答排行榜
最热
最新

相关电子书

更多
建立联系方法之一 立即下载
fibjs 模块重构从回调到协程--陈垒 立即下载
继承与功能组合 立即下载