前言
配置的虚拟机为Centos6.7系统,hadoop版本为2.6.0版本,先前已经完成搭建CentOS部署Hbase、CentOS6.7搭建Zookeeper和编写MapReduce前置插件Hadoop-Eclipse-Plugin 安装。在此基础上完成了Hive详解以及CentOS下部署Hive和Mysql和Spark框架在CentOS下部署搭建。Spark的组件Spark SQL的部署:Spark SQL CLI部署CentOS分布式集群Hadoop上方法。
配置JDK1.8、Scala11.12
本文将介绍DataFrame基础操作以及实例运用
一、DataFrame
Spark SQL提供了一个名为DataFrame的抽象编程模型,是由SchemaRDD发展而来。不同于SchemaRDD直接继承RDD,DataFrame自己实现了RDD的绝大多数功能。可以把Spark SQL DataFrame理解为一个分布式的Row对象的数据集合。
Spark SQL已经集成在spark-shell中,因此只要启动spark-shell就可以使用Spark SQL的Shell交互接口。如果在spark-shell中执行SQL语句,需要使用SQLContext对象来调用sql()方法。Spark SQL对数据的查询分成了两个分支:SQLContext和HiveContext,其中HiveContext继承了SQLContext,因此HiveContext除了拥有SQLContext的特性之外还拥有自身的特性。
二、创建DataFrame对象
DataFrame可以通过结构化数据文件、Hive中的表、外部数据库、Spark计算过程中生成的RDD进行创建。不同的数据源转换成DataFrame的方式也不同。
创建sqlContext对象:
val sqlContext=new org.apache.spark.sql.SQLContext(sc)
通过这种方式创建的SQLContext只能执行SQL语句,不能执行HQL的语句。
创建HiveContext对象:
val hiveContext=new org.apache.spark.sql.hive.HiveContext(sc)
HiveContext不仅支持HiveQL语法解析器,同时也支持SQL语法解析器。
1.结构化数据文件创建DataFrane
一般情况下,把结构化数据文件存储在HDFS。Spark SQL最常见的结构化数据文件格式是Parquet文件或JSON文件。Spark SQL可以通过load()方法将HDFS上的格式化文件转换为DataFrame,load默认导入的文件格式是Parquet。
JSON文件转换DataFrame有两种方法,一种使用format()方法:
val dfPeople=sqlContext.read.format(“json”).load(“/user/SparkSql/”test2.json")
也可以直接用json()方法:
2.外部数据库创建DataFrame
SparkSQL还可以从外部数据库中创建DataFrame,使用这种方式创建DataFrame需要通过JDBC连接或者ODBC连接的方式访问数据库。
这个应该是常用方法通过数据库导入,本人虚拟机MYsql并没有导入文件这里不作演示,代码:
val jdbcDF=sqlContext.read.format("jdbc").options( |Map("url"->url, |"user"->"root", |"passwword"->"root", |"dbtable"->"people")).load()
3.RDD创建DataFrame
RDD数据转为DataFrame有两种方式:
第一种方式利用反射机制推断RDD模式,需要定义一个case class类:
第二种方式是当无法提前定义case class时,可以采用编程指定Schema的方式将RDD转换成DataFrame。通过编程指定Schame需要3步:
(1)从原来的RDD创建一个元组或列表的RDD。
(2)用StructType创建一个和步骤(1)在创建的RDD中元组或列表的结构相匹配的Schema。
(3)通过SQLContext提供的createDataFrame方法将Schema应用到RDD上。
4.Hive中的表创建DataFrame
从Hive表中的表创建DataFrame,可以声明一个HiveContext对象: