Apache Calcite 框架 50 倍性能优化实践

简介: 某天临时被当成壮丁拉去参加一个非常牛逼的应用监控平台(后续会开源),然后大佬就给我派了一个任务,要将项目中的查询性能优化 50 倍以上,大佬对我如此地寄予厚望,我怎么能让大佬失望呢(虽然我内心瑟瑟发抖)?于是我就开始了这段性能优化之旅。

某天临时被当成壮丁拉去参加一个非常牛逼的应用监控平台(后续会开源),然后大佬就给我派了一个任务,要将项目中的查询性能优化 50 倍以上,大佬对我如此地寄予厚望,我怎么能让大佬失望呢(虽然我内心瑟瑟发抖)?于是我就开始了这段性能优化之旅。


初步认识 Calcite



项目使用 Calcite 框架作为查询引擎,之前一直没停过这玩意是啥,而且网上资料特别少,又是体现我学习能力的时候了,在着手排查性能问题前,我花了非常多时间在了解 Calcite 实现原理上面。


1、Calcite 简介

Apache Calcite是一款开源的动态数据管理框架,它提供了标准的 SQL 语言、多种查询优化和连接各种数据源的能力,但不包括数据存储、处理数据的算法和存储元数据的存储库。

Calcite 之前的名称叫做optiq,optiq 起初在 Hive 项目中,为 Hive 提供基于成本模型的优化,即CBO(Cost Based Optimizatio)。2014 年 5 月 optiq 独立出来,成为 Apache 社区的孵化项目,2014 年 9 月正式更名为 Calcite。

Calcite 的目标是“one size fits all(一种方案适应所有需求场景)”,希望能为不同计算平台和数据源提供统一的查询引擎。


2、Calcite 执行流程

640.png


1)解析 SQL,目的是为了将 SQL 转换成 AST 抽象语法数,Calcite 有一个专门的对象 SqlNode 表示;

2)语法检查,用数据库的元数据信息进行语法验证;

3)逻辑优化,根据前面生成的逻辑计划按照相应的规则(Rule)进行优化;

4)SQL 执行,按照执行计划执行。


3、Calcite 相关对象


  • RelNode:

关系表达式, 主要有 TableScan, Project, Sort, Join 等。如果 SQL 为查询的话,所有关系达式都可以在 SqlSelect中找到, 如 where 和 having 对应的 Filter, selectList 对应 Project, orderBy、offset、fetch 对应着 Sort, From 对应着 TableScan/Join 等等, 示便 Sql 最后会生成如下 RelNode 树。


Debug 源码得到的 RelNode 对象长这样:

640.png


  • RexNode:

行表达式, 如 RexLiteral(常量), RexCall(函数), RexInputRef (输入引用) 等,举个例子:

SELECT LOCATION as LOCATION,MERGE2(VALUE2) as VALUE2 
FROM transaction 
WHERE REPORTTIME >=1594887720000 AND REPORTTIME <=1594891320000 AND APPID = 'test-api'  AND GROUP2 IN ('DubboService','URL') AND METRICKEY IN ('$$TOTAL') GROUP BY LOCATION


RexCall

<=($1, 1595496539000)


RexInputRef

$1


RexLiteral

1595496539000:BIGINT


下面根据官方资料的描述,总结 Calcite 的三种查询模式:


1)ScannableTable

这种方式基本不会用,原因是查询数据库的时候没有任何条件限制,默认会先把全部数据拉到内存,然后再根据filter条件在内存中过滤。

使用方式:实现Enumerable scan(DataContext root);,该函数返回Enumerable对象,通过该对象可以一行行的获取这个Table的全部数据。

2)FilterableTable

初级用法,我们能拿到filter条件,即能再查询底层DB时进行一部分的数据过滤,一般开始介入calcite可以用这种方式(translatable方式学习成本较高)。

使用方式:实现Enumerable scan(DataContext root, List filters )

如果当前类型的“表”能够支持我们自己写代码优化这个过滤器,那么执行完自定义优化器,可以把该过滤条件从集合中移除,否则,就让calcite来过滤,简言之就是,如果我们不处理List filters ,Calcite也会根据自己的规则在内存中过滤,无非就是对于查询引擎来说查的数据多了,但如果我们可以写查询引擎支持的过滤器(比如写一些hbase、es的filter),这样在查的时候引擎本身就能先过滤掉多余数据,更加优化。提示,即使走了我们的查询过滤条件,可以再让calcite帮我们过滤一次,比较灵活。

3)TranslatableTable

高阶用法,有些查询用上面的方式都支持不了或支持的不好,比如join、聚合、或对于select的字段筛选等,需要用这种方式来支持,好处是可以支持更全的功能,代价是所有的解析都要自己写,“承上启下”,上面解析sql的各个部件,下面要根据不同的DB(esmysqldrudi..)来写不同的语法查询。

当使用ScannableTable的时候,我们只需要实现函数Enumerable scan(DataContext root);,该函数返回Enumerable对象,通过该对象可以一行行的获取这个Table的全部数据(也就意味着每次的查询都是扫描这个表的数据,我们干涉不了任何执行过程);当使用FilterableTable的时候,我们需要实现函数Enumerable scan(DataContext root, List filters );参数中多了filters数组,这个数据包含了针对这个表的过滤条件,这样我们根据过滤条件只返回过滤之后的行,减少上层进行其它运算的数据集;当使用TranslatableTable的时候,我们需要实现RelNode toRel( RelOptTable.ToRelContext context, RelOptTable relOptTable);,该函数可以让我们根据上下文自己定义表扫描的物理执行计划,至于为什么不在返回一个Enumerable对象了,因为上面两种其实使用的是默认的执行计划,转换成EnumerableTableAccessRel算子,通过TranslatableTable我们可以实现自定义的算子,以及执行一些其他的rule,Kylin就是使用这个类型的Table实现查询。


由于我对 Calcite 还没有一个更加深入的了解(但是并不阻碍我排查问题,而且 Calcite 这玩意对我来说太复杂了),因此 Calcite 更加复杂的概念我在这里就不继续啰嗦了,比如关系代数的基本知识、查询优化器等等,排查问题并不需要了解那么深入,而且项目中只是使用了 Calcite 一小部分功能。


使用 Calcite 实现一个简单的数据库



需要做如下几步:


  1. 编写 model.json
  2. 自定义 SchemaFactory
  3. 自定义 Schema(像一个“没有存储层的databse”一样,Calcite不会去了解任何文件格式)
  4. 自定义Table
  5. 自定义 Enumerator

demo url: https://github.com/objcoding/calcite-demo


耗时排查



我在项目中使用了 FilterableTable 模式,Cacite 在解析 Sql 时耗时非常大,然后通过调试,我发现每个请求都占据了两个位置:

org.apache.calcite.adapter.enumerable.EnumerableInterpretable#getBindable

640.png


Cacite 在这个地方通过设置缓存大小来优化缓存设置。

org.apache.calcite.interpreter.JaninoRexCompiler#baz

640.png


但是不会缓存该位置,并且每次都会使用新的表达式字符串通过反射创建它。

我使用 JProfile 工具对线程耗时的地方进行定位:

640.png


Calcite 会在这个地方会调用反射根据不同的 Sql 动态生成不同的表达式,Debug 获取的表达式如下:

640.png

Calcite 为什么会有这种机制呢?我们先从 Bindable 对象讲起:


在 EnumerableRel(RelNode,我们可以通过 TranslatableTable 自定义 FilterRel、JoinRel、AggregateRel)的每个算子的 implement 方法中会将一些算子(Group、join、sort、function)要实现的算法写成 Linq4j 的表达式,然后通过这些 Linq4j 表达式生成 Java Class。通过 JavaRowFormat 格式化)


calcite 会将 sql 生成的 linq4j 表达式生成可执行的 Java 代码( Bindable 类):org.apache.calcite.adapter.enumerable.EnumerableInterpretable#getBindable


Calcite 会调用 Janino 编译器动态编译这个 java 类,并且实例化这个类的一个对象,然后将其封装到 CalciteSignature 对象中。


调用 executorQuery 查询方法并创建 CalciteResultSet 的时候会调用 Bindable 对象的 bind 方法,这个方法返回一个Eumerable对象:


org.apache.calcite.avatica.AvaticaResultSet#execute

640.png


org.apache.calcite.jdbc.CalcitePrepare.CalciteSignature#enumerable

640.png

将 Enumerable 赋值给 CalciteResultSet 的 cursor 成员变量。


在执行真正的数据库查询时,获得实际的 CalciteResultSet,最终会调用:

org.apache.calcite.avatica.AvaticaResultSet#next

640.png


以下是根据 SQL 动态生成的 linq4j 表达式:

public static class Record2_0 implements java.io.Serializable {
  public Object f0;
  public boolean f1;
  public Record2_0() {}
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof Record2_0)) {
      return false;
    }
    return java.util.Objects.equals(this.f0, ((Record2_0) o).f0) && this.f1 == ((Record2_0) o).f1;
  }
  public int hashCode() {
    int h = 0;
    h = org.apache.calcite.runtime.Utilities.hash(h, this.f0);
    h = org.apache.calcite.runtime.Utilities.hash(h, this.f1);
    return h;
  }
  public int compareTo(Record2_0 that) {
    int c;
    c = org.apache.calcite.runtime.Utilities.compare(this.f1, that.f1);
    if (c != 0) {
      return c;
    }
    return 0;
  }
  public String toString() {
    return "{f0=" + this.f0 + ", f1=" + this.f1 + "}";
  }
}
public org.apache.calcite.linq4j.Enumerable bind(final org.apache.calcite.DataContext root) {
  final org.apache.calcite.rel.RelNode v1stashed = (org.apache.calcite.rel.RelNode) root.get("v1stashed");
  final org.apache.calcite.interpreter.Interpreter interpreter = new org.apache.calcite.interpreter.Interpreter(
    root,
    v1stashed);
  java.util.List accumulatorAdders = new java.util.LinkedList();
  accumulatorAdders.add(new org.apache.calcite.linq4j.function.Function2() {
    public Record2_0 apply(Record2_0 acc, Object[] in) {
      final Object inp9_ = in[9];
      if (inp9_ != null) {
        acc.f1 = true;
        acc.f0 = com.zto.zcat.store.api.query.Merge2Fun.add(acc.f0, inp9_);
      }
      return acc;
    }
    public Record2_0 apply(Object acc, Object in) {
      return apply(
        (Record2_0) acc,
        (Object[]) in);
    }
  }
  );
  org.apache.calcite.adapter.enumerable.AggregateLambdaFactory lambdaFactory = new org.apache.calcite.adapter.enumerable.BasicAggregateLambdaFactory(
    new org.apache.calcite.linq4j.function.Function0() {
      public Object apply() {
        Object a0s0;
        boolean a0s1;
        a0s1 = false;
        a0s0 = com.zto.zcat.store.api.query.Merge2Fun.init();
        Record2_0 record0;
        record0 = new Record2_0();
        record0.f0 = a0s0;
        record0.f1 = a0s1;
        return record0;
      }
    }
,
    accumulatorAdders);
  return org.apache.calcite.linq4j.Linq4j.singletonEnumerable(interpreter.aggregate(lambdaFactory.accumulatorInitializer().apply(), lambdaFactory.accumulatorAdder(), lambdaFactory.singleGroupResultSelector(new org.apache.calcite.linq4j.function.Function1() {
      public Object apply(Record2_0 acc) {
        return acc.f1 ? com.zto.zcat.store.api.query.Merge2Fun.result(acc.f0) : (Object) null;
      }
      public Object apply(Object acc) {
        return apply(
          (Record2_0) acc);
      }
    }
    )));
}
public Class getElementType() {
  return java.lang.Object.class;
}


Enumerator是Linq风格的迭代器,它有4个方法:

  1. current
  2. moveNext
  3. reset
  4. close


current 返回游标所指的当前记录,需要注意的是current并不会改变游标的位置,这一点和iterator是不同的,在iterator相对应的是next方法,每一次调用都会将游标移动到下一条记录,current则不会,Enumerator是在调用moveNext方法时才会移动游标。moveNext方法将游标指向下一条记录,并获取当前记录供current方法调用,如果没有下一条记录则返回false。


CsvEnumerator是读取 csv 文件的迭代器,它还得需要一个RowConverter,因为csv中都是String类型,使用RowConverter转化成相应的类型。在moreNext方法中,有Stream和谓词下推filter部分的实现,在本文只关注如下几行代码:


总结执行顺序:


1、executeQuery 方法:

1)根据算子 linq4j 表达式子生成 Bindable 执行对象,如果有设置缓存,则会将对像存储到缓存中;

2)生成 CalciteResultSet 时会调用 Bindable#bind 方法返回一个 Enumerable 对象;

2、getData 方法:调用 ResultSet#next 方法最终会嗲用 Enumerable#moveNext


一图理解 Bindable 在 calcite 中的作用

640.png

发现 Bindable 缓存会持续增加,说明 Bindable 类内容不一致:

640.png

也说明了 calcite 会根据不同的 SQL 动态生成 linq4j 表达式。


性能优化



以上排查结果可知,在 Calcite 内容会频繁使用 JaninoRexCompiler 使用反射动态生成表达式,由于项目中的 sql 格式相对固定,因此我们是否可以自定义一个  Compiler,然后替换 JaninoRexCompiler ?


我将使用 JaninoRexCompiler 的相关类复制出来,实现一个自定义的 Interpreter.ScalarCompiler,然后在这个地方 org.apache.calcite.interpreter.Interpreter.CompilerImpl#CompilerImpl替换 JaninoRexCompiler。


关于自定义 Interpreter.ScalarCompiler 的具体思路过程,我记录在这里了:

https://issues.apache.org/jira/browse/CALCITE-4144

经过反复调试,发现性能提上了 50 倍以上!

640.png

再次使用 JProfiler 查看,发现 Calcite 查询过程耗时已经大大降低了。

相关文章
|
1月前
|
人工智能 运维 Java
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
本文基于Apache Flink PMC成员宋辛童在Community Over Code Asia 2025的演讲,深入解析Flink Agents项目的技术背景、架构设计与应用场景。该项目聚焦事件驱动型AI智能体,结合Flink的实时处理能力,推动AI在工业场景中的工程化落地,涵盖智能运维、直播分析等典型应用,展现其在AI发展第四层次——智能体AI中的重要意义。
560 27
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
|
19天前
|
SQL 人工智能 数据挖掘
Apache Doris 4.0 AI 能力揭秘(二):为企业级应用而生的 AI 函数设计与实践
Apache Doris 4.0 原生集成 LLM 函数,将大语言模型能力深度融入 SQL 引擎,实现文本处理智能化与数据分析一体化。通过十大函数,支持智能客服、内容分析、金融风控等场景,提升实时决策效率。采用资源池化管理,保障数据一致性,降低传输开销,毫秒级完成 AI 分析。结合缓存复用、并行执行与权限控制,兼顾性能、成本与安全,推动数据库向 AI 原生演进。
135 0
Apache Doris 4.0 AI 能力揭秘(二):为企业级应用而生的 AI 函数设计与实践
|
2月前
|
SQL 存储 运维
Apache Doris 在菜鸟的大规模湖仓业务场景落地实践
本文介绍了 Apache Doris 在菜鸟的大规模落地的实践经验,菜鸟为什么选择 Doris,以及 Doris 如何在菜鸟从 0 开始,一步步的验证、落地,到如今上万核的规模,服务于各个业务线,Doris 已然成为菜鸟 OLAP 数据分析的最优选型。
183 2
Apache Doris 在菜鸟的大规模湖仓业务场景落地实践
|
2月前
|
消息中间件 存储 数据采集
Apache InLong:构建10万亿级数据管道的全场景集成框架
Apache InLong(应龙)是一站式、全场景海量数据集成框架,支持数据接入、同步与订阅,具备自动、安全、可靠和高性能的数据传输能力。源自腾讯大数据团队,现为 Apache 顶级项目,广泛应用于广告、支付、社交等多个领域,助力企业构建高效数据分析与应用体系。
|
11月前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
337 4
|
7月前
|
存储 安全 数据挖掘
天翼云:Apache Doris + Iceberg 超大规模湖仓一体实践
天翼云基于 Apache Doris 成功落地项目已超 20 个,整体集群规模超 50 套,部署节点超 3000 个,存储容量超 15PB
374 2
天翼云:Apache Doris + Iceberg 超大规模湖仓一体实践
|
7月前
|
存储 分布式数据库 Apache
小米基于 Apache Paimon 的流式湖仓实践
小米基于 Apache Paimon 的流式湖仓实践
178 0
小米基于 Apache Paimon 的流式湖仓实践
|
9月前
|
存储 运维 监控
金融场景 PB 级大规模日志平台:中信银行信用卡中心从 Elasticsearch 到 Apache Doris 的先进实践
中信银行信用卡中心每日新增日志数据 140 亿条(80TB),全量归档日志量超 40PB,早期基于 Elasticsearch 构建的日志云平台,面临存储成本高、实时写入性能差、文本检索慢以及日志分析能力不足等问题。因此使用 Apache Doris 替换 Elasticsearch,实现资源投入降低 50%、查询速度提升 2~4 倍,同时显著提高了运维效率。
415 3
金融场景 PB 级大规模日志平台:中信银行信用卡中心从 Elasticsearch 到 Apache Doris 的先进实践
|
8月前
|
存储 分布式数据库 Apache
小米基于 Apache Paimon 的流式湖仓实践
本文整理自Flink Forward Asia 2024流式湖仓专场分享,由计算平台软件研发工程师钟宇江主讲。内容涵盖三部分:1)背景介绍,分析当前实时湖仓架构(如Flink + Talos + Iceberg)的痛点,包括高成本、复杂性和存储冗余;2)基于Paimon构建近实时数据湖仓,介绍其LSM存储结构及应用场景,如Partial-Update和Streaming Upsert,显著降低计算和存储成本,简化架构;3)未来展望,探讨Paimon在流计算中的进一步应用及自动化维护服务的建设。
418 0
小米基于 Apache Paimon 的流式湖仓实践
|
11月前
|
存储 数据挖掘 数据处理
巴别时代使用 Apache Paimon 构建 Streaming Lakehouse 的实践
随着数据湖技术的发展,企业纷纷探索其优化潜力。本文分享了巴别时代使用 Apache Paimon 构建 Streaming Lakehouse 的实践。Paimon 支持流式和批处理,提供高性能、统一的数据访问和流批一体的优势。通过示例代码和实践经验,展示了如何高效处理实时数据,解决了数据一致性和故障恢复等挑战。
283 61

推荐镜像

更多