客快物流大数据项目(六十七):客户主题(二)

简介: 客快物流大数据项目(六十七):客户主题(二)

五、客户数据指标开发



1、计算的字段


image.png


2、Spark实现


实现步骤:


  • 在dws目录下创建 ConsumerDWS 单例对象,继承自OfflineApp特质
  • 初始化环境的参数,创建SparkSession对象
  • 根据指定的日期获取拉宽后的用户宽表(tbl_customer_detail)增量数据,并缓存数据
  • 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
  • 指标计算
  • 总客户数
  • 今日新增客户数(注册时间为今天)
  • 留存数(超过180天未下单表示已流失,否则表示留存)
  • 留存率
  • 活跃用户数(近10天内有发件的客户表示活跃用户)
  • 月度新老用户数(应该是月度新用户!)
  • 沉睡用户数(3个月~6个月之间的用户表示已沉睡)
  • 流失用户数(9个月未下单表示已流失)
  • 客单数
  • 客单价
  • 平均客单数
  • 普通用户数
  • 获取当前时间yyyyMMddHH
  • 构建要持久化的指标数据(需要判断计算的指标是否有值,若没有需要赋值默认值)
  • 通过StructType构建指定Schema
  • 创建客户指标数据表(若存在则不创建)
  • 持久化指标数据到kudu表


2.1、初始化环境变量


package cn.it.logistics.offline.dws
import cn.it.logistics.common.{Configuration, DateHelper, OfflineTableDefine, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, LongType, Metadata, StringType, StructField, StructType}
import org.apache.spark.sql.functions._
import scala.collection.mutable.ArrayBuffer
/**
 * 客户主题指标计算
 */
object CustomerDWS  extends  OfflineApp {
  //定义应用程序的名称
  val appName = this.getClass.getSimpleName
  def main(args: Array[String]): Unit = {
    /**
     * 实现步骤:
     * 1)创建SparkConf对象
     * 2)创建SparkSession对象
     * 3)读取客户明细宽表的数据
     * 4)对客户明细宽表的数据进行指标的计算
     * 5)将计算好的指标数据写入到kudu数据库中
     * 5.1:定义指标结果表的schema信息
     * 5.2:组织需要写入到kudu表的数据
     * 5.3:判断指标结果表是否存在,如果不存在则创建
     * 5.4:将数据写入到kudu表中
     * 6)删除缓存数据
     * 7)停止任务,退出sparksession
     */
    //TODO 1)创建SparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )
    //TODO 2)创建SparkSession对象
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
    //处理数据
    execute(sparkSession)
  }
  /**
   * 数据处理
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
sparkSession.stop()
  }
}


2.2、加载客户宽表增量数据并缓存


加载客户宽表的时候,需要指定日期条件,因为客户主题最终需要Azkaban定时调度执行,每天执行一次增量数据,因此需要指定日期。


//TODO 3)读取客户明细宽表的数据(用户主题的数据不需要按照天进行增量更新,而是每天全量运行)
val customerDetailDF = getKuduSource(sparkSession, OfflineTableDefine.customerDetail, Configuration.isFirstRunnable)


2.3、指标计算


//定义数据集合
val rows: ArrayBuffer[Row] = ArrayBuffer[Row]()
//TODO 4)对客户明细宽表的数据进行指标的计算
val customerTotalCount: Row = customerDetailDF.agg(count($"id").alias("total_count")).first()
//今日新增客户数
val addTotalCount: Long = customerDetailDF.where(date_format($"regDt", "yyyy-MM-dd").equalTo(DateHelper.getyesterday("yyyy-MM-dd"))).agg(count($"id")).first().getLong(0)
//留存率(超过180天未下单表示已经流失,否则表示留存)
//留存用户数
//val lostCustomerTotalCount: Long = customerDetailDF.join(customerSenderInfoDF.where("cdt >= date_sub(now(), 180)"), customerDetailDF("id") === customerSenderInfoDF("ciid")).count()
val lostCustomerTotalCount: Long = customerDetailDF.where("last_sender_cdt >= date_sub(now(), 180)").count()
println(lostCustomerTotalCount)
//留存率,超过180天未下单的用户数/所有的用户数
val lostRate: Double = (lostCustomerTotalCount / (if (customerTotalCount.isNullAt(0)) 1D else customerTotalCount.getLong(0))).asInstanceOf[Number].doubleValue()
println(lostRate)
// 活跃用户数(近10天内有发件的客户表示活跃用户)
val activeCount = customerDetailDF.where("last_sender_cdt>=date_sub(now(), 10)").count
// 月度新老用户数(应该是月度新用户!)
val monthOfNewCustomerCount = customerDetailDF.where($"regDt".between(trunc($"regDt", "MM"), date_format(current_date(), "yyyy-MM-dd"))).count
// 沉睡用户数(3个月~6个月之间的用户表示已沉睡)
val sleepCustomerCount = customerDetailDF.where("last_sender_cdt>=date_sub(now(), 180) and last_sender_cdt<=date_sub(now(), 90)").count
println(sleepCustomerCount)
// 流失用户数(9个月未下单表示已流失)
val loseCustomerCount = customerDetailDF.where("last_sender_cdt>=date_sub(now(), 270)").count
println(loseCustomerCount)
// 客单数
val customerSendInfoDF = customerDetailDF.where("first_sender_id is not null")
val customerBillCountAndAmount: Row = customerSendInfoDF.agg(sum("totalCount").alias("totalCount"), sum("totalAmount").alias("totalAmount")).first()
// 客单价
val customerAvgAmount = customerBillCountAndAmount.get(1).toString.toDouble / customerBillCountAndAmount.get(0).toString.toDouble //总金额/总件数
println(customerAvgAmount)
// 平均客单数
val avgCustomerBillCount = customerSendInfoDF.count / customerDetailDF.count
// 获取昨天时间yyyyMMdd
val cdt = DateHelper.getyesterday("yyyyMMdd")
// 构建要持久化的指标数据
val rowInfo = Row(
  cdt,
  if (customerTotalCount.isNullAt(0)) 0L else customerTotalCount.get(0).asInstanceOf[Number].longValue(),
  addTotalCount,
  lostCustomerTotalCount,
  lostRate,
  activeCount,
  monthOfNewCustomerCount,
  sleepCustomerCount,
  loseCustomerCount,
  if (customerBillCountAndAmount.isNullAt(0)) 0L else customerBillCountAndAmount.get(0).asInstanceOf[Number].longValue(),
  customerAvgAmount,
  avgCustomerBillCount
)
rows.append(rowInfo)


2.4、通过StructType构建指定Schema


import sparkSession.implicits._
val schema = StructType(Array(
  StructField("id", StringType, true, Metadata.empty),
  StructField("customerTotalCount", LongType, true, Metadata.empty),
  StructField("addtionTotalCount", LongType, true, Metadata.empty),
  StructField("lostCustomerTotalCount", LongType, true, Metadata.empty),
  StructField("lostRate", DoubleType, true, Metadata.empty),
  StructField("activeCount", LongType, true, Metadata.empty),
  StructField("monthOfNewCustomerCount", LongType, true, Metadata.empty),
  StructField("sleepCustomerCount", LongType, true, Metadata.empty),
  StructField("loseCustomerCount", LongType, true, Metadata.empty),
  StructField("customerBillCount", LongType, true, Metadata.empty),
  StructField("customerAvgAmount", DoubleType, true, Metadata.empty),
  StructField("avgCustomerBillCount", LongType, true, Metadata.empty)
))


2.5、持久化指标数据到kudu表


// 5.2:组织要写入到kudu表的数据
val data: RDD[Row] = sparkSession.sparkContext.makeRDD(rows)
val quotaDF: DataFrame = sparkSession.createDataFrame(data, schema)
save(quotaDF, OfflineTableDefine.customerSummery)


2.6、完整代码


package cn.it.logistics.offline.dws
import cn.it.logistics.common.{Configure, DateHelper, OfflineTableDefine, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import cn.it.logistics.offline.dws.ExpressBillDWS.{appName, execute}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DoubleType, LongType, Metadata, StringType, StructField, StructType}
import scala.collection.mutable.ArrayBuffer
/**
 * 客户主题开发
 * 读取客户明细宽表的数据,然后进行指标开发,将结果存储到kudu表中(DWS层)
 */
object ConsumerDWS extends  OfflineApp{
  //定义应用的名称
  val appName: String = this.getClass.getSimpleName
  /**
   * 入口函数
   * @param args
   */
  def main(args: Array[String]): Unit = {
    /**
     * 实现步骤:
     * 1)创建sparkConf对象
     * 2)创建SparkSession对象
     * 3)读取客户宽表数据(判断是全量装载还是增量装载),将加载的数据进行缓存
     * 4)对客户明细表的数据进行指标计算
     * 5)将计算好的数写入到kudu表中
     *   5.1)定义写入kudu表的schema结构信息
     *   5.2)将组织好的指标结果集合转换成RDD对象
     *   5.3)创建表,写入数据
     * 6)删除缓存,释放资源
     * 7)停止作业,退出sparkSession
     */
    //TODO 1)创建sparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName),
      SparkUtils.parameterParser(args)
    )
    //TODO 2)创建SparkSession对象
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configure.LOG_OFF)
    //执行数据处理的逻辑
    execute(sparkSession)
  }
  /**
   * 数据处理
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    //TODO 3)读取客户明细宽表的数据(用户主题的数据不需要按照天进行增量更新,而是每天全量运行)
    val customerDetailDF: DataFrame = getKuduSource(sparkSession, OfflineTableDefine.customerDetail, true)
    import sparkSession.implicits._
    val schema = StructType(Array(
      StructField("id", StringType, true, Metadata.empty),
      StructField("customerTotalCount", LongType, true, Metadata.empty),
      StructField("addtionTotalCount", LongType, true, Metadata.empty),
      StructField("lostCustomerTotalCount", LongType, true, Metadata.empty),
      StructField("lostRate", DoubleType, true, Metadata.empty),
      StructField("activeCount", LongType, true, Metadata.empty),
      StructField("monthOfNewCustomerCount", LongType, true, Metadata.empty),
      StructField("sleepCustomerCount", LongType, true, Metadata.empty),
      StructField("loseCustomerCount", LongType, true, Metadata.empty),
      StructField("customerBillCount", LongType, true, Metadata.empty),
      StructField("customerAvgAmount", DoubleType, true, Metadata.empty),
      StructField("avgCustomerBillCount", LongType, true, Metadata.empty),
      StructField("normalCustomerCount", LongType, true, Metadata.empty)
    ))
    //定义数据集合
    val rows: ArrayBuffer[Row] = ArrayBuffer[Row]()
    //TODO 4)对客户明细宽表的数据进行指标的计算
    val customerTotalCount: Row = customerDetailDF.agg(count($"id").alias("total_count")).first()
    //今日新增客户数
    val addTotalCount: Long = customerDetailDF.where(date_format($"regDt", "yyyy-MM-dd").equalTo(DateHelper.getyestday("yyyy-MM-dd"))).agg(count($"id")).first().getLong(0)
    //留存率(超过180天未下单表示已经流失,否则表示留存)
    //留存用户数
    //val lostCustomerTotalCount: Long = customerDetailDF.join(customerSenderInfoDF.where("cdt >= date_sub(now(), 180)"), customerDetailDF("id") === customerSenderInfoDF("ciid")).count()
    val lostCustomerTotalCount: Long = customerDetailDF.where("last_sender_cdt >= date_sub(now(), 180)").count()
    println(lostCustomerTotalCount)
    //留存率,超过180天未下单的用户数/所有的用户数
    val lostRate: Double = (lostCustomerTotalCount / (if (customerTotalCount.isNullAt(0)) 1D else customerTotalCount.getLong(0))).asInstanceOf[Number].doubleValue()
    println(lostRate)
    // 活跃用户数(近10天内有发件的客户表示活跃用户)
    val activeCount = customerDetailDF.where("last_sender_cdt>=date_sub(now(), 10)").count
    // 月度新老用户数(应该是月度新用户!)
    val monthOfNewCustomerCount = customerDetailDF.where($"regDt".between(trunc($"regDt", "MM"), date_format(current_date(), "yyyy-MM-dd"))).count
    // 沉睡用户数(3个月~6个月之间的用户表示已沉睡)
    val sleepCustomerCount = customerDetailDF.where("last_sender_cdt>=date_sub(now(), 180) and last_sender_cdt<=date_sub(now(), 90)").count
    println(sleepCustomerCount)
    // 流失用户数(9个月未下单表示已流失)
    val loseCustomerCount = customerDetailDF.where("last_sender_cdt>=date_sub(now(), 270)").count
    println(loseCustomerCount)
    // 客单数
    val customerSendInfoDF = customerDetailDF.where("first_sender_id is not null")
    val customerBillCountAndAmount: Row = customerSendInfoDF.agg(sum("totalCount").alias("totalCount"), sum("totalAmount").alias("totalAmount")).first()
    // 客单价
    val customerAvgAmount = customerBillCountAndAmount.get(1).toString.toDouble / customerBillCountAndAmount.get(0).toString.toDouble //总金额/总件数
    println(customerAvgAmount)
    // 平均客单数
    val avgCustomerBillCount = customerSendInfoDF.count / customerDetailDF.count
    // 普通用户数
    val normalCustomerRow: Row = customerDetailDF.where("type=1").agg(count($"id").alias("total_count")).first()
    println(normalCustomerRow)
    val normalCustomerCount: Long = if (normalCustomerRow.isNullAt(0)) 0L else normalCustomerRow.get(0).asInstanceOf[Number].longValue()
    // 获取昨天时间yyyyMMdd
    val cdt = DateHelper.getyestday("yyyyMMdd")
    // 构建要持久化的指标数据
    val rowInfo = Row(
      cdt,
      if (customerTotalCount.isNullAt(0)) 0L else customerTotalCount.get(0).asInstanceOf[Number].longValue(),
      addTotalCount,
      lostCustomerTotalCount,
      lostRate,
      activeCount,
      monthOfNewCustomerCount,
      sleepCustomerCount,
      loseCustomerCount,
      if (customerBillCountAndAmount.isNullAt(0)) 0L else customerBillCountAndAmount.get(0).asInstanceOf[Number].longValue(),
      customerAvgAmount,
      avgCustomerBillCount,
      normalCustomerCount
    )
    rows.append(rowInfo)
    // 5.2:组织要写入到kudu表的数据
    val data: RDD[Row] = sparkSession.sparkContext.makeRDD(rows)
    val quotaDF: DataFrame = sparkSession.createDataFrame(data, schema)
    save(quotaDF, OfflineTableDefine.customerSummery)
    //删除缓存,释放资源
    customerDetailDF.unpersist()
    sparkSession.stop()
  }
}


相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
目录
相关文章
|
6月前
|
数据采集 分布式计算 DataWorks
ODPS在某公共数据项目上的实践
本项目基于公共数据定义及ODPS与DataWorks技术,构建一体化智能化数据平台,涵盖数据目录、归集、治理、共享与开放六大目标。通过十大子系统实现全流程管理,强化数据安全与流通,提升业务效率与决策能力,助力数字化改革。
224 4
|
6月前
|
SQL 分布式计算 大数据
别再迷信“上大数据就能飞”了!大数据项目成败的5个真相
别再迷信“上大数据就能飞”了!大数据项目成败的5个真相
153 6
|
6月前
|
JSON 分布式计算 大数据
springboot项目集成大数据第三方dolphinscheduler调度器
springboot项目集成大数据第三方dolphinscheduler调度器
360 3
|
6月前
|
数据采集 机器学习/深度学习 分布式计算
客户老是流失?可能是你没用好大数据!
客户老是流失?可能是你没用好大数据!
173 2
|
分布式计算 大数据 Java
springboot项目集成大数据第三方dolphinscheduler调度器 执行/停止任务
springboot项目集成大数据第三方dolphinscheduler调度器 执行/停止任务
129 0
|
9月前
|
数据采集 分布式计算 数据可视化
大数据项目成功的秘诀——不只是技术,更是方法论!
大数据项目成功的秘诀——不只是技术,更是方法论!
243 8
大数据项目成功的秘诀——不只是技术,更是方法论!
|
数据采集 监控 算法
大数据与物流行业:智能配送的实现
【10月更文挑战第31天】在数字化时代,大数据成为物流行业转型升级的关键驱动力。本文探讨大数据如何在物流行业中实现智能配送,包括数据采集与整合、数据分析与挖掘、智能配送规划及实时监控与评估,通过案例分析展示了大数据在优化配送路线和提升物流效率方面的巨大潜力,展望了未来智能配送的高度自动化、实时性和协同化趋势。
1258 1
|
监控 Java 开发者
揭秘Struts 2性能监控:选对工具与方法,让你的应用跑得更快,赢在起跑线上!
【8月更文挑战第31天】在企业级应用开发中,性能监控对系统的稳定运行至关重要。针对流行的Java EE框架Struts 2,本文探讨了性能监控的工具与方法,包括商用的JProfiler、免费的VisualVM以及Struts 2自带的性能监控插件。通过示例代码展示了如何在实际项目中实施这些监控手段,帮助开发者发现和解决性能瓶颈,确保应用在高并发、高负载环境下稳定运行。选择合适的监控工具需综合考虑项目需求、成本、易用性和可扩展性等因素。
180 0
|
4月前
|
机器学习/深度学习 传感器 分布式计算
数据才是真救命的:聊聊如何用大数据提升灾难预警的精准度
数据才是真救命的:聊聊如何用大数据提升灾难预警的精准度
336 14
|
5月前
|
机器学习/深度学习 运维 监控
运维不怕事多,就怕没数据——用大数据喂饱你的运维策略
运维不怕事多,就怕没数据——用大数据喂饱你的运维策略
197 0