Creating RDDs, DataFrames, and DataSets in Spark and converting between them (very important)

Summary: notes

1. Creating an RDD


  • Created from an in-memory collection with the parallelize/makeRDD operators, most often used for testing
  • Created by reading data from a file system with operators such as textFile()
  • Produced by applying transformation operators to an existing RDD (see the short sketch after this list)
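A minimal Scala sketch of the three approaches; it assumes an existing SparkSession named spark and a hypothetical input file people.txt:

// 1. From an in-memory collection (parallelize / makeRDD), handy for tests
val numsRdd = spark.sparkContext.parallelize(Seq(1, 2, 3, 4))
val wordsRdd = spark.sparkContext.makeRDD(Seq("spark", "rdd"))
// 2. From a file system with textFile()
val linesRdd = spark.sparkContext.textFile("people.txt")
// 3. From an existing RDD through a transformation operator
val lengthRdd = linesRdd.map(line => line.length)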


2. Creating a DataFrame


  • Read directly from a file system
val df = spark.read.format("json").load(path)
  • Converted from an RDD
  • Converted from a DataSet (see the sketch after this list)
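A minimal Scala sketch of the three approaches; it assumes an existing SparkSession named spark with import spark.implicits._ in scope, and the file path and "json" format are only illustrative:

// 1. Read directly from a file system
val dfFromFile = spark.read.format("json").load("people.json")
// 2. Convert from an RDD by passing the column names to toDF
val dfFromRdd = spark.sparkContext
  .parallelize(Seq(("Andy", 30), ("Justin", 19)))
  .toDF("name", "age")
// 3. Convert from a DataSet
val ds = Seq(("Michael", 29)).toDS()
val dfFromDs = ds.toDF("name", "age")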


3. Creating a DataSet


  • Read directly from a file system
val ds = spark.read.textFile(path).as[xxx]
  • Converted from an RDD
  • Converted from a DataFrame
  • Created from a Seq, as in the example below
package com.kfk.spark.sql
import com.kfk.spark.common.CommSparkSessionScala
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/5
 * @time : 4:10 PM
 */
object DataSetDemoScala {
    case class Person(name: String, age: Int)
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        // toDS() requires importing the implicit conversions
        import spark.implicits._
        val caseClassDS = Seq(Person("Andy", 20), Person("Tom", 30)).toDS()
        caseClassDS.show()
    /**
         * +----+---+
         * |name|age|
         * +----+---+
         * |Andy| 20|
         * | Tom| 30|
         * +----+---+
         */
    }
}


4. Converting between RDD and DataFrame


First, initialize a SparkSession, the unified entry point for Spark SQL.

Java:

package com.kfk.spark.common;
import org.apache.spark.sql.SparkSession;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/2
 * @time : 10:01 PM
 */
public class CommSparkSession {
    public static SparkSession getSparkSession(){
        SparkSession spark = SparkSession.builder()
                .appName("CommSparkSession")
                .master("local")
                .config("spark.sql.warehouse.dir","/Users/caizhengjie/Document/spark/spark-warehouse")
                .getOrCreate();
        return spark;
    }
}

Scala:

package com.kfk.spark.common
import org.apache.spark.sql.SparkSession
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/2
 * @time : 10:02 PM
 */
object CommSparkSessionScala {
    def getSparkSession(): SparkSession ={
        val spark = SparkSession
                .builder
                .appName("CommSparkSessionScala")
                .master("local")
                .config("spark.sql.warehouse.dir", "/Users/caizhengjie/Document/spark/spark-warehouse")
                .getOrCreate
        spark
    }
}

(1) RDD to DataFrame

Option 1: pass the column names directly to toDF

Data source:

Michael, 29
Andy, 30
Justin, 19

Using toDF requires importing the implicit conversions:

import spark.implicits._
val path = CommScala.fileDirPath + "people.txt";
val personDf1 = spark.sparkContext.textFile(path).map(line => line.split(",")).map(x => {
            (x(0),x(1).trim)
        }).toDF("name", "age")
personDf1.show()
/**
* +-------+---+
* |   name|age|
* +-------+---+
* |Michael| 29|
* |   Andy| 30|
* | Justin| 19|
* +-------+---+
*/

Option 2: reflection

Java implementation

Spark SQL supports automatically converting an RDD of JavaBeans into a DataFrame.

package com.kfk.spark.sql;
import java.io.Serializable;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/2
 * @time : 10:15 PM
 */
public class Person implements Serializable {
    private String name;
    private long age;
    public Person(String name, long age) {
        this.name = name;
        this.age = age;
    }
    public Person() {
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public long getAge() {
        return age;
    }
    public void setAge(long age) {
        this.age = age;
    }
}
package com.kfk.spark.sql;
import com.kfk.spark.common.Comm;
import com.kfk.spark.common.CommSparkSession;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/2
 * @time : 10:07 PM
 */
public class RDDToDFReflectionJava {
    public static void main(String[] args) {
        SparkSession spark = CommSparkSession.getSparkSession();
        /**
         * Data source
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        String filePath = Comm.fileDirPath + "people.txt";
        // Convert the file into a JavaRDD<Person>
        JavaRDD<Person> personrdd = spark.read().textFile(filePath).javaRDD().map(line -> {
            String name = line.split(",")[0];
            long age = Long.parseLong(line.split(",")[1].trim());
            Person person = new Person(name,age);
            return person;
        });
        // Convert the RDD into a DataFrame via reflection
        Dataset<Row> personDf = spark.createDataFrame(personrdd,Person.class);
        personDf.show();
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         */
        // Create a temporary view
        personDf.createOrReplaceTempView("person");
        Dataset<Row> resultDf = spark.sql("select * from person a where a.age > 20");
        resultDf.show();
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * +-------+---+
         */
    }
}

Scala implementation

In Scala, the reflection approach relies on a case class.

package com.kfk.spark.sql
import com.kfk.spark.common.{CommScala, CommSparkSessionScala}
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/3
 * @time : 6:32 PM
 */
object RDDToDFReflectionScala {
    case class Person(name : String, age : Long)
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession();
        /**
         * Data source
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        val path = CommScala.fileDirPath + "people.txt";
        // Convert the RDD into a DataFrame through case-class reflection
        import spark.implicits._
        val personDf = spark.sparkContext.textFile(path).map(line => line.split(",")).map(x => {
            Person(x(0),x(1).trim.toLong)
        }).toDF()
        personDf.show()
        // Convert to a DataFrame by passing the column names directly to toDF
        val personDf1 = spark.sparkContext.textFile(path).map(line => line.split(",")).map(x => {
            (x(0),x(1).trim)
        }).toDF("name", "age")
        personDf1.show()
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         */
        // Create a temporary view
        personDf.createOrReplaceTempView("person")
        val resultDf = spark.sql("select * from person a where a.age > 20")
        resultDf.show()
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * +-------+---+
         */
    }
}

Option 3: build the schema programmatically

Java implementation

package com.kfk.spark.sql;
import com.kfk.spark.common.Comm;
import com.kfk.spark.common.CommSparkSession;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.ArrayList;
import java.util.List;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/3
 * @time : 9:32 PM
 */
public class RDDToDFProgramJava {
    public static void main(String[] args) {
        SparkSession spark = CommSparkSession.getSparkSession();
        /**
         * Data source
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        String filePath = Comm.fileDirPath + "people.txt";
        List<StructField> fields = new ArrayList<StructField>();
        StructField structField_Name = DataTypes.createStructField("name",DataTypes.StringType,true);
        StructField structField_Age = DataTypes.createStructField("age",DataTypes.StringType,true);
        fields.add(structField_Name);
        fields.add(structField_Age);
        // Build the schema
        StructType scheme = DataTypes.createStructType(fields);
        // Convert the people RDD into a JavaRDD<Row>
        JavaRDD<Row> personRdd = spark.read().textFile(filePath).javaRDD().map(line ->{
            String[] attributes = line.split(",");
            return RowFactory.create(attributes[0], attributes[1].trim());
        });
        // Create a DataFrame from the RDD and the schema
        Dataset<Row> personDataFrame = spark.createDataFrame(personRdd,scheme);
        personDataFrame.show();
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         */
        // Create a temporary view
        personDataFrame.createOrReplaceTempView("person");
        Dataset<Row> resultDataFrame = spark.sql("select * from person a where a.age > 20");
        resultDataFrame.show();
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * +-------+---+
         */
    }
}

Scala implementation

package com.kfk.spark.sql
import com.kfk.spark.common.{CommScala, CommSparkSessionScala}
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/3
 * @time : 10:05 PM
 */
object RDDToDFProgramScala {
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession();
        /**
         * Data source
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        val path = CommScala.fileDirPath + "people.txt";
        // Build the schema
        val scheme = StructType(Array(
            StructField("name",DataTypes.StringType,true),
            StructField("age",DataTypes.LongType,true)
        ))
        // Convert the people RDD into an RDD[Row]
        val rdd = spark.sparkContext.textFile(path).map(line => {
            line.split(",")
        }).map(x => {
            Row(x(0),x(1).trim.toLong)
        })
        // Create a DataFrame from the RDD and the schema
        val personDataFrame = spark.createDataFrame(rdd,scheme)
        personDataFrame.show()
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         */
        // Create a temporary view
        personDataFrame.createOrReplaceTempView("person")
        val resultDataFrame = spark.sql("select * from person a where a.age > 20")
        resultDataFrame.show()
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * +-------+---+
         */
        for (elem <- resultDataFrame.collect()) {
            System.out.println(elem)
        }
        /**
         * [Michael,29]
         * [Andy,30]
         */
    }
}

(2) DataFrame to RDD

Convert a DataFrame back into an RDD with df.rdd (Scala) or df.javaRDD() (Java).

Java implementation

// Convert the DataFrame into an RDD
JavaRDD<Person> resultRdd = resultDf.javaRDD().map(line -> {
    Person person = new Person();
    person.setName(line.getAs("name"));
    person.setAge(line.getAs("age"));
    return person;
});
for (Person person : resultRdd.collect()){
    System.out.println(person.getName() + " : " + person.getAge());
}
/**
 * Michael : 29
 * Andy : 30
 */

Scala implementation

// Convert the DataFrame into an RDD
val resultRdd = resultDf.rdd.map(row => {
    val name = row.getAs[String]("name")
    val age = row.getAs[Long]("age")
    Person(name,age)
})
for (elem <- resultRdd.collect()) {
    System.out.println(elem.name + " : " + elem.age)
}
/**
 * Michael : 29
 * Andy : 30
 */


5. Converting between RDD and DataSet


(1) RDD to DataSet

Option 1: the toDS() operator

toDS() requires importing the implicit conversions. The schema comes from the case class, just as in the RDD-to-DataFrame examples (schema information cannot be passed to toDS() directly).

package com.kfk.spark.sql
import com.kfk.spark.common.{CommScala, CommSparkSessionScala}
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/5
 * @time : 4:10 PM
 */
object DataSetDemoScala {
    case class Person(name: String, age: Int)
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        /**
         * Data source
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        val path = CommScala.fileDirPath + "people.txt";
        // toDS() requires importing the implicit conversions
        import spark.implicits._
        // Option 1: convert the RDD into a DataSet with toDS()
        val dataDS = spark.sparkContext.textFile(path).map(line => line.split(",")).map(x => {
            Person(x(0),x(1).trim.toInt)
        }).toDS()
        dataDS.show()
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         */
    }
}

Option 2: spark.createDataset(rdd)

package com.kfk.spark.sql
import com.kfk.spark.common.{CommScala, CommSparkSessionScala}
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/5
 * @time : 4:10 PM
 */
object DataSetDemoScala {
    case class Person(name: String, age: Int)
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        /**
         * Data source
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        val path = CommScala.fileDirPath + "people.txt";
        // The implicit import provides the Encoder[Person] that createDataset needs
        import spark.implicits._
        // Option 2: convert the RDD into a DataSet with spark.createDataset(rdd)
        val rdd = spark.sparkContext.textFile(path).map(line => {
            line.split(",")
        }).map(x => {
            Person(x(0),x(1).trim.toInt)
        })
        val dataDS1 = spark.createDataset(rdd)
        dataDS1.show()
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         */
    }
}

(2) DataSet to RDD

ds.rdd


// Convert the DataSet back into an RDD
val rdd1 = dataDS1.rdd
for (elem <- rdd1.collect()) {
   System.out.println(elem.name + " : " + elem.age)
}
/**
 * Michael : 29
 * Andy : 30
 * Justin : 19
 */


6. Converting between DataFrame and DataSet


(1) DataFrame to DataSet

A DataSet carries the most type information and is the most structured of the three, so converting an RDD or a DataFrame into a DataSet requires a case class; going the other way, from a DataSet down to an RDD or a DataFrame, is just a method call:

case class xxx()
df.as[xxx]
package com.kfk.spark.sql
import com.kfk.spark.common.{Comm, CommScala, CommSparkSessionScala}
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/5
 * @time : 5:35 PM
 */
object DataFrameToDataSetScala {
    case class Person(name:String,age:Long)
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        import spark.implicits._
        /**
         * Data source
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        val path = CommScala.fileDirPath + "people.txt";
        val df = spark.sparkContext.textFile(path).map(line => line.split(",")).map(x => Person(x(0),x(1).trim.toLong)).toDF()
        df.show()
        // Convert the DataFrame into a DataSet
        val personDS = df.as[Person]
        personDS.show()
        personDS.printSchema()
        // Convert the DataSet back into a DataFrame
        val personDF = personDS.toDF()
        personDF.show()
        personDF.printSchema()
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         *
         * root
         *  |-- name: string (nullable = true)
         *  |-- age: long (nullable = false)
        */
    }
}

(2) DataSet to DataFrame

ds.toDF()
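
For completeness, a minimal sketch; it reuses the personDS Dataset[Person] built in the previous example:

// Converting a DataSet back to a DataFrame is a direct method call
val personDF2 = personDS.toDF()
personDF2.printSchema()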

