实时集成采集-Flink CDC
概念
Flink CDC(Change Data Capture)是一个基于Apache Flink构建的开源库,通过与数据库交互并读取其变更日志(例如MySQL的binlog或Oracle的Redo Log)来获取数据变化事件,然后将这些事件转换成Flink内部可处理的Changelog流格式,后续利用Flink强大的状态管理和流处理能力,对变更数据进行实时分析、聚合或者传输至目标存储系统如Hadoop HDFS、Kafka或其他数据库等。
数据库变更日志和Changelog流
数据库的变更日志(Change Log)和Changelog流都是用于记录数据库状态变化的数据结构
- 数据库变更日志:
数据库变更日志是数据库系统内用来记录所有对数据进行修改操作的日志文件或机制,是数据库本身维护的用于保证数据完整性和提供恢复能力的底层技术实现。例如:
- MySQL的二进制日志(Binary Log):它记录了所有的DDL(数据定义语言)和DML(数据操作语言)操作,如INSERT、UPDATE、DELETE等,这些操作按照事务提交的顺序排列。
- Oracle的Redo Log:用于保存事务对数据库所做的更改信息,以支持事务的回滚和数据库的恢复。
- PostgreSQL的WAL(Write-Ahead Log):在事务提交前先将变更写入日志,确保在系统崩溃时能够恢复到一致的状态。
- Changelog流:
Apache Flink中的Changelog流是一种特殊的事件流,是从变更日志中提取出来,适合于流处理框架使用的、描述数据变更过程的数据流形式。每个事件包含了关于某个数据对象(如数据库表的一行记录)状态变更的具体信息,如添加、更新或删除的动作类型,以及变更后的数据内容。
- 举个例子:
假设我们有一个电商数据库,其中包含一个orders
表,记录了所有的订单信息。现在发生了以下操作:
- 数据库变更日志:
- 插入操作:当用户A下单后,数据库会执行一条INSERT语句,向
orders
表中插入一行新数据(例如:订单ID=1001,用户ID=A,商品ID=1,数量=2)。 - 更新操作:假设用户A修改了订单1001的商品数量,从2个变为3个。数据库会执行一条UPDATE语句,更新
orders
表中的相应记录。 - INSERT操作和UPDATE操作会被数据库的变更日志记录下来,如MySQL的二进制日志可能记录为:
INSERT INTO orders (order_id, user_id, product_id, quantity) VALUES (1001, 'A', 1, 2); UPDATE orders SET quantity = 3 WHERE order_id = 1001;
- 如果之后用户A取消订单并执行了一条DELETE语句,该操作也会被记录到变更日志中。
- Changelog流:
- 当使用Apache Flink CDC等工具捕获上述数据库变更日志时,会将其转换为Changelog流的形式:在这里,Changelog流以事件形式表示了数据库中的变化,每个事件包含了操作类型(INSERT、UPDATE、DELETE)以及对应的数据内容。
+ INSERT: Order(order_id=1001, user_id='A', product_id=1, quantity=2) + UPDATE_BEFORE: Order(order_id=1001, user_id='A', product_id=1, quantity=2) + UPDATE_AFTER: Order(order_id=1001, user_id='A', product_id=1, quantity=3) + DELETE: Order(order_id=1001, user_id='A', product_id=1, quantity=3)
Flink增量读取和全量读取
增量读取:单并发,关系型数据库是单机,多并发无意义
全量读取:多并发,否则同步速率慢,读不过来
Flink实现全量读取数据的方式通常不依赖于数据库的Redo Log或Binlog日志,而是直接从数据源(如数据库、文件系统或其他数据存储)中一次性加载所有符合查询条件的数据,并将其转换为DataStream或Table,然后进一步进行流式或批量处理。
目前实时集成仅支持增量读取,全量读取可通过离线集成实现
实时集成写入
对采集到的Changelog流进行处理,通过对应的connector进行写入
JDBC写入关系型数据库:Changelog流根据关系型数据库的特性,如字段类型、约束条件等,转换处理为合理的数据
写入大数据存储、消息队列:Changelog流转换为Append流写入
通过jdbc connector写入Mysql、Oracle | 通过对应的connector写入odps、Hive | 通过 kafka connector写入Kafka | |||
结果表没有主键 |
结果表有主键 |
||||
Changelog流消息类型 |
insert |
处理为insert |
处理为upsert |
处理为insert,标记操作类型insert |
处理为insert,标记操作类型insert |
update_before |
直接丢弃 |
直接丢弃 |
直接丢弃 |
直接丢弃 |
|
update_after |
处理为insert |
处理为upsert |
处理为insert,标记操作类型update |
处理为insert,标记操作类型update |
|
delete |
直接丢弃 |
处理为delete |
处理为insert,标记操作类型delete |
处理为insert,标记操作类型delete |
|
目标表 |
需要映射目标表 |
写入同一个topic中,数据会带有表名字段 |
|||
DDL处理 |
正常处理/忽略/报错 |
全部写入,无法忽略/报错 |
用户可以自行处理得到的结果表,按主键聚合,按更新时间进行排序,根据最新的操作类型执行
实时集成与实时研发对比
实时研发 |
实时集成 |
|
配置方法 |
写flink sql/flink datastream代码 |
配置化操作,无需手写代码 |
实时采集支持的数据源 |
多种,实现方式包含Flink CDC(MySQL)、定时查表(SAP)、直接读取(Kafka)等 |
目前仅支持Flink CDC采集 |
同步表量 |
一个任务同步的表量由代码决定,每张表需要单独的ddl定义(开源的connector仅支持单表处理,flink datastream支持,代码编写难度大) |
一个任务可支持整库同步,可圈选或排除某些表 (实时集成是在开源的connector基础上进行改造) 是否有改造引擎? |
数据处理 |
数据处理能力较灵活,可以自行编写数据处理和计算代码 |
数据处理能力较弱 |
场景 |
同步表数量较少,需要进行数据处理 |
同步表数量较多且不需要进行数据处理 |
实时集成工具对比
实时数据采集工具 |
厂家 |
特点 |
OGG |
Oracle原厂的Oracle采集和同步工具 |
价格昂贵 性能高 只支持Oracle实时数据同步 |
Dataphin实时集成 |
Dataphin自研的 CDC Connector |
配置化方式,无需写代码 支持整库或多表同步 支持多种数据源 性能优于开源Flink CDC |
开源Flink CDC(可搭配Dataphin实时研发) |
开源免费的Flink CDC |
需要写Flink SQL代码 只支持单表数据同步 |
总结-性能/价格:OGG > Dataphin实时集成 >> 开源Flink CDC