Flink的CDC连接器和JDBC连接器在功能、使用场景以及实现机制上存在显著差异,以下是两者的详细对比:
1. 功能与用途
-
CDC连接器:
- 主要用于捕获数据库的实时变更数据(Change Data Capture),包括插入、更新和删除操作。
- 支持流模式运行,能够持续监听数据库的变更日志(如MySQL的Binlog或PostgreSQL的WAL日志)。
- 适用于需要实时同步数据到下游系统的场景,例如数据仓库、消息队列等。
-
JDBC连接器:
- 主要用于批量读取或写入数据库中的数据。
- 支持流模式和批模式运行,但作为源表时是Bounded Source,即数据读取完成后任务会结束。
- 适用于一次性数据迁移、离线数据处理或结果数据写入数据库的场景。
2. 数据捕获方式
-
CDC连接器:
- 基于数据库的日志机制(如MySQL Binlog、PostgreSQL WAL)捕获数据变更。
- 能够精确捕获每一条数据的变更(INSERT、UPDATE、DELETE),并支持全量+增量的数据同步。
- 需要数据库开启日志功能(如MySQL需开启Binlog,PostgreSQL需启用逻辑复制)。
-
JDBC连接器:
- 通过SQL查询直接从数据库中读取数据,无法捕获实时变更。
- 如果需要捕获实时变更数据,必须使用CDC连接器。
3. 使用限制
-
CDC连接器:
- 对数据库版本有要求,例如MySQL CDC需要开启Binlog,PostgreSQL CDC需要启用逻辑复制。
- 消耗更多资源(如带宽),因为需要读取整个实例级别的日志(即使只关注部分表)。
- 需要额外配置(如Debezium参数、插件名称等)。
-
JDBC连接器:
- 不支持实时数据变更捕获,仅适用于静态数据的读取或写入。
- 对数据库Driver有依赖,需要手动上传目标数据库的Driver JAR包。
- 写入性能较低,尤其是对于包含主键的表,每次写入都会拼接SQL语句执行。
4. 性能与优化
-
CDC连接器:
- 在VVR 8.0.5版本中,针对MySQL CDC进行了优化,支持更快速地定位指定偏移量或时间戳后的数据。
- 可以通过Source复用减少带宽消耗。
-
JDBC连接器:
- 支持批量写入(
sink.buffer-flush.max-rows
和sink.buffer-flush.interval
参数)以提高写入性能。
- 对于PostgreSQL结果表,支持JSONB和UUID类型,进一步扩展了数据类型的兼容性。
5. 配置与使用
-
CDC连接器:
- 配置较为复杂,需要指定日志插件(如
pgoutput
)、Slot名称等参数。
- 示例:MySQL CDC连接器的WITH参数可能包括
debezium.snapshot.mode
、database.server.id
等。
-
JDBC连接器:
- 配置相对简单,主要指定数据库URL、用户名、密码等基本信息。
- 示例:
CREATE TABLE jdbc_table (
`id` BIGINT,
`name` VARCHAR,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/test',
'table-name' = 'your_table',
'username' = 'root',
'password' = 'password'
);
6. 特殊场景支持
-
CDC连接器:
- 支持表结构变更同步(如MongoDB CDC支持自动加列)。
- 支持多种数据库(如MySQL、PostgreSQL、MongoDB等)的变更数据捕获。
-
JDBC连接器:
- 支持维表关联查询,可以将数据库表作为维表使用。
- 支持多种数据库(如MySQL、PostgreSQL、Oracle等)的读写操作。
7. 包依赖与版本管理
-
CDC连接器:
- 提供两种包形式:
flink-sql-connector-xxx
:包含所有依赖的FAT JAR,适合SQL作业。
flink-connector-xxx
:仅包含连接器代码,需自行管理依赖,适合DataStream作业。
-
JDBC连接器:
- Flink仅提供开源JDBC连接器的实现,不包含具体数据库的Driver。
- 需要手动上传目标数据库的Driver JAR包(如MySQL的
mysql-connector-java
)。
总结
- CDC连接器更适合需要实时捕获数据库变更的场景,支持全量+增量数据同步,但配置复杂且对资源消耗较高。
- JDBC连接器更适合批量数据读写或结果数据写入的场景,配置简单但无法捕获实时变更。
根据实际需求选择合适的连接器,可以有效提升数据处理效率和系统稳定性。