如何在Spark中实现Count Distinct重聚合

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 背景 Count Distinct是SQL查询中经常使用的聚合统计方式,用于计算非重复结果的数目。由于需要去除重复结果,Count Distinct的计算通常非常耗时。为了支持更快速的非重复结果统计Spark还基于Hyperloglog实现了Approximate Count Distinct,用于统计非重复结果的近似值,支持。

背景

Count Distinct是SQL查询中经常使用的聚合统计方式,用于计算非重复结果的数目。由于需要去除重复结果,Count Distinct的计算通常非常耗时。

以如下查询为例,Count Distinct的实现方式主要有两种:

SELECT region, COUNT(DISTINCT userId) FROM orders GROUP BY region
  1. 对订单表的数据按照region进行shuffle分区,在每个分区中使用一个类似HashTable的数据结构,存储所有的非重复userId的值,最后统计所有key的数量。
  2. 对表t的数据按照(region, userId)进行shuffle分区,第一步的结果即为非重复的(region, userId)对,对于第一步的结果再按照region分区,统计每个分区中的Row数量。

第一种方式只需要一次shuffle,但是需要在内存中维护一个数据结构,占用大量内存,甚至导致OOM。第二种方式多了一次shuffle,但是更加稳定可靠。Spark采用第二种方式实现Count Distinct。在多维分析或报表等场景中,用户可能需要秒级的交互响应,在大数据量的情况下,很难通过单纯地扩充资源满足要求。本文主要介绍在Spark中如何基于重聚合实现交互式响应的COUNT DISTINCT支持。

预聚合和重聚合

预计算是数据仓库领域常见的一种提升查询效率的方式,通过将全部或部分计算结果提前计算好并存储下来,对于后续的相关的查询可以直接重用之前的预计算结果,从而加速查询速度。在多维分析或报表等查询模式相对比较固定的场景中,我们可以通过预聚合,将需要处理的数据量下降成百上千倍。此外对于预计算来说,由于用户的查询维度,过滤条件,统计方式非常多,考虑到预计算的计算和存储代价,不太可能把每种可能的查询条件都进行预计算,通常的方式是按照较细粒度进行分组聚合,然后对于后续更粗粒度的分组聚合查询,可以使用预计算的结果进行重聚合,如下所示:

// 原始SQL
SELECT SUM(userId) AS sum_user FROM orders;

// 预计算SQL
CREATE TABLE pre_sum_orders AS SELECT region, sum(userId) as pre_sum_user FROM orders GROUP BY region;

// 重聚合SQL
SELECT sum(pre_sum_user) AS sum_user FROM pre_sum_orders;

由于pre_sum_orders中的数据已经经过了region的分组聚合,数据量相对于原始表可能会从100万条下降到100条,重聚合可以非常快的完成。

Count Distinct的重聚合

SUM/COUNT/MIN/MAX等聚合函数都满足结合律,可以非常容易的支持重聚合,例如Count可以在通过SUM再聚合,最小值可以通过MIN再聚合,最大值也可以通过MAX再聚合。但是对于AVERAGE或COUNT DISTINCT等许多聚合函数却并非如此。以AVG为例,每个region平均订单金额和平均值并不等于全国的订单平均值,对于COUNT DISTINCT也是如此。对于这类函数,我们也需要找到一种支持重聚合的中间结果。对于AVG,这个中间结果可以是(SUM, COUNT),可以通过如下方式重聚合,

// 原始SQL
SELECT avg(col2) FROM t;

// 预计算SQL
CREATE TABLE pre_avg_t AS SELECT col1, sum(col2) as pre_sum_col2, count(col2)as pre_count_col2 FROM t GROUP BY col1;

// 重聚合SQL
SELECT sum(pre_sum_col2)/sum(pre_count_col2) FROM pre_avg_t

对于COUNT DISTINCT来说,由于需要计算非重复结果的数目,Bitset存储与计算效率很高,而且可以用来去重,是一个比较合适存储COUNT DISTINCT预计算结果的数据结构。

Bitmap

Bitmap是一个常见的数据结构,存储元素到bit的映射关系,逻辑上可以理解为一个bit数组,每个bit有0和1两个状态,将元素映射到数组下标,在插入新元素时,修改该元素对应的数组下标的bit位置为1,重复元素出现时也不会有影响,由于每个元素仅需1个bit位表示,可以大大减少存储空间。此外Bitmap还可以通过位运算支持高效的计算,例如使用或运算合并两个Bitmap。实际实践中,为了操作效率,通常会用long/int array作为Bitmap的存储,也有很多高效的开源实现可以直接使用,例如Google的EWAHCompressedBitmap,RoaringBitmap等等,还会对bitmap进行压缩,对较为稀疏的bitmap节省更多的存储空间。COUNT DISTINCT会使用的Bitmap接口主要包括:

  1. add(element: Int): 向Bitmap中插入新的值。
  2. or(other: Bitmap): 将两个Bitmap按bit位进行或操作。
  3. getCardinality(): 获取Bitmap中1出现的次数。

使用Bitmap进行重聚合

我们可以基于Bitmap实现对COUNT DISTINCT的重聚合,这主要包含两部分工作:1. 在预聚合时将COUNT DISTINCT字段值写入Bitmap,并将Bitmap序列化成Binary类型字段保存。2. 在重聚合时,读取并反序列化Bitmap字段,在合并后统计最终count数量。这两步工作都需要新增自定义Spark UDF实现:

  1. bit_mapping: 接受Integer类型字段作为参数,内部维护Bitmap数据结构,将输入数据插入Bitmap中,并把Bitmap序列化二进制数据作为输出结果。
  2. re_count_distinct: 接受二进制数据作为参数,反序列化位Bitmap,merge同一分区的多个Bitmap,把Bitmap的cardinality作为结果输出。

下面的SQL展示了如何使用这两个UDF:

// 原始SQL
SELECT region, COUNT(DISTINCT userId) as dist_count_user FROM orders GROUP BY region;

// 预计算SQL
CREATE TABLE pre_count_orders AS SELECT region, bit_mapping(userId) as userId_bitmap FROM orders GROUP BY region;

// 改写后SQL
SELECT region, re_count_distinct(userId_bitmap) as dist_count_user FROM pre_count_orders GROUP BY region;

原始SQL需要全量扫描orders表的数据,并经过partition key为(region, userId)和region的两轮shuffle。而改写后的SQL,不需要任何shuffle操作,而且pre_count_orders由于经过了region的分组聚合,表的行数相对于orders表大大降低,整体执行速度可能有十倍甚至百倍以上的提升。

对于更粗粒度的COUNT DISTINCT,也可以通过重聚合支持:

// 原始SQL
SELECT COUNT(DISTINCT userId) as dist_count_user FROM orders;

// 改写后SQL
SELECT re_count_distinct(userId_bitmap) as dist_count_user FROM pre_count_orders;

Global Dictionary

在上面bit_mapping的实现中,由于Bitmap的输入要求是从0开始的自然数类型,以对应bit数组的下标,但是我们实际统计的字段,例如上面的userId的具体值很可能不是一个简单的自然数,所以需要一个映射函数,能够将统计字段的取值范围映射成自然数。此外,由于Spark SQL分布式执行的特性,这个映射必须是一致的,即同一个字段值在不同的spark task必须映射成同一个自然数,否则在merge Bitmap的时候,同一个字段对应的值就会在合并后的Bitmap中出现两次,违反COUNT DISTINCT的语义要求。一个维护了统计字段到自然数的全局字典可以帮助我们满足这个要求

Spark在1.5版本后加入了对开窗函数的支持,其中的row_number()函数的作用是根据表中字段分组排序,然后给排序好的记录添加一个从1开始的序号,我们可以使用这个函数实现一个全局字典的功能。例如,对于任意类型的userId,我们可以通过如下的方式映射为从1开始的自然数。

SELECT userId, row_number() over(order by userId) as bitmap_index FROM orders GROUP BY userId;

有了userId到bitmap_index的映射,我们就可以通过Join把全局字典和orders表关联起来,使用bitmap_index字段去构建bitmap了,COUNT DISTINCT预聚合的SQL示例如下:

WITH globalDict AS (SELECT userId, row_number() over(order by userId) as bitmap_index FROM orders GROUP BY userId)
CREATE TABLE pre_count_orders 
AS SELECT region, bit_mapping(bitmap_index) as userId_bitmap
FROM orders, globalDict
WHERE orders.userId = globalDict.userId
GROUP BY region;

最后,构建全局字典是一个较为耗时的工作,当有新的订单加入orders表后,如果我们构建新的预聚合结果时能够重用已有的全局字典,只是增加新的userId到字典中,显然可以提高执行预聚合的效率。

其他

Approximate Count Distinct可以用更小的代价统计count distinct的估计值,在不需要精确值的场景中被大量使用,Approximate Count Distinct主要基于Hyperloglog数据结构实现,HLL sketch作为hll的中间结果,也是可以重聚合的。Swoop 开源了高性能的 HLL native 函数工具包,作为 spark-alchemy 项目的一部分,提供了预聚合和重聚合的函数,用户可以直接使用,实现Approximate Count Distinct的重聚合。详细信息可以参考:Spark-Alchemy:HyperLogLog的使用介绍

在EMR Spark中通过Relational Cache支持了Count Distinct的预聚合和重聚合,提供了pre_count_distinct和re_count_distinct函数的实现,还提供了自定义的优化规则,将pre_count_distinct函数自动转化为基于Global Dictionary和bit_mapping的执行计划,不需要用户手工拼写复杂的预聚合SQL逻辑。

总结

本文主要介绍了在Spark中实现Count Distinct重聚合的基本思路和实现方式,通过基于Bitmap的重聚合,可以实现几个数量级的性能提升,满足交互式分析场景统计Count Distinct的需求。

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
6月前
|
存储 分布式计算 Apache
Spark编程范例:Word Count示例解析
Spark编程范例:Word Count示例解析
|
SQL 分布式计算 Spark
SPARK Expand问题的解决(由count distinct、group sets、cube、rollup引起的)
SPARK Expand问题的解决(由count distinct、group sets、cube、rollup引起的)
728 0
SPARK Expand问题的解决(由count distinct、group sets、cube、rollup引起的)
|
SQL 分布式计算 Spark
如何在Spark中实现Count Distinct重聚合
Count Distinct是SQL查询中经常使用的聚合统计方式,用于计算非重复结果的数目。由于需要去除重复结果,Count Distinct的计算通常非常耗时。本文主要介绍在Spark中如何基于重聚合实现交互式响应的COUNT DISTINCT支持。
|
SQL 分布式计算 自然语言处理
Spark 系列教程(1)Word Count
Spark 系列教程(1)Word Count
462 0
Spark 系列教程(1)Word Count
|
分布式计算 API Spark
Spark Streaming和Flink的Word Count对比
准备: nccat for windows/linux 都可以 通过 TCP 套接字连接,从流数据中创建了一个 Spark DStream/ Flink DataSream, 然后进行处理, 时间窗口大小为10s 因为 示例需要, 所以 需要下载一个netcat, 来构造流的输入。
2423 0
|
分布式计算 Spark
Spark sc.textFile(...).map(...).count() 执行完整流程
本文介绍下Spark 到底是如何运行sc.TextFile(...).map(....).count() 这种代码的,从driver端到executor端。
4195 0
|
19天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
54 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
1月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
60 0
|
1月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
40 0
|
1月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
83 0