Spark(九) -- SparkSQL API编程

简介: 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/45957991 本文测试的Spark版本是1.
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/45957991

本文测试的Spark版本是1.3.1

Text文本文件测试

一个简单的person.txt文件内容为:

JChubby,13
Looky,14
LL,15

分别是Name和Age

在Idea中新建Object,原始代码如下:

object  TextFile{
    def main(args:Array[String]){

    }
}

SparkSQL编程模型:

第一步:
需要一个SQLContext对象,该对象是SparkSQL操作的入口
而构建一个SQLContext对象需要一个SparkContext

第二步:
构建好入口对象之后,要引入隐式转换的方法,作用是将读取到的各种文件转换成DataFrame,DataFrame是SparkSQL上进行统一操作的数据类型

第三步:
根据数据的格式,构建一个样例类。作用是提供将读取到的各种各样的数据类型隐式转换成一个统一的数据格式,方便编程

第四步:
使用SQLContext对象读取文件,并将其转换成DataFrame

第五步:
对数据进行相关操作。
1.DataFrame自带的操作方式。DataFrame提供了很多操作数据的方法,如where,select等

2.DSL方式。DSL其实使用的也是DataFrame提供的方法,但是在操作属性时可以方便的使用’ + 属性名的方式进行操作

3.将数据注册成表,通过SQL语句操作

object  TextFile{
    def main(args:Array[String]){
        //第一步
        //构建SparkContext对象,主要要使用new调用构造方法,否则就变成使用样例类的Apply方法了
        val sc = new SparkContext()
        //构建SQLContext对象
        val sqlContext = new SQLContext(sc)

        //第二步
        import sqlContext.implicits._
        //第三步
        case Person(name:String,age:Int)

        //第四步,textFile从指定路径读取文件如果是集群模式要写hdfs文件地址;通过两个map操作将读取到的文件转换成Person类的对象,每一行对应一个Person对象;toDF将其转换成DataFrame
        val people = sc.textFile("文件路径").map(_.split(",")).map{case (name,age) => Person(name,age.toInt)}.toDF()
        //第五步
        //DataFrame方法
        println("------------------------DataFrame------------------------------------")
        //赛选出age>10的记录,然后只选择name属性,show方法将其输出
        people.where(people("age") > 10).select(people("name")).show()

        //DSL
         println("---------------------------DSL---------------------------------")
         people.where('age > 10).select('name).show()

        //SQL
        println("-----------------------------SQL-------------------------------")
        //将people注册成people表
        people.registerTempTable("people")
        //使用sqlContext的sql方法来写SQL语句
        //查询返回的是RDD,所以对其进行collect操作,之后循环打印
        sqlContext.sql("select name from people where age > 10").collect.foreach(println)

        //保存为parquet文件,之后的parquet演示会用到
        people.saveAsParquet("保存的路径")
    }
}

parquet格式文件测试:

val sc = new SparkContext()
    val sql = new SQLContext(sc)
    import sql.implicits._
    val parquet = sql.parquetFile(args(0))
    println("------------------------DataFrame------------------------------------")
    println(parquet.where(parquet("age") > 10).select(parquet("name")).show())

    println("---------------------------DSL---------------------------------")
    println(parquet.where('age > 10).select('name).show())

    println("-----------------------------SQL-------------------------------")
    parquet.registerTempTable("parquet")
    sql.sql("select name from parquet where age > 10").map(p => "name:" + p(0)).collect().foreach(println)

Json格式测试:

val sc = new SparkContext()
    val sql = new SQLContext(sc)
    import sql.implicits._
    val json = sql.jsonFile(args(0))
    println("------------------------DataFrame------------------------------------")
    println(json.where(json("age") > 10).select(json("name")).show())

    println("---------------------------DSL---------------------------------")
    println(json.where('age > 10).select('name).show())

    println("-----------------------------SQL-------------------------------")
    json.registerTempTable("json")
    sql.sql("select name from json where age > 10").map(p => "name:" + p(0)).collect().foreach(println)

可以看到上面的代码几乎和读取文本文件的一模一样,只不顾sc在读取文件的时候使用了parquetFile/jsonFile方法,而之后的操作是一摸一样的
由于parquet和json数据读取进来就是一个可操作的格式并且会自动转换成DataFrame,所以省去了case class的定义步骤和toDF的操作

以上为SparkSQL API的简单使用

相关文章
|
1月前
|
分布式计算 并行计算 大数据
Spark学习---day02、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---day02、Spark核心编程 RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
75 1
|
2月前
|
SQL 分布式计算 API
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
57 2
|
1月前
|
分布式计算 Java Scala
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
41 1
|
15天前
|
存储 Java 关系型数据库
掌握Java 8 Stream API的艺术:详解流式编程(一)
掌握Java 8 Stream API的艺术:详解流式编程
47 1
|
1月前
|
算法 Linux API
【Linux系统编程】一文了解 Linux目录的创建和删除API 创建、删除与读取
【Linux系统编程】一文了解 Linux目录的创建和删除API 创建、删除与读取
28 0
【Linux系统编程】一文了解 Linux目录的创建和删除API 创建、删除与读取
|
1月前
|
Linux API C++
【Linux C/C++ 线程同步 】Linux API 读写锁的编程使用
【Linux C/C++ 线程同步 】Linux API 读写锁的编程使用
21 1
|
1月前
|
SQL 分布式计算 Java
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
107 1
|
1月前
|
存储 分布式计算 API
adb spark的lakehouse api访问内表数据,还支持算子下推吗
【2月更文挑战第21天】adb spark的lakehouse api访问内表数据,还支持算子下推吗
107 2
|
2月前
|
分布式计算 并行计算 Hadoop
Spark学习---day02、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---day02、Spark核心编程 RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
42 1
|
3月前
|
JSON Java API
Java 编程问题:十三、HTTP 客户端和 WebSocket API
Java 编程问题:十三、HTTP 客户端和 WebSocket API
89 0