在 Flink 1.11 引入了 CDC 机制,CDC 的全称是 Change Data Capture,用于捕捉数据库表的增删改查操作,是目前非常成熟的同步数据库变更方案。Flink CDC Connectors 是 Apache Flink 的一组源连接器,是可以从 MySQL、PostgreSQL 数据直接读取全量数据和增量数据的 Source Connectors.
★ Flink SQL CDC 内置了 Debezium 引擎,利用其抽取日志获取变更的能力,将 changelog 转换为 Flink SQL 认识的 RowData 数据。RowData 代表了一行的数据,在 RowData 上面会有一个元数据的信息 RowKind,RowKind 里面包括了插入(+I)、更新前(-U)、更新后(+U)、删除(-D),这样和数据库里面的 binlog 概念十分类似。通过 Debezium 采集的数据,包含了旧数据(before)和新数据行(after)以及原数据信息(source),op 的 u表示是 update 更新操作标识符(op 字段的值c,u,d,r 分别对应 create,update,delete,reade),ts_ms 表示同步的时间戳。 ”
下面就来深入 Flink 的源码分析一下 CDC 的实现原理
首先 mysql-cdc 作为 Flink SQL 的一个 connector,那就肯定会对应一个 TableFactory 类,我们就从这个工厂类入手分析一下源码的实现过程,先找到源码里面的 MySQLTableSourceFactory 这个类,然后来看一下它的 UML 类图.
从上图中可以看到 MySQLTableSourceFactory 只实现了 DynamicTableSourceFactory 这个接口,并没有实现 DynamicTableSinkFactory 的接口,所以 mysql-cdc 是只支持作为 source 不支持作为 sink 的,如果想要写入 mysql 的话,可以使用JDBC 的 connector.
然后直接来看 MySQLTableSourceFactory#createDynamicTableSource 方法实现,源码如下所示:
@Override public DynamicTableSource createDynamicTableSource(Context context) { final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); helper.validateExcept(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX); final ReadableConfig config = helper.getOptions(); String hostname = config.get(HOSTNAME); String username = config.get(USERNAME); String password = config.get(PASSWORD); String databaseName = config.get(DATABASE_NAME); String tableName = config.get(TABLE_NAME); int port = config.get(PORT); Integer serverId = config.getOptional(SERVER_ID).orElse(null); ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE)); StartupOptions startupOptions = getStartupOptions(config); TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); return new MySQLTableSource( physicalSchema, port, hostname, databaseName, tableName, username, password, serverTimeZone, getDebeziumProperties(context.getCatalogTable().getOptions()), serverId, startupOptions); }
这个方法的主要作用就构造 MySQLTableSource 对象,先会从 DDL 中获取 hostname,username,password 等数据库和表的信息,然后去构建 MySQLTableSource 对象.
接着来看一下 MySQLTableSource#getScanRuntimeProvider 这个方法,它会返回一个用于读取数据的运行实例对象
@Override public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType(); TypeInformation<RowData> typeInfo = scanContext.createTypeInformation(physicalSchema.toRowDataType()); DebeziumDeserializationSchema<RowData> deserializer = new RowDataDebeziumDeserializeSchema( rowType, typeInfo, ((rowData, rowKind) -> {}), serverTimeZone); MySQLSource.Builder<RowData> builder = MySQLSource.<RowData>builder() .hostname(hostname) .port(port) .databaseList(database) .tableList(database + "." + tableName) .username(username) .password(password) .serverTimeZone(serverTimeZone.toString()) .debeziumProperties(dbzProperties) .startupOptions(startupOptions) .deserializer(deserializer); Optional.ofNullable(serverId).ifPresent(builder::serverId); DebeziumSourceFunction<RowData> sourceFunction = builder.build(); return SourceFunctionProvider.of(sourceFunction, false); }
这个方法里面先获取了 rowType 和 typeInfo 信息,然后构建了一个 DebeziumDeserializationSchema 反序列对象,这个对象的作用是把读取到的 SourceRecord 数据类型转换成 Flink 认识的 RowData 类型.接着来看一下 deserialize 方法.
public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception { // 获取 op 类型 Operation op = Envelope.operationFor(record); // 获取数据 Struct value = (Struct)record.value(); // 获取 schema 信息 Schema valueSchema = record.valueSchema(); GenericRowData delete; // 根据 op 的不同类型走不同的操作 if (op != Operation.CREATE && op != Operation.READ) { // delete if (op == Operation.DELETE) { delete = this.extractBeforeRow(value, valueSchema); this.validator.validate(delete, RowKind.DELETE); delete.setRowKind(RowKind.DELETE); out.collect(delete); } else { // update delete = this.extractBeforeRow(value, valueSchema); this.validator.validate(delete, RowKind.UPDATE_BEFORE); delete.setRowKind(RowKind.UPDATE_BEFORE); out.collect(delete); GenericRowData after = this.extractAfterRow(value, valueSchema); this.validator.validate(after, RowKind.UPDATE_AFTER); after.setRowKind(RowKind.UPDATE_AFTER); out.collect(after); } } else { // insert delete = this.extractAfterRow(value, valueSchema); this.validator.validate(delete, RowKind.INSERT); delete.setRowKind(RowKind.INSERT); out.collect(delete); } }
这里主要会判断进来的数据类型,然后根据不同的类型走不同的操作逻辑,如果是 update 操作的话,会输出两条数据.最终都是会转换成 RowData 类型输出.
接着往下面看是 builder.build() 该方法构造了 DebeziumSourceFunction 对象,也就是说 Flink 的底层是采用 Debezium 来读取 mysql,PostgreSQL 数据库的变更日志的.为什么没有用 canal 感兴趣的同学自己可以对比一下这两个框架
然后来看一下 DebeziumSourceFunction 的 UML 类图
可以发现 DebeziumSourceF unction 不仅继承了 RichSourceFunction 这个抽象类,而且还实现了 checkpoint 相关的接口,所以 mysql-cdc 是支持 Exactly Once 语义的.这几个接口大家都非常熟悉了,这里不再过多介绍.
直接来看一下核心方法 open 和 run 方法的逻辑如下
public void open(Configuration parameters) throws Exception { super.open(parameters); ThreadFactory threadFactory = (new ThreadFactoryBuilder()).setNameFormat("debezium-engine").build(); this.executor = Executors.newSingleThreadExecutor(threadFactory); } public void run(SourceContext<T> sourceContext) throws Exception { this.properties.setProperty("name", "engine"); this.properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName()); if (this.restoredOffsetState != null) { this.properties.setProperty("offset.storage.flink.state.value", this.restoredOffsetState); } this.properties.setProperty("key.converter.schemas.enable", "false"); this.properties.setProperty("value.converter.schemas.enable", "false"); this.properties.setProperty("include.schema.changes", "false"); this.properties.setProperty("offset.flush.interval.ms", String.valueOf(9223372036854775807L)); this.properties.setProperty("tombstones.on.delete", "false"); this.properties.setProperty("database.history", FlinkDatabaseHistory.class.getCanonicalName()); if (this.engineInstanceName == null) { this.engineInstanceName = UUID.randomUUID().toString(); FlinkDatabaseHistory.registerEmptyHistoryRecord(this.engineInstanceName); } this.properties.setProperty("database.history.instance.name", this.engineInstanceName); String dbzHeartbeatPrefix = this.properties.getProperty(Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(), Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString()); this.debeziumConsumer = new DebeziumChangeConsumer(sourceContext, this.deserializer, this.restoredOffsetState == null, this::reportError, dbzHeartbeatPrefix); this.engine = DebeziumEngine.create(Connect.class).using(this.properties).notifying(this.debeziumConsumer).using(OffsetCommitPolicy.always()).using((success, message, error) -> { if (!success && error != null) { this.reportError(error); } }).build(); if (this.running) { this.executor.execute(this.engine); this.debeziumStarted = true; MetricGroup metricGroup = this.getRuntimeContext().getMetricGroup(); metricGroup.gauge("currentFetchEventTimeLag", () -> { return this.debeziumConsumer.getFetchDelay(); }); metricGroup.gauge("currentEmitEventTimeLag", () -> { return this.debeziumConsumer.getEmitDelay(); }); metricGroup.gauge("sourceIdleTime", () -> { return this.debeziumConsumer.getIdleTime(); }); try { while(this.running && !this.executor.awaitTermination(5L, TimeUnit.SECONDS)) { if (this.error != null) { this.running = false; this.shutdownEngine(); ExceptionUtils.rethrow(this.error); } } } catch (InterruptedException var5) { Thread.currentThread().interrupt(); } } }
open 方法里面主要是创建了一个单线程化的线程池(它只会用唯一的工作线程来执行任务).所以 mysql-cdc 源是单线程读取的.
run 方法里先是设置了一大堆 debenium 的属性,比如 include.schema.changes 默认是 false ,也就是说不会捕获表结构的变更,所以如果有新增字段的话,目前 Flink 任务是不能动态感知 schema 变化的.主要关心的是 insert update delete 操作.
最核心的地方是构建 DebeziumEngine 对象,然后通过上面的线程池来执行 engine,后面还有一些 metric 相关的逻辑,最后是一个循环的判断任务的状态,如果程序有异常或者被打断就抛出异常中断线程关闭 DebeziumEngine 对象.