2、SQL语句
2.1、拉宽网点车辆表
-- Dot (delivery outlet) vehicle wide table: every vehicle, with the dot it belongs to and that dot's company.
-- Driven by tbl_transport_tool, so vehicles without a dot association are kept with NULL dot/company columns.
SELECT
    ttl."id",
    ttl."brand",
    ttl."model",
    ttl."type",
    ttl."given_load",
    ttl."load_cn_unit",
    ttl."load_en_unit",
    ttl."buy_dt",
    ttl."license_plate",
    ttl."state",
    ttl."cdt",
    ttl."udt",
    ttl."remark",
    dot."id" AS dot_id,
    dot."dot_name",
    dot."dot_number",
    dot."dot_addr",
    dot."dot_gis_addr",
    dot."dot_tel",
    dot."manage_area_id",
    dot."manage_area_gis",
    company."id" AS company_id,
    company."company_name",
    company."company_number",
    company."city_id",
    company."company_addr",
    company."company_addr_gis",
    company."company_tel",
    company."is_sub_company"
FROM "tbl_transport_tool" ttl
    LEFT JOIN "tbl_dot_transport_tool" tdtl ON ttl."id" = tdtl."transport_tool_id"
    LEFT JOIN "tbl_dot" dot ON dot."id" = tdtl."dot_id"
    LEFT JOIN "tbl_company_dot_map" companydot ON companydot."dot_id" = tdtl."dot_id"
    LEFT JOIN "tbl_company" company ON company."id" = companydot."company_id"
2.2、拉宽仓库车辆表
-- Warehouse vehicle wide table: every warehouse-vehicle association, with the vehicle, the warehouse
-- and the warehouse's company.
-- The warehouse "id" and "type" columns are aliased (ws_id / ws_type) so they do not collide with the
-- vehicle's own "id" / "type" columns in the result set; the Spark job uses the same names.
SELECT
    ttl."id",
    ttl."brand",
    ttl."model",
    ttl."type",
    ttl."given_load",
    ttl."load_cn_unit",
    ttl."load_en_unit",
    ttl."buy_dt",
    ttl."license_plate",
    ttl."state",
    ttl."cdt",
    ttl."udt",
    ttl."remark",
    warehouse."id" AS ws_id,
    warehouse."name",
    warehouse."addr",
    warehouse."addr_gis",
    warehouse."employee_id",
    warehouse."type" AS ws_type,
    warehouse."area",
    warehouse."is_lease",
    company."id" AS company_id,
    company."company_name",
    company."company_number",
    company."city_id",
    company."company_addr",
    company."company_addr_gis",
    company."company_tel",
    company."is_sub_company"
FROM "tbl_warehouse_transport_tool" twt
    LEFT JOIN "tbl_transport_tool" ttl ON twt."transport_tool_id" = ttl."id"
    LEFT JOIN "tbl_warehouse" warehouse ON warehouse."id" = twt."warehouse_id"
    LEFT JOIN "tbl_company_warehouse_map" warehouse_map ON warehouse_map."warehouse_id" = warehouse."id"
    LEFT JOIN "tbl_company" company ON company."id" = warehouse_map."company_id"
3、Spark实现
实现步骤:
- 在dwd目录下创建 TransportToolDWD 单例对象,继承自OfflineApp特质
- 初始化环境的参数,创建SparkSession对象
- 获取运输工具表(tbl_transport_tool)数据,并缓存数据
- 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
- 获取网点运输工具关联表(tbl_dot_transport_tool)数据,并缓存数据
- 获取网点表(tbl_dot)数据,并缓存数据
- 获取公司网点关联表(tbl_company_dot_map)数据,并缓存数据
- 获取仓库运输工具关联表(tbl_warehouse_transport_tool)数据,并缓存数据
- 获取公司仓库关联表(tbl_company_warehouse_map)数据,并缓存数据
- 获取仓库表(tbl_warehouse)数据,并缓存数据
- 获取公司表(tbl_company)数据,并缓存数据
- 根据以下方式分别拉宽网点车辆明细数据和仓库车辆明细数据
- 根据交通工具id,在交通工具表中获取交通工具数据
- 根据网点id,在网点表中获取网点数据
- 根据公司id,在公司表中获取公司数据
- 根据仓库id,在仓库表中获取仓库数据
- 创建网点车辆明细宽表(若存在则不创建)
- 创建仓库车辆明细宽表(若存在则不创建)
- 将网点车辆明细宽表和仓库车辆明细宽表数据写入到kudu数据表中
- 删除缓存数据
3.1、初始化环境变量
初始化车辆明细拉宽作业的环境变量
package cn.it.logistics.offline.dwd

import cn.it.logistics.common.{Configuration, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * Vehicle (transport tool) subject, DWD layer - initial skeleton.
 *
 * Joins the vehicle-related tables into wide tables and writes the result to the vehicle wide tables:
 *  1) dot-vehicle association table       -> vehicles owned by delivery dots (tricycles)
 *  2) warehouse-vehicle association table -> vehicles running between warehouses (trucks, cargo planes)
 */
object TransportToolDWD extends OfflineApp {

  /** Application name, taken from this object's runtime class name. */
  val appName: String = this.getClass.getSimpleName

  /**
   * Entry point: builds the SparkSession and hands it to `execute`.
   *
   * Planned steps:
   *  1) build the SparkConf
   *  2) build the SparkSession
   *  3) load the fact and dimension tables from Kudu and cache them
   *  4) join the dimension tables onto the fact tables
   *  5) write the wide tables back to Kudu (DWD detail layer):
   *     5.1 define the wide-table schema, 5.2 create the table if missing, 5.3 write the data
   *  6) release the cached data
   *  7) stop the job
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Environment-specific settings are applied on top of the base conf by SparkUtils.
    val conf: SparkConf = SparkUtils.autoSettingEnv(SparkUtils.sparkConf(appName))
    val session: SparkSession = SparkUtils.getSparkSession(conf)
    session.sparkContext.setLogLevel(Configuration.LOG_OFF)

    execute(session)
  }

  /**
   * Processing body. In this skeleton it only stops the session; the real steps are added later.
   *
   * @param sparkSession session to process with; stopped before returning
   */
  override def execute(sparkSession: SparkSession): Unit =
    sparkSession.stop()
}
3.2、加载运输工具表及车辆相关的表并缓存
加载运输工具表的时候,需要指定日期条件,因为车辆主题最终需要Azkaban定时调度执行,每天执行一次增量数据,因此需要指定日期。
判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
// TODO 3) Load the fact and dimension tables from Kudu and cache them.
// Every source table is cached at the same storage level (disk only, two replicas).
val cacheLevel: StorageLevel = StorageLevel.DISK_ONLY_2

// Vehicle table (fact). Loaded with Configuration.isFirstRunnable: per the text above, the first run
// loads everything and later runs load only the daily increment.
// NOTE(review): association rows whose vehicle is not in that increment will get null vehicle columns
// after the left join below - confirm this is intended.
val ttDF: DataFrame = getKuduSource(sparkSession, TableMapping.transportTool, Configuration.isFirstRunnable).persist(cacheLevel)
// Dot-vehicle association table (tbl_dot_transport_tool), always loaded in full.
// NOTE(review): the warehouse-vehicle association table below uses Configuration.isFirstRunnable instead - confirm.
val ttDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dotTransportTool, true).persist(cacheLevel)
// Dot table.
val dotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dot, true).persist(cacheLevel)
// Company-dot mapping table.
val companyDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyDotMap, true).persist(cacheLevel)
// Company table.
val companyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(cacheLevel)
// Warehouse-vehicle association table (fact).
val ttWsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouseTransportTool, Configuration.isFirstRunnable).persist(cacheLevel)
// Company-warehouse mapping table.
val companyWareHouseMapDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyWarehouseMap, true).persist(cacheLevel)
// Warehouse table.
val wsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouse, true).persist(cacheLevel)
// Logistics code (dictionary) table.
val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(cacheLevel)

import sparkSession.implicits._

// Vehicle type codes -> description (ttType, ttTypeName).
val transportTypeDF: DataFrame = codesDF
  .where($"type" === CodeTypeMapping.TransportType)
  .select($"code".as("ttType"), $"codeDesc".as("ttTypeName"))
// Vehicle status codes -> description (ttStatus, ttStateName).
val transportStatusDF: DataFrame = codesDF
  .where($"type" === CodeTypeMapping.TransportStatus)
  .select($"code".as("ttStatus"), $"codeDesc".as("ttStateName"))
3.3、定义网点车辆宽表的关联关系
为了在DWS层任务中方便的获取每日增量网点车辆表数据(根据日期),因此在DataFrame基础上动态增加列(day),指定日期格式为yyyyMMdd
代码如下:
// TODO 4) Join the dimension tables onto the fact tables.
val left_outer: String = "left_outer"

// 4.1 Dot-vehicle wide table: start from the dot-vehicle association rows and left-join the vehicle,
// its type/status code descriptions, the dot, the dot->company mapping and the company.
val ttDotDetailDF = ttDotDF
  .join(ttDF, ttDotDF("transportToolId") === ttDF("id"), left_outer)                    // association -> vehicle
  .join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer)        // vehicle type -> code description
  .join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) // vehicle state -> code description
  .join(dotDF, dotDF("id") === ttDotDF("dotId"), left_outer)                            // association -> dot
  .join(companyDotDF, ttDotDF("dotId") === companyDotDF("dotId"), left_outer)           // dot -> company mapping
  .join(companyDF, companyDotDF("companyId") === companyDF("id"), left_outer)           // mapping -> company
  // Derived yyyyMMdd column from the association row's cdt; usable as a partition key so one day's rows stay together.
  .withColumn("day", date_format(ttDotDF("cdt"), "yyyyMMdd"))
  .sort(ttDF("cdt").asc)
  .select(
    // Vehicle columns
    ttDF("id"),
    ttDF("brand"),
    ttDF("model"),
    ttDF("type").cast(IntegerType),
    transportTypeDF("ttTypeName").as("type_name"), // description of the vehicle type code
    ttDF("givenLoad").cast(IntegerType).as("given_load"),
    ttDF("loadCnUnit").as("load_cn_unit"),
    ttDF("loadEnUnit").as("load_en_unit"),
    ttDF("buyDt").as("buy_dt"),
    ttDF("licensePlate").as("license_plate"),
    ttDF("state").cast(IntegerType),
    transportStatusDF("ttStateName").as("state_name"), // description of the vehicle state code
    ttDF("cdt"),
    ttDF("udt"),
    ttDF("remark"),
    // Dot columns
    dotDF("id").as("dot_id"),
    dotDF("dotNumber").as("dot_number"),
    dotDF("dotName").as("dot_name"),
    dotDF("dotAddr").as("dot_addr"),
    dotDF("dotGisAddr").as("dot_gis_addr"),
    dotDF("dotTel").as("dot_tel"),
    dotDF("manageAreaId").as("manage_area_id"),
    dotDF("manageAreaGis").as("manage_area_gis"),
    // Company columns
    companyDF("id").as("company_id"),
    companyDF("companyName").as("company_name"),
    companyDF("cityId").as("city_id"),
    companyDF("companyNumber").as("company_number"),
    companyDF("companyAddr").as("company_addr"),
    companyDF("companyAddrGis").as("company_addr_gis"),
    companyDF("companyTel").as("company_tel"),
    companyDF("isSubCompany").as("is_sub_company"),
    $"day"
  )
3.4、创建网点车辆明细宽表并将网点车辆明细数据写入到kudu数据表中
网点车辆明细宽表数据需要保存到kudu中。第一次执行网点车辆明细拉宽操作时,网点车辆明细宽表还不存在,因此需要自动判断宽表是否存在,如果不存在则创建
实现步骤:
在TransportToolDWD 单例对象中调用save方法
实现过程:
在TransportToolDWD 单例对象的execute方法中调用save方法
// TODO 5) Write the dot-vehicle wide table back to Kudu (DWD detail layer).
// Per the accompanying text, save (defined in OfflineApp, not shown here) is expected to create the
// target table when it does not exist yet - confirm against its implementation.
save(ttDotDetailDF, OfflineTableDefine.dotTransportToolDetail)
3.5、定义仓库车辆宽表的关联关系
为了在DWS层任务中方便的获取每日增量仓库车辆表数据(根据日期),因此在DataFrame基础上动态增加列(day),指定日期格式为yyyyMMdd
代码如下:
// 4.2 Warehouse-vehicle wide table: start from the warehouse-vehicle association rows and left-join the
// vehicle, its type/status code descriptions, the warehouse, the warehouse->company mapping and the company.
// Casts are applied before aliasing (as in the dot-vehicle table) so each output name comes from the explicit
// alias rather than from Spark's implicit naming of a top-level cast.
val ttWsDetailDF = ttWsDF
  .join(ttDF, ttWsDF.col("transportToolId") === ttDF.col("id"), left_outer)                                  // association -> vehicle
  .join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer)                             // vehicle type -> code description
  .join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer)                      // vehicle state -> code description
  .join(wsDF, wsDF.col("id") === ttWsDF.col("warehouseId"), left_outer)                                      // association -> warehouse
  .join(companyWareHouseMapDF, ttWsDF.col("warehouseId") === companyWareHouseMapDF.col("warehouseId"), left_outer) // warehouse -> company mapping
  .join(companyDF, companyDF.col("id") === companyWareHouseMapDF.col("companyId"), left_outer)               // mapping -> company
  // Derived yyyyMMdd column from the association row's cdt; usable as a partition key so one day's rows stay together.
  .withColumn("day", date_format(ttWsDF("cdt"), "yyyyMMdd"))
  .sort(ttDF.col("cdt").asc)
  .select(
    // Vehicle columns
    ttDF("id"),
    ttDF("brand"),
    ttDF("model"),
    ttDF("type").cast(IntegerType),
    transportTypeDF("ttTypeName").as("type_name"), // description of the vehicle type code
    ttDF("givenLoad").cast(IntegerType).as("given_load"),
    ttDF("loadCnUnit").as("load_cn_unit"),
    ttDF("loadEnUnit").as("load_en_unit"),
    ttDF("buyDt").as("buy_dt"),
    ttDF("licensePlate").as("license_plate"),
    ttDF("state").cast(IntegerType),
    transportStatusDF("ttStateName").as("state_name"), // description of the vehicle state code
    ttDF("cdt"),
    ttDF("udt"),
    ttDF("remark"),
    // Warehouse columns
    wsDF("id").as("ws_id"),
    wsDF("name"),
    wsDF("addr"),
    wsDF("addrGis").as("addr_gis"),
    wsDF("employeeId").as("employee_id"),
    wsDF("type").cast(IntegerType).as("ws_type"),
    wsDF("area"),
    wsDF("isLease").cast(IntegerType).as("is_lease"),
    // Company columns
    companyDF("id").alias("company_id"),
    companyDF("companyName").as("company_name"),
    companyDF("cityId").as("city_id"),
    companyDF("companyNumber").as("company_number"),
    companyDF("companyAddr").as("company_addr"),
    companyDF("companyAddrGis").as("company_addr_gis"),
    companyDF("companyTel").as("company_tel"),
    companyDF("isSubCompany").as("is_sub_company"),
    $"day"
  )
3.6、创建仓库车辆明细宽表并将仓库车辆明细数据写入到kudu数据表中
仓库车辆明细宽表数据需要保存到kudu中。第一次执行仓库车辆明细拉宽操作时,仓库车辆明细宽表还不存在,因此需要自动判断宽表是否存在,如果不存在则创建
实现步骤:
在TransportToolDWD 单例对象中调用save方法
实现过程:
在TransportToolDWD 单例对象的execute方法中调用save方法
// Write the warehouse-vehicle wide table back to Kudu (DWD detail layer).
// Per the accompanying text, save (defined in OfflineApp, not shown here) is expected to create the
// target table when it does not exist yet - confirm against its implementation.
save(ttWsDetailDF, OfflineTableDefine.warehouseTransportToolDetail)
3.7、删除缓存数据
为了释放资源,车辆明细宽表数据计算完成以后,需要将缓存的源表数据删除。
// TODO 6) Release every cached source table now that both wide tables have been written.
Seq(ttDF, ttDotDF, dotDF, companyDotDF, companyDF, ttWsDF, companyWareHouseMapDF, wsDF, codesDF)
  .foreach(_.unpersist())
3.8、完整代码
package cn.it.logistics.offline.dwd

import cn.it.logistics.common.{CodeTypeMapping, Configuration, OfflineTableDefine, SparkUtils, TableMapping}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.date_format
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel

/**
 * Vehicle (transport tool) subject, DWD layer.
 *
 * Joins the vehicle-related tables into two wide detail tables and writes them back to Kudu:
 *  1) dot-vehicle association table       -> vehicles owned by delivery dots (tricycles)
 *  2) warehouse-vehicle association table -> vehicles running between warehouses (trucks, cargo planes)
 */
object TransportToolDWD extends OfflineApp {

  /** Application name, taken from this object's runtime class name. */
  val appName: String = this.getClass.getSimpleName

  /**
   * Entry point.
   *
   * Steps:
   *  1) build the SparkConf
   *  2) build the SparkSession
   *  3) load the fact and dimension tables from Kudu and cache them
   *  4) join the dimension tables onto the fact tables
   *  5) write the wide tables back to Kudu (DWD detail layer)
   *  6) release the cached data
   *  7) stop the session
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // 1) SparkConf with environment-specific settings applied by SparkUtils.
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(SparkUtils.sparkConf(appName))
    // 2) SparkSession.
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    execute(sparkSession)
  }

  /**
   * Builds the dot-vehicle and warehouse-vehicle wide tables and writes them to Kudu.
   *
   * The session is stopped in a `finally`, so it is released even when a load, join or write fails.
   *
   * @param sparkSession session used for every read and write; stopped before returning
   */
  override def execute(sparkSession: SparkSession): Unit = {
    import sparkSession.implicits._

    try {
      // 3) Load the source tables; every one is cached at the same storage level.
      val cacheLevel: StorageLevel = StorageLevel.DISK_ONLY_2

      // Vehicle table (fact). Loaded with Configuration.isFirstRunnable, i.e. in full only on the first run.
      // NOTE(review): on later runs, association rows whose vehicle is not in the loaded increment get null
      // vehicle columns (including the selected `id`) after the left joins below - confirm this is intended.
      val ttDF: DataFrame = getKuduSource(sparkSession, TableMapping.transportTool, Configuration.isFirstRunnable).persist(cacheLevel)
      // Dot-vehicle association table (tbl_dot_transport_tool), always loaded in full.
      // NOTE(review): the warehouse-vehicle association table uses Configuration.isFirstRunnable instead - confirm.
      val ttDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dotTransportTool, true).persist(cacheLevel)
      val dotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dot, true).persist(cacheLevel)
      val companyDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyDotMap, true).persist(cacheLevel)
      val companyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(cacheLevel)
      // Warehouse-vehicle association table (fact).
      val ttWsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouseTransportTool, Configuration.isFirstRunnable).persist(cacheLevel)
      val companyWareHouseMapDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyWarehouseMap, true).persist(cacheLevel)
      val wsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouse, true).persist(cacheLevel)
      // Logistics code (dictionary) table.
      val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(cacheLevel)

      // Vehicle type codes -> description (ttType, ttTypeName).
      val transportTypeDF: DataFrame = codesDF
        .where($"type" === CodeTypeMapping.TransportType)
        .select($"code".as("ttType"), $"codeDesc".as("ttTypeName"))
      // Vehicle status codes -> description (ttStatus, ttStateName).
      val transportStatusDF: DataFrame = codesDF
        .where($"type" === CodeTypeMapping.TransportStatus)
        .select($"code".as("ttStatus"), $"codeDesc".as("ttStateName"))

      // 4) Join the dimension tables onto the fact tables.
      val left_outer: String = "left_outer"

      // 4.1 Dot-vehicle wide table.
      val ttDotDetailDF = ttDotDF
        .join(ttDF, ttDotDF.col("transportToolId") === ttDF.col("id"), left_outer)            // association -> vehicle
        .join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer)        // vehicle type -> code description
        .join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) // vehicle state -> code description
        .join(dotDF, dotDF.col("id") === ttDotDF.col("dotId"), left_outer)                    // association -> dot
        .join(companyDotDF, ttDotDF.col("dotId") === companyDotDF.col("dotId"), left_outer)   // dot -> company mapping
        .join(companyDF, companyDotDF.col("companyId") === companyDF.col("id"), left_outer)   // mapping -> company
        // Derived yyyyMMdd column from the association row's cdt; usable as a partition key.
        .withColumn("day", date_format(ttDotDF("cdt"), "yyyyMMdd"))
        .sort(ttDF.col("cdt").asc)
        .select(
          ttDF("id"),
          ttDF("brand"),
          ttDF("model"),
          ttDF("type").cast(IntegerType),
          transportTypeDF("ttTypeName").as("type_name"),
          ttDF("givenLoad").cast(IntegerType).as("given_load"),
          ttDF("loadCnUnit").as("load_cn_unit"),
          ttDF("loadEnUnit").as("load_en_unit"),
          ttDF("buyDt").as("buy_dt"),
          ttDF("licensePlate").as("license_plate"),
          ttDF("state").cast(IntegerType),
          transportStatusDF("ttStateName").as("state_name"),
          ttDF("cdt"),
          ttDF("udt"),
          ttDF("remark"),
          dotDF("id").as("dot_id"),
          dotDF("dotNumber").as("dot_number"),
          dotDF("dotName").as("dot_name"),
          dotDF("dotAddr").as("dot_addr"),
          dotDF("dotGisAddr").as("dot_gis_addr"),
          dotDF("dotTel").as("dot_tel"),
          dotDF("manageAreaId").as("manage_area_id"),
          dotDF("manageAreaGis").as("manage_area_gis"),
          companyDF("id").alias("company_id"),
          companyDF("companyName").as("company_name"),
          companyDF("cityId").as("city_id"),
          companyDF("companyNumber").as("company_number"),
          companyDF("companyAddr").as("company_addr"),
          companyDF("companyAddrGis").as("company_addr_gis"),
          companyDF("companyTel").as("company_tel"),
          companyDF("isSubCompany").as("is_sub_company"),
          $"day"
        )

      // 4.2 Warehouse-vehicle wide table. Casts are applied before aliasing, as in 4.1.
      val ttWsDetailDF = ttWsDF
        .join(ttDF, ttWsDF.col("transportToolId") === ttDF.col("id"), left_outer)            // association -> vehicle
        .join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer)       // vehicle type -> code description
        .join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) // vehicle state -> code description
        .join(wsDF, wsDF.col("id") === ttWsDF.col("warehouseId"), left_outer)                // association -> warehouse
        .join(companyWareHouseMapDF, ttWsDF.col("warehouseId") === companyWareHouseMapDF.col("warehouseId"), left_outer) // warehouse -> company mapping
        .join(companyDF, companyDF.col("id") === companyWareHouseMapDF.col("companyId"), left_outer)                     // mapping -> company
        // Derived yyyyMMdd column from the association row's cdt; usable as a partition key.
        .withColumn("day", date_format(ttWsDF("cdt"), "yyyyMMdd"))
        .sort(ttDF.col("cdt").asc)
        .select(
          ttDF("id"),
          ttDF("brand"),
          ttDF("model"),
          ttDF("type").cast(IntegerType),
          transportTypeDF("ttTypeName").as("type_name"),
          ttDF("givenLoad").cast(IntegerType).as("given_load"),
          ttDF("loadCnUnit").as("load_cn_unit"),
          ttDF("loadEnUnit").as("load_en_unit"),
          ttDF("buyDt").as("buy_dt"),
          ttDF("licensePlate").as("license_plate"),
          ttDF("state").cast(IntegerType),
          transportStatusDF("ttStateName").as("state_name"),
          ttDF("cdt"),
          ttDF("udt"),
          ttDF("remark"),
          wsDF("id").as("ws_id"),
          wsDF("name"),
          wsDF("addr"),
          wsDF("addrGis").as("addr_gis"),
          wsDF("employeeId").as("employee_id"),
          wsDF("type").cast(IntegerType).as("ws_type"),
          wsDF("area"),
          wsDF("isLease").cast(IntegerType).as("is_lease"),
          companyDF("id").alias("company_id"),
          companyDF("companyName").as("company_name"),
          companyDF("cityId").as("city_id"),
          companyDF("companyNumber").as("company_number"),
          companyDF("companyAddr").as("company_addr"),
          companyDF("companyAddrGis").as("company_addr_gis"),
          companyDF("companyTel").as("company_tel"),
          companyDF("isSubCompany").as("is_sub_company"),
          $"day"
        )

      // 5) Write both wide tables back to Kudu (DWD detail layer).
      save(ttDotDetailDF, OfflineTableDefine.dotTransportToolDetail)
      save(ttWsDetailDF, OfflineTableDefine.warehouseTransportToolDetail)

      // 6) Release the cached source tables.
      Seq(ttDF, ttDotDF, dotDF, companyDotDF, companyDF, ttWsDF, companyWareHouseMapDF, wsDF, codesDF)
        .foreach(_.unpersist())
    } finally {
      // 7) Always stop the session, including when a load, join or write above throws.
      sparkSession.stop()
    }
  }
}
五、车辆数据指标开发
1、计算的字段
2、Spark实现
实现步骤:
- 在dws目录下创建 TransportToolDWS 单例对象,继承自OfflineApp特质
- 初始化环境的参数,创建SparkSession对象
- 根据指定的日期获取拉宽后的车辆主题宽表(网点车辆明细宽表tbl_dot_transport_tool_detail、仓库车辆明细宽表tbl_warehouse_transport_tool_detail)增量数据,并缓存数据
- 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
- 指标计算
- 各网点发车次数
- 各网点最大发车次数
- 各网点最小发车次数
- 各网点平均发车次数
- 各区域发车次数
- 各区域最大发车次数
- 各区域最小发车次数
- 各区域平均发车次数
- 各公司发车次数
- 各公司最大发车次数
- 各公司最小发车次数
- 各公司平均发车次数
- 获取当前时间yyyyMMddHH
- 构建要持久化的指标数据(需要判断计算的指标是否有值,若没有需要赋值默认值)
- 通过StructType构建指定Schema
- 创建车辆主题指标数据表(若存在则不创建)
- 持久化指标数据到kudu表
2.1、初始化环境变量
package cn.it.logistics.offline.dws

import cn.it.logistics.common.{Configuration, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * Vehicle (transport tool) subject, DWS layer - metric computation skeleton.
 */
object TransportToolDWS extends OfflineApp {

  /** Application name, taken from this object's runtime class name. */
  val appName: String = this.getClass.getSimpleName

  /**
   * Entry point: builds the SparkSession and hands it to `execute`.
   *
   * Planned steps:
   *  1) build the SparkConf
   *  2) build the SparkSession
   *  3) read the vehicle detail wide tables
   *  4) compute the metrics from the detail data
   *  5) write the metrics to Kudu:
   *     5.1 define the result schema, 5.2 assemble the rows, 5.3 create the table if missing, 5.4 write
   *  6) release the cached data
   *  7) stop the job and the SparkSession
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // TODO 1) SparkConf, with environment-specific settings applied by SparkUtils.
    val conf: SparkConf = SparkUtils.autoSettingEnv(SparkUtils.sparkConf(appName))
    // TODO 2) SparkSession.
    val session: SparkSession = SparkUtils.getSparkSession(conf)
    session.sparkContext.setLogLevel(Configuration.LOG_OFF)

    execute(session)
  }

  /**
   * Processing body. In this skeleton it only stops the session; the real steps are added later.
   *
   * @param sparkSession session to process with; stopped before returning
   */
  override def execute(sparkSession: SparkSession): Unit =
    sparkSession.stop()
}
2.2、加载车辆宽表增量数据并缓存
加载车辆宽表的时候,需要指定日期条件,因为车辆主题最终需要Azkaban定时调度执行,每天执行一次增量数据,因此需要指定日期。
// TODO 3) Read the dot-vehicle and warehouse-vehicle detail wide tables and cache them.
// Both reads pass Configuration.isFirstRunnable as the third argument of getKuduSource.
val ttDotDetailDF: DataFrame =
  getKuduSource(sparkSession, OfflineTableDefine.dotTransportToolDetail, Configuration.isFirstRunnable)
    .persist(StorageLevel.DISK_ONLY_2)

val ttWarehouseDetailDF: DataFrame =
  getKuduSource(sparkSession, OfflineTableDefine.warehouseTransportToolDetail, Configuration.isFirstRunnable)
    .persist(StorageLevel.DISK_ONLY_2)
2.3、指标计算
// TODO 4) Compute the daily vehicle metrics for dot vehicles and warehouse vehicles.
import sparkSession.implicits._

// Converts one aggregate cell to Long. An aggregate over zero rows is null and becomes 0;
// averages (Double) are truncated by longValue(), matching the LongType result columns.
def toLongOrZero(value: Any): Long =
  if (value == null) 0L else value.asInstanceOf[Number].longValue()

// Given a per-group count DataFrame with a "cnt" column, returns Seq(total, max, min, avg) of "cnt".
def countStats(countDF: DataFrame): Seq[Long] = {
  val stats: Row = countDF.agg(sum("cnt"), max("cnt"), min("cnt"), avg("cnt")).first()
  (0 until 4).map(i => toLongOrZero(stats.get(i)))
}

// For every distinct `day` in detailDF builds one metrics row, in this column order:
//   day,
//   per-unit departures total/max/min/avg  (unit = the column named by unitKey, e.g. dot_id or ws_id),
//   per-area departures total/max/min/avg  (grouped by city_id),
//   per-company departures total/max/min/avg (grouped by company_id).
// Sharing one implementation for both detail tables keeps each total paired with its own aggregate
// (the previous copy-pasted dot loop reported the per-dot total as the per-company total).
def dailyMetrics(detailDF: DataFrame, unitKey: String): ArrayBuffer[Row] = {
  val rows: ArrayBuffer[Row] = ArrayBuffer[Row]()
  detailDF.select("day").distinct().collect().foreach { dayRow =>
    val day: String = dayRow.getAs[String](0)
    // Detail rows of this single day.
    val byDayDF: DataFrame = detailDF.where(col("day") === day).persist(StorageLevel.DISK_ONLY_2)

    // Departures per unit (e.g. per dot), per area and per company.
    val unitCountDF: DataFrame = byDayDF.groupBy(col(unitKey)).agg(count(unitKey).alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
    val areaCountDF: DataFrame = byDayDF.groupBy("city_id").agg(count(col("id")).alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
    val companyCountDF: DataFrame = byDayDF.groupBy("company_id").agg(count(col("id")).alias("cnt")).persist(StorageLevel.DISK_ONLY_2)

    val rowInfo: Row = Row.fromSeq(
      day +: (countStats(unitCountDF) ++ countStats(areaCountDF) ++ countStats(companyCountDF))
    )
    rows.append(rowInfo)
    println(rowInfo)

    byDayDF.unpersist()
    unitCountDF.unpersist()
    areaCountDF.unpersist()
    companyCountDF.unpersist()
  }
  rows
}

// Metric rows per day for dot vehicles (per dot) and warehouse vehicles (per warehouse).
val dotRows: ArrayBuffer[Row] = dailyMetrics(ttDotDetailDF, "dot_id")
val wsRows: ArrayBuffer[Row] = dailyMetrics(ttWarehouseDetailDF, "ws_id")
2.4、通过StructType构建指定Schema
// TODO 5.1) Schemas of the metric result tables.
// Column order matches the metric rows: a non-nullable "id" key followed by nullable LongType metrics.
def metricField(name: String): StructField = StructField(name, LongType, true, Metadata.empty)

// Dot-vehicle metrics.
val schemaDot: StructType = StructType(
  StructField("id", StringType, false, Metadata.empty) +: Seq(
    "ttDotTotalCount",         // departures per dot
    "maxTtDotTotalCount",      // max departures per dot
    "minTtDotTotalCount",      // min departures per dot
    "avgTtDotTotalCount",      // avg departures per dot
    "areaDotTotalCount",       // departures per area
    "maxAreaDotTotalCount",    // max departures per area
    "minAreaDotTotalCount",    // min departures per area
    "avgAreaDotTotalCount",    // avg departures per area
    "companyDotTotalCount",    // departures per company
    "maxCompanyDotTotalCount", // max departures per company
    "minCompanyDotTotalCount", // min departures per company
    "avgCompanyDotTotalCount"  // avg departures per company
  ).map(metricField)
)

// Warehouse-vehicle metrics.
val schemaWs: StructType = StructType(
  StructField("id", StringType, false, Metadata.empty) +: Seq(
    "whTransportToolTotalCount",         // departures per warehouse
    "maxWhTransportToolTotalCount",      // max departures per warehouse
    "minWhTransportToolTotalCount",      // min departures per warehouse
    "avgWhTransportToolTotalCount",      // avg departures per warehouse
    "areaTransportToolTotalCount",       // departures per area
    "maxAreaTransportToolTotalCount",    // max departures per area
    "minAreaTransportToolTotalCount",    // min departures per area
    "avgAreaTransportToolTotalCount",    // avg departures per area
    "companyTransportToolTotalCount",    // departures per company
    "maxCompanyTransportToolTotalCount", // max departures per company
    "minCompanyTransportToolTotalCount", // min departures per company
    "avgCompanyTransportToolTotalCount"  // avg departures per company
  ).map(metricField)
)
2.5、持久化指标数据到kudu表
// TODO 5) Write the computed metrics to Kudu.
// Dot-vehicle metrics: driver-side rows -> RDD -> DataFrame typed by schemaDot.
val dotRDD: RDD[Row] = sparkSession.sparkContext.makeRDD(dotRows)
val dotDataFrame: DataFrame = sparkSession.createDataFrame(dotRDD, schemaDot)

// Warehouse-vehicle metrics: same pipeline, typed by schemaWs.
val wsRDD: RDD[Row] = sparkSession.sparkContext.makeRDD(wsRows)
val wsDataFrame: DataFrame = sparkSession.createDataFrame(wsRDD, schemaWs)

// Persist both summaries (dot first, then warehouse).
save(dotDataFrame, OfflineTableDefine.ttDotSummary)
save(wsDataFrame, OfflineTableDefine.ttWsSummary)
2.6、完整代码
package cn.it.logistics.offline.dws

import cn.it.logistics.common.{Configuration, DateHelper, OfflineTableDefine, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, LongType, Metadata, StringType, StructField, StructType}
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel

import scala.collection.mutable.ArrayBuffer

/**
 * Vehicle (transport tool) subject: DWS metric job.
 *
 * For every day present in the dot-vehicle and warehouse-vehicle detail wide tables it
 * computes departure-count statistics (total / max / min / avg at unit, area and company
 * level) and saves one summary row per day to `OfflineTableDefine.ttDotSummary` and
 * `OfflineTableDefine.ttWsSummary`.
 */
object TransportToolDWS extends OfflineApp {
  // Application name handed to SparkUtils.sparkConf
  val appName = this.getClass.getSimpleName

  /**
   * Entry point.
   *
   * Steps:
   *  1) build the SparkConf
   *  2) build the SparkSession
   *  3) read the vehicle detail wide tables
   *  4) compute the metrics
   *  5) write the metrics to Kudu (schema definition, row assembly, save)
   *  6) release cached data
   *  7) stop the SparkSession
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // 1) SparkConf
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )
    // 2) SparkSession
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
    // Steps 3) - 7)
    execute(sparkSession)
  }

  /**
   * Computes and saves the daily vehicle metrics, then stops `sparkSession`.
   *
   * @param sparkSession session used for reading, computing and writing
   */
  override def execute(sparkSession: SparkSession): Unit = {
    // 3) Read the vehicle detail wide tables. Each one is filtered once per day below,
    //    so keep them persisted for the whole job.
    val ttDotDetailDF: DataFrame = getKuduSource(
      sparkSession, OfflineTableDefine.dotTransportToolDetail, Configuration.isFirstRunnable
    ).persist(StorageLevel.DISK_ONLY_2)
    val ttWarehouseDetailDF: DataFrame = getKuduSource(
      sparkSession, OfflineTableDefine.warehouseTransportToolDetail, Configuration.isFirstRunnable
    ).persist(StorageLevel.DISK_ONLY_2)

    try {
      // 4) One metric row per day; dots are identified by "dot_id", warehouses by "ws_id".
      val dotRows: Seq[Row] = dailyMetricRows(ttDotDetailDF, "dot_id")
      val wsRows: Seq[Row] = dailyMetricRows(ttWarehouseDetailDF, "ws_id")

      // 5.1) Result-table schemas. The column order must match dailyMetricRows:
      //      unit total/max/min/avg, area total/max/min/avg, company total/max/min/avg.
      val schemaDot: StructType = summarySchema(Seq(
        "ttDotTotalCount", "maxTtDotTotalCount", "minTtDotTotalCount", "avgTtDotTotalCount",
        "areaDotTotalCount", "maxAreaDotTotalCount", "minAreaDotTotalCount", "avgAreaDotTotalCount",
        "companyDotTotalCount", "maxCompanyDotTotalCount", "minCompanyDotTotalCount", "avgCompanyDotTotalCount"
      ))
      val schemaWs: StructType = summarySchema(Seq(
        "whTransportToolTotalCount", "maxWhTransportToolTotalCount",
        "minWhTransportToolTotalCount", "avgWhTransportToolTotalCount",
        "areaTransportToolTotalCount", "maxAreaTransportToolTotalCount",
        "minAreaTransportToolTotalCount", "avgAreaTransportToolTotalCount",
        "companyTransportToolTotalCount", "maxCompanyTransportToolTotalCount",
        "minCompanyTransportToolTotalCount", "avgCompanyTransportToolTotalCount"
      ))

      // 5.2 / 5.4) Assemble the DataFrames and write them to Kudu.
      val dotRDD: RDD[Row] = sparkSession.sparkContext.makeRDD(dotRows)
      val wsRDD: RDD[Row] = sparkSession.sparkContext.makeRDD(wsRows)
      save(sparkSession.createDataFrame(dotRDD, schemaDot), OfflineTableDefine.ttDotSummary)
      save(sparkSession.createDataFrame(wsRDD, schemaWs), OfflineTableDefine.ttWsSummary)
    } finally {
      // 6) Release cached data and 7) stop the session, also when a step above failed.
      ttDotDetailDF.unpersist()
      ttWarehouseDetailDF.unpersist()
      sparkSession.stop()
    }
  }

  /**
   * Builds one summary row per non-null "day" value of a vehicle detail wide table.
   *
   * Each row is the day string followed by 12 Long values, in this order:
   *  - total/max/min/avg of the per-`unitIdCol` counts
   *  - then the same four for the per-"city_id" counts
   *  - then the same four for the per-"company_id" counts
   *
   * Null days are skipped because the summary schema declares "id" non-nullable.
   * Every row is also printed to stdout.
   *
   * @param detailDF  detail wide table containing "day", "id", "city_id", "company_id" and `unitIdCol`
   * @param unitIdCol column identifying the dispatching unit ("dot_id" or "ws_id")
   * @return the summary rows, one per day
   */
  private def dailyMetricRows(detailDF: DataFrame, unitIdCol: String): Seq[Row] = {
    val days: Array[String] = detailDF.select("day").distinct().collect()
      .map(_.getAs[String](0))
      .filter(_ != null)

    days.map { day =>
      // The day's slice is aggregated three times, so cache it while it is in use.
      val dayDF: DataFrame = detailDF.where(col("day") === day).persist(StorageLevel.DISK_ONLY_2)
      try {
        val metrics: Seq[Long] =
          countStats(dayDF, unitIdCol, unitIdCol) ++
            countStats(dayDF, "city_id", "id") ++
            countStats(dayDF, "company_id", "id")
        val rowInfo: Row = Row.fromSeq(day +: metrics)
        println(rowInfo)
        rowInfo
      } finally {
        dayDF.unpersist()
      }
    }.toSeq
  }

  /**
   * Groups `dayDF` by `groupCol` and counts the non-null `countCol` values per group
   * (e.g. per dot: Xisanqi -> 10, Xierqi -> 20).
   *
   * Returns Seq(sum, max, min, avg) of those per-group counts. A statistic that is null
   * (as happens when `dayDF` is empty) becomes 0L. The average is truncated to a Long to
   * fit the LongType summary columns.
   *
   * @param dayDF    one day's slice of a detail wide table
   * @param groupCol column to group by
   * @param countCol column whose non-null values are counted per group
   * @return exactly four values: sum, max, min, avg
   */
  private def countStats(dayDF: DataFrame, groupCol: String, countCol: String): Seq[Long] = {
    val stats: Row = dayDF
      .groupBy(col(groupCol))
      .agg(count(col(countCol)).alias("cnt"))
      .agg(sum("cnt"), max("cnt"), min("cnt"), avg("cnt"))
      .first()
    (0 until stats.length).map { i =>
      if (stats.isNullAt(i)) 0L else stats.getAs[Number](i).longValue()
    }
  }

  /**
   * Summary-table schema: a non-nullable String key "id" (the day) followed by one
   * nullable LongType column per entry of `metricNames`, in the given order.
   */
  private def summarySchema(metricNames: Seq[String]): StructType =
    StructType((StructField("id", StringType, false, Metadata.empty) +:
      metricNames.map(name => StructField(name, LongType, true, Metadata.empty))).toArray)
}