Customer Theme

I. Background

The customer theme builds customer profiles primarily by analyzing users' ordering behavior.
II. Metric Details

III. Table Relationships

1. Fact Tables

2. Dimension Tables

3. Relationships

The customer table joins the other tables as follows (as used in the SQL below):

- tbl_customer."id" = tbl_consumer_sender_info."ciid" (customer to sender info)
- tbl_consumer_sender_info."pkg_id" = tbl_express_package."id" (sender info to express package)
- tbl_customer."type" = tbl_codes."code", with tbl_codes."type" = 16 (customer type to dictionary code)
IV. Customer Data Widening Development

1. Fields After Widening

2. SQL Statement
```sql
SELECT
    tc."id",
    tc."name",
    tc."tel",
    tc."mobile",
    tc."email",
    tc."type",
    tc."is_own_reg",
    tc."reg_dt",
    tc."reg_channel_id",
    tc."state",
    tc."cdt",
    tc."udt",
    tc."last_login_dt",
    tc."remark",
    customercodes."code_desc",
    sender_info.first_cdt   AS first_sender_cdt,
    sender_info.last_cdt    AS last_sender_cdt,
    sender_info.billcount   AS billcount,
    sender_info.totalamount AS totalamount
FROM "tbl_customer" tc
LEFT JOIN (
    SELECT
        "ciid",
        min(sender_info."id")                AS first_id,
        max(sender_info."id")                AS last_id,
        min(sender_info."cdt")               AS first_cdt,
        max(sender_info."cdt")               AS last_cdt,
        count(sender_info."id")              AS billcount,
        sum(express_package."actual_amount") AS totalamount
    FROM "tbl_consumer_sender_info" sender_info
    LEFT JOIN "tbl_express_package" express_package
        ON sender_info."pkg_id" = express_package."id"
    GROUP BY sender_info."ciid"
) sender_info ON tc."id" = sender_info."ciid"
LEFT JOIN "tbl_codes" customercodes
    ON customercodes."type" = 16
    AND tc."type" = customercodes."code"
```
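The literal 16 in the customercodes."type" = 16 predicate is what the Spark code below refers to as CodeTypeMapping.CustomType. The real cn.it.logistics.common.CodeTypeMapping object is not shown in this document; a minimal sketch consistent with the SQL would be:

```scala
// Sketch of the dictionary-code mapping implied by the SQL above.
// Only CustomType = 16 is grounded in the customercodes."type" = 16 predicate;
// the rest of the object layout is an assumption.
object CodeTypeMapping {
  val CustomType = 16 // customer-type entries in tbl_codes
}
```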
3. Spark Implementation

Implementation steps:

- Create a CustomerDWD singleton object under the dwd directory, extending the OfflineApp trait
- Initialize the environment parameters and create a SparkSession object
- Load the customer table (tbl_customer) data and cache it
- Check whether this is the first run; if so, load the full data set (including historical data)
- Load the customer sender info table (tbl_consumer_sender_info) data and cache it
- Load the express package table (tbl_express_package) data and cache it
- Load the logistics code dictionary table (tbl_codes) data and cache it
- Widen the customer detail data as follows:
  - Use the customer id to fetch the customer data from the customer table
  - Use the package id to fetch the package data from the package table
  - Use the customer type id to fetch the customer type name from the code dictionary table
- Create the customer detail wide table (skip creation if it already exists)
- Write the customer detail wide table data to the Kudu table
- Remove the cached data
3.1 Initialize the Environment Variables

Initialize the environment variables for the customer detail widening job.
```scala
package cn.it.logistics.offline.dwd

import cn.it.logistics.common.{CodeTypeMapping, Configuration, OfflineTableDefine, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType

/**
 * Widening job for the customer theme data
 */
object CustomerDWD extends OfflineApp {
  // Application name
  val appName = this.getClass.getSimpleName

  def main(args: Array[String]): Unit = {
    /**
     * Implementation steps:
     * 1) Initialize the SparkConf object
     * 2) Create the SparkSession object
     * 3) Load the fact and dimension tables from Kudu (and cache the loaded data)
     * 4) Define the joins between the dimension tables and the fact table
     * 5) Write the widened data back to Kudu (DWD detail layer)
     *    5.1: Define the schema of the customer detail wide table
     *    5.2: Create the customer wide table (only if it does not exist)
     *    5.3: Write the data to Kudu
     * 6) Remove the cached data
     * 7) Stop the job
     */
    // 1) Initialize the SparkConf object
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )

    // 2) Create the SparkSession object
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    // Process the data
    execute(sparkSession)
  }

  /**
   * Process the data
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    sparkSession.stop()
  }
}
```
3.2 Load and Cache the Customer-Related Tables

When loading the customer-related tables, a date condition has to be specified, because the customer theme will ultimately be scheduled by Azkaban and run once a day over the daily increment.

Check whether this is the first run; if it is, load the full data set (including historical data).
```scala
// Import implicit conversions
import sparkSession.implicits._

val customerSenderInfoDF: DataFrame = getKuduSource(sparkSession, TableMapping.consumerSenderInfo, Configuration.isFirstRunnable)
  .persist(StorageLevel.DISK_ONLY_2)
val customerDF = getKuduSource(sparkSession, TableMapping.customer, true)
  .persist(StorageLevel.DISK_ONLY_2)
val expressPackageDF = getKuduSource(sparkSession, TableMapping.expressPackage, true)
  .persist(StorageLevel.DISK_ONLY_2)
val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true)
  .persist(StorageLevel.DISK_ONLY_2)
val customerTypeDF = codesDF.where($"type" === CodeTypeMapping.CustomType)
```
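getKuduSource is provided by the OfflineApp trait and its body is not shown in this document. Purely as an illustration of the first-run switch described above, a minimal sketch might look like the following; the kudu-spark read options are standard, but the master address, the helper name, and the use of cdt as the incremental filter column are assumptions:

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical sketch of a first-run switch: full load on the first run,
// otherwise only yesterday's increment (matching the daily Azkaban schedule).
def getKuduSourceSketch(spark: SparkSession, tableName: String, isFullLoad: Boolean): DataFrame = {
  val df = spark.read
    .format("kudu")
    .option("kudu.master", "cdh01:7051") // assumed Kudu master address
    .option("kudu.table", tableName)
    .load()

  if (isFullLoad) {
    df // first run: everything, including historical data
  } else {
    val yesterday = LocalDate.now().minusDays(1).format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))
    df.where(df("cdt").startsWith(yesterday)) // keep only yesterday's records
  }
}
```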
3.3 Define the Table Joins

So that DWS-layer jobs can conveniently fetch the daily incremental customer data by date, a day column is added dynamically on top of the DataFrame, with the date format yyyyMMdd (a sketch of adding this column follows the join code below).

The code is as follows:
```scala
// TODO 4) Define the joins between the dimension tables and the fact table
val left_outer = "left_outer"

/**
 * Get each customer's first and last shipment info, shipment count, and total amount
 */
val customerSenderDetailInfoDF: DataFrame = customerSenderInfoDF
  .join(expressPackageDF, expressPackageDF("id") === customerSenderInfoDF("pkgId"), left_outer)
  .groupBy(customerSenderInfoDF("ciid"))
  .agg(
    min(customerSenderInfoDF("id")).alias("first_id"),
    max(customerSenderInfoDF("id")).alias("last_id"),
    min(expressPackageDF("cdt")).alias("first_cdt"),
    max(expressPackageDF("cdt")).alias("last_cdt"),
    count(customerSenderInfoDF("id")).alias("totalCount"),
    sum(expressPackageDF("actualAmount")).alias("totalAmount")
  )

val customerDetailDF: DataFrame = customerDF
  .join(customerSenderDetailInfoDF, customerDF("id") === customerSenderDetailInfoDF("ciid"), left_outer)
  .join(customerTypeDF, customerDF("type") === customerTypeDF("code").cast(IntegerType), left_outer)
  .sort(customerDF("cdt").asc)
  .select(
    customerDF("id"),
    customerDF("name"),
    customerDF("tel"),
    customerDF("mobile"),
    customerDF("type").cast(IntegerType),
    customerTypeDF("codeDesc").as("type_name"),
    customerDF("isownreg").as("is_own_reg"),
    customerDF("regdt").as("reg_dt"),
    customerDF("regchannelid").as("reg_channel_id"),
    customerDF("state"),
    customerDF("cdt"),
    customerDF("udt"),
    customerDF("lastlogindt").as("last_login_dt"),
    customerDF("remark"),
    customerSenderDetailInfoDF("first_id").as("first_sender_id"),   // first shipment id
    customerSenderDetailInfoDF("last_id").as("last_sender_id"),     // last shipment id
    customerSenderDetailInfoDF("first_cdt").as("first_sender_cdt"), // first shipment time
    customerSenderDetailInfoDF("last_cdt").as("last_sender_cdt"),   // last shipment time
    customerSenderDetailInfoDF("totalCount"),                       // total shipment count
    customerSenderDetailInfoDF("totalAmount")                       // total amount
  )
```
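Note that the join code above does not itself add the day column mentioned at the start of 3.3. A minimal sketch of appending it, assuming the record creation timestamp cdt is the basis for the partition date:

```scala
import org.apache.spark.sql.functions.date_format

// Add a day column in yyyyMMdd format, derived from the creation timestamp,
// so that DWS jobs can filter daily increments by date.
val customerDetailWithDayDF = customerDetailDF
  .withColumn("day", date_format(customerDetailDF("cdt"), "yyyyMMdd"))
```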
3.4 Create the Customer Detail Wide Table and Write the Detail Data to the Kudu Table

The customer detail wide table data is stored in Kudu, so on the first run of the widening job the wide table does not yet exist; the job therefore has to check automatically whether the wide table exists and create it if it does not.
Implementation steps:

Call the save method from the CustomerDWD singleton object's execute method:
```scala
save(customerDetailDF, OfflineTableDefine.customerDetail)
```
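save is inherited from the OfflineApp trait and its body is not shown here. As an illustration only, the existence check it implies could be written with the kudu-spark KuduContext API roughly as follows; the master address, key column, and partitioning choices are assumptions:

```scala
import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.JavaConverters._

// Hypothetical sketch: create the wide table only if it is missing, then upsert.
// Assumes "id" is the (non-nullable) primary key column of the DataFrame schema.
def saveSketch(tableName: String, dataFrame: DataFrame, sparkSession: SparkSession): Unit = {
  val kuduContext = new KuduContext("cdh01:7051", sparkSession.sparkContext) // assumed master address
  if (!kuduContext.tableExists(tableName)) {
    val options = new CreateTableOptions()
      .addHashPartitions(Seq("id").asJava, 3) // assumed hash partitioning on the key
      .setNumReplicas(1)
    kuduContext.createTable(tableName, dataFrame.schema, Seq("id"), options)
  }
  kuduContext.upsertRows(dataFrame, tableName)
}
```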
3.5 Remove the Cached Data

To free resources, once the customer detail wide table has been computed, the cached source table data should be removed.
```scala
// Remove the cached data
codesDF.unpersist()
expressPackageDF.unpersist()
customerSenderInfoDF.unpersist()
customerDF.unpersist()
```
3.6 Complete Code
```scala
package cn.it.logistics.offline.dwd

import cn.it.logistics.common.{CodeTypeMapping, Configuration, OfflineTableDefine, SparkUtils, TableMapping}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType

/**
 * Widening job for the customer theme data
 */
object CustomerDWD extends OfflineApp {
  // Application name
  val appName = this.getClass.getSimpleName

  def main(args: Array[String]): Unit = {
    /**
     * Implementation steps:
     * 1) Initialize the SparkConf object
     * 2) Create the SparkSession object
     * 3) Load the fact and dimension tables from Kudu (and cache the loaded data)
     * 4) Define the joins between the dimension tables and the fact table
     * 5) Write the widened data back to Kudu (DWD detail layer)
     *    5.1: Define the schema of the customer detail wide table
     *    5.2: Create the customer wide table (only if it does not exist)
     *    5.3: Write the data to Kudu
     * 6) Remove the cached data
     * 7) Stop the job
     */
    // 1) Initialize the SparkConf object
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )

    // 2) Create the SparkSession object
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    // Process the data
    execute(sparkSession)
  }

  /**
   * Process the data
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    // Import implicit conversions
    import sparkSession.implicits._

    // 3) Load the fact and dimension tables from Kudu (and cache the loaded data)
    val customerSenderInfoDF: DataFrame = getKuduSource(sparkSession, TableMapping.consumerSenderInfo, Configuration.isFirstRunnable)
      .persist(StorageLevel.DISK_ONLY_2)
    val customerDF = getKuduSource(sparkSession, TableMapping.customer, true)
      .persist(StorageLevel.DISK_ONLY_2)
    val expressPackageDF = getKuduSource(sparkSession, TableMapping.expressPackage, true)
      .persist(StorageLevel.DISK_ONLY_2)
    val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true)
      .persist(StorageLevel.DISK_ONLY_2)
    val customerTypeDF = codesDF.where($"type" === CodeTypeMapping.CustomType)

    // TODO 4) Define the joins between the dimension tables and the fact table
    val left_outer = "left_outer"

    /**
     * Get each customer's first and last shipment info, shipment count, and total amount
     */
    val customerSenderDetailInfoDF: DataFrame = customerSenderInfoDF
      .join(expressPackageDF, expressPackageDF("id") === customerSenderInfoDF("pkgId"), left_outer)
      .groupBy(customerSenderInfoDF("ciid"))
      .agg(
        min(customerSenderInfoDF("id")).alias("first_id"),
        max(customerSenderInfoDF("id")).alias("last_id"),
        min(expressPackageDF("cdt")).alias("first_cdt"),
        max(expressPackageDF("cdt")).alias("last_cdt"),
        count(customerSenderInfoDF("id")).alias("totalCount"),
        sum(expressPackageDF("actualAmount")).alias("totalAmount")
      )

    val customerDetailDF: DataFrame = customerDF
      .join(customerSenderDetailInfoDF, customerDF("id") === customerSenderDetailInfoDF("ciid"), left_outer)
      .join(customerTypeDF, customerDF("type") === customerTypeDF("code").cast(IntegerType), left_outer)
      .sort(customerDF("cdt").asc)
      .select(
        customerDF("id"),
        customerDF("name"),
        customerDF("tel"),
        customerDF("mobile"),
        customerDF("type").cast(IntegerType),
        customerTypeDF("codeDesc").as("type_name"),
        customerDF("isownreg").as("is_own_reg"),
        customerDF("regdt").as("reg_dt"),
        customerDF("regchannelid").as("reg_channel_id"),
        customerDF("state"),
        customerDF("cdt"),
        customerDF("udt"),
        customerDF("lastlogindt").as("last_login_dt"),
        customerDF("remark"),
        customerSenderDetailInfoDF("first_id").as("first_sender_id"),   // first shipment id
        customerSenderDetailInfoDF("last_id").as("last_sender_id"),     // last shipment id
        customerSenderDetailInfoDF("first_cdt").as("first_sender_cdt"), // first shipment time
        customerSenderDetailInfoDF("last_cdt").as("last_sender_cdt"),   // last shipment time
        customerSenderDetailInfoDF("totalCount"),                       // total shipment count
        customerSenderDetailInfoDF("totalAmount")                       // total amount
      )

    // 5) Write the widened data back to Kudu (DWD detail layer)
    save(customerDetailDF, OfflineTableDefine.customerDetail)

    // 6) Remove the cached data
    customerDF.unpersist()
    customerSenderInfoDF.unpersist()
    expressPackageDF.unpersist()
    codesDF.unpersist()

    // 7) Stop the job
    sparkSession.stop()
  }
}
```