Spark(九) -- SparkSQL API编程

简介: 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/45957991 本文测试的Spark版本是1.
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/45957991

本文测试的Spark版本是1.3.1

Text文本文件测试

一个简单的person.txt文件内容为:

JChubby,13
Looky,14
LL,15

分别是Name和Age

在Idea中新建Object,原始代码如下:

object  TextFile{
    def main(args:Array[String]){

    }
}

SparkSQL编程模型:

第一步:
需要一个SQLContext对象,该对象是SparkSQL操作的入口
而构建一个SQLContext对象需要一个SparkContext

第二步:
构建好入口对象之后,要引入隐式转换的方法,作用是将读取到的各种文件转换成DataFrame,DataFrame是SparkSQL上进行统一操作的数据类型

第三步:
根据数据的格式,构建一个样例类。作用是提供将读取到的各种各样的数据类型隐式转换成一个统一的数据格式,方便编程

第四步:
使用SQLContext对象读取文件,并将其转换成DataFrame

第五步:
对数据进行相关操作。
1.DataFrame自带的操作方式。DataFrame提供了很多操作数据的方法,如where,select等

2.DSL方式。DSL其实使用的也是DataFrame提供的方法,但是在操作属性时可以方便的使用’ + 属性名的方式进行操作

3.将数据注册成表,通过SQL语句操作

object  TextFile{
    def main(args:Array[String]){
        //第一步
        //构建SparkContext对象,主要要使用new调用构造方法,否则就变成使用样例类的Apply方法了
        val sc = new SparkContext()
        //构建SQLContext对象
        val sqlContext = new SQLContext(sc)

        //第二步
        import sqlContext.implicits._
        //第三步
        case Person(name:String,age:Int)

        //第四步,textFile从指定路径读取文件如果是集群模式要写hdfs文件地址;通过两个map操作将读取到的文件转换成Person类的对象,每一行对应一个Person对象;toDF将其转换成DataFrame
        val people = sc.textFile("文件路径").map(_.split(",")).map{case (name,age) => Person(name,age.toInt)}.toDF()
        //第五步
        //DataFrame方法
        println("------------------------DataFrame------------------------------------")
        //赛选出age>10的记录,然后只选择name属性,show方法将其输出
        people.where(people("age") > 10).select(people("name")).show()

        //DSL
         println("---------------------------DSL---------------------------------")
         people.where('age > 10).select('name).show()

        //SQL
        println("-----------------------------SQL-------------------------------")
        //将people注册成people表
        people.registerTempTable("people")
        //使用sqlContext的sql方法来写SQL语句
        //查询返回的是RDD,所以对其进行collect操作,之后循环打印
        sqlContext.sql("select name from people where age > 10").collect.foreach(println)

        //保存为parquet文件,之后的parquet演示会用到
        people.saveAsParquet("保存的路径")
    }
}

parquet格式文件测试:

val sc = new SparkContext()
    val sql = new SQLContext(sc)
    import sql.implicits._
    val parquet = sql.parquetFile(args(0))
    println("------------------------DataFrame------------------------------------")
    println(parquet.where(parquet("age") > 10).select(parquet("name")).show())

    println("---------------------------DSL---------------------------------")
    println(parquet.where('age > 10).select('name).show())

    println("-----------------------------SQL-------------------------------")
    parquet.registerTempTable("parquet")
    sql.sql("select name from parquet where age > 10").map(p => "name:" + p(0)).collect().foreach(println)

Json格式测试:

val sc = new SparkContext()
    val sql = new SQLContext(sc)
    import sql.implicits._
    val json = sql.jsonFile(args(0))
    println("------------------------DataFrame------------------------------------")
    println(json.where(json("age") > 10).select(json("name")).show())

    println("---------------------------DSL---------------------------------")
    println(json.where('age > 10).select('name).show())

    println("-----------------------------SQL-------------------------------")
    json.registerTempTable("json")
    sql.sql("select name from json where age > 10").map(p => "name:" + p(0)).collect().foreach(println)

可以看到上面的代码几乎和读取文本文件的一模一样,只不顾sc在读取文件的时候使用了parquetFile/jsonFile方法,而之后的操作是一摸一样的
由于parquet和json数据读取进来就是一个可操作的格式并且会自动转换成DataFrame,所以省去了case class的定义步骤和toDF的操作

以上为SparkSQL API的简单使用

相关文章
|
12月前
|
SQL JSON 分布式计算
Spark SQL架构及高级用法
Spark SQL基于Catalyst优化器与Tungsten引擎,提供高效的数据处理能力。其架构涵盖SQL解析、逻辑计划优化、物理计划生成及分布式执行,支持复杂数据类型、窗口函数与多样化聚合操作,结合自适应查询与代码生成技术,实现高性能大数据分析。
827 2
|
机器学习/深度学习 设计模式 API
Python 高级编程与实战:构建 RESTful API
本文深入探讨了使用 Python 构建 RESTful API 的方法,涵盖 Flask、Django REST Framework 和 FastAPI 三个主流框架。通过实战项目示例,详细讲解了如何处理 GET、POST 请求,并返回相应数据。学习这些技术将帮助你掌握构建高效、可靠的 Web API。
|
SQL 分布式计算 资源调度
Dataphin功能Tips系列(48)-如何根据Hive SQL/Spark SQL的任务优先级指定YARN资源队列
如何根据Hive SQL/Spark SQL的任务优先级指定YARN资源队列
518 4
|
机器学习/深度学习 开发框架 API
Python 高级编程与实战:深入理解 Web 开发与 API 设计
在前几篇文章中,我们探讨了 Python 的基础语法、面向对象编程、函数式编程、元编程、性能优化、调试技巧以及数据科学和机器学习。本文将深入探讨 Python 在 Web 开发和 API 设计中的应用,并通过实战项目帮助你掌握这些技术。
|
SQL 分布式计算 Java
Spark SQL向量化执行引擎框架Gluten-Velox在AArch64使能和优化
本文摘自 Arm China的工程师顾煜祺关于“在 Arm 平台上使用 Native 算子库加速 Spark”的分享,主要内容包括以下四个部分: 1.技术背景 2.算子库构成 3.算子操作优化 4.未来工作
2246 0
|
存储 缓存 分布式计算
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
336 4
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
293 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
498 0
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
689 0
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
341 0