背景信息:
在数据仓库的数据模型设计过程中,经常会遇到这样的需求:
-
数据量比较大;
-
表中的部分字段会被update,如用户的地址,产品的描述信息,订单的状态、手机号码等等;
-
需要查看某一个时间点或者时间段的历史快照信息。(比如,查看某一个订单在历史某一个时间点的状态,比如,查看某一个用户在过去某一段时间内,更新过几次等等)
-
变化的比例和频率不是很大,比如,总共有1000万的会员,每天新增和发生变化的有10万左右;如果对这边表每天都保留一份全量,那么每次全量中会保存很多不变的信息,对存储是极大的浪费;
综上所述:引入'拉链历史表',既能满足反应数据的历史状态,又可以最大程度的节省存储。
(备注:在阿里巴巴内部很大程度上是基于存储换计算来提供开发的效率及易用性,因为在当今,存储的成本远低于CPU和内存。因此在阿里巴巴内部会采用快照的方式将每日的全量数据进行快照,同时也会通过极限存储的方式,压缩率高,在合适的场景下,约能压缩为原始数据的1/30。)
数据量比较大;
表中的部分字段会被update,如用户的地址,产品的描述信息,订单的状态、手机号码等等;
需要查看某一个时间点或者时间段的历史快照信息。(比如,查看某一个订单在历史某一个时间点的状态,比如,查看某一个用户在过去某一段时间内,更新过几次等等)
变化的比例和频率不是很大,比如,总共有1000万的会员,每天新增和发生变化的有10万左右;如果对这边表每天都保留一份全量,那么每次全量中会保存很多不变的信息,对存储是极大的浪费;
Demo数据
以下只是demo如何在MaxCompute中实现拉链表,所以是基于一些假设:
-
同一天中同一订单只有一个状态发生;
-
基于20150821及之前的数据并没有同一个订单有两个状态的最简单场景模拟;
-
且数据源在阿里云RDS for Mysql中。且表明为orders。
同一天中同一订单只有一个状态发生;
基于20150821及之前的数据并没有同一个订单有两个状态的最简单场景模拟;
且数据源在阿里云RDS for Mysql中。且表明为orders。
20150821以及之前的历史订单数据:
订单ID
创建时间
修改时间
状态
1
20150818
20150818
创建
2
20150818
20150818
创建
3
20150819
20150821
支付
4
20150819
20150821
完成
5
20150819
20150820
支付
6
20150820
20150820
创建
7
20150820
20150821
支付
8
20150821
20150821
创建
订单ID
|
创建时间
|
修改时间
|
状态
|
1
|
20150818
|
20150818
|
创建
|
2
|
20150818
|
20150818
|
创建
|
3
|
20150819
|
20150821
|
支付
|
4
|
20150819
|
20150821
|
完成
|
5
|
20150819
|
20150820
|
支付
|
6
|
20150820
|
20150820
|
创建
|
7
|
20150820
|
20150821
|
支付
|
8
|
20150821
|
20150821
|
创建
|
20150822订单数据:
订单ID
创建时间
修改时间
状态
1
20150818
20150822
支付
2
20150818
20150822
完成
6
20150820
20150822
支付
8
20150821
20150822
支付
9
20150822
20150822
创建
10
20150822
20150822
支付
订单ID
|
创建时间
|
修改时间
|
状态
|
1
|
20150818
|
20150822
|
支付
|
2
|
20150818
|
20150822
|
完成
|
6
|
20150820
|
20150822
|
支付
|
8
|
20150821
|
20150822
|
支付
|
9
|
20150822
|
20150822
|
创建
|
10
|
20150822
|
20150822
|
支付
|
20150823的订单数据:
订单ID
创建时间
修改时间
状态
1
20150818
20150823
完成
3
20150819
20150823
完成
5
20150819
20150823
完成
8
20150821
20150823
完成
11
20150823
20150823
创建
12
20150823
20150823
创建
13
20150823
20150823
支付
创建MaxCompute表
订单ID
|
创建时间
|
修改时间
|
状态
|
1
|
20150818
|
20150823
|
完成
|
3
|
20150819
|
20150823
|
完成
|
5
|
20150819
|
20150823
|
完成
|
8
|
20150821
|
20150823
|
完成
|
11
|
20150823
|
20150823
|
创建
|
12
|
20150823
|
20150823
|
创建
|
13
|
20150823
|
20150823
|
支付
|
--ODS层:订单的增量数据表,按天分区,存放每天的增量数据
CREATE TABLE ods_orders_inc_d
(
orderid BIGINT
,createtime STRING
,modifiedtime STRING
,o_status STRING
)
PARTITIONED BY (dt STRING)
LIFECYCLE 7;
--DW层:历史数据拉链表,存放订单的历史状态数据
CREATE TABLE dw_orders_his_d
(
orderid BIGINT COMMENT '订单ID'
,createtime STRING COMMENT '订单创建时间'
,modifiedtime STRING COMMENT '订单修改时间'
,o_status STRING COMMENT '订单修改时间'
,dw_start_date STRING COMMENT '订单生命周期开始时间'
,dw_end_date STRING COMMENT '订单生命周期结束时间'
);
实现思路
CREATE TABLE ods_orders_inc_d
(
orderid BIGINT
,createtime STRING
,modifiedtime STRING
,o_status STRING
)
PARTITIONED BY (dt STRING)
LIFECYCLE 7;
CREATE TABLE dw_orders_his_d
(
orderid BIGINT COMMENT '订单ID'
,createtime STRING COMMENT '订单创建时间'
,modifiedtime STRING COMMENT '订单修改时间'
,o_status STRING COMMENT '订单修改时间'
,dw_start_date STRING COMMENT '订单生命周期开始时间'
,dw_end_date STRING COMMENT '订单生命周期结束时间'
);
-
全量初始化:将2015-08-21及以前的全量历史数据通过全量方式同步至ODS并刷进DW层。
-
增量更新:将2015-08-22、2015-08-23的全天增量数据以增量方式刷入下游数据。
全量初始化:将2015-08-21及以前的全量历史数据通过全量方式同步至ODS并刷进DW层。
增量更新:将2015-08-22、2015-08-23的全天增量数据以增量方式刷入下游数据。
全量初始化
-
创建节点任务:数据同步
-
选择调度类型:手动调度
-
配置数据同步任务:Mysql:Orders-->ODPS:ods_orders_inc_d
-
where条件配置:modifiedtime <= '20150821'
-
分区值dt=20150821
提交调度系统,待数据同步任务执行成功后,再将ODS数据刷入DW。
创建SQL脚本:
INSERT overwrite TABLE dw_orders_his_d
SELECT orderid,createtime,modifiedtime,o_status,createtime AS dw_start_date,'99991231' AS dw_end_date
FROM ods_orders_inc_d
WHERE dt = '20150821';
数据如下:
订单ID
创建时间
修改时间
状态
start_date
end_date
1
20150818
20150818
创建
20150818
99991231
2
20150818
20150818
创建
20150818
99991231
3
20150819
20150821
支付
20150819
99991231
4
20150819
20150821
完成
20150819
99991231
5
20150819
20150820
支付
20150819
99991231
6
20150820
20150820
创建
20150820
99991231
7
20150820
20150821
支付
20150820
99991231
8
20150821
20150821
创建
20150821
99991231
通过以上步骤可以将2015-08-21及以前的历史全量数据一次性刷入DW和ODS中。
创建节点任务:数据同步
选择调度类型:手动调度
配置数据同步任务:Mysql:Orders-->ODPS:ods_orders_inc_d
where条件配置:modifiedtime <= '20150821'
分区值dt=20150821
INSERT overwrite TABLE dw_orders_his_d
SELECT orderid,createtime,modifiedtime,o_status,createtime AS dw_start_date,'99991231' AS dw_end_date
FROM ods_orders_inc_d
WHERE dt = '20150821';
订单ID
|
创建时间
|
修改时间
|
状态
|
start_date
|
end_date
|
1
|
20150818
|
20150818
|
创建
|
20150818
|
99991231
|
2
|
20150818
|
20150818
|
创建
|
20150818
|
99991231
|
3
|
20150819
|
20150821
|
支付
|
20150819
|
99991231
|
4
|
20150819
|
20150821
|
完成
|
20150819
|
99991231
|
5
|
20150819
|
20150820
|
支付
|
20150819
|
99991231
|
6
|
20150820
|
20150820
|
创建
|
20150820
|
99991231
|
7
|
20150820
|
20150821
|
支付
|
20150820
|
99991231
|
8
|
20150821
|
20150821
|
创建
|
20150821
|
99991231
|
通过以上步骤可以将2015-08-21及以前的历史全量数据一次性刷入DW和ODS中。
增量抽取并生成拉链表
-
创建工作流任务并选择周期性调度。
-
依次拖入数据同步节点任务和SQL任务。
-
在数据同步任务中where条件配置为:modifiedtime=${bdp.system.bizdate}
-
目标表ods_orders_inc_d分区配置为dt=${bdp.system.bizdate}
-
配置SQL节点,且为数据同步节点的下游节点。
--通过DW历史数据和ODS增量数据刷新DW表
insert overwrite table dw_orders_his_d
SELECT a0.orderid, a0.createtime, a0.modifiedtime, a0.o_status, a0.dw_start_date, a0.dw_end_date
FROM (
-- 对orderid进行开窗然后按照生命周期结束时间倒序排,支持重跑
SELECT a1.orderid, a1.createtime, a1.modifiedtime, a1.o_status, a1.dw_start_date, a1.dw_end_date
, ROW_NUMBER() OVER (distribute BY a1.orderid,a1.createtime, a1.modifiedtime,a1.o_status sort BY a1.dw_end_date DESC) AS nums
FROM (
-- 用历史数据与增量22日的数据进行匹配,当发现在22日新增数据中存在且end_date > 当前日期的就表示数据状态发生过变化,然后修改生命周期
-- 修改昨日已经生命截止的数据并union最新增量数据到DW
SELECT a.orderid, a.createtime, a.modifiedtime, a.o_status, a.dw_start_date
, CASE
WHEN b.orderid IS NOT NULL AND a.dw_end_date > ${bdp.system.bizdate} THEN ${yesterday}
ELSE a.dw_end_date
END AS dw_end_date
FROM dw_orders_his_d a
LEFT OUTER JOIN (
SELECT *
FROM ods_orders_inc_d
WHERE dt = ${bdp.system.bizdate}
) b
ON a.orderid = b.orderid
UNION ALL
--2015-08-22的增量数据刷新到DW
SELECT orderid, createtime, modifiedtime, o_status, modifiedtime AS dw_start_date
, '99991231' AS dw_end_date
FROM ods_orders_inc_d
WHERE dt = ${bdp.system.bizdate}
) a1
) a0
-- 开窗口后对某个订单中生命周期为'9999-12-31'的取值并写入,防止重跑数据情况。
WHERE a0.nums = 1
order by a0.orderid,a0.dw_start_date;
备注:测试运行的时候,选择业务日期为20150822。也可以通过补数据方式,直接把20150822和20150823两天的增量数据刷入DW中。上面SQL中${yesterday}为自定义变量,其赋值为${yyyymmdd-1}
通过如上方式将20150822的增量数据刷入DW,如下所示:
订单ID
创建时间
修改时间
状态
start_date
end_date
1
20150818
20150818
创建
20150818
20150821
1
20150818
20150822
支付
20150822
99991231
2
20150818
20150818
创建
20150818
20150821
2
20150818
20150822
完成
20150822
99991231
3
20150819
20150821
支付
20150819
99991231
4
20150819
20150821
完成
20150819
99991231
5
20150819
20150820
支付
20150819
99991231
6
20150820
20150820
创建
20150820
20150821
6
20150820
20150822
支付
20150822
99991231
7
20150820
20150821
支付
20150820
99991231
8
20150821
20150821
创建
20150821
20150821
8
20150821
20150822
支付
20150822
99991231
9
20150822
20150822
创建
20150822
99991231
10
20150822
20150822
支付
20150822
99991231
通过同样的方式将2015-08-23日的数据增量输入DW,其结果为:
订单ID
创建时间
修改时间
状态
start_date
end_date
1
20150818
20150818
创建
20150818
20150821
1
20150818
20150822
支付
20150822
20150822
1
20150818
20150823
完成
20150823
99991231
2
20150818
20150818
创建
20150818
20150821
2
20150818
20150822
完成
20150822
99991231
3
20150819
20150821
支付
20150819
20150822
3
20150819
20150823
完成
20150823
99991231
4
20150819
20150821
完成
20150819
99991231
5
20150819
20150820
支付
20150819
20150822
5
20150819
20150823
完成
20150823
99991231
6
20150820
20150820
创建
20150820
20150821
6
20150820
20150822
支付
20150822
99991231
7
20150820
20150821
支付
20150820
99991231
8
20150821
20150821
创建
20150821
20150821
8
20150821
20150822
支付
20150822
20150822
8
20150821
20150823
完成
20150823
99991231
9
20150822
20150822
创建
20150822
99991231
10
20150822
20150822
支付
20150822
99991231
11
20150823
20150823
创建
20150823
99991231
12
20150823
20150823
创建
20150823
99991231
13
20150823
20150823
支付
20150823
99991231
创建工作流任务并选择周期性调度。
依次拖入数据同步节点任务和SQL任务。
在数据同步任务中where条件配置为:modifiedtime=${bdp.system.bizdate}
目标表ods_orders_inc_d分区配置为dt=${bdp.system.bizdate}
配置SQL节点,且为数据同步节点的下游节点。
--通过DW历史数据和ODS增量数据刷新DW表
insert overwrite table dw_orders_his_d
SELECT a0.orderid, a0.createtime, a0.modifiedtime, a0.o_status, a0.dw_start_date, a0.dw_end_date
FROM (
-- 对orderid进行开窗然后按照生命周期结束时间倒序排,支持重跑
SELECT a1.orderid, a1.createtime, a1.modifiedtime, a1.o_status, a1.dw_start_date, a1.dw_end_date
, ROW_NUMBER() OVER (distribute BY a1.orderid,a1.createtime, a1.modifiedtime,a1.o_status sort BY a1.dw_end_date DESC) AS nums
FROM (
-- 用历史数据与增量22日的数据进行匹配,当发现在22日新增数据中存在且end_date > 当前日期的就表示数据状态发生过变化,然后修改生命周期
-- 修改昨日已经生命截止的数据并union最新增量数据到DW
SELECT a.orderid, a.createtime, a.modifiedtime, a.o_status, a.dw_start_date
, CASE
WHEN b.orderid IS NOT NULL AND a.dw_end_date > ${bdp.system.bizdate} THEN ${yesterday}
ELSE a.dw_end_date
END AS dw_end_date
FROM dw_orders_his_d a
LEFT OUTER JOIN (
SELECT *
FROM ods_orders_inc_d
WHERE dt = ${bdp.system.bizdate}
) b
ON a.orderid = b.orderid
UNION ALL
--2015-08-22的增量数据刷新到DW
SELECT orderid, createtime, modifiedtime, o_status, modifiedtime AS dw_start_date
, '99991231' AS dw_end_date
FROM ods_orders_inc_d
WHERE dt = ${bdp.system.bizdate}
) a1
) a0
-- 开窗口后对某个订单中生命周期为'9999-12-31'的取值并写入,防止重跑数据情况。
WHERE a0.nums = 1
order by a0.orderid,a0.dw_start_date;
备注:测试运行的时候,选择业务日期为20150822。也可以通过补数据方式,直接把20150822和20150823两天的增量数据刷入DW中。上面SQL中${yesterday}为自定义变量,其赋值为${yyyymmdd-1}
订单ID
|
创建时间
|
修改时间
|
状态
|
start_date
|
end_date
|
1
|
20150818
|
20150818
|
创建
|
20150818
|
20150821
|
1
|
20150818
|
20150822
|
支付
|
20150822
|
99991231
|
2
|
20150818
|
20150818
|
创建
|
20150818
|
20150821
|
2
|
20150818
|
20150822
|
完成
|
20150822
|
99991231
|
3
|
20150819
|
20150821
|
支付
|
20150819
|
99991231
|
4
|
20150819
|
20150821
|
完成
|
20150819
|
99991231
|
5
|
20150819
|
20150820
|
支付
|
20150819
|
99991231
|
6
|
20150820
|
20150820
|
创建
|
20150820
|
20150821
|
6
|
20150820
|
20150822
|
支付
|
20150822
|
99991231
|
7
|
20150820
|
20150821
|
支付
|
20150820
|
99991231
|
8
|
20150821
|
20150821
|
创建
|
20150821
|
20150821
|
8
|
20150821
|
20150822
|
支付
|
20150822
|
99991231
|
9
|
20150822
|
20150822
|
创建
|
20150822
|
99991231
|
10
|
20150822
|
20150822
|
支付
|
20150822
|
99991231
|
订单ID
|
创建时间
|
修改时间
|
状态
|
start_date
|
end_date
|
1
|
20150818
|
20150818
|
创建
|
20150818
|
20150821
|
1
|
20150818
|
20150822
|
支付
|
20150822
|
20150822
|
1
|
20150818
|
20150823
|
完成
|
20150823
|
99991231
|
2
|
20150818
|
20150818
|
创建
|
20150818
|
20150821
|
2
|
20150818
|
20150822
|
完成
|
20150822
|
99991231
|
3
|
20150819
|
20150821
|
支付
|
20150819
|
20150822
|
3
|
20150819
|
20150823
|
完成
|
20150823
|
99991231
|
4
|
20150819
|
20150821
|
完成
|
20150819
|
99991231
|
5
|
20150819
|
20150820
|
支付
|
20150819
|
20150822
|
5
|
20150819
|
20150823
|
完成
|
20150823
|
99991231
|
6
|
20150820
|
20150820
|
创建
|
20150820
|
20150821
|
6
|
20150820
|
20150822
|
支付
|
20150822
|
99991231
|
7
|
20150820
|
20150821
|
支付
|
20150820
|
99991231
|
8
|
20150821
|
20150821
|
创建
|
20150821
|
20150821
|
8
|
20150821
|
20150822
|
支付
|
20150822
|
20150822
|
8
|
20150821
|
20150823
|
完成
|
20150823
|
99991231
|
9
|
20150822
|
20150822
|
创建
|
20150822
|
99991231
|
10
|
20150822
|
20150822
|
支付
|
20150822
|
99991231
|
11
|
20150823
|
20150823
|
创建
|
20150823
|
99991231
|
12
|
20150823
|
20150823
|
创建
|
20150823
|
99991231
|
13
|
20150823
|
20150823
|
支付
|
20150823
|
99991231
|
如何使用拉链表
-
查看某一天的全量历史快照数据。
SELECT *
FROM dw_orders_his_d
WHERE dw_start_date <= '20150822'
AND dw_end_date >= '20150822'
ORDER BY orderid
LIMIT 10000;
-
取一段时间的变化记录集合,如在20150822-20150823变化的记录。
SELECT *
FROM dw_orders_his_d
WHERE dw_start_date <= '20150823'
AND dw_end_date >= '20150822'
ORDER BY orderid
LIMIT 10000;
-
查看某一订单历史变化情况。
SELECT *
FROM dw_orders_his_d
WHERE orderid = 8
ORDER BY dw_start_date;
-
取最新的数据。
SELECT *
FROM dw_orders_his_d
WHERE dw_end_date = '99991231'
关于基于历史拉链表回滚某一天或一段时间内的数据,还是一个相对比较复杂的话题,这个可以下载再谈。
查看某一天的全量历史快照数据。
SELECT *
FROM dw_orders_his_d
WHERE dw_start_date <= '20150822'
AND dw_end_date >= '20150822'
ORDER BY orderid
LIMIT 10000;
取一段时间的变化记录集合,如在20150822-20150823变化的记录。
SELECT *
FROM dw_orders_his_d
WHERE dw_start_date <= '20150823'
AND dw_end_date >= '20150822'
ORDER BY orderid
LIMIT 10000;
查看某一订单历史变化情况。
SELECT *
FROM dw_orders_his_d
WHERE orderid = 8
ORDER BY dw_start_date;
取最新的数据。
SELECT *
FROM dw_orders_his_d
WHERE dw_end_date = '99991231'