【转载】MaxCompute full outer join改写left anti join实践

简介: ods层数据同步时经常会遇到增全量合并的模型,即T-1天增量表 + T-2全量表 = T-1全量表。可以通过full outer join脚本来完成合并,但是数据量很大时非常消耗资源。本文将为您介绍在做增量数据的增加、更新时如何通过full outer join改写left anti join来实现的最佳实践。

背景

ods层数据同步时经常会遇到增全量合并的模型,即T-1天增量表 + T-2全量表 = T-1全量表。可以通过full outer join脚本来完成合并,但是数据量很大时非常消耗资源。

insert overwrite table tb_test partition(ds='${bizdate}')
select case when a.id is not null then a.id esle b.id end as id   
      ,if(a.name is not null, a.name, b.name) as name
      ,coalesce(a.age, b.age) as age 
      --这3种写法一样,都是优先取delta表的字段

from
(
   select * from tb_test_delta where ds='${bizdate}'
) a
full outer join
(
   select * from tb_test where ds='${bizdate-1}'
) b
on a.id =b.id;

这种写法可实现新增和更新操作:

  • 新增是指增量表中新出现的数据,而全量表中没有;
  • 更新是指增量表和全量表中都有的数据,但优先取增量表的数据,覆盖历史表的数据。
    如下图所示,R2_1是增量表当天去重后增量数据,M3是全量表前一天的数据,而J4_2_3则是full outer join的执行图。

image.png

将J4_2_3展开会发现里面将增量和全量进行了merge join,当数据量很大(1288亿条)时会产生很大的shuffle开销。此时优化方案就是将full outer join改成 union all,从而避免join shuffle

优化模型

结论:full outer join改成hash cluster + left join +union all可以有效地降低计算成本,且有两种应用场景。先将模型进行抽象,假设有a和b两个表,a是增量表,b是全量表:

with 
 a as ( select * from values  (1,'111')
                             ,(2,'two')
                             ,(7,'777') as (id,name) ) --增量

,b as ( select * from values  (1,'')
                             ,(2,'222')
                             ,(3,'333')
                             ,(4,'444') as (id,name) )  --全量

场景1:只合并新增数据到全量表

left anti join相当于not in,增量not in全量,过滤后只剩下完全新增的id,对全量中已有的id不修改:

--查询完全新增的id
select * from a left anti join b on a.id=b.id ;
--结果如下
+------------+------+
| id         | name |
+------------+------+
| 7          | 777  |
+------------+------+
--完全新增的合并全量表
select * from  a --增量表
left anti join b on a.id=b.id  
union all 
select * from b  --全量表
--结果如下
+------------+------+
| id         | name |
+------------+------+
| 1          |      |
| 2          | 222  |
| 3          | 333  |
| 4          | 444  |
| 7          | 777  |
+------------+------+

场景2:合并新增数据到全量表,且更新历史数据

全量not in增量,过滤后只剩下历史的id,然后union all增量,既新增也修改

--查询历史全量数据
select * from b left anti join a on a.id=b.id;
--结果如下
+------------+------+
| id         | name |
+------------+------+
| 3          | 333  |
| 4          | 444  |
+------------+------+
--合并新增数据到全量表,且更新历史数据
select * from  b --全量表
left anti join a on a.id=b.id
union all 
select * from a ; --增量表
--结果如下
+------------+------+
| id         | name |
+------------+------+
| 1          | 111  |
| 2          | two  |
| 7          | 777  |
| 3          | 333  |
| 4          | 444  |
+------------+------+

优化实践

步骤1:表属性修改

表、作业属性修改,对原来的表、作业进行属性优化,可以提升优化效果。

set odps.sql.reducer.instances=3072;  --可选。默认最大1111个reducer,1111哈希桶。
alter table table_name clustered by(contact_id) sorted by(contact_id) into 3072 buckets;--必选

步骤2:按照上述模型的场景1 或者 场景2进行代码改造。

这里先给出代码改造后的资源消耗对比:

原来的full outer jion left anti join初始化 原来的full outer jion left anti join第二天以后
时间消耗 8h30min38s 1h4min48s 7h32min30s 32min30s
cpu消耗 29666.02 Core * Min 65705.30 Core * Min 31126.86 Core * Min 30589.29 Core * Min
mem消耗 109640.80 GB * Min 133922.25 GB * Min 114764.80 GB * Min 65509.28 GB * Min

可以发现hash cluster分桶操作在初始化有额外的开销,主要是按主键进行散列和排序,但是这是值得的,可一劳永逸,后续的读取速度非常快。以前每天跑需要8小时,现在除了分桶初始化需要1小时,以后每天实际只需要30分钟。

初始化执行图

图1:
image.png

  • M2是读全量表。
  • M4是读取增量表,在场景2的模型中增量表被读取了两次,其中:

    • R5_4是对主键去重(row_number)后用于后面的union all,里面包含了所有的增量数据;
    • R1_4是对主键去重(row_number)后用于left anti join,里面只包含了主键。
  • J3_1_2是left anti join,将它展开后看到这里还是有mergJoin,但是这只是初始化的操作,后面每天就不会有了。展开后如图2。
  • R6_3_5是将增量和全量进行union all,展开后如图3。
  • R7_6则是将索引信息写入元数据,如图3的MetaCollector1会在R7_6中sink。
    因此:图1中除了R5_4和R1_4是去重必须的,有shuffle。还有J3_1_2和R6_3_5这两个地方有shuffle。

图2:
image.png

图3:
image.png

第二天以后的执行图

图1:
image.png

同上,图1中的R3_2和R1_2是对增量去重必要对操作,有shuffle,这里忽略。

初始化执行图的J3_1_2和R6_3_5已经被合并到了M4_1_3,将其展开后如图2。即left anti join 和 union all这两步操作在一个阶段完成了,且这个阶段是Map 任务(M4_1_3),而不是Join任务或Reduce任务。而且全量表不在单独占用一个Map任务,也被合并到了M4_1_3,因此整个过程下来没有shuffle操作,速度提升非常明显。也就是说只需要一个M4_1_3就能完成所有到操作,直接sink到表。

R5_4则是将索引信息写入元数据,如图2的MetaCollector1会在R5_4中sink。

图2:
image.png

原创:阿里菜鸟-数据 鹤方

相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
1天前
|
存储 大数据 API
大数据隐私保护策略:加密、脱敏与访问控制实践
【4月更文挑战第9天】本文探讨了大数据隐私保护的三大策略:数据加密、数据脱敏和访问控制。数据加密通过加密技术保护静态和传输中的数据,密钥管理确保密钥安全;数据脱敏通过替换、遮蔽和泛化方法降低敏感信息的敏感度;访问控制则通过用户身份验证和权限设置限制数据访问。示例代码展示了数据库、文件系统和API访问控制的实施方式,强调了在实际应用中需结合业务场景和平台特性定制部署。
70 0
|
1天前
|
数据采集 监控 算法
利用大数据和API优化电商决策:商品性能分析实践
在数据驱动的电子商务时代,大数据分析已成为企业提升运营效率、增强市场竞争力的关键工具。通过精确收集和分析商品性能数据,企业能够洞察市场趋势,实现库存优化,提升顾客满意度,并显著增加销售额。本文将探讨如何通过API收集商品数据,并将这些数据转化为对电商平台有价值的洞察。
|
1天前
|
存储 数据可视化 数据管理
基于阿里云服务的数据平台架构实践
本文主要介绍基于阿里云大数据组件服务,对企业进行大数据平台建设的架构实践。
756 2
|
1天前
|
存储 SQL 分布式计算
开源大数据比对平台设计与实践—dataCompare
开源大数据比对平台设计与实践—dataCompare
81 0
|
1天前
|
SQL 存储 大数据
某互联网大厂亿级大数据服务平台的建设和实践
某互联网大厂亿级大数据服务平台的建设和实践
79 0
|
7月前
|
机器学习/深度学习 分布式计算 数据挖掘
MaxCompute 应用场景实践
MaxCompute 应用场景实践
105 0
|
1天前
|
分布式计算 数据可视化 Hadoop
大数据实战——基于Hadoop的Mapreduce编程实践案例的设计与实现
大数据实战——基于Hadoop的Mapreduce编程实践案例的设计与实现
23 0
|
1天前
|
分布式计算 Java Hadoop
大数据实战——WordCount案例实践
大数据实战——WordCount案例实践
6 0
|
1天前
|
数据采集 供应链 安全
利用大数据优化业务流程:策略与实践
【5月更文挑战第11天】本文探讨了利用大数据优化业务流程的策略与实践,包括明确业务目标、构建大数据平台、数据采集整合、分析挖掘及流程优化。通过实例展示了电商和制造企业如何利用大数据改进库存管理和生产流程,提高效率与客户满意度。随着大数据技术进步,其在业务流程优化中的应用将更加广泛和深入,企业需积极采纳以适应市场和客户需求。
|
1天前
|
缓存 运维 NoSQL
面试分享:Redis在大数据环境下的缓存策略与实践
【4月更文挑战第10天】探索Redis在大数据缓存的关键作用,本文分享面试经验及必备知识点。聚焦Redis数据结构(String、List、Set、Hash、Sorted Set)及其适用场景,缓存策略(LRU、LFU、TTL)与过期机制,集群和数据分片,以及性能优化和运维技巧。通过代码示例深入理解,助你面试成功,构建高效缓存服务。
51 4

热门文章

最新文章

相关产品

  • 云原生大数据计算服务 MaxCompute