SparkSQL实践

简介: SparkSQL实战:统计用户及商品数据指标,包含以下三张表

SparkSQL实战:统计用户及商品数据指标,包含以下三张表


orders表:

image.png

product表:

image.png

prior表:

image.png

实现以下业务需求:

image.png


1.统计product被购买的数量:

val productCnt = priors.groupBy("product_id").count()

2..统计product 被reordered的数量(再次购买)

product_id做group by(聚合),统计一下sum(reorder)的值
1)priors
.selectExpr("product_id","cast(reordered as int)")
.filter(col("reordered")===1)
.groupBy("product_id")
.count()
2)priors.selectExpr("product_id","cast(reordered as int)")
.groupBy("product_id").sum("reordered")
3)priors.selectExpr("product_id","cast(reordered as int)")
.groupBy("product_id").agg(sum("reordered"),avg("reordered"))


(agg一般是搭配groupby这种聚合函数使用和单独使用sum的差别,是在一次聚合中可以统计出来多个值)


3.结合上面数量统计product购买的reordered 的比率avg("reordered")

sql解决:jproduct.selectExpr("*","sum_re/count as mean_re")


udf解决:


join操作:


productCnt:为上面计算好的count


val jproduct = productCnt.join(prodcutSumReorder,"product_id")

image.png

使用udf:1.导包

image.png


2定义udf:.val avg_udf = udf((sm:Long,cnt:Long)=>sm.toDouble/cnt.toDouble)


3.使用udf:

image.png

user 统计/特征

1.每个用户平均购买订单的间隔周期

1)解决用户的第一个订单没有间隔天数的,需要赋值为0


val ordersNew =orders.selectExpr("*","if(days_since_prior_order='',0,days_since_prior_order) as dspo")
.drop("days_since_prior_order").show()
(drop:丢掉其中的一列)


2)最终代码:


ordersNew.selectExpr("user_id","cast(dspo as int)")
.groupBy("user_id").avg("dspo").show()

2.每个用户的总订单数量

orders.groupBy("user_id").count().show()

3.每个用户购买的product商品去重后的集合数据

1.)先求每个用户和product:

image.png

val po = orders.join(priors,"order_id").select("user_id","product_id").show(5)


2)对每个用户对product求和:


DataFrame转RDD处理:

val rddRecords=op.rdd.map(x=>(x(0).toString,x(1).toString))

.groupByKey()

.mapValues(_.toSet.mkString(","))


取出一条数据的结果:

image.png

3)隐式转换:


3.1导包:import spark.implicits._


3.2代码:rddRecords.toDF("user_id","product_records")


结果:

image.png

4.每个用户总商品数量以及去重后的商品数量

总商品数量:

orders.join(priors,"order_id").groupBy("user_id").count().show(5)


解法1:统计去重商品的数量:

image.png

结果:

image.png

解法2:

image.png


5.每个用户购买的平均每个订单的商品数量

1)先求每个订单的商品数量【对订单做聚合count()】

val ordProCnt = priors.groupBy("order_id").count()


2)求每个用户订单中商品个数的平均值【对user做聚合,avg(商品个数)】

orders.join(ordProCnt,"order_id").groupBy("user_id").avg("count")

(count出来的列名就叫count)


目录
相关文章
|
存储 SQL JSON
Spark - 一文搞懂 parquet
parquet 文件常见于 Spark、Hive、Streamin、MapReduce 等大数据场景,通过列式存储和元数据存储的方式实现了高效的数据存储与检索,下面主要讲parquet 文件在 spark 场景下的存储,读取与使用中可能遇到的坑。......
1346 0
Spark - 一文搞懂 parquet
|
SQL JSON 分布式计算
SparkSQL 是什么_适用场景 | 学习笔记
快速学习 SparkSQL 是什么_适用场景
258 0
|
机器学习/深度学习 消息中间件 分布式计算
Sparkstreaming 介绍-特点 | 学习笔记
快速学习 Sparkstreaming 介绍-特点
155 0
|
分布式计算 监控 算法
Sparkstreaming 介绍 场景 | 学习笔记
快速学习 Sparkstreaming 介绍 场景
89 0
Sparkstreaming 介绍 场景 | 学习笔记
|
SQL 存储 分布式计算
SparkSQL概念介绍
Spark SQL:将sql转换成spark任务
126 0
SparkSQL概念介绍
|
JSON Java 数据库连接
九十一、Spark-SparkSQL(多数据源处理)
九十一、Spark-SparkSQL(多数据源处理)
九十一、Spark-SparkSQL(多数据源处理)
|
分布式计算 Apache Spark
|
SQL 缓存 分布式计算
SparkSQL 初识_2
快速学习 SparkSQL 初识_2
SparkSQL 初识_2
|
SQL JSON 分布式计算
SparkSQL 初识_3
快速学习 SparkSQL 初识_3
162 0
|
缓存 分布式计算 算法
SparkSQL 初识_1
快速学习 SparkSQL 初识_1
116 0