客快物流大数据项目(六十七):客户主题(二)

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 客快物流大数据项目(六十七):客户主题(二)

五、客户数据指标开发



1、计算的字段


image.png


2、Spark实现


实现步骤:


  • 在dws目录下创建 ConsumerDWS 单例对象,继承自OfflineApp特质
  • 初始化环境的参数,创建SparkSession对象
  • 根据指定的日期获取拉宽后的用户宽表(tbl_customer_detail)增量数据,并缓存数据
  • 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
  • 指标计算
  • 总客户数
  • 今日新增客户数(注册时间为今天)
  • 留存数(超过180天未下单表示已流失,否则表示留存)
  • 留存率
  • 活跃用户数(近10天内有发件的客户表示活跃用户)
  • 月度新老用户数(应该是月度新用户!)
  • 沉睡用户数(3个月~6个月之间的用户表示已沉睡)
  • 流失用户数(9个月未下单表示已流失)
  • 客单数
  • 客单价
  • 平均客单数
  • 普通用户数
  • 获取当前时间yyyyMMddHH
  • 构建要持久化的指标数据(需要判断计算的指标是否有值,若没有需要赋值默认值)
  • 通过StructType构建指定Schema
  • 创建客户指标数据表(若存在则不创建)
  • 持久化指标数据到kudu表


2.1、初始化环境变量


package cn.it.logistics.offline.dws
import cn.it.logistics.common.{Configuration, DateHelper, OfflineTableDefine, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, LongType, Metadata, StringType, StructField, StructType}
import org.apache.spark.sql.functions._
import scala.collection.mutable.ArrayBuffer
/**
 * 客户主题指标计算
 */
object CustomerDWS  extends  OfflineApp {
  //定义应用程序的名称
  val appName = this.getClass.getSimpleName
  def main(args: Array[String]): Unit = {
    /**
     * 实现步骤:
     * 1)创建SparkConf对象
     * 2)创建SparkSession对象
     * 3)读取客户明细宽表的数据
     * 4)对客户明细宽表的数据进行指标的计算
     * 5)将计算好的指标数据写入到kudu数据库中
     * 5.1:定义指标结果表的schema信息
     * 5.2:组织需要写入到kudu表的数据
     * 5.3:判断指标结果表是否存在,如果不存在则创建
     * 5.4:将数据写入到kudu表中
     * 6)删除缓存数据
     * 7)停止任务,退出sparksession
     */
    //TODO 1)创建SparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )
    //TODO 2)创建SparkSession对象
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
    //处理数据
    execute(sparkSession)
  }
  /**
   * 数据处理
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
sparkSession.stop()
  }
}


2.2、加载客户宽表增量数据并缓存


加载客户宽表的时候,需要指定日期条件,因为客户主题最终需要Azkaban定时调度执行,每天执行一次增量数据,因此需要指定日期。


//TODO 3)读取客户明细宽表的数据(用户主题的数据不需要按照天进行增量更新,而是每天全量运行)
val customerDetailDF = getKuduSource(sparkSession, OfflineTableDefine.customerDetail, Configuration.isFirstRunnable)


2.3、指标计算


//定义数据集合
val rows: ArrayBuffer[Row] = ArrayBuffer[Row]()
//TODO 4)对客户明细宽表的数据进行指标的计算
val customerTotalCount: Row = customerDetailDF.agg(count($"id").alias("total_count")).first()
//今日新增客户数
val addTotalCount: Long = customerDetailDF.where(date_format($"regDt", "yyyy-MM-dd").equalTo(DateHelper.getyesterday("yyyy-MM-dd"))).agg(count($"id")).first().getLong(0)
//留存率(超过180天未下单表示已经流失,否则表示留存)
//留存用户数
//val lostCustomerTotalCount: Long = customerDetailDF.join(customerSenderInfoDF.where("cdt >= date_sub(now(), 180)"), customerDetailDF("id") === customerSenderInfoDF("ciid")).count()
val lostCustomerTotalCount: Long = customerDetailDF.where("last_sender_cdt >= date_sub(now(), 180)").count()
println(lostCustomerTotalCount)
//留存率,超过180天未下单的用户数/所有的用户数
val lostRate: Double = (lostCustomerTotalCount / (if (customerTotalCount.isNullAt(0)) 1D else customerTotalCount.getLong(0))).asInstanceOf[Number].doubleValue()
println(lostRate)
// 活跃用户数(近10天内有发件的客户表示活跃用户)
val activeCount = customerDetailDF.where("last_sender_cdt>=date_sub(now(), 10)").count
// 月度新老用户数(应该是月度新用户!)
val monthOfNewCustomerCount = customerDetailDF.where($"regDt".between(trunc($"regDt", "MM"), date_format(current_date(), "yyyy-MM-dd"))).count
// 沉睡用户数(3个月~6个月之间的用户表示已沉睡)
val sleepCustomerCount = customerDetailDF.where("last_sender_cdt>=date_sub(now(), 180) and last_sender_cdt<=date_sub(now(), 90)").count
println(sleepCustomerCount)
// 流失用户数(9个月未下单表示已流失)
val loseCustomerCount = customerDetailDF.where("last_sender_cdt>=date_sub(now(), 270)").count
println(loseCustomerCount)
// 客单数
val customerSendInfoDF = customerDetailDF.where("first_sender_id is not null")
val customerBillCountAndAmount: Row = customerSendInfoDF.agg(sum("totalCount").alias("totalCount"), sum("totalAmount").alias("totalAmount")).first()
// 客单价
val customerAvgAmount = customerBillCountAndAmount.get(1).toString.toDouble / customerBillCountAndAmount.get(0).toString.toDouble //总金额/总件数
println(customerAvgAmount)
// 平均客单数
val avgCustomerBillCount = customerSendInfoDF.count / customerDetailDF.count
// 获取昨天时间yyyyMMdd
val cdt = DateHelper.getyesterday("yyyyMMdd")
// 构建要持久化的指标数据
val rowInfo = Row(
  cdt,
  if (customerTotalCount.isNullAt(0)) 0L else customerTotalCount.get(0).asInstanceOf[Number].longValue(),
  addTotalCount,
  lostCustomerTotalCount,
  lostRate,
  activeCount,
  monthOfNewCustomerCount,
  sleepCustomerCount,
  loseCustomerCount,
  if (customerBillCountAndAmount.isNullAt(0)) 0L else customerBillCountAndAmount.get(0).asInstanceOf[Number].longValue(),
  customerAvgAmount,
  avgCustomerBillCount
)
rows.append(rowInfo)


2.4、通过StructType构建指定Schema


import sparkSession.implicits._
val schema = StructType(Array(
  StructField("id", StringType, true, Metadata.empty),
  StructField("customerTotalCount", LongType, true, Metadata.empty),
  StructField("addtionTotalCount", LongType, true, Metadata.empty),
  StructField("lostCustomerTotalCount", LongType, true, Metadata.empty),
  StructField("lostRate", DoubleType, true, Metadata.empty),
  StructField("activeCount", LongType, true, Metadata.empty),
  StructField("monthOfNewCustomerCount", LongType, true, Metadata.empty),
  StructField("sleepCustomerCount", LongType, true, Metadata.empty),
  StructField("loseCustomerCount", LongType, true, Metadata.empty),
  StructField("customerBillCount", LongType, true, Metadata.empty),
  StructField("customerAvgAmount", DoubleType, true, Metadata.empty),
  StructField("avgCustomerBillCount", LongType, true, Metadata.empty)
))


2.5、持久化指标数据到kudu表


// 5.2:组织要写入到kudu表的数据
val data: RDD[Row] = sparkSession.sparkContext.makeRDD(rows)
val quotaDF: DataFrame = sparkSession.createDataFrame(data, schema)
save(quotaDF, OfflineTableDefine.customerSummery)


2.6、完整代码


package cn.it.logistics.offline.dws
import cn.it.logistics.common.{Configure, DateHelper, OfflineTableDefine, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import cn.it.logistics.offline.dws.ExpressBillDWS.{appName, execute}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DoubleType, LongType, Metadata, StringType, StructField, StructType}
import scala.collection.mutable.ArrayBuffer
/**
 * 客户主题开发
 * 读取客户明细宽表的数据,然后进行指标开发,将结果存储到kudu表中(DWS层)
 */
object ConsumerDWS extends  OfflineApp{
  //定义应用的名称
  val appName: String = this.getClass.getSimpleName
  /**
   * 入口函数
   * @param args
   */
  def main(args: Array[String]): Unit = {
    /**
     * 实现步骤:
     * 1)创建sparkConf对象
     * 2)创建SparkSession对象
     * 3)读取客户宽表数据(判断是全量装载还是增量装载),将加载的数据进行缓存
     * 4)对客户明细表的数据进行指标计算
     * 5)将计算好的数写入到kudu表中
     *   5.1)定义写入kudu表的schema结构信息
     *   5.2)将组织好的指标结果集合转换成RDD对象
     *   5.3)创建表,写入数据
     * 6)删除缓存,释放资源
     * 7)停止作业,退出sparkSession
     */
    //TODO 1)创建sparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName),
      SparkUtils.parameterParser(args)
    )
    //TODO 2)创建SparkSession对象
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configure.LOG_OFF)
    //执行数据处理的逻辑
    execute(sparkSession)
  }
  /**
   * 数据处理
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    //TODO 3)读取客户明细宽表的数据(用户主题的数据不需要按照天进行增量更新,而是每天全量运行)
    val customerDetailDF: DataFrame = getKuduSource(sparkSession, OfflineTableDefine.customerDetail, true)
    import sparkSession.implicits._
    val schema = StructType(Array(
      StructField("id", StringType, true, Metadata.empty),
      StructField("customerTotalCount", LongType, true, Metadata.empty),
      StructField("addtionTotalCount", LongType, true, Metadata.empty),
      StructField("lostCustomerTotalCount", LongType, true, Metadata.empty),
      StructField("lostRate", DoubleType, true, Metadata.empty),
      StructField("activeCount", LongType, true, Metadata.empty),
      StructField("monthOfNewCustomerCount", LongType, true, Metadata.empty),
      StructField("sleepCustomerCount", LongType, true, Metadata.empty),
      StructField("loseCustomerCount", LongType, true, Metadata.empty),
      StructField("customerBillCount", LongType, true, Metadata.empty),
      StructField("customerAvgAmount", DoubleType, true, Metadata.empty),
      StructField("avgCustomerBillCount", LongType, true, Metadata.empty),
      StructField("normalCustomerCount", LongType, true, Metadata.empty)
    ))
    //定义数据集合
    val rows: ArrayBuffer[Row] = ArrayBuffer[Row]()
    //TODO 4)对客户明细宽表的数据进行指标的计算
    val customerTotalCount: Row = customerDetailDF.agg(count($"id").alias("total_count")).first()
    //今日新增客户数
    val addTotalCount: Long = customerDetailDF.where(date_format($"regDt", "yyyy-MM-dd").equalTo(DateHelper.getyestday("yyyy-MM-dd"))).agg(count($"id")).first().getLong(0)
    //留存率(超过180天未下单表示已经流失,否则表示留存)
    //留存用户数
    //val lostCustomerTotalCount: Long = customerDetailDF.join(customerSenderInfoDF.where("cdt >= date_sub(now(), 180)"), customerDetailDF("id") === customerSenderInfoDF("ciid")).count()
    val lostCustomerTotalCount: Long = customerDetailDF.where("last_sender_cdt >= date_sub(now(), 180)").count()
    println(lostCustomerTotalCount)
    //留存率,超过180天未下单的用户数/所有的用户数
    val lostRate: Double = (lostCustomerTotalCount / (if (customerTotalCount.isNullAt(0)) 1D else customerTotalCount.getLong(0))).asInstanceOf[Number].doubleValue()
    println(lostRate)
    // 活跃用户数(近10天内有发件的客户表示活跃用户)
    val activeCount = customerDetailDF.where("last_sender_cdt>=date_sub(now(), 10)").count
    // 月度新老用户数(应该是月度新用户!)
    val monthOfNewCustomerCount = customerDetailDF.where($"regDt".between(trunc($"regDt", "MM"), date_format(current_date(), "yyyy-MM-dd"))).count
    // 沉睡用户数(3个月~6个月之间的用户表示已沉睡)
    val sleepCustomerCount = customerDetailDF.where("last_sender_cdt>=date_sub(now(), 180) and last_sender_cdt<=date_sub(now(), 90)").count
    println(sleepCustomerCount)
    // 流失用户数(9个月未下单表示已流失)
    val loseCustomerCount = customerDetailDF.where("last_sender_cdt>=date_sub(now(), 270)").count
    println(loseCustomerCount)
    // 客单数
    val customerSendInfoDF = customerDetailDF.where("first_sender_id is not null")
    val customerBillCountAndAmount: Row = customerSendInfoDF.agg(sum("totalCount").alias("totalCount"), sum("totalAmount").alias("totalAmount")).first()
    // 客单价
    val customerAvgAmount = customerBillCountAndAmount.get(1).toString.toDouble / customerBillCountAndAmount.get(0).toString.toDouble //总金额/总件数
    println(customerAvgAmount)
    // 平均客单数
    val avgCustomerBillCount = customerSendInfoDF.count / customerDetailDF.count
    // 普通用户数
    val normalCustomerRow: Row = customerDetailDF.where("type=1").agg(count($"id").alias("total_count")).first()
    println(normalCustomerRow)
    val normalCustomerCount: Long = if (normalCustomerRow.isNullAt(0)) 0L else normalCustomerRow.get(0).asInstanceOf[Number].longValue()
    // 获取昨天时间yyyyMMdd
    val cdt = DateHelper.getyestday("yyyyMMdd")
    // 构建要持久化的指标数据
    val rowInfo = Row(
      cdt,
      if (customerTotalCount.isNullAt(0)) 0L else customerTotalCount.get(0).asInstanceOf[Number].longValue(),
      addTotalCount,
      lostCustomerTotalCount,
      lostRate,
      activeCount,
      monthOfNewCustomerCount,
      sleepCustomerCount,
      loseCustomerCount,
      if (customerBillCountAndAmount.isNullAt(0)) 0L else customerBillCountAndAmount.get(0).asInstanceOf[Number].longValue(),
      customerAvgAmount,
      avgCustomerBillCount,
      normalCustomerCount
    )
    rows.append(rowInfo)
    // 5.2:组织要写入到kudu表的数据
    val data: RDD[Row] = sparkSession.sparkContext.makeRDD(rows)
    val quotaDF: DataFrame = sparkSession.createDataFrame(data, schema)
    save(quotaDF, OfflineTableDefine.customerSummery)
    //删除缓存,释放资源
    customerDetailDF.unpersist()
    sparkSession.stop()
  }
}


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
16天前
|
数据采集 监控 算法
大数据与物流行业:智能配送的实现
【10月更文挑战第31天】在数字化时代,大数据成为物流行业转型升级的关键驱动力。本文探讨大数据如何在物流行业中实现智能配送,包括数据采集与整合、数据分析与挖掘、智能配送规划及实时监控与评估,通过案例分析展示了大数据在优化配送路线和提升物流效率方面的巨大潜力,展望了未来智能配送的高度自动化、实时性和协同化趋势。
|
3月前
|
机器学习/深度学习 设计模式 人工智能
面向对象方法在AIGC和大数据集成项目中的应用
【8月更文第12天】随着人工智能生成内容(AIGC)和大数据技术的快速发展,企业面临着前所未有的挑战和机遇。AIGC技术能够自动产生高质量的内容,而大数据技术则能提供海量数据的支持,两者的结合为企业提供了强大的竞争优势。然而,要充分利用这些技术,就需要构建一个既能处理大规模数据又能高效集成机器学习模型的集成框架。面向对象编程(OOP)以其封装性、继承性和多态性等特点,在构建这样的复杂系统中扮演着至关重要的角色。
66 3
|
3月前
|
监控 Java 开发者
揭秘Struts 2性能监控:选对工具与方法,让你的应用跑得更快,赢在起跑线上!
【8月更文挑战第31天】在企业级应用开发中,性能监控对系统的稳定运行至关重要。针对流行的Java EE框架Struts 2,本文探讨了性能监控的工具与方法,包括商用的JProfiler、免费的VisualVM以及Struts 2自带的性能监控插件。通过示例代码展示了如何在实际项目中实施这些监控手段,帮助开发者发现和解决性能瓶颈,确保应用在高并发、高负载环境下稳定运行。选择合适的监控工具需综合考虑项目需求、成本、易用性和可扩展性等因素。
43 0
|
3月前
|
SQL 大数据 分布式数据库
SQL与大数据的神秘力量:如何用高效SQL处理海量数据,让你的项目一鸣惊人?
【8月更文挑战第31天】在现代软件开发中,处理海量数据是关键挑战之一。本文探讨了SQL与大数据结合的方法,包括数据类型优化、索引优化、分区优化及分布式数据库应用,并通过示例代码展示了如何实施这些策略。通过遵循最佳实践,如了解查询模式、使用性能工具及定期维护索引,开发者可以更高效地利用SQL处理大规模数据集。随着SQL技术的发展,其在软件开发中的作用将愈发重要。
103 0
|
4月前
|
弹性计算 分布式计算 大数据
MaxCompute产品使用合集之如何将用户A从项目空间A申请的表权限需要改为用户B
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
4月前
|
分布式计算 运维 DataWorks
MaxCompute操作报错合集之用户已在DataWorks项目中,并有项目的开发和运维权限,下载数据时遇到报错,该如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
4月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之如何查询MaxCompute项目中的所有表及其字段信息
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
1月前
|
存储 机器学习/深度学习 分布式计算
大数据技术——解锁数据的力量,引领未来趋势
【10月更文挑战第5天】大数据技术——解锁数据的力量,引领未来趋势
|
4天前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
50 7
|
4天前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
15 2