返回第三章
第四个filter:过滤 RDD
在今天的最后,我们再来学习一下,与 map 一样常用的算子:filter。filter,顾名思义,这个算子的作用,是对 RDD 进行过滤。就像是 map 算子依赖其映射函数一样,filter 算子也需要借助一个判定函数 f,才能实现对 RDD 的过滤转换。所谓判定函数,它指的是类型为(RDD 元素类型) => (Boolean)的函数。可以看到,判定函数 f 的形参类型,必须与 RDD 的元素类型保持一致,而 f 的返回结果,只能是 True 或者 False。在任何一个 RDD 之上调用 filter(f),其作用是保留 RDD 中满足 f(也就是 f 返回 True)的数据元素,而过滤掉不满足 f(也就是 f 返回 False)的数据元素。老规矩,我们还是结合示例来讲解 filter 算子与判定函数 f。在上面 flatMap 例子的最后,我们得到了元素为相邻词汇对的 wordPairRDD,它包含的是像“Spark-is”、“is-cool”这样的字符串。为了仅保留有意义的词对元素,我们希望结合标点符号列表,对 wordPairRDD 进行过滤。例如,我们希望过滤掉像“Spark-&”、“|-data”这样的词对。掌握了 filter 算子的用法之后,要实现这样的过滤逻辑,我相信你很快就能写出如下的代码实现:
// 定义特殊字符列表 val list: List[String] = List("&", "|", "#", "^", "@") // 定义判定函数f def f(s: String): Boolean = { val words: Array[String] = s.split("-") val b1: Boolean = list.contains(words(0)) val b2: Boolean = list.contains(words(1)) return !b1 && !b2 // 返回不在特殊字符列表中的词汇对 } // 使用filter(f)对RDD进行过滤 val cleanedPairRDD: RDD[String] = wordPairRDD.filter(f)
掌握了 filter 算子的用法之后,你就可以定义任意复杂的判定函数 f,然后在 RDD 之上通过调用 filter(f) 去变着花样地做数据过滤,从而满足不同的业务需求。
总结
首先,我们讲了 map 算子的用法,它允许开发者自由地对 RDD 做各式各样的数据转换,给定映射函数 f,map(f) 以元素为粒度对 RDD 做数据转换。其中 f 可以是带名函数,也可以是匿名函数,它的形参类型必须与 RDD 的元素类型保持一致,而输出类型则任由开发者自行决定。为了提升数据转换的效率,Spark 提供了以数据分区为粒度的 mapPartitions 算子。
mapPartitions 的形参是代表数据分区的 partition,它通过在 partition 之上再次调用 map(f) 来完成数据的转换。相比 map,mapPartitions 的优势是以数据分区为粒度初始化共享对象,这些共享对象在我们日常的开发中很常见,比如数据库连接对象、S3 文件句柄、机器学习模型等等。紧接着,我们介绍了 flatMap 算子。flatMap 的映射函数 f 比较特殊,它的函数类型是(元素) => (集合),这里集合指的是像数组、列表这样的数据结构。
因此,flatMap 的映射过程在逻辑上分为两步,这一点需要你特别注意:以元素为单位,创建集合;去掉集合“外包装”,提取集合元素。最后,我们学习了 filter 算子,filter 算子的用法与 map 很像,它需要借助判定函数 f 来完成对 RDD 的数据过滤。判定函数的类型必须是(RDD 元素类型) => (Boolean),也就是形参类型必须与 RDD 的元素类型保持一致,返回结果类型则必须是布尔值。
RDD 中的元素是否能够得以保留,取决于判定函数 f 的返回值是 True 还是 False。
虽然今天我们只学了 4 个算子,但这 4 个算子在日常开发中的出现频率非常之高。掌握了这几个简单的 RDD 算子,你几乎可以应对 RDD 中 90% 的数据转换场景。希望你对这几个算子多多加以练习,从而在日常的开发工作中学以致用