如何利用MaxCompute Hash Clustering让数据增量更新节省资源,耗时减半

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 目前hash clustering table已经在阿里巴巴内部集群生产环境正式发布,并且已经有蚂蚁、安全部、菜鸟等多个BU参与了试用。从蚂蚁的反馈来看,改造之后的任务收效非常明显,运行时间缩短40%到80%,节省计算资源23%到67%。

对于增量更新的场景,可以利用 MaxCompute2.0的新特性,对语句做简单改造,从而大幅提升性能,节约集群资源。

image

背景

在数据开发的过程中,往往会进行分层的设计,在ODS层中,一种非常常见的场景是使用一个增量表delta对一个存量表snapshot进行更新。例如snapshot表存储所有的会员信息,而增量表中包括新增会员信息和原有会员信息属性的一些修改;或者snapshot表存储最近一个月的订单信息,delta表存储了新增订单以及物流的更新等等。

对于这种任务,往往有以下几个特征

  1. snapshot表存储量巨大,delta表相对较小
  2. snapshot表和delta表拥有一致的schema
  3. snapshot和delta表中存在主键key,且key可能有重合(否则可以通过简单的union all来完成)
  4. 上一个周期的snapshot + 当前周期的delta => 当前周期的snapshot

为了完成上述的功能,对应的sql逻辑一般使用full outer join,简单起见,我们的snapshot和delta表只有两列
(key string, value string)

其中key为主键

INSERT OVERWRITE TABLE snapshot PARTITION (ds='20170102')
SELECT

CASE WHEN d.key IS NULL THEN s.key ELSE d.key END,
CASE WHEN d.key IS NULL THEN s.value ELSE d.value END

FROM
(SELECT * FROM snapshot WHERE ds='20170101') s
FULL OUTER JOIN
(SELECT * FROM delta WHERE ds = '20170102')d
ON s.key = d.key;

这个语句表示,对于delta表存在的数据,使用delta表的值,对于delta表不存在的数据,使用snapshot的值。

问题

但是在实际执行的过程中,虽然只是进行简单的join操作,但是由于存量表可能非常大(几T到几十T的规模),这种任务往往耗时非常长,有些任务甚至需要耗费一天的时间才能产出。这种任务是否存在优化的空间呢?我们可以分析一个线上实际的大表full outer join的执行计划。

image

可以看到M1是snapshot表,需要将近20000的并发,M2是delta表,只有9个并发,而为了进行join的操作,两边会各自进行shuffle,在J3阶段进行sort-merge-join的计算。实际执行过程中,M2只需要几分钟,M1需要十几分钟,而在J3阶段则往往需要一两个小时,因为J3只有3000个并发,但是却读取了上游将近20000个并发读取的数据,相当于并发减小到原来的15%,处理的数据量却是一样的,当然耗时会长。另外,从M1到J3这个路径的shuffle中间存在大表的一次读写+两次排序,而且在数据量较大的情况下,还有可能会发生数据spill,使得运行性能更差。

在这种情况下,为了缩短执行时间,通常可以调大join阶段的instance数目,增加join阶段的内存减少spill等,但是instance的数目不能无限增长,否则会由于shuffle规模太大造成集群压力过大,另外内存的资源也是有限的,所以调整参数也只是牺牲资源换取时间,治标不治本。

为了对这个场景进行彻底的优化,我们希望能完全消除掉大表的shuffle阶段,将M1和J3合二为一,这样大表数据只需要读写一次,而且免去了中间排序的过程,执行时间可以缩短一半甚至更多。有调优经验的同学可能已经想到了mapjoin,但是这里的delta表往往数据较多无法当做mapjoin的小表,另外mapjoin无法支持full outer join,这两个限制都无法绕过,所以这个方案只能被pass了。那么这个shuffle的阶段应该如何省去呢?这里就要引入我们今天介绍的功能,hash clustering table了。

方案

Hash clustering,简而言之,就是将数据提前进行shuffle和排序,在使用数据的过程中,读取数据后直接参与计算。这种模式非常适合产出后后续节点多次按照相同key进行join或者聚合的场景。当然生成hash clustering table本身也是有代价的,在生成阶段会进行一次额外的shuffle。因此,这个功能并不是对于所有的场景都有效,例如数据生成之后只使用了一次,那么这个shuffle在生成表的阶段进行还是在读表之后进行其实并没有什么区别。但是对于特定的场景,这个特性可以起到显著的效果。

根据这个方案,我们重建一下snapshot表
ALTER TABLE snapshot CLUSTERED BY (key) SORTED BY (key) INTO 100 BUCKETS;

注意这个100 bucket需要根据实际数据规模进行设置,这里只是示例,不要照抄^_^

然后重建一下ds='20170101'的数据
INSERT OVERWRITE TABLE snapshot PARTITION (ds='20170101')
SELECT key, value
FROM snapshot
WHERE ds='20170101'

注意,这个过程由于会有一个额外的shuffle阶段,所以耗时会比普通的insert overwrite长。

  • 第一次尝试: full outer join

数据准备完成后,重新执行刚才的full outer join语句
INSERT OVERWRITE TABLE snapshot PARTITION (ds='20170102')
SELECT

CASE WHEN d.key IS NULL THEN s.key ELSE d.key END,
CASE WHEN d.key IS NULL THEN s.value ELSE d.value END

FROM
(SELECT * FROM snapshot WHERE ds='20170101') s
FULL OUTER JOIN
(SELECT * FROM delta WHERE ds = '20170102')d
ON s.key = d.key;

让我们看下执行计划
image
结果好像不尽如人意,M1读取了delta表,M2读取了snapshot表并且进行了sort-merge-join操作,但是读取完成以后数据重新进行了一次shuffle才写入了ds='20170102'分区,为什么会这样呢?

原因是ds='20170102'这个分区也是一个hash clustering table的分区,在写入的过程中,也需要数据按照特定key进行shuffle,虽然ds='20170101'的数据是shuffle过了的,但是在后续的full outer join的过程中,可能会存在补null的行为,并不能保证输出数据依然符合shuffle的特征,所以需要进行一次reshuffle。

其实,这个sql通过CASE WHEN d.id IS NULL THEN s.id ELSE d.id END在语义上实际是保证了不会出现额外补null的行为的,但是这个行为目前我们的优化器还不能识别,所以这种情况下大表数据依然会有一次shuffle,这并不能让我们满意。

  • 第二次尝试: not in + union all

下一个问题是如何才能让优化器识别出来我们其实并没有改变shuffle的属性呢,我们观察到这个full outer join其实这个sql就是一个求并集的过程
image
那么整个这个sql可以被拆分为两部分

SELECT a.key, a.value
FROM (SELECT * FROM snapshot WHERE ds='20170101' AND KEY NOT IN

             (SELECT  key FROM delta WHERE ds='20170102')) a  -- snapshot_not_in_delta

UNION ALL
SELECT key, value FROM delta WHERE ds='20170102' -- delta_all

在上述两部分中,前一部分对应图中的蓝色部分,后一部分对应图中的绿色部分。我们仅仅是对snapshot的key列进行了过滤操作,并没有改变key的分布,所以这个语句可以省去一次额外的shuffle。但是MaxCompute对于not in有一个限制是结果集合不能超过2000条,这个又限制了这种写法的应用场景。

  • 最终方案: anti semi join + union all

好在MaxCompute2.0中新支持的anti semi join同样实现了not in的语义,而且对结果集大小并没有限制,使用anti semi join 这个语句可以进一步修改为

INSERT OVERWRITE TABLE snapshot PARTITION (ds='20170102')
SELECT s.key, s.value
FROM (SELECT * FROM snapshot WHERE ds='20170101') s
LEFT ANTI JOIN
(SELECT * FROM delta WHERE ds='20170102') d ON s.key = d.key
UNION ALL
SELECT key, value
FROM delta
WHERE ds='20170102';

经过这一步的改造后,让我们运行一下,看看发生了什么。

image
只有三个阶段,M1读取delta表,M2读取snapshot表并进行sort-merge-join,随后写出数据,最后一个R3阶段仅仅是一个收集信息的任务,耗时在秒级别,所以实际的处理阶段只有两个stage,其中M1合并了之前M1和J3的功能,由于省去了一次数据读写、排序以及可能的spill等操作,实际运行时间往往可以减半。

上面也说过,如果数据只是进行一次读写,其实hash clustering table的作用有限,但是在增量更新这个特定的场景下,我们的输入和输出都为hash clustering的数据,而且中间过程并没有对cluster key进行修改,只是进行了过滤,所以我们可以只在一个阶段中完成read->join->union all->write这四个操作,极大地缩短了运行时间。

收效

目前hash clustering table已经在阿里巴巴内部集群生产环境正式发布,并且已经有蚂蚁、安全部、菜鸟等多个BU参与了试用。

从蚂蚁的反馈来看,改造之后的任务收效非常明显,运行时间缩短40%到80%,节省计算资源23%到67%。

菜鸟在使用hash clustering之后,任务的执行计划有所变化,节省了之前join操作需要的shuffle等操作,任务执行时间从40分钟左右降低到20分钟以内,有效的提升了任务执行效率,缩短执行时间,节约了资源。

飞猪应用Hash Clustering后,对于计算,整个计算过程由优化前的3小时,缩短到40分钟内完成,对于明细事实表视图一次读取计算可在1分钟内完成;对于存储,节省的存储和数据膨胀程度是线性关系,采用视图形式,我们用非常小的计算消耗代价节省了80%的存储,这一点看来,是很值得的。

所以我们付出的代价,仅仅是将表的属性进行修改,并且提前进行一次数据生成操作,这个操作也只需要执行一次,一劳永逸。

最后,欢迎大家在自己的增量更新的任务使用hash clustering功能,从现有的经验来看,大表的数据越多,收益越明显。

一些需要注意的地方

  1. bucket的数目设置需要一些经验,bucket越多,并发越多,运行越快,但是如果文件本身不大,小文件也越多,目前推荐500MB~1GB设置一个bucket,超大规模数据情况下一个bucket的数据可以更多。在任何情况下,不建议设置bucket number超过4096。
  2. hash clustering table会对数据进行重排,在一些极端场景下,可能会导致原来压缩率较高的文件压缩率降低,影响后续的性能,这个可以通过观察生成表的summary的input/output bytes来确认
  3. 目前我们正在对decimal类型进行重构,重构之后可能会影响decimal类型的分布方式,所以clustered key不要选用decimal类型
  4. snapshot表和delta表的schema不需要完全一致,但是如果key的类型不同,比如一边是bigint,一边是string,在join的时候需要将delta表的类型转换为snapshot的key类型,否则依然会需要一次reshuffle。

MaxCompute Hash Clustering介绍

如需了解更多有关MaxCompute Hash Clustering,欢迎加入“MaxCompute开发者社区”钉钉群
image

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
9天前
|
存储 人工智能 OLAP
深度|大模型时代下,基于湖仓一体的数据智能新范式
本次文根据峰会演讲内容整理:分享在大模型时代基于湖仓一体的数据产品演进,以及我们观察到的一些智能开发相关的新范式。
|
14天前
|
分布式计算 大数据 关系型数据库
MaxCompute产品使用问题之如何查看数据离线同步每天从MySQL抽取的数据量
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
14天前
|
分布式计算 大数据 关系型数据库
MaxCompute产品使用问题之如何实现MySQL的实时增量同步
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
14天前
|
分布式计算 大数据 Java
MaxCompute产品使用问题之是否可以恢复最近两天生命周期清理的数据
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
13天前
|
机器学习/深度学习 分布式计算 大数据
MaxCompute产品使用问题之删除了某个分区的数据,如何找回
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
13天前
|
分布式计算 DataWorks 大数据
MaxCompute产品使用问题之表数据大于1w行,如何下载数据
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
13天前
|
数据采集 分布式计算 DataWorks
MaxCompute产品使用问题之如何确保数据完整性验证有效
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
13天前
|
SQL 分布式计算 DataWorks
MaxCompute产品使用问题之dts是否支持传输数据到mc主键表2.0
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
14天前
|
分布式计算 大数据 数据处理
MaxCompute操作报错合集之odps数据库T1有几百行的数据,为什么出来只有5行的数据
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
7天前
|
分布式计算 DataWorks 安全
DataWorks产品使用合集之两个odps数据源绑定了同一个项目, 如何看另外一个数据源的同步数据
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
10 0

相关产品

  • 云原生大数据计算服务 MaxCompute