Spark SQL实战

简介: 一、程序 1 package sparklearning 2 3 import org.apache.log4j.Logger 4 import org.apache.spark.SparkConf 5 import org.

一、程序

 1 package sparklearning
 2 
 3 import org.apache.log4j.Logger
 4 import org.apache.spark.SparkConf
 5 import org.apache.spark.SparkContext
 6 import org.apache.spark.sql.SQLContext
 7 import org.apache.spark.storage.StorageLevel
 8 import org.apache.log4j.Level
 9 
10 object OnLineTradeStatistics {
11 
12   case class User(userID:String,gender:String,age:Int,registerDate:String,provice:String,career:String)
13   case class TradeDetail(tradeID:String, tradeDate:String,productID:Int,amount:Int,userID:String)
14   def main(args: Array[String]){
15       
16     //关闭不必要的日志显示
17       Logger.getLogger("org.apache.hadoop").setLevel(Level.ERROR)
18       Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
19       Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
20       
21       //设置应用程序
22       val conf=new SparkConf().setAppName("On Line Trade Data").setMaster("local")
23       val ctx=new SparkContext(conf)
24       val sqlCtx=new SQLContext(ctx)
25       import sqlCtx.implicits._
26       
27       //读文件  RDD-->DataFrame  
28       val userDF= ctx.textFile("/home/hadoop/data/on_line_trade_user.txt").map(_.split(" ")).map(u=>User(u(0),u(1),u(2).toInt,u(3),u(4),u(5))).toDF()
29       userDF.registerTempTable("user")
30       userDF.persist(StorageLevel.MEMORY_ONLY_SER)
31       
32       val tradeDF= ctx.textFile("/home/hadoop/data/on_line_trade_detail.txt").map(_.split(" ")).map(u=>TradeDetail(u(0),u(1),u(2).toInt,u(3).toInt,u(4))).toDF()
33       tradeDF.registerTempTable("trade")//生成临时表
34       tradeDF.persist(StorageLevel.MEMORY_ONLY_SER)
35       
36       val countOfTrade2016 = sqlCtx.sql("SELECT * FROM trade where tradeDate like '2016%'").count()
37       println("2016 total money: "+countOfTrade2016)
38   }
39 }

二、结果

当神已无能为力,那便是魔渡众生
目录
相关文章
|
20天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
55 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
11天前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
1月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
44 0
|
1月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
78 0
|
1月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
36 0
|
1月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
54 0
|
分布式计算 算法 大数据
大数据实战之spark安装部署
楔子 我是在2013年底第一次听说Spark,当时我对Scala很感兴趣,而Spark就是使用Scala编写的。一段时间之后,我做了一个有趣的数据科学项目,它试着去预测在泰坦尼克号上幸存。
3088 0
|
1月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
60 0
|
1月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
40 0
|
1月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
87 0