客快物流大数据项目(六十六):车辆主题(下)

简介: 客快物流大数据项目(六十六):车辆主题(下)

2、SQL语句



2.1、拉宽网点车辆表


SELECT 
ttl."id" ,
ttl."brand" ,
ttl."model" ,
ttl."type" ,
TTL ."given_load" ,
TTL ."load_cn_unit" ,
TTL ."load_en_unit" ,
TTL ."buy_dt" ,
TTL ."license_plate" ,
ttl."state" ,
ttl."cdt" ,
ttl."udt" ,
ttl."remark" ,
dot."id" AS dot_id,
dot."dot_name" ,
dot."dot_number" ,
dot."dot_addr" ,
dot."dot_gis_addr" ,
dot."dot_tel" ,
dot."manage_area_id" ,
dot."manage_area_gis" ,
COMPANY ."id" AS company_id,
COMPANY ."company_name",
COMPANY ."company_number",
COMPANY ."city_id",
COMPANY ."company_addr",
COMPANY ."company_addr_gis",
COMPANY ."company_tel",
COMPANY ."is_sub_company"
FROM "tbl_transport_tool"  ttl
LEFT JOIN "tbl_dot_transport_tool" tdtl ON ttl."id" = tdtl."transport_tool_id" 
LEFT JOIN "tbl_dot" dot ON DOT ."id" = TDTL ."dot_id"
LEFT JOIN "tbl_company_dot_map" companydot ON companydot."dot_id" = TDTL ."dot_id"
LEFT JOIN "tbl_company" company ON company."id" = companydot."company_id"


2.2、拉宽仓库车辆表


SELECT 
ttl."id" ,
ttl."brand" ,
ttl."model" ,
ttl."type" ,
ttl."given_load" ,
ttl."load_cn_unit" ,
ttl."load_en_unit" ,
ttl."buy_dt" ,
ttl."license_plate" ,
ttl."state" ,
ttl."cdt" ,
ttl."udt" ,
ttl."remark" ,
warehouse."id" ,
warehouse."name",
warehouse."addr",
warehouse."addr_gis",
warehouse."employee_id",
warehouse."type",
warehouse."area",
warehouse."is_lease",
COMPANY ."id" AS company_id,
COMPANY ."company_name",
COMPANY ."company_number",
COMPANY ."city_id",
COMPANY ."company_addr",
COMPANY ."company_addr_gis",
COMPANY ."company_tel",
COMPANY ."is_sub_company"
FROM "tbl_warehouse_transport_tool"  twt
LEFT JOIN "tbl_transport_tool" ttl ON twt."transport_tool_id" = ttl."id" 
LEFT JOIN "tbl_warehouse" warehouse ON WAREHOUSE ."id" = twt."warehouse_id"
LEFT JOIN "tbl_company_warehouse_map" warehouse_map ON warehouse_map."warehouse_id" = warehouse."id" 
LEFT JOIN "tbl_company" company ON company."id" = warehouse_map."company_id"


3、Spark实现



实现步骤:


  • 在dwd目录下创建 TransportToolDWD 单例对象,继承自OfflineApp特质
  • 初始化环境的参数,创建SparkSession对象
  • 获取运输工具表(tbl_transport_tool)数据,并缓存数据
  • 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
  • 获取网点运输工具关联表(tbl_dot_transport_tool)数据,并缓存数据
  • 获取网点表(tbl_dot)数据,并缓存数据
  • 获取公司网点关联表(tbl_company_dot_map)数据,并缓存数据
  • 获取仓库运输工具关联表(tbl_warehouse_transport_tool)数据,并缓存数据
  • 获取公司仓库关联表(tbl_company_warehouse_map)数据,并缓存数据
  • 获取仓库表(tbl_warehouse)数据,并缓存数据
  • 获取公司表(tbl_company)数据,并缓存数据
  • 根据以下方式拉宽仓库车辆明细数据
  • 根据交通工具id,在交通工具表中获取交通工具数据
  • 根据网点id,在网点表中获取网点数据
  • 根据公司id,在公司表中获取公司数据
  • 根据仓库id,在仓库表中获取仓库数据
  • 创建网点车辆明细宽表(若存在则不创建)
  • 创建仓库车辆明细宽表(若存在则不创建)
  • 将仓库车辆明细宽表数据写入到kudu数据表中


删除缓存数据


3.1、初始化环境变量


初始化运单明细拉宽作业的环境变量


package cn.it.logistics.offline.dwd
import cn.it.logistics.common.{Configuration, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
 * 车辆主题开发
 * 将车辆相关的表进行关联拉宽,将拉宽后的数据写入到车辆宽表中
 * 1)网点车辆关联表->派送网点所拥有的车辆(三轮车)
 * 2)仓库车辆关联表->仓库与仓库之间的运输工具(货车、货机)
 */
object TransportToolDWD extends OfflineApp{
  //定义应用的名称
  val appName = this.getClass.getSimpleName
  /**
   * 入口函数
   * @param args
   */
  def main(args: Array[String]): Unit = {
    /**
     * 实现步骤:
     * 1)初始化sparkConf对象
     * 2)创建sparkSession对象
     * 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
     * 4)定义维度表与事实表的关联
     * 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
     * 5.1:创建车辆明细宽表的schema表结构
     * 5.2:创建车辆宽表(判断宽表是否存在,如果不存在则创建)
     * 5.3:将数据写入到kudu中
     * 6)将缓存的数据删除掉
     * 7)停止任务
     */
    //1)初始化sparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )
    //2)创建sparkSession对象
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
    //数据处理
    execute(sparkSession)
  }
  /**
   * 数据处理
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    sparkSession.stop()
  }
}


3.2、加载运输工具表及车辆相关的表并缓存


加载运输工具表的时候,需要指定日期条件,因为车辆主题最终需要Azkaban定时调度执行,每天执行一次增量数据,因此需要指定日期。

判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)


//TODO 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
//加载车辆表数据(事实表)
val ttDF: DataFrame = getKuduSource(sparkSession, TableMapping.transportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
//加载仓库车辆表数据
val ttDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dotTransportTool, true).persist(StorageLevel.DISK_ONLY_2)
//加载网点表的数据
val dotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dot, true).persist(StorageLevel.DISK_ONLY_2)
//加载公司网点关联表的数据
val companyDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyDotMap, true).persist(StorageLevel.DISK_ONLY_2)
//加载公司表的数据
val companyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(StorageLevel.DISK_ONLY_2)
//加载仓库车辆关联表数据(事实表)
val ttWsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouseTransportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
//加载仓库公司关联表
val companyWareHouseMapDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyWarehouseMap, true).persist(StorageLevel.DISK_ONLY_2)
//加载仓库表数据
val wsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouse, true).persist(StorageLevel.DISK_ONLY_2)
//加载物流码表数据
val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY_2)
import sparkSession.implicits._
//获取运输工具类型
val transportTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportType).select($"code".as("ttType"), $"codeDesc".as("ttTypeName"))
//获取运输工具状态
val transportStatusDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportStatus).select($"code".as("ttStatus"), $"codeDesc".as("ttStateName"))


3.3、定义网点车辆宽表的关联关系


为了在DWS层任务中方便的获取每日增量网点车辆表数据(根据日期),因此在DataFrame基础上动态增加列(day),指定日期格式为yyyyMMdd

代码如下:


//TODO 4)定义维度表与事实表的关联
val left_outer: String = "left_outer"
// 4.1:拉宽网点车辆表
val ttDotDetailDF = ttDotDF.join(ttDF, ttDotDF.col("transportToolId") === ttDF.col("id"), left_outer) //网点车辆表关联车辆表
  .join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer) //车辆表类型关联字典表类型
  .join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) //车辆表状态管理字典表状态
  .join(dotDF, dotDF.col("id") === ttDotDF.col("dotId"), left_outer) //网点车辆表关联网点
  .join(companyDotDF, ttDotDF.col("dotId") === companyDotDF.col("dotId"), left_outer) //网点车辆管连网点公司关联表
  .join(companyDF, companyDotDF.col("companyId") === companyDF.col("id"), left_outer) //网点车辆表关联公司表
  .withColumn("day", date_format(ttDotDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
  .sort(ttDF.col("cdt").asc)
  .select(
    ttDF("id"), //车辆表id
    ttDF("brand"), //车辆表brand
    ttDF("model"), //车辆表model
    ttDF("type").cast(IntegerType), //车辆表type
    transportTypeDF("ttTypeName").as("type_name"), // 车辆表type对应字典表车辆类型的具体描述
    ttDF("givenLoad").cast(IntegerType).as("given_load"), //车辆表given_load
    ttDF("loadCnUnit").as("load_cn_unit"), //车辆表load_cn_unit
    ttDF("loadEnUnit").as("load_en_unit"), //车辆表load_en_unit
    ttDF("buyDt").as("buy_dt"), //车辆表buy_dt
    ttDF("licensePlate").as("license_plate"), //车辆表license_plate
    ttDF("state").cast(IntegerType), //车辆表state
    transportStatusDF("ttStateName").as("state_name"), // 车辆表state对应字典表类型的具体描述
    ttDF("cdt"), //车辆表cdt
    ttDF("udt"), //车辆表udt
    ttDF("remark"), //车辆表remark
    dotDF("id").as("dot_id"), //网点表dot_id
    dotDF("dotNumber").as("dot_number"), //网点表dot_number
    dotDF("dotName").as("dot_name"), //网点表dot_name
    dotDF("dotAddr").as("dot_addr"), //网点表dot_addr
    dotDF("dotGisAddr").as("dot_gis_addr"), //网点表dot_gis_addr
    dotDF("dotTel").as("dot_tel"), //网点表dot_tel
    dotDF("manageAreaId").as("manage_area_id"), //网点表manage_area_id
    dotDF("manageAreaGis").as("manage_area_gis"), //网点表manage_area_gis
    companyDF("id").alias("company_id"), //公司表id
    companyDF("companyName").as("company_name"), //公司表company_name
    companyDF("cityId").as("city_id"), //公司表city_id
    companyDF("companyNumber").as("company_number"), //公司表company_number
    companyDF("companyAddr").as("company_addr"), //公司表company_addr
    companyDF("companyAddrGis").as("company_addr_gis"), //公司表company_addr_gis
    companyDF("companyTel").as("company_tel"), //公司表company_tel
    companyDF("isSubCompany").as("is_sub_company"), //公司表is_sub_company
    $"day"
  )


3.4、创建网点车辆明细宽表并将网点车辆明细数据写入到kudu数据表中


网点车辆明细宽表数据需要保存到kudu中,因此在第一次执行网点车辆明细拉宽操作时,网点车辆明细宽表是不存在的,因此需要实现自动判断宽表是否存在,如果不存在则创建


实现步骤:


在TransportToolDWD 单例对象中调用save方法


实现过程:


在TransportToolDWD 单例对象Main方法中调用save方法


//TODO 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
save(ttDotDetailDF, OfflineTableDefine.dotTransportToolDetail)


3.5、定义仓库车辆宽表的关联关系


为了在DWS层任务中方便的获取每日增量仓库车辆表数据(根据日期),因此在DataFrame基础上动态增加列(day),指定日期格式为yyyyMMdd

代码如下:


// 4.2:拉宽仓库车辆表
// 拉宽仓库车辆表
val ttWsDetailDF = ttWsDF.join(ttDF, ttWsDF.col("transportToolId") === ttDF.col("id"), left_outer) //仓库车辆表关联车辆表
  .join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer) //车辆表类型关联字典表类型
  .join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) //车辆表状态管理字典表状态
  .join(wsDF, wsDF.col("id") === ttWsDF.col("warehouseId"), left_outer) //仓库车辆表关联仓库
  .join(companyWareHouseMapDF, ttWsDF.col("warehouseId") === companyWareHouseMapDF.col("warehouseId"), left_outer) //仓库车辆管连仓库公司关联表
  .join(companyDF, companyDF.col("id") === companyWareHouseMapDF.col("companyId"), left_outer)
  .withColumn("day", date_format(ttWsDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
  .sort(ttDF.col("cdt").asc)
  .select(
    ttDF("id"), //车辆表id
    ttDF("brand"), //车辆表brand
    ttDF("model"), //车辆表model
    ttDF("type").cast(IntegerType), //车辆表type
    transportTypeDF("ttTypeName").as("type_name"), // 车辆表type对应字典表车辆类型的具体描述
    ttDF("givenLoad").as("given_load").cast(IntegerType), //车辆表given_load
    ttDF("loadCnUnit").as("load_cn_unit"), //车辆表load_cn_unit
    ttDF("loadEnUnit").as("load_en_unit"), //车辆表load_en_unit
    ttDF("buyDt").as("buy_dt"), //车辆表buy_dt
    ttDF("licensePlate").as("license_plate"), //车辆表license_plate
    ttDF("state").cast(IntegerType), //车辆表state
    transportStatusDF("ttStateName").as("state_name"), // 车辆表type对应字典表车辆类型的具体描述
    ttDF("cdt"), //车辆表cdt
    ttDF("udt"), //车辆表udt
    ttDF("remark"), //车辆表remark
    wsDF("id").as("ws_id"), //仓库表id
    wsDF("name"), //仓库表name
    wsDF("addr"), //仓库表addr
    wsDF("addrGis").as("addr_gis"), //仓库表addr_gis
    wsDF("employeeId").as("employee_id"), //仓库表employee_id
    wsDF("type").as("ws_type").cast(IntegerType), //仓库表type
    wsDF("area"), //仓库表area
    wsDF("isLease").as("is_lease").cast(IntegerType), //仓库表is_lease
    companyDF("id").alias("company_id"), //公司表id
    companyDF("companyName").as("company_name"), //公司表company_name
    companyDF("cityId").as("city_id"), //公司表city_id
    companyDF("companyNumber").as("company_number"), //公司表company_number
    companyDF("companyAddr").as("company_addr"), //公司表company_addr
    companyDF("companyAddrGis").as("company_addr_gis"), //公司表company_addr_gis
    companyDF("companyTel").as("company_tel"), //公司表company_tel
    companyDF("isSubCompany").as("is_sub_company"), //公司表is_sub_company
    $"day"
  )


3.6、创建仓库车辆明细宽表并将仓库车辆明细数据写入到kudu数据表中


仓库车辆明细宽表数据需要保存到kudu中,因此在第一次执行仓库车辆明细拉宽操作时,仓库车辆明细宽表是不存在的,因此需要实现自动判断宽表是否存在,如果不存在则创建


实现步骤:


在TransportToolDWD 单例对象中调用save方法


实现过程:


在TransportToolDWD 单例对象Main方法中调用save方法


save(ttWsDetailDF, OfflineTableDefine.warehouseTransportToolDetail)


3.7、删除缓存数据


为了释放资源,车辆明细宽表数据计算完成以后,需要将缓存的源表数据删除。


//TODO 6)将缓存的数据删除掉
ttDF.unpersist()
ttDotDF.unpersist()
dotDF.unpersist()
companyDotDF.unpersist()
companyDF.unpersist()
ttWsDF.unpersist()
companyWareHouseMapDF.unpersist()
wsDF.unpersist()
codesDF.unpersist()


3.8、完整代码


package cn.it.logistics.offline.dwd
import cn.it.logistics.common.{CodeTypeMapping, Configuration, OfflineTableDefine, SparkUtils, TableMapping}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.date_format
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel
/**
 * 车辆主题开发
 * 将车辆相关的表进行关联拉宽,将拉宽后的数据写入到车辆宽表中
 * 1)网点车辆关联表->派送网点所拥有的车辆(三轮车)
 * 2)仓库车辆关联表->仓库与仓库之间的运输工具(货车、货机)
 */
object TransportToolDWD extends OfflineApp{
  //定义应用的名称
  val appName = this.getClass.getSimpleName
  /**
   * 入口函数
   * @param args
   */
  def main(args: Array[String]): Unit = {
    /**
     * 实现步骤:
     * 1)初始化sparkConf对象
     * 2)创建sparkSession对象
     * 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
     * 4)定义维度表与事实表的关联
     * 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
     * 5.1:创建车辆明细宽表的schema表结构
     * 5.2:创建车辆宽表(判断宽表是否存在,如果不存在则创建)
     * 5.3:将数据写入到kudu中
     * 6)将缓存的数据删除掉
     * 7)停止任务
     */
    //1)初始化sparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )
    //2)创建sparkSession对象
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
    //数据处理
    execute(sparkSession)
  }
  /**
   * 数据处理
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    //TODO 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
    //加载车辆表数据(事实表)
    val ttDF: DataFrame = getKuduSource(sparkSession, TableMapping.transportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
    //加载仓库车辆表数据
    val ttDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dotTransportTool, true).persist(StorageLevel.DISK_ONLY_2)
    //加载网点表的数据
    val dotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dot, true).persist(StorageLevel.DISK_ONLY_2)
    //加载公司网点关联表的数据
    val companyDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyDotMap, true).persist(StorageLevel.DISK_ONLY_2)
    //加载公司表的数据
    val companyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(StorageLevel.DISK_ONLY_2)
    //加载仓库车辆关联表数据(事实表)
    val ttWsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouseTransportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
    //加载仓库公司关联表
    val companyWareHouseMapDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyWarehouseMap, true).persist(StorageLevel.DISK_ONLY_2)
    //加载仓库表数据
    val wsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouse, true).persist(StorageLevel.DISK_ONLY_2)
    //加载物流码表数据
    val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY_2)
    import sparkSession.implicits._
    //获取运输工具类型
    val transportTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportType).select($"code".as("ttType"), $"codeDesc".as("ttTypeName"))
    //获取运输工具状态
    val transportStatusDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportStatus).select($"code".as("ttStatus"), $"codeDesc".as("ttStateName"))
    //TODO 4)定义维度表与事实表的关联
    val left_outer: String = "left_outer"
    // 4.1:拉宽网点车辆表
    val ttDotDetailDF = ttDotDF.join(ttDF, ttDotDF.col("transportToolId") === ttDF.col("id"), left_outer) //网点车辆表关联车辆表
      .join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer) //车辆表类型关联字典表类型
      .join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) //车辆表状态管理字典表状态
      .join(dotDF, dotDF.col("id") === ttDotDF.col("dotId"), left_outer) //网点车辆表关联网点
      .join(companyDotDF, ttDotDF.col("dotId") === companyDotDF.col("dotId"), left_outer) //网点车辆管连网点公司关联表
      .join(companyDF, companyDotDF.col("companyId") === companyDF.col("id"), left_outer) //网点车辆表关联公司表
      .withColumn("day", date_format(ttDotDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
      .sort(ttDF.col("cdt").asc)
      .select(
        ttDF("id"), //车辆表id
        ttDF("brand"), //车辆表brand
        ttDF("model"), //车辆表model
        ttDF("type").cast(IntegerType), //车辆表type
        transportTypeDF("ttTypeName").as("type_name"), // 车辆表type对应字典表车辆类型的具体描述
        ttDF("givenLoad").cast(IntegerType).as("given_load"), //车辆表given_load
        ttDF("loadCnUnit").as("load_cn_unit"), //车辆表load_cn_unit
        ttDF("loadEnUnit").as("load_en_unit"), //车辆表load_en_unit
        ttDF("buyDt").as("buy_dt"), //车辆表buy_dt
        ttDF("licensePlate").as("license_plate"), //车辆表license_plate
        ttDF("state").cast(IntegerType), //车辆表state
        transportStatusDF("ttStateName").as("state_name"), // 车辆表state对应字典表类型的具体描述
        ttDF("cdt"), //车辆表cdt
        ttDF("udt"), //车辆表udt
        ttDF("remark"), //车辆表remark
        dotDF("id").as("dot_id"), //网点表dot_id
        dotDF("dotNumber").as("dot_number"), //网点表dot_number
        dotDF("dotName").as("dot_name"), //网点表dot_name
        dotDF("dotAddr").as("dot_addr"), //网点表dot_addr
        dotDF("dotGisAddr").as("dot_gis_addr"), //网点表dot_gis_addr
        dotDF("dotTel").as("dot_tel"), //网点表dot_tel
        dotDF("manageAreaId").as("manage_area_id"), //网点表manage_area_id
        dotDF("manageAreaGis").as("manage_area_gis"), //网点表manage_area_gis
        companyDF("id").alias("company_id"), //公司表id
        companyDF("companyName").as("company_name"), //公司表company_name
        companyDF("cityId").as("city_id"), //公司表city_id
        companyDF("companyNumber").as("company_number"), //公司表company_number
        companyDF("companyAddr").as("company_addr"), //公司表company_addr
        companyDF("companyAddrGis").as("company_addr_gis"), //公司表company_addr_gis
        companyDF("companyTel").as("company_tel"), //公司表company_tel
        companyDF("isSubCompany").as("is_sub_company"), //公司表is_sub_company
        $"day"
      )
    // 4.2:拉宽仓库车辆表
    // 拉宽仓库车辆表
    val ttWsDetailDF = ttWsDF.join(ttDF, ttWsDF.col("transportToolId") === ttDF.col("id"), left_outer) //仓库车辆表关联车辆表
      .join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer) //车辆表类型关联字典表类型
      .join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) //车辆表状态管理字典表状态
      .join(wsDF, wsDF.col("id") === ttWsDF.col("warehouseId"), left_outer) //仓库车辆表关联仓库
      .join(companyWareHouseMapDF, ttWsDF.col("warehouseId") === companyWareHouseMapDF.col("warehouseId"), left_outer) //仓库车辆管连仓库公司关联表
      .join(companyDF, companyDF.col("id") === companyWareHouseMapDF.col("companyId"), left_outer)
      .withColumn("day", date_format(ttWsDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
      .sort(ttDF.col("cdt").asc)
      .select(
        ttDF("id"), //车辆表id
        ttDF("brand"), //车辆表brand
        ttDF("model"), //车辆表model
        ttDF("type").cast(IntegerType), //车辆表type
        transportTypeDF("ttTypeName").as("type_name"), // 车辆表type对应字典表车辆类型的具体描述
        ttDF("givenLoad").as("given_load").cast(IntegerType), //车辆表given_load
        ttDF("loadCnUnit").as("load_cn_unit"), //车辆表load_cn_unit
        ttDF("loadEnUnit").as("load_en_unit"), //车辆表load_en_unit
        ttDF("buyDt").as("buy_dt"), //车辆表buy_dt
        ttDF("licensePlate").as("license_plate"), //车辆表license_plate
        ttDF("state").cast(IntegerType), //车辆表state
        transportStatusDF("ttStateName").as("state_name"), // 车辆表type对应字典表车辆类型的具体描述
        ttDF("cdt"), //车辆表cdt
        ttDF("udt"), //车辆表udt
        ttDF("remark"), //车辆表remark
        wsDF("id").as("ws_id"), //仓库表id
        wsDF("name"), //仓库表name
        wsDF("addr"), //仓库表addr
        wsDF("addrGis").as("addr_gis"), //仓库表addr_gis
        wsDF("employeeId").as("employee_id"), //仓库表employee_id
        wsDF("type").as("ws_type").cast(IntegerType), //仓库表type
        wsDF("area"), //仓库表area
        wsDF("isLease").as("is_lease").cast(IntegerType), //仓库表is_lease
        companyDF("id").alias("company_id"), //公司表id
        companyDF("companyName").as("company_name"), //公司表company_name
        companyDF("cityId").as("city_id"), //公司表city_id
        companyDF("companyNumber").as("company_number"), //公司表company_number
        companyDF("companyAddr").as("company_addr"), //公司表company_addr
        companyDF("companyAddrGis").as("company_addr_gis"), //公司表company_addr_gis
        companyDF("companyTel").as("company_tel"), //公司表company_tel
        companyDF("isSubCompany").as("is_sub_company"), //公司表is_sub_company
        $"day"
      )
    //TODO 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
    save(ttDotDetailDF, OfflineTableDefine.dotTransportToolDetail)
    save(ttWsDetailDF, OfflineTableDefine.warehouseTransportToolDetail)
    //TODO 6)将缓存的数据删除掉
    ttDF.unpersist()
    ttDotDF.unpersist()
    dotDF.unpersist()
    companyDotDF.unpersist()
    companyDF.unpersist()
    ttWsDF.unpersist()
    companyWareHouseMapDF.unpersist()
    wsDF.unpersist()
    codesDF.unpersist()
    sparkSession.stop()
  }
}



五、车辆数据指标开发



1、计算的字段


image.png


2、Spark实现


实现步骤:


  • 在dws目录下创建 TransportToolDWS 单例对象,继承自OfflineApp特质
  • 初始化环境的参数,创建SparkSession对象
  • 根据指定的日期获取拉宽后的车辆主题宽表(tbl_dot_transport_tool_detail、tbl_dot_transport_tool_detail)增量数据,并缓存数据
  • 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
  • 指标计算
  • 各网点发车次数
  • 各网点最大发车次数
  • 各网点最小发车次数
  • 各网点平均发车次数
  • 各区域发车次数
  • 各区域最大发车次数
  • 各区域最小发车次数
  • 各区域平均发车次数
  • 各公司发车次数
  • 各公司最大发车次数
  • 各公司最小发车次数
  • 各公司平均发车次数
  • 获取当前时间yyyyMMddHH
  • 构建要持久化的指标数据(需要判断计算的指标是否有值,若没有需要赋值默认值)
  • 通过StructType构建指定Schema
  • 创建车辆主题指标数据表(若存在则不创建)
  • 持久化指标数据到kudu表


2.1、初始化环境变量


package cn.it.logistics.offline.dws
import cn.it.logistics.common.{Configuration, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
 * 车辆主题指标开发
 */
object TransportToolDWS extends OfflineApp{
  //定义应用程序的名称
  val appName = this.getClass.getSimpleName
  /**
   * 入口函数
   * @param args
   */
  def main(args: Array[String]): Unit = {
    /**
     * 实现步骤:
     * 1)创建SparkConf对象
     * 2)创建SparkSession对象
     * 3)读取车辆明细宽表的数据
     * 4)对车辆明细宽表的数据进行指标的计算
     * 5)将计算好的指标数据写入到kudu数据库中
     * 5.1:定义指标结果表的schema信息
     * 5.2:组织需要写入到kudu表的数据
     * 5.3:判断指标结果表是否存在,如果不存在则创建
     * 5.4:将数据写入到kudu表中
     * 6)删除缓存数据
     * 7)停止任务,退出sparksession
     */
    //TODO 1)创建SparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )
    //TODO 2)创建SparkSession对象
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
    //处理数据
    execute(sparkSession)
  }
  /**
   * 数据处理
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    sparkSession.stop()
  }
}


2.2、加载车辆宽表增量数据并缓存


加载车辆宽表的时候,需要指定日期条件,因为车辆主题最终需要Azkaban定时调度执行,每天执行一次增量数据,因此需要指定日期。


//TODO 3)读取车辆明细宽表的数据
val ttDotDetailDF: DataFrame = getKuduSource(sparkSession, OfflineTableDefine.dotTransportToolDetail, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
val ttWarehouseDetailDF: DataFrame = getKuduSource(sparkSession, OfflineTableDefine.warehouseTransportToolDetail, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)


2.3、指标计算


//根据网点车辆的日期进行分组
val ttDotDetailGroupByDayDF: DataFrame = ttDotDetailDF.select("day").groupBy("day").count().cache()
//导入隐式转换
import sparkSession.implicits._
//定义计算好的指标结果集合对象
val dotRows: ArrayBuffer[Row] = ArrayBuffer[Row]()
//循环遍历网点车辆每个日期的车辆明细宽表数据
ttDotDetailGroupByDayDF.collect().foreach(row=> {
  //获取到要处理的数据所在的日期
  val day: String = row.getAs[String](0)
  //返回指定日期的仓库明细数据
  val ttDotDetailByDayDF: DataFrame = ttDotDetailDF.where(col("day") === day).toDF().persist(StorageLevel.DISK_ONLY_2)
  //各网点的发车次数(西三旗:10,西二旗:20)
  val ttDotTotalCountDF: DataFrame = ttDotDetailByDayDF.groupBy($"dot_id").agg(count("dot_id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
  //计算各网点的总发车次数
  val ttDotTotalCount: Row = ttDotTotalCountDF.agg(sum("cnt")).first()
  //各网点的最大发车次数、最小发车次数、平均发车次数
  val maxAndMinAndAvgTTDotTotalCount: Row = ttDotTotalCountDF.agg(max("cnt"), min("cnt"), avg("cnt")).first()
  val maxTTDotTotalCount: Any = maxAndMinAndAvgTTDotTotalCount(0)
  val minTTDotTotalCount: Any = maxAndMinAndAvgTTDotTotalCount(1)
  val avgTTDotTotalCount: Any = maxAndMinAndAvgTTDotTotalCount(2)
  // 各区域发车次数
  val areaDotTotalCountDF = ttDotDetailByDayDF.groupBy("city_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
  val areaDotTotalCount = areaDotTotalCountDF.agg(sum($"cnt")).first()
  // 各区域最大发车次数、最小发车次数和平均发车次数
  val maxAndMinAndAvgAreaDotTotalCount = areaDotTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
  val maxAreaDotTotalCount = maxAndMinAndAvgAreaDotTotalCount(0)
  val minAreaDotTotalCount = maxAndMinAndAvgAreaDotTotalCount(1)
  val avgAreaDotTotalCount = maxAndMinAndAvgAreaDotTotalCount(2)
  // 各公司发车次数
  val companyDotTotalCountDF = ttDotDetailByDayDF.groupBy("company_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
  val companyDotTotalCount = companyDotTotalCountDF.agg(sum($"cnt")).first()
  // 各公司最大发车次数、最小发车次数和平均发车次数
  val maxAndMinAndAvgCompanyDotTotalCount = companyDotTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
  val maxCompanyDotTotalCount = maxAndMinAndAvgCompanyDotTotalCount(0)
  val minCompanyDotTotalCount = maxAndMinAndAvgCompanyDotTotalCount(1)
  val avgCompanyDotTotalCount = maxAndMinAndAvgCompanyDotTotalCount(2)
  //将计算好的指标数据封装到row对象中
  val rowInfo: Row = Row(
    day,
    if(ttDotTotalCount.isNullAt(0)) 0L else ttDotTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(maxTTDotTotalCount==null) 0L else maxTTDotTotalCount.asInstanceOf[Number].longValue(),
    if(minTTDotTotalCount==null) 0L else minTTDotTotalCount.asInstanceOf[Number].longValue(),
    if(avgTTDotTotalCount==null) 0L else avgTTDotTotalCount.asInstanceOf[Number].longValue(),
    if(areaDotTotalCount.isNullAt(0)) 0L else areaDotTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(maxAreaDotTotalCount==null) 0L else maxAreaDotTotalCount.asInstanceOf[Number].longValue(),
    if(minAreaDotTotalCount==null) 0L else minAreaDotTotalCount.asInstanceOf[Number].longValue(),
    if(avgAreaDotTotalCount==null) 0L else avgAreaDotTotalCount.asInstanceOf[Number].longValue(),
    if(companyDotTotalCount.isNullAt(0)) 0L else ttDotTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(maxCompanyDotTotalCount==null) 0L else maxCompanyDotTotalCount.asInstanceOf[Number].longValue(),
    if(minCompanyDotTotalCount==null) 0L else minCompanyDotTotalCount.asInstanceOf[Number].longValue(),
    if(avgCompanyDotTotalCount==null) 0L else avgCompanyDotTotalCount.asInstanceOf[Number].longValue()
  )
  dotRows.append(rowInfo)
  println(rowInfo)
  ttDotDetailByDayDF.unpersist()
  ttDotTotalCountDF.unpersist()
  areaDotTotalCountDF.unpersist()
  companyDotTotalCountDF.unpersist()
})
//根据仓库车辆的日期进行分组
val ttWsDetailGroupByDayDF: DataFrame = ttWarehouseDetailDF.select("day").groupBy("day").count().cache()
//定义计算好的指标结果集合对象
val wsRows: ArrayBuffer[Row] = ArrayBuffer[Row]()
//循环遍历仓库车辆每个日期的车辆明细宽表数据
ttWsDetailGroupByDayDF.collect().foreach(row=> {
  //获取到要处理的数据所在的日期
  val day: String = row.getAs[String](0)
  //返回指定日期的仓库明细数据
  val ttWsDetailByDayDF: DataFrame = ttWarehouseDetailDF.where(col("day") === day).toDF().persist(StorageLevel.DISK_ONLY_2)
  val whTransportToolTotalCountDF: DataFrame = ttWsDetailByDayDF.groupBy($"ws_id").agg(count("ws_id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
  //计算各仓库的总发车次数
  val whTransportToolTotalCount: Row = whTransportToolTotalCountDF.agg(sum("cnt")).first()
  //各仓库的最大发车次数、最小发车次数、平均发车次数
  val maxAndMinAndAvgWhTransportToolTotalCount: Row = whTransportToolTotalCountDF.agg(max("cnt"), min("cnt"), avg("cnt")).first()
  val maxTransportToolCount: Any = maxAndMinAndAvgWhTransportToolTotalCount(0)
  val minTransportToolCount: Any = maxAndMinAndAvgWhTransportToolTotalCount(1)
  val avgTransportToolCount: Any = maxAndMinAndAvgWhTransportToolTotalCount(2)
  // 各区域发车次数
  val areaTransportToolTotalCountDF = ttWsDetailByDayDF.groupBy("city_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
  val areaTransportToolTotalCount = areaTransportToolTotalCountDF.agg(sum($"cnt")).first()
  // 各区域最大发车次数、最小发车次数和平均发车次数
  val maxAndMinAndAvgAreaTransportToolTotalCount = areaTransportToolTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
  val maxAreaTransportToolTotalCount: Any = maxAndMinAndAvgAreaTransportToolTotalCount(0)
  val minAreaTransportToolTotalCount = maxAndMinAndAvgAreaTransportToolTotalCount(1)
  val avgAreaTransportToolTotalCount = maxAndMinAndAvgAreaTransportToolTotalCount(2)
  // 各公司发车次数
  val companyTransportToolTotalCountDF = ttWsDetailByDayDF.groupBy("company_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
  val companyTransportToolTotalCount = companyTransportToolTotalCountDF.agg(sum($"cnt")).first()
  // 各公司最大发车次数、最小发车次数和平均发车次数
  val maxAndMinAndAvgCompanyTransportToolTotalCount = companyTransportToolTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
  val maxCompanyTransportToolTotalCount = maxAndMinAndAvgCompanyTransportToolTotalCount(0)
  val minCompanyTransportToolTotalCount = maxAndMinAndAvgCompanyTransportToolTotalCount(1)
  val avgCompanyTransportToolTotalCount = maxAndMinAndAvgCompanyTransportToolTotalCount(2)
  //将计算好的指标数据封装到row对象中
  val rowInfo: Row = Row(
    day,
    if(whTransportToolTotalCount.isNullAt(0)) 0L else whTransportToolTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(maxTransportToolCount==null) 0L else maxTransportToolCount.asInstanceOf[Number].longValue(),
    if(minTransportToolCount==null) 0L else minTransportToolCount.asInstanceOf[Number].longValue(),
    if(avgTransportToolCount==null) 0L else avgTransportToolCount.asInstanceOf[Number].longValue(),
    if(areaTransportToolTotalCount.isNullAt(0)) 0L else areaTransportToolTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(maxAreaTransportToolTotalCount==null) 0L else maxAreaTransportToolTotalCount.asInstanceOf[Number].longValue(),
    if(minAreaTransportToolTotalCount==null) 0L else minAreaTransportToolTotalCount.asInstanceOf[Number].longValue(),
    if(avgAreaTransportToolTotalCount==null) 0L else avgAreaTransportToolTotalCount.asInstanceOf[Number].longValue(),
    if(companyTransportToolTotalCount.isNullAt(0)) 0L else companyTransportToolTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(maxCompanyTransportToolTotalCount==null) 0L else maxCompanyTransportToolTotalCount.asInstanceOf[Number].longValue(),
    if(minCompanyTransportToolTotalCount==null) 0L else minCompanyTransportToolTotalCount.asInstanceOf[Number].longValue(),
    if(avgCompanyTransportToolTotalCount==null) 0L else avgCompanyTransportToolTotalCount.asInstanceOf[Number].longValue()
  )
  wsRows.append(rowInfo)
  println(rowInfo)
  ttWsDetailByDayDF.unpersist()
  areaTransportToolTotalCountDF.unpersist()
  companyTransportToolTotalCountDF.unpersist()
  whTransportToolTotalCountDF.unpersist()
})


2.4、通过StructType构建指定Schema


//定义指标结果表的shema信息
//网点车辆相关的表结构数据
val schemaDot: StructType = StructType(Array(
  StructField("id", StringType, false, Metadata.empty),
  StructField("ttDotTotalCount", LongType, true, Metadata.empty),           //各网点发车次数
  StructField("maxTtDotTotalCount", LongType, true, Metadata.empty),        //各网点最大发车次数
  StructField("minTtDotTotalCount", LongType, true, Metadata.empty),        //各网点最小发车次数
  StructField("avgTtDotTotalCount", LongType, true, Metadata.empty),        //各网点平均发车次数
  StructField("areaDotTotalCount", LongType, true, Metadata.empty),         //各区域发车次数
  StructField("maxAreaDotTotalCount", LongType, true, Metadata.empty),      //各区域最大发车次数
  StructField("minAreaDotTotalCount", LongType, true, Metadata.empty),      //各区域最小发车次数
  StructField("avgAreaDotTotalCount", LongType, true, Metadata.empty),      //各区域平均发车次数
  StructField("companyDotTotalCount", LongType, true, Metadata.empty),      //各公司发车次数
  StructField("maxCompanyDotTotalCount", LongType, true, Metadata.empty),   //各公司最大发车次数
  StructField("minCompanyDotTotalCount", LongType, true, Metadata.empty),   //各公司最小发车次数
  StructField("avgCompanyDotTotalCount", LongType, true, Metadata.empty)    //各公司平均发车次数
))
//仓库车辆相关的表结构数据
val schemaWs: StructType = StructType(Array(
  StructField("id", StringType, false, Metadata.empty),
  StructField("whTransportToolTotalCount", LongType, true, Metadata.empty),           //各仓库发车次数
  StructField("maxWhTransportToolTotalCount", LongType, true, Metadata.empty),        //各仓库最大发车次数
  StructField("minWhTransportToolTotalCount", LongType, true, Metadata.empty),        //各仓库最小发车次数
  StructField("avgWhTransportToolTotalCount", LongType, true, Metadata.empty),        //各仓库平均发车次数
  StructField("areaTransportToolTotalCount", LongType, true, Metadata.empty),         //各区域发车次数
  StructField("maxAreaTransportToolTotalCount", LongType, true, Metadata.empty),      //各区域最大发车次数
  StructField("minAreaTransportToolTotalCount", LongType, true, Metadata.empty),      //各区域最小发车次数
  StructField("avgAreaTransportToolTotalCount", LongType, true, Metadata.empty),      //各区域平均发车次数
  StructField("companyTransportToolTotalCount", LongType, true, Metadata.empty),      //各公司发车次数
  StructField("maxCompanyTransportToolTotalCount", LongType, true, Metadata.empty),   //各公司最大发车次数
  StructField("minCompanyTransportToolTotalCount", LongType, true, Metadata.empty),   //各公司最小发车次数
  StructField("avgCompanyTransportToolTotalCount", LongType, true, Metadata.empty)    //各公司平均发车次数
))


2.5、持久化指标数据到kudu表


//TODO 5)将计算好的指标数据写入到kudu数据库中
val dotRDD: RDD[Row] = sparkSession.sparkContext.makeRDD(dotRows)
val wsRDD: RDD[Row] = sparkSession.sparkContext.makeRDD(wsRows)
val dotDataFrame: DataFrame = sparkSession.createDataFrame(dotRDD, schemaDot)
val wsDataFrame: DataFrame = sparkSession.createDataFrame(wsRDD, schemaWs)
save(dotDataFrame, OfflineTableDefine.ttDotSummary)
save(wsDataFrame, OfflineTableDefine.ttWsSummary)


2.6、完整代码


package cn.it.logistics.offline.dws
import cn.it.logistics.common.{Configuration, DateHelper, OfflineTableDefine, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, LongType, Metadata, StringType, StructField, StructType}
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
/**
 * 车辆主题指标开发
 */
object TransportToolDWS extends OfflineApp{
  //定义应用程序的名称
  val appName = this.getClass.getSimpleName
  /**
   * 入口函数
   * @param args
   */
  def main(args: Array[String]): Unit = {
    /**
     * 实现步骤:
     * 1)创建SparkConf对象
     * 2)创建SparkSession对象
     * 3)读取车辆明细宽表的数据
     * 4)对车辆明细宽表的数据进行指标的计算
     * 5)将计算好的指标数据写入到kudu数据库中
     * 5.1:定义指标结果表的schema信息
     * 5.2:组织需要写入到kudu表的数据
     * 5.3:判断指标结果表是否存在,如果不存在则创建
     * 5.4:将数据写入到kudu表中
     * 6)删除缓存数据
     * 7)停止任务,退出sparksession
     */
    //TODO 1)创建SparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )
    //TODO 2)创建SparkSession对象
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
    //处理数据
    execute(sparkSession)
  }
  /**
   * 数据处理
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    //TODO 3)读取车辆明细宽表的数据
    val ttDotDetailDF: DataFrame = getKuduSource(sparkSession, OfflineTableDefine.dotTransportToolDetail, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
    val ttWarehouseDetailDF: DataFrame = getKuduSource(sparkSession, OfflineTableDefine.warehouseTransportToolDetail, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
    //根据网点车辆的日期进行分组
    val ttDotDetailGroupByDayDF: DataFrame = ttDotDetailDF.select("day").groupBy("day").count().cache()
    //导入隐式转换
    import sparkSession.implicits._
    //定义计算好的指标结果集合对象
    val dotRows: ArrayBuffer[Row] = ArrayBuffer[Row]()
    //循环遍历网点车辆每个日期的车辆明细宽表数据
    ttDotDetailGroupByDayDF.collect().foreach(row=> {
      //获取到要处理的数据所在的日期
      val day: String = row.getAs[String](0)
      //返回指定日期的仓库明细数据
      val ttDotDetailByDayDF: DataFrame = ttDotDetailDF.where(col("day") === day).toDF().persist(StorageLevel.DISK_ONLY_2)
      //各网点的发车次数(西三旗:10,西二旗:20)
      val ttDotTotalCountDF: DataFrame = ttDotDetailByDayDF.groupBy($"dot_id").agg(count("dot_id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
      //计算各网点的总发车次数
      val ttDotTotalCount: Row = ttDotTotalCountDF.agg(sum("cnt")).first()
      //各网点的最大发车次数、最小发车次数、平均发车次数
      val maxAndMinAndAvgTTDotTotalCount: Row = ttDotTotalCountDF.agg(max("cnt"), min("cnt"), avg("cnt")).first()
      val maxTTDotTotalCount: Any = maxAndMinAndAvgTTDotTotalCount(0)
      val minTTDotTotalCount: Any = maxAndMinAndAvgTTDotTotalCount(1)
      val avgTTDotTotalCount: Any = maxAndMinAndAvgTTDotTotalCount(2)
      // 各区域发车次数
      val areaDotTotalCountDF = ttDotDetailByDayDF.groupBy("city_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
      val areaDotTotalCount = areaDotTotalCountDF.agg(sum($"cnt")).first()
      // 各区域最大发车次数、最小发车次数和平均发车次数
      val maxAndMinAndAvgAreaDotTotalCount = areaDotTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
      val maxAreaDotTotalCount = maxAndMinAndAvgAreaDotTotalCount(0)
      val minAreaDotTotalCount = maxAndMinAndAvgAreaDotTotalCount(1)
      val avgAreaDotTotalCount = maxAndMinAndAvgAreaDotTotalCount(2)
      // 各公司发车次数
      val companyDotTotalCountDF = ttDotDetailByDayDF.groupBy("company_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
      val companyDotTotalCount = companyDotTotalCountDF.agg(sum($"cnt")).first()
      // 各公司最大发车次数、最小发车次数和平均发车次数
      val maxAndMinAndAvgCompanyDotTotalCount = companyDotTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
      val maxCompanyDotTotalCount = maxAndMinAndAvgCompanyDotTotalCount(0)
      val minCompanyDotTotalCount = maxAndMinAndAvgCompanyDotTotalCount(1)
      val avgCompanyDotTotalCount = maxAndMinAndAvgCompanyDotTotalCount(2)
      //将计算好的指标数据封装到row对象中
      val rowInfo: Row = Row(
        day,
        if(ttDotTotalCount.isNullAt(0)) 0L else ttDotTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(maxTTDotTotalCount==null) 0L else maxTTDotTotalCount.asInstanceOf[Number].longValue(),
        if(minTTDotTotalCount==null) 0L else minTTDotTotalCount.asInstanceOf[Number].longValue(),
        if(avgTTDotTotalCount==null) 0L else avgTTDotTotalCount.asInstanceOf[Number].longValue(),
        if(areaDotTotalCount.isNullAt(0)) 0L else areaDotTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(maxAreaDotTotalCount==null) 0L else maxAreaDotTotalCount.asInstanceOf[Number].longValue(),
        if(minAreaDotTotalCount==null) 0L else minAreaDotTotalCount.asInstanceOf[Number].longValue(),
        if(avgAreaDotTotalCount==null) 0L else avgAreaDotTotalCount.asInstanceOf[Number].longValue(),
        if(companyDotTotalCount.isNullAt(0)) 0L else ttDotTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(maxCompanyDotTotalCount==null) 0L else maxCompanyDotTotalCount.asInstanceOf[Number].longValue(),
        if(minCompanyDotTotalCount==null) 0L else minCompanyDotTotalCount.asInstanceOf[Number].longValue(),
        if(avgCompanyDotTotalCount==null) 0L else avgCompanyDotTotalCount.asInstanceOf[Number].longValue()
      )
      dotRows.append(rowInfo)
      println(rowInfo)
      ttDotDetailByDayDF.unpersist()
      ttDotTotalCountDF.unpersist()
      areaDotTotalCountDF.unpersist()
      companyDotTotalCountDF.unpersist()
    })
    //根据仓库车辆的日期进行分组
    val ttWsDetailGroupByDayDF: DataFrame = ttWarehouseDetailDF.select("day").groupBy("day").count().cache()
    //定义计算好的指标结果集合对象
    val wsRows: ArrayBuffer[Row] = ArrayBuffer[Row]()
    //循环遍历仓库车辆每个日期的车辆明细宽表数据
    ttWsDetailGroupByDayDF.collect().foreach(row=> {
      //获取到要处理的数据所在的日期
      val day: String = row.getAs[String](0)
      //返回指定日期的仓库明细数据
      val ttWsDetailByDayDF: DataFrame = ttWarehouseDetailDF.where(col("day") === day).toDF().persist(StorageLevel.DISK_ONLY_2)
      val whTransportToolTotalCountDF: DataFrame = ttWsDetailByDayDF.groupBy($"ws_id").agg(count("ws_id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
      //计算各仓库的总发车次数
      val whTransportToolTotalCount: Row = whTransportToolTotalCountDF.agg(sum("cnt")).first()
      //各仓库的最大发车次数、最小发车次数、平均发车次数
      val maxAndMinAndAvgWhTransportToolTotalCount: Row = whTransportToolTotalCountDF.agg(max("cnt"), min("cnt"), avg("cnt")).first()
      val maxTransportToolCount: Any = maxAndMinAndAvgWhTransportToolTotalCount(0)
      val minTransportToolCount: Any = maxAndMinAndAvgWhTransportToolTotalCount(1)
      val avgTransportToolCount: Any = maxAndMinAndAvgWhTransportToolTotalCount(2)
      // 各区域发车次数
      val areaTransportToolTotalCountDF = ttWsDetailByDayDF.groupBy("city_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
      val areaTransportToolTotalCount = areaTransportToolTotalCountDF.agg(sum($"cnt")).first()
      // 各区域最大发车次数、最小发车次数和平均发车次数
      val maxAndMinAndAvgAreaTransportToolTotalCount = areaTransportToolTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
      val maxAreaTransportToolTotalCount: Any = maxAndMinAndAvgAreaTransportToolTotalCount(0)
      val minAreaTransportToolTotalCount = maxAndMinAndAvgAreaTransportToolTotalCount(1)
      val avgAreaTransportToolTotalCount = maxAndMinAndAvgAreaTransportToolTotalCount(2)
      // 各公司发车次数
      val companyTransportToolTotalCountDF = ttWsDetailByDayDF.groupBy("company_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
      val companyTransportToolTotalCount = companyTransportToolTotalCountDF.agg(sum($"cnt")).first()
      // 各公司最大发车次数、最小发车次数和平均发车次数
      val maxAndMinAndAvgCompanyTransportToolTotalCount = companyTransportToolTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
      val maxCompanyTransportToolTotalCount = maxAndMinAndAvgCompanyTransportToolTotalCount(0)
      val minCompanyTransportToolTotalCount = maxAndMinAndAvgCompanyTransportToolTotalCount(1)
      val avgCompanyTransportToolTotalCount = maxAndMinAndAvgCompanyTransportToolTotalCount(2)
      //将计算好的指标数据封装到row对象中
      val rowInfo: Row = Row(
        day,
        if(whTransportToolTotalCount.isNullAt(0)) 0L else whTransportToolTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(maxTransportToolCount==null) 0L else maxTransportToolCount.asInstanceOf[Number].longValue(),
        if(minTransportToolCount==null) 0L else minTransportToolCount.asInstanceOf[Number].longValue(),
        if(avgTransportToolCount==null) 0L else avgTransportToolCount.asInstanceOf[Number].longValue(),
        if(areaTransportToolTotalCount.isNullAt(0)) 0L else areaTransportToolTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(maxAreaTransportToolTotalCount==null) 0L else maxAreaTransportToolTotalCount.asInstanceOf[Number].longValue(),
        if(minAreaTransportToolTotalCount==null) 0L else minAreaTransportToolTotalCount.asInstanceOf[Number].longValue(),
        if(avgAreaTransportToolTotalCount==null) 0L else avgAreaTransportToolTotalCount.asInstanceOf[Number].longValue(),
        if(companyTransportToolTotalCount.isNullAt(0)) 0L else companyTransportToolTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(maxCompanyTransportToolTotalCount==null) 0L else maxCompanyTransportToolTotalCount.asInstanceOf[Number].longValue(),
        if(minCompanyTransportToolTotalCount==null) 0L else minCompanyTransportToolTotalCount.asInstanceOf[Number].longValue(),
        if(avgCompanyTransportToolTotalCount==null) 0L else avgCompanyTransportToolTotalCount.asInstanceOf[Number].longValue()
      )
      wsRows.append(rowInfo)
      println(rowInfo)
      ttWsDetailByDayDF.unpersist()
      areaTransportToolTotalCountDF.unpersist()
      companyTransportToolTotalCountDF.unpersist()
      whTransportToolTotalCountDF.unpersist()
    })
    //定义指标结果表的shema信息
    //网点车辆相关的表结构数据
    val schemaDot: StructType = StructType(Array(
      StructField("id", StringType, false, Metadata.empty),
      StructField("ttDotTotalCount", LongType, true, Metadata.empty),           //各网点发车次数
      StructField("maxTtDotTotalCount", LongType, true, Metadata.empty),        //各网点最大发车次数
      StructField("minTtDotTotalCount", LongType, true, Metadata.empty),        //各网点最小发车次数
      StructField("avgTtDotTotalCount", LongType, true, Metadata.empty),        //各网点平均发车次数
      StructField("areaDotTotalCount", LongType, true, Metadata.empty),         //各区域发车次数
      StructField("maxAreaDotTotalCount", LongType, true, Metadata.empty),      //各区域最大发车次数
      StructField("minAreaDotTotalCount", LongType, true, Metadata.empty),      //各区域最小发车次数
      StructField("avgAreaDotTotalCount", LongType, true, Metadata.empty),      //各区域平均发车次数
      StructField("companyDotTotalCount", LongType, true, Metadata.empty),      //各公司发车次数
      StructField("maxCompanyDotTotalCount", LongType, true, Metadata.empty),   //各公司最大发车次数
      StructField("minCompanyDotTotalCount", LongType, true, Metadata.empty),   //各公司最小发车次数
      StructField("avgCompanyDotTotalCount", LongType, true, Metadata.empty)    //各公司平均发车次数
    ))
    //仓库车辆相关的表结构数据
    val schemaWs: StructType = StructType(Array(
      StructField("id", StringType, false, Metadata.empty),
      StructField("whTransportToolTotalCount", LongType, true, Metadata.empty),           //各仓库发车次数
      StructField("maxWhTransportToolTotalCount", LongType, true, Metadata.empty),        //各仓库最大发车次数
      StructField("minWhTransportToolTotalCount", LongType, true, Metadata.empty),        //各仓库最小发车次数
      StructField("avgWhTransportToolTotalCount", LongType, true, Metadata.empty),        //各仓库平均发车次数
      StructField("areaTransportToolTotalCount", LongType, true, Metadata.empty),         //各区域发车次数
      StructField("maxAreaTransportToolTotalCount", LongType, true, Metadata.empty),      //各区域最大发车次数
      StructField("minAreaTransportToolTotalCount", LongType, true, Metadata.empty),      //各区域最小发车次数
      StructField("avgAreaTransportToolTotalCount", LongType, true, Metadata.empty),      //各区域平均发车次数
      StructField("companyTransportToolTotalCount", LongType, true, Metadata.empty),      //各公司发车次数
      StructField("maxCompanyTransportToolTotalCount", LongType, true, Metadata.empty),   //各公司最大发车次数
      StructField("minCompanyTransportToolTotalCount", LongType, true, Metadata.empty),   //各公司最小发车次数
      StructField("avgCompanyTransportToolTotalCount", LongType, true, Metadata.empty)    //各公司平均发车次数
    ))
    //TODO 5)将计算好的指标数据写入到kudu数据库中
    val dotRDD: RDD[Row] = sparkSession.sparkContext.makeRDD(dotRows)
    val wsRDD: RDD[Row] = sparkSession.sparkContext.makeRDD(wsRows)
    val dotDataFrame: DataFrame = sparkSession.createDataFrame(dotRDD, schemaDot)
    val wsDataFrame: DataFrame = sparkSession.createDataFrame(wsRDD, schemaWs)
    save(dotDataFrame, OfflineTableDefine.ttDotSummary)
    save(wsDataFrame, OfflineTableDefine.ttWsSummary)
//    6)删除缓存数据
    ttDotDetailDF.unpersist()
    ttWarehouseDetailDF.unpersist()
    ttDotDetailGroupByDayDF.unpersist()
    ttWsDetailGroupByDayDF.unpersist()
    //    7)停止任务,退出sparksession
    sparkSession.stop()
  }
}


相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
3月前
|
供应链 大数据 调度
大数据在物流领域的应用有哪些?请举例说明。
大数据在物流领域的应用有哪些?请举例说明。
51 0
|
4月前
|
存储 SQL Oracle
助力工业物联网,工业大数据之服务域:项目总结【三十九】
助力工业物联网,工业大数据之服务域:项目总结【三十九】
43 1
|
4月前
|
Prometheus 监控 Cloud Native
助力工业物联网,工业大数据之服务域:服务器性能监控Prometheus及项目总结【三十五】
助力工业物联网,工业大数据之服务域:服务器性能监控Prometheus及项目总结【三十五】
36 1
|
4月前
|
物联网 大数据
助力工业物联网,工业大数据之服务域:安装主题分析实现【三十】
助力工业物联网,工业大数据之服务域:安装主题分析实现【三十】
22 0
|
4月前
|
物联网 大数据
助力工业物联网,工业大数据之服务域:派单主题分析实现【二十九】
助力工业物联网,工业大数据之服务域:派单主题分析实现【二十九】
27 0
|
4月前
|
物联网 大数据
助力工业物联网,工业大数据之服务域:回访主题分析【二十八】
助力工业物联网,工业大数据之服务域:回访主题分析【二十八】
19 0
|
4月前
|
物联网 大数据
助力工业物联网,工业大数据之服务域:安装主题分析实现【二十七】
助力工业物联网,工业大数据之服务域:安装主题分析实现【二十七】
25 0
|
4月前
|
物联网 大数据
助力工业物联网,工业大数据之服务域:油站主题分析【二十六】
助力工业物联网,工业大数据之服务域:油站主题分析【二十六】
28 0
|
4月前
|
物联网 大数据 调度
助力工业物联网,工业大数据之事实主题指标划分【十八】
助力工业物联网,工业大数据之事实主题指标划分【十八】
26 0
|
4月前
|
运维 供应链 物联网
助力工业物联网,工业大数据之一站制造业务主题划分【十三】
助力工业物联网,工业大数据之一站制造业务主题划分【十三】
41 0