数仓学习---数仓开发之DWD层

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
实时数仓Hologres,5000CU*H 100GB 3个月
简介: 数仓学习---数仓开发之DWD

数仓开发之DWD

DWD层设计要点:

(1)DWD层的设计依据是维度建模理论,该层存储维度模型的事实表。

(2)DWD层的数据存储格式为orc列式存储+snappy压缩。

(3)DWD层表名的命名规范为dwd_数据域(分类)_原子行为(login_success)_单分区增量全量标识(inc/full


DWD层
---Data Warehourse Detail
---detail:详细,明细
---对ODS层的数据进行加工,为后续的统计分析做准备
DIM层主要功能其实是分析数据:面向状态
DWD层主要功能其实是统计数据:面向行为
--业务过程就是业务行为
DWD层的表中主要保存的就是业务行为数据,表的设计需要遵循建模理论-维度建模-事实(行为表)
--事实表
  --包含维度
    --维度越多,行为越详细,维度越少,行为越简单
  --包含度量值
    --所有的行为必须可以用于用于统计,这里用于统计的值就是度量值
--事实表分类
  --事务事实表
    --绝大多数对的事实表都是事务型事实表
    --事务:原子性
    --粒度:行为描述的详细程序,称为细粒度
    --维度越多,粒度越细
  --周期快照表
  --累计快照事实表
  --累计开照事实表

事务型事实表

设计流程

设计事务事实表时一般可遵循以下四个步骤。

选择业务过程→声明粒度(确定行)→确认维度(确定列)→确认事实(确定度量值)

交易域加购事务事实表

DROP TABLE IF EXISTS dwd_trade_cart_add_inc;
CREATE EXTERNAL TABLE dwd_trade_cart_add_inc
(
    `id`                  STRING COMMENT '编号',
    `user_id`            STRING COMMENT '用户ID',
    `sku_id`             STRING COMMENT 'SKU_ID',
    `date_id`            STRING COMMENT '日期ID',
    `create_time`        STRING COMMENT '加购时间',
    `sku_num`            BIGINT COMMENT '加购物车件数'
) COMMENT '交易域加购事务事实表'
    PARTITIONED BY (`dt` STRING)
    STORED AS ORC
    LOCATION '/warehouse/gmall/dwd/dwd_trade_cart_add_inc/'
    TBLPROPERTIES ('orc.compress' = 'snappy');

从ODS层的增量表获取数据,能想到什么?

   --数据格式:JSON

   --首日:全量

   --每日:增量

   全量:首日,包含历史数据(4,5,6,7,8),无法判断行为,折中的认为当前数据全部都是新增购物(加购)

   增量:每日,每天新增及变化

   沉淀:计算

2)数据装载

(1)思路分析

① 分区规划

为了避免全表扫描,事务事实表也要按天分区,数据应进入业务过程发生日期对应的分区。如2022-06-08发生的加购操作进入2022-06-08分区。

② 数据流向

事务事实表的数据源表都做了增量采集,数仓上线首日执行一次全表扫描获取历史全量数据,从第二日开始只采集每日变更数据。事实表上游ODS层表的首日分区可能包含多天的业务操作,这些数据应进入事实表的不同分区,第二日及之后的分区只包含当日的业务操作,应进入事实表的当日分区。

③ 数据装载

首日和每日装载数据集和数据流向的不同,导致了二者处理逻辑的差异,需要分开处理。加购操作发生时,只会导致业务库cart_info表的数据发生变化,因而本节事实表的数据来源于cart_info的变更记录,应从ods_cart_info_inc表中读取数据。

a)首日装载

获取ods_cart_add_inc表首日分区的数据,写入加购操作发生日期对应的分区即可。

b)每日装载

获取ods_cart_add_inc当日分区的数据,写入事实表当日分区。

(2)知识储备

from_utc_timestamp({any primitive type} ts, string timezone):将UTC时间戳ts转化为给定时区的timestamp类型数据,展示为YYYY-MM-DD HH:MM:SS.fffffffff格式的日期字符串。ts可以是任意原生类型,包括timestamp/date,tinyint/smallint/init/bigint,float/double和decimal。如果ts是小数会被视为秒级时间戳,如果是整数会被视为毫秒时间戳。timezone是用于指明时区的字符串,可以用IANA时区数据中的时区名称或时区偏移量表示,如东八区可以用GMT+8或Asia/Shanghai表示。

(3)执行步骤

① 首日装载

从ods_cart_info_inc中筛选2022-06-08分区的数据。首日执行maxwell-bootstrap命令做全表扫描,将扫描到的数据交给Maxwell进程封装为JSON字符串,这些数据的操作类型分为bootstrap-start、bootstrap-insert、bootstrap-complete三类,第一类和第三类分别标记了全表扫描任务的开始和结束,只有操作类型为第二类的JSON包含了统计所须的数据, 

接下来,如何从ods_cart_info_inc中提取加购操作?加购操作发生时,若加购表没有该用户对该sku的加购记录,就会新增一条数据,create_time为加购时间;否则修改数据,将sku_num更改为加购之后的值,此时update_time为加购时间。业务数据库没有记录cart_info的变更操作,首日执行全表扫描只能获取第一种加购记录,我们认为每条记录都对应一次加购操作,create_time为加购时间,sku_num为加购商品数。这样做存在误差,不可避免。

最后,筛选所须字段,基于create_time计算加购日期,作为分区字段,通过动态分区将数据写入加购日期对应分区即可。

首日装载
分区策略:哪一天的行为数据就存放到哪一天的分区中
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table dwd_trade_cart_add_inc partition (dt)
select
    data.id,
    data.user_id,
    data.sku_id,
    date_format(data.create_time,'yyyy-MM-dd') date_id,
    data.create_time,
    data.sku_num,
    date_format(data.create_time, 'yyyy-MM-dd')
from ods_cart_info_inc
    where dt = '2022-06-08'
    and type = 'bootstrap-insert';

② 每日装载

每日装载相对简单。只需要获取ods_cart_info_inc当日分区的数据处理后写入事实表当日分区即可。需要注意的是,从第二日开始我们可以采集到两类加购操作,如下。

Ø 操作类型为insert,加购时间为create_time。

Ø 操作类型为update且sku_num大于变更前的sku_num。当前的sku_num存储在data字段下,变更前的sku_num存储在old字段下。加购时间为update_time。

--每日装载:不包含历史数据,主要当天新增及变化的数据
--首日:2022-06-08
--每日:2022-06-09,10,11,11

insert overwrite table dwd_trade_cart_add_inc partition (dt = '2022-06-09')
select data.id,
       data.user_id,
       data.sku_id,
      --from_unixtime:0时区的时间
       date_format(from_utc_timestamp(ts * 1000, 'GMT+8'), 'yyyy-MM-dd')                          date_id,
       date_format(from_utc_timestamp(ts * 1000, 'GMT+8'), 'yyyy-MM-dd HH:mm:ss')                 create_time,
       if(type = 'insert', data.sku_num, cast(data.sku_num as int) - cast(old['sku_num'] as int)) sku_num
from ods_cart_info_inc
where dt = '2022-06-09'
  and (type = 'insert'
    or (type = 'update' and old['sku_num'] is not null and cast(data.sku_num as int) > cast(old['sku_num'] as int)));

DWD层-事实表练习

为统计分析做准备

image.png

image.png

交易域下单事务事实表

1)建表语句
DROP TABLE IF EXISTS dwd_trade_order_detail_inc;
CREATE EXTERNAL TABLE dwd_trade_order_detail_inc
(
    `id`                     STRING COMMENT '编号',
    `order_id`              STRING COMMENT '订单ID',
    `user_id`               STRING COMMENT '用户ID',
    `sku_id`                STRING COMMENT '商品ID',
    `province_id`          STRING COMMENT '省份ID',
    `activity_id`          STRING COMMENT '参与活动ID',
    `activity_rule_id`    STRING COMMENT '参与活动规则ID',
    `coupon_id`             STRING COMMENT '使用优惠券ID',
    `date_id`               STRING COMMENT '下单日期ID',
    `create_time`           STRING COMMENT '下单时间',
    `sku_num`                BIGINT COMMENT '商品数量',
    `split_original_amount` DECIMAL(16, 2) COMMENT '原始价格',
    `split_activity_amount` DECIMAL(16, 2) COMMENT '活动优惠分摊',
    `split_coupon_amount`   DECIMAL(16, 2) COMMENT '优惠券优惠分摊',
    `split_total_amount`    DECIMAL(16, 2) COMMENT '最终价格分摊'
) COMMENT '交易域下单事务事实表'
    PARTITIONED BY (`dt` STRING)
    STORED AS ORC
    LOCATION '/warehouse/gmall/dwd/dwd_trade_order_detail_inc/'
    TBLPROPERTIES ('orc.compress' = 'snappy');

数据装载

首日装载

set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table dwd_trade_order_detail_inc partition (dt)
select
    od.id,
    order_id,
    user_id,
    sku_id,
    province_id,
    activity_id,
    activity_rule_id,
    coupon_id,
    date_format(create_time, 'yyyy-MM-dd') date_id,
    create_time,
    sku_num,
    split_original_amount,
    nvl(split_activity_amount,0.0),
    nvl(split_coupon_amount,0.0),
    split_total_amount,
    date_format(create_time,'yyyy-MM-dd')
from
(
    select
        data.id,
        data.order_id,
        data.sku_id,
        data.create_time,
        data.sku_num,
        data.sku_num * data.order_price split_original_amount,
        data.split_total_amount,
        data.split_activity_amount,
        data.split_coupon_amount
    from ods_order_detail_inc
    where dt = '2022-06-08'
    and type = 'bootstrap-insert'
) od
left join
(
    select
        data.id,
        data.user_id,
        data.province_id
    from ods_order_info_inc
    where dt = '2022-06-08'
    and type = 'bootstrap-insert'
) oi
on od.order_id = oi.id
left join
(
    select
        data.order_detail_id,
        data.activity_id,
        data.activity_rule_id
    from ods_order_detail_activity_inc
    where dt = '2022-06-08'
    and type = 'bootstrap-insert'
) act
on od.id = act.order_detail_id
left join
(
    select
        data.order_detail_id,
        data.coupon_id
    from ods_order_detail_coupon_inc
    where dt = '2022-06-08'
    and type = 'bootstrap-insert'
) cou
on od.id = cou.order_detail_id;

每日装载

insert overwrite table dwd_trade_order_detail_inc partition (dt='2022-06-09')
select
    od.id,
    order_id,
    user_id,
    sku_id,
    province_id,
    activity_id,
    activity_rule_id,
    coupon_id,
    date_id,
    create_time,
    sku_num,
    split_original_amount,
    nvl(split_activity_amount,0.0),
    nvl(split_coupon_amount,0.0),
    split_total_amount
from
(
    select
        data.id,
        data.order_id,
        data.sku_id,
        date_format(data.create_time, 'yyyy-MM-dd') date_id,
        data.create_time,
        data.sku_num,
        data.sku_num * data.order_price split_original_amount,
        data.split_total_amount,
        data.split_activity_amount,
        data.split_coupon_amount
    from ods_order_detail_inc
    where dt = '2022-06-09'
    and type = 'insert'
) od
left join
(
    select
        data.id,
        data.user_id,
        data.province_id
    from ods_order_info_inc
    where dt = '2022-06-09'
    and type = 'insert'
) oi
on od.order_id = oi.id
left join
(
    select
        data.order_detail_id,
        data.activity_id,
        data.activity_rule_id
    from ods_order_detail_activity_inc
    where dt = '2022-06-09'
    and type = 'insert'
) act
on od.id = act.order_detail_id
left join
(
    select
        data.order_detail_id,
        data.coupon_id
    from ods_order_detail_coupon_inc
    where dt = '2022-06-09'
    and type = 'insert'
) cou
on od.id = cou.order_detail_id;

交易域支付成功事务事实表

DROP TABLE IF EXISTS dwd_trade_pay_detail_suc_inc;
CREATE EXTERNAL TABLE dwd_trade_pay_detail_suc_inc
(
    `id`                      STRING COMMENT '编号',
    `order_id`               STRING COMMENT '订单ID',
    `user_id`                STRING COMMENT '用户ID',
    `sku_id`                 STRING COMMENT 'SKU_ID',
    `province_id`           STRING COMMENT '省份ID',
    `activity_id`           STRING COMMENT '参与活动ID',
    `activity_rule_id`     STRING COMMENT '参与活动规则ID',
    `coupon_id`              STRING COMMENT '使用优惠券ID',
    `payment_type_code`     STRING COMMENT '支付类型编码',
    `payment_type_name`     STRING COMMENT '支付类型名称',
    `date_id`                STRING COMMENT '支付日期ID',
    `callback_time`         STRING COMMENT '支付成功时间',
    `sku_num`                 BIGINT COMMENT '商品数量',
    `split_original_amount` DECIMAL(16, 2) COMMENT '应支付原始金额',
    `split_activity_amount` DECIMAL(16, 2) COMMENT '支付活动优惠分摊',
    `split_coupon_amount`   DECIMAL(16, 2) COMMENT '支付优惠券优惠分摊',
    `split_payment_amount`  DECIMAL(16, 2) COMMENT '支付金额'
) COMMENT '交易域支付成功事务事实表'
    PARTITIONED BY (`dt` STRING)
    STORED AS ORC
    LOCATION '/warehouse/gmall/dwd/dwd_trade_pay_detail_suc_inc/'
    TBLPROPERTIES ('orc.compress' = 'snappy');

首日装载

set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table dwd_trade_order_detail_inc partition (dt)
select
    od.id,
    order_id,
    user_id,
    sku_id,
    province_id,
    activity_id,
    activity_rule_id,
    coupon_id,
    date_format(create_time, 'yyyy-MM-dd') date_id,
    create_time,
    sku_num,
    split_original_amount,
    nvl(split_activity_amount,0.0),
    nvl(split_coupon_amount,0.0),
    split_total_amount,
    date_format(create_time,'yyyy-MM-dd')
from
(
    select
        data.id,
        data.order_id,
        data.sku_id,
        data.create_time,
        data.sku_num,
        data.sku_num * data.order_price split_original_amount,
        data.split_total_amount,
        data.split_activity_amount,
        data.split_coupon_amount
    from ods_order_detail_inc
    where dt = '2022-06-08'
    and type = 'bootstrap-insert'
) od
left join
(
    select
        data.id,
        data.user_id,
        data.province_id
    from ods_order_info_inc
    where dt = '2022-06-08'
    and type = 'bootstrap-insert'
) oi
on od.order_id = oi.id
left join
(
    select
        data.order_detail_id,
        data.activity_id,
        data.activity_rule_id
    from ods_order_detail_activity_inc
    where dt = '2022-06-08'
    and type = 'bootstrap-insert'
) act
on od.id = act.order_detail_id
left join
(
    select
        data.order_detail_id,
        data.coupon_id
    from ods_order_detail_coupon_inc
    where dt = '2022-06-08'
    and type = 'bootstrap-insert'
) cou
on od.id = cou.order_detail_id;

② 每日装载

insert overwrite table dwd_trade_order_detail_inc partition (dt='2022-06-09')
select
    od.id,
    order_id,
    user_id,
    sku_id,
    province_id,
    activity_id,
    activity_rule_id,
    coupon_id,
    date_id,
    create_time,
    sku_num,
    split_original_amount,
    nvl(split_activity_amount,0.0),
    nvl(split_coupon_amount,0.0),
    split_total_amount
from
(
    select
        data.id,
        data.order_id,
        data.sku_id,
        date_format(data.create_time, 'yyyy-MM-dd') date_id,
        data.create_time,
        data.sku_num,
        data.sku_num * data.order_price split_original_amount,
        data.split_total_amount,
        data.split_activity_amount,
        data.split_coupon_amount
    from ods_order_detail_inc
    where dt = '2022-06-09'
    and type = 'insert'
) od
left join
(
    select
        data.id,
        data.user_id,
        data.province_id
    from ods_order_info_inc
    where dt = '2022-06-09'
    and type = 'insert'
) oi
on od.order_id = oi.id
left join
(
    select
        data.order_detail_id,
        data.activity_id,
        data.activity_rule_id
    from ods_order_detail_activity_inc
    where dt = '2022-06-09'
    and type = 'insert'
) act
on od.id = act.order_detail_id
left join
(
    select
        data.order_detail_id,
        data.coupon_id
    from ods_order_detail_coupon_inc
    where dt = '2022-06-09'
    and type = 'insert'
) cou
on od.id = cou.order_detail_id;

交易域购物车周期快照事实表

DROP TABLE IF EXISTS dwd_trade_cart_full;
CREATE EXTERNAL TABLE dwd_trade_cart_full
(
    `id`         STRING COMMENT '编号',
    `user_id`   STRING COMMENT '用户ID',
    `sku_id`    STRING COMMENT 'SKU_ID',
    `sku_name`  STRING COMMENT '商品名称',
    `sku_num`   BIGINT COMMENT '现存商品件数'
) COMMENT '交易域购物车周期快照事实表'
    PARTITIONED BY (`dt` STRING)
    STORED AS ORC
    LOCATION '/warehouse/gmall/dwd/dwd_trade_cart_full/'
    TBLPROPERTIES ('orc.compress' = 'snappy');
insert overwrite table dwd_trade_cart_full partition(dt='2022-06-08')
select
    id,
    user_id,
    sku_id,
    sku_name,
    sku_num
from ods_cart_info_full
where dt='2022-06-08'
and is_ordered='0';

交易域交易流程累积快照事实表

特殊需求:下单到支付时间间隔的平均值

将一个流程中的多个行为状态数据累计在一张表中

DROP TABLE IF EXISTS dwd_trade_trade_flow_acc;
CREATE EXTERNAL TABLE dwd_trade_trade_flow_acc
(
    `order_id`               STRING COMMENT '订单ID',
    `user_id`                STRING COMMENT '用户ID',
    `province_id`           STRING COMMENT '省份ID',
    `order_date_id`         STRING COMMENT '下单日期ID',
    `order_time`             STRING COMMENT '下单时间',
    `payment_date_id`        STRING COMMENT '支付日期ID',
    `payment_time`           STRING COMMENT '支付时间',
    `finish_date_id`         STRING COMMENT '确认收货日期ID',
    `finish_time`             STRING COMMENT '确认收货时间',
    `order_original_amount` DECIMAL(16, 2) COMMENT '下单原始价格',
    `order_activity_amount` DECIMAL(16, 2) COMMENT '下单活动优惠分摊',
    `order_coupon_amount`   DECIMAL(16, 2) COMMENT '下单优惠券优惠分摊',
    `order_total_amount`    DECIMAL(16, 2) COMMENT '下单最终价格分摊',
    `payment_amount`         DECIMAL(16, 2) COMMENT '支付金额'
) COMMENT '交易域交易流程累积快照事实表'
    PARTITIONED BY (`dt` STRING)
    STORED AS ORC
    LOCATION '/warehouse/gmall/dwd/dwd_trade_trade_flow_acc/'
TBLPROPERTIES ('orc.compress' = 'snappy');

(1)思路分析

累积快照事实表与拉链表的处理逻辑非常相似,可以互为参考。

① 分区规划

累积快照事实表是为了处理涉及同一业务流程多个业务过程的指标而设计的。它的分区规划与拉链表完全相同,其中,9999-12-31分区存储的是业务流程尚未结束的所有数据,普通分区存储的是业务流程在当日结束的数据。

② 数据流向

a)首日装载

与拉链表不同,累积快照事实表的首日装载也需要动态分区。因为历史数据中一定会有业务流程已结束的数据,这部分数据应进入流程结束当日分区,也会有未完成的数据,进入9999-12-31分区。

b)每日装载

业务流程在当日结束的数据进入当日分区,未完成的数据进入9999-12-31分区。

③ 数据装载

本节仅筛选交易流程的三个关键结点:下单、支付、确认收货。下单时间通过order_info的create_time字段获取,支付操作通过payment_info的callback_time字段获取,参考上文,不再赘述。order_status_log记录了订单表状态的变更明细,确认收货时该表会插入一条状态为1006的数据,create_time即确认收货时间。

下单、支付、确认收货是按照时间顺序依次发生的,因此,在任意时刻下单数据中order_id的集合包含已支付order_id的集合,后者又包含已确认收货order_id的集合,显然,应将取自order_info表的子查询作为主表,通过order_id字段left join支付成功数据,通过order_id字段left join确认收货数据。

最后,选择所须字段写入对应分区即可。

a)首日装载

筛选ODS层相关表首日分区数据,关联后写入所属分区。确认收货时间即业务流程结束时间,若该字段不为null则说明业务流程已完成,进入收货日期对应分区,否则进入9999-12-31分区。

b)每日装载

筛选ODS层相关表每日分区数据,与9999-12-31分区数据union、关联后(下文详述)写入所属分区。

(2)执行步骤

① 首日装载

a)子查询

筛选上述ods_order_info_inc、ods_payment_info_inc、ods_order_status_log_inc表首日分区操作类型为bootstrap-insert类型的数据。其中,ods_payment_info_inc还要限定payment_status为1602(支付成功状态),ods_order_status_log_inc要限定order_status为1006(已完成,即确认收货)。按照前文所述关联。

b)关联

筛选所须字段,处理时间字段。其中,下单时间为取自订单表的create_time,支付成功时间为取自支付表的callback_time,确认收货时间取自订单流水表的create_time。如果确认收货时间不为null,说明业务流程结束,将其格式化为yyyy-MM-dd格式,作为分区字段dt的值,否则业务流程未结束,dt取值为9999-12-31。按照dt动态分区写入本节事实表。

② 每日装载

a)子查询

与首日装载相比筛选条件有所不同,如下。

Ø ods_order_info_inc:筛选首日分区操作类型为insert的数据。

Ø ods_payment_info_inc:筛选首日分区操作类型为update、payment_status为1602且变更字段包含了payment_status(该条件可省略,参见9.3节说明)的数据。所得子查询记为pi。

Ø ods_order_status_log_inc:筛选首日分区操作类型为insert,且order_status为1006的数据。所得子查询记为log。

与首日装载不同,每日装载要考虑前日的9999-12-31分区,这部分数据并没有完成业务流程,仍可能发生变更。我们需要将该分区数据与上述三个业务过程当日新增的记录合并,有两种实现思路,如下。

i)第一步,根据上文所述筛选条件将下单、支付、确认收货业务过程当日数据分别作为子查询,关联起来,关联方式及条件、字段处理逻辑与首日相同。

第二步,筛选9999-12-31分区数据,该分区数据分为两种:已下单未支付,已支付未收货(已收货则业务流程结束,进入收货当日分区)。第一类记录和当日新增的下单记录处理逻辑完全相同,与pi、log子查询left join即可。第二类记录已支付,因而在pi子查询中一定不会有对应记录,left join关联log即可。根据上述分析,似乎我们需要将9999-12-31分区的数据分成两部分处理,比较麻烦。实际上,这两类数据的处理完全可以统一,对于第二类数据,pi表中一定不存在满足关联条件的记录,left join pi得到的子查询与关联前的主表完全一致,因此对于这部分数据,分别与pi、log关联的结果和仅关联log的结果是一样的。综上,我们只需要把第一步子查询的主表替换为9999-12-31分区的数据即可。

第一步和第二步获得的子查询数据结构完全相同,union在一起,通过dt字段动态分区写入本节事实表即可。

ii)观察方法i)的实现步骤,可以发现当日新增下单记录和昨日未完成数据(9999-12-31分区数据)处理逻辑完全一致,可以用union运算符将二者合并起来,作为oi子查询,而后与pi及log通过left join关联即可。需要注意的是,待union的两部分子查询数据结构不同,新增的下单记录缺少五个字段,需要用null值或0.0填充,处理如下。

Ø payment_date_id:在当日新增下单记录子查询中补null。对于9999-12-31分区未完成的历史数据(以下简称历史数据),若前日已支付,则oi子查询中的支付日期不为null,直接获取,否则可能在当日支付,也可能不支付,无论哪种情况此时获取pi子查询中的支付日期即可。对于当日新增的下单记录,oi子查询中的支付日期为null,最终的支付日期与pi子查询中的支付日期保持一致。综上,关联后取数逻辑为nvl(oi.payment_date_id,pi.payment_date_id)

Ø payment_time:在当日新增下单记录子查询中补null,关联后取数逻辑为nvl(oi.payment_time,pi.payment_time)。思路同上。

Ø finish_date_id:在当日新增下单记录子查询中补null,关联后取数逻辑为nvl(oi.finish_date_id,log.finish_date_id)。与支付日期同理。

Ø finish_time:在当日新增下单记录子查询中补null,关联后取数逻辑为nvl(oi.finish_time,log.finish_time)。与支付时间同理。

Ø payment_amount:在当日新增下单记录子查询中补0.0。对于9999-12-31分区未完成的历史数据(以下简称历史数据),若前日已支付,则oi子查询中的支付金额就是我们要的值,直接获取。否则可能在当日支付,也可能不支付,对于前者,获取pi子查询中的支付金额即可,对于后者,pi子查询中的支付金额为null,应置为0.0,因次要在获取pi.payment_amount后做空值处理,若为null则置为0.0。对于当日新增的下单记录,oi子查询中的支付金额为0.0,支付金额应取自pi子查询的payment_amount字段,若该字段不为null则直接获取,否则做空值处理,同上。综上,关联后取数逻辑为nvl(if(oi.payment_amount = 0.0, pi.payment_amount, oi.payment_amount), 0.0)

最后,筛选所须字段,生成分区字段,写入事实表。分区字段取数逻辑如下。

oi子查询的两类数据的完成日期一定为null:第一类数据位于9999-12-31分区,业务流程未完成,确认收货日期一定为null,第二类数据的完成日期是我们补充的null值。因而只需要判断log子查询中的完成日期即可。若log. finish_date_id不为null,说明流程在当日结束,数据写入当日分区,否则流程未完成,写入9999-12-31分区。综上,分区字段计算逻辑为:nvl(log.finish_date_id,'9999-12-31')

本节选用方法ii)完成每日装载。

③ 特殊情况说明

实际上,上述交易流程并不是线性的,从下单到确认收货的任何一步都可能取消订单,确认收货和取消订单都可以终结交易流程。对于这类非线性过程的处理,通常有两种处理方式,如下。

a)将确认收货和取消订单都作为业务流程结束的标志,将数据写入业务流程结束日期对应的分区。

b)如果取消订单,将数据永久地保留在9999-12-31分区,未达到的业务过程对应里程碑字段置空。

两种方式均可,本节选择的是Plan B。生产环境如何处理需要综合考虑下游需求、企业硬件资源等因素,选取合适的方案。

(3)图解

① 分区规划

image.png

数据流向

image.png

数据装载

image.png

分区策略
--查询的数据如何存储到表的分区中
   --查询数据的效率,数据有效
   --ODS层
     --一天采集的数据就存储到表的一天的分区
   --DIM层
     --全量:每天一份全量数据存放到一天的分区中
     --拉链:会采用结束时间作为分区字段
   --DWD层
     --事务型事实表:一天的业务行为数据存储在一天的分区中
     --周期型快照事实表:将每天的全部状态数据保存到这一天的分区中
  

首日装载

set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table dwd_trade_trade_flow_acc partition(dt)
select
    oi.id,
    user_id,
    province_id,
    date_format(create_time,'yyyy-MM-dd'),
    create_time,
    date_format(callback_time,'yyyy-MM-dd'),
    callback_time,
    date_format(finish_time,'yyyy-MM-dd'),
    finish_time,
    original_total_amount,
    activity_reduce_amount,
    coupon_reduce_amount,
    total_amount,
    nvl(payment_amount,0.0),
    nvl(date_format(finish_time,'yyyy-MM-dd'),'9999-12-31')
from
(
    select
        data.id,
        data.user_id,
        data.province_id,
        data.create_time,
        data.original_total_amount,
        data.activity_reduce_amount,
        data.coupon_reduce_amount,
        data.total_amount
    from ods_order_info_inc
    where dt='2022-06-08'
    and type='bootstrap-insert'
)oi
left join
(
    select
        data.order_id,
        data.callback_time,
        data.total_amount payment_amount
    from ods_payment_info_inc
    where dt='2022-06-08'
    and type='bootstrap-insert'
    and data.payment_status='1602'
)pi
on oi.id=pi.order_id
left join
(
    select
        data.order_id,
        data.create_time finish_time
    from ods_order_status_log_inc
    where dt='2022-06-08'
    and type='bootstrap-insert'
    and data.order_status='1004'
)log
on oi.id=log.order_id;

每日装载

set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table dwd_trade_trade_flow_acc partition(dt)
select
    oi.order_id,
    user_id,
    province_id,
    order_date_id,
    order_time,
    nvl(oi.payment_date_id,pi.payment_date_id),
    nvl(oi.payment_time,pi.payment_time),
    nvl(oi.finish_date_id,log.finish_date_id),
    nvl(oi.finish_time,log.finish_time),
    order_original_amount,
    order_activity_amount,
    order_coupon_amount,
    order_total_amount,
    nvl(oi.payment_amount,pi.payment_amount),
    nvl(nvl(oi.finish_time,log.finish_time),'9999-12-31')
from
(
    select
        order_id,
        user_id,
        province_id,
        order_date_id,
        order_time,
        payment_date_id,
        payment_time,
        finish_date_id,
        finish_time,
        order_original_amount,
        order_activity_amount,
        order_coupon_amount,
        order_total_amount,
        payment_amount
    from dwd_trade_trade_flow_acc
    where dt='9999-12-31'
    union all
    select
        data.id,
        data.user_id,
        data.province_id,
        date_format(data.create_time,'yyyy-MM-dd') order_date_id,
        data.create_time,
        null payment_date_id,
        null payment_time,
        null finish_date_id,
        null finish_time,
        data.original_total_amount,
        data.activity_reduce_amount,
        data.coupon_reduce_amount,
        data.total_amount,
        null payment_amount
    from ods_order_info_inc
    where dt='2022-06-09'
    and type='insert'
)oi
left join
(
    select
        data.order_id,
        date_format(data.callback_time,'yyyy-MM-dd') payment_date_id,
        data.callback_time payment_time,
        data.total_amount payment_amount
    from ods_payment_info_inc
    where dt='2022-06-09'
    and type='update'
    and array_contains(map_keys(old),'payment_status')
    and data.payment_status='1602'
)pi
on oi.order_id=pi.order_id
left join
(
    select
        data.order_id,
        date_format(data.create_time,'yyyy-MM-dd') finish_date_id,
        data.create_time finish_time
    from ods_order_status_log_inc
    where dt='2022-06-09'
    and type='insert'
    and data.order_status='1004'
)log
on oi.order_id=log.order_id;

工具域用户优惠券粒度优惠券使用(支付)最近1日汇总表

--工具域
--优惠券使用
--事务事实表(度量值:隐藏的支付次数)
DROP TABLE IF EXISTS dws_tool_user_coupon_coupon_used_1d;
CREATE EXTERNAL TABLE dws_tool_user_coupon_coupon_used_1d
(
    `user_id`          STRING COMMENT '用户ID',
    `coupon_id`        STRING COMMENT '优惠券ID',
    `coupon_name`      STRING COMMENT '优惠券名称',
    `coupon_type_code` STRING COMMENT '优惠券类型编码',
    `coupon_type_name` STRING COMMENT '优惠券类型名称',
    `benefit_rule`     STRING COMMENT '优惠规则',
    `used_count_1d`    STRING COMMENT '使用(支付)次数'
) COMMENT '工具域用户优惠券粒度优惠券使用(支付)最近1日汇总表'
    PARTITIONED BY (`dt` STRING)
    STORED AS ORC
    LOCATION '/warehouse/gmall/dws/dws_tool_user_coupon_coupon_used_1d'
    TBLPROPERTIES ('orc.compress' = 'snappy');

数据装载

首日装载


set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table dws_tool_user_coupon_coupon_used_1d partition(dt)
select
    user_id,
    coupon_id,
    coupon_name,
    coupon_type_code,
    coupon_type_name,
    benefit_rule,
    used_count,
    dt
from
(
    select
        dt,
        user_id,
        coupon_id,
        count(*) used_count
    from dwd_tool_coupon_used_inc
    group by dt,user_id,coupon_id
)t1
left join
(
    select
        id,
        coupon_name,
        coupon_type_code,
        coupon_type_name,
        benefit_rule
    from dim_coupon_full
    where dt='2022-06-08'
)t2
on t1.coupon_id=t2.id;

每日装载

insert overwrite table dws_tool_user_coupon_coupon_used_1d partition(dt='2022-06-09')
select
    user_id,
    coupon_id,
    coupon_name,
    coupon_type_code,
    coupon_type_name,
    benefit_rule,
    used_count
from
(
    select
        user_id,
        coupon_id,
        count(*) used_count
    from dwd_tool_coupon_used_inc
    where dt='2022-06-09'
    group by user_id,coupon_id
)t1
left join
(
    select
        id,
        coupon_name,
        coupon_type_code,
        coupon_type_name,
        benefit_rule
    from dim_coupon_full
    where dt='2022-06-09'
)t2
on t1.coupon_id=t2.id;

互动域商品粒度收藏商品最近1日汇总表

建表语句
DROP TABLE IF EXISTS dws_interaction_sku_favor_add_1d;
CREATE EXTERNAL TABLE dws_interaction_sku_favor_add_1d
(
    `sku_id`             STRING COMMENT 'SKU_ID',
    `sku_name`           STRING COMMENT 'SKU名称',
    `category1_id`       STRING COMMENT '一级品类ID',
    `category1_name`     STRING COMMENT '一级品类名称',
    `category2_id`       STRING COMMENT '二级品类ID',
    `category2_name`     STRING COMMENT '二级品类名称',
    `category3_id`       STRING COMMENT '三级品类ID',
    `category3_name`     STRING COMMENT '三级品类名称',
    `tm_id`              STRING COMMENT '品牌ID',
    `tm_name`            STRING COMMENT '品牌名称',
    `favor_add_count_1d` BIGINT COMMENT '商品被收藏次数'
) COMMENT '互动域商品粒度收藏商品最近1日汇总表'
    PARTITIONED BY (`dt` STRING)
    STORED AS ORC
    LOCATION '/warehouse/gmall/dws/dws_interaction_sku_favor_add_1d'
    TBLPROPERTIES ('orc.compress' = 'snappy');
首日装载
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table dws_interaction_sku_favor_add_1d partition(dt)
select
    sku_id,
    sku_name,
    category1_id,
    category1_name,
    category2_id,
    category2_name,
    category3_id,
    category3_name,
    tm_id,
    tm_name,
    favor_add_count,
    dt
from
(
    select
        dt,
        sku_id,
        count(*) favor_add_count
    from dwd_interaction_favor_add_inc
    group by dt,sku_id
)favor
left join
(
    select
        id,
        sku_name,
        category1_id,
        category1_name,
        category2_id,
        category2_name,
        category3_id,
        category3_name,
        tm_id,
        tm_name
    from dim_sku_full
    where dt='2022-06-08'
)sku
on favor.sku_id=sku.id;
每日装载
insert overwrite table dws_interaction_sku_favor_add_1d partition(dt='2022-06-09')
select
    sku_id,
    sku_name,
    category1_id,
    category1_name,
    category2_id,
    category2_name,
    category3_id,
    category3_name,
    tm_id,
    tm_name,
    favor_add_count
from
(
    select
        sku_id,
        count(*) favor_add_count
    from dwd_interaction_favor_add_inc
    where dt='2022-06-09'
    group by sku_id
)favor
left join
(
    select
        id,
        sku_name,
        category1_id,
        category1_name,
        category2_id,
        category2_name,
        category3_id,
        category3_name,
        tm_id,
        tm_name
    from dim_sku_full
    where dt='2022-06-09'
)sku
on favor.sku_id=sku.id;
相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
5月前
|
存储 数据采集 JavaScript
深入理解数仓开发(一)数据技术篇之日志采集
深入理解数仓开发(一)数据技术篇之日志采集
|
5月前
|
消息中间件 关系型数据库 Kafka
深入理解数仓开发(二)数据技术篇之数据同步
深入理解数仓开发(二)数据技术篇之数据同步
|
4月前
|
存储 DataWorks Java
DataWorks产品使用合集之开发离线数仓时,需要多个工作空间的情况有哪些
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
5月前
|
消息中间件 存储 Kafka
Flink 实时数仓(二)【ODS 层开发】
Flink 实时数仓(二)【ODS 层开发】
|
5月前
|
SQL
离线数仓(十)【ADS 层开发】(5)
离线数仓(十)【ADS 层开发】
|
1月前
|
人工智能 自然语言处理 关系型数据库
阿里云云原生数据仓库 AnalyticDB PostgreSQL 版已完成和开源LLMOps平台Dify官方集成
近日,阿里云云原生数据仓库 AnalyticDB PostgreSQL 版已完成和开源LLMOps平台Dify官方集成。
|
1月前
|
人工智能 分布式计算 数据管理
阿里云位居 IDC MarketScape 中国实时湖仓评估领导者类别
国际数据公司( IDC )首次发布了《IDC MarketScape: 中国实时湖仓市场 2024 年厂商评估》,阿里云在首次报告发布即位居领导者类别。
|
1月前
|
SQL 分布式计算 数据挖掘
加速数据分析:阿里云Hologres在实时数仓中的应用实践
【10月更文挑战第9天】随着大数据技术的发展,企业对于数据处理和分析的需求日益增长。特别是在面对海量数据时,如何快速、准确地进行数据查询和分析成为了关键问题。阿里云Hologres作为一个高性能的实时交互式分析服务,为解决这些问题提供了强大的支持。本文将深入探讨Hologres的特点及其在实时数仓中的应用,并通过具体的代码示例来展示其实际应用。
163 0
|
2月前
|
存储 机器学习/深度学习 监控
阿里云 Hologres OLAP 解决方案评测
随着大数据时代的到来,企业面临着海量数据的挑战,如何高效地进行数据分析和决策变得尤为重要。阿里云推出的 Hologres OLAP(在线分析处理)解决方案,旨在为用户提供快速、高效的数据分析能力。本文将深入探讨 Hologres OLAP 的特点、优势以及应用场景,并针对方案的技术细节、部署指导、代码示例和数据分析需求进行评测。
127 7
|
2月前
|
运维 数据挖掘 OLAP
阿里云Hologres:一站式轻量级OLAP分析平台的全面评测
在数据驱动决策的今天,企业对高效、灵活的数据分析平台的需求日益增长。阿里云的Hologres,作为一站式实时数仓引擎,提供了强大的OLAP(在线分析处理)分析能力。本文将对Hologres进行深入评测,探讨其在多源集成、性能、易用性以及成本效益方面的表现。
128 7