val spark = SparkSession.builder()
.master("local").appName("DatasetApp")
.getOrCreate()
Spark SQL支持两种不同方法将现有RDD转换为DataFrame:
1 反射推断
包含特定对象类型的 RDD 的schema。
这种基于反射的方法可以使代码更简洁,在编写 Spark 应用程序时已知schema时效果很好
val peopleRDD: RDD[String] = spark.sparkContext.textFile(
"/Users/javaedge/Downloads/sparksql-train/data/people.txt")
// RDD => DF
// RDD
val peopleDF: DataFrame = peopleRDD.map(_.split(","))
// RDD
.map(x => People(x(0), x(1).trim.toInt))
// DF
.toDF()
2 通过编程接口
构造一个schema,然后将其应用到现有的 RDD。
虽然这种方法更冗长,但它允许在运行时构造 Dataset,当列及其类型直到运行时才知道时很有用。
step1
val peopleRDD: RDD[String] = spark.sparkContext.textFile(
"/Users/javaedge/Downloads/sparksql-train/data/people.txt")
// RDD
val peopleRowRDD: RDD[Row] = peopleRDD.map(_.split(","))
.map(x => Row(x(0), x(1).trim.toInt))
step2
val struct = StructType(
StructField("name", StringType, nullable = true) ::
StructField("age", IntegerType, nullable = false) :: Nil)
step3
使用SparkSession的createDataFrame方法将RDD转换为DataFrame
val peopleDF: DataFrame = spark.createDataFrame(peopleRowRDD, struct)
peopleDF.show()