201 Spark SQL查询程序

简介: 201 Spark SQL查询程序

前面我们学习了如何在Spark Shell中使用SQL完成查询,现在我们来实现在自定义的程序中编写Spark SQL查询程序。

首先在maven项目的pom.xml中添加Spark SQL的依赖:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.5.2</version>
</dependency>
1.通过反射推断Schema

创建一个object为cn.itcast.spark.sql.InferringSchema

package cn.itcast.spark.sql
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
object InferringSchema {
  def main(args: Array[String]) {
    //创建SparkConf()并设置App名称
    val conf = new SparkConf().setAppName("SQL-1")
    //SQLContext要依赖SparkContext
    val sc = new SparkContext(conf)
    //创建SQLContext
    val sqlContext = new SQLContext(sc)
    //从指定的地址创建RDD
    val lineRDD = sc.textFile(args(0)).map(_.split(" "))
    //创建case class
    //将RDD和case class关联
    val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
    //导入隐式转换,如果不到人无法将RDD转换成DataFrame
    //将RDD转换成DataFrame
    import sqlContext.implicits._
    val personDF = personRDD.toDF
    //注册表
    personDF.registerTempTable("t_person")
    //传入SQL
    val df = sqlContext.sql("select * from t_person order by age desc limit 2")
    //将结果以JSON的方式存储到指定位置
    df.write.json(args(1))
    //停止Spark Context
    sc.stop()
  }
}
//case class一定要放到外面
case class Person(id: Int, name: String, age: Int)

将程序打成jar包,上传到spark集群,提交Spark任务

/usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-submit \
--class cn.itcast.spark.sql.InferringSchema \
--master spark://node1.itcast.cn:7077 \
/root/spark-mvn-1.0-SNAPSHOT.jar \
hdfs://node1.itcast.cn:9000/person.txt \
hdfs://node1.itcast.cn:9000/out

查看运行结果

hdfs dfs -cat  hdfs://node1.itcast.cn:9000/out/part-r-*

2.通过StructType直接指定Schema

创建一个object为cn.itcast.spark.sql.SpecifyingSchema

package cn.itcast.spark.sql
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkContext, SparkConf}
/**
  * Created by ZX on 2015/12/11.
  */
object SpecifyingSchema {
  def main(args: Array[String]) {
    //创建SparkConf()并设置App名称
    val conf = new SparkConf().setAppName("SQL-2")
    //SQLContext要依赖SparkContext
    val sc = new SparkContext(conf)
    //创建SQLContext
    val sqlContext = new SQLContext(sc)
    //从指定的地址创建RDD
    val personRDD = sc.textFile(args(0)).map(_.split(" "))
    //通过StructType直接指定每个字段的schema
    val schema = StructType(
      List(
        StructField("id", IntegerType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )
    //将RDD映射到rowRDD
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
    //将schema信息应用到rowRDD上
    val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    //注册表
    personDataFrame.registerTempTable("t_person")
    //执行SQL
    val df = sqlContext.sql("select * from t_person order by age desc limit 4")
    //将结果以JSON的方式存储到指定位置
    df.write.json(args(1))
    //停止Spark Context
    sc.stop()
  }
}

将程序打成jar包,上传到spark集群,提交Spark任务

/usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-submit \
--class cn.itcast.spark.sql.InferringSchema \
--master spark://node1.itcast.cn:7077 \
/root/spark-mvn-1.0-SNAPSHOT.jar \
hdfs://node1.itcast.cn:9000/person.txt \
hdfs://node1.itcast.cn:9000/out1

查看结果

hdfs dfs -cat  hdfs://node1.itcast.cn:9000/out1/part-r-*

目录
相关文章
|
6天前
|
SQL 监控 关系型数据库
SQL语句当前及历史信息查询-performance schema的使用
本文介绍了如何使用MySQL的Performance Schema来获取SQL语句的当前和历史执行信息。Performance Schema默认在MySQL 8.0中启用,可以通过查询相关表来获取详细的SQL执行信息,包括当前执行的SQL、历史执行记录和统计汇总信息,从而快速定位和解决性能瓶颈。
|
17天前
|
SQL 存储 缓存
如何优化SQL查询性能?
【10月更文挑战第28天】如何优化SQL查询性能?
61 10
|
11天前
|
SQL 关系型数据库 MySQL
|
25天前
|
SQL 数据库 开发者
功能发布-自定义SQL查询
本期主要为大家介绍ClkLog九月上线的新功能-自定义SQL查询。
|
10天前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
1月前
|
SQL 移动开发 Oracle
SQL语句实现查询连续六天数据的方法与技巧
在数据库查询中,有时需要筛选出符合特定时间连续性条件的数据记录
|
1月前
|
SQL Java 数据库连接
如何使用`DriverManager.getConnection()`连接数据库,并利用`PreparedStatement`执行参数化查询,有效防止SQL注入。
【10月更文挑战第6天】在代码与逻辑交织的世界中,我从一名数据库新手出发,通过不断探索与实践,最终成为熟练掌握JDBC的开发者。这段旅程充满挑战与惊喜,从建立数据库连接到执行SQL语句,再到理解事务管理和批处理等高级功能,每一步都让我对JDBC有了更深的认识。示例代码展示了如何使用`DriverManager.getConnection()`连接数据库,并利用`PreparedStatement`执行参数化查询,有效防止SQL注入。
87 5
|
1月前
|
SQL 安全 网络安全
SQL安装程序规则错误解决方案
在安装SQL Server时,遇到安装程序规则错误是一个比较常见的问题
|
1月前
|
SQL 数据挖掘 数据库
SQL查询每秒的数据:技巧、方法与性能优化
id="">SQL查询功能详解 SQL(Structured Query Language,结构化查询语言)是一种专门用于与数据库进行沟通和操作的语言
|
1月前
|
SQL 移动开发 大数据
SQL语句查询连续六天满足条件的记录
在数据库管理和数据分析中,我们经常需要查询符合特定时间范围内连续几天的记录