依据我们当前的业务。选择表数据量大于10W以上采用增量数据导入,10W以下采用全量数据导入。增量数据依据时间字段进行判断(create_date,modify_date)。
如果针对时间字段进行增量数据导入,需要先确定当前业务数据库中是否针对时间字段添加了索引(提高查询速度);其次,在写增量导入sql时不要对索引字段进行函数操作。比如:
date_format(create_date,'%Y%m%d')>=${bizdate} ×,这种写法会导致无法使用索引
create_date>=date_format(${bizdate},'%Y%m%d') √,推荐使用这种写法
首先创建两张表,分别为增量表和全量表
-- 增量表_delta
-- 存储了当日新增或变化数据
create table if not exists ods_s_mscmp_msc_b_spu_delta
(
spu_code string comment"商品代码"
,product_category_code string comment"产品目录代码"
,spu_name string comment"商品名称"
,spu_type string comment"商品类型"
,external_code string comment"第三方商品原始代码"
,external_id string comment"第三方系统商品ID"
,state string comment"状态"
,is_delete string comment"逻辑删除标志位"
,create_date string comment"创建时间"
,create_by string comment"创建者"
,modify_date string comment"修改时间"
,modify_by string comment"修改者"
)PARTITIONED BY (dt STRING) -- 按照日期分区
-- 全量表
-- 存放历史全量数据
create table if not exists ods_s_mscmp_msc_b_spu
(
spu_code string comment"商品代码"
,product_category_code string comment"产品目录代码"
,spu_name string comment"商品名称"
,spu_type string comment"商品类型"
,external_code string comment"第三方商品原始代码"
,external_id string comment"第三方系统商品ID"
,state string comment"状态"
,is_delete string comment"逻辑删除标志位"
,create_date string comment"创建时间"
,create_by string comment"创建者"
,modify_date string comment"修改时间"
,modify_by string comment"修改者"
)PARTITIONED BY (dt STRING) -- 按照日期分区
增量表和全量表字段完全一致,首次需要先对全量表ods_s_mscmp_msc_b_spu进行全量数据导入。后续每天的新增或者变化数据均保存到增量表ods_s_mscmp_msc_b_spu_delta中,然后将增量表和全量表进行合并,合并代码为:
-- 增量表数据主要分为三部分,分别为新增数据,变化数据,历史无变化数据。
-- 使用增量表 full join 全量表(采用主键id关联)将全量表和增量表进行合并。
-- 当full join结果中增量表di主键id不为空则说明新增或历史数据发生了变化,采用新增或更新后的字段值。
-- 当full join结果中增量表di主键为空则说明历史数据无变化,使用df原字段值。
-- 将拼接的最新全量数据写入到全量表T-1分区中。
insert into table ods_s_mscmp_msc_b_spu partition(ds = ${bizdate})
select
case when di.spu_code is not null then di.spu_code else df.spu_code end as spu_code
case when di.spu_code is not null then di.product_category_code else df.product_category_code end as product_category_code
case when di.spu_code is not null then di.spu_name else df.spu_name end as spu_name
case when di.spu_code is not null then di.spu_type else df.spu_type end as spu_type
case when di.spu_code is not null then di.external_code else df.external_code end as external_code
case when di.spu_code is not null then di.external_id else df.external_id end as external_id
case when di.spu_code is not null then di.state else df.state end as state
case when di.spu_code is not null then di.is_delete else df.is_delete end as is_delete
case when di.spu_code is not null then di.create_date else df.create_date end as create_date
case when di.spu_code is not null then di.create_by else df.create_by end as create_by
case when di.spu_code is not null then di.modify_date else df.modify_date end as modify_date
case when di.spu_code is not null then di.modify_by else df.modify_by end as modify_by
from
(select
spu_code
,product_category_code
,spu_name
,spu_type
,external_code
,external_id
,state
,is_delete
,create_date
,create_by
,modify_date
,modify_by
from ods_s_mscmp_msc_b_spu
where ds = ${bizdate} -- 昨天分区的日期T-1
)di full join
(select
spu_code
,product_category_code
,spu_name
,spu_type
,external_code
,external_id
,state
,is_delete
,create_date
,create_by
,modify_date
,modify_by
from ods_s_mscmp_msc_b_spu
where ds = date_sub(${bizdate}, 1) -- 前天分区的日期T-2
)df on di.spu_code = df.spu_code