Previously we learned how to run SQL queries in the Spark Shell; now we will write a Spark SQL query program inside our own application.
First, add the Spark SQL dependency to the pom.xml of the Maven project:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.5.2</version>
</dependency>
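Both of the examples below read a space-separated text file with one record per line in the form "id name age". The rows shown here are hypothetical sample data for illustration only; any file with this layout (such as the person.txt used in the submit commands below) will work:

1 zhangsan 18
2 lisi 20
3 wangwu 25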
1. Inferring the Schema via Reflection
Create an object named cn.itcast.spark.sql.InferringSchema:
package cn.itcast.spark.sql

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object InferringSchema {
  def main(args: Array[String]) {
    // Create a SparkConf and set the application name
    val conf = new SparkConf().setAppName("SQL-1")
    // SQLContext depends on SparkContext
    val sc = new SparkContext(conf)
    // Create the SQLContext
    val sqlContext = new SQLContext(sc)
    // Create an RDD from the given path
    val lineRDD = sc.textFile(args(0)).map(_.split(" "))
    // Associate the RDD with the case class (defined below, outside this object)
    val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
    // Import the implicit conversions; without this import the RDD cannot be converted to a DataFrame
    import sqlContext.implicits._
    // Convert the RDD to a DataFrame
    val personDF = personRDD.toDF
    // Register the DataFrame as a temporary table
    personDF.registerTempTable("t_person")
    // Run the SQL
    val df = sqlContext.sql("select * from t_person order by age desc limit 2")
    // Save the result as JSON to the given path
    df.write.json(args(1))
    // Stop the SparkContext
    sc.stop()
  }
}

// The case class must be defined outside the object
case class Person(id: Int, name: String, age: Int)
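As a side note, the same query can also be expressed with the DataFrame API instead of an SQL string. A minimal sketch, equivalent to the select ... order by age desc limit 2 above, using the personDF defined in the program:

// DataFrame DSL equivalent of the SQL query above
import sqlContext.implicits._   // provides the $"..." column syntax
val top2 = personDF.orderBy($"age".desc).limit(2)
top2.show()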
Build the program into a jar, upload it to the Spark cluster, and submit the Spark job:
/usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-submit \
--class cn.itcast.spark.sql.InferringSchema \
--master spark://node1.itcast.cn:7077 \
/root/spark-mvn-1.0-SNAPSHOT.jar \
hdfs://node1.itcast.cn:9000/person.txt \
hdfs://node1.itcast.cn:9000/out
Check the result:
hdfs dfs -cat hdfs://node1.itcast.cn:9000/out/part-r-*
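df.write.json stores the result as one JSON object per line, so with the hypothetical sample data above the output files would contain lines roughly like:

{"id":3,"name":"wangwu","age":25}
{"id":2,"name":"lisi","age":20}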
2. Specifying the Schema Directly with StructType
Create an object named cn.itcast.spark.sql.SpecifyingSchema:
package cn.itcast.spark.sql

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkContext, SparkConf}

/**
 * Created by ZX on 2015/12/11.
 */
object SpecifyingSchema {
  def main(args: Array[String]) {
    // Create a SparkConf and set the application name
    val conf = new SparkConf().setAppName("SQL-2")
    // SQLContext depends on SparkContext
    val sc = new SparkContext(conf)
    // Create the SQLContext
    val sqlContext = new SQLContext(sc)
    // Create an RDD from the given path
    val personRDD = sc.textFile(args(0)).map(_.split(" "))
    // Specify the schema of each field directly with StructType
    val schema = StructType(
      List(
        StructField("id", IntegerType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )
    // Map the RDD to an RDD of Rows
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
    // Apply the schema to the row RDD
    val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    // Register the DataFrame as a temporary table
    personDataFrame.registerTempTable("t_person")
    // Execute the SQL
    val df = sqlContext.sql("select * from t_person order by age desc limit 4")
    // Save the result as JSON to the given path
    df.write.json(args(1))
    // Stop the SparkContext
    sc.stop()
  }
}
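One difference from the reflection approach: since the schema no longer comes from a case class, malformed input lines only fail at runtime when a Row is built. A minimal defensive sketch (an assumption for illustration, not part of the original program) drops lines that do not have exactly three fields before building Rows:

// Skip lines that do not split into exactly three fields
val rowRDD = personRDD
  .filter(_.length == 3)
  .map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))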
Build the program into a jar, upload it to the Spark cluster, and submit the Spark job (note that --class must now point to SpecifyingSchema):
/usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-submit \
--class cn.itcast.spark.sql.SpecifyingSchema \
--master spark://node1.itcast.cn:7077 \
/root/spark-mvn-1.0-SNAPSHOT.jar \
hdfs://node1.itcast.cn:9000/person.txt \
hdfs://node1.itcast.cn:9000/out1
Check the result:
hdfs dfs -cat hdfs://node1.itcast.cn:9000/out1/part-r-*