客快物流大数据项目(六十六):车辆主题(下)

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 客快物流大数据项目(六十六):车辆主题(下)

2、SQL语句



2.1、拉宽网点车辆表


SELECT 
ttl."id" ,
ttl."brand" ,
ttl."model" ,
ttl."type" ,
TTL ."given_load" ,
TTL ."load_cn_unit" ,
TTL ."load_en_unit" ,
TTL ."buy_dt" ,
TTL ."license_plate" ,
ttl."state" ,
ttl."cdt" ,
ttl."udt" ,
ttl."remark" ,
dot."id" AS dot_id,
dot."dot_name" ,
dot."dot_number" ,
dot."dot_addr" ,
dot."dot_gis_addr" ,
dot."dot_tel" ,
dot."manage_area_id" ,
dot."manage_area_gis" ,
COMPANY ."id" AS company_id,
COMPANY ."company_name",
COMPANY ."company_number",
COMPANY ."city_id",
COMPANY ."company_addr",
COMPANY ."company_addr_gis",
COMPANY ."company_tel",
COMPANY ."is_sub_company"
FROM "tbl_transport_tool"  ttl
LEFT JOIN "tbl_dot_transport_tool" tdtl ON ttl."id" = tdtl."transport_tool_id" 
LEFT JOIN "tbl_dot" dot ON DOT ."id" = TDTL ."dot_id"
LEFT JOIN "tbl_company_dot_map" companydot ON companydot."dot_id" = TDTL ."dot_id"
LEFT JOIN "tbl_company" company ON company."id" = companydot."company_id"


2.2、拉宽仓库车辆表


SELECT 
ttl."id" ,
ttl."brand" ,
ttl."model" ,
ttl."type" ,
ttl."given_load" ,
ttl."load_cn_unit" ,
ttl."load_en_unit" ,
ttl."buy_dt" ,
ttl."license_plate" ,
ttl."state" ,
ttl."cdt" ,
ttl."udt" ,
ttl."remark" ,
warehouse."id" ,
warehouse."name",
warehouse."addr",
warehouse."addr_gis",
warehouse."employee_id",
warehouse."type",
warehouse."area",
warehouse."is_lease",
COMPANY ."id" AS company_id,
COMPANY ."company_name",
COMPANY ."company_number",
COMPANY ."city_id",
COMPANY ."company_addr",
COMPANY ."company_addr_gis",
COMPANY ."company_tel",
COMPANY ."is_sub_company"
FROM "tbl_warehouse_transport_tool"  twt
LEFT JOIN "tbl_transport_tool" ttl ON twt."transport_tool_id" = ttl."id" 
LEFT JOIN "tbl_warehouse" warehouse ON WAREHOUSE ."id" = twt."warehouse_id"
LEFT JOIN "tbl_company_warehouse_map" warehouse_map ON warehouse_map."warehouse_id" = warehouse."id" 
LEFT JOIN "tbl_company" company ON company."id" = warehouse_map."company_id"


3、Spark实现



实现步骤:


  • 在dwd目录下创建 TransportToolDWD 单例对象,继承自OfflineApp特质
  • 初始化环境的参数,创建SparkSession对象
  • 获取运输工具表(tbl_transport_tool)数据,并缓存数据
  • 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
  • 获取网点运输工具关联表(tbl_dot_transport_tool)数据,并缓存数据
  • 获取网点表(tbl_dot)数据,并缓存数据
  • 获取公司网点关联表(tbl_company_dot_map)数据,并缓存数据
  • 获取仓库运输工具关联表(tbl_warehouse_transport_tool)数据,并缓存数据
  • 获取公司仓库关联表(tbl_company_warehouse_map)数据,并缓存数据
  • 获取仓库表(tbl_warehouse)数据,并缓存数据
  • 获取公司表(tbl_company)数据,并缓存数据
  • 根据以下方式拉宽仓库车辆明细数据
  • 根据交通工具id,在交通工具表中获取交通工具数据
  • 根据网点id,在网点表中获取网点数据
  • 根据公司id,在公司表中获取公司数据
  • 根据仓库id,在仓库表中获取仓库数据
  • 创建网点车辆明细宽表(若存在则不创建)
  • 创建仓库车辆明细宽表(若存在则不创建)
  • 将仓库车辆明细宽表数据写入到kudu数据表中


删除缓存数据


3.1、初始化环境变量


初始化运单明细拉宽作业的环境变量


package cn.it.logistics.offline.dwd
import cn.it.logistics.common.{Configuration, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
 * 车辆主题开发
 * 将车辆相关的表进行关联拉宽,将拉宽后的数据写入到车辆宽表中
 * 1)网点车辆关联表->派送网点所拥有的车辆(三轮车)
 * 2)仓库车辆关联表->仓库与仓库之间的运输工具(货车、货机)
 */
object TransportToolDWD extends OfflineApp{
  //定义应用的名称
  val appName = this.getClass.getSimpleName
  /**
   * 入口函数
   * @param args
   */
  def main(args: Array[String]): Unit = {
    /**
     * 实现步骤:
     * 1)初始化sparkConf对象
     * 2)创建sparkSession对象
     * 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
     * 4)定义维度表与事实表的关联
     * 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
     * 5.1:创建车辆明细宽表的schema表结构
     * 5.2:创建车辆宽表(判断宽表是否存在,如果不存在则创建)
     * 5.3:将数据写入到kudu中
     * 6)将缓存的数据删除掉
     * 7)停止任务
     */
    //1)初始化sparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )
    //2)创建sparkSession对象
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
    //数据处理
    execute(sparkSession)
  }
  /**
   * 数据处理
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    sparkSession.stop()
  }
}


3.2、加载运输工具表及车辆相关的表并缓存


加载运输工具表的时候,需要指定日期条件,因为车辆主题最终需要Azkaban定时调度执行,每天执行一次增量数据,因此需要指定日期。

判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)


//TODO 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
//加载车辆表数据(事实表)
val ttDF: DataFrame = getKuduSource(sparkSession, TableMapping.transportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
//加载仓库车辆表数据
val ttDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dotTransportTool, true).persist(StorageLevel.DISK_ONLY_2)
//加载网点表的数据
val dotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dot, true).persist(StorageLevel.DISK_ONLY_2)
//加载公司网点关联表的数据
val companyDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyDotMap, true).persist(StorageLevel.DISK_ONLY_2)
//加载公司表的数据
val companyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(StorageLevel.DISK_ONLY_2)
//加载仓库车辆关联表数据(事实表)
val ttWsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouseTransportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
//加载仓库公司关联表
val companyWareHouseMapDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyWarehouseMap, true).persist(StorageLevel.DISK_ONLY_2)
//加载仓库表数据
val wsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouse, true).persist(StorageLevel.DISK_ONLY_2)
//加载物流码表数据
val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY_2)
import sparkSession.implicits._
//获取运输工具类型
val transportTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportType).select($"code".as("ttType"), $"codeDesc".as("ttTypeName"))
//获取运输工具状态
val transportStatusDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportStatus).select($"code".as("ttStatus"), $"codeDesc".as("ttStateName"))


3.3、定义网点车辆宽表的关联关系


为了在DWS层任务中方便的获取每日增量网点车辆表数据(根据日期),因此在DataFrame基础上动态增加列(day),指定日期格式为yyyyMMdd

代码如下:


//TODO 4)定义维度表与事实表的关联
val left_outer: String = "left_outer"
// 4.1:拉宽网点车辆表
val ttDotDetailDF = ttDotDF.join(ttDF, ttDotDF.col("transportToolId") === ttDF.col("id"), left_outer) //网点车辆表关联车辆表
  .join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer) //车辆表类型关联字典表类型
  .join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) //车辆表状态管理字典表状态
  .join(dotDF, dotDF.col("id") === ttDotDF.col("dotId"), left_outer) //网点车辆表关联网点
  .join(companyDotDF, ttDotDF.col("dotId") === companyDotDF.col("dotId"), left_outer) //网点车辆管连网点公司关联表
  .join(companyDF, companyDotDF.col("companyId") === companyDF.col("id"), left_outer) //网点车辆表关联公司表
  .withColumn("day", date_format(ttDotDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
  .sort(ttDF.col("cdt").asc)
  .select(
    ttDF("id"), //车辆表id
    ttDF("brand"), //车辆表brand
    ttDF("model"), //车辆表model
    ttDF("type").cast(IntegerType), //车辆表type
    transportTypeDF("ttTypeName").as("type_name"), // 车辆表type对应字典表车辆类型的具体描述
    ttDF("givenLoad").cast(IntegerType).as("given_load"), //车辆表given_load
    ttDF("loadCnUnit").as("load_cn_unit"), //车辆表load_cn_unit
    ttDF("loadEnUnit").as("load_en_unit"), //车辆表load_en_unit
    ttDF("buyDt").as("buy_dt"), //车辆表buy_dt
    ttDF("licensePlate").as("license_plate"), //车辆表license_plate
    ttDF("state").cast(IntegerType), //车辆表state
    transportStatusDF("ttStateName").as("state_name"), // 车辆表state对应字典表类型的具体描述
    ttDF("cdt"), //车辆表cdt
    ttDF("udt"), //车辆表udt
    ttDF("remark"), //车辆表remark
    dotDF("id").as("dot_id"), //网点表dot_id
    dotDF("dotNumber").as("dot_number"), //网点表dot_number
    dotDF("dotName").as("dot_name"), //网点表dot_name
    dotDF("dotAddr").as("dot_addr"), //网点表dot_addr
    dotDF("dotGisAddr").as("dot_gis_addr"), //网点表dot_gis_addr
    dotDF("dotTel").as("dot_tel"), //网点表dot_tel
    dotDF("manageAreaId").as("manage_area_id"), //网点表manage_area_id
    dotDF("manageAreaGis").as("manage_area_gis"), //网点表manage_area_gis
    companyDF("id").alias("company_id"), //公司表id
    companyDF("companyName").as("company_name"), //公司表company_name
    companyDF("cityId").as("city_id"), //公司表city_id
    companyDF("companyNumber").as("company_number"), //公司表company_number
    companyDF("companyAddr").as("company_addr"), //公司表company_addr
    companyDF("companyAddrGis").as("company_addr_gis"), //公司表company_addr_gis
    companyDF("companyTel").as("company_tel"), //公司表company_tel
    companyDF("isSubCompany").as("is_sub_company"), //公司表is_sub_company
    $"day"
  )


3.4、创建网点车辆明细宽表并将网点车辆明细数据写入到kudu数据表中


网点车辆明细宽表数据需要保存到kudu中,因此在第一次执行网点车辆明细拉宽操作时,网点车辆明细宽表是不存在的,因此需要实现自动判断宽表是否存在,如果不存在则创建


实现步骤:


在TransportToolDWD 单例对象中调用save方法


实现过程:


在TransportToolDWD 单例对象Main方法中调用save方法


//TODO 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
save(ttDotDetailDF, OfflineTableDefine.dotTransportToolDetail)


3.5、定义仓库车辆宽表的关联关系


为了在DWS层任务中方便的获取每日增量仓库车辆表数据(根据日期),因此在DataFrame基础上动态增加列(day),指定日期格式为yyyyMMdd

代码如下:


// 4.2:拉宽仓库车辆表
// 拉宽仓库车辆表
val ttWsDetailDF = ttWsDF.join(ttDF, ttWsDF.col("transportToolId") === ttDF.col("id"), left_outer) //仓库车辆表关联车辆表
  .join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer) //车辆表类型关联字典表类型
  .join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) //车辆表状态管理字典表状态
  .join(wsDF, wsDF.col("id") === ttWsDF.col("warehouseId"), left_outer) //仓库车辆表关联仓库
  .join(companyWareHouseMapDF, ttWsDF.col("warehouseId") === companyWareHouseMapDF.col("warehouseId"), left_outer) //仓库车辆管连仓库公司关联表
  .join(companyDF, companyDF.col("id") === companyWareHouseMapDF.col("companyId"), left_outer)
  .withColumn("day", date_format(ttWsDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
  .sort(ttDF.col("cdt").asc)
  .select(
    ttDF("id"), //车辆表id
    ttDF("brand"), //车辆表brand
    ttDF("model"), //车辆表model
    ttDF("type").cast(IntegerType), //车辆表type
    transportTypeDF("ttTypeName").as("type_name"), // 车辆表type对应字典表车辆类型的具体描述
    ttDF("givenLoad").as("given_load").cast(IntegerType), //车辆表given_load
    ttDF("loadCnUnit").as("load_cn_unit"), //车辆表load_cn_unit
    ttDF("loadEnUnit").as("load_en_unit"), //车辆表load_en_unit
    ttDF("buyDt").as("buy_dt"), //车辆表buy_dt
    ttDF("licensePlate").as("license_plate"), //车辆表license_plate
    ttDF("state").cast(IntegerType), //车辆表state
    transportStatusDF("ttStateName").as("state_name"), // 车辆表type对应字典表车辆类型的具体描述
    ttDF("cdt"), //车辆表cdt
    ttDF("udt"), //车辆表udt
    ttDF("remark"), //车辆表remark
    wsDF("id").as("ws_id"), //仓库表id
    wsDF("name"), //仓库表name
    wsDF("addr"), //仓库表addr
    wsDF("addrGis").as("addr_gis"), //仓库表addr_gis
    wsDF("employeeId").as("employee_id"), //仓库表employee_id
    wsDF("type").as("ws_type").cast(IntegerType), //仓库表type
    wsDF("area"), //仓库表area
    wsDF("isLease").as("is_lease").cast(IntegerType), //仓库表is_lease
    companyDF("id").alias("company_id"), //公司表id
    companyDF("companyName").as("company_name"), //公司表company_name
    companyDF("cityId").as("city_id"), //公司表city_id
    companyDF("companyNumber").as("company_number"), //公司表company_number
    companyDF("companyAddr").as("company_addr"), //公司表company_addr
    companyDF("companyAddrGis").as("company_addr_gis"), //公司表company_addr_gis
    companyDF("companyTel").as("company_tel"), //公司表company_tel
    companyDF("isSubCompany").as("is_sub_company"), //公司表is_sub_company
    $"day"
  )


3.6、创建仓库车辆明细宽表并将仓库车辆明细数据写入到kudu数据表中


仓库车辆明细宽表数据需要保存到kudu中,因此在第一次执行仓库车辆明细拉宽操作时,仓库车辆明细宽表是不存在的,因此需要实现自动判断宽表是否存在,如果不存在则创建


实现步骤:


在TransportToolDWD 单例对象中调用save方法


实现过程:


在TransportToolDWD 单例对象Main方法中调用save方法


save(ttWsDetailDF, OfflineTableDefine.warehouseTransportToolDetail)


3.7、删除缓存数据


为了释放资源,车辆明细宽表数据计算完成以后,需要将缓存的源表数据删除。


//TODO 6)将缓存的数据删除掉
ttDF.unpersist()
ttDotDF.unpersist()
dotDF.unpersist()
companyDotDF.unpersist()
companyDF.unpersist()
ttWsDF.unpersist()
companyWareHouseMapDF.unpersist()
wsDF.unpersist()
codesDF.unpersist()


3.8、完整代码


package cn.it.logistics.offline.dwd
import cn.it.logistics.common.{CodeTypeMapping, Configuration, OfflineTableDefine, SparkUtils, TableMapping}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.date_format
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel
/**
 * 车辆主题开发
 * 将车辆相关的表进行关联拉宽,将拉宽后的数据写入到车辆宽表中
 * 1)网点车辆关联表->派送网点所拥有的车辆(三轮车)
 * 2)仓库车辆关联表->仓库与仓库之间的运输工具(货车、货机)
 */
object TransportToolDWD extends OfflineApp{
  //定义应用的名称
  val appName = this.getClass.getSimpleName
  /**
   * 入口函数
   * @param args
   */
  def main(args: Array[String]): Unit = {
    /**
     * 实现步骤:
     * 1)初始化sparkConf对象
     * 2)创建sparkSession对象
     * 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
     * 4)定义维度表与事实表的关联
     * 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
     * 5.1:创建车辆明细宽表的schema表结构
     * 5.2:创建车辆宽表(判断宽表是否存在,如果不存在则创建)
     * 5.3:将数据写入到kudu中
     * 6)将缓存的数据删除掉
     * 7)停止任务
     */
    //1)初始化sparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )
    //2)创建sparkSession对象
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
    //数据处理
    execute(sparkSession)
  }
  /**
   * 数据处理
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    //TODO 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
    //加载车辆表数据(事实表)
    val ttDF: DataFrame = getKuduSource(sparkSession, TableMapping.transportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
    //加载仓库车辆表数据
    val ttDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dotTransportTool, true).persist(StorageLevel.DISK_ONLY_2)
    //加载网点表的数据
    val dotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dot, true).persist(StorageLevel.DISK_ONLY_2)
    //加载公司网点关联表的数据
    val companyDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyDotMap, true).persist(StorageLevel.DISK_ONLY_2)
    //加载公司表的数据
    val companyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(StorageLevel.DISK_ONLY_2)
    //加载仓库车辆关联表数据(事实表)
    val ttWsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouseTransportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
    //加载仓库公司关联表
    val companyWareHouseMapDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyWarehouseMap, true).persist(StorageLevel.DISK_ONLY_2)
    //加载仓库表数据
    val wsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouse, true).persist(StorageLevel.DISK_ONLY_2)
    //加载物流码表数据
    val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY_2)
    import sparkSession.implicits._
    //获取运输工具类型
    val transportTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportType).select($"code".as("ttType"), $"codeDesc".as("ttTypeName"))
    //获取运输工具状态
    val transportStatusDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportStatus).select($"code".as("ttStatus"), $"codeDesc".as("ttStateName"))
    //TODO 4)定义维度表与事实表的关联
    val left_outer: String = "left_outer"
    // 4.1:拉宽网点车辆表
    val ttDotDetailDF = ttDotDF.join(ttDF, ttDotDF.col("transportToolId") === ttDF.col("id"), left_outer) //网点车辆表关联车辆表
      .join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer) //车辆表类型关联字典表类型
      .join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) //车辆表状态管理字典表状态
      .join(dotDF, dotDF.col("id") === ttDotDF.col("dotId"), left_outer) //网点车辆表关联网点
      .join(companyDotDF, ttDotDF.col("dotId") === companyDotDF.col("dotId"), left_outer) //网点车辆管连网点公司关联表
      .join(companyDF, companyDotDF.col("companyId") === companyDF.col("id"), left_outer) //网点车辆表关联公司表
      .withColumn("day", date_format(ttDotDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
      .sort(ttDF.col("cdt").asc)
      .select(
        ttDF("id"), //车辆表id
        ttDF("brand"), //车辆表brand
        ttDF("model"), //车辆表model
        ttDF("type").cast(IntegerType), //车辆表type
        transportTypeDF("ttTypeName").as("type_name"), // 车辆表type对应字典表车辆类型的具体描述
        ttDF("givenLoad").cast(IntegerType).as("given_load"), //车辆表given_load
        ttDF("loadCnUnit").as("load_cn_unit"), //车辆表load_cn_unit
        ttDF("loadEnUnit").as("load_en_unit"), //车辆表load_en_unit
        ttDF("buyDt").as("buy_dt"), //车辆表buy_dt
        ttDF("licensePlate").as("license_plate"), //车辆表license_plate
        ttDF("state").cast(IntegerType), //车辆表state
        transportStatusDF("ttStateName").as("state_name"), // 车辆表state对应字典表类型的具体描述
        ttDF("cdt"), //车辆表cdt
        ttDF("udt"), //车辆表udt
        ttDF("remark"), //车辆表remark
        dotDF("id").as("dot_id"), //网点表dot_id
        dotDF("dotNumber").as("dot_number"), //网点表dot_number
        dotDF("dotName").as("dot_name"), //网点表dot_name
        dotDF("dotAddr").as("dot_addr"), //网点表dot_addr
        dotDF("dotGisAddr").as("dot_gis_addr"), //网点表dot_gis_addr
        dotDF("dotTel").as("dot_tel"), //网点表dot_tel
        dotDF("manageAreaId").as("manage_area_id"), //网点表manage_area_id
        dotDF("manageAreaGis").as("manage_area_gis"), //网点表manage_area_gis
        companyDF("id").alias("company_id"), //公司表id
        companyDF("companyName").as("company_name"), //公司表company_name
        companyDF("cityId").as("city_id"), //公司表city_id
        companyDF("companyNumber").as("company_number"), //公司表company_number
        companyDF("companyAddr").as("company_addr"), //公司表company_addr
        companyDF("companyAddrGis").as("company_addr_gis"), //公司表company_addr_gis
        companyDF("companyTel").as("company_tel"), //公司表company_tel
        companyDF("isSubCompany").as("is_sub_company"), //公司表is_sub_company
        $"day"
      )
    // 4.2:拉宽仓库车辆表
    // 拉宽仓库车辆表
    val ttWsDetailDF = ttWsDF.join(ttDF, ttWsDF.col("transportToolId") === ttDF.col("id"), left_outer) //仓库车辆表关联车辆表
      .join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer) //车辆表类型关联字典表类型
      .join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) //车辆表状态管理字典表状态
      .join(wsDF, wsDF.col("id") === ttWsDF.col("warehouseId"), left_outer) //仓库车辆表关联仓库
      .join(companyWareHouseMapDF, ttWsDF.col("warehouseId") === companyWareHouseMapDF.col("warehouseId"), left_outer) //仓库车辆管连仓库公司关联表
      .join(companyDF, companyDF.col("id") === companyWareHouseMapDF.col("companyId"), left_outer)
      .withColumn("day", date_format(ttWsDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
      .sort(ttDF.col("cdt").asc)
      .select(
        ttDF("id"), //车辆表id
        ttDF("brand"), //车辆表brand
        ttDF("model"), //车辆表model
        ttDF("type").cast(IntegerType), //车辆表type
        transportTypeDF("ttTypeName").as("type_name"), // 车辆表type对应字典表车辆类型的具体描述
        ttDF("givenLoad").as("given_load").cast(IntegerType), //车辆表given_load
        ttDF("loadCnUnit").as("load_cn_unit"), //车辆表load_cn_unit
        ttDF("loadEnUnit").as("load_en_unit"), //车辆表load_en_unit
        ttDF("buyDt").as("buy_dt"), //车辆表buy_dt
        ttDF("licensePlate").as("license_plate"), //车辆表license_plate
        ttDF("state").cast(IntegerType), //车辆表state
        transportStatusDF("ttStateName").as("state_name"), // 车辆表type对应字典表车辆类型的具体描述
        ttDF("cdt"), //车辆表cdt
        ttDF("udt"), //车辆表udt
        ttDF("remark"), //车辆表remark
        wsDF("id").as("ws_id"), //仓库表id
        wsDF("name"), //仓库表name
        wsDF("addr"), //仓库表addr
        wsDF("addrGis").as("addr_gis"), //仓库表addr_gis
        wsDF("employeeId").as("employee_id"), //仓库表employee_id
        wsDF("type").as("ws_type").cast(IntegerType), //仓库表type
        wsDF("area"), //仓库表area
        wsDF("isLease").as("is_lease").cast(IntegerType), //仓库表is_lease
        companyDF("id").alias("company_id"), //公司表id
        companyDF("companyName").as("company_name"), //公司表company_name
        companyDF("cityId").as("city_id"), //公司表city_id
        companyDF("companyNumber").as("company_number"), //公司表company_number
        companyDF("companyAddr").as("company_addr"), //公司表company_addr
        companyDF("companyAddrGis").as("company_addr_gis"), //公司表company_addr_gis
        companyDF("companyTel").as("company_tel"), //公司表company_tel
        companyDF("isSubCompany").as("is_sub_company"), //公司表is_sub_company
        $"day"
      )
    //TODO 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
    save(ttDotDetailDF, OfflineTableDefine.dotTransportToolDetail)
    save(ttWsDetailDF, OfflineTableDefine.warehouseTransportToolDetail)
    //TODO 6)将缓存的数据删除掉
    ttDF.unpersist()
    ttDotDF.unpersist()
    dotDF.unpersist()
    companyDotDF.unpersist()
    companyDF.unpersist()
    ttWsDF.unpersist()
    companyWareHouseMapDF.unpersist()
    wsDF.unpersist()
    codesDF.unpersist()
    sparkSession.stop()
  }
}



五、车辆数据指标开发



1、计算的字段


image.png


2、Spark实现


实现步骤:


  • 在dws目录下创建 TransportToolDWS 单例对象,继承自OfflineApp特质
  • 初始化环境的参数,创建SparkSession对象
  • 根据指定的日期获取拉宽后的车辆主题宽表(tbl_dot_transport_tool_detail、tbl_dot_transport_tool_detail)增量数据,并缓存数据
  • 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
  • 指标计算
  • 各网点发车次数
  • 各网点最大发车次数
  • 各网点最小发车次数
  • 各网点平均发车次数
  • 各区域发车次数
  • 各区域最大发车次数
  • 各区域最小发车次数
  • 各区域平均发车次数
  • 各公司发车次数
  • 各公司最大发车次数
  • 各公司最小发车次数
  • 各公司平均发车次数
  • 获取当前时间yyyyMMddHH
  • 构建要持久化的指标数据(需要判断计算的指标是否有值,若没有需要赋值默认值)
  • 通过StructType构建指定Schema
  • 创建车辆主题指标数据表(若存在则不创建)
  • 持久化指标数据到kudu表


2.1、初始化环境变量


package cn.it.logistics.offline.dws
import cn.it.logistics.common.{Configuration, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
 * 车辆主题指标开发
 */
object TransportToolDWS extends OfflineApp{
  //定义应用程序的名称
  val appName = this.getClass.getSimpleName
  /**
   * 入口函数
   * @param args
   */
  def main(args: Array[String]): Unit = {
    /**
     * 实现步骤:
     * 1)创建SparkConf对象
     * 2)创建SparkSession对象
     * 3)读取车辆明细宽表的数据
     * 4)对车辆明细宽表的数据进行指标的计算
     * 5)将计算好的指标数据写入到kudu数据库中
     * 5.1:定义指标结果表的schema信息
     * 5.2:组织需要写入到kudu表的数据
     * 5.3:判断指标结果表是否存在,如果不存在则创建
     * 5.4:将数据写入到kudu表中
     * 6)删除缓存数据
     * 7)停止任务,退出sparksession
     */
    //TODO 1)创建SparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )
    //TODO 2)创建SparkSession对象
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
    //处理数据
    execute(sparkSession)
  }
  /**
   * 数据处理
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    sparkSession.stop()
  }
}


2.2、加载车辆宽表增量数据并缓存


加载车辆宽表的时候,需要指定日期条件,因为车辆主题最终需要Azkaban定时调度执行,每天执行一次增量数据,因此需要指定日期。


//TODO 3)读取车辆明细宽表的数据
val ttDotDetailDF: DataFrame = getKuduSource(sparkSession, OfflineTableDefine.dotTransportToolDetail, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
val ttWarehouseDetailDF: DataFrame = getKuduSource(sparkSession, OfflineTableDefine.warehouseTransportToolDetail, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)


2.3、指标计算


//根据网点车辆的日期进行分组
val ttDotDetailGroupByDayDF: DataFrame = ttDotDetailDF.select("day").groupBy("day").count().cache()
//导入隐式转换
import sparkSession.implicits._
//定义计算好的指标结果集合对象
val dotRows: ArrayBuffer[Row] = ArrayBuffer[Row]()
//循环遍历网点车辆每个日期的车辆明细宽表数据
ttDotDetailGroupByDayDF.collect().foreach(row=> {
  //获取到要处理的数据所在的日期
  val day: String = row.getAs[String](0)
  //返回指定日期的仓库明细数据
  val ttDotDetailByDayDF: DataFrame = ttDotDetailDF.where(col("day") === day).toDF().persist(StorageLevel.DISK_ONLY_2)
  //各网点的发车次数(西三旗:10,西二旗:20)
  val ttDotTotalCountDF: DataFrame = ttDotDetailByDayDF.groupBy($"dot_id").agg(count("dot_id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
  //计算各网点的总发车次数
  val ttDotTotalCount: Row = ttDotTotalCountDF.agg(sum("cnt")).first()
  //各网点的最大发车次数、最小发车次数、平均发车次数
  val maxAndMinAndAvgTTDotTotalCount: Row = ttDotTotalCountDF.agg(max("cnt"), min("cnt"), avg("cnt")).first()
  val maxTTDotTotalCount: Any = maxAndMinAndAvgTTDotTotalCount(0)
  val minTTDotTotalCount: Any = maxAndMinAndAvgTTDotTotalCount(1)
  val avgTTDotTotalCount: Any = maxAndMinAndAvgTTDotTotalCount(2)
  // 各区域发车次数
  val areaDotTotalCountDF = ttDotDetailByDayDF.groupBy("city_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
  val areaDotTotalCount = areaDotTotalCountDF.agg(sum($"cnt")).first()
  // 各区域最大发车次数、最小发车次数和平均发车次数
  val maxAndMinAndAvgAreaDotTotalCount = areaDotTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
  val maxAreaDotTotalCount = maxAndMinAndAvgAreaDotTotalCount(0)
  val minAreaDotTotalCount = maxAndMinAndAvgAreaDotTotalCount(1)
  val avgAreaDotTotalCount = maxAndMinAndAvgAreaDotTotalCount(2)
  // 各公司发车次数
  val companyDotTotalCountDF = ttDotDetailByDayDF.groupBy("company_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
  val companyDotTotalCount = companyDotTotalCountDF.agg(sum($"cnt")).first()
  // 各公司最大发车次数、最小发车次数和平均发车次数
  val maxAndMinAndAvgCompanyDotTotalCount = companyDotTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
  val maxCompanyDotTotalCount = maxAndMinAndAvgCompanyDotTotalCount(0)
  val minCompanyDotTotalCount = maxAndMinAndAvgCompanyDotTotalCount(1)
  val avgCompanyDotTotalCount = maxAndMinAndAvgCompanyDotTotalCount(2)
  //将计算好的指标数据封装到row对象中
  val rowInfo: Row = Row(
    day,
    if(ttDotTotalCount.isNullAt(0)) 0L else ttDotTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(maxTTDotTotalCount==null) 0L else maxTTDotTotalCount.asInstanceOf[Number].longValue(),
    if(minTTDotTotalCount==null) 0L else minTTDotTotalCount.asInstanceOf[Number].longValue(),
    if(avgTTDotTotalCount==null) 0L else avgTTDotTotalCount.asInstanceOf[Number].longValue(),
    if(areaDotTotalCount.isNullAt(0)) 0L else areaDotTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(maxAreaDotTotalCount==null) 0L else maxAreaDotTotalCount.asInstanceOf[Number].longValue(),
    if(minAreaDotTotalCount==null) 0L else minAreaDotTotalCount.asInstanceOf[Number].longValue(),
    if(avgAreaDotTotalCount==null) 0L else avgAreaDotTotalCount.asInstanceOf[Number].longValue(),
    if(companyDotTotalCount.isNullAt(0)) 0L else ttDotTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(maxCompanyDotTotalCount==null) 0L else maxCompanyDotTotalCount.asInstanceOf[Number].longValue(),
    if(minCompanyDotTotalCount==null) 0L else minCompanyDotTotalCount.asInstanceOf[Number].longValue(),
    if(avgCompanyDotTotalCount==null) 0L else avgCompanyDotTotalCount.asInstanceOf[Number].longValue()
  )
  dotRows.append(rowInfo)
  println(rowInfo)
  ttDotDetailByDayDF.unpersist()
  ttDotTotalCountDF.unpersist()
  areaDotTotalCountDF.unpersist()
  companyDotTotalCountDF.unpersist()
})
//根据仓库车辆的日期进行分组
val ttWsDetailGroupByDayDF: DataFrame = ttWarehouseDetailDF.select("day").groupBy("day").count().cache()
//定义计算好的指标结果集合对象
val wsRows: ArrayBuffer[Row] = ArrayBuffer[Row]()
//循环遍历仓库车辆每个日期的车辆明细宽表数据
ttWsDetailGroupByDayDF.collect().foreach(row=> {
  //获取到要处理的数据所在的日期
  val day: String = row.getAs[String](0)
  //返回指定日期的仓库明细数据
  val ttWsDetailByDayDF: DataFrame = ttWarehouseDetailDF.where(col("day") === day).toDF().persist(StorageLevel.DISK_ONLY_2)
  val whTransportToolTotalCountDF: DataFrame = ttWsDetailByDayDF.groupBy($"ws_id").agg(count("ws_id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
  //计算各仓库的总发车次数
  val whTransportToolTotalCount: Row = whTransportToolTotalCountDF.agg(sum("cnt")).first()
  //各仓库的最大发车次数、最小发车次数、平均发车次数
  val maxAndMinAndAvgWhTransportToolTotalCount: Row = whTransportToolTotalCountDF.agg(max("cnt"), min("cnt"), avg("cnt")).first()
  val maxTransportToolCount: Any = maxAndMinAndAvgWhTransportToolTotalCount(0)
  val minTransportToolCount: Any = maxAndMinAndAvgWhTransportToolTotalCount(1)
  val avgTransportToolCount: Any = maxAndMinAndAvgWhTransportToolTotalCount(2)
  // 各区域发车次数
  val areaTransportToolTotalCountDF = ttWsDetailByDayDF.groupBy("city_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
  val areaTransportToolTotalCount = areaTransportToolTotalCountDF.agg(sum($"cnt")).first()
  // 各区域最大发车次数、最小发车次数和平均发车次数
  val maxAndMinAndAvgAreaTransportToolTotalCount = areaTransportToolTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
  val maxAreaTransportToolTotalCount: Any = maxAndMinAndAvgAreaTransportToolTotalCount(0)
  val minAreaTransportToolTotalCount = maxAndMinAndAvgAreaTransportToolTotalCount(1)
  val avgAreaTransportToolTotalCount = maxAndMinAndAvgAreaTransportToolTotalCount(2)
  // 各公司发车次数
  val companyTransportToolTotalCountDF = ttWsDetailByDayDF.groupBy("company_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
  val companyTransportToolTotalCount = companyTransportToolTotalCountDF.agg(sum($"cnt")).first()
  // 各公司最大发车次数、最小发车次数和平均发车次数
  val maxAndMinAndAvgCompanyTransportToolTotalCount = companyTransportToolTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
  val maxCompanyTransportToolTotalCount = maxAndMinAndAvgCompanyTransportToolTotalCount(0)
  val minCompanyTransportToolTotalCount = maxAndMinAndAvgCompanyTransportToolTotalCount(1)
  val avgCompanyTransportToolTotalCount = maxAndMinAndAvgCompanyTransportToolTotalCount(2)
  //将计算好的指标数据封装到row对象中
  val rowInfo: Row = Row(
    day,
    if(whTransportToolTotalCount.isNullAt(0)) 0L else whTransportToolTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(maxTransportToolCount==null) 0L else maxTransportToolCount.asInstanceOf[Number].longValue(),
    if(minTransportToolCount==null) 0L else minTransportToolCount.asInstanceOf[Number].longValue(),
    if(avgTransportToolCount==null) 0L else avgTransportToolCount.asInstanceOf[Number].longValue(),
    if(areaTransportToolTotalCount.isNullAt(0)) 0L else areaTransportToolTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(maxAreaTransportToolTotalCount==null) 0L else maxAreaTransportToolTotalCount.asInstanceOf[Number].longValue(),
    if(minAreaTransportToolTotalCount==null) 0L else minAreaTransportToolTotalCount.asInstanceOf[Number].longValue(),
    if(avgAreaTransportToolTotalCount==null) 0L else avgAreaTransportToolTotalCount.asInstanceOf[Number].longValue(),
    if(companyTransportToolTotalCount.isNullAt(0)) 0L else companyTransportToolTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(maxCompanyTransportToolTotalCount==null) 0L else maxCompanyTransportToolTotalCount.asInstanceOf[Number].longValue(),
    if(minCompanyTransportToolTotalCount==null) 0L else minCompanyTransportToolTotalCount.asInstanceOf[Number].longValue(),
    if(avgCompanyTransportToolTotalCount==null) 0L else avgCompanyTransportToolTotalCount.asInstanceOf[Number].longValue()
  )
  wsRows.append(rowInfo)
  println(rowInfo)
  ttWsDetailByDayDF.unpersist()
  areaTransportToolTotalCountDF.unpersist()
  companyTransportToolTotalCountDF.unpersist()
  whTransportToolTotalCountDF.unpersist()
})


2.4、通过StructType构建指定Schema


//定义指标结果表的shema信息
//网点车辆相关的表结构数据
val schemaDot: StructType = StructType(Array(
  StructField("id", StringType, false, Metadata.empty),
  StructField("ttDotTotalCount", LongType, true, Metadata.empty),           //各网点发车次数
  StructField("maxTtDotTotalCount", LongType, true, Metadata.empty),        //各网点最大发车次数
  StructField("minTtDotTotalCount", LongType, true, Metadata.empty),        //各网点最小发车次数
  StructField("avgTtDotTotalCount", LongType, true, Metadata.empty),        //各网点平均发车次数
  StructField("areaDotTotalCount", LongType, true, Metadata.empty),         //各区域发车次数
  StructField("maxAreaDotTotalCount", LongType, true, Metadata.empty),      //各区域最大发车次数
  StructField("minAreaDotTotalCount", LongType, true, Metadata.empty),      //各区域最小发车次数
  StructField("avgAreaDotTotalCount", LongType, true, Metadata.empty),      //各区域平均发车次数
  StructField("companyDotTotalCount", LongType, true, Metadata.empty),      //各公司发车次数
  StructField("maxCompanyDotTotalCount", LongType, true, Metadata.empty),   //各公司最大发车次数
  StructField("minCompanyDotTotalCount", LongType, true, Metadata.empty),   //各公司最小发车次数
  StructField("avgCompanyDotTotalCount", LongType, true, Metadata.empty)    //各公司平均发车次数
))
//仓库车辆相关的表结构数据
val schemaWs: StructType = StructType(Array(
  StructField("id", StringType, false, Metadata.empty),
  StructField("whTransportToolTotalCount", LongType, true, Metadata.empty),           //各仓库发车次数
  StructField("maxWhTransportToolTotalCount", LongType, true, Metadata.empty),        //各仓库最大发车次数
  StructField("minWhTransportToolTotalCount", LongType, true, Metadata.empty),        //各仓库最小发车次数
  StructField("avgWhTransportToolTotalCount", LongType, true, Metadata.empty),        //各仓库平均发车次数
  StructField("areaTransportToolTotalCount", LongType, true, Metadata.empty),         //各区域发车次数
  StructField("maxAreaTransportToolTotalCount", LongType, true, Metadata.empty),      //各区域最大发车次数
  StructField("minAreaTransportToolTotalCount", LongType, true, Metadata.empty),      //各区域最小发车次数
  StructField("avgAreaTransportToolTotalCount", LongType, true, Metadata.empty),      //各区域平均发车次数
  StructField("companyTransportToolTotalCount", LongType, true, Metadata.empty),      //各公司发车次数
  StructField("maxCompanyTransportToolTotalCount", LongType, true, Metadata.empty),   //各公司最大发车次数
  StructField("minCompanyTransportToolTotalCount", LongType, true, Metadata.empty),   //各公司最小发车次数
  StructField("avgCompanyTransportToolTotalCount", LongType, true, Metadata.empty)    //各公司平均发车次数
))


2.5、持久化指标数据到kudu表


//TODO 5)将计算好的指标数据写入到kudu数据库中
val dotRDD: RDD[Row] = sparkSession.sparkContext.makeRDD(dotRows)
val wsRDD: RDD[Row] = sparkSession.sparkContext.makeRDD(wsRows)
val dotDataFrame: DataFrame = sparkSession.createDataFrame(dotRDD, schemaDot)
val wsDataFrame: DataFrame = sparkSession.createDataFrame(wsRDD, schemaWs)
save(dotDataFrame, OfflineTableDefine.ttDotSummary)
save(wsDataFrame, OfflineTableDefine.ttWsSummary)


2.6、完整代码


package cn.it.logistics.offline.dws
import cn.it.logistics.common.{Configuration, DateHelper, OfflineTableDefine, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, LongType, Metadata, StringType, StructField, StructType}
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
/**
 * 车辆主题指标开发
 */
object TransportToolDWS extends OfflineApp{
  //定义应用程序的名称
  val appName = this.getClass.getSimpleName
  /**
   * 入口函数
   * @param args
   */
  def main(args: Array[String]): Unit = {
    /**
     * 实现步骤:
     * 1)创建SparkConf对象
     * 2)创建SparkSession对象
     * 3)读取车辆明细宽表的数据
     * 4)对车辆明细宽表的数据进行指标的计算
     * 5)将计算好的指标数据写入到kudu数据库中
     * 5.1:定义指标结果表的schema信息
     * 5.2:组织需要写入到kudu表的数据
     * 5.3:判断指标结果表是否存在,如果不存在则创建
     * 5.4:将数据写入到kudu表中
     * 6)删除缓存数据
     * 7)停止任务,退出sparksession
     */
    //TODO 1)创建SparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )
    //TODO 2)创建SparkSession对象
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
    //处理数据
    execute(sparkSession)
  }
  /**
   * 数据处理
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    //TODO 3)读取车辆明细宽表的数据
    val ttDotDetailDF: DataFrame = getKuduSource(sparkSession, OfflineTableDefine.dotTransportToolDetail, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
    val ttWarehouseDetailDF: DataFrame = getKuduSource(sparkSession, OfflineTableDefine.warehouseTransportToolDetail, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
    //根据网点车辆的日期进行分组
    val ttDotDetailGroupByDayDF: DataFrame = ttDotDetailDF.select("day").groupBy("day").count().cache()
    //导入隐式转换
    import sparkSession.implicits._
    //定义计算好的指标结果集合对象
    val dotRows: ArrayBuffer[Row] = ArrayBuffer[Row]()
    //循环遍历网点车辆每个日期的车辆明细宽表数据
    ttDotDetailGroupByDayDF.collect().foreach(row=> {
      //获取到要处理的数据所在的日期
      val day: String = row.getAs[String](0)
      //返回指定日期的仓库明细数据
      val ttDotDetailByDayDF: DataFrame = ttDotDetailDF.where(col("day") === day).toDF().persist(StorageLevel.DISK_ONLY_2)
      //各网点的发车次数(西三旗:10,西二旗:20)
      val ttDotTotalCountDF: DataFrame = ttDotDetailByDayDF.groupBy($"dot_id").agg(count("dot_id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
      //计算各网点的总发车次数
      val ttDotTotalCount: Row = ttDotTotalCountDF.agg(sum("cnt")).first()
      //各网点的最大发车次数、最小发车次数、平均发车次数
      val maxAndMinAndAvgTTDotTotalCount: Row = ttDotTotalCountDF.agg(max("cnt"), min("cnt"), avg("cnt")).first()
      val maxTTDotTotalCount: Any = maxAndMinAndAvgTTDotTotalCount(0)
      val minTTDotTotalCount: Any = maxAndMinAndAvgTTDotTotalCount(1)
      val avgTTDotTotalCount: Any = maxAndMinAndAvgTTDotTotalCount(2)
      // 各区域发车次数
      val areaDotTotalCountDF = ttDotDetailByDayDF.groupBy("city_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
      val areaDotTotalCount = areaDotTotalCountDF.agg(sum($"cnt")).first()
      // 各区域最大发车次数、最小发车次数和平均发车次数
      val maxAndMinAndAvgAreaDotTotalCount = areaDotTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
      val maxAreaDotTotalCount = maxAndMinAndAvgAreaDotTotalCount(0)
      val minAreaDotTotalCount = maxAndMinAndAvgAreaDotTotalCount(1)
      val avgAreaDotTotalCount = maxAndMinAndAvgAreaDotTotalCount(2)
      // 各公司发车次数
      val companyDotTotalCountDF = ttDotDetailByDayDF.groupBy("company_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
      val companyDotTotalCount = companyDotTotalCountDF.agg(sum($"cnt")).first()
      // 各公司最大发车次数、最小发车次数和平均发车次数
      val maxAndMinAndAvgCompanyDotTotalCount = companyDotTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
      val maxCompanyDotTotalCount = maxAndMinAndAvgCompanyDotTotalCount(0)
      val minCompanyDotTotalCount = maxAndMinAndAvgCompanyDotTotalCount(1)
      val avgCompanyDotTotalCount = maxAndMinAndAvgCompanyDotTotalCount(2)
      //将计算好的指标数据封装到row对象中
      val rowInfo: Row = Row(
        day,
        if(ttDotTotalCount.isNullAt(0)) 0L else ttDotTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(maxTTDotTotalCount==null) 0L else maxTTDotTotalCount.asInstanceOf[Number].longValue(),
        if(minTTDotTotalCount==null) 0L else minTTDotTotalCount.asInstanceOf[Number].longValue(),
        if(avgTTDotTotalCount==null) 0L else avgTTDotTotalCount.asInstanceOf[Number].longValue(),
        if(areaDotTotalCount.isNullAt(0)) 0L else areaDotTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(maxAreaDotTotalCount==null) 0L else maxAreaDotTotalCount.asInstanceOf[Number].longValue(),
        if(minAreaDotTotalCount==null) 0L else minAreaDotTotalCount.asInstanceOf[Number].longValue(),
        if(avgAreaDotTotalCount==null) 0L else avgAreaDotTotalCount.asInstanceOf[Number].longValue(),
        if(companyDotTotalCount.isNullAt(0)) 0L else ttDotTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(maxCompanyDotTotalCount==null) 0L else maxCompanyDotTotalCount.asInstanceOf[Number].longValue(),
        if(minCompanyDotTotalCount==null) 0L else minCompanyDotTotalCount.asInstanceOf[Number].longValue(),
        if(avgCompanyDotTotalCount==null) 0L else avgCompanyDotTotalCount.asInstanceOf[Number].longValue()
      )
      dotRows.append(rowInfo)
      println(rowInfo)
      ttDotDetailByDayDF.unpersist()
      ttDotTotalCountDF.unpersist()
      areaDotTotalCountDF.unpersist()
      companyDotTotalCountDF.unpersist()
    })
    //根据仓库车辆的日期进行分组
    val ttWsDetailGroupByDayDF: DataFrame = ttWarehouseDetailDF.select("day").groupBy("day").count().cache()
    //定义计算好的指标结果集合对象
    val wsRows: ArrayBuffer[Row] = ArrayBuffer[Row]()
    //循环遍历仓库车辆每个日期的车辆明细宽表数据
    ttWsDetailGroupByDayDF.collect().foreach(row=> {
      //获取到要处理的数据所在的日期
      val day: String = row.getAs[String](0)
      //返回指定日期的仓库明细数据
      val ttWsDetailByDayDF: DataFrame = ttWarehouseDetailDF.where(col("day") === day).toDF().persist(StorageLevel.DISK_ONLY_2)
      val whTransportToolTotalCountDF: DataFrame = ttWsDetailByDayDF.groupBy($"ws_id").agg(count("ws_id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
      //计算各仓库的总发车次数
      val whTransportToolTotalCount: Row = whTransportToolTotalCountDF.agg(sum("cnt")).first()
      //各仓库的最大发车次数、最小发车次数、平均发车次数
      val maxAndMinAndAvgWhTransportToolTotalCount: Row = whTransportToolTotalCountDF.agg(max("cnt"), min("cnt"), avg("cnt")).first()
      val maxTransportToolCount: Any = maxAndMinAndAvgWhTransportToolTotalCount(0)
      val minTransportToolCount: Any = maxAndMinAndAvgWhTransportToolTotalCount(1)
      val avgTransportToolCount: Any = maxAndMinAndAvgWhTransportToolTotalCount(2)
      // 各区域发车次数
      val areaTransportToolTotalCountDF = ttWsDetailByDayDF.groupBy("city_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
      val areaTransportToolTotalCount = areaTransportToolTotalCountDF.agg(sum($"cnt")).first()
      // 各区域最大发车次数、最小发车次数和平均发车次数
      val maxAndMinAndAvgAreaTransportToolTotalCount = areaTransportToolTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
      val maxAreaTransportToolTotalCount: Any = maxAndMinAndAvgAreaTransportToolTotalCount(0)
      val minAreaTransportToolTotalCount = maxAndMinAndAvgAreaTransportToolTotalCount(1)
      val avgAreaTransportToolTotalCount = maxAndMinAndAvgAreaTransportToolTotalCount(2)
      // 各公司发车次数
      val companyTransportToolTotalCountDF = ttWsDetailByDayDF.groupBy("company_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
      val companyTransportToolTotalCount = companyTransportToolTotalCountDF.agg(sum($"cnt")).first()
      // 各公司最大发车次数、最小发车次数和平均发车次数
      val maxAndMinAndAvgCompanyTransportToolTotalCount = companyTransportToolTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
      val maxCompanyTransportToolTotalCount = maxAndMinAndAvgCompanyTransportToolTotalCount(0)
      val minCompanyTransportToolTotalCount = maxAndMinAndAvgCompanyTransportToolTotalCount(1)
      val avgCompanyTransportToolTotalCount = maxAndMinAndAvgCompanyTransportToolTotalCount(2)
      //将计算好的指标数据封装到row对象中
      val rowInfo: Row = Row(
        day,
        if(whTransportToolTotalCount.isNullAt(0)) 0L else whTransportToolTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(maxTransportToolCount==null) 0L else maxTransportToolCount.asInstanceOf[Number].longValue(),
        if(minTransportToolCount==null) 0L else minTransportToolCount.asInstanceOf[Number].longValue(),
        if(avgTransportToolCount==null) 0L else avgTransportToolCount.asInstanceOf[Number].longValue(),
        if(areaTransportToolTotalCount.isNullAt(0)) 0L else areaTransportToolTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(maxAreaTransportToolTotalCount==null) 0L else maxAreaTransportToolTotalCount.asInstanceOf[Number].longValue(),
        if(minAreaTransportToolTotalCount==null) 0L else minAreaTransportToolTotalCount.asInstanceOf[Number].longValue(),
        if(avgAreaTransportToolTotalCount==null) 0L else avgAreaTransportToolTotalCount.asInstanceOf[Number].longValue(),
        if(companyTransportToolTotalCount.isNullAt(0)) 0L else companyTransportToolTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(maxCompanyTransportToolTotalCount==null) 0L else maxCompanyTransportToolTotalCount.asInstanceOf[Number].longValue(),
        if(minCompanyTransportToolTotalCount==null) 0L else minCompanyTransportToolTotalCount.asInstanceOf[Number].longValue(),
        if(avgCompanyTransportToolTotalCount==null) 0L else avgCompanyTransportToolTotalCount.asInstanceOf[Number].longValue()
      )
      wsRows.append(rowInfo)
      println(rowInfo)
      ttWsDetailByDayDF.unpersist()
      areaTransportToolTotalCountDF.unpersist()
      companyTransportToolTotalCountDF.unpersist()
      whTransportToolTotalCountDF.unpersist()
    })
    //定义指标结果表的shema信息
    //网点车辆相关的表结构数据
    val schemaDot: StructType = StructType(Array(
      StructField("id", StringType, false, Metadata.empty),
      StructField("ttDotTotalCount", LongType, true, Metadata.empty),           //各网点发车次数
      StructField("maxTtDotTotalCount", LongType, true, Metadata.empty),        //各网点最大发车次数
      StructField("minTtDotTotalCount", LongType, true, Metadata.empty),        //各网点最小发车次数
      StructField("avgTtDotTotalCount", LongType, true, Metadata.empty),        //各网点平均发车次数
      StructField("areaDotTotalCount", LongType, true, Metadata.empty),         //各区域发车次数
      StructField("maxAreaDotTotalCount", LongType, true, Metadata.empty),      //各区域最大发车次数
      StructField("minAreaDotTotalCount", LongType, true, Metadata.empty),      //各区域最小发车次数
      StructField("avgAreaDotTotalCount", LongType, true, Metadata.empty),      //各区域平均发车次数
      StructField("companyDotTotalCount", LongType, true, Metadata.empty),      //各公司发车次数
      StructField("maxCompanyDotTotalCount", LongType, true, Metadata.empty),   //各公司最大发车次数
      StructField("minCompanyDotTotalCount", LongType, true, Metadata.empty),   //各公司最小发车次数
      StructField("avgCompanyDotTotalCount", LongType, true, Metadata.empty)    //各公司平均发车次数
    ))
    //仓库车辆相关的表结构数据
    val schemaWs: StructType = StructType(Array(
      StructField("id", StringType, false, Metadata.empty),
      StructField("whTransportToolTotalCount", LongType, true, Metadata.empty),           //各仓库发车次数
      StructField("maxWhTransportToolTotalCount", LongType, true, Metadata.empty),        //各仓库最大发车次数
      StructField("minWhTransportToolTotalCount", LongType, true, Metadata.empty),        //各仓库最小发车次数
      StructField("avgWhTransportToolTotalCount", LongType, true, Metadata.empty),        //各仓库平均发车次数
      StructField("areaTransportToolTotalCount", LongType, true, Metadata.empty),         //各区域发车次数
      StructField("maxAreaTransportToolTotalCount", LongType, true, Metadata.empty),      //各区域最大发车次数
      StructField("minAreaTransportToolTotalCount", LongType, true, Metadata.empty),      //各区域最小发车次数
      StructField("avgAreaTransportToolTotalCount", LongType, true, Metadata.empty),      //各区域平均发车次数
      StructField("companyTransportToolTotalCount", LongType, true, Metadata.empty),      //各公司发车次数
      StructField("maxCompanyTransportToolTotalCount", LongType, true, Metadata.empty),   //各公司最大发车次数
      StructField("minCompanyTransportToolTotalCount", LongType, true, Metadata.empty),   //各公司最小发车次数
      StructField("avgCompanyTransportToolTotalCount", LongType, true, Metadata.empty)    //各公司平均发车次数
    ))
    //TODO 5)将计算好的指标数据写入到kudu数据库中
    val dotRDD: RDD[Row] = sparkSession.sparkContext.makeRDD(dotRows)
    val wsRDD: RDD[Row] = sparkSession.sparkContext.makeRDD(wsRows)
    val dotDataFrame: DataFrame = sparkSession.createDataFrame(dotRDD, schemaDot)
    val wsDataFrame: DataFrame = sparkSession.createDataFrame(wsRDD, schemaWs)
    save(dotDataFrame, OfflineTableDefine.ttDotSummary)
    save(wsDataFrame, OfflineTableDefine.ttWsSummary)
//    6)删除缓存数据
    ttDotDetailDF.unpersist()
    ttWarehouseDetailDF.unpersist()
    ttDotDetailGroupByDayDF.unpersist()
    ttWsDetailGroupByDayDF.unpersist()
    //    7)停止任务,退出sparksession
    sparkSession.stop()
  }
}


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
2月前
|
数据采集 监控 算法
大数据与物流行业:智能配送的实现
【10月更文挑战第31天】在数字化时代,大数据成为物流行业转型升级的关键驱动力。本文探讨大数据如何在物流行业中实现智能配送,包括数据采集与整合、数据分析与挖掘、智能配送规划及实时监控与评估,通过案例分析展示了大数据在优化配送路线和提升物流效率方面的巨大潜力,展望了未来智能配送的高度自动化、实时性和协同化趋势。
|
5月前
|
机器学习/深度学习 设计模式 人工智能
面向对象方法在AIGC和大数据集成项目中的应用
【8月更文第12天】随着人工智能生成内容(AIGC)和大数据技术的快速发展,企业面临着前所未有的挑战和机遇。AIGC技术能够自动产生高质量的内容,而大数据技术则能提供海量数据的支持,两者的结合为企业提供了强大的竞争优势。然而,要充分利用这些技术,就需要构建一个既能处理大规模数据又能高效集成机器学习模型的集成框架。面向对象编程(OOP)以其封装性、继承性和多态性等特点,在构建这样的复杂系统中扮演着至关重要的角色。
88 3
|
5月前
|
监控 Java 开发者
揭秘Struts 2性能监控:选对工具与方法,让你的应用跑得更快,赢在起跑线上!
【8月更文挑战第31天】在企业级应用开发中,性能监控对系统的稳定运行至关重要。针对流行的Java EE框架Struts 2,本文探讨了性能监控的工具与方法,包括商用的JProfiler、免费的VisualVM以及Struts 2自带的性能监控插件。通过示例代码展示了如何在实际项目中实施这些监控手段,帮助开发者发现和解决性能瓶颈,确保应用在高并发、高负载环境下稳定运行。选择合适的监控工具需综合考虑项目需求、成本、易用性和可扩展性等因素。
53 0
|
5月前
|
SQL 大数据 分布式数据库
SQL与大数据的神秘力量:如何用高效SQL处理海量数据,让你的项目一鸣惊人?
【8月更文挑战第31天】在现代软件开发中,处理海量数据是关键挑战之一。本文探讨了SQL与大数据结合的方法,包括数据类型优化、索引优化、分区优化及分布式数据库应用,并通过示例代码展示了如何实施这些策略。通过遵循最佳实践,如了解查询模式、使用性能工具及定期维护索引,开发者可以更高效地利用SQL处理大规模数据集。随着SQL技术的发展,其在软件开发中的作用将愈发重要。
157 0
|
6月前
|
弹性计算 分布式计算 大数据
MaxCompute产品使用合集之如何将用户A从项目空间A申请的表权限需要改为用户B
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
6月前
|
分布式计算 运维 DataWorks
MaxCompute操作报错合集之用户已在DataWorks项目中,并有项目的开发和运维权限,下载数据时遇到报错,该如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
6月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之如何查询MaxCompute项目中的所有表及其字段信息
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
2月前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
519 7
|
2月前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
65 2
|
10天前
|
分布式计算 Shell MaxCompute
odps测试表及大量数据构建测试
odps测试表及大量数据构建测试