Spark SQL编写流程
1. SparkSession对象
封装了spark sql执行环境信息,是所有Spark SQL程序的唯一入口
sparkSession中包含 sparkContext和sqlContext两个对象,不用再自己创建sparkContext
2. 创建DataFrame或Dataset
Spark SQL支持各种数据源
3. 在DataFrame或Dataset之上进行transformation和action
Spark SQL提供了多种transformation和action函数
4. 返回结果
保存到HDFS中,或直接打印出来
val spark=SparkSession.build
.master("local")
.appName("appName")
.getOrCreate()
// 注意,后面所有程序片段总的spark变量均值SparkSession
// 将RDD隐式转换为DataFrame
import spark.implicits._
DataFrame与Dataset
1. DataFrame=Dataset[Row]
row表示一行数据,例如Row=["1",23,44]
RDD /DataFream/Dataset 之间可以相互转化
2. DataFream
内部数据无类型统一为row
DataFream是一种特殊的Dataset
3. Dataset
内部数据有类型,需要用户自己设定
Spark SQL数据源
RDD->DataFrame:反射方式
1.定义case class ,作为RDD的schema
2. 直接通过RDD.toDF,将RDD转为DataFream
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Row
case class User(userID:Long,gennder:String, age: Int, occupation: String, zipcode: String)
val userRdd=sc.textFile("//")
val userRDD=usrRdd.map(_.split("::")).map(p=>User(p(0).ToLong,p(1).ToStringp(2).toInt, p(3), p(4)))
val userDF=userRDD.toDF()
userDF.count()
json->DataFrame
val userjson=spark.read.format("json").load("//user.json") //一行json数据要在一行
userjson.take(10)
val userjson=spark.read.json("")
userjson.take(10)
text->DataSet
val userDS=spark.read.textFile("").map(_.split(":"))
RDD、DataFrame与Dataset的关系
val ds = df.as[Person] // DataFrame -> Dataset
val df2 = ds.toDF() / Dataset -> DataFrame
val rdd1 = ds.rdd // Dataset -> RDD
val rdd2 = df.rdd // DataFrame -> RDD