除了 RDD 的创建和基本操作,还有 RDD 的转换和动作操作可以对 RDD 进行处理和操作。
- RDD 转换操作:
- map(): 针对 RDD 中的每个元素进行转换,返回一个新的 RDD。例如:
val rdd = sc.parallelize(List(1, 2, 3, 4, 5))
val newRdd = rdd.map(_ * 2)
- filter(): 对 RDD 中的每个元素进行过滤,返回一个新的 RDD。例如:
val rdd = sc.parallelize(List(1, 2, 3, 4, 5))
val newRdd = rdd.filter(_ % 2 == 0)
- flatMap(): 针对 RDD 中的每个元素进行扁平化操作,返回一个新的 RDD。例如:
val rdd = sc.parallelize(List("apple pear", "banana orange", "watermelon"))
val newRdd = rdd.flatMap(line => line.split(" "))
- groupByKey(): 按照 RDD 中 key 进行分组,在返回一个新的元素为 (K, Iterable[V]) 结构的 RDD。例如:
val rdd = sc.parallelize(Array("Hello World", "Hello Spark", "Hello Java"))
val pairRdd = rdd.flatMap(line => line.split(" ")).map(word => (word, 1))
val groupRdd = pairRdd.groupByKey()
- reduceByKey(): 分组后对每一个 key 的 value 进行 reduce 操作。例如:
// 统计单词数,key 为单词,value 为出现的次数
val rdd = sc.parallelize(Array("Hello World", "Hello Spark", "Hello Java"))
val pairRdd = rdd.flatMap(line => line.split(" ")).map(word => (word, 1))
val result = pairRdd.reduceByKey(_ + _)
- join(): 将两个 RDD 进行 join 操作,返回一个新的 RDD。例如:
val rdd1 = sc.parallelize(Array(("A", 1), ("B", 2), ("C", 3)))
val rdd2 = sc.parallelize(Array(("C", 4), ("B", 5), ("D", 6)))
val joinRdd = rdd1.join(rdd2)
- RDD 动作操作:
- count(): 统计 RDD 中元素个数,返回一个 long 类型整数。例如:
val rdd = sc.parallelize(List(1, 2, 3, 4, 5))
val count = rdd.count()
- collect(): 将 RDD 中所有元素收集到 driver 端,返回一个新的数组。例如:
val rdd = sc.parallelize(List(1, 2, 3, 4, 5))
val arr = rdd.collect()
- saveAsTextFile(): 将 RDD 中的元素保存到文本文件中,该操作是一个懒操作,只有调用动作操作时才触发计算。例如:
val rdd = sc.parallelize(List(1, 2, 3, 4, 5))
rdd.saveAsTextFile("hdfs://output.txt")
- reduce(): 对 RDD 中所有元素进行 reduce 操作,返回一个单独的元素。例如:
val rdd = sc.parallelize(List(1, 2, 3, 4, 5))
val result = rdd.reduce((a, b) => a + b)
了解 RDD 的转换和动作操作可以帮助开发者更好地处理和操作 RDD 数据。Spark 编程中,针对具体场景选择相应的转换和动作操作可以提高代码效率和准确率。