背景
Lindorm作为阿里巴巴线上最大规模的NoSql数据库之一,全面支撑了淘宝、天猫、蚂蚁、菜鸟、阿里妈妈、高德、优酷、钉钉、大文娱等经济体业务,其中有将近1/3的Lindorm业务会使用数据订阅的功能,初步统计每天有将近500多TB数据从lindorm实时导出到TT、MetaQ、ODPS等其他系统,用于实时、离线计算、数据订阅、算法训练、数据备份等场景。然而,Lindorm产品上云之后,我们发现原先在集团内部使用的方案输出阿里云遇到了非常多的问题,这不得不让我们重新思考一套同时适合内外业务的数据订阅方案。
传统方案
过去Lindorm数据的数据订阅是通过Lindorm Exporter这个外挂组件来实现的,如上图所示,Lindorm Exporter在整条链路中作为一个外挂的Lindorm数据采集模块,采集Lindorm WAL(Write Ahead Logging)的数据并将其转存到消息中间件(类似Kafka),消息中间件一方面对外提供订阅的能力,可以对接流计算引擎进行实时计算,另一方面,消息中间件内置一些Connector能够自动的将数据归档到ODPS,用于离线的分析。然而,走Lindorm Exporter的导出方案也有许多的不足:
- 链路长,运维困难,上下游出现异常定位问题困难
- 存储成本高,例如Lindorm归档ODPS的链路,数据在中间过程转存多次
- 数据转换 、数据清洗能力较弱
- 产品化输出阿里云可行性偏低,由于系统依赖了消息系统,部署形态、计费复杂
Lindorm CDC方案
于此同时,我们也在思考是否可以借助Lindorm Exporter数据采集的能力实现一个Log-Based CDC去替换上图中虚线框内的采集组件和消息队列,并且让CDC能力作为Lindorm内核能力的一部分,从而缩短同步的链路,大大降低运维成本。
如上图所示,Lindorm CDC对外提供订阅能力,业务可以直接订阅Lindorm实时的变更数据。此外Lindorm CDC也对接了Flink生态,可以通过Flink DataStream API或者Flink SQL进行实时计算,这大大提高了数据转换 、数据清洗能力,并能借助于Flink Connector能力对接各种异构系统、数湖、数仓。
主流方案调研
前面章节聊了聊Lindorm CDC立项的原因,在项目立项之初,我们调研了市面上比较主流的CDC产品,从几个维度对产品能力进行了比较,比较的结果如下:
- 从CDC实现机制看,目前主流的实现方式有两种:
- Query-based CDC:需要持续执行Query查询(e.g. via JDBC)来获取最新的变更。
- Log-based CDC:通过解析数据库的log文件来获取数据的变更(e.g. MySQL’s binlog)
相比于Query-based CDC,Log-Based CDC对于增量同步实现相对容易,在业务影响、性能等方面都更具优势,《Five Advantages of Log-Based Change Data Capture》这篇文章做了比较充分的总结。
- 从全增量同步能力看,DynamoDB CDC和MongoDB Change Stream并没有提供全量的支持,一般可以通过扫表来获取全量的数据。
- 从消息投递语义看主要分成顺序性和重复性,这两方面都是针对增量同步来说的。顺序性方面,绝大多数CDC订阅都是和数据写入的顺序保持一致。重复性方面,Dynamodb CDC 、MongoDB Change Stream提供了Exactly once的语义(这里的Exactly once是对采集端而言的),Flink CDC对接的许多数据源也具备此能力。Exactly once投递语义能够保证下游消费的数据不会出现重复。
- 从架构角度去看,该表将架构分为单机和分布式,这里的分布式架构不单纯体现在数据读取能力的水平扩展上,更重要的是在大数据场景下分布式系统接入能力。
- 在数据转换、数据清洗能力上,Flink CDC能借助Flink计算引擎底座的能力可以方便的对数据做一些过滤或者清洗,甚至聚合。其他系统往往在这方面比较弱,更多的是扮演数据采集的角色,复杂的计算只能通过对接计算引擎来弥补。
这里做个简单的总结,目前CDC的产品可以简单的分为两类:
- 数据库内置CDC能力,如AWS Dynamodb、 MongoDB、SQL Server等数据库厂商将CDC作为数据库的核心能力,对外提供一体化的使用体验。这类CDC产品自行封装了底层复杂数据订阅逻辑,便于外部系统对接。它们在Transformation方面相对比较薄弱,更多的是把数据清洗、转化、计算等逻辑交给其他计算引擎来做。
- 数据通道、数据集成相关产品会作为一个外挂系统来赋予数据库的CDC能力,比如Debezium CDC、Canal、Flink CDC,支持订阅多种数据源,并提供一定数据转化、清洗、计算的能力。比如Flink CDC支持订阅Mysql全增量的数据,并在此基础上做了不少的优化,《Flink CDC 2.0 正式发布,详解核心改进》做了比较充分的说明。
产品能力定义
通过上一章节的调研,结合Lindorm目前的现状,我们最终决定借鉴Dynamodb、 MongoDB的方式,将Lindorm CDC作为Lindorm内置的核心能力,封装解析底层文件、维护点位的复杂逻辑,并在此基础之上提供了Rowkey级别保序、行级别订阅的能力,这么做的好处有几点:
- 对接Flink、Spark等计算引擎的过程中不再需要各自实现一遍复杂的订阅逻辑,大大简化对接外部系统的难度。
- 统一接口,便于bug的修复,性能的优化,不会因为Lindorm内核版本更新出现代码冲突,降低外部插件的维护成本。
- 核心订阅逻辑由我们自己来掌握,而不是在下游订阅端,便于问题的排查。
生态对接
在计算方面,我们希望Lindorm CDC可以通过KafkaStream、Flink、Spark Streaming等计算引擎来对数据进行清洗、转化、聚合计算。在异构数据系统对接方面,可以借力Kafka Connector、Flink CDC完成异构系统对接,并且Lindorm的数据能够很方便的入湖、入仓。
消息投递语义
在投递语义方面,我们认为Lindorm CDC需要具备以下能力才能满足绝大多数的业务需求。
- 顺序性
- 默认不严格保序,整体有序,异常情况下可能出现乱序
- 支持RowKey级别保序
- 重复性
- At Least once - 默认模式,在出现系统宕机或异常的情况下可能出现消息重复
- Exactly Once - 保证消息“仅一次”投递语义
内容格式
Lindorm CDC在订阅的类型上看齐了Dynamodb,提供如下几种订阅类型:
- ROWKEY_ONLY - 只订阅变更的Rowkey
- NEW_IMAGE - 返回更变后的行数据信息
- OLD_IMAGE - 返回变更前的行数据信息
- NEW_AND_OLD_IMAGES - 订阅变更前后的行数据信息
具体的内容格式上,Lindorm CDC借鉴了Debezium的格式,在NEW_AND_OLD_IMAGES模式下,内容全面包含了更新前后的记录信息,能够满足绝大部分的业务需求,具体格式如下:
{ "op": "u", "ts_ms": 1465491411815, "before": { ⑴"id": "1004", "first_name": "Anne Marie", "last_name": "Kretchmar", }, "after": { "id": "1004", "first_name": "Anne", "last_name": "Kretchmar", }, "source": { "version": "3.2.0", "db": "ld-xxxx", "namespace": "default", "table": "customers", "ts_ms": 1465491411815 } }
Item |
Field name |
Description |
1 |
op |
|
3 |
ts_ms |
event写入kafka的时间戳 |
4 |
before |
old image 的信息,更新前记录信息 |
5 |
after |
new image 的信息,更新后的记录信息 |
6 |
source |
event 额外信息, 支持额外添加
|
技术挑战
前面的章节,我们比较了各个主流的CDC产品,从而确定了Lindorm CDC所需要具备产品能力。这一章节,重点从投递语义、订阅格式、生态对接、底层存储等方面介绍lindorm CDC所面临的挑战以及在实现方面的思考。
投递语义
RowKey级别保序
保序是CDC一个很重要的能力,很多场景需要保证同一个行数据的变更消息按变更发生的时间有序输出到下游,各大厂主流数据库比如AWS Dynamodb,Azure Cosmos DB都提供了保序的CDC订阅能力。
对于一些单机数据库来说,做到保序是比较容易的事情,但是保序的实现对于Lindorm来说非常具有挑战,Lindorm是面向海量存储的NoSql数据库,其数据都是分布在成千上万的Region中,这些Region在整个集群运行的过程中会不停发生Split、Merge、Rebalance,同一个Region不同时段可能会在不同的Region Server上线,也就意味着同一个Rowkey的数据可能是打散在各个Region Server的WAL中的,并且各Region Server本地时钟存在误差,如果要实现全局保序,写WAL可能需要引入逻辑时钟的概念。
在保序能力上,Lindorm CDC参考了Dynamodb提供RowKey级别保序能力。之所以没有提供全局保序的能力,因为我们认为其不适用于大数据场景之下,全局保序的消费者往往只有一个,消费能力无法企及集群的写入能力,另外我们没有在实际客户中看到全局保序的需求。
关于保序实现我们内部也讨论过许多的版本:
第一种方式是改造Lindorm WAL写入机制,由原来机器粒度改为以Region为粒度组织日志数据,由于WAL本身在时间维度上是有序的,这样就能够按照一定顺序解析Region WAL,从而实现CDC的保序。然而这种方式对Lindorm的写路径改动比较大,会导致底层WAL过多,过多的文件句柄对性能、稳定性都会产生影响。
第二种方式是转存 + 排序,对Lindorm各个节点实时写入的数据按照RowKey进行分桶聚合,然后对汇总数据按照一定的时间窗口进行排序输出,这种方式对内核改动最小,然而实时性比较差,数据转存多次。
第三种方案也是我们目前所使用的的方案,我们引入了两个概念,第一个概念是Barrier,每当Region open的时候,就会往对应节点的WAL写入一个新的Barrier,Lindorm CDC在同步WAL中遇到了Barrier信息就会上报Master,Master根据Barrier中所包含的Region、点位等信息判断这个Region对应点位之前的数据是否同步完成,如果同步完成,对应的Barrier就会被放行,否则阻塞。第二个概念是RegionTraceLog,RegionTraceLog记录了Region的移动轨迹,Master通过回放RegionTraceLog来获取对应Region前后的移动路径,通过Region的轨迹路径以及Lindorm CDC上报各个日志的点位信息就能判断对应的Barrier是否可以放行。此外RegionTraceLog有定期清理的机制,避免数据过大。这种方式对于内核的改造比较小,延迟、性能上相比第二种方案也更具优势。
Exactly Once
在at-least-once投递语义下,异常情况往往会导致相同数据会被多次处理和发送,这就需要下游具备去重能力或者支持幂等执行,这大大提高了下游的逻辑复杂程度。此外如果业务的数据体量不小,去重阶段会导致流式计算引擎的StateStore暂用空间很大,导致高昂的使用成本。如果不去重,changelog内容会出现重复,聚合计算的结果可能会出错。基于以上原因Lindorm CDC增加了Exactly Once的能力,保证在异常情况下也能做到仅一次投递的语义。
实现Exactly Once需要做到对增量日志处理点位的精确管理,保证同一条消息在写入下游成功后不会再被重复处理,即保证数据写入和点位更新同时成功。因此,Lindorm CDC将消息消费的点位信息维护在StreamStorage(Lindorm流式计算引擎,后续的章节会介绍)中的Topic中,利用Stream Storage提供的事务写功能保证数据和对应的点位信息同时写入存储层。如此,当worker节点发生宕机,master可以通过回放Topic中点位信息来获取失败任务最近一次写入成功的点位,并将失败的任务发往存活的worker,之前没有写入成功的事务会被Abort,随后开始断点续传。
由于Lindorm CDC采用分布式架构,将不同增量日志的消费任务分发给不同节点处理,需要避免同一任务因worker节点假死等情况分发给不同worker而引起的重复消费。由于StreamStorage提供了写入时的Fence机制,Lindorm CDC利用该机制实现了任务级别的隔离。综上,对于worker来说,增量日志消费任务的步骤可以概括为Fence(阻止其他相同ID任务继续写入)、读取日志最新消费点位(强制读取到offset topic的最新数据)、开始处理日志 、同时写入数据和对应点位。
StreamStorage中存储的点位信息不可能是无限增长的,数据存在过期时间。因此CDC Master采用了Checkpoint机制,将某一时刻所有日志点位信息和当前Offset Topic的消费位置持久化到zk中,在宕机恢复时,可以通过Checkpoint恢复+Offset Topic回放的方式来恢复点位信息。
订阅内容
行级别订阅
之前的产品能力一栏,我们认为Lindorm CDC需要具备对外提供订阅变更前后整行数据信息的能力,然而,Lindorm底层SSTable、WAL文件却是按照KV为粒度进行组织的。最早Lindorm的导出只支持KV级别的数据订阅,业务只能订阅变更的具体KV,为了获取变更KV所对应的整行数据,业务侧需要进行反查Lindorm,大量的反查请求会影响线上的业务。此外,业务也无法获取变更前的信息。
因此,我们在Lindorm CDC上做了改进,支持全增量的行级别订阅的能力。对于全量订阅来说,Dynamodb、 MongoDB通过扫表的形式获取全量的数据,大表扫描往往会对在线的业务产生影响,Lindorm CDC这方面做了优化,降低了对业务的影响,实现了Client端的RegionScanner,直接在Client端读取底层文件拼接成行,从而避免走API访问源集群。对于增量行级别订阅来说,我们改造了Lindorm的内核,数据在写入Lindorm的过程中,会将变更前整行数据一同输出到WAL中,从而让Lindorm CDC实现增量数据的行级别订阅的能力。
生态对接
接口定义
和MongoDB Change Stream不同的是,Lindorm CDC没有独立开发一套新的订阅SDK/API,接口的定义选择兼容了Kafka协议,下游可以通过Kafka客户端直接消费Lindorm CDC的数据,这样做的好处显而易见,Lindorm CDC可以无缝对接Kafka的生态。由于各大计算引擎都对Kafka做了支持,Lindorm CDC可以复用Kafka Connector,不用在开发各个计算引擎的Connector,一举多得。
SQL能力对接
为了让Lindorm CDC订阅的数据能够支持使用Flink SQL进行流式的处理,这里需要引入Dynamic Table 和 Changelog Stream两个概念,Flink官方文档《动态表 (Dynamic Table)》对这两个概念做了一定的解释,进一步深入可以看一下《Streams and Tables: Two Sides of the Same Coin》这篇论文,这里不进行过多的赘述。流式计算引擎中使用的表本质上其实就是一个Changelog流。如上图所示Lindorm CDC可以将Lindorm WAL实时写入的数据转化成ChangeLog流,从而业务可以自由的选择Flink DataStream API 、Table API或者Flink SQL的方式来处理Lindorm实时的数据。
流式存储引擎
前面的章节,我们介绍了Lindorm CDC兼容了Kafka协议,我们并非直接使用了Kafka,而是基于Lindorm DFS实现了一套自己的流式存储引擎,并在此基础之上提供了Kafka Proxy。Lindorm CDC将Lindorm WAL数据转化ChangeLog流存入流式引擎中供下游消费。流式存储引擎对上层提供Topic、Partition等概念的抽象定义,实际数据的冗余备份的逻辑下沉给Lindorm DFS,做到存算分离,并且基于Lindorm DFS我们可以实现无感知的数据压缩,冷热分离的能力。由于篇幅的限制,关于流式存储引擎的技术细节以及思考会在我的后续文章中进行介绍。
总结
本文主要介绍了Lindorm CDC项目诞生的背景,产品能力的定义,以及部分技术实现的细节。未来我们将继续从易用性、可靠性、性能、成本等多个方面持续投入,为Lindorm客户创造价值,为业务发展保驾护航。