1. 概念
在云数仓SQL分布式计算中,将数据 水平分片 到多机进行并行计算是解决大数据扩展性的关键。 SQL 执行计划中的多个 分片要求(Distribution Requirement) 指定了数据应该怎样分片到多个 Worker上执行计算。 Worker 是一个运行算子的实例,可以分布在不同机器上,从而能使用集群中多个机器的资源。
在SQL执行计划中,连接算子(Join),聚合算子(Aggregate),窗口算子(Window) 等算子对数据计算过程中的分片策略有特定的分片要求
- 多表等值 Join 需要计算多表数据之间的相关性,要求将多表按照 连接键(Join Key) 分片,使得各表相同分片的数据能在同一 Worker 中计算。
- Aggregate 需要计算单表中多行数据聚合值,要求将数据按照 聚合键(Aggregate Key) 分片。
- Window 需要计算单表中多行数据之间相关性,要求将数据按照 窗口分片键(Window Partition Key) 分片。
为了满足算子对数据分片的要求,输入数据需要做 重新分片(scatter-gather) shuffle 到不同 Worker。 由于单个物理机器能处理的数据量有限,重新分片 shuffle 过程的物理实现需要将数据按照指定键 Hash 计算分片 id,并通过网络发送到数据到对应分片 id 不同机器的 Worker 上。 执行重新分片 shuffle 意味着对数据的完整网络拷贝,对磁盘和网络IO有较高代价。 以两个大表 Join 为例,即使是最终两表 Join 匹配的行数很少,也需要在 Join 之前将两表数据完整做一轮分布式 shuffle。 所以,尽可能的避免 shuffle 是优化大数据计算量的关键。
Hash Clustered 表 提供了描述表中数据的分片/排序属性的能力,以 clustered by / sorted by / bucket num 表示。 通过写入时预先将数据分片排序,减少读时代价。
clustered by 已指定的分片 与 SQL查询算子的 分片要求 匹配,从而避免 shuffle。 在多表 Join 同为 cluster 表时可进行分片对齐,从而完全避免多个表的 shuffle。
sorted by 指定了在分片内部的数据排序方式,在列式存储的前提下,
- 按照重复/相近度较高的值进行排序有助于压缩存储量。
- 按照过滤性较高的值进行排序有助于在谓词下推存储时跳过更多的存储块。
- 按照 shuffle key 提前排序有助于提升计算效率。 在后续的 Join/Aggregate/Window 计算中避免排序,能使用更高效的 MergeJoin/SortedAgg 算子替代 HashJoin/HashAgg。
- 以 Aggregate 为例,使用 Sort + SortedAgg 和直接进行 HashAgg 性能相比,性能不一定谁好。 但对于已排序好的数据,一定是 SortedAgg 更好。 Cluster 表使得直接使用一阶段 SortedAgg 成为可能。
bucket num 指定了分片的数量,较多的分片数量意味着更多的文件。 较少的分片数量意味着单个分片需要处理的数据量更多。
Hash Cluster 表的 Shuffle Removal 优化 是指基于已有分桶的分片,排序属性,选择性的避免后续的排序和 Shuffle 代价。详见 MaxCompute 聚簇优化推荐简介。
由于使用 Cluster 表需要用户权衡多个 query 代价,理解执行计划,需要较多的时间精力以及对查询的理解。 所以从智能数仓替用户解决 多查询整体优化 问题的目标出发,Cluster 表推荐(聚簇优化推荐)功能 的目标为基于历史查询智能为用户推荐一系列改造为 Cluster 表后有明确收益的表。 推荐策略会综合权衡各种指标,给用户推荐改造后会对多个查询整体 shuffle 量有优化。
2. 推荐
推荐引擎
因为 MaxCompute 的 强大的 Shuffle Removal 优化 支持消除单侧 Shuffle,不存在需要所有输入表都是相同分片才能做 collocate join 的约束(见 VLDB Fast and Effective Distribution-Key Recommendation for Amazon Redshift)。 算法实现上,在 Shuffle 图 中按照单表最大 Shuffle 量的边,计算不会 skew 的 shuffle key 组合,综合考虑写表代价后做选择即可达到理论最优。
主要是三轮筛选
- 单日推荐: 找到当天有优化空间的作业
- 多日综合推荐: 根据多天作业分析,寻找稳定pattern,并容忍一定程度的波动
- 重编译验证优化: 校验如果修改重编译后是否优化器会真正选择应用 cluster 优化。
在 MaxCompute 控制台上,支持查看不同表的不同推荐指数,一键应用优化,T+1 查看 CU 收益。
Shuffle 图
Shuffle 图是求解最优 Shuffle 优化的核心结构。
TPC-DS 1TB 完整 shuffle 图 (只包含 shuffle byte > 500MB的边)
- 绿边表示 agg 等其他算子, 红色是 merge join. 蓝色表示 table 与 column 组合的从属关系.
- 可以看出 aggregate 较少, 主要是 merge join
- 超过 10 GB 输入 shuffle 的有 Q93, Q64, Q50
在按照策略选择后可以得到:
TPC-DS 1TB 修改 Cluster 表后可优化 Shuffle 边 (只包含 shuffle byte > 500MB的边)
TPC-DS 100TB 完整 Shuffle 图
超过 1TB shuffle 的有 Q93,Q64,Q50 等等
TPC-DS 100TB 修改 Cluster 表后可优化 Shuffle 边
可以看到,主要预期 Shuffle 节省在 Q5, Q29, Q47, Q50, Q57, Q64, Q78, Q93 等查询。对比来优化前看,这些较粗的红线覆盖了大部分完整 shuffle 图中的有显著代价的 shuffle。相对于1TB来看,针对大数据量的场景,Cluster 优化能更显著的降低主要 Shuffle 的代价。
实际项目中的 Shuffle 图包含了租户下跨多个项目全量作业的读写关系,会是非常巨大复杂的一个 Shuffle 关系图,包含有多个不向交的 Shuffle 子图。常见的 Shuffle 子图模式是数据从产生到逐级加工都使用的相同的主键,所以可优化 Shuffle 边的求解结果也常常推荐是链式修改一系列同源数据表以相同列聚簇,以 全链路消除 Shuffle 。
针对不同 Shuffle 场景的不同策略
场景一: 所有 shuffle 都使用相同的 shuffle key 集合,即推荐该 shuffle key 组合作为 clustered key
场景二:所有 shuffle 有出现在历史记录中无 skew 的 公共 shuffle key 集合。 则推荐该公共 shuffle key 集合
- 比如 (a,b),(a,b,c,d),(a,b,e) 三个 shuffle key 集合公共 shuffle key 集合为 (a,b)。
- 比如 (a,b) 和 (b,c) 有公共 shuffle key 集合 (b),但因为 (b) 没有在历史记录中出现,无法判断是否会 skew ,不会推荐 (b)
场景三: 有多个出现在历史记录中无 skew 的 公共 shuffle key 集合,这些集合之间无直接包含关系,则选择具有主导 shuffle 量的 公共 shuffle key 集合做推荐,即对于小数据量要求大于其他 shuffle 量 3 ~ 5 倍,对于大数据量要求是所有shuffle边中最大量。
- 比如 shuffle 键组合有 (a,b),(a,b,c,d),(a,b,e), (b,c),(e) 对应了三个公共 shuffle key
- (a,b) : shuffle 量包含 (a,b),(a,b,c,d),(a,b,e)
- (b,c) : shuffle 量包含 (b,c),(a,b,c,d)
- (e) : shuffle 量包含 (a,b,e),(e)
- 以上三者中选占主导 shuffle 量的 shuffle key 集合推荐。
排除数据倾斜
当历史作业中使用某个 shuffle key 集合做 shuffle 时,会保留数据在shuffle 后下游是否倾斜,以及数据总量的信息。 比如对于有 (a,b,e),(e) 两类 shuffle 的场景
- (e) 相对 (a,b,e) 是一个覆盖更广的公共 shuffle key 集合,选择 (e) 可覆盖 (e) 与 (a,b,e) 的 Shuffle。
- 如果 (e) 历史作业已出现倾斜,不会选择 (e)。
- 如果 (e) 数据量只有几百GB,而 (a,b,e) 的 shuffle 量是几百TB,则小比例 shuffle 不能证明大量shuffle下不会倾斜,不会选择(e)。
- 如果 (a,b,e) 在多个作业上出现数据倾斜,但相对于所有基于 (a,b,e) shuffle 的作业的总量比例不高,则认为 (a,b,e) 不是一个会导致数据倾斜的 shuffle key 集合。
- 当作业 shuffle 前的 filter 和 hash 有相关时,比如 filter 将数据值域限制在了较小的范围导致了后续 shuffle 的倾斜,并不能推断出数据整体是倾斜的,所以应当忽略这类”不真实的倾斜信息”。
重编译
重编译验证 Shuffle Remove 生效
需要排除模型计算改为Cluster表后有优化空间,但优化器因为各类原因不做优化的场景。 这种功能被称为 What If Engine,即判断假设元数据变化(改为 Cluster 表)后是否优化器会有更好的plan。 如果重编译放在更早的阶段,会导致作业和 cluster key 组合爆炸,所以放在三轮筛选的最后。
写入 Shuffle 量估计与完美转发
修改 Cluster 表后,写入数据时需要做 shuffle 以分桶排序数据, 增加了写入代价。推荐策略需要能基于写入表的压缩数据量估计其Shuffle压缩数据量,从而判断是否改后全局Shuffle总量会更少。
修改 Cluster 后无写入 shuffle 量增加场景,我们称之为 完美转发(Perfect Forward)。完美转发的常见原因是增全量合并作业的源表和目标表有相同的数据分布。重编译识别该场景后减去了不会实际增加写入Shuffle量的情况, 大量该类作业被纳入推荐的范围。
3. 性能指标统计
性能指标统计基于一年以内修改的表和相关作业,在按照 Signature 追溯修改前一段时间平均性能和当日运行平均性能,得到单日CU节省量 (CU↓) 和延迟节省量(Latency↓)。
使用 Signature 追溯作业
Signature 是基于 SQL 结构去除修改日期等干扰项计算出的作业签名。使用 Signature 统计不会受到执行计划变化,以及数据变化的影响。
不可避免的是,基于 Signature 追溯会遗漏每日 Signature 不同的作业。比如创建临时表的作业,SQL有修改的作业,由于表名变化和SQL 修改都会影响 Signature,所以统计到的相比实际节省统计会更少。随着时间越长,SQL 修改导致 Signature 变化的可能性就越大。
Signature 变化不影响推荐时对于可优化作业的识别。
统计的稳定性
在统计中发现如果将所有读写相关都进行统计,则每日波动很大,作业的改进会被噪音淹没. 所以为保持统计结果的客观,统计范畴包含写入表的作业, 以及应用了 cluster 表优化的读作业。
不包含其他的读表作业, 假设这类作业正向和负向的优化持平, 因为相比完全不按照 cluster 属性分片, 有聚集性的数据在过滤性和存储压缩上一般是有正向优化,所以可以保证统计到的收益一定在均摊意义上大于等于实际收益。
为排除 Shuffle 不占主导的作业性能波动影响收益累计,会统计并展示 Shuffle 量减少信息(Shuffle↓)。
- 典型场景:Cluster 表的优化量有限,但相关作业的其他部分有较重的 UDF,消耗大量 CU,其波动超过了优化量本身。
为处理单日CU不是稳定减少,多日综合有减少的场景,展示会包含多日累计CU减少信息(CU↓∑)。
- 典型场景:修改表后没有重写历史分区,预计优化的目标作业读了 Cluster 表一个月的天分区。需要一个月后,每个天分区都被重写为 Cluster 分区后读表作业才能优化。
延迟节省量会受到资源紧张,作业排队等问题的影响,更应该关注的 CU 量的节省。
Shuffle 量节省和 CU 节省的关系
经过统计,Shuffle 量节省和 CU 数节省在对数坐标系上成线性关系,Shuffle 量越大其与 CU 消耗的比例会以略高于线性的比例增加。
由于 Shuffle 量与 CU 数的关系和集群机器情况,代码版本,以及 Shuffle 后续算子实际情况等因素相关,无法精确计算两者关系。大致可以估计 1TB 数据在 2–4 CU 之间,随着机器配置越好与环境压力越小,所消耗的 CU 越少。
4.QA
Q: 为什么系统不能自动修改为 Cluster 表,而是要用户手动修改?
A:Hash Cluster 表当前(2025年)增量数据写入开发还在进行中,当前还不支持,需要用户来确认未来不会有增量写入的需求。同时对于非常大的表(10TB+), 修改 Cluster 表会使得重写历史分区有较大的 Shuffle 代价,新写入数据在非完美转发场景需要 Shuffle 也会导致写入作业变慢。越大的表重写越慢,相应的后续读作业优化效果越好。对于较小的表未来有机会做自动优化,但资源使用往往主要在一些大表上,我们在实际业务中发现部分增全量合并相关表的单个分区有超过 600 TB 的数据。所以手动调优大表对于整体性能是不可或缺的。
Q: 为什么发现修改 Cluster 表后,部分作业没有做 Shuffle Removal?
A:Cluster 推荐基于历史作业编译验证了修改后部分历史作业会做 Shuffle Removal 优化,但并非所有作业都会优化。有的作业会在优化器的权衡下使用走 Shuffle 的计划。
Q: 最常见的无法 Shuffle Remove 原因?
A: 一个常见导致无法优化的原因是 Join 的键虽然是 Cluster key,但在等值比较前发生了 cast,从而无法消除 Shuffle,比如 bigint 类型和 string 做等值条件时需要 cast 转换。通过查看 Logview 中 DAG 信息可以观察到这个现象。
Q: 修改后小部分表存储膨胀,数据过滤效率降低?
A: 这是由于修改前已有数据排序被 hash cluster 打散,在实际大表改造中出现概率较小
- 比如对于以下示例数据。类似结账时的购物小票数据,消费者在一个时间地点一次购买多种商品。所以商品信息比较离散。而消费者 id,消费时间地址,付款信息存在大量重复。
- 原始数据按照日期类数据 (ws_sold_date_sk) 排序数据更有规律易于存储压缩,以及针对日期/地点/用户的谓词跳过存储块。按照商品 id 类数据(ws_item_sk) 做 cluster by 和 sort by 更利对商品进行关联分析的 Join 性能。所以需要权衡两者的影响, 在按照日期排序有大量存储跳过能力时,clustered by(id) sorted by (date) 或者不做 cluster 保持原有自然数据顺序可能是更合适的选择。最终应基于实际实验效果,决定排序键,以及是否忽略推荐。当数据膨胀的存储代价小于节省的计算代价时,整体也会更省。