数仓开发之DWS层(需要思考,考验一个人的能力)
设计要点:
(1)DWS层的设计参考指标体系。
(2)DWS层的数据存储格式为orc列式存储+snappy压缩。
(3)DWS层表名的命名规范为dws_数据域_统计粒度_业务过程_统计周期(1d/nd/td)。
注:1d表示最近1日,nd表示最近n日,td表示历史至今(本质是时间范围来统计)
表的分类:根据数据范围进行分类 --1d:1天的数据的统计 --数据来源为DIM,DWD --nd:N天的数据的统计 --数据来源必须为1d的表 --td:所有数据的统计 --数据来源可以为1d表,也可以为DIM,DWD --表的设计 --参考ADS层表的设计 --指标体系: --原子指标 --行为,统计字段,统计逻辑 --派生指标(增加条件) --统计周期(范围)+业务限定(筛选条件)+统计粒度(分组维度) --衍生指标(比率,比例) --表名 --为dwd_数据域_统计粒度+业务过程+统计周期(1d/nd/td) --指标:客户想要的一个统计结果(数值) 业务过程相同:数据来源相同 统计周期相同:数据范围相同 统计粒度相同:数据含义相同
DWS层-表的设计问题
--1d表 --nd表 --td表 DROP TABLE IF EXISTS dws_order_stats_by_cate; CREATE EXTERNAL TABLE dws_order_stats_by_tm_1d_a ( `tm_id`, STRING COMMENT '品牌ID', `tm_name` STRING COMMENT '品牌名称', `order_id` BIGINT COMMENT '用户ID', `order_user_count` BIGINT COMMENT '下单人数' ) COMMENT '各品类商品下单统计' partition by(`dt` String) stored as orc location '/warehourse/gmall/dws/dws_order_stats_by_tm_1d/' tblproperties('orc.compress'='snappy');
nd(7d): 2022-06-(02~08) sum(order_id)7 sum(user_id)7d dws层表的字段在预统计时,如果字段可以跨越天,那么就不能再每天中统计 因为最终统计需要指定的字段,但是提前聚合不能对这个字段做统计,所以为了避免数据丢失 需要再表中增加这个字段,而不是统计这个字段 insert overwrite table dws_order_stats_by_tm_1d_a partition (dt='2022-06-08') select tm_id,tm_name, user_id, count(distinct order_id) from( select order_id,sku_id,user_id from dwd_trade_order_detail_inc where dt='2022-06-08' )od left join( select id,tm_id,tm_name from dim_sku_full where dt='2022-06-08' )sku on od.sku_id=sku.id group by tm_id,tm_name 2022-06-08: 华为 zhangsan 12 华为 李四 5 select '2022-06-08',1,tm_id,tm_name,sum(order_count_1d), count(user_id) from dws_order_stats_by_tm_1d_a where dt='2022-06-08' group by tm_id,tm_name --dws层设计目的其实就是简化计算,提前进行预聚合的操作 --底层实现,依然是数据写入文件,在读取文件,性能一定会受到影响 --如果表的设计可以在多个地方使用,那么就可以提高效率
我们为了提高性能,比如实现各品类商品下单人数,我们可以把表设计一个共用的
DROP TABLE IF EXISTS dws_order_stats_by_sku_a; CREATE EXTERNAL TABLE dws_order_stats_by_sku_a ( `sku_id` STRING COMMENT '品牌ID', `category1_id` STRING COMMENT '一级品类ID', `category1_name` STRING COMMENT '一级品类名称', `tm_id`, STRING COMMENT '品牌ID', `tm_name` STRING COMMENT '品牌名称', `order_id` BIGINT COMMENT '用户ID', `order_count_1d` BIGINT COMMENT '下单数', ) COMMENT '各品类商品下单统计' partition by(`dt` String) stored as orc location '/warehourse/gmall/dws/dws_order_stats_by_sku_a/' tblproperties('orc.compress'='snappy');
交易域用户商品粒度订单最近n日汇总表
--交易域 --用户商品粒度:user+sku --订单 --下单 --最近1日汇总表:数据范围 DROP TABLE IF EXISTS dws_trade_user_sku_order_1d; CREATE EXTERNAL TABLE dws_trade_user_sku_order_1d ( `user_id` STRING COMMENT '用户ID', `sku_id` STRING COMMENT 'SKU_ID', `sku_name` STRING COMMENT 'SKU名称', `category1_id` STRING COMMENT '一级品类ID', `category1_name` STRING COMMENT '一级品类名称', `category2_id` STRING COMMENT '二级品类ID', `category2_name` STRING COMMENT '二级品类名称', `category3_id` STRING COMMENT '三级品类ID', `category3_name` STRING COMMENT '三级品类名称', `tm_id` STRING COMMENT '品牌ID', `tm_name` STRING COMMENT '品牌名称', `order_count_1d` BIGINT COMMENT '最近1日下单次数', `order_num_1d` BIGINT COMMENT '最近1日下单件数', `order_original_amount_1d` DECIMAL(16, 2) COMMENT '最近1日下单原始金额', `activity_reduce_amount_1d` DECIMAL(16, 2) COMMENT '最近1日活动优惠金额', `coupon_reduce_amount_1d` DECIMAL(16, 2) COMMENT '最近1日优惠券优惠金额', `order_total_amount_1d` DECIMAL(16, 2) COMMENT '最近1日下单最终金额' ) COMMENT '交易域用户商品粒度订单最近1日汇总表' PARTITIONED BY (`dt` STRING) STORED AS ORC LOCATION '/warehouse/gmall/dws/dws_trade_user_sku_order_1d' TBLPROPERTIES ('orc.compress' = 'snappy');
数据装载
--1d表存储的数据为1天的行为数据的统计结果,存放在一天的分区中
--dwd的数据其实是包含历史行为数据(7,6,5,4),历史数据也应该进行统计
--1d表数据装载分为首日装载和每日装载
--首日装载:包含历史数据
--每日装载:包含当日数据
insert overwrite table dws_trade_user_sku_order_1d partition (dt) select `user_id` STRING COMMENT '用户ID', `sku_id` STRING COMMENT 'SKU_ID', `sku_name` STRING COMMENT 'SKU名称', `category1_id` STRING COMMENT '一级品类ID', `category1_name` STRING COMMENT '一级品类名称', `category2_id` STRING COMMENT '二级品类ID', `category2_name` STRING COMMENT '二级品类名称', `category3_id` STRING COMMENT '三级品类ID', `category3_name` STRING COMMENT '三级品类名称', `tm_id` STRING COMMENT '品牌ID', `tm_name` STRING COMMENT '品牌名称', count(distinct order_id) `order_count_1d` BIGINT COMMENT '最近1日下单次数', sum(sku_num) `order_num_1d` BIGINT COMMENT '最近1日下单件数', sum( split_original_amount) `order_original_amount_1d` DECIMAL(16, 2) COMMENT '最近1日下单原始金额', sum(split_coupon_amount)`activity_reduce_amount_1d` DECIMAL(16, 2) COMMENT '最近1日活动优惠金额', sum(split_coupon_amount) `coupon_reduce_amount_1d` DECIMAL(16, 2) COMMENT '最近1日优惠券优惠金额', sum( split_total_amount) `order_total_amount_1d` DECIMAL(16, 2) COMMENT '最近1日下单最终金额', dt from( select `user_id` STRING COMMENT '用户ID', `sku_id` STRING COMMENT 'SKU_ID', sku_num, split_original_amount, split_activity_amount, split_coupon_amount, split_total_amount, dt #那天下的订单 from dwd_trade_order_detail_inc )od left join( select id, `sku_name` STRING COMMENT 'SKU名称', `category1_id` STRING COMMENT '一级品类ID', `category1_name` STRING COMMENT '一级品类名称', `category2_id` STRING COMMENT '二级品类ID', `category2_name` STRING COMMENT '二级品类名称', `category3_id` STRING COMMENT '三级品类ID', `category3_name` STRING COMMENT '三级品类名称', `tm_id` STRING COMMENT '品牌ID', `tm_name` STRING COMMENT '品牌名称', from dim_sku_full where dt='2022-06-08' )sku on od.sku_id=sku.id group by dt, `user_id` , `sku_id` , `sku_name` , `category1_id` , `category1_name` , `category2_id` , `category2_name` , `category3_id` , `category3_name` , `tm_id` , `tm_name`
每日装载
insert overwrite table dws_trade_user_sku_order_1d partition (dt='2022-06-09') select `user_id` STRING COMMENT '用户ID', `sku_id` STRING COMMENT 'SKU_ID', `sku_name` STRING COMMENT 'SKU名称', `category1_id` STRING COMMENT '一级品类ID', `category1_name` STRING COMMENT '一级品类名称', `category2_id` STRING COMMENT '二级品类ID', `category2_name` STRING COMMENT '二级品类名称', `category3_id` STRING COMMENT '三级品类ID', `category3_name` STRING COMMENT '三级品类名称', `tm_id` STRING COMMENT '品牌ID', `tm_name` STRING COMMENT '品牌名称', count(distinct order_id) `order_count_1d` BIGINT COMMENT '最近1日下单次数', sum(sku_num) `order_num_1d` BIGINT COMMENT '最近1日下单件数', sum( split_original_amount) `order_original_amount_1d` DECIMAL(16, 2) COMMENT '最近1日下单原始金额', sum(split_coupon_amount)`activity_reduce_amount_1d` DECIMAL(16, 2) COMMENT '最近1日活动优惠金额', sum(split_coupon_amount) `coupon_reduce_amount_1d` DECIMAL(16, 2) COMMENT '最近1日优惠券优惠金额', sum( split_total_amount) `order_total_amount_1d` DECIMAL(16, 2) COMMENT '最近1日下单最终金额', from( select `user_id` STRING COMMENT '用户ID', `sku_id` STRING COMMENT 'SKU_ID', sku_num, split_original_amount, split_activity_amount, split_coupon_amount, split_total_amount, from dwd_trade_order_detail_inc where dt='2022-06-09' )od left join( select id, `sku_name` STRING COMMENT 'SKU名称', `category1_id` STRING COMMENT '一级品类ID', `category1_name` STRING COMMENT '一级品类名称', `category2_id` STRING COMMENT '二级品类ID', `category2_name` STRING COMMENT '二级品类名称', `category3_id` STRING COMMENT '三级品类ID', `category3_name` STRING COMMENT '三级品类名称', `tm_id` STRING COMMENT '品牌ID', `tm_name` STRING COMMENT '品牌名称', from dim_sku_full where dt='2022-06-09' )sku on od.sku_id=sku.id group by `user_id` , `sku_id` , `sku_name` , `category1_id` , `category1_name` , `category2_id` , `category2_name` , `category3_id` , `category3_name` , `tm_id` , `tm_name`
set hive.exec.dynamic.partition.mode=nonstrict; -- Hive的bug:对某些类型数据的处理可能会导致报错,关闭矢量化查询优化解决 set hive.vectorized.execution.enabled = false; insert overwrite table dws_trade_user_sku_order_1d partition(dt) select user_id, id, sku_name, category1_id, category1_name, category2_id, category2_name, category3_id, category3_name, tm_id, tm_name, order_count_1d, order_num_1d, order_original_amount_1d, activity_reduce_amount_1d, coupon_reduce_amount_1d, order_total_amount_1d, dt from ( select dt, user_id, sku_id, count(*) order_count_1d, sum(sku_num) order_num_1d, sum(split_original_amount) order_original_amount_1d, sum(nvl(split_activity_amount,0.0)) activity_reduce_amount_1d, sum(nvl(split_coupon_amount,0.0)) coupon_reduce_amount_1d, sum(split_total_amount) order_total_amount_1d from dwd_trade_order_detail_inc group by dt,user_id,sku_id )od left join ( select id, sku_name, category1_id, category1_name, category2_id, category2_name, category3_id, category3_name, tm_id, tm_name from dim_sku_full where dt='2022-06-08' )sku on od.sku_id=sku.id; -- 矢量化查询优化可以一定程度上提升执行效率,不会触发前述Bug时,应打开 set hive.vectorized.execution.enabled = true;
问题
me:补全数据后=>统计(效率低)=》缺什么补什么逻辑简单
course:统计=》补全数据(效率高)=>优化(连接之前减少数据)=》统计的结果和补全的数据没有关系
统计结果如果和补全的数据有关系,
--学习方式:由浅入深
交易域用户粒度加购最近n日汇总表
--最近n日汇总 --nd --表设计:参考1d表 --数据来源:1d表 DROP TABLE IF EXISTS dws_trade_user_sku_order_nd; CREATE EXTERNAL TABLE dws_trade_user_sku_order_nd ( `user_id` STRING COMMENT '用户ID', `sku_id` STRING COMMENT 'SKU_ID', `sku_name` STRING COMMENT 'SKU名称', `category1_id` STRING COMMENT '一级品类ID', `category1_name` STRING COMMENT '一级品类名称', `category2_id` STRING COMMENT '二级品类ID', `category2_name` STRING COMMENT '二级品类名称', `category3_id` STRING COMMENT '三级品类ID', `category3_name` STRING COMMENT '三级品类名称', `tm_id` STRING COMMENT '品牌ID', `tm_name` STRING COMMENT '品牌名称', `order_count_7d` STRING COMMENT '最近7日下单次数', `order_num_7d` BIGINT COMMENT '最近7日下单件数', `order_original_amount_7d` DECIMAL(16, 2) COMMENT '最近7日下单原始金额', `activity_reduce_amount_7d` DECIMAL(16, 2) COMMENT '最近7日活动优惠金额', `coupon_reduce_amount_7d` DECIMAL(16, 2) COMMENT '最近7日优惠券优惠金额', `order_total_amount_7d` DECIMAL(16, 2) COMMENT '最近7日下单最终金额', `order_count_30d` BIGINT COMMENT '最近30日下单次数', `order_num_30d` BIGINT COMMENT '最近30日下单件数', `order_original_amount_30d` DECIMAL(16, 2) COMMENT '最近30日下单原始金额', `activity_reduce_amount_30d` DECIMAL(16, 2) COMMENT '最近30日活动优惠金额', `coupon_reduce_amount_30d` DECIMAL(16, 2) COMMENT '最近30日优惠券优惠金额', `order_total_amount_30d` DECIMAL(16, 2) COMMENT '最近30日下单最终金额' ) COMMENT '交易域用户商品粒度订单最近n日汇总表' PARTITIONED BY (`dt` STRING) STORED AS ORC LOCATION '/warehouse/gmall/dws/dws_trade_user_sku_order_nd' TBLPROPERTIES ('orc.compress' = 'snappy');
--nd表的数据装载基本思虑
--1.读取最大范围的数据:30d
2.同时计算不同时间范围的数据
--sum(if):有条件的求和
insert overwrite table dws_trade_user_sku_order_nd partition(dt='2022-06-08') select `user_id` STRING COMMENT '用户ID', `sku_id` STRING COMMENT 'SKU_ID', `sku_name` STRING COMMENT 'SKU名称', `category1_id` STRING COMMENT '一级品类ID', `category1_name` STRING COMMENT '一级品类名称', `category2_id` STRING COMMENT '二级品类ID', `category2_name` STRING COMMENT '二级品类名称', `category3_id` STRING COMMENT '三级品类ID', `category3_name` STRING COMMENT '三级品类名称', `tm_id` STRING COMMENT '品牌ID', `tm_name` STRING COMMENT '品牌名称', sum(if(date_sub('2022-06-08',6),order_count_1d,0 )), sum(if(date_sub('2022-06-08',6),order_num_1d,0)), sum(if(date_sub('2022-06-08',6),order_original_amount_1d,0)), sum(if(date_sub('2022-06-08',6),activity_reduce_amount_1d,0)), sum(if(date_sub('2022-06-08',6),coupon_reduce_amount_1d,0)), sum(if(date_sub('2022-06-08',6),order_total_amount_1d,0)), sum(order_count_1d), sum(order_num_1d), sum(order_original_amount_1d), sum(activity_reduce_amount_1d), sum(coupon_reduce_amount_1d), sum(order_total_amount_1d), from dws_trade_user_sku_order_1d where dt >=date_sub('2022-06-08',29) and dt<='2022-06-08' group by `user_id` STRING COMMENT '用户ID', `sku_id` STRING COMMENT 'SKU_ID', `sku_name` STRING COMMENT 'SKU名称', `category1_id` STRING COMMENT '一级品类ID', `category1_name` STRING COMMENT '一级品类名称', `category2_id` STRING COMMENT '二级品类ID', `category2_name` STRING COMMENT '二级品类名称', `category3_id` STRING COMMENT '三级品类ID', `category3_name` STRING COMMENT '三级品类名称', `tm_id` STRING COMMENT '品牌ID', `tm_name`
交易域用户商品粒度订单最近1日汇总表
1)建表语句 DROP TABLE IF EXISTS dws_trade_user_sku_order_1d; CREATE EXTERNAL TABLE dws_trade_user_sku_order_1d ( `user_id` STRING COMMENT '用户ID', `order_count_1d` BIGINT COMMENT '最近1日下单次数', `order_num_1d` BIGINT COMMENT '最近1日下单件数', `order_original_amount_1d` DECIMAL(16, 2) COMMENT '最近1日下单原始金额', `activity_reduce_amount_1d` DECIMAL(16, 2) COMMENT '最近1日活动优惠金额', `coupon_reduce_amount_1d` DECIMAL(16, 2) COMMENT '最近1日优惠券优惠金额', `order_total_amount_1d` DECIMAL(16, 2) COMMENT '最近1日下单最终金额' ) COMMENT '交易域用户商品粒度订单最近1日汇总表' PARTITIONED BY (`dt` STRING) STORED AS ORC LOCATION '/warehouse/gmall/dws/dws_trade_user_sku_order_1d' TBLPROPERTIES ('orc.compress' = 'snappy');
首日装载:含有历史数据
insert overwrite table dws_trade_user_order_1d partition (dt) select user_id,count(distinct order_id), sum(sku_num), sum(split_original_amount), sum( split_activity_amount), sum( split_coupon_amount), sum( split_total_amount), dt from dwd_trade_order_detail_inc group by user_id,dt
每日装载:
insert overwrite table dws_trade_user_order_1d partition (dt='2022-06-09') select user_id,count(distinct order_id), sum(sku_num), sum(split_original_amount), sum( split_activity_amount), sum( split_coupon_amount), sum( split_total_amount) from dwd_trade_order_detail_inc group by user_id where dt='2022-06-09'
统计粒度变化
--DWS层的表不是最终的表,还需要进一步计算
--dws:user+sku(tm)
--ads:tm
粒度没变
--dws:user+sku
--ads:user+sku
粒度没有变化的情况下,可以直接将中间表的数据获取后使用
交易域用户粒度订单历史至今汇总表(td)
DROP TABLE IF EXISTS dws_trade_user_order_td; CREATE EXTERNAL TABLE dws_trade_user_order_td ( `user_id` STRING COMMENT '用户ID', `order_date_first` STRING COMMENT '历史至今首次下单日期', `order_date_last` STRING COMMENT '历史至今末次下单日期', `order_count_td` BIGINT COMMENT '历史至今下单次数', `order_num_td` BIGINT COMMENT '历史至今购买商品件数', `original_amount_td` DECIMAL(16, 2) COMMENT '历史至今下单原始金额', `activity_reduce_amount_td` DECIMAL(16, 2) COMMENT '历史至今下单活动优惠金额', `coupon_reduce_amount_td` DECIMAL(16, 2) COMMENT '历史至今下单优惠券优惠金额', `total_amount_td` DECIMAL(16, 2) COMMENT '历史至今下单最终金额' ) COMMENT '交易域用户粒度订单历史至今汇总表' PARTITIONED BY (`dt` STRING) STORED AS ORC LOCATION '/warehouse/gmall/dws/dws_trade_user_order_td' TBLPROPERTIES ('orc.compress' = 'snappy');
首日装载
insert overwrite table dws_trade_user_order_td partition(dt='2022-06-08') select user_id, min(dt) order_date_first, max(dt) order_date_last, sum(order_count_1d) order_count, sum(order_num_1d) order_num, sum(order_original_amount_1d) original_amount, sum(activity_reduce_amount_1d) activity_reduce_amount, sum(coupon_reduce_amount_1d) coupon_reduce_amount, sum(order_total_amount_1d) total_amount from dws_trade_user_order_1d group by user_id;
每日装载
--td表的数据装载一般为了考虑效率,会分为首日装载和每日装载
--首日装载:直接获取所有的数据做聚合
--每日装载:装载的数据只比前一天多了一天的数据,而前一天的数据已经统计过了。所以就存在重复计算
--改善装载的思路:获取昨天的统计结果+今天新的数据=>做进一步的聚合
insert overwrite table dws_trade_user_order_td partition(dt='2022-06-09') select user_id,min(order_date_first), max( order_date_last), sum(order_count_td) order_count, sum(order_num_td) order_num, sum(order_original_amount_td) original_amount, sum(activity_reduce_amount_td) activity_reduce_amount, sum(coupon_reduce_amount_td) coupon_reduce_amount, sum(order_total_amount_td) total_amount from(select `user_id` , `order_date_first` , `order_date_last` , `order_count_td` , `order_num_td` , `original_amount_td` , `activity_reduce_amount_td`, `coupon_reduce_amount_td` , `total_amount_td` from dws_trade_user_order_td where dt=date_sub('2022-06-08',1) union all select `user_id` , '2022-06-09' '2022-06-09' , `order_num_1d` , `original_amount_1d` , `activity_reduce_amount_1d`, `coupon_reduce_amount_1d` , order_total_amount_1d from dws_trade_user_order_1d where dt='2022-06-09')t group by user_id
交易域用户粒度加购最近1日汇总表
DROP TABLE IF EXISTS dws_trade_user_cart_add_1d; CREATE EXTERNAL TABLE dws_trade_user_cart_add_1d ( `user_id` STRING COMMENT '用户ID', `cart_add_count_1d` BIGINT COMMENT '最近1日加购次数', `cart_add_num_1d` BIGINT COMMENT '最近1日加购商品件数' ) COMMENT '交易域用户粒度加购最近1日汇总表' PARTITIONED BY (`dt` STRING) STORED AS ORC LOCATION '/warehouse/gmall/dws/dws_trade_user_cart_add_1d' TBLPROPERTIES ('orc.compress' = 'snappy');
--1d表的首日包含历史数据,绝对不能写死
--user+sku set hive.exec.dynamic.partition.mode=nonstrict; insert overwrite table dws_trade_user_cart_add_1d partition(dt) select user_id, count(*), sum(sku_num), dt from dwd_trade_cart_add_inc group by user_id,dt;
每日装载
insert overwrite table dws_trade_user_cart_add_1d partition(dt='2022-06-09') select user_id, count(*), sum(sku_num) from dwd_trade_cart_add_inc where dt='2022-06-09' group by user_id;
交易域用户粒度支付最近1日汇总表
DROP TABLE IF EXISTS dws_trade_user_payment_1d; CREATE EXTERNAL TABLE dws_trade_user_payment_1d ( `user_id` STRING COMMENT '用户ID', `payment_count_1d` BIGINT COMMENT '最近1日支付次数', `payment_num_1d` BIGINT COMMENT '最近1日支付商品件数', `payment_amount_1d` DECIMAL(16, 2) COMMENT '最近1日支付金额' ) COMMENT '交易域用户粒度支付最近1日汇总表' PARTITIONED BY (`dt` STRING) STORED AS ORC LOCATION '/warehouse/gmall/dws/dws_trade_user_payment_1d' TBLPROPERTIES ('orc.compress' = 'snappy');
首日装载
set hive.exec.dynamic.partition.mode=nonstrict; insert overwrite table dws_trade_user_payment_1d partition(dt) select user_id, count(distinct(order_id)), sum(sku_num), sum(split_payment_amount), dt from dwd_trade_pay_detail_suc_inc group by user_id,dt;
每日装载
insert overwrite table dws_trade_user_payment_1d partition(dt='2022-06-09') select user_id, count(distinct(order_id)), sum(sku_num), sum(split_payment_amount) from dwd_trade_pay_detail_suc_inc where dt='2022-06-09' group by user_id;
交易域省份粒度订单最近1日汇总表
DROP TABLE IF EXISTS dws_trade_province_order_1d; CREATE EXTERNAL TABLE dws_trade_province_order_1d ( `province_id` STRING COMMENT '省份ID', `province_name` STRING COMMENT '省份名称', `area_code` STRING COMMENT '地区编码', `iso_code` STRING COMMENT '旧版国际标准地区编码', `iso_3166_2` STRING COMMENT '新版国际标准地区编码', `order_count_1d` BIGINT COMMENT '最近1日下单次数', `order_original_amount_1d` DECIMAL(16, 2) COMMENT '最近1日下单原始金额', `activity_reduce_amount_1d` DECIMAL(16, 2) COMMENT '最近1日下单活动优惠金额', `coupon_reduce_amount_1d` DECIMAL(16, 2) COMMENT '最近1日下单优惠券优惠金额', `order_total_amount_1d` DECIMAL(16, 2) COMMENT '最近1日下单最终金额' ) COMMENT '交易域省份粒度订单最近1日汇总表' PARTITIONED BY (`dt` STRING) STORED AS ORC LOCATION '/warehouse/gmall/dws/dws_trade_province_order_1d' TBLPROPERTIES ('orc.compress' = 'snappy');
首日装载
set hive.exec.dynamic.partition.mode=nonstrict; insert overwrite table dws_trade_province_order_1d partition(dt) select province_id, province_name, area_code, iso_code, iso_3166_2, order_count_1d, order_original_amount_1d, activity_reduce_amount_1d, coupon_reduce_amount_1d, order_total_amount_1d, dt from ( select province_id, count(distinct(order_id)) order_count_1d, sum(split_original_amount) order_original_amount_1d, sum(nvl(split_activity_amount,0)) activity_reduce_amount_1d, sum(nvl(split_coupon_amount,0)) coupon_reduce_amount_1d, sum(split_total_amount) order_total_amount_1d, dt from dwd_trade_order_detail_inc group by province_id,dt )o left join ( select id, province_name, area_code, iso_code, iso_3166_2 from dim_province_full where dt='2022-06-08' )p on o.province_id=p.id;
每日装载
insert overwrite table dws_trade_province_order_1d partition(dt='2022-06-09') select province_id, province_name, area_code, iso_code, iso_3166_2, order_count_1d, order_original_amount_1d, activity_reduce_amount_1d, coupon_reduce_amount_1d, order_total_amount_1d from ( select province_id, count(distinct(order_id)) order_count_1d, sum(split_original_amount) order_original_amount_1d, sum(nvl(split_activity_amount,0)) activity_reduce_amount_1d, sum(nvl(split_coupon_amount,0)) coupon_reduce_amount_1d, sum(split_total_amount) order_total_amount_1d from dwd_trade_order_detail_inc where dt='2022-06-09' group by province_id )o left join ( select id, province_name, area_code, iso_code, iso_3166_2 from dim_province_full where dt='2022-06-09' )p on o.province_id=p.id;
交易域省份粒度订单最近n日汇总表
DROP TABLE IF EXISTS dws_trade_province_order_nd; CREATE EXTERNAL TABLE dws_trade_province_order_nd ( `province_id` STRING COMMENT '省份ID', `province_name` STRING COMMENT '省份名称', `area_code` STRING COMMENT '地区编码', `iso_code` STRING COMMENT '旧版国际标准地区编码', `iso_3166_2` STRING COMMENT '新版国际标准地区编码', `order_count_7d` BIGINT COMMENT '最近7日下单次数', `order_original_amount_7d` DECIMAL(16, 2) COMMENT '最近7日下单原始金额', `activity_reduce_amount_7d` DECIMAL(16, 2) COMMENT '最近7日下单活动优惠金额', `coupon_reduce_amount_7d` DECIMAL(16, 2) COMMENT '最近7日下单优惠券优惠金额', `order_total_amount_7d` DECIMAL(16, 2) COMMENT '最近7日下单最终金额', `order_count_30d` BIGINT COMMENT '最近30日下单次数', `order_original_amount_30d` DECIMAL(16, 2) COMMENT '最近30日下单原始金额', `activity_reduce_amount_30d` DECIMAL(16, 2) COMMENT '最近30日下单活动优惠金额', `coupon_reduce_amount_30d` DECIMAL(16, 2) COMMENT '最近30日下单优惠券优惠金额', `order_total_amount_30d` DECIMAL(16, 2) COMMENT '最近30日下单最终金额' ) COMMENT '交易域省份粒度订单最近n日汇总表' PARTITIONED BY (`dt` STRING) STORED AS ORC LOCATION '/warehouse/gmall/dws/dws_trade_province_order_nd' TBLPROPERTIES ('orc.compress' = 'snappy');
insert overwrite table dws_trade_province_order_nd partition(dt='2022-06-08') select province_id, province_name, area_code, iso_code, iso_3166_2, sum(if(dt>=date_add('2022-06-08',-6),order_count_1d,0)), sum(if(dt>=date_add('2022-06-08',-6),order_original_amount_1d,0)), sum(if(dt>=date_add('2022-06-08',-6),activity_reduce_amount_1d,0)), sum(if(dt>=date_add('2022-06-08',-6),coupon_reduce_amount_1d,0)), sum(if(dt>=date_add('2022-06-08',-6),order_total_amount_1d,0)), sum(order_count_1d), sum(order_original_amount_1d), sum(activity_reduce_amount_1d), sum(coupon_reduce_amount_1d), sum(order_total_amount_1d) from dws_trade_province_order_1d where dt>=date_add('2022-06-08',29) and dt<='2022-06-08' group by province_id,province_name,area_code,iso_code,iso_3166_2;
工具域用户优惠券粒度优惠券使用(支付)最近1日汇总表
DROP TABLE IF EXISTS dws_tool_user_coupon_coupon_used_1d; CREATE EXTERNAL TABLE dws_tool_user_coupon_coupon_used_1d ( `user_id` STRING COMMENT '用户ID', `coupon_id` STRING COMMENT '优惠券ID', `coupon_name` STRING COMMENT '优惠券名称', `coupon_type_code` STRING COMMENT '优惠券类型编码', `coupon_type_name` STRING COMMENT '优惠券类型名称', `benefit_rule` STRING COMMENT '优惠规则', `used_count_1d` STRING COMMENT '使用(支付)次数' ) COMMENT '工具域用户优惠券粒度优惠券使用(支付)最近1日汇总表' PARTITIONED BY (`dt` STRING) STORED AS ORC LOCATION '/warehouse/gmall/dws/dws_tool_user_coupon_coupon_used_1d' TBLPROPERTIES ('orc.compress' = 'snappy');
首日装载
set hive.exec.dynamic.partition.mode=nonstrict; insert overwrite table dws_tool_user_coupon_coupon_used_1d partition(dt) select user_id, coupon_id, coupon_name, coupon_type_code, coupon_type_name, benefit_rule, used_count, dt from ( select dt, user_id, coupon_id, count(*) used_count from dwd_tool_coupon_used_inc group by dt,user_id,coupon_id )t1 left join ( select id, coupon_name, coupon_type_code, coupon_type_name, benefit_rule from dim_coupon_full where dt='2022-06-08' )t2 on t1.coupon_id=t2.id;
每日装载
insert overwrite table dws_tool_user_coupon_coupon_used_1d partition(dt='2022-06-09') select user_id, coupon_id, coupon_name, coupon_type_code, coupon_type_name, benefit_rule, used_count from ( select user_id, coupon_id, count(*) used_count from dwd_tool_coupon_used_inc where dt='2022-06-09' group by user_id,coupon_id )t1 left join ( select id, coupon_name, coupon_type_code, coupon_type_name, benefit_rule from dim_coupon_full where dt='2022-06-09' )t2 on t1.coupon_id=t2.id;
互动域商品粒度收藏商品最近1日汇总表
DROP TABLE IF EXISTS dws_interaction_sku_favor_add_1d; CREATE EXTERNAL TABLE dws_interaction_sku_favor_add_1d ( `sku_id` STRING COMMENT 'SKU_ID', `sku_name` STRING COMMENT 'SKU名称', `category1_id` STRING COMMENT '一级品类ID', `category1_name` STRING COMMENT '一级品类名称', `category2_id` STRING COMMENT '二级品类ID', `category2_name` STRING COMMENT '二级品类名称', `category3_id` STRING COMMENT '三级品类ID', `category3_name` STRING COMMENT '三级品类名称', `tm_id` STRING COMMENT '品牌ID', `tm_name` STRING COMMENT '品牌名称', `favor_add_count_1d` BIGINT COMMENT '商品被收藏次数' ) COMMENT '互动域商品粒度收藏商品最近1日汇总表' PARTITIONED BY (`dt` STRING) STORED AS ORC LOCATION '/warehouse/gmall/dws/dws_interaction_sku_favor_add_1d' TBLPROPERTIES ('orc.compress' = 'snappy');
首日装载
set hive.exec.dynamic.partition.mode=nonstrict; insert overwrite table dws_interaction_sku_favor_add_1d partition(dt) select sku_id, sku_name, category1_id, category1_name, category2_id, category2_name, category3_id, category3_name, tm_id, tm_name, favor_add_count, dt from ( select dt, sku_id, count(*) favor_add_count from dwd_interaction_favor_add_inc group by dt,sku_id )favor left join ( select id, sku_name, category1_id, category1_name, category2_id, category2_name, category3_id, category3_name, tm_id, tm_name from dim_sku_full where dt='2022-06-08' )sku on favor.sku_id=sku.id;
每日装载
insert overwrite table dws_interaction_sku_favor_add_1d partition(dt='2022-06-09') select sku_id, sku_name, category1_id, category1_name, category2_id, category2_name, category3_id, category3_name, tm_id, tm_name, favor_add_count from ( select sku_id, count(*) favor_add_count from dwd_interaction_favor_add_inc where dt='2022-06-09' group by sku_id )favor left join ( select id, sku_name, category1_id, category1_name, category2_id, category2_name, category3_id, category3_name, tm_id, tm_name from dim_sku_full where dt='2022-06-09' )sku on favor.sku_id=sku.id;
流量域会话粒度页面浏览最近1日汇总表
DROP TABLE IF EXISTS dws_traffic_session_page_view_1d; CREATE EXTERNAL TABLE dws_traffic_session_page_view_1d ( `session_id` STRING COMMENT '会话ID', `mid_id` string comment '设备ID', `brand` string comment '手机品牌', `model` string comment '手机型号', `operate_system` string comment '操作系统', `version_code` string comment 'APP版本号', `channel` string comment '渠道', `during_time_1d` BIGINT COMMENT '最近1日浏览时长', `page_count_1d` BIGINT COMMENT '最近1日浏览页面数' ) COMMENT '流量域会话粒度页面浏览最近1日汇总表' PARTITIONED BY (`dt` STRING) STORED AS ORC LOCATION '/warehouse/gmall/dws/dws_traffic_session_page_view_1d' TBLPROPERTIES ('orc.compress' = 'snappy');
insert overwrite table dws_traffic_session_page_view_1d partition(dt='2022-06-08') select session_id, mid_id, brand, model, operate_system, version_code, channel, sum(during_time), count(*) from dwd_traffic_page_view_inc where dt='2022-06-08' group by session_id,mid_id,brand,model,operate_system,version_code,channel;
流量域访客页面粒度页面浏览最近1日汇总表
DROP TABLE IF EXISTS dws_traffic_page_visitor_page_view_1d; CREATE EXTERNAL TABLE dws_traffic_page_visitor_page_view_1d ( `mid_id` STRING COMMENT '访客ID', `brand` string comment '手机品牌', `model` string comment '手机型号', `operate_system` string comment '操作系统', `page_id` STRING COMMENT '页面ID', `during_time_1d` BIGINT COMMENT '最近1日浏览时长', `view_count_1d` BIGINT COMMENT '最近1日访问次数' ) COMMENT '流量域访客页面粒度页面浏览最近1日汇总表' PARTITIONED BY (`dt` STRING) STORED AS ORC LOCATION '/warehouse/gmall/dws/dws_traffic_page_visitor_page_view_1d' TBLPROPERTIES ('orc.compress' = 'snappy');
insert overwrite table dws_traffic_page_visitor_page_view_1d partition(dt='2022-06-08') select mid_id, brand, model, operate_system, page_id, sum(during_time), count(*) from dwd_traffic_page_view_inc where dt='2022-06-08' group by mid_id,brand,model,operate_system,page_id;
用户域用户粒度登录历史至今汇总表
--活跃 DROP TABLE IF EXISTS dws_user_user_login_td; CREATE EXTERNAL TABLE dws_user_user_login_td ( `user_id` STRING COMMENT '用户ID', `login_date_last` STRING COMMENT '历史至今末次登录日期', `login_date_first` STRING COMMENT '历史至今首次登录日期', `login_count_td` BIGINT COMMENT '历史至今累计登录次数' ) COMMENT '用户域用户粒度登录历史至今汇总表' PARTITIONED BY (`dt` STRING) STORED AS ORC LOCATION '/warehouse/gmall/dws/dws_user_user_login_td' TBLPROPERTIES ('orc.compress' = 'snappy');
首日装载
特别注意:
--登录信息来自于日志表,但是日志表的数据只有8号及以后得数据,没有历史数据
--mysql数据库中不会保存行为数据,也就是说不会保存登录信息
--折中的认为,用户注册的时间就是用户首次登录时间,也就是末次登录日期
insert overwrite table dws_user_login_td partition (dt='2022-06-08') select user_id,max(dt) login_date_last, min(dt) login_date_first,count(*) login_count_td from dwd_user_login_inc group by user_id
首日(先统计昨天的在统计今天的)
insert overwrite table dws_user_login_td partition (dt='2022-06-09') select user_id,max(lofin_date_last),min(login_date_first),sum(login_count_td) from( select user_id, login_date_last, login_date_first, login_count_td from dwd_user_login_td where dt=date_sub('2022-06-09',1) union all select user_id,'2022-06-09','2022-06-09',count(*) from dwd_user_login_inc where dt='2022-06-09' group by user_id )t group by user_id
所以要改进首日的数据
--ODS层的数据是整个数据仓库的数据源,不能作为直接统计分析的数据源 --统计分析的数据源一般是DIM,DWD层 select user_id,max(login_date_last),min(login_date_first),sum(login_count_td) from (select id user_id, date_format(create_time,'yyyy-MM-dd') login_date_first, date_format(create_time,'yyyy-MM-dd') login_date_last 1 login_count_id, from dim_user_zip where dt = '9999-12-31' and date_format(create_time,'yyyy-MM-dd'!='2022-06-08') #6月8号之前的数据 union all select user_id,'2022-06-08','2022-06-08' ,count(*) login_count_td from dwd_user_login_inc where dt='2022-06-08' group by user_id )t group by user_id
数据装载脚本
在node1的/home/bin目录下创建dws_1d_to_dws_td_init.sh
vim dws_1d_to_dws_td_init.sh
(2)编写如下内容 #!/bin/bash APP=gmall if [ -n "$2" ] ;then do_date=$2 else echo "请传入日期参数" exit fi dws_trade_user_order_td=" insert overwrite table ${APP}.dws_trade_user_order_td partition(dt='$do_date') select user_id, min(dt) login_date_first, max(dt) login_date_last, sum(order_count_1d) order_count, sum(order_num_1d) order_num, sum(order_original_amount_1d) original_amount, sum(activity_reduce_amount_1d) activity_reduce_amount, sum(coupon_reduce_amount_1d) coupon_reduce_amount, sum(order_total_amount_1d) total_amount from ${APP}.dws_trade_user_order_1d group by user_id; " dws_user_user_login_td=" insert overwrite table ${APP}.dws_user_user_login_td partition (dt = '$do_date') select u.id user_id, nvl(login_date_last, date_format(create_time, 'yyyy-MM-dd')) login_date_last, date_format(create_time, 'yyyy-MM-dd') login_date_first, nvl(login_count_td, 1) login_count_td from ( select id, create_time from ${APP}.dim_user_zip where dt = '9999-12-31' ) u left join ( select user_id, max(dt) login_date_last, count(*) login_count_td from ${APP}.dwd_user_login_inc group by user_id ) l on u.id = l.user_id; " case $1 in "dws_trade_user_order_td" ) hive -e "$dws_trade_user_order_td" ;; "dws_user_user_login_td" ) hive -e "$dws_user_user_login_td" ;; "all" ) hive -e "$dws_trade_user_order_td$dws_user_user_login_td" ;; esac (3)增加脚本执行权限 [atguigu@hadoop102 bin]$ chmod +x dws_1d_to_dws_td_init.sh (4)脚本用法 [atguigu@hadoop102 bin]$ dws_1d_to_dws_td_init.sh all 2022-06-08 2)每日数据装载脚本 (1)在hadoop102的/home/atguigu/bin目录下创建dws_1d_to_dws_td.sh [atguigu@hadoop102 bin]$ vim dws_1d_to_dws_td.sh (2)编写如下内容 #!/bin/bash APP=gmall # 如果输入的日期按照取输入日期;如果没输入日期取当前时间的前一天 if [ -n "$2" ] ;then do_date=$2 else do_date=`date -d "-1 day" +%F` fi dws_trade_user_order_td=" insert overwrite table ${APP}.dws_trade_user_order_td partition (dt = '$do_date') select nvl(old.user_id, new.user_id), if(old.user_id is not null, old.order_date_first, '$do_date'), if(new.user_id is not null, '$do_date', old.order_date_last), nvl(old.order_count_td, 0) + nvl(new.order_count_1d, 0), nvl(old.order_num_td, 0) + nvl(new.order_num_1d, 0), nvl(old.original_amount_td, 0) + nvl(new.order_original_amount_1d, 0), nvl(old.activity_reduce_amount_td, 0) + nvl(new.activity_reduce_amount_1d, 0), nvl(old.coupon_reduce_amount_td, 0) + nvl(new.coupon_reduce_amount_1d, 0), nvl(old.total_amount_td, 0) + nvl(new.order_total_amount_1d, 0) from ( select user_id, order_date_first, order_date_last, order_count_td, order_num_td, original_amount_td, activity_reduce_amount_td, coupon_reduce_amount_td, total_amount_td from ${APP}.dws_trade_user_order_td where dt = date_add('$do_date', -1) ) old full outer join ( select user_id, order_count_1d, order_num_1d, order_original_amount_1d, activity_reduce_amount_1d, coupon_reduce_amount_1d, order_total_amount_1d from ${APP}.dws_trade_user_order_1d where dt = '$do_date' ) new on old.user_id = new.user_id; " dws_user_user_login_td=" insert overwrite table ${APP}.dws_user_user_login_td partition (dt = '$do_date') select nvl(old.user_id, new.user_id) user_id, if(new.user_id is null, old.login_date_last, '$do_date') login_date_last, if(old.login_date_first is null, '$do_date', old.login_date_first) login_date_first, nvl(old.login_count_td, 0) + nvl(new.login_count_1d, 0) login_count_td from ( select user_id, login_date_last, login_date_first, login_count_td from ${APP}.dws_user_user_login_td where dt = date_add('$do_date', -1) ) old full outer join ( select user_id, count(*) login_count_1d from ${APP}.dwd_user_login_inc where dt = '$do_date' group by user_id ) new on old.user_id = new.user_id; " case $1 in "dws_trade_user_order_td" ) hive -e "$dws_trade_user_order_td" ;; "dws_user_user_login_td" ) hive -e "$dws_user_user_login_td" ;; "all" ) hive -e "$dws_trade_user_order_td$dws_user_user_login_td" ;; esac (3)增加脚本执行权限 [atguigu@hadoop102 bin]$ chmod +x dws_1d_to_dws_td.sh (4)脚本用法 [atguigu@hadoop102 bin]$ dws_1d_to_dws_td.sh all 2022-06-08