返回第二章
第三个flatMap:从元素到集合、再从集合到元素
flatMap 其实和 map 与 mapPartitions 算子类似,在功能上,与 map 和 mapPartitions 一样,flatMap 也是用来做数据映射的,在实现上,对于给定映射函数 f,flatMap(f) 以元素为粒度,对 RDD 进行数据转换。不过,与前两者相比,flatMap 的映射函数 f 有着显著的不同。对于 map 和 mapPartitions 来说,其映射函数 f 的类型,都是(元素) => (元素),即元素到元素。而 flatMap 映射函数 f 的类型,是(元素) => (集合),即元素到集合(如数组、列表等)。因此,flatMap 的映射过程在逻辑上分为两步:
- 以元素为单位,创建集合;
- 去掉集合“外包装”,提取集合元素。
这么说比较抽象,我们还是来举例说明。假设,我们再次改变 Word Count 的计算逻辑,由原来统计单词的计数,改为统计相邻单词共现的次数,如下图所示:
对于这样的计算逻辑,我们该如何使用 flatMap 进行实现呢?这里我们先给出代码实现,然后再分阶段地分析 flatMap 的映射过程:
// 读取文件内容 val lineRDD: RDD[String] = _ // 请参考第一讲获取完整代码 // 以行为单位提取相邻单词 val wordPairRDD: RDD[String] = lineRDD.flatMap( line => { // 将行转换为单词数组 val words: Array[String] = line.split(" ") // 将单个单词数组,转换为相邻单词数组 for (i <- 0 until words.length - 1) yield words(i) + "-" + words(i+1) })
在上面的代码中,我们采用匿名函数的形式,来提供映射函数 f。这里 f 的形参是 String 类型的 line,也就是源文件中的一行文本,而 f 的返回类型是 Array[String],也就是 String 类型的数组。在映射函数 f 的函数体中,我们先用 split 语句把 line 转化为单词数组,然后再用 for 循环结合 yield 语句,依次把单个的单词,转化为相邻单词词对。注意,for 循环返回的依然是数组,也即类型为 Array[String]的词对数组。由此可见,函数 f 的类型是(String) => (Array[String]),也就是刚刚说的第一步,从元素到集合。
但如果我们去观察转换前后的两个 RDD,也就是 lineRDD 和 wordPairRDD,会发现它们的类型都是 RDD[String],换句话说,它们的元素类型都是 String。回顾 map 与 mapPartitions 这两个算子,我们会发现,转换前后 RDD 的元素类型,与映射函数 f 的类型是一致的。但在 flatMap 这里,却出现了 RDD 元素类型与函数类型不一致的情况。这是怎么回事呢?
其实呢,这正是 flatMap 的“奥妙”所在,为了让你直观地理解 flatMap 的映射过程,我画了一张示意图,如下所示:
不难发现,映射函数 f 的计算过程,对应着图中的步骤 1 与步骤 2,每行文本都被转化为包含相邻词对的数组。紧接着,flatMap 去掉每个数组的“外包装”,提取出数组中类型为 String 的词对元素,然后以词对为单位,构建新的数据分区,如图中步骤 3 所示。
这就是 flatMap 映射过程的第二步:去掉集合“外包装”,提取集合元素。
得到包含词对元素的 wordPairRDD 之后,我们就可以沿用 Word Count 的后续逻辑,去计算相邻词汇的共现次数。你不妨结合文稿中的代码与第一讲中 Word Count 的代码,去实现完整版的“相邻词汇计数统计”。