大数据Spark DataFrame

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据Spark DataFrame

1 DataFrame是什么

DataFrame它不是Spark SQL提出来的,而是早期在R、Pandas语言就已经有了的。就易用性而言,对比传统的MapReduce API,说Spark的RDD API有了数量级的飞跃并不为过。然而,对于没有MapReduce和函数式编程经验的新手来说,RDD API仍然存在着一定的门槛。另一方面,数据科学家们所熟悉的R、Pandas等传统数据框架虽然提供了直观的API,却局限于单机处理,无法胜任大数据场景。为了解决这一矛盾,Spark SQL 1.3.0在原有SchemaRDD的基础上提供了与R和Pandas风格类似的DataFrame API。新的DataFrame AP不仅可以大幅度降低普通开发者的学习门槛,同时还支持Scala、Java与Python三种语言。更重要的是,由于脱胎自SchemaRDD,DataFrame天然适用于分布式大数据场景。在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表

数据集的每一列都带有名称和类型。

6323573c0c37439782ecc1694067b63d.png

使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行针对性的优化,最终达到大幅提升运行时效率。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。

303d5ee5d20640c0983d325c88585d4f.png

上图中左侧的RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解Person类的内部结构。而右侧的DataFrame却提供了详细的结构信息,使得Spark SQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。了解了这些信息之后,Spark SQL的查询优化器就可以进行针对性的优化。后者由于在编译期有详尽的类型信息,编译期就可以编译出更加有针对性、更加优化的可执行代码。官方定义:


Dataset:A DataSet is a distributed collection of data. (分布式的数据集)

DataFrame: A DataFrame is a DataSet organized into named columns.(以列(列名,列类型,列值)的形式构成的分布式的数据集,按照列赋予不同的名称)

5f831a22382242acbe6636ee2316a82a.png

DataFrame有如下特性:

1)、分布式的数据集,并且以列的方式组合的,相当于具有schema的RDD;
2)、相当于关系型数据库中的表,但是底层有优化;
3)、提供了一些抽象的操作,如select、filter、aggregation、plot;
4)、它是由于R语言或者Pandas语言处理小数据集的经验应用到处理分布式大数据集上;
5)、在1.3版本之前,叫SchemaRDD;

范例演示:加载json格式数据


第一步、上传官方测试数据$SPARK_HOME/examples/src/main/resources至HDFS目录/datas


e58d6bb88396442ba045f5a0a084b6ee.png


查看HDFS上数据文件,其中雇员信息数据【employees.json】

161a0f42de7045c9ab7dc545a46783cf.png

第二步、启动spark-shell命令行,采用本地模式localmode运行

f994035244964542934ef0cb10e80484.png


第三步、读取雇员信息数据


09f8d6851d534bb78bc12c9fafdc2a30.png

2 Schema 信息

查看DataFrame中Schema是什么,执行如下命令:

可以看出Schema信息封装在StructType中,包含很多StructField对象,查看源码。

05ef165f96b04a84b5201f77e19e16aa.png

其一、StructType 定义,是一个样例类,属性为StructField的数组


2cee9bbfcbe4485399d57f74c29fbc98.png

其二、StructField 定义,同样是一个样例类,有四个属性,其中字段名称和类型为必填

d9b2fb45715240d9ae982dc3f5c24c31.png


自定义Schema结构,官方提供实例代码:


7b0beb5edce0497b9fde0640c86c7389.png

3 Row

DataFrame中每条数据封装在Row中,Row表示每行数据,具体哪些字段位置,获取DataFrame中第一条数据。

3656e98cbfe14f8eb23ef614ac6d9f7f.png

如何构建Row对象:要么是传递value,要么传递Seq,官方实例代码:

import org.apache.spark.sql._
// Create a Row from values.
Row(value1, value2, value3, ...)
// Create a Row from a Seq of values.
Row.fromSeq(Seq(value1, value2, ...))

如何获取Row中每个字段的值呢????


方式一:下标获取,从0开始,类似数组下标获取

bc50a03b0f254fa39b85164175b0f750.png

方式二:指定下标,知道类型

f73bddd202fc4a0b9fa96b597f406a51.png

方式三:通过As转换类型, 此种方式开发中使用最多81e84d4b147a48db8a588dac36fdd80e.png

4 RDD转换DataFrame

实际项目开发中,往往需要将RDD数据集转换为DataFrame,本质上就是给RDD加上Schema信息,官方提供两种方式:类型推断和自定义Schema。

官方文档: http://spark.apache.org/docs/2.4.5/sql-getting-started.html#interoperating-with-rdds

9876c48bd3cf4a0db98b1a2762203ebd.png

范例演示说明:使用经典数据集【电影评分数据u.data】,先读取为RDD,再转换为DataFrame。

81a3535d97de49768260c64800dad63c.png

字段信息:user id 、 item id、 rating 、 timestamp。


4.1 反射类型推断

当RDD中数据类型CaseClass样例类时,通过反射Reflecttion获取属性名称和类型,构建

Schema,应用到RDD数据集,将其转换为DataFrame。


第一步、定义CaseClass样例类,封装电影评分数据

/**
 * 封装电影评分数据
 *
 * @param userId 用户ID
 * @param itemId 电影ID
 * @param rating 用户对电影评分
 * @param timestamp 评分时间戳
 */
case class MovieRating(
                        userId: String,
                        itemId: String,
                        rating: Double,
                        timestamp: Long
                      )

第二步、SparkContext读取电影评分数据封装到RDD中,转换数据类型

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 * 采用反射的方式将RDD转换为DataFrame和Dataset
 */
object SparkRDDInferring {
  def main(args: Array[String]): Unit = {
    // 构建SparkSession实例对象
    val spark: SparkSession = SparkSession
      .builder() // 使用建造者模式构建对象
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[3]")
      .getOrCreate()
    import spark.implicits._
    // 读取电影评分数据u.data, 每行数据有四个字段,使用制表符分割
    // user id | item id | rating | timestamp.
    val rawRatingsRDD: RDD[String] = spark.sparkContext
      .textFile("datas/ml-100k/u.data", minPartitions = 2)
    // 转换数据
    val ratingsRDD: RDD[MovieRating] = rawRatingsRDD
      .filter(line => null != line && line.trim.split("\t").length == 4)
      .mapPartitions{iter =>
        iter.map{line =>
          // 拆箱操作, Python中常用
          val Array(userId, itemId, rating, timestamp) = line.trim.split("\t")
          // 返回MovieRating实例对象
          MovieRating(userId, itemId, rating.toDouble, timestamp.toLong)
        }
      }
    // 将RDD转换为DataFrame和Dataset
    val ratingsDF: DataFrame = ratingsRDD.toDF()
    /*
    root
    |-- userId: string (nullable = true)
    |-- itemId: string (nullable = true)
    |-- rating: double (nullable = false)
    |-- timestamp: long (nullable = false)
    */
    ratingsDF.printSchema()
    ratingsDF.show(10)
    // 应用结束,关闭资源
    spark.stop()
  }
}

此种方式要求RDD数据类型必须为CaseClass,转换的DataFrame中字段名称就是CaseClass中

属性名称。


4.2 自定义Schema

依据RDD中数据自定义Schema,类型为StructType,每个字段的约束使用StructField定义,具

体步骤如下:


第一步、RDD中数据类型为Row:RDD[Row];

第二步、针对Row中数据定义Schema:StructType;

第三步、使用SparkSession中方法将定义的Schema应用到RDD[Row]上;

范例演示代码:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
/**
 * 自定义Schema方式转换RDD为DataFrame
 */
object SparkRDDSchema {
  def main(args: Array[String]): Unit = {
    // 构建SparkSession实例对象
    val spark: SparkSession = SparkSession
      .builder() // 使用建造者模式构建对象
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[3]")
      .getOrCreate()
    import spark.implicits._
    // 读取电影评分数据u.data, 每行数据有四个字段,使用制表符分割
    // user id | item id | rating | timestamp.
    val ratingsRDD: RDD[String] = spark
      .sparkContext.textFile("datas/ml-100k/u.data", minPartitions = 2)
    // a. RDD[Row]
    val rowsRDD: RDD[Row] = ratingsRDD.mapPartitions{ iter =>
      iter.map{line =>
        // 拆箱操作, Python中常用
        val Array(userId, itemId, rating, timestamp) = line.trim.split("\t")
        // 返回Row实例对象
        Row(userId, itemId, rating.toDouble, timestamp.toLong)
      }
    }
    // b. schema
    val rowSchema: StructType = StructType(
      Array(
        StructField("userId", StringType, nullable = true),
        StructField("itemId", StringType, nullable = true),
        StructField("rating", DoubleType, nullable = true),
        StructField("timestamp", LongType, nullable = true)
      )
    )
    // c. 应用函数createDataFrame
    val ratingDF: DataFrame = spark.createDataFrame(rowsRDD, rowSchema)
    ratingDF.printSchema()
    ratingDF.show(10, truncate = false)
    // 应用结束,关闭资源
    spark.stop()
  }
}

此种方式可以更加体会到DataFrame = RDD[Row] + Schema组成,在实际项目开发中灵活的

选择方式将RDD转换为DataFrame。

3.5 toDF函数

除了上述两种方式将RDD转换为DataFrame以外,SparkSQL中提供一个函数:toDF,通过指

定列名称,将数据类型为元组的RDD或Seq转换为DataFrame,实际开发中也常常使用。

09bd7f56e9c24974a43e4290e2ea0c6b.png

范例演示:将数据类型为元组的RDD或Seq直接转换为DataFrame。

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 * 隐式调用toDF函数,将数据类型为元组的Seq和RDD集合转换为DataFrame
 */
object SparkSQLToDF {
  def main(args: Array[String]): Unit = {
    // 构建SparkSession实例对象,通过建造者模式创建
    val spark: SparkSession = SparkSession
      .builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[3]")
      .getOrCreate()
    import spark.implicits._
    // TODO: 1、构建RDD,数据类型为三元组形式
    val usersRDD: RDD[(Int, String, Int)] = spark.sparkContext.parallelize(
      Seq(
        (10001, "zhangsan", 23),
        (10002, "lisi", 22),
        (10003, "wangwu", 23),
        (10004, "zhaoliu", 24)
      )
    )
    // 将RDD转换为DataFrame
    val usersDF: DataFrame = usersRDD.toDF("id", "name", "age")
    usersDF.printSchema()
    usersDF.show(10, truncate = false)
    println("========================================================")
    val df: DataFrame = Seq(
      (10001, "zhangsan", 23),
      (10002, "lisi", 22),
      (10003, "wangwu", 23),
      (10004, "zhaoliu", 24)
    ).toDF("id", "name", "age")
    df.printSchema()
    df.show(10, truncate = false)
    // TODO: 应用结束,关闭资源
    spark.stop()
  }
}

运行程序结果如下截图:

463872e75532415ab9d8a0b676c3ce6e.png

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
3月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
231 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
3月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
187 2
|
3月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
182 1
|
3月前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
3月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
98 1
|
3月前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
27天前
|
SQL 数据可视化 大数据
从数据小白到大数据达人:一步步成为数据分析专家
从数据小白到大数据达人:一步步成为数据分析专家
210 92
|
3月前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
730 7
|
3月前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
97 2
|
1月前
|
分布式计算 Shell MaxCompute
odps测试表及大量数据构建测试
odps测试表及大量数据构建测试