五、客户数据指标开发
1、计算的字段
2、Spark实现
实现步骤:
- 在dws目录下创建 ConsumerDWS 单例对象,继承自OfflineApp特质
- 初始化环境的参数,创建SparkSession对象
- 根据指定的日期获取拉宽后的用户宽表(tbl_customer_detail)增量数据,并缓存数据
- 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
- 指标计算
- 总客户数
- 今日新增客户数(注册时间为今天)
- 留存数(超过180天未下单表示已流失,否则表示留存)
- 留存率
- 活跃用户数(近10天内有发件的客户表示活跃用户)
- 月度新老用户数(应该是月度新用户!)
- 沉睡用户数(3个月~6个月之间的用户表示已沉睡)
- 流失用户数(9个月未下单表示已流失)
- 客单数
- 客单价
- 平均客单数
- 普通用户数
- 获取昨天日期yyyyMMdd(作为指标结果行的主键id)
- 构建要持久化的指标数据(需要判断计算的指标是否有值,若没有需要赋值默认值)
- 通过StructType构建指定Schema
- 创建客户指标数据表(若存在则不创建)
- 持久化指标数据到kudu表
2.1、初始化环境变量
package cn.it.logistics.offline.dws

import cn.it.logistics.common.{Configuration, DateHelper, OfflineTableDefine, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, LongType, Metadata, StringType, StructField, StructType}
import org.apache.spark.sql.functions._

import scala.collection.mutable.ArrayBuffer

/**
 * Customer-topic metric computation (skeleton).
 *
 * Planned flow (see the step list in main): read the customer detail wide
 * table, compute the customer metrics, write the results to a Kudu table,
 * then release resources. Only the environment bootstrap is implemented here.
 */
object CustomerDWS extends OfflineApp {

  // Application name used when building the SparkConf.
  val appName = this.getClass.getSimpleName

  /**
   * Entry point: bootstraps the Spark environment and delegates to execute().
   *
   * Steps:
   *  1) build the SparkConf
   *  2) create the SparkSession
   *  3) read the customer detail wide table
   *  4) compute the metrics
   *  5) write the results to Kudu (schema, rows, create-if-missing, save)
   *  6) drop cached data
   *  7) stop the job and exit the SparkSession
   */
  def main(args: Array[String]): Unit = {
    // 1) Build the SparkConf; autoSettingEnv decides local vs. cluster settings.
    val conf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )

    // 2) Create the SparkSession and silence Spark's console logging.
    val session: SparkSession = SparkUtils.getSparkSession(conf)
    session.sparkContext.setLogLevel(Configuration.LOG_OFF)

    // Run the processing logic (not yet implemented in this draft).
    execute(session)
  }

  /**
   * Data processing. In this skeleton it only stops the session; the metric
   * computation is filled in by the complete version of this job.
   *
   * @param sparkSession active session created in main
   */
  override def execute(sparkSession: SparkSession): Unit = {
    sparkSession.stop()
  }
}
2.2、加载客户宽表增量数据并缓存
加载客户宽表的时候,需要指定日期条件:客户主题最终由Azkaban定时调度,每天执行一次、只处理当天的增量数据,因此加载时必须按日期过滤。
// TODO 3) Read the customer detail wide table.
// (The customer topic is not updated incrementally per day — it is fully
// recomputed on every run; the flag selects first-run full load vs. delta.)
val customerDetailDF = getKuduSource(sparkSession, OfflineTableDefine.customerDetail, Configuration.isFirstRunnable)
2.3、指标计算
// Accumulator for the metric rows that will be written to Kudu.
val rows: ArrayBuffer[Row] = ArrayBuffer[Row]()

// TODO 4) Compute the customer metrics from the customer detail wide table.

// Total number of customers.
val customerTotalCount: Row = customerDetailDF.agg(count($"id").alias("total_count")).first()
val totalCustomers: Long = if (customerTotalCount.isNullAt(0)) 0L else customerTotalCount.getLong(0)

// Customers registered "today" (the job runs after midnight, so this is
// yesterday's calendar date).
val addTotalCount: Long = customerDetailDF
  .where(date_format($"regDt", "yyyy-MM-dd").equalTo(DateHelper.getyesterday("yyyy-MM-dd")))
  .agg(count($"id")).first().getLong(0)

// Retained customers: an order within the last 180 days counts as retained.
// (The variable keeps its "lost..." name to match the result-table column.)
val lostCustomerTotalCount: Long = customerDetailDF.where("last_sender_cdt >= date_sub(now(), 180)").count()
println(lostCustomerTotalCount)

// Retention rate = retained customers / all customers.
// FIX: the original divided Long by Long (integer division), which always
// produced 0.0 or 1.0; convert to Double first and guard against zero.
val lostRate: Double = if (totalCustomers == 0L) 0D else lostCustomerTotalCount.toDouble / totalCustomers
println(lostRate)

// Active customers: at least one shipment within the last 10 days.
val activeCount = customerDetailDF.where("last_sender_cdt >= date_sub(now(), 10)").count

// New customers this month: registered between the first day of the current
// month and today.
// FIX: the original used trunc($"regDt", "MM") — the month start of each
// row's OWN registration date — making the predicate true for every row;
// truncate the current date instead.
val monthOfNewCustomerCount = customerDetailDF
  .where($"regDt".between(trunc(current_date(), "MM"), date_format(current_date(), "yyyy-MM-dd")))
  .count

// Dormant customers: last shipment between 90 and 180 days ago.
val sleepCustomerCount = customerDetailDF
  .where("last_sender_cdt >= date_sub(now(), 180) and last_sender_cdt <= date_sub(now(), 90)")
  .count
println(sleepCustomerCount)

// Churned customers: no shipment for more than 270 days (~9 months).
// FIX: the original used >= which counted customers ACTIVE within 270 days,
// the inverse of "churned".
val loseCustomerCount = customerDetailDF.where("last_sender_cdt < date_sub(now(), 270)").count
println(loseCustomerCount)

// Customers that have placed at least one order.
val customerSendInfoDF = customerDetailDF.where("first_sender_id is not null")
val customerBillCountAndAmount: Row = customerSendInfoDF
  .agg(sum("totalCount").alias("totalCount"), sum("totalAmount").alias("totalAmount"))
  .first()

// Average order value = total amount / total order count.
// FIX: guard against null aggregates (no orders yet) and division by zero —
// the original NPE'd on an empty order set.
val billCount: Double =
  if (customerBillCountAndAmount.isNullAt(0)) 0D else customerBillCountAndAmount.get(0).toString.toDouble
val billAmount: Double =
  if (customerBillCountAndAmount.isNullAt(1)) 0D else customerBillCountAndAmount.get(1).toString.toDouble
val customerAvgAmount: Double = if (billCount == 0D) 0D else billAmount / billCount
println(customerAvgAmount)

// Average orders per customer.
// FIX: guard against an empty customer table (division by zero).
val detailCount = customerDetailDF.count
val avgCustomerBillCount = if (detailCount == 0L) 0L else customerSendInfoDF.count / detailCount

// Yesterday's date (yyyyMMdd) is the primary key of the metric row.
val cdt = DateHelper.getyesterday("yyyyMMdd")

// Build the metric row; the slots must line up with the result-table schema,
// so missing aggregates are replaced by defaults.
val rowInfo = Row(
  cdt,
  totalCustomers,
  addTotalCount,
  lostCustomerTotalCount,
  lostRate,
  activeCount,
  monthOfNewCustomerCount,
  sleepCustomerCount,
  loseCustomerCount,
  if (customerBillCountAndAmount.isNullAt(0)) 0L else customerBillCountAndAmount.get(0).asInstanceOf[Number].longValue(),
  customerAvgAmount,
  avgCustomerBillCount
)
rows.append(rowInfo)
2.4、通过StructType构建指定Schema
import sparkSession.implicits._

// Result-table schema: "id" is the day key (yyyyMMdd), followed by the
// eleven customer metrics. Every column is nullable.
val metricColumns = Seq(
  "id" -> StringType,
  "customerTotalCount" -> LongType,
  "addtionTotalCount" -> LongType,
  "lostCustomerTotalCount" -> LongType,
  "lostRate" -> DoubleType,
  "activeCount" -> LongType,
  "monthOfNewCustomerCount" -> LongType,
  "sleepCustomerCount" -> LongType,
  "loseCustomerCount" -> LongType,
  "customerBillCount" -> LongType,
  "customerAvgAmount" -> DoubleType,
  "avgCustomerBillCount" -> LongType
)
val schema = StructType(metricColumns.map { case (fieldName, fieldType) =>
  StructField(fieldName, fieldType, true, Metadata.empty)
}.toArray)
2.5、持久化指标数据到kudu表
// 5.2) Turn the accumulated metric rows into a DataFrame and persist them
// into the Kudu customer summary table.
val metricRDD: RDD[Row] = sparkSession.sparkContext.parallelize(rows)
val quotaDF: DataFrame = sparkSession.createDataFrame(metricRDD, schema)
save(quotaDF, OfflineTableDefine.customerSummery)
2.6、完整代码
package cn.it.logistics.offline.dws

import cn.it.logistics.common.{Configure, DateHelper, OfflineTableDefine, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DoubleType, LongType, Metadata, StringType, StructField, StructType}

import scala.collection.mutable.ArrayBuffer

/**
 * Customer topic (DWS layer).
 *
 * Reads the customer detail wide table from Kudu, computes the daily customer
 * metrics and stores one summary row per day back into a Kudu table.
 *
 * NOTE(review): the original file also imported
 * `ExpressBillDWS.{appName, execute}`; both names were shadowed by this
 * object's own members and never used, so the import has been removed.
 */
object ConsumerDWS extends OfflineApp {

  // Application name used when building the SparkConf.
  val appName: String = this.getClass.getSimpleName

  /**
   * Entry point.
   *
   * Steps:
   *  1) build the SparkConf
   *  2) create the SparkSession
   *  3) load the customer wide table (full vs. incremental) and cache it
   *  4) compute the metrics
   *  5) write the results to Kudu (schema, rows, create-if-missing, save)
   *  6) drop the cache and release resources
   *  7) stop the job and exit the SparkSession
   *
   * @param args command-line arguments forwarded to SparkUtils.parameterParser
   */
  def main(args: Array[String]): Unit = {
    // 1) Build the SparkConf; autoSettingEnv picks local vs. cluster mode and
    // parameterParser applies command-line overrides.
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName),
      SparkUtils.parameterParser(args)
    )

    // 2) Create the SparkSession and silence Spark's console logging.
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configure.LOG_OFF)

    execute(sparkSession)
  }

  /**
   * Computes the customer metrics and persists them to Kudu.
   *
   * @param sparkSession active session created in main
   */
  override def execute(sparkSession: SparkSession): Unit = {
    // TODO 3) Load the customer detail wide table (fully recomputed each day).
    // FIX: the data feeds ~12 independent actions below, so cache it; the
    // original called unpersist() at the end without ever caching.
    // NOTE(review): `true` forces a full load on every run — confirm whether
    // Configure.isFirstRunnable should be used here instead.
    val customerDetailDF: DataFrame =
      getKuduSource(sparkSession, OfflineTableDefine.customerDetail, true).cache()

    import sparkSession.implicits._

    // Result-table schema: "id" is the day key (yyyyMMdd), followed by the
    // twelve customer metrics. Every column is nullable.
    val schema = StructType(Array(
      StructField("id", StringType, true, Metadata.empty),
      StructField("customerTotalCount", LongType, true, Metadata.empty),
      StructField("addtionTotalCount", LongType, true, Metadata.empty),
      StructField("lostCustomerTotalCount", LongType, true, Metadata.empty),
      StructField("lostRate", DoubleType, true, Metadata.empty),
      StructField("activeCount", LongType, true, Metadata.empty),
      StructField("monthOfNewCustomerCount", LongType, true, Metadata.empty),
      StructField("sleepCustomerCount", LongType, true, Metadata.empty),
      StructField("loseCustomerCount", LongType, true, Metadata.empty),
      StructField("customerBillCount", LongType, true, Metadata.empty),
      StructField("customerAvgAmount", DoubleType, true, Metadata.empty),
      StructField("avgCustomerBillCount", LongType, true, Metadata.empty),
      StructField("normalCustomerCount", LongType, true, Metadata.empty)
    ))

    // Accumulator for the metric rows that will be written to Kudu.
    val rows: ArrayBuffer[Row] = ArrayBuffer[Row]()

    // TODO 4) Metric computation -----------------------------------------

    // Total number of customers.
    val customerTotalCount: Row = customerDetailDF.agg(count($"id").alias("total_count")).first()
    val totalCustomers: Long = if (customerTotalCount.isNullAt(0)) 0L else customerTotalCount.getLong(0)

    // Customers registered "today" (the job runs after midnight, so this is
    // yesterday's calendar date).
    val addTotalCount: Long = customerDetailDF
      .where(date_format($"regDt", "yyyy-MM-dd").equalTo(DateHelper.getyestday("yyyy-MM-dd")))
      .agg(count($"id")).first().getLong(0)

    // Retained customers: an order within the last 180 days counts as retained.
    // (The variable keeps its "lost..." name to match the result-table column.)
    val lostCustomerTotalCount: Long =
      customerDetailDF.where("last_sender_cdt >= date_sub(now(), 180)").count()

    // Retention rate = retained customers / all customers.
    // FIX: the original divided Long by Long (integer division), which always
    // produced 0.0 or 1.0; convert to Double first and guard against zero.
    val lostRate: Double =
      if (totalCustomers == 0L) 0D else lostCustomerTotalCount.toDouble / totalCustomers

    // Active customers: at least one shipment within the last 10 days.
    val activeCount = customerDetailDF.where("last_sender_cdt >= date_sub(now(), 10)").count

    // New customers this month: registered since the first day of the current
    // month.
    // FIX: the original used trunc($"regDt", "MM") — the month start of each
    // row's OWN registration date — making the predicate true for every row;
    // truncate the CURRENT date instead.
    val monthOfNewCustomerCount = customerDetailDF
      .where($"regDt".between(trunc(current_date(), "MM"), date_format(current_date(), "yyyy-MM-dd")))
      .count

    // Dormant customers: last shipment between 90 and 180 days ago.
    val sleepCustomerCount = customerDetailDF
      .where("last_sender_cdt >= date_sub(now(), 180) and last_sender_cdt <= date_sub(now(), 90)")
      .count

    // Churned customers: no shipment for more than 270 days (~9 months).
    // FIX: the original used >= which counted customers ACTIVE within 270 days,
    // the inverse of "churned".
    val loseCustomerCount = customerDetailDF.where("last_sender_cdt < date_sub(now(), 270)").count

    // Customers that have placed at least one order.
    val customerSendInfoDF = customerDetailDF.where("first_sender_id is not null")
    val customerBillCountAndAmount: Row = customerSendInfoDF
      .agg(sum("totalCount").alias("totalCount"), sum("totalAmount").alias("totalAmount"))
      .first()

    // Average order value = total amount / total order count.
    // FIX: guard against null aggregates (no orders yet) and division by zero —
    // the original NPE'd on an empty order set.
    val billCount: Double =
      if (customerBillCountAndAmount.isNullAt(0)) 0D else customerBillCountAndAmount.get(0).toString.toDouble
    val billAmount: Double =
      if (customerBillCountAndAmount.isNullAt(1)) 0D else customerBillCountAndAmount.get(1).toString.toDouble
    val customerAvgAmount: Double = if (billCount == 0D) 0D else billAmount / billCount

    // Average orders per customer.
    // FIX: guard against an empty customer table (division by zero).
    val detailCount = customerDetailDF.count
    val avgCustomerBillCount = if (detailCount == 0L) 0L else customerSendInfoDF.count / detailCount

    // Regular customers (type = 1).
    val normalCustomerRow: Row =
      customerDetailDF.where("type=1").agg(count($"id").alias("total_count")).first()
    val normalCustomerCount: Long =
      if (normalCustomerRow.isNullAt(0)) 0L else normalCustomerRow.get(0).asInstanceOf[Number].longValue()

    // Yesterday's date (yyyyMMdd) is the primary key of the metric row.
    val cdt = DateHelper.getyestday("yyyyMMdd")

    // Build the metric row; the slots must line up with the schema above, so
    // missing aggregates are replaced by defaults.
    rows.append(Row(
      cdt,
      totalCustomers,
      addTotalCount,
      lostCustomerTotalCount,
      lostRate,
      activeCount,
      monthOfNewCustomerCount,
      sleepCustomerCount,
      loseCustomerCount,
      if (customerBillCountAndAmount.isNullAt(0)) 0L else customerBillCountAndAmount.get(0).asInstanceOf[Number].longValue(),
      customerAvgAmount,
      avgCustomerBillCount,
      normalCustomerCount
    ))

    // 5) Persist the metrics to the Kudu summary table.
    val data: RDD[Row] = sparkSession.sparkContext.makeRDD(rows)
    val quotaDF: DataFrame = sparkSession.createDataFrame(data, schema)
    save(quotaDF, OfflineTableDefine.customerSummery)

    // 6)/7) Release the cached wide table and stop the session.
    customerDetailDF.unpersist()
    sparkSession.stop()
  }
}