离线数仓(七)【DIM 层开发】(2)https://developer.aliyun.com/article/1532405
1.9、日期维度表
前面的四张维度表都是每日全量快照表而且来自于业务系统,但时间维度表的数据并不是来自于业务系统,而是手动写入,并且由于时间维度表数据的可预见性,无须每日导入,一般可一次性导入一年的数据(主要是因为我们国家每年的节假日由国务院一年公布一次)。所以设计日期维度表也就没有选择主维表和相关维表这一步了,而确认维度属性取决于我们数仓的计算周期,并尽可能使得维度属性更加丰富。
日期维度表中每一行代表什么取决于我们数仓的计算周期,通常以天为单位。
1.9.1、建表语句
日期维度表不需要分区,因为没有必要
DROP TABLE IF EXISTS dim_date; CREATE EXTERNAL TABLE dim_date ( `date_id` STRING COMMENT '日期ID', `week_id` STRING COMMENT '周ID,一年中的第几周', `week_day` STRING COMMENT '周几', `day` STRING COMMENT '每月的第几天', `month` STRING COMMENT '一年中的第几月', `quarter` STRING COMMENT '一年中的第几季度', `year` STRING COMMENT '年份', `is_workday` STRING COMMENT '是否是工作日', `holiday_id` STRING COMMENT '节假日' ) COMMENT '时间维度表' STORED AS ORC LOCATION '/warehouse/gmall/dim/dim_date/' TBLPROPERTIES ('orc.compress' = 'snappy');
1.9.2、加载语句
日期维度表的数据不是来自于业务系统,而是来自于我们自己上传。为了批量导入一年的数据,我们总不能一天一天的 insert ,而是先生成一个一年的日期文件,每行代表一天,然后把它 load 到我们的日期维度表当中,但是不能直接 load ,因为我们的日期维度表是一个 ORC 文件(STORE AS ORC 而不是 STORE AS TEXTFILE),所以我们还需要借助一个中间表(表结构应该和日期维度表一致,除了 STORE AS 字段)进行 insert + select 来导入到日期维度表当中。
(1)创建临时表
DROP TABLE IF EXISTS tmp_dim_date_info; CREATE EXTERNAL TABLE tmp_dim_date_info ( `date_id` STRING COMMENT '日', `week_id` STRING COMMENT '周ID', `week_day` STRING COMMENT '周几', `day` STRING COMMENT '每月的第几天', `month` STRING COMMENT '第几月', `quarter` STRING COMMENT '第几季度', `year` STRING COMMENT '年', `is_workday` STRING COMMENT '是否是工作日', `holiday_id` STRING COMMENT '节假日' ) COMMENT '时间维度表' ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION '/warehouse/gmall/tmp/tmp_dim_date_info/';
注意:对于分区表的数据 ,我们不能直接把数据所在文件放到 location 字段所指定的路径来实现数据插入,因为分区表的元数据中存储着分区目录文件的信息,我们自己偷摸放到里面是没有用的。但是对于非分区表是 OK 的。所以我们这里直接上传到临时表的 location 即可。
(2)将数据文件上传到HFDS上临时表路径
/warehouse/gmall/tmp/tmp_dim_date_info
(3)执行以下语句将其导入时间维度表
insert overwrite table dim_date select * from tmp_dim_date_info;
1.10、用户维度表
1.10.1、回顾拉链表
之前我们在学习数仓建模的维度表的设计时讲到,对于维度属性变化的表,我们保存维度的策略有两种:全量快照表和拉链表,对于这里的用户信息,它不适合每天全量快照因为它内容很多,但是变化很小,所以我们一般把它做成拉链表,因为拉链表它不会保存那些不变化的数据,减少数据冗余。
那拉链表的每行代表什么?是一个维度对象的一个状态,可能是历史状态,也可能是最新的 状态(结束日期是极大值 9999-12-31);列是我们用户维度的一些常规属性(比如姓名、手机号等),特殊的是拉链表需要多创建两个字段:开始日期和结束日期,它们形容的是我们这个维度状态的有效期范围。
如何使用拉链表?
拉链表同样是维度表,所以我们依然是用事实表和它去做关联,关联的时候的规则同样是哪一天发生的事实去 join 哪天的维度状态。现在的问题是我们如何通过拉链表获得那一天的全量状态。
对于最新数据,我们可以直接查询满足状态结束日期为 9999-12-31 的数据。对于某天的数据,比如 2023-12-1 我们只要要求 状态开始日期 <= 2023-12-1 <= 状态结束日期 即可。
1.10.2、设计用户维度表
拉链表和前面的全量快照表一样都是维度表,那么我们同样在设计拉链表的时候需要参考业务系统来确定选择哪些字段。
主维表和相关维表
业务系统当中和我们用户表相关的表只有 user_info ,所以我们将来用户维度表的维度基本上就都是从该表中的字段选取过来的。此外,我们需要遵守拉链表的规定,额外创建两个字段(start_date 和 end_date )带表示该行维度状态的有效时间。
分区
拉链表怎么分区?我们分区的意义当然是希望能够快速的查到我们需要的信息,而对于前面的全量快照表,我们可以直接通过分区查询到具体到哪一天的数据;而对于这里的增量表,我们的查询场景往往就是:1. 获取最新的用户状态信息 2.查询用户历史状态信息 。而实际开发中对于查询最新状态信息肯定是最频繁的,所以我们拉链表的分区设计应该尽可能为查询最新维度状态数据做考虑。
这里,我们按照用户状态过期的日期做分区,也就是说,对于用户维度表,每天我们创建的分区都是当日过期的用户状态。同时对于用户最新状态(结束日期字段为极大值)单独放到一个分区。这里的细节有很多:比如我们 HDFS 的数据是只读的,那如果用户的状态发生改变那我存放在最新状态分区(dt=9999-12-31)中的数据不就不是最新的了吗,那怎么去修改?这个问题我们放到拉链表的装载过程去回答。
DROP TABLE IF EXISTS dim_user_zip; CREATE EXTERNAL TABLE dim_user_zip ( `id` STRING COMMENT '用户id', `login_name` STRING COMMENT '用户名称', `nick_name` STRING COMMENT '用户昵称', `name` STRING COMMENT '用户姓名', `phone_num` STRING COMMENT '手机号码', `email` STRING COMMENT '邮箱', `user_level` STRING COMMENT '用户等级', `birthday` STRING COMMENT '生日', `gender` STRING COMMENT '性别', `create_time` STRING COMMENT '创建时间', `operate_time` STRING COMMENT '操作时间', `start_date` STRING COMMENT '开始日期', `end_date` STRING COMMENT '结束日期' ) COMMENT '用户表' PARTITIONED BY (`dt` STRING) STORED AS ORC LOCATION '/warehouse/gmall/dim/dim_user_zip/' TBLPROPERTIES ('orc.compress' = 'snappy');
1.10.2、装载拉链表
可以看到,拉链表首日装载和之后的装载逻辑是不一样的:首日装载直接从我们业务系统中全量同步过来然后增加两个字段:start_date = 首日装载时间,end_date =日期极大值;之后的装载是那当天的新增和变化值和我们昨天的拉链表进行合并,然后需要修改各自的 start_date 和 end_date 字段。
数据流向
首日全量数据:
首日全量的数据是都放到最新状态分区的(也就是 dt=9999-12-31)
对于之后的数据,我们首先需要从用户的新增和变化数据中过滤出用户的最后一个状态,然后合并到最新用户状态分区,那么原本最新用户状态分区中就存在数据过期,就需要我们拿出来放到当天的分区中。
所以对于之后的数据都采用这种方式来分区,那么我们在装载的时候就需要写两条 SQL 语句:首日装载语句和每日装载语句。
1.10.2.1、首日装载
insert overwrite table dim_user_zip partition (dt='9999-12-31') select data.id, data.login_name, data.nick_name, md5(data.name), md5(data.phone_num), md5(data.email), data.user_level, data.birthday, data.gender, data.create_time, data.operate_time, '2020-06-14' start_date, '9999-12-31' end_date from ods_user_info_inc where dt='2020-06-14' and type='bootstrap-insert';
1.10.2.2、每日装载语句
我们首先要清楚数据来源和目的地,对于每日装载,我们需要通过 Maxwell 从业务系统重获得用户变更操作日志(Maxwell 会把这部分日志每日增量同步到 ods_user_info_inc ),然后把这部分数据中的新增及变化数据对我们的 9999-12-31 的分区的数据进行修改合并(把过期的数据移动到当日的过期分区)。
所以我们的数据源和目的地各有两个:数据源:9999-12-31 分区和当日的增量数据 ods_user_info_inc,目的地:9999-12-31 分区和当日的过期数据 dim_user_zip 。
我们现在要做的就是把截止前一天的全量最新数据中的过时数据放到今日过期分区,把新增及变化的数据放到 9999-12-31 分区中。
具体来说,我们可以对截止前一天的全量最新数据(9999-12-31分区)和 ODS 层今日新增及变化数据(ods_user_info_inc 的 dt=今天)的数据通过 id 进行 full join,上图中蓝色的部分就是两表 join 之后匹配不上的部分,它们的单元格全部为 null。这样,我们的数据可以分为三份:1. 没有修改的数据,2. 被修改的数据,3. 新增的数据
这里我们把左边的截止昨天最新数据表叫做 a,今日新增及变化表叫做 b。对于没有修改的数据,我们直接在 SQL 中判断 if (a.id !=null and b.id=null) 这样的数据就是没有修改的数据。对于修改了的数据,我们只需判断(if(b.id !=null),b.xxx,a.xxx)。对于新增数据我们只要判断 if(a.id =null and b.id !=null )即可。
截止昨日全量最新数据(9999-12-31)
-- 截止昨天全量最新数据 select id, login_name, nick_name, name, phone_num, email, user_level, birthday, gender, create_time, operate_time, start_date, end_date, dt from dim_user_zip where dt='9999-12-31';
今日新增及变化(ods_user_info_inc)
这里我们需要得到的是每个用户的最后一个状态,因为用户可能一天修改了多次状态。
-- 今天的新增及变化数据 需要注意这里得到的所有的状态数据,比如一个用户一天修改了10次用户名。我们只要他的最后状态 select id, login_name, nick_name, md5(name), md5(phone_num), md5(email), user_level, birthday, gender, create_time, operate_time, '2020-06-15' start_date, '9999-12-31' end_date from ( select data.id, data.login_name, data.nick_name, data.name, data.phone_num, data.email, data.user_level, data.birthday, data.gender, data.create_time, data.operate_time, row_number() over (partition by data.id order by ts desc) rk from ods_user_info_inc where dt='2020-06-15' )t1 where rk=1;
full join
with tmp as ( select old.id old_id, old.login_name old_login_name, old.nick_name old_nick_name, old.name old_name, old.phone_num old_phone_num, old.email old_email, old.user_level old_user_level, old.birthday old_birthday, old.gender old_gender, old.create_time old_create_time, old.operate_time old_operate_time, old.start_date old_start_date, old.end_date old_end_date, new.id new_id, new.login_name new_login_name, new.nick_name new_nick_name, new.name new_name, new.phone_num new_phone_num, new.email new_email, new.user_level new_user_level, new.birthday new_birthday, new.gender new_gender, new.create_time new_create_time, new.operate_time new_operate_time, new.start_date new_start_date, new.end_date new_end_date from ( select id, login_name, nick_name, name, phone_num, email, user_level, birthday, gender, create_time, operate_time, start_date, end_date from dim_user_zip where dt='9999-12-31' )old full outer join ( select id, login_name, nick_name, md5(name) name, md5(phone_num) phone_num, md5(email) email, user_level, birthday, gender, create_time, operate_time, '2020-06-15' start_date, '9999-12-31' end_date from ( select data.id, data.login_name, data.nick_name, data.name, data.phone_num, data.email, data.user_level, data.birthday, data.gender, data.create_time, data.operate_time, row_number() over (partition by data.id order by ts desc) rn from ods_user_info_inc where dt='2020-06-15' )t1 where rn=1 )new on old.id=new.id ) insert overwrite table dim_user_zip partition(dt) -- 更新后的全量最新数据 select if(new_id is not null,new_id,old_id), if(new_id is not null,new_login_name,old_login_name), if(new_id is not null,new_nick_name,old_nick_name), if(new_id is not null,new_name,old_name), if(new_id is not null,new_phone_num,old_phone_num), if(new_id is not null,new_email,old_email), if(new_id is not null,new_user_level,old_user_level), if(new_id is not null,new_birthday,old_birthday), if(new_id is not null,new_gender,old_gender), if(new_id is not null,new_create_time,old_create_time), if(new_id is not null,new_operate_time,old_operate_time), if(new_id is not null,new_start_date,old_start_date), if(new_id is not null,new_end_date,old_end_date), if(new_id is not null,new_end_date,old_end_date) dt from tmp union all -- 过期数据 select old_id, old_login_name, old_nick_name, old_name, old_phone_num, old_email, old_user_level, old_birthday, old_gender, old_create_time, old_operate_time, old_start_date, cast(date_add('2020-06-15',-1) as string) old_end_date, cast(date_add('2020-06-15',-1) as string) dt from tmp where old_id is not null and new_id is not null;
离线数仓(七)【DIM 层开发】(4)https://developer.aliyun.com/article/1532407