RDD (Resilient Distributed Dataset) 是 Spark 的核心抽象,是一种分布式、容错、可并行计算的数据结构。在 Spark 中,RDD 可以通过以下几种方式进行创建:
- 并行化现有集合:
可以使用 SparkContext 的 parallelize() 方法将已有的集合并行化为 RDD。例如:
val list = List(1, 2, 3, 4, 5)
val rdd = sc.parallelize(list)
- 从外部存储创建:
Spark 支持多种外部存储格式,如 Hadoop FileSystem、HBase、Cassandra、Amazon S3 等。可以使用 SparkContext 的各种文件系统 API,如 textFile() 方法、hadoopFile() 方法、sequenceFile() 方法等从外部存储创建 RDD。例如:
val rdd = sc.textFile("hdfs://input.txt")
- 转换一个已有的 RDD:
可以通过对一个已有的 RDD 进行转换来创建一个新的 RDD。例如:
val rdd = sc.textFile("hdfs://input.txt")
val newRdd = rdd.filter(line=> line.contains("Spark"))
在创建并获得 RDD 后,我们可以对 RDD 执行各种转换和动作操作来处理数据。RDD 常用的基本操作包括:
- map():
对 RDD 中的每一个元素进行映射,返回一个新的 RDD。例如:
val rdd = sc.parallelize(List(1, 2, 3, 4, 5))
val newRdd = rdd.map(_ * 2)
- filter():
对 RDD 中的每一个元素进行过滤,返回一个新的 RDD。例如:
val rdd = sc.parallelize(List(1, 2, 3, 4, 5))
val newRdd = rdd.filter(_ % 2 == 0)
- reduce():
将 RDD 中的每一个元素聚合成单个值。例如:
val rdd = sc.parallelize(List(1, 2, 3, 4, 5))
val result = rdd.reduce((a, b) => a + b)
- flatMap():
对 RDD 中的每一个元素进行扁平化处理,返回一个新的 RDD,通常用于将一个元素映射为多个元素。例如:
val rdd = sc.parallelize(List("apple pear", "banana orange", "watermelon"))
val newRdd = rdd.flatMap(line => line.split(" "))
- union():
将多个 RDD 合并成一个 RDD,并去除重复元素。例如:
val rdd1 = sc.parallelize(List(1, 2, 3))
val rdd2 = sc.parallelize(List(3, 4, 5))
val unionRdd = rdd1.union(rdd2)
- distinct():
去除 RDD 中重复的元素,返回一个新的 RDD。例如:
val rdd = sc.parallelize(List(1, 2, 3, 3, 4, 5, 5))
val newRdd = rdd.distinct()
了解 RDD 的创建和基本操作,是 Spark 编程的基础。熟练掌握这些操作,可以进行更高效、更灵活的数据处理。