背景
在漫长的数仓建设过程中,实时数仓与离线数仓分别由不同的团队进行独立建设,有大而广的离线数仓体系,也有需要追求业务时效,需要建设实时数仓。然而,业务数据需求和数据产品需求中,往往需要把实时数据与离线数据结合在一起进行比对和分析,但是这两个天然不一样的数据存储和计算结构,需要同时开发两套数据模型。在数据处理过程中,实时数仓需要使用Blink/Flink 处理,离线需要写ODPS SQL处理,还有在线计算模型,需要开发java代码处理。
如上图所示,实时数据与离线数据在存储层、计算层、服务层,都是割裂分离、独立建设的。实时数据,有着增量式计算的特性,需要快速的流转与计算,它主要以DataHub、Flink、Hbase等异构系统做为支撑,串联形成一个完整实时计算全链路。而离线数据,是定时、批量计算的特性,由存储计算统一的ODPS系统做为支撑。它们除了计算链路的差异,还有着数据处理的逻辑差异:
- 流批处理不能复用,ODPS和Blink/Flink的SQL标准不一致,他们的底层调度和数据处理逻辑也有根本的差别,一个是以MR作为核心的批处理方式,一个是以Flink/Blink为核心的流处理。
- 有些流批处理场景需要调用HSF接口,调用HSF接口,在Java Spring环境里,是信手拈来的事情,但到了ODPS/Flink环境里,就变得额外的一个大挑战,甚至不可实现,因为在ODPS/Flink是无法加载或者极难使用Spring容器的,这就会让开发在面对复杂流处理场景,更倾向使用自己熟悉的Java环境,但同时也意味失去ODPS/Flink那种贴近业务表达的SQL表达。
- 计算处理除了流批处理,还更广泛存在在线交互计算,这个和流处理异步处理还不太一样,是需要同步计算并返回结果,这通常是在Java环境开发HSF接口,但如果你要对外同时提供三种能力:流计算、批计算和在线计算的时候,就面临需要三端开发,流计算和批计算尚且还有SQL,虽然不太一致,但至少大同小异,但在线交互计算就是需要纯Java开发了,将SQL翻译成Java代码可是个不小的挑战。
Unify SQL VS 流批一体
面对三种计算模式,较低的研发效率、不可控的数据质量,以及臃肿数据接口服务的困境,三端(流计算、批计算、在线计算)一体计算的想法自然油然而生,在20年做价格力业务时候,我就一直思考有什么解法,其实,这个也是大数据架构经常面临的问题,业界达成共识,可以归纳两种方案:
▐ 流批一体计算
同一个引擎承载流、批两种计算模式,在流计算模式下进行实时数据计算,在批计算模式下进行离线数据计算。
流批一体计算的典型架构是:Flink + Kappa架构。Flink可以实现基于SQL的流批一体的计算表达,复杂计算通过Java应用承接,价格力计算架构就是典型这种架构,但是这种架构存在以下问题:
- 没有解决在线交互计算
Flink解决了流计算、批计算一体的能力,这两种都是异步处理,只是时效不同而已,但没有解决在线计算的能力,如果要提供在线计算能力,不得不在以下两个方案选择:
- 通过Java应用提供同步计算接口,这样就存在两套逻辑:一个是Flink实现的流批处理,另一个是Java实现的在线处理。
- 提供两个接口,一个接口是发起计算请求,将计算请求交到Flink处理后,再提供一个轮训查询接口,查询计算好后的数据,这个方案至少在计算上做到一套代码,但这种同步转异步处理的方案势必会影响产品的设计。
- Flink的批处理吞吐量
Flink实现批处理,其实是有点一厢情愿,为啥这么说,因为其吞吐规模,跟MR批计算(ODPS)完全不是一个量级,如果Flink真能实现和ODPS完全对等的吞吐规模和资源成本,那完全不需要ODPS什么事了,但现实是,对于一些只有批量处理场景的(比如特征预处理、统计计算),ODPS仍然是第一优先选择,只有当面临流批同时存在的场景时候,并且对批处理规模要求不大时候,Flink的确提供非常不错的一体化解决方案。
▐ Unify SQL
同一SQL代码通过自动化转义,翻译到流计算引擎和批计算引擎上进行流、批计算,也包括翻译到HSF接口代码,提供在线交互计算能力。
Flink的流批一体架构非常优秀,能解决90%的流批一体问题,但不幸的是,我们有些业务场景(典型的价格计算场景),远不是Flink写写SQL可以解决的:
- 整个电商是个非常复杂的业务体系,就以我所处在的营销域里,就要面对招商模型、投放模型、活动模型、权益等等,这些都远不是一个单一系统可以承接的,也不是一个单一团队可以承接的,阿里针对这样复杂的业务,设计了HSF这样微服务电商架构,但是Flink和电商这样的Java技术栈明显是割裂的,怎么结合两种体系架构,一方面发挥电商的微服务架构的红利,一方面又能利用到Flink的SQL流批处理能力,考虑到Flink本身的局限性,与其让Flink支持HSF,不如让Java环境支持Flink SQL,换句话说,设计一个SQL引擎,它能通过sql的流处理方式,处理Java对象,将流引擎嵌入到Java里,随调随用。
- Flink的批引擎,在面对T级离线数据批量处理,是非常耗资源,几乎不可用,正如上面所讲的,Flink批处理的吞吐量远不及ODPS的MR批处理,那么,我们为何不让这样计算仍然交接给ODPS处理,但是,ODPS和Flink的SQL标准不一致,需要两端开发,现在问题变成:怎么统一ODPS和Flink的开发,说的再通俗点,我们可不可以在ODPS和Flink上面架设一层统一Unify SQL,这个SQL引擎可以翻译成ODPS或者Flink能理解的处理(ODPS翻译成MR程序,Flink翻译成Stream Operator)方式,抹平ODPS和Flink的SQL语义差别。
- 如果仅仅是抹平ODPS和Flink的SQL差异,带来的收益其实并不大,但是其统一SQL表达计算的设计,是可以进一步扩宽其应用范围,比如在线交互计算,或者说,我们可以进一步打造统一计算引擎,包括编排不同模式的计算能力,比如:有些场景对时效要求比较高,我们可以调度Flink计算,对时效没有要求,但数据量巨大,可以只调度离线计算,有些需要提供HSF接口,就调度应用启动spring接口。
Unify SQL Engine
淘系价格计算引擎,以Flink + Kappa为核心的数据架构,关于这种数据架构演进,可以参考我其他文章,三种计算模式的叠加是价格服务计算引擎的常态模式,他们都在各自核心计算发挥自己最大的优势:
- ODPS:离线批量计算引擎,核心优势,非常高的计算吞吐量,但时效差,有面向MR和SQL编程模式,业务和BI友好,主要用在数据预处理、离线特征加工、常见维表ETL等。
- Flink:流式处理引擎,核心优势,低延迟计算,时效好,极高的容错和高可靠性,但吞吐量相比ODPS一般,有面向Stream API和SQL API的编程模式,业务和BI友好,主要用在实时数据加工(优惠、订单等)、消息预处理等
- Java计算:核心优势,丰富的电商Java HSF接口,复杂的领域模型,面向对象设计,开发友好,但是业务和BI不友好,容错和可靠性依赖开发设计,延迟和吞吐量也高度依赖开发设计。
那么如何整合这3个不同计算架构,Flink提出一个引擎承接所有计算模式,也就是Flink的流批一体引擎,但这带来的问题就是,不同计算模式,底层的引擎本身就很难完全周全到,与其去统一计算引擎,为何不统一表达和调度,而把真正的计算下放到各自计算引擎,这就是Unify Engine的核心思想。
▐ SQL引擎技术
在实现三端一体化时候,有个核心技术难点,就是SQL引擎,很多数据产品都自带自己的SQL引擎,Flink内部有SQL引擎,ODPS内部有C++实现的SQL引擎,Hive也有,Mysql内部也有SQL解析引擎,这些SQL引擎都高度集成到各自的存储和计算里,如果你说要找个独立的可用在Java环境的SQL引擎,市面上有是有,不过要么是非常复杂的calcite sql引擎,要么是非常简单的select * 简易sql引擎,能做的事情非常少,开箱即用的几乎没有。但Unify SQL引擎又是实现三端一体化的核心组件,没有它,其他什么事情都无从谈起。
从无设计一个SQL引擎成本是非常高的,其中不说复杂的语法解析,生成AST语法树,就单单SQL逻辑计划优化,就是非常复杂,幸运的是,业界是存在一个可以二次开发的SQL引擎,就是calcite SQL引擎,其实,很多SQL引擎都是基于calcite二次开发的,比如Flink、Spark内部的SQL解析引擎就是基于calcite二次开发的,我们设计的SQL引擎也是基于calcite的。
Calcite 使用了基于关系代数的查询引擎,聚焦在关系代数的语法分析和查询逻辑的规划,通过calcite提供的SQL API(解析、验证等)将它们转换成关系代数的抽象语法树,并根据一定的规则或成本估计对AST关系进行优化,最后进一步生成ODPS/Flink/Java环境可以理解的执行代码。
calcite的主要功能:
- SQL解析:Calcite的SQL解析是通过JavaCC实现,使用JavaCC变成SQL语法描述文件,将SQL解析成未经校验(unvalided AST)的AST语法树。
- SQL校验:无状态校验,即验证SQL语句是否符合规范;有状态校验,通过和元数据验证SQL的schema,字段,UDF是否存在,以及类型是否匹配等。这一步生成的是未经优化的RelNode(逻辑计划树)
- SQL查询优化:对上面步骤的输出(RelNode),进行优化,这一过程会循环使用优化器(RBO规则优化器和CBO成本优化器),在保持语义等价的基础上,生成执行成本最低的SQL逻辑树(Lo)
至于calcite的比较详细的原理,可以详解:Apache Calcite 处理流程详解(地址:https://xie.infoq.cn/article/1df5a39bb071817e8b4cb4b29),这里不详解了。
有了calcite,解决了SQL->逻辑树,但是真正执行SQL计算的,还需要进一步将逻辑数转换成物理执行树(Physical Exec DAG),在这个DAG,是包含可执行的Java代码(JavaCode)片段,最后下发到不同执行环境,会被进一步串联可被环境执行的链路,比如在ODPS环境,会生成MR代码,在Flink环境,会被转换成Stream Operator,在Java环境,会被转换成CollectorChain,在Spring环境,会被转换成Bean组件。
PS:如果你们看过Flink源码,对上面流程会非常眼熟,是的,Unify SQL Engine不是从头设计的,是基于Flink 1.12源码魔改的,其中Parse和下面要说的Codegen技术都是直接参考了Flink设计,当然说是魔改的,就是还有大量代码需要基于上面做二次开发,比如从执行DAG到各个环境真正可执行的MR/Bean/Stream。
▐ Codegen技术
在SQL解析后,经过逻辑优化器和物理优化器,产生的PhyscialRel物理计划树,包含大量的复杂数据逻辑处理,比如SQL常见的CASE WHEN语句,常见的做法是给所有符号运算定义个父类(比如ExecNode),实际运行时,委派给真实的子类运行,这涉及到大量虚拟函数表的搜寻,最终这种分支指令一定程度阻止指令的管道化和并行执行,导致这种搜寻成本比函数本身执行成本还高。
Codegen技术就是专门针对这样的场景孕育而生,行业做的比较出色的Codegen技术,有LLVM和Janino,LLVM主要针对编译器,而Java的代码codegen通常使用Janino,Janino做为一种小巧快速的Java编译器,不仅能像Javac将一组java文件编译成Class文件,也可以将Java表达式、语句块、类定义块或者Java文件进行编译,直接加载成ByteCode,并在同一个JVM里进行运行。
Unify SQL Engine也使用Janino用来做CodeGen技术,并有效地提升代码的执行效率。关于Janino更多内容,可以参考这篇文章:Java CodeGen编译器Janino(地址:https://zhuanlan.zhihu.com/p/407857568)。这里有采用Codegen和不采用Codegen的技术性能对比:
表达式 |
100*x+20/2 |
(x+y)(xx+y)/(x-y)100/(xy) |
Node树遍历执行 |
10ms |
88ms |
Janino生成代码执行 |
6ms |
9ms |
可以看出当表达式越复杂时,使用Janino的效果就会体现越明显。
▐ 有状态计算
通常计算分为无状态计算和有状态计算,无状态计算一般是过滤、project映射,其每次计算依赖当前数据上下文,相互独立的,不依赖前后数据,因此,不需要有额外的存储保存中间计算结果或者缓存数据,但还有一类是有状态计算,除了当前数据上下文,还需要依赖之前计算的中间态数据,典型的比如:
- sum求和:需要有存储保存当前求和的结果,当有新的数据过来,结合当前中间结果基础上累加
- 去重:去掉之前重复出现的数据,需要保存之前已经处理过哪些数据,然后有新的数据需要计算,要和保存的数据比较是否重复
- 排序:需要有存储保存之前排好的数据,当有新的数据过来,会变更之前的排序结果,并diff后,将重新排序后有变动的数据重新发到下游
可见,当需要进行有状态计算,需要有后背存储来承载中间状态结果,Unify SQL Engine是支持3种后背存储:内存、Redis和Hbase:
- 内存State是只保存到内存,一旦重新启动,就丢失历史数据,内存State通常用在单机有状态计算,并且容忍数据丢失。一般用在ODPS的MR程序里,因为一次MR调用状态计算,只需要当前执行上下文的累计结果,不需要放在全局缓存,不同批次之间的累计是通过MR API之间传输,内存State完全够用。
- Redis:对于需要跨多机状态计算,就会用到Redis作为后背存储,Unify SQL Engine在Java环境里默认是使用这个作为后背存储。Redis后备存储一般用在Java计算环境,数据会流经过不同生产机器,计算的中间结果需要全局可见。
- Hbase:如果状态数据超过100G,可以选择Hbase做为后背存储,性能虽然比不上Redis,但状态可以保存很长时间,对于长周期的状态计算非常有用。
▐ JOIN语义
Flink是可以支持双流Join,但是Flink的双流Join的语义完全照搬了SQL的JOIN语义,就是一边的数据会和另一边的所有数据JOIN,这个对于离线分析没有任何问题,但是对于实时计算是会存在重复计算,在有些场景还有损业务逻辑,比如:当订单流去双流JOIN优惠表的时候,就会出现这个问题,优惠表的数据是会不停变化的,但是我们希望以快照数据做为JOIN的依据,而不是把优惠变更的数据都复现一遍,Unify SQL Engine是做到后者语义的,也就是SNAPSHOT JOIN,也是业务场景常见的语义:
一些想法
▐ 统一调度
Unify SQL Engine现在已经可以做到将SQL翻译成不同执行环境可运行的任务,通过Unify SQL统一表达了不同环境的逻辑计算,但是离最终我们期望的还很远,其中一点就是要做到统一调度和分配,现在不同环境的协调是需要开发者自己去分配和调度,比如哪些计算需要下发到ODPS MR计算,哪些是在Java环境运行,未来我们希望这些分配也是可以做到统一调度和运行,包括全量和增量计算的自动协同,离线和在线数据协同等
▐ 资源成本
通过Unify SQL Engine,开发者可以自己选择底层的计算引擎,对于数据量较大但对时效要求不高的场景,可以选择在ODPS计算,对于时效有要求同时数据规模可接受内,可以选择在Flink调度,对于计算逻辑复杂,需要大量依赖HSF接口,可以选择在Java环境启动,选择自己最容易接受的资源和成本,承接其计算语义。
同时,也是希望通过Unify SQL Engine最大化的利用计算资源,比如Java应用,很多情况下是空闲状态的,CPU利用率是比较低下的,比如一些流计算可以下发到这些空闲的应用,并占用非常小的CPU(比如5%以内),整体的资源利用率就提升了,还比如,Flink计算资源是比较难申请,那么可以选择在Java环境里计算(Java相比Flink环境缺乏一些特性,比如Exactly once语义)等等。