Main contents of this section
- Data preparation
- Hands-on examples
Data preparation
Upload the experiment data files Date.txt, Stock.txt, and StockDetail.txt to HDFS (e.g. hadoop fs -put /data /data, copying the local /data directory to HDFS):
Date.txt has the following format:
//Date.txt defines the calendar dimension: each date is tagged with the month, week, quarter, etc. it belongs to
//Fields: date, year-month, year, month, day, day of week, week of year, quarter, ten-day period, half-month
2014-12-24,201412,2014,12,24,3,52,4,36,24
Stock.txt has the following format:
//Stock.txt defines the order headers
//Fields: order number, transaction location, transaction date
ZYSL00014630,ZY,2009-5-7
StockDetail.txt has the following format:
//Fields: order number, row number, item ID, quantity, price, amount
HMJSL00006421,9,QY524266010101,1,80,80
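Before loading the files into Spark, it can help to see how one record splits into fields. This is a plain-Scala sketch (no Spark needed) using the sample StockDetail.txt line from above; the field order matches the comment:

```scala
// Plain-Scala sketch: split one StockDetail.txt line into its six typed fields:
// ordernumber, rownum, itemid, qty, price, amount.
object SplitDemo {
  def parseDetail(line: String): (String, Int, String, Int, Double, Double) = {
    val f = line.split(",")
    (f(0), f(1).trim.toInt, f(2), f(3).trim.toInt, f(4).trim.toDouble, f(5).trim.toDouble)
  }
}
```

Calling `SplitDemo.parseDetail("HMJSL00006421,9,QY524266010101,1,80,80")` yields the tuple `("HMJSL00006421", 9, "QY524266010101", 1, 80.0, 80.0)`; the same split-and-convert pattern is used in the `map` calls below.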
Hands-on example: for all orders, the number of orders and total sales amount per year:
//Define case classes used later to build the DataFrame schemas
//Corresponds to Date.txt
case class DateInfo(dateID:String,theyearmonth:String,theyear:String,themonth:String,thedate:String,theweek:String,theweeks:String,thequot:String,thetenday:String,thehalfmonth:String)
//Corresponds to Stock.txt
case class StockInfo(ordernumber:String,locationid:String,dateID:String)
//Corresponds to StockDetail.txt
case class StockDetailInfo(ordernumber:String,rownum:Int,itemid:String,qty:Int,price:Double,amount:Double)
//Load the data and convert it to DataFrames (in spark-shell; a standalone app would also need import sqlContext.implicits._ for toDF())
val DateInfoDF = sc.textFile("/data/Date.txt").map(_.split(",")).map(d => DateInfo(d(0),d(1),d(2),d(3),d(4),d(5),d(6),d(7),d(8),d(9))).toDF()
val StockInfoDF = sc.textFile("/data/Stock.txt").map(_.split(",")).map(s => StockInfo(s(0),s(1),s(2))).toDF()
val StockDetailInfoDF = sc.textFile("/data/StockDetail.txt").map(_.split(",")).map(s => StockDetailInfo(s(0),s(1).trim.toInt,s(2),s(3).trim.toInt,s(4).trim.toDouble,s(5).trim.toDouble)).toDF()
//Register the DataFrames as temporary tables
DateInfoDF.registerTempTable("tblDate")
StockInfoDF.registerTempTable("tblStock")
StockDetailInfoDF.registerTempTable("tblStockDetail")
//Run the SQL query
//For all orders: number of orders and total sales amount per year
//After joining the three tables, count(distinct a.ordernumber) gives the order count and sum(b.amount) the total sales
sqlContext.sql("select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber join tblDate c on a.dateid=c.dateid group by c.theyear order by c.theyear").collect().foreach(println)
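To make the semantics of this SQL concrete, here is a collections-only Scala sketch (no Spark) of the same computation: join stock headers with details on ordernumber, map dateID to a year via the date table, then per year count distinct order numbers and sum the amounts. The names and sample rows in the test are illustrative, not from the real dataset:

```scala
// Collections-only sketch of "order count and total sales per year".
// Mirrors: join tblStock a / tblStockDetail b on ordernumber, join tblDate c
// on dateid, then group by c.theyear with count(distinct) and sum().
object YearlySales {
  case class Stock(ordernumber: String, locationid: String, dateID: String)
  case class Detail(ordernumber: String, amount: Double)

  def aggregate(stocks: Seq[Stock],
                details: Seq[Detail],
                yearOf: Map[String, String]): Map[String, (Int, Double)] = {
    // Inner join on ordernumber, tagging each detail row with its year
    val joined = for {
      s <- stocks
      d <- details if d.ordernumber == s.ordernumber
    } yield (yearOf(s.dateID), s.ordernumber, d.amount)
    // Per year: count(distinct ordernumber), sum(amount)
    joined.groupBy(_._1).map { case (year, rows) =>
      year -> (rows.map(_._2).distinct.size, rows.map(_._3).sum)
    }
  }
}
```

A multi-row order contributes once to the order count but every detail line's amount to the total, which is exactly why the SQL needs `count(distinct a.ordernumber)` rather than `count(*)`.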
Hands-on example: for all orders, the sales amount of the largest single order per year:
//The subquery d computes sum(b.amount) per (dateid, ordernumber); the outer query then takes max(d.sumofamount) per year
//Note that "order by" gives a globally sorted result, whereas "sort by" only sorts within each partition
sqlContext.sql("select c.theyear,max(d.sumofamount) from tblDate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber) d on c.dateid=d.dateid group by c.theyear order by c.theyear").collect().foreach(println)
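The nested query has two steps, which the following collections-only Scala sketch (no Spark, illustrative sample data in the test) makes explicit: first sum the amounts per (dateid, ordernumber) pair, then take the maximum of those per-order totals within each year:

```scala
// Collections-only sketch of "max single-order amount per year".
// Inner step: sum(amount) per (dateid, ordernumber)  -> d.sumofamount
// Outer step: map dateid to a year, take max(sumofamount) per year.
object MaxOrderPerYear {
  case class Stock(ordernumber: String, dateID: String)
  case class Detail(ordernumber: String, amount: Double)

  def maxPerYear(stocks: Seq[Stock],
                 details: Seq[Detail],
                 yearOf: Map[String, String]): Map[String, Double] = {
    // Inner subquery: total amount per (dateID, ordernumber)
    val perOrder = (for {
      s <- stocks
      d <- details if d.ordernumber == s.ordernumber
    } yield ((s.dateID, s.ordernumber), d.amount))
      .groupBy(_._1)
      .map { case (key, rows) => key -> rows.map(_._2).sum }
    // Outer query: max per-order total within each year
    perOrder
      .groupBy { case ((dateID, _), _) => yearOf(dateID) }
      .map { case (year, orders) => year -> orders.values.max }
  }
}
```

Note that the grouping key of the inner step includes dateID only so that the outer step can join back to the year; grouping by ordernumber alone would give the same per-order totals here, since an order number appears under a single date.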
The code above gives a feel for the power of Spark SQL. For more complex queries, see mmicky_wyy's blog: http://blog.csdn.net/book_mmicky/article/details/39177041