SparkSQL实战:统计用户及商品数据指标,包含以下三张表
orders表:
product表:
prior表:
实现以下业务需求:
1.统计product被购买的数量:
val productCnt = priors.groupBy("product_id").count()
2..统计product 被reordered的数量(再次购买)
product_id做group by(聚合),统计一下sum(reorder)的值 1)priors .selectExpr("product_id","cast(reordered as int)") .filter(col("reordered")===1) .groupBy("product_id") .count() 2)priors.selectExpr("product_id","cast(reordered as int)") .groupBy("product_id").sum("reordered") 3)priors.selectExpr("product_id","cast(reordered as int)") .groupBy("product_id").agg(sum("reordered"),avg("reordered"))
(agg一般是搭配groupby这种聚合函数使用和单独使用sum的差别,是在一次聚合中可以统计出来多个值)
3.结合上面数量统计product购买的reordered 的比率avg("reordered")
sql解决:jproduct.selectExpr("*","sum_re/count as mean_re")
udf解决:
join操作:
productCnt:为上面计算好的count
val jproduct = productCnt.join(prodcutSumReorder,"product_id")
使用udf:1.导包
2定义udf:.val avg_udf = udf((sm:Long,cnt:Long)=>sm.toDouble/cnt.toDouble)
3.使用udf:
user 统计/特征
1.每个用户平均购买订单的间隔周期
1)解决用户的第一个订单没有间隔天数的,需要赋值为0
val ordersNew =orders.selectExpr("*","if(days_since_prior_order='',0,days_since_prior_order) as dspo") .drop("days_since_prior_order").show() (drop:丢掉其中的一列)
2)最终代码:
ordersNew.selectExpr("user_id","cast(dspo as int)") .groupBy("user_id").avg("dspo").show()
2.每个用户的总订单数量
orders.groupBy("user_id").count().show()
3.每个用户购买的product商品去重后的集合数据
1.)先求每个用户和product:
val po = orders.join(priors,"order_id").select("user_id","product_id").show(5)
2)对每个用户对product求和:
DataFrame转RDD处理:
val rddRecords=op.rdd.map(x=>(x(0).toString,x(1).toString))
.groupByKey()
.mapValues(_.toSet.mkString(","))
取出一条数据的结果:
3)隐式转换:
3.1导包:import spark.implicits._
3.2代码:rddRecords.toDF("user_id","product_records")
结果:
4.每个用户总商品数量以及去重后的商品数量
总商品数量:
orders.join(priors,"order_id").groupBy("user_id").count().show(5)
解法1:统计去重商品的数量:
结果:
解法2:
5.每个用户购买的平均每个订单的商品数量
1)先求每个订单的商品数量【对订单做聚合count()】
val ordProCnt = priors.groupBy("order_id").count()
2)求每个用户订单中商品个数的平均值【对user做聚合,avg(商品个数)】
orders.join(ordProCnt,"order_id").groupBy("user_id").avg("count")
(count出来的列名就叫count)