Spark中RDD、DataFrame、DataSet的生成与互相转换(非常重要)

简介: 笔记

一、RDD的生成


  • 使用parallelize/makeRDD算子从集合转换而来,常用于测试
  • 使用类似textFile()这样的算子从文件系统读取数据形成RDD
  • 使用transformation算子转换而来


二、DataFrame的生成


  • 直接读取文件系统数据形成
val df = spark.read.format.load()
  • RDD转换而来
  • DataSet转换而来


三、DataSet的生成


  • 直接读取文件系统数据形成
val ds = spark.read.textFile().as[xxx]
  • RDD转换而来
  • DataFrame转换而来
  • Seq序列生成DataSet
package com.kfk.spark.sql
import com.kfk.spark.common.CommSparkSessionScala
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/5
 * @time : 4:10 下午
 */
object DataSetDemoScala {
    case class Person(name: String, age: Int)
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        //使用toDS()函数需要导入隐式转换
        import spark.implicits._
        val caseClassDS = Seq(Person("Andy", 20), Person("Tom", 30)).toDS()
        caseClassDS.show()
    /**
         * +----+---+
         * |name|age|
         * +----+---+
         * |Andy| 20|
         * | Tom| 30|
         * +----+---+
         */
    }
}


四、RDD和DataFrame的转换


初始化SparkSession,Spark SQL的统一入口。

Java语言:

package com.kfk.spark.common;
import org.apache.spark.sql.SparkSession;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/2
 * @time : 10:01 下午
 */
public class CommSparkSession {
    public static SparkSession getSparkSession(){
        SparkSession spark = SparkSession.builder()
                .appName("CommSparkSession")
                .master("local")
                .config("spark.sql.warehouse.dir","/Users/caizhengjie/Document/spark/spark-warehouse")
                .getOrCreate();
        return spark;
    }
}

Scala语言:

package com.kfk.spark.common
import org.apache.spark.sql.SparkSession
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/2
 * @time : 10:02 下午
 */
object CommSparkSessionScala {
    def getSparkSession(): SparkSession ={
        val spark = SparkSession
                .builder
                .appName("CommSparkSessionScala")
                .master("local")
                .config("spark.sql.warehouse.dir", "/Users/caizhengjie/Document/spark/spark-warehouse")
                .getOrCreate
        return spark
    }
}

(1)RDD转DataFrame

方案一:直接将字段名称传入toDF中

数据源

Michael, 29
Andy, 30
Justin, 19

使用toDF的方式需要导入隐式转换

import spark.implicits._
import spark.implicits._
val path = CommScala.fileDirPath + "people.txt";
val personDf1 = spark.sparkContext.textFile(path).map(line => line.split(",")).map(x => {
            (x(0),x(1).trim)
        }).toDF("name", "age")
personDf1.show()
/**
* +-------+---+
* |   name|age|
* +-------+---+
* |Michael| 29|
* |   Andy| 30|
* | Justin| 19|
* +-------+---+
*/

方案二:通过反射的方式

Java语言实现

Spark SQL支持将JavaBean的RDD自动转换 为DataFrame。

package com.kfk.spark.sql;
import java.io.Serializable;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/2
 * @time : 10:15 下午
 */
public class Person implements Serializable {
    private String name;
    private long age;
    public Person(String name, long age) {
        this.name = name;
        this.age = age;
    }
    public Person() {
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public long getAge() {
        return age;
    }
    public void setAge(long age) {
        this.age = age;
    }
}
package com.kfk.spark.sql;
import com.kfk.spark.common.Comm;
import com.kfk.spark.common.CommSparkSession;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/2
 * @time : 10:07 下午
 */
public class RDDToDFReflectionJava {
    public static void main(String[] args) {
        SparkSession spark = CommSparkSession.getSparkSession();
        /**
         * 数据源
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        String filePath = Comm.fileDirPath + "people.txt";
        // 将文件转换成rdd
        JavaRDD<Person> personrdd = spark.read().textFile(filePath).javaRDD().map(line -> {
            String name = line.split(",")[0];
            long age = Long.parseLong(line.split(",")[1].trim());
            Person person = new Person(name,age);
            return person;
        });
        // 通过反射将rdd转换成DataFrame
        Dataset<Row> personDf = spark.createDataFrame(personrdd,Person.class);
        personDf.show();
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         */
        // 创建临时表
        personDf.createOrReplaceTempView("person");
        Dataset<Row> resultDf = spark.sql("select * from person a where a.age > 20");
        resultDf.show();
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * +-------+---+
         */
    }
}

Scala语言实现

Scala中借助case class使用反射

package com.kfk.spark.sql
import com.kfk.spark.common.{CommScala, CommSparkSessionScala}
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/3
 * @time : 6:32 下午
 */
object RDDToDFReflectionScala {
    case class Person(name : String, age : Long)
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession();
        /**
         * 数据源
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        val path = CommScala.fileDirPath + "people.txt";
        // 通过case class反射将rdd转换成DataFrame
        import spark.implicits._
        val personDf = spark.sparkContext.textFile(path).map(line => line.split(",")).map(x => {
            Person(x(0),x(1).trim.toLong)
        }).toDF()
        personDf.show()
        // 直接将字段名称传入toDF中转换成DataFrame
        val personDf1 = spark.sparkContext.textFile(path).map(line => line.split(",")).map(x => {
            (x(0),x(1).trim)
        }).toDF("name", "age")
        personDf1.show()
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         */
        // 创建临时表
        personDf.createOrReplaceTempView("person")
        val resultDf = spark.sql("select * from person a where a.age > 20")
        resultDf.show()
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * +-------+---+
         */
    }
}

方案三:构造Schema的方式

Java语言实现

package com.kfk.spark.sql;
import com.kfk.spark.common.Comm;
import com.kfk.spark.common.CommSparkSession;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.ArrayList;
import java.util.List;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/3
 * @time : 9:32 下午
 */
public class RDDToDFProgramJava {
    public static void main(String[] args) {
        SparkSession spark = CommSparkSession.getSparkSession();
        /**
         * 数据源
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        String filePath = Comm.fileDirPath + "people.txt";
        List<StructField> fields = new ArrayList<StructField>();
        StructField structField_Name = DataTypes.createStructField("name",DataTypes.StringType,true);
        StructField structField_Age = DataTypes.createStructField("age",DataTypes.StringType,true);
        fields.add(structField_Name);
        fields.add(structField_Age);
        // 构造Schema
        StructType scheme = DataTypes.createStructType(fields);
        // 将rdd(people)转换成RDD[Row]
        JavaRDD<Row> personRdd = spark.read().textFile(filePath).javaRDD().map(line ->{
            String[] attributes = line.split(",");
            return RowFactory.create(attributes[0], attributes[1].trim());
        });
        // 通过rdd和scheme创建DataFrame
        Dataset<Row> personDataFrame = spark.createDataFrame(personRdd,scheme);
        personDataFrame.show();
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         */
        // 创建临时表
        personDataFrame.createOrReplaceTempView("person");
        Dataset<Row> resultDataFrame = spark.sql("select * from person a where a.age > 20");
        resultDataFrame.show();
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * +-------+---+
         */
    }
}

Scala语言实现

package com.kfk.spark.sql
import com.kfk.spark.common.{CommScala, CommSparkSessionScala}
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/3
 * @time : 10:05 下午
 */
object RDDToDFProgramScala {
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession();
        /**
         * 数据源
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        val path = CommScala.fileDirPath + "people.txt";
        // 构造Schema
        val scheme = StructType(Array(
            StructField("name",DataTypes.StringType,true),
            StructField("age",DataTypes.LongType,true)
        ))
        // 将rdd(people)转换成RDD[Row]
        val rdd = spark.sparkContext.textFile(path).map(line => {
            line.split(",")
        }).map(x => {
            Row(x(0),x(1).trim.toLong)
        })
        // 通过rdd和scheme创建DataFrame
        val personDataFrame = spark.createDataFrame(rdd,scheme)
        personDataFrame.show()
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         */
        // 创建临时表
        personDataFrame.createOrReplaceTempView("person")
        val resultDataFrame = spark.sql("select * from person a where a.age > 20")
        resultDataFrame.show()
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * +-------+---+
         */
        for (elem <- resultDataFrame.collect()) {
            System.out.println(elem)
        }
        /**
         * [Michael,29]
         * [Andy,30]
         */
    }
}

(2)DataFrame转RDD

通过df.rdd或者df.javaRDD() 将DataFrame转RDD

Java语言实现

// 将DataFrame转换成rdd
JavaRDD<Person> resultRdd = resultDf.javaRDD().map(line -> {
    Person person = new Person();
    person.setName(line.getAs("name"));
    person.setAge(line.getAs("age"));
    return person;
});
for (Person person : resultRdd.collect()){
    System.out.println(person.getName() + " : " + person.getAge());
}
/**
 * Michael : 29
 * Andy : 30
 */

Scala语言实现

// 将DataFrame转换成rdd
val resultRdd = resultDf.rdd.map(row => {
    val name = row.getAs[String]("name")
    val age = row.getAs[Long]("age")
    Person(name,age)
})
for (elem <- resultRdd.collect()) {
    System.out.println(elem.name + " : " + elem.age)
}
/**
 * Michael : 29
 * Andy : 30
 */


五、RDD和DataSet的转换


(1)RDD转DataSet

方案一:使用toDS()算子

使用toDS()算子,需要导入隐式转换,schema参考RDD转DataFrame的方法(schema信息不可以通过toDS直接传入)

package com.kfk.spark.sql
import com.kfk.spark.common.{CommScala, CommSparkSessionScala}
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/5
 * @time : 4:10 下午
 */
object DataSetDemoScala {
    case class Person(name: String, age: Int)
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        /**
         * 数据源
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        val path = CommScala.fileDirPath + "people.txt";
        //使用toDS()函数需要导入隐式转换
        import spark.implicits._
        // 方案一:通过toDS()将RDD转换成DataSet
        val dataDS = spark.sparkContext.textFile(path).map(line => line.split(",")).map(x => {
            Person(x(0),x(1).trim.toInt)
        }).toDS()
        dataDS.show()
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         */
    }
}

方案二:使用spark.createDataset(rdd)

package com.kfk.spark.sql
import com.kfk.spark.common.{CommScala, CommSparkSessionScala}
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/5
 * @time : 4:10 下午
 */
object DataSetDemoScala {
    case class Person(name: String, age: Int)
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        /**
         * 数据源
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        val path = CommScala.fileDirPath + "people.txt";
        //使用toDS()函数需要导入隐式转换
        import spark.implicits._
        // 方案二:使用spark.createDataset(rdd)将RDD转换成DataSet
        val rdd = spark.sparkContext.textFile(path).map(line => {
            line.split(",")
        }).map(x => {
            Person(x(0),x(1).trim.toInt)
        })
        val dataDS1 = spark.createDataset(rdd)
        dataDS1.show()
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         */
    }
}

(2)DataSet转RDD

ds.rdd


// 将DataSet转RDD
val rdd1 = dataDS1.rdd
for (elem <- rdd1.collect()) {
   System.out.println(elem.name + " : " + elem.age)
}
/**
 * Michael : 29
 * Andy : 30
 * Justin : 19
 */


六、DataFrame与DataSet的转换


(1)DataFrame转DataSet

DataSet 含有信息最多, 相对最复杂, 所以 RDD 和 DataFrame 向其转换的时候都需要样例类, 而复杂的 DataSet 向简单的 RDD 和 DataFrame 转换时直接调用方法即可

case class xxx()
df.as[xxx]
package com.kfk.spark.sql
import com.kfk.spark.common.{Comm, CommScala, CommSparkSessionScala}
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/5
 * @time : 5:35 下午
 */
object DataFrameToDataSetScala {
    case class Person(name:String,age:Long)
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        import spark.implicits._
        /**
         * 数据源
         * Michael, 29
         * Andy, 30
         * Justin, 19
         */
        val path = CommScala.fileDirPath + "people.txt";
        val df = spark.sparkContext.textFile(path).map(line => line.split(",")).map(x => Person(x(0),x(1).trim.toLong)).toDF()
        df.show()
        // 将DF转换成DS
        val personDS = df.as[Person]
        personDS.show()
        personDS.printSchema()
        // 将DS装换成DF
        val personDF = personDS.toDF()
        personDF.show()
        personDF.printSchema()
        /**
         * +-------+---+
         * |   name|age|
         * +-------+---+
         * |Michael| 29|
         * |   Andy| 30|
         * | Justin| 19|
         * +-------+---+
         *
         * root
         *  |-- name: string (nullable = true)
         *  |-- age: long (nullable = false)
        */
    }
}

(2)DataSet转DataFrame

ds.toDF()


相关文章
|
3月前
|
存储 分布式计算 并行计算
【赵渝强老师】Spark中的RDD
RDD(弹性分布式数据集)是Spark的核心数据模型,支持分布式并行计算。RDD由分区组成,每个分区由Spark Worker节点处理,具备自动容错、位置感知调度和缓存机制等特性。通过创建RDD,可以指定分区数量,并实现计算函数、依赖关系、分区器和优先位置列表等功能。视频讲解和示例代码进一步详细介绍了RDD的组成和特性。
|
4月前
|
SQL 消息中间件 分布式计算
大数据-84 Spark 集群 RDD创建 RDD-Transformation操作算子 详解(一)
大数据-84 Spark 集群 RDD创建 RDD-Transformation操作算子 详解(一)
74 5
|
14天前
|
分布式计算 Spark
【赵渝强老师】Spark RDD的依赖关系和任务阶段
Spark RDD之间的依赖关系分为窄依赖和宽依赖。窄依赖指父RDD的每个分区最多被一个子RDD分区使用,如map、filter操作;宽依赖则指父RDD的每个分区被多个子RDD分区使用,如分组和某些join操作。窄依赖任务可在同一阶段完成,而宽依赖因Shuffle的存在需划分不同阶段执行。借助Spark Web Console可查看任务的DAG图及阶段划分。
56 15
|
3月前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
4月前
|
存储 缓存 分布式计算
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
66 4
|
4月前
|
JSON 分布式计算 大数据
大数据-85 Spark 集群 RDD创建 RDD-Action Key-Value RDD详解 RDD的文件输入输出
大数据-85 Spark 集群 RDD创建 RDD-Action Key-Value RDD详解 RDD的文件输入输出
56 1
|
4月前
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
60 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
4月前
|
消息中间件 分布式计算 Kafka
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
53 0
|
4月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
134 0
|
4月前
|
SQL 分布式计算 大数据
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
71 0