ExternalSorter的insertAll方法中定义的aggregator执行过程是什么?
1) 由于设置了聚合函数aggregator,则从聚合函数获取mergeValue(word count例子中为Function2)、createCombiner(word count例子中为PairFunction)等函数。
2) 定义update偏函数,此函数用于操作mergeValue和createCombiner。
3) 迭代之前创建的iterator,每读取一条Product2[K, V],将每行字符串按照空格划分,并且给每个文本设置1,比如(#,1)、(Apache,1)、(Spark,1)...。
4) 以(分区索引,Produce2[K,V]._1)为参数调用SizeTrackingAppendOnlyMap的changeValue函数,与update函数配合,按照key值叠加value。
5) 调用mapbeSpillCollection方法,来处理SizeTrackingAppendOnlyMap溢出(当SizeTrackingAppendOnlyMap的大小超过myMemoryThreshold时,将集合中的数据写入磁盘并新建SizeTrackingAppendOnlyMap)。这样做是为了防止内存溢出,解决了Spark早期版本shuffle的内存撑爆问题。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。