Kylin如何实现基数统计-阿里云开发者社区

开发者社区> 数据库> 正文
登录阅读全文

Kylin如何实现基数统计

简介: # 概述 ## 什么是基数统计 基数是指集合中不重复元素的个数,在网站运营、流量分析时我们经常需要统计的UV(Unique Visitor),就是一种基数,在使用SQL查询时,我们可以通过COUNT和DISTINCT这两个函数的组合进行基数统计,下列SQL: ```sql select count(distinct(seller_id)) from kylin_sales;

概述

什么是基数统计

基数是指集合中不重复元素的个数,在网站运营、流量分析时我们经常需要统计的UV(Unique Visitor),就是一种基数,在使用SQL查询时,我们可以通过COUNT和DISTINCT这两个函数的组合进行基数统计,下列SQL:

select count(distinct(seller_id)) from kylin_sales;

就是统计销售事实表中卖家这一列的基数。

基数统计算法

基数统计最直观的算法就是遍历集合中的所有元素,使用Set等数据结构记录元素是否已出现,若未出现,则加一,采用这种算法,需要存储集合中所有元素,这在大数据场景下需要占用较大的存储空间,因此,对于如何压缩存储空间,提出了Bitmap(位图)以及更进一步优化的RoaringBitmap数据结构存储元素是否存在。使用Bitmap进行基数统计本质上还是一种精确统计方法,虽压缩了存储空间,但还是存储了元素是否存在的明细数据,存储空间随元素取值范围线性增长,并且在统计多个集合合并的基数时,仍需遍历明细数据,因此在大数据场景下,提出了HyperLogLog等概率统计方法,在不保存明细数据的前提下对基数进行近似估计,大大降低存储空间、提高计算速度的同时,也能保证一定的准确度,达到性能和精确的平衡。

基数统计算法在Kylin中的应用

Kylin是一个开源、分布式的OLAP存储和查询引擎,其核心思想是基于Cube理论,将明细数据中的各列分为维度和量度,在维度上进行预先聚合,通过空间换时间,加速查询。作为OLAP分析中的常见场景,Kylin也支持基数统计,包括基于RoaringBitmap的精确统计和基于HyperLogLog的近似统计。分析者在设计Cube中的量度时,可以引入COUNT_DISTINCT表达式,对某列进行基数统计预计算,保存计算结果,而在查询时,Kylin也提供了SQL化的查询方式,通过COUNT和DISTINCT这两个函数对于已预计算的某列进行基数查询时,会命中预计算结果,在此基础上,进行进一步合并,而无需遍历明细数据,从而大大提高查询速度。

文章目录

本文后续分以下几个部分进行详细介绍:

  1. 基于RoaringBitmap的精确统计算法;
  2. 基于HyperLogLog的近似统计算法;
  3. 如何在Kylin中进行基数统计;
  4. Kylin中基数统计的实现。

基于RoaringBitmap的精确统计算法

Bitmap

Bitmap(位图)即使用一个比特数组表示一个集合中存在的元素,数组中的每一位表示一个元素是否存在,若该位为0,表示相应元素不存在,若该位为1,表示相应元素存在,因此,Bitmap数组长度即集合中元素取值范围大小,数组中1的总数即集合基数。若集合中的元素类型为整数,且取值范围为[0, 7],则可以使用1个字节表示其Bitmap,例如,集合为{1, 4, 4, 7},则其Bitmap为10010010(则右至左依次表示1至8各元素),基数为3,另一个集合为{3, 3, 4},则其Bitmap为00011000,基数为2,两个集合并集的Bitmap可以通过或操作计算:

$$ 10010010 | 00011000 = 10011010 $$

因此并集的基数为4,两个集合交集的Bitmap可以通过与操作计算:

$$ 10010010 \& 00011000 = 00010000 $$

因此交集的技术为1。若集合中的元素类型为整数,且取值范围为[0, 100000000 - 1],则其Bitmap的大小为:

$$ 100000000/(8*1024*1024)=12M $$

因此,只需要约12M就可以存储取值范围为1亿的元素集合,降低存储空间,但其在空间使用上仍存在以下问题:

  1. 对于每个维度均需要用一个Bitmap存储元素集合(例如每天的访问用户);
  2. 稀疏和稠密集合的Bitmap大小相同(例如某天的访问用户很少,则其Bitmap中会填充大量0)。

RoaringBitmap

针对以上问题,2016年由S. Chambi、D. Lemire、O. Kaser等人在论文《Better bitmap performance with Roaring bitmaps》中提出了RoaringBitmap进行优化,RoaringBitmap的思路是:定义集合元素类型是32位无符号整数(即取值范围为[0, $2^{32}$ - 1]),对集合中的每一个元素,根据其高16位进行分桶(分桶在论文中称为container),因此,最多有65536($2^{16}$)个分桶,再将低16位存入相应的分桶中,而分桶的类型分为3种:

  1. Array Container,其数据结构为short类型数组,每个short类型保存一个元素的低16位,数组初始长度为4,最大长度为4096,也就是说最多可保存4096个元素,最大占用8KB(4096 * 2Byte),因此Array Container适合存储稀疏数据,另外Array Container中的数组有序,便于二分查找;
  2. Bitmap Container,即上一节中的普通位图,其数据结构为long类型数组,每个比特表示一个元素,数组固定长度为1024,也就是说可保存当前分桶所有$2^{16}$个元素,固定占用8KB(1024 * 8Byte),因此Bitmap Container适合存储稠密数据;
  3. Run Container,其数据结构为short类型数组,数组初始长度为4,Run Container采用行程长度编码(Run Length Encoding)算法对桶中的元素进行压缩,并将压缩后的数据保存在数组中。例如,桶中元素为2个连续整数序列{2, 3, 4, 5}和{15,16},则经过行程长度编码后压缩成两个二元组{2, 3}和{15, 1},分别表示从2开始连续递增3次、从15开始递增1次,因此,Run Container适合存储连续数据。

image-20210218165507258.png

图1 RoaringBitmap示例

图1是官方论文中的一个RoaringBitmap示例,其中,第一个分桶保存高位均为0x0000,低位是从0开始、按62连续递增的1000个整数,该分桶元素个数不足4096,因此分桶类型采用Array Container,占用2000Byte,第二个分桶保存高位均为0x0001,低位是[0, 99]、[101, 99]、[300, 99]这三个范围的连续整数序列,因此分桶类型采用Run Container,占用6Byte,第三个分桶保存[2 $2^{16}$, 3 $2^{16}$)范围内的所有奇数,即高位均为0x0002,低位是从0开始,按2连续递增的$2^{15}$个整数,该分桶元素个数超过4096,因此分桶类型采用Bitmap Container,占用8KB。

RoaringBitmap各分桶所采用的类型不是一成不变的,会根据分桶内存储数据的变化动态调整,当分桶不存在、需要初始创建时,若只插入一个元素,则默认采用Array Container存储,若插入多个元素,则分析采用Array Container和Run Container分别占用的空间,选择占用空间小的类型进行存储,当Array Container的元素个数超过4096时,则会自动转化成Bitmap Container,这样使得各分桶占用空间始终不会超过8KB。

综上,RoaringBitmap在分桶中的数据较稀疏时,采用变长数组存储数据(最大不超过8KB),在分桶中的数据较稠密时,采用定长数组存储数据(固定大小8KB),在分桶中的数据较连续时,采用行程长度编码压缩数据,通过针对不同的数据,采用不同的存储方式,最大限度地节省存储空间。

RoaringBitmap官方提供了多种语言的实现,可以很方便地在工程中引入、使用,其Java版本Maven坐标如下所示:

<dependencies>
  <dependency>
    <groupId>org.roaringbitmap</groupId>
    <artifactId>RoaringBitmap</artifactId>
    <version>[0.6,)</version>
  </dependency>
</dependencies>

基于HyperLogLog的近似统计算法

抛硬币游戏

首先通过抛硬币游戏阐述HyperLogLog算法的基本思想,但不作严格的数学推导。假设抛硬币得到正面和反面的概率分别为0.5,在一个回合中不停抛硬币,直至得到正面,那么抛一次就得到正面的概率为0.5(一次正面),抛两次得到正面的概率为$0.5^2$(一次正面,一次反面),抛k次得到正面的概率为$0.5^k$(k-1次反面,1次正面),因此,每个回合是一个伯努利过程。

定义X为n个回合中,每个回合抛硬币直至得到正面次数的最大值。首先,一个回合至少抛k+1次才得到正面的概率为$0.5^k$(前k次均为反面),那么,一个回合至多抛k次就能得到正面的概率为1-$0.5^k$,而所有n个回合均至多抛k次就能得到正面的概率为:

$$ P(X\leq k)=(1-0.5^k)^n $$

同理,所有n个回合均至多抛k-1次就能得到正面的概率为:

$$ P(X\leq k-1)=(1-0.5^{k-1})^n $$

那么,所有n个回合至少有一次抛硬币的次数大于或等于k次的概率为:

$$ P(X\geq k)=1-(1-0.5^{k-1})^n $$

从$P(X\leq k)$和$P(X\geq k)$中可以看出,当$n\gg 2^k$时,$P(X\leq k)$趋近为0,当$n\ll 2^k$时,$P(X\geq k)$也趋近为0。若k~max~为实际n个回合中抛硬币次数最大值,假设$n\gg 2^{k_{max}}$,那么k~max~为当前值的概率趋近为0(从$P(X\leq k)$分析k~max~应该比当前值更大),假设假设$n\ll 2^{k_{max}}$,那么$k_{max}$为当前值的概率也趋近为0(从$P(X\geq k)$分析k~max~应该比当前值更小),因此,可以用$2^{k_{max}}$作为n的粗略估计。

基于上述思想,对于集合中的元素,我们可以使用哈希函数将其映射为整数(即二进制序列),然后计算二进制序列中从左至右第一个1的位置序号,每次计算过程可以类比于一次抛硬币,第一个1的位置序号即抛硬币直至得到正面的次数,若集合基数为n,则有n个回合,记$k_{max}$为第一个1的位置序号最大值,那么可以用$2^{k_{max}}$作为n的粗略估计。

HyperLogLog

在上节算法思想的基础上,P. Flajolet在论文《LogLog counting of large cardinalities》和《HyperLogLog: The analysis of a near-optimal cardinality estimation algorithm》中先后提出了LogLog和HyperLogLog算法进行优化,算法的优化包括引入分桶,

1用一个理想的哈希函数,对数据流中的每一个元素求出一个哈希值

如果使用long来存储哈希值,则该哈希函数需将2e64个不同值均匀映射到0~2e64-1上

2对每个哈希值,取最后p位来决定桶序号

3 在剩下的(64-p)位中找到第一个1出现的位置,如果大于桶中现有值,则更新

4 所有元素处理完毕后,求所有桶中的值的调和平均数

调和平均数是将所有数值取倒数并求其算法平均数后,再将此算数平均数取倒数而得

5乘以m得到最后结果E

比如一个示例如下所示:

如何在Kylin中进行基数统计

数据模型

作为OLAP分析中的常见场景,Kylin也支持基数统计,包括基于RoaringBitmap的精确统计和基于HyperLogLog的近似统计。这里,先介绍一下Kylin的官方数据模型,并以此模型作为示例介绍如何在Kylin中进行基数统计。

image-20210220103531100.png

图2 数据模型,包含1张事实表和4张维度表

官方数据模型为雪花模型,如图2所示,包含1张事实表和4张维度表:

  1. KYLIN_SALES,销售事实表;
  2. KYLIN_ACCOUNT,用户维度表;
  3. KYLIN_CAL_DT,日期维度表;
  4. KYLIN_CATEGORY_GROUPING,类别维度表
  5. KYLIN_COUNTR,国家维度表。

如果从销售事实表中查询共有多少卖家(即卖家这一列的基数),则执行下列SQL:

select count(distinct(seller_id)) from kylin_sales;

Cube设计

Kylin的核心思想是在数据模型的基础上设计Cube,在维度上进行预先聚合,通过空间换时间,这里略过维度的设计,在量度上的设计上,可以选择“COUNT_DISTINCT”表达式,并选定参数类型为销售事实表中的seller_id这一列,而在具体的返回类型上,目前Kylin共支持5种基数统计方式,其中1种是基于RoaringBitmap的精确统计,另外4种均是基于HyperLogLog的近似统计,但分桶数不同,随分桶数的增加,精度不断提高,而占用空间也不断增加,表1列出了这5种方式。

image-20210219172158462.png

图3 COUNT_DISTINCT类型量度设置

表1 5种基数统计方式的占用空间和误差概率

返回类型每行占用空间误差概率
Bitmap随基数变化,一般是HLLC的若干倍精确
HLLC 101KB<9.75%
HLLC 124KB<4.88%
HLLC 1416KB<2.44%
HLLC 1532KB<1.72%
HLLC 1664KB<1.22%

另外,不管采用哪种方式,基数统计与其他类型量度相比,均需要占用较多的存储空间,而在实际存储时,量度是作为HBase KV存储模型中的Value来存储的,HBase的Value可以划分为多个列族,各列族数据保存在独立的HFile文件中,因此,Kylin官方建议将基数统计量度设置在单独的列族中,其数据与其他类型量度隔离,这样可以减少查询其他量度时扫描数据的大小,提高查询速度。

image-20210220103810480.png

图4 列族设置

另外,采用Bitmap方式时,若选择的字段类型非整型,则需要另外构建字典,将原始字段值转化为整型值,基于整型值构建Bitmap。目前Kylin提供两种字典格式,即Global Dictionary和Segment Dictinonary,顾名思义,Global Dictionary是全局字典,被所有Segment共享,因此支持跨Segment做“COUNT_DISTINCT”操作,但单个字典占用空间大,而Segment Dictionary基于某个Segment的值构建,因此不支持跨Segment操作,但单个字典占用空间小,如果Cube没有划分Segment或是查询时能严格保证按照Segment分区字段进行group by,则最好使用Segment Dictionary,避免单个字典过大的问题。

image-20210220104817706.png

图5 字典设置

查询

在设计Cube后,可以基于原始数据进行Cube构建用于后续查询。Kylin提供了SQL化的查询方式,并支持多种查询入口,包括WEB控制台、REST API、JDBC驱动、ODBC驱动等,表是对于示例SQL,分别直接从Hive查询和查询命中Cube的效果,直接从Hive查询,耗时较高,但结果精准,查询命中采用Bitmap方式进行基数统计的Cube,耗时较少,且结果精准,但Cube占用空间较大,查询命中采用HLLC 10方式进行基数统计的Cube,耗时最少,Cube占用空间较小,但结果近似。

表2 不同查询方式的耗时和结果

查询方式Cube占用空间查询耗时查询结果
直接从Hive查询15.296秒1000
查询命中采用Bitmap方式进行基数统计的的Cube115.48MB0.075秒1000
查询命中采用HLLC 10方式进行基数统计的Cube93.39M0.056秒955

Kylin中基数统计的实现

整体构架

Kylin的系统架构如图6所示,它依赖大数据基础设施HBase、Spark、Hadoop等实现分布式的存储和计算,并基于这些基础设施,设计了构建引擎和查询引擎分别实现数据的构建和查询。在构建引擎部分,Kylin支持使用MapReduce、Spark或Flink对来自Hive或Kafka的原始明细数据构建Cube,图7是一个Cube示例,从下到上由多个Cuboid构成,最下层的Cuboid按所有维度对原始明细数据进行聚合,往上各Cuboid在下层Cuboid的基础上,选取一个维度进行上卷,直至最上层的Cuboid不包含任何维度对数据进行聚合,因此,理论上n个维度的Cube包含2^n^个Cuboid,实际上Kylin会根据一定的规则进行剪枝,在减少数据规模的同时保证查询性能。另外,在物理存储Cube时,由于Cube可以按照时间等维度增量构建,因此Cube可以按分区字段划分为多个Segment,每个Segment对应于一张HBase表,Cube中每一条记录对应到HBase表的一行,维度和量度分别对应行的Key和Value。在查询引擎部分,Kylin使用Calcite实现SQL的解析、优化和执行,将SQL查询转化为对存储在HBase中的Cube数据的扫描,如果SQL能直接命中已预聚合数据,则直接返回,否则在已聚合数据基础上进行实时聚合,但相对于对原始明细数据进行临时聚合,数据规模已大大减少,因此能实现亚秒级的在线OLAP分析。

f58568a06aab1f19c2b7f05e444ad47e.png

图6 Kylin整体架构

b5660e9169bdde621cd08efdc0198490.png

图7 Cube示例

元数据

在设计Cube后,Kylin会将Cube元数据保存至HBase kylin_metadata表中,每个Cube元数据包括维度、量度、rowkey、字典、聚合组等信息,以下是采用Bitmap方式进行基数统计的Cube的部分元数据信息:

{
    "name": "kylin_sales_cube_bitmap",
    "measures": [{
        "name": "SELLER_CNT_BITMAP",
        "function": {
            "expression": "COUNT_DISTINCT",
            "parameter": {
                "type": "column",
                "value": "KYLIN_SALES.SELLER_ID"
            },
            "returntype": "bitmap"
        }
    }],
    "dictionaries": [{
        "column": "KYLIN_SALES.SELLER_ID",
        "builder": "org.apache.kylin.dict.GlobalDictionaryBuilder",
        "cube": null,
        "model": null
    }],
    "hbase_mapping": {
        "column_family": [{
            "name": "F2",
            "columns": [{
                "qualifier": "M",
                "measure_refs": ["SELLER_CNT_BITMAP", "TOP_SELLER"]
            }]
        }]
    }
}

其中,“measures”部分是量度信息,“dictionaries”部分是字典信息,“hbase_mapping”部分是列族信息,后续Kylin在构建和查询Cube时,都会依赖这份元数据。元数据在加载进内存时,会被反序列化为CubeDesc对象,该类及其部分关联、依赖类的类图如图8所示,其中,,“dictionaries”部分反序列化为DictionaryDesc对象列表,“hbase_mapping”部分反序列化为HBaseMappingDesc对象,“measures”部分反序列化为MeasureDesc对象列表,“measures”中的“function”部分反序列化为FunctionDesc对象。

CubeDesc.png

图8 元数据类图

FunctionDesc对象关联了核心接口MeasureType,这个接口针对不同类型的量度有相应的实现,FunctionDesc通过工厂模式根据元数据中的expression和returntype获取相应的MeasureType实现,代码如下:

measureType = MeasureTypeFactory.create(getExpression(), getReturnDataType());

对于expression为“COUNT_DISTINCT”、returntype为“bitmap”,相应的MeasureType实现为BitmapMeasureType,对于expression为“COUNT_DISTINCT”、returntype为“hllc(x)”,相应的MeasureType实现为HLLCMeasureType。MeasureType定义了两个核心方法,代码如下所示:

abstract public MeasureIngester<T> newIngester();

abstract public MeasureAggregator<T> newAggregator();

其中,第一个方法用于创建MeasureIngester对象,该对象作用是从原始明细数据生成聚合对象,比如BitmapMeasureType中的MeasureIngester实现会从原始明细数据生成Bitmap,第二个方法用于创建MeasureAggregator对象,该对象作用是在聚合对象的基础上进行聚合。进一步分析源码,BitmapMeasureType和HLLCMeasureType的聚合对象分别是BitmapCounter和HLLCCounter,而BitmapCounter是委托给RoaringBitmap的官方Java实现生成Bitmap并进行多个Bitmap的合并,HLLCCounter则是由Kylin自己实现了HyperLogLog算法。后续在构建和查询阶段,Kylin均是通过量度类型获取相应的MeasureType实现,再获取相应的MeasureIngester和MeasureAggregator实现完成聚合对象的创建和聚合等操作,体现高内聚、低耦合的设计原则。

构建实现

Kylin的Cube构建流程主要包括以下5个步骤:

  1. 根据数据模型将Hive中的事实表和维度表关联,创建中间宽表;
  2. 对中间宽表进行统计,并对部分字段创建字典,便于后续加速查询;
  3. 根据Cube定义,基于中间宽表和字典,构建Cube;
  4. 创建HTable,并将构建好的Cube数据转化为HFile,加载到HBase中;
  5. 更新Cube状态,使构建后的数据生效。

在另一篇文章中已详细分析其中的实现细节。Kylin支持使用MapReduce、Spark或Flink进行数据构建,以Spark构建方式为例,如图所示,使用分层构建算法,首先执行mapToPair操作,将原始明细数据编码为KV类型,key是维度值经编码后组成的rowkey,value是量度经过转化、编码后的二进制数据,这步操作完成后,将获得一个编码后的中间RDD,随后对中间RDD进行reduceByKey操作,聚合得到RDD-1,也就是Base Cuboid。因为Base Cuboid向上有n个子Cuboid,因此在RDD-1的基础上执行flatMap操作映射到各子Cuboid,再执行reduceByKey操作在各维度上进行上卷得到子Cuboid,flatMap和reduceByKey这两个操作会逐层循环执行直至把所有层的Cuboid计算完成。每步生成的RDD会保存至HDFS中,后续被转化为HFile并加载到HBase。定义Spark构建任务的代码在SparkCubingByLayer中,这里只再说明和“COUNT_DISTINCT”操作相关的两步处理,一是在读取原始明细数据生成编码后的中间RDD时,其量度编码实际是调用了各量度类型对应的MeasureIngester的valueOf方法,例如BitmapMeasureType中的MeasureIngester实现在valueOf方法中会返回BitmapCounter对象(即Bitmap),代码如下:

public Object[] buildValueObjects(String[] flatRow) {
    Object[] measures = new Object[cubeDesc.getMeasures().size()];

    for (int i = 0; i < measures.length; i++) {
        String[] colValues = kvBuilder.buildValueOf(i, flatRow);
        MeasureDesc measure = measureDescList.get(i);
        measures[i] = aggrIngesters[i].valueOf(colValues, measure, dictionaryMap);
    }
    return measures;
}

二是在通过reduceByKey操作进行聚合时,实际也是调用了各量度类型对应的MeasureAggregator的aggregate方法,例如BitmapAggregator的aggregate方法会通过BitmapCounter对象的或操作进行聚合,代码如下:

@Override
public Object[] call(Object[] input1, Object[] input2) throws Exception {
    if (initialized == false) {
        synchronized (SparkCubingByLayer.class) {
             if (initialized == false) {
                init();
                initialized = true;
            }
        }
    }
    Object[] result = new Object[measureNum];
    aggregators.aggregate(input1, input2, result);
    return result;
}

查询实现

Kylin的SQL解析和执行是基于Calcite实现的,其流程包括以下5个步骤:

  1. 将SQL解析成抽象语法树;
  2. 对抽象语法树进行校验;
  3. 将抽象语法树解析成关系代数表达式;
  4. 对关系代数表达式进行优化,在保持语义不变的前提下,转化为较优的表达式;
  5. 将优化后的关系代数表达式转化为物理执行计划并执行,返回最终的结果。

在另一篇文章中已详细分析其中的实现细节,这里只列出示例SQL经过Calcite解析后得到的执行计划:

  OLAPAggregateRel(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], ctx=[]): rowcount = 1.0, cumulative cost = {10.05625 rows, 5.050000000000001 cpu, 0.0 io}, id = 38
    OLAPProjectRel(SELLER_ID=[$6], ctx=[]): rowcount = 100.0, cumulative cost = {10.0 rows, 5.050000000000001 cpu, 0.0 io}, id = 36
      OLAPTableScan(table=[[DEFAULT, KYLIN_SALES]], ctx=[], fields=[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]]): rowcount = 100.0, cumulative cost = {5.0 rows, 5.050000000000001 cpu, 0.0 io}, id = 0

执行时,Kylin会根据解析出的表名、维度、量度等信息,通过一定的规则,路由到相应的Cube,并进一步定位从哪些Cuboid中查询数据(示例SQL中没有维度,因此可以从尽量上层的Cuboid中查询),同时会判断是否需要作存储端的实时聚合,随后,Kylin会向Cube各Segment所对应的HBase表发起数据扫描请求,并基于协处理器在存储端作实时聚合,以减少数据在网络中的传输。这里,只再说明和“COUNT_DISTINCT”操作相关的两处处理,一是在存储端,GTAggregateScanner会通过装饰者模式对扫描出的原始数据作实时聚合,而聚合实际仍是调用了和各量度类型对应的MeasureAggregator实现所提供的aggregate方法,代码如下所示:

@Override
public Iterator<GTRecord> iterator() {
    long count = 0;

    for (GTRecord r : inputScanner) {

        //check limit
        boolean ret = aggrCache.aggregate(r);

        if (!ret) {
            logger.info("abort reading inputScanner because storage push down limit is hit");
            break;//limit is hit
        }

        count++;
    }
    logger.info("GTAggregateScanner input rows: " + count);
    return aggrCache.iterator();
}

二是在查询引擎侧,Kylin在OLAPAggregateRel中重建了AggregateCall算子,代码如下所示:

// 从各量度类型中获取其AggrFunction类,例如BitmapMeasureType的AggrFunction类包括BitmapDistinctCountAggFunc、BitmapIntersectDistinctCountAggFunc和BitmapIntersectValueAggFunc,分别用于COUNT_DISTINCT、INTERSECT_COUNT和INTERSECT_VALUE场景
Map<String, Class<?>> udafMap = func.getMeasureType().getRewriteCalciteAggrFunctions();
if (func.isCount()) {
    newAgg = SqlStdOperatorTable.SUM0;
} else if (udafMap != null && udafMap.containsKey(callName)) {
    // 对于非COUNT场景,创建自定义SqlAggFunction类实例,其中主要是通过反射获取AggrFunction类中init、add和result方法,用于Calcite运行时,调用相应方法进行再次聚合和获取结果
    newAgg = createCustomAggFunction(callName, fieldType, udafMap.get(callName));
}

// rebuild parameters
List<Integer> newArgList = Lists.newArrayList(aggCall.getArgList());
if (udafMap != null && udafMap.containsKey(callName)) {
    newArgList = truncArgList(newArgList, udafMap.get(callName));
}
if (func.needRewriteField()) {
    RelDataTypeField field = getInput().getRowType().getField(func.getRewriteFieldName(), true, false);
    if (newArgList.isEmpty()) {
        newArgList.add(field.getIndex());
    } else {
        // TODO: only the first column got overwritten
        newArgList.set(0, field.getIndex());
    }
}

// rebuild aggregate call
AggregateCall newAggCall = new AggregateCall(newAgg, false, newArgList, fieldType, callName);

从而在Calcite运行时进行再次聚合并得到最终的结果(例如从多个Segment获取的值需进一步聚合),而该操作实际是通过反射调用了各量度类型中所指定的AggrFunction类的相应方法,例如BitmapMeasureType在COUNT_DISTINCT场景下的AggrFunction类是BitmapDistinctCountAggFunc,其代码如下所示:

public class BitmapDistinctCountAggFunc {

    public static BitmapAggregator init() {
        return new BitmapAggregator();
    }

    public static BitmapAggregator add(BitmapAggregator agg, Object value) {
        agg.aggregate((BitmapCounter) value);
        return agg;
    }

    public static BitmapAggregator merge(BitmapAggregator agg, Object value) {
        BitmapAggregator agg2 = (BitmapAggregator) value;
        if (agg2.getState() == null) {
            return agg;
        }
        return add(agg, agg2.getState());
    }

    public static long result(BitmapAggregator agg) {
        BitmapCounter finalState = agg.getState();
        return finalState == null ? 0 : finalState.getCount();
    }
}

实际也是调用了BitmapAggregator的相关方法。

结语

本文主要介绍了基数统计的相关知识以及Kylin如何支持基数统计。Kylin同时支持精确统计和近似统计,可以针对实际应用场景,权衡查询性能、存储空间和计算精度,选择合适的方式。

参考文献

  1. 《Apache Kylin权威指南(第2版)》;
  2. 《大数据去重算法分析及其在Kylin中的应用》。

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:
数据库
使用钉钉扫一扫加入圈子
+ 订阅

分享数据库前沿,解构实战干货,推动数据库技术变革

其他文章