客快物流大数据项目(六十七):客户主题(一)

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 客快物流大数据项目(六十七):客户主题(一)

客户主题


一、背景介绍



客户主题主要是通过分析用户的下单情况构建用户画像


二、指标明细



image.png


三、表关联关系


1、事实表


image.png


2、维度表


image.png


3、关联关系


用户表与维度表的关联关系如下:


image.png


四、客户数据拉宽开发



1、拉宽后的字段


image.png

image.png


2、SQL语句


SELECT 
TC."id" ,
TC."name" ,
TC."tel",
TC."mobile",
TC."email",
TC."type",
TC."is_own_reg",
TC."reg_dt",
TC."reg_channel_id",
TC."state",
TC."cdt",
TC."udt",
TC."last_login_dt",
TC."remark",
customercodes."code_desc",
sender_info.first_cdt AS first_sender_cdt ,
sender_info.last_cdt AS last_sender_cdt, 
sender_info.billCount AS billCount, 
sender_info.totalAmount AS totalAmount
FROM "tbl_customer" tc 
LEFT JOIN (
SELECT 
  "ciid", min(sender_info."id") first_id, max(sender_info."id") last_id, min(sender_info."cdt") first_cdt, max(sender_info."cdt") last_cdt,COUNT(sender_info."id" ) billCount,sum(express_package."actual_amount") totalAmount
  FROM "tbl_consumer_sender_info" sender_info
  LEFT JOIN "tbl_express_package" express_package
  ON SENDER_INFO."pkg_id" =express_package."id"
  GROUP BY sender_info."ciid"
) sender_info
  ON  tc."id" = sender_info."ciid"
LEFT JOIN "tbl_codes" customercodes ON customercodes."type" =16 AND tc."type" =customercodes."code"


3、Spark实现



实现步骤:


  • 在dwd目录下创建 CustomerDWD 单例对象,继承自OfflineApp特质
  • 初始化环境的参数,创建SparkSession对象
  • 获取客户表(tbl_customer)数据,并缓存数据
  • 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
  • 获取客户寄件信息表(tbl_consumer_sender_info)数据,并缓存数据
  • 获取客户包裹表(tbl_express_package)数据,并缓存数据
  • 获取物流字典码表(tbl_codes)数据,并缓存数据
  • 根据以下方式拉宽仓库车辆明细数据
  • 根据客户id,在客户表中获取客户数据
  • 根据包裹id,在包裹表中获取包裹数据
  • 根据客户类型id,在物流字典码表中获取客户类型名称数据
  • 创建客户明细宽表(若存在则不创建)
  • 将客户明细宽表数据写入到kudu数据表中
  • 删除缓存数据


3.1、初始化环境变量


初始化客户明细拉宽作业的环境变量


package cn.it.logistics.offline.dwd
import cn.it.logistics.common.{CodeTypeMapping, Configuration, OfflineTableDefine, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
/**
 * 客户主题数据的拉宽操作
 */
object CustomerDWD extends OfflineApp {
  //定义应用的名称
  val appName = this.getClass.getSimpleName
  def main(args: Array[String]): Unit = {
    /**
     * 实现步骤:
     * 1)初始化sparkConf对象
     * 2)创建sparkSession对象
     * 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
     * 4)定义维度表与事实表的关联
     * 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
     * 5.1:创建车辆明细宽表的schema表结构
     * 5.2:创建车辆宽表(判断宽表是否存在,如果不存在则创建)
     * 5.3:将数据写入到kudu中
     * 6)将缓存的数据删除掉
     * 7)停止任务
     */
    //1)初始化sparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )
    //2)创建sparkSession对象
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
    //数据处理
    execute(sparkSession)
  }
  /**
   * 数据处理
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    sparkSession.stop()
  }
}


3.2、加载客户相关的表并缓存


加载客户表的时候,需要指定日期条件,因为客户主题最终需要Azkaban定时调度执行,每天执行一次增量数据,因此需要指定日期。

判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)


//导入隐士转换
import sparkSession.implicits._
val customerSenderInfoDF: DataFrame = getKuduSource(sparkSession, TableMapping.consumerSenderInfo, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
val customerDF = getKuduSource(sparkSession, TableMapping.customer, true).persist(StorageLevel.DISK_ONLY_2)
val expressPageageDF = getKuduSource(sparkSession, TableMapping.expressPackage, true).persist(StorageLevel.DISK_ONLY_2)
val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY_2)
val customerTypeDF = codesDF.where($"type" === CodeTypeMapping.CustomType)


3.3、定义表的关联关系


为了在DWS层任务中方便的获取每日增量客户表数据(根据日期),因此在DataFrame基础上动态增加列(day),指定日期格式为yyyyMMdd

代码如下:


//TODO 4)定义维度表与事实表的关联关系
val left_outer = "left_outer"
/**
 * 获取每个用户的首尾单发货信息及发货件数和总金额
 */
val customerSenderDetailInfoDF: DataFrame = customerSenderInfoDF.join(expressPageageDF, expressPageageDF("id") === customerSenderInfoDF("pkgId"), left_outer)
  .groupBy(customerSenderInfoDF("ciid"))
  .agg(min(customerSenderInfoDF("id")).alias("first_id"),
    max(customerSenderInfoDF("id")).alias("last_id"),
    min(expressPageageDF("cdt")).alias("first_cdt"),
    max(expressPageageDF("cdt")).alias("last_cdt"),
    count(customerSenderInfoDF("id")).alias("totalCount"),
    sum(expressPageageDF("actualAmount")).alias("totalAmount")
  )
val customerDetailDF: DataFrame = customerDF
  .join(customerSenderDetailInfoDF, customerDF("id") === customerSenderInfoDF("ciid"), left_outer)
  .join(customerTypeDF, customerDF("type") === customerTypeDF("code").cast(IntegerType), left_outer)
  .sort(customerDF("cdt").asc)
  .select(
    customerDF("id"),
    customerDF("name"),
    customerDF("tel"),
    customerDF("mobile"),
    customerDF("type").cast(IntegerType),
    customerTypeDF("codeDesc").as("type_name"),
    customerDF("isownreg").as("is_own_reg"),
    customerDF("regdt").as("regdt"),
    customerDF("regchannelid").as("reg_channel_id"),
    customerDF("state"),
    customerDF("cdt"),
    customerDF("udt"),
    customerDF("lastlogindt").as("last_login_dt"),
    customerDF("remark"),
    customerSenderDetailInfoDF("first_id").as("first_sender_id"), //首次寄件id
    customerSenderDetailInfoDF("last_id").as("last_sender_id"), //尾次寄件id
    customerSenderDetailInfoDF("first_cdt").as("first_sender_cdt"), //首次寄件时间
    customerSenderDetailInfoDF("last_cdt").as("last_sender_cdt"), //尾次寄件时间
    customerSenderDetailInfoDF("totalCount"), //寄件总次数
    customerSenderDetailInfoDF("totalAmount") //总金额
  )


3.4、创建客户明细宽表并将客户明细数据写入到kudu数据表中


客户明细宽表数据需要保存到kudu中,因此在第一次执行客户明细拉宽操作时,客户明细宽表是不存在的,因此需要实现自动判断宽表是否存在,如果不存在则创建


实现步骤:


在CustomerDWD 单例对象中调用save方法


实实现过程:


在CustomerDWD 单例对象Main方法中调用save方法


save(customerDetailDF, OfflineTableDefine.customerDetail)


3.5、删除缓存数据


为了释放资源,客户明细宽表数据计算完成以后,需要将缓存的源表数据删除。


//移除缓存
customerDetailDF.unpersist
codesDF.unpersist
expressPackageDF.unpersist
customerSenderDF.unpersist
customerDF.unpersist


3.6、完整代码


package cn.it.logistics.offline.dwd
import cn.it.logistics.common.{CodeTypeMapping, Configuration, OfflineTableDefine, SparkUtils, TableMapping}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
/**
 * 客户主题数据的拉宽操作
 */
object CustomerDWD extends OfflineApp {
  //定义应用的名称
  val appName = this.getClass.getSimpleName
  def main(args: Array[String]): Unit = {
    /**
     * 实现步骤:
     * 1)初始化sparkConf对象
     * 2)创建sparkSession对象
     * 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
     * 4)定义维度表与事实表的关联
     * 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
     * 5.1:创建车辆明细宽表的schema表结构
     * 5.2:创建车辆宽表(判断宽表是否存在,如果不存在则创建)
     * 5.3:将数据写入到kudu中
     * 6)将缓存的数据删除掉
     * 7)停止任务
     */
    //1)初始化sparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )
    //2)创建sparkSession对象
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
    //数据处理
    execute(sparkSession)
  }
  /**
   * 数据处理
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    //导入隐士转换
    import sparkSession.implicits._
    val customerSenderInfoDF: DataFrame = getKuduSource(sparkSession, TableMapping.consumerSenderInfo, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
    val customerDF = getKuduSource(sparkSession, TableMapping.customer, true).persist(StorageLevel.DISK_ONLY_2)
    val expressPageageDF = getKuduSource(sparkSession, TableMapping.expressPackage, true).persist(StorageLevel.DISK_ONLY_2)
    val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY_2)
    val customerTypeDF = codesDF.where($"type" === CodeTypeMapping.CustomType)
    //TODO 4)定义维度表与事实表的关联关系
    val left_outer = "left_outer"
    /**
     * 获取每个用户的首尾单发货信息及发货件数和总金额
     */
    val customerSenderDetailInfoDF: DataFrame = customerSenderInfoDF.join(expressPageageDF, expressPageageDF("id") === customerSenderInfoDF("pkgId"), left_outer)
      .groupBy(customerSenderInfoDF("ciid"))
      .agg(min(customerSenderInfoDF("id")).alias("first_id"),
        max(customerSenderInfoDF("id")).alias("last_id"),
        min(expressPageageDF("cdt")).alias("first_cdt"),
        max(expressPageageDF("cdt")).alias("last_cdt"),
        count(customerSenderInfoDF("id")).alias("totalCount"),
        sum(expressPageageDF("actualAmount")).alias("totalAmount")
      )
    val customerDetailDF: DataFrame = customerDF
      .join(customerSenderDetailInfoDF, customerDF("id") === customerSenderInfoDF("ciid"), left_outer)
      .join(customerTypeDF, customerDF("type") === customerTypeDF("code").cast(IntegerType), left_outer)
      .sort(customerDF("cdt").asc)
      .select(
        customerDF("id"),
        customerDF("name"),
        customerDF("tel"),
        customerDF("mobile"),
        customerDF("type").cast(IntegerType),
        customerTypeDF("codeDesc").as("type_name"),
        customerDF("isownreg").as("is_own_reg"),
        customerDF("regdt").as("regdt"),
        customerDF("regchannelid").as("reg_channel_id"),
        customerDF("state"),
        customerDF("cdt"),
        customerDF("udt"),
        customerDF("lastlogindt").as("last_login_dt"),
        customerDF("remark"),
        customerSenderDetailInfoDF("first_id").as("first_sender_id"), //首次寄件id
        customerSenderDetailInfoDF("last_id").as("last_sender_id"), //尾次寄件id
        customerSenderDetailInfoDF("first_cdt").as("first_sender_cdt"), //首次寄件时间
        customerSenderDetailInfoDF("last_cdt").as("last_sender_cdt"), //尾次寄件时间
        customerSenderDetailInfoDF("totalCount"), //寄件总次数
        customerSenderDetailInfoDF("totalAmount") //总金额
      )
    save(customerDetailDF, OfflineTableDefine.customerDetail)
    // 5.4:将缓存的数据删除掉
    customerDF.unpersist()
    customerSenderInfoDF.unpersist()
    expressPageageDF.unpersist()
    customerTypeDF.unpersist()
    sparkSession.stop()
  }
}


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
16天前
|
数据采集 监控 算法
大数据与物流行业:智能配送的实现
【10月更文挑战第31天】在数字化时代,大数据成为物流行业转型升级的关键驱动力。本文探讨大数据如何在物流行业中实现智能配送,包括数据采集与整合、数据分析与挖掘、智能配送规划及实时监控与评估,通过案例分析展示了大数据在优化配送路线和提升物流效率方面的巨大潜力,展望了未来智能配送的高度自动化、实时性和协同化趋势。
|
3月前
|
机器学习/深度学习 设计模式 人工智能
面向对象方法在AIGC和大数据集成项目中的应用
【8月更文第12天】随着人工智能生成内容(AIGC)和大数据技术的快速发展,企业面临着前所未有的挑战和机遇。AIGC技术能够自动产生高质量的内容,而大数据技术则能提供海量数据的支持,两者的结合为企业提供了强大的竞争优势。然而,要充分利用这些技术,就需要构建一个既能处理大规模数据又能高效集成机器学习模型的集成框架。面向对象编程(OOP)以其封装性、继承性和多态性等特点,在构建这样的复杂系统中扮演着至关重要的角色。
66 3
|
3月前
|
监控 Java 开发者
揭秘Struts 2性能监控:选对工具与方法,让你的应用跑得更快,赢在起跑线上!
【8月更文挑战第31天】在企业级应用开发中,性能监控对系统的稳定运行至关重要。针对流行的Java EE框架Struts 2,本文探讨了性能监控的工具与方法,包括商用的JProfiler、免费的VisualVM以及Struts 2自带的性能监控插件。通过示例代码展示了如何在实际项目中实施这些监控手段,帮助开发者发现和解决性能瓶颈,确保应用在高并发、高负载环境下稳定运行。选择合适的监控工具需综合考虑项目需求、成本、易用性和可扩展性等因素。
43 0
|
3月前
|
SQL 大数据 分布式数据库
SQL与大数据的神秘力量:如何用高效SQL处理海量数据,让你的项目一鸣惊人?
【8月更文挑战第31天】在现代软件开发中,处理海量数据是关键挑战之一。本文探讨了SQL与大数据结合的方法,包括数据类型优化、索引优化、分区优化及分布式数据库应用,并通过示例代码展示了如何实施这些策略。通过遵循最佳实践,如了解查询模式、使用性能工具及定期维护索引,开发者可以更高效地利用SQL处理大规模数据集。随着SQL技术的发展,其在软件开发中的作用将愈发重要。
103 0
|
4月前
|
弹性计算 分布式计算 大数据
MaxCompute产品使用合集之如何将用户A从项目空间A申请的表权限需要改为用户B
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
4月前
|
分布式计算 运维 DataWorks
MaxCompute操作报错合集之用户已在DataWorks项目中,并有项目的开发和运维权限,下载数据时遇到报错,该如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
4月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之如何查询MaxCompute项目中的所有表及其字段信息
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
1月前
|
存储 机器学习/深度学习 分布式计算
大数据技术——解锁数据的力量,引领未来趋势
【10月更文挑战第5天】大数据技术——解锁数据的力量,引领未来趋势
|
4天前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
50 7
|
4天前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
15 2