[Spark精进]必须掌握的4个RDD算子之map算子

简介: [Spark精进]必须掌握的4个RDD算子之map算子

序章

第一个map. 以元素为粒度的数据转换

我们先来说说 map 算子的用法:给定映射函数 f,map(f) 以元素为粒度对 RDD 做数据转换。其中 f 可以是带有明确签名的带名函数,也可以是匿名函数,它的形参类型必须与 RDD 的元素类型保持一致,而输出类型则任由开发者自行决定。

我们使用如下代码,把包含单词的 RDD 转换成元素为(Key,Value)对的 RDD,后者统称为 Paired RDD。

// 把普通RDD转换为Paired RDD
val cleanWordRDD: RDD[String] = _ 
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))

在上面的代码实现中,传递给 map 算子的形参,即:word => (word,1),就是我们上面说的映射函数 f。只不过,这里 f 是以匿名函数的方式进行定义的,其中左侧的 word 表示匿名函数 f 的输入形参,而右侧的(word,1)则代表函数 f 的输出结果。

如果我们把匿名函数变成带名函数的话,可能你会看的更清楚一些。这里我用一段代码重新定义了带名函数 f。

// 把RDD元素转换为(Key,Value)的形式
// 定义映射函数f
def f(word: String): (String, Int) = {
return (word, 1)
}
val cleanWordRDD: RDD[String] = _ 
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(f)

可以看到,我们使用 Scala 的 def 语法,明确定义了带名映射函数 f,它的计算逻辑与刚刚的匿名函数是一致的。在做 RDD 数据转换的时候,我们只需把函数 f 传递给 map 算子即可。不管 f 是匿名函数,还是带名函数,map 算子的转换逻辑都是一样的,你不妨把以上两种实现方式分别敲入到 spark-shell,去验证执行结果的一致性。

到这里为止,我们就掌握了 map 算子的基本用法。现在你就可以定义任意复杂的映射函数 f,然后在 RDD 之上通过调用 map(f) 去翻着花样地做各种各样的数据转换。

比如,通过定义如下的映射函数 f,我们就可以改写 Word Count 的计数逻辑,也就是把“Spark”这个单词的统计计数权重提高一倍:

// 把RDD元素转换为(Key,Value)的形式
// 定义映射函数f
def f(word: String): (String, Int) = {
if (word.equals("Spark")) { return (word, 2) }
return (word, 1)
}
val cleanWordRDD: RDD[String] = _ // 请参考第一讲获取完整代码
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(f)

尽管 map 算子足够灵活,允许开发者自由定义转换逻辑。不过,就像我们刚刚说的,map(f) 是以元素为粒度对 RDD 做数据转换的,在某些计算场景下,这个特点会严重影响执行效率。为什么这么说呢?我们来看一个具体的例子。

比方说,我们把 Word Count 的计数需求,从原来的对单词计数,改为对单词的哈希值计数,在这种情况下,我们的代码实现需要做哪些改动呢?我来示范一下:

// 把普通RDD转换为Paired RDD
import java.security.MessageDigest
val cleanWordRDD: RDD[String] = _ // 请参考第一讲获取完整代码
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map{ word =>
  // 获取MD5对象实例
  val md5 = MessageDigest.getInstance("MD5")
  // 使用MD5计算哈希值
  val hash = md5.digest(word.getBytes).mkString
  // 返回哈希值与数字1的Pair
  (hash, 1)
}

由于 map(f) 是以元素为单元做转换的,那么对于 RDD 中的每一条数据记录,我们都需要实例化一个 MessageDigest 对象来计算这个元素的哈希值。

在工业级生产系统中,一个 RDD 动辄包含上百万甚至是上亿级别的数据记录,如果处理每条记录都需要事先创建 MessageDigest,那么实例化对象的开销就会聚沙成塔,不知不觉地成为影响执行效率的罪魁祸首。

那么问题来了,有没有什么办法,能够让 Spark 在更粗的数据粒度上去处理数据呢?还真有,mapPartitions 和 mapPartitionsWithIndex 这对“孪生兄弟”就是用来解决类似的问题。相比 mapPartitions,mapPartitionsWithIndex 仅仅多出了一个数据分区索引,因此接下来我们把重点放在 mapPartitions 上面。

点击跳转到下一讲


目录
相关文章
|
1月前
|
SQL 消息中间件 分布式计算
大数据-84 Spark 集群 RDD创建 RDD-Transformation操作算子 详解(一)
大数据-84 Spark 集群 RDD创建 RDD-Transformation操作算子 详解(一)
38 5
|
1月前
|
分布式计算 大数据 数据处理
大数据-84 Spark 集群 RDD创建 RDD-Transformation操作算子 详解(二)
大数据-84 Spark 集群 RDD创建 RDD-Transformation操作算子 详解(二)
38 4
|
1月前
|
存储 缓存 分布式计算
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
35 4
|
1月前
|
存储 缓存 分布式计算
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
41 4
|
1月前
|
JSON 分布式计算 大数据
大数据-85 Spark 集群 RDD创建 RDD-Action Key-Value RDD详解 RDD的文件输入输出
大数据-85 Spark 集群 RDD创建 RDD-Action Key-Value RDD详解 RDD的文件输入输出
28 1
|
1月前
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
35 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
1月前
|
消息中间件 分布式计算 Kafka
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
28 0
|
1月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
42 0
|
1月前
|
SQL 分布式计算 大数据
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
30 0
|
1月前
|
缓存 分布式计算 大数据
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)
40 0