1. Creating RDDs
- Create an RDD from a collection with the parallelize/makeRDD operators; this is mostly used for testing
- Read data from a file system into an RDD with an operator such as textFile()
- Derive an RDD from an existing RDD with a transformation operator (all three routes are sketched below)
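A minimal sketch of all three routes, assuming the CommSparkSessionScala helper defined in section 4 below and a hypothetical data/people.txt path:

```scala
import com.kfk.spark.common.CommSparkSessionScala

object RDDCreationSketchScala {
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        val sc = spark.sparkContext

        // 1) From an in-memory collection; handy for tests
        val rdd1 = sc.parallelize(Seq(1, 2, 3, 4))
        val rdd2 = sc.makeRDD(Seq("a", "b", "c"))

        // 2) From the file system (hypothetical path)
        val lines = sc.textFile("data/people.txt")

        // 3) From an existing RDD via a transformation operator
        val doubled = rdd1.map(_ * 2)
        println(doubled.collect().mkString(", ")) // 2, 4, 6, 8
    }
}
```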
2. Creating DataFrames
- Read directly from a file system:

```scala
val df = spark.read.format("json").load(path) // the format can be json, csv, parquet, etc.
```

- Convert from an RDD
- Convert from a Dataset (all three routes are sketched below; the conversions are covered in detail in sections 4 and 6)
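A minimal sketch of the three routes, again using the CommSparkSessionScala helper from section 4; the file path and format are placeholders:

```scala
import com.kfk.spark.common.CommSparkSessionScala

object DataFrameCreationSketchScala {
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        import spark.implicits._

        // 1) Read from the file system (hypothetical path)
        val dfFromFile = spark.read.format("json").load("data/people.json")

        // 2) From an RDD of tuples, naming the columns explicitly
        val dfFromRdd = spark.sparkContext
            .parallelize(Seq(("Andy", 30), ("Justin", 19)))
            .toDF("name", "age")

        // 3) From a Dataset: a DataFrame is just a Dataset[Row]
        val ds = Seq(("Michael", 29)).toDS()
        val dfFromDs = ds.toDF("name", "age")
        dfFromDs.show()
    }
}
```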
3. Creating Datasets
- Read directly from a file system:

```scala
val ds = spark.read.textFile(path).as[String] // textFile() already yields a Dataset[String]
```

- Convert from an RDD
- Convert from a DataFrame
- Build a Dataset from a Seq, as shown below:
```scala
package com.kfk.spark.sql

import com.kfk.spark.common.CommSparkSessionScala

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/5
 * @time : 4:10 PM
 */
object DataSetDemoScala {

    case class Person(name: String, age: Int)

    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()

        // toDS() requires importing the implicit conversions
        import spark.implicits._

        val caseClassDS = Seq(Person("Andy", 20), Person("Tom", 30)).toDS()
        caseClassDS.show()

        /**
         * +----+---+
         * |name|age|
         * +----+---+
         * |Andy| 20|
         * | Tom| 30|
         * +----+---+
         */
    }
}
```
4. Converting Between RDD and DataFrame

First, initialize a SparkSession, the unified entry point for Spark SQL.

Java:
```java
package com.kfk.spark.common;

import org.apache.spark.sql.SparkSession;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/2
 * @time : 10:01 PM
 */
public class CommSparkSession {
    public static SparkSession getSparkSession() {
        SparkSession spark = SparkSession.builder()
                .appName("CommSparkSession")
                .master("local")
                .config("spark.sql.warehouse.dir", "/Users/caizhengjie/Document/spark/spark-warehouse")
                .getOrCreate();
        return spark;
    }
}
```
Scala:
```scala
package com.kfk.spark.common

import org.apache.spark.sql.SparkSession

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/2
 * @time : 10:02 PM
 */
object CommSparkSessionScala {
    def getSparkSession(): SparkSession = {
        val spark = SparkSession
            .builder
            .appName("CommSparkSessionScala")
            .master("local")
            .config("spark.sql.warehouse.dir", "/Users/caizhengjie/Document/spark/spark-warehouse")
            .getOrCreate
        spark
    }
}
```
(1) RDD to DataFrame

Data source (people.txt):

```
Michael, 29
Andy, 30
Justin, 19
```
Approach 1: pass column names to toDF. Using toDF requires importing the implicit conversions:

```scala
import spark.implicits._

val path = CommScala.fileDirPath + "people.txt"

val personDf1 = spark.sparkContext.textFile(path)
    .map(line => line.split(","))
    .map(x => (x(0), x(1).trim))
    .toDF("name", "age")
personDf1.show()

/**
 * +-------+---+
 * |   name|age|
 * +-------+---+
 * |Michael| 29|
 * |   Andy| 30|
 * | Justin| 19|
 * +-------+---+
 */
```
Approach 2: reflection

Java implementation

Spark SQL supports automatically converting an RDD of JavaBeans into a DataFrame.
```java
package com.kfk.spark.sql;

import java.io.Serializable;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/2
 * @time : 10:15 PM
 */
public class Person implements Serializable {
    private String name;
    private long age;

    public Person(String name, long age) {
        this.name = name;
        this.age = age;
    }

    public Person() {
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public long getAge() {
        return age;
    }

    public void setAge(long age) {
        this.age = age;
    }
}
```
```java
package com.kfk.spark.sql;

import com.kfk.spark.common.Comm;
import com.kfk.spark.common.CommSparkSession;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/2
 * @time : 10:07 PM
 */
public class RDDToDFReflectionJava {
    public static void main(String[] args) {
        SparkSession spark = CommSparkSession.getSparkSession();

        /**
         * Data source:
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        String filePath = Comm.fileDirPath + "people.txt";

        // Read the file into an RDD of Person beans
        JavaRDD<Person> personRdd = spark.read().textFile(filePath).javaRDD().map(line -> {
            String name = line.split(",")[0];
            long age = Long.parseLong(line.split(",")[1].trim());
            return new Person(name, age);
        });

        // Convert the RDD into a DataFrame via bean reflection
        Dataset<Row> personDf = spark.createDataFrame(personRdd, Person.class);
        personDf.show();

        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         */

        // Register a temporary view
        personDf.createOrReplaceTempView("person");

        Dataset<Row> resultDf = spark.sql("select * from person a where a.age > 20");
        resultDf.show();

        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * +-------+---+
         */
    }
}
```
Scala implementation

In Scala, the reflection is driven by a case class:
```scala
package com.kfk.spark.sql

import com.kfk.spark.common.{CommScala, CommSparkSessionScala}

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/3
 * @time : 6:32 PM
 */
object RDDToDFReflectionScala {

    case class Person(name: String, age: Long)

    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()

        /**
         * Data source:
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        val path = CommScala.fileDirPath + "people.txt"

        // Convert the RDD into a DataFrame via case-class reflection
        import spark.implicits._
        val personDf = spark.sparkContext.textFile(path)
            .map(line => line.split(","))
            .map(x => Person(x(0), x(1).trim.toLong))
            .toDF()
        personDf.show()

        // Alternatively, pass the column names directly into toDF
        val personDf1 = spark.sparkContext.textFile(path)
            .map(line => line.split(","))
            .map(x => (x(0), x(1).trim))
            .toDF("name", "age")
        personDf1.show()

        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         */

        // Register a temporary view
        personDf.createOrReplaceTempView("person")

        val resultDf = spark.sql("select * from person a where a.age > 20")
        resultDf.show()

        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * +-------+---+
         */
    }
}
```
Approach 3: programmatically constructing a schema

Java implementation
```java
package com.kfk.spark.sql;

import com.kfk.spark.common.Comm;
import com.kfk.spark.common.CommSparkSession;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/3
 * @time : 9:32 PM
 */
public class RDDToDFProgramJava {
    public static void main(String[] args) {
        SparkSession spark = CommSparkSession.getSparkSession();

        /**
         * Data source:
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        String filePath = Comm.fileDirPath + "people.txt";

        // Build the schema programmatically
        List<StructField> fields = new ArrayList<StructField>();
        StructField structFieldName = DataTypes.createStructField("name", DataTypes.StringType, true);
        StructField structFieldAge = DataTypes.createStructField("age", DataTypes.StringType, true);
        fields.add(structFieldName);
        fields.add(structFieldAge);
        StructType schema = DataTypes.createStructType(fields);

        // Convert the people RDD into an RDD<Row>
        JavaRDD<Row> personRdd = spark.read().textFile(filePath).javaRDD().map(line -> {
            String[] attributes = line.split(",");
            return RowFactory.create(attributes[0], attributes[1].trim());
        });

        // Create a DataFrame from the RDD<Row> and the schema
        Dataset<Row> personDataFrame = spark.createDataFrame(personRdd, schema);
        personDataFrame.show();

        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         */

        // Register a temporary view
        personDataFrame.createOrReplaceTempView("person");

        Dataset<Row> resultDataFrame = spark.sql("select * from person a where a.age > 20");
        resultDataFrame.show();

        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * +-------+---+
         */
    }
}
```
Scala implementation
```scala
package com.kfk.spark.sql

import com.kfk.spark.common.{CommScala, CommSparkSessionScala}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/3
 * @time : 10:05 PM
 */
object RDDToDFProgramScala {
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()

        /**
         * Data source:
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        val path = CommScala.fileDirPath + "people.txt"

        // Build the schema programmatically
        val schema = StructType(Array(
            StructField("name", DataTypes.StringType, true),
            StructField("age", DataTypes.LongType, true)
        ))

        // Convert the people RDD into an RDD[Row]
        val rdd = spark.sparkContext.textFile(path)
            .map(line => line.split(","))
            .map(x => Row(x(0), x(1).trim.toLong))

        // Create a DataFrame from the RDD[Row] and the schema
        val personDataFrame = spark.createDataFrame(rdd, schema)
        personDataFrame.show()

        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         */

        // Register a temporary view
        personDataFrame.createOrReplaceTempView("person")

        val resultDataFrame = spark.sql("select * from person a where a.age > 20")
        resultDataFrame.show()

        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * +-------+---+
         */

        for (elem <- resultDataFrame.collect()) {
            println(elem)
        }

        /**
         * [Michael,29]
         * [Andy,30]
         */
    }
}
```
(2) DataFrame to RDD

Convert a DataFrame to an RDD with df.rdd (Scala) or df.javaRDD() (Java).

Java implementation
```java
// Convert the DataFrame back into an RDD
JavaRDD<Person> resultRdd = resultDf.javaRDD().map(line -> {
    Person person = new Person();
    person.setName(line.getAs("name"));
    person.setAge(line.getAs("age"));
    return person;
});

for (Person person : resultRdd.collect()) {
    System.out.println(person.getName() + " : " + person.getAge());
}

/**
 * Michael : 29
 * Andy : 30
 */
```
Scala implementation
```scala
// Convert the DataFrame back into an RDD
val resultRdd = resultDf.rdd.map(row => {
    val name = row.getAs[String]("name")
    val age = row.getAs[Long]("age")
    Person(name, age)
})

for (elem <- resultRdd.collect()) {
    println(elem.name + " : " + elem.age)
}

/**
 * Michael : 29
 * Andy : 30
 */
```
5. Converting Between RDD and Dataset

(1) RDD to Dataset

Approach 1: the toDS() operator

Using toDS() requires importing the implicit conversions; the schema comes from the case class, as in the reflection-based RDD-to-DataFrame approach (schema information cannot be passed to toDS() directly).
```scala
package com.kfk.spark.sql

import com.kfk.spark.common.{CommScala, CommSparkSessionScala}

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/5
 * @time : 4:10 PM
 */
object DataSetDemoScala {

    case class Person(name: String, age: Int)

    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()

        /**
         * Data source:
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        val path = CommScala.fileDirPath + "people.txt"

        // toDS() requires importing the implicit conversions
        import spark.implicits._

        // Approach 1: convert the RDD into a Dataset with toDS()
        val dataDS = spark.sparkContext.textFile(path)
            .map(line => line.split(","))
            .map(x => Person(x(0), x(1).trim.toInt))
            .toDS()
        dataDS.show()

        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         */
    }
}
```
Approach 2: spark.createDataset(rdd)
```scala
package com.kfk.spark.sql

import com.kfk.spark.common.{CommScala, CommSparkSessionScala}

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/5
 * @time : 4:10 PM
 */
object DataSetDemoScala {

    case class Person(name: String, age: Int)

    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()

        /**
         * Data source:
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        val path = CommScala.fileDirPath + "people.txt"

        // createDataset also needs the implicit encoders
        import spark.implicits._

        // Approach 2: convert the RDD into a Dataset with spark.createDataset(rdd)
        val rdd = spark.sparkContext.textFile(path)
            .map(line => line.split(","))
            .map(x => Person(x(0), x(1).trim.toInt))
        val dataDS1 = spark.createDataset(rdd)
        dataDS1.show()

        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         */
    }
}
```
(2) Dataset to RDD

```scala
ds.rdd
```
```scala
// Convert the Dataset into an RDD
val rdd1 = dataDS1.rdd
for (elem <- rdd1.collect()) {
    println(elem.name + " : " + elem.age)
}

/**
 * Michael : 29
 * Andy : 30
 * Justin : 19
 */
```
6. Converting Between DataFrame and Dataset

(1) DataFrame to Dataset

A Dataset carries the most type information and is the most constrained of the three abstractions, so converting an RDD or a DataFrame into a Dataset requires a case class; going the other way, from a Dataset down to the simpler RDD or DataFrame, is just a direct method call:
```scala
case class Xxx() // define fields matching the DataFrame's columns
df.as[Xxx]
```
```scala
package com.kfk.spark.sql

import com.kfk.spark.common.{CommScala, CommSparkSessionScala}

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/5
 * @time : 5:35 PM
 */
object DataFrameToDataSetScala {

    case class Person(name: String, age: Long)

    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        import spark.implicits._

        /**
         * Data source:
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        val path = CommScala.fileDirPath + "people.txt"

        val df = spark.sparkContext.textFile(path)
            .map(line => line.split(","))
            .map(x => Person(x(0), x(1).trim.toLong))
            .toDF()
        df.show()

        // Convert the DataFrame into a Dataset
        val personDS = df.as[Person]
        personDS.show()
        personDS.printSchema()

        // Convert the Dataset back into a DataFrame
        val personDF = personDS.toDF()
        personDF.show()
        personDF.printSchema()

        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         *
         * root
         *  |-- name: string (nullable = true)
         *  |-- age: long (nullable = false)
         */
    }
}
```
(2) Dataset to DataFrame

Going the other way is a single call:

```scala
ds.toDF()
```
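A short self-contained sketch (the object and column names here are illustrative): toDF() keeps the case-class field names as column names, while the toDF(colNames: String*) overload renames the columns during the conversion:

```scala
import com.kfk.spark.common.CommSparkSessionScala

object DataSetToDataFrameSketchScala {

    case class Person(name: String, age: Long)

    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        import spark.implicits._

        val ds = Seq(Person("Andy", 30), Person("Justin", 19)).toDS()

        // Keeps the case-class field names as column names
        val df = ds.toDF()

        // Renames the columns on the way out
        val renamed = ds.toDF("person_name", "person_age")
        renamed.show()

        /**
         * +-----------+----------+
         * |person_name|person_age|
         * +-----------+----------+
         * |       Andy|        30|
         * |     Justin|        19|
         * +-----------+----------+
         */
    }
}
```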