Flink mysql-cdc connector 源码解析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: 在 Flink 1.11 引入了 CDC 机制,CDC 的全称是 Change Data Capture,用于捕捉数据库表的增删改查操作,是目前非常成熟的同步数据库变更方案。Flink CDC Connectors 是 Apache Flink 的一组源连接器,是可以从 MySQL、PostgreSQL 数据直接读取全量数据和增量数据的 Source Connectors.

在 Flink 1.11 引入了 CDC 机制,CDC 的全称是 Change Data Capture,用于捕捉数据库表的增删改查操作,是目前非常成熟的同步数据库变更方案。Flink CDC Connectors 是 Apache Flink 的一组源连接器,是可以从 MySQL、PostgreSQL 数据直接读取全量数据和增量数据的 Source Connectors.


Flink SQL CDC 内置了 Debezium 引擎,利用其抽取日志获取变更的能力,将 changelog 转换为 Flink SQL 认识的 RowData 数据。RowData 代表了一行的数据,在 RowData 上面会有一个元数据的信息 RowKind,RowKind 里面包括了插入(+I)、更新前(-U)、更新后(+U)、删除(-D),这样和数据库里面的 binlog 概念十分类似。通过 Debezium 采集的数据,包含了旧数据(before)和新数据行(after)以及原数据信息(source),op 的 u表示是 update 更新操作标识符(op 字段的值c,u,d,r 分别对应 create,update,delete,reade),ts_ms 表示同步的时间戳。


下面就来深入 Flink 的源码分析一下 CDC 的实现原理


首先 mysql-cdc 作为 Flink SQL 的一个 connector,那就肯定会对应一个 TableFactory 类,我们就从这个工厂类入手分析一下源码的实现过程,先找到源码里面的 MySQLTableSourceFactory 这个类,然后来看一下它的 UML 类图.



从上图中可以看到 MySQLTableSourceFactory 只实现了 DynamicTableSourceFactory 这个接口,并没有实现 DynamicTableSinkFactory 的接口,所以 mysql-cdc 是只支持作为 source 不支持作为 sink 的,如果想要写入 mysql 的话,可以使用JDBC 的 connector.

然后直接来看 MySQLTableSourceFactory#createDynamicTableSource 方法实现,源码如下所示:


@Override
public DynamicTableSource createDynamicTableSource(Context context) {
    final FactoryUtil.TableFactoryHelper helper =
            FactoryUtil.createTableFactoryHelper(this, context);
    helper.validateExcept(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX);
    final ReadableConfig config = helper.getOptions();
    String hostname = config.get(HOSTNAME);
    String username = config.get(USERNAME);
    String password = config.get(PASSWORD);
    String databaseName = config.get(DATABASE_NAME);
    String tableName = config.get(TABLE_NAME);
    int port = config.get(PORT);
    Integer serverId = config.getOptional(SERVER_ID).orElse(null);
    ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE));
    StartupOptions startupOptions = getStartupOptions(config);
    TableSchema physicalSchema =
            TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
    return new MySQLTableSource(
            physicalSchema,
            port,
            hostname,
            databaseName,
            tableName,
            username,
            password,
            serverTimeZone,
            getDebeziumProperties(context.getCatalogTable().getOptions()),
            serverId,
            startupOptions);
}


这个方法的主要作用就构造 MySQLTableSource 对象,先会从 DDL 中获取 hostname,username,password 等数据库和表的信息,然后去构建 MySQLTableSource 对象.


接着来看一下 MySQLTableSource#getScanRuntimeProvider 这个方法,它会返回一个用于读取数据的运行实例对象


@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
    RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
    TypeInformation<RowData> typeInfo =
            scanContext.createTypeInformation(physicalSchema.toRowDataType());
    DebeziumDeserializationSchema<RowData> deserializer =
            new RowDataDebeziumDeserializeSchema(
                    rowType, typeInfo, ((rowData, rowKind) -> {}), serverTimeZone);
    MySQLSource.Builder<RowData> builder =
            MySQLSource.<RowData>builder()
                    .hostname(hostname)
                    .port(port)
                    .databaseList(database)
                    .tableList(database + "." + tableName)
                    .username(username)
                    .password(password)
                    .serverTimeZone(serverTimeZone.toString())
                    .debeziumProperties(dbzProperties)
                    .startupOptions(startupOptions)
                    .deserializer(deserializer);
    Optional.ofNullable(serverId).ifPresent(builder::serverId);
    DebeziumSourceFunction<RowData> sourceFunction = builder.build();
    return SourceFunctionProvider.of(sourceFunction, false);
}


这个方法里面先获取了 rowType 和 typeInfo 信息,然后构建了一个 DebeziumDeserializationSchema 反序列对象,这个对象的作用是把读取到的 SourceRecord 数据类型转换成 Flink 认识的 RowData 类型.接着来看一下 deserialize 方法.


public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
    // 获取 op 类型
    Operation op = Envelope.operationFor(record);
    // 获取数据
    Struct value = (Struct)record.value();
    // 获取 schema 信息
    Schema valueSchema = record.valueSchema();
    GenericRowData delete;
    // 根据 op 的不同类型走不同的操作
    if (op != Operation.CREATE && op != Operation.READ) {
        // delete
        if (op == Operation.DELETE) {
            delete = this.extractBeforeRow(value, valueSchema);
            this.validator.validate(delete, RowKind.DELETE);
            delete.setRowKind(RowKind.DELETE);
            out.collect(delete);
        } else {
            // update
            delete = this.extractBeforeRow(value, valueSchema);
            this.validator.validate(delete, RowKind.UPDATE_BEFORE);
            delete.setRowKind(RowKind.UPDATE_BEFORE);
            out.collect(delete);
            GenericRowData after = this.extractAfterRow(value, valueSchema);
            this.validator.validate(after, RowKind.UPDATE_AFTER);
            after.setRowKind(RowKind.UPDATE_AFTER);
            out.collect(after);
        }
    } else {
        // insert
        delete = this.extractAfterRow(value, valueSchema);
        this.validator.validate(delete, RowKind.INSERT);
        delete.setRowKind(RowKind.INSERT);
        out.collect(delete);
    }
}


这里主要会判断进来的数据类型,然后根据不同的类型走不同的操作逻辑,如果是 update 操作的话,会输出两条数据.最终都是会转换成 RowData 类型输出.


接着往下面看是 builder.build() 该方法构造了 DebeziumSourceFunction 对象,也就是说 Flink 的底层是采用 Debezium 来读取 mysql,PostgreSQL 数据库的变更日志的.为什么没有用 canal 感兴趣的同学自己可以对比一下这两个框架


然后来看一下 DebeziumSourceFunction 的 UML 类图



可以发现 DebeziumSourceF unction 不仅继承了 RichSourceFunction 这个抽象类,而且还实现了 checkpoint 相关的接口,所以 mysql-cdc 是支持 Exactly Once 语义的.这几个接口大家都非常熟悉了,这里不再过多介绍.

直接来看一下核心方法 open 和 run 方法的逻辑如下


public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ThreadFactory threadFactory = (new ThreadFactoryBuilder()).setNameFormat("debezium-engine").build();
        this.executor = Executors.newSingleThreadExecutor(threadFactory);
    }
public void run(SourceContext<T> sourceContext) throws Exception {
    this.properties.setProperty("name", "engine");
    this.properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
    if (this.restoredOffsetState != null) {
        this.properties.setProperty("offset.storage.flink.state.value", this.restoredOffsetState);
    }
    this.properties.setProperty("key.converter.schemas.enable", "false");
    this.properties.setProperty("value.converter.schemas.enable", "false");
    this.properties.setProperty("include.schema.changes", "false");
    this.properties.setProperty("offset.flush.interval.ms", String.valueOf(9223372036854775807L));
    this.properties.setProperty("tombstones.on.delete", "false");
    this.properties.setProperty("database.history", FlinkDatabaseHistory.class.getCanonicalName());
    if (this.engineInstanceName == null) {
        this.engineInstanceName = UUID.randomUUID().toString();
        FlinkDatabaseHistory.registerEmptyHistoryRecord(this.engineInstanceName);
    }
    this.properties.setProperty("database.history.instance.name", this.engineInstanceName);
    String dbzHeartbeatPrefix = this.properties.getProperty(Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(), Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());
    this.debeziumConsumer = new DebeziumChangeConsumer(sourceContext, this.deserializer, this.restoredOffsetState == null, this::reportError, dbzHeartbeatPrefix);
    this.engine = DebeziumEngine.create(Connect.class).using(this.properties).notifying(this.debeziumConsumer).using(OffsetCommitPolicy.always()).using((success, message, error) -> {
        if (!success && error != null) {
            this.reportError(error);
        }
    }).build();
    if (this.running) {
        this.executor.execute(this.engine);
        this.debeziumStarted = true;
        MetricGroup metricGroup = this.getRuntimeContext().getMetricGroup();
        metricGroup.gauge("currentFetchEventTimeLag", () -> {
            return this.debeziumConsumer.getFetchDelay();
        });
        metricGroup.gauge("currentEmitEventTimeLag", () -> {
            return this.debeziumConsumer.getEmitDelay();
        });
        metricGroup.gauge("sourceIdleTime", () -> {
            return this.debeziumConsumer.getIdleTime();
        });
        try {
            while(this.running && !this.executor.awaitTermination(5L, TimeUnit.SECONDS)) {
                if (this.error != null) {
                    this.running = false;
                    this.shutdownEngine();
                    ExceptionUtils.rethrow(this.error);
                }
            }
        } catch (InterruptedException var5) {
            Thread.currentThread().interrupt();
        }
    }
}


open 方法里面主要是创建了一个单线程化的线程池(它只会用唯一的工作线程来执行任务).所以 mysql-cdc 源是单线程读取的.


run 方法里先是设置了一大堆 debenium 的属性,比如 include.schema.changes 默认是 false ,也就是说不会捕获表结构的变更,所以如果有新增字段的话,目前 Flink 任务是不能动态感知 schema 变化的.主要关心的是 insert update delete 操作.


最核心的地方是构建 DebeziumEngine 对象,然后通过上面的线程池来执行 engine,后面还有一些 metric 相关的逻辑,最后是一个循环的判断任务的状态,如果程序有异常或者被打断就抛出异常中断线程关闭 DebeziumEngine 对象.

相关文章
|
2月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
1月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
135 0
|
9天前
|
监控 关系型数据库 MySQL
MySQL自增ID耗尽应对策略:技术解决方案全解析
在数据库管理中,MySQL的自增ID(AUTO_INCREMENT)属性为表中的每一行提供了一个唯一的标识符。然而,当自增ID达到其最大值时,如何处理这一情况成为了数据库管理员和开发者必须面对的问题。本文将探讨MySQL自增ID耗尽的原因、影响以及有效的应对策略。
31 3
|
10天前
|
存储 关系型数据库 MySQL
MySQL 字段类型深度解析:VARCHAR(50) 与 VARCHAR(500) 的差异
在MySQL数据库中,`VARCHAR`类型是一种非常灵活的字符串存储类型,它允许存储可变长度的字符串。然而,`VARCHAR(50)`和`VARCHAR(500)`之间的差异不仅仅是长度的不同,它们在存储效率、性能和使用场景上也有所不同。本文将深入探讨这两种字段类型的区别及其对数据库设计的影响。
23 2
|
14天前
|
存储 关系型数据库 MySQL
PHP与MySQL动态网站开发深度解析####
本文作为技术性文章,深入探讨了PHP与MySQL结合在动态网站开发中的应用实践,从环境搭建到具体案例实现,旨在为开发者提供一套详尽的实战指南。不同于常规摘要仅概述内容,本文将以“手把手”的教学方式,引导读者逐步构建一个功能完备的动态网站,涵盖前端用户界面设计、后端逻辑处理及数据库高效管理等关键环节,确保读者能够全面掌握PHP与MySQL在动态网站开发中的精髓。 ####
|
22天前
|
存储 关系型数据库 MySQL
MySQL MVCC深度解析:掌握并发控制的艺术
【10月更文挑战第23天】 在数据库领域,MVCC(Multi-Version Concurrency Control,多版本并发控制)是一种重要的并发控制机制,它允许多个事务并发执行而不产生冲突。MySQL作为广泛使用的数据库系统,其InnoDB存储引擎就采用了MVCC来处理事务。本文将深入探讨MySQL中的MVCC机制,帮助你在面试中自信应对相关问题。
68 3
|
22天前
|
缓存 关系型数据库 MySQL
MySQL执行计划深度解析:如何做出最优选择
【10月更文挑战第23天】 在数据库查询性能优化中,执行计划的选择至关重要。MySQL通过查询优化器来生成执行计划,但有时不同的执行计划会导致性能差异。理解如何选择合适的执行计划,以及为什么某些计划更优,对于数据库管理员和开发者来说是一项必备技能。
30 2
|
1月前
|
Java 关系型数据库 MySQL
【编程基础知识】Eclipse连接MySQL 8.0时的JDK版本和驱动问题全解析
本文详细解析了在使用Eclipse连接MySQL 8.0时常见的JDK版本不兼容、驱动类错误和时区设置问题,并提供了清晰的解决方案。通过正确配置JDK版本、选择合适的驱动类和设置时区,确保Java应用能够顺利连接MySQL 8.0。
142 1
|
1月前
|
架构师 关系型数据库 MySQL
MySQL最左前缀优化原则:深入解析与实战应用
【10月更文挑战第12天】在数据库架构设计与优化中,索引的使用是提升查询性能的关键手段之一。其中,MySQL的最左前缀优化原则(Leftmost Prefix Principle)是复合索引(Composite Index)应用中的核心策略。作为资深架构师,深入理解并掌握这一原则,对于平衡数据库性能与维护成本至关重要。本文将详细解读最左前缀优化原则的功能特点、业务场景、优缺点、底层原理,并通过Java示例展示其实现方式。
73 1
|
2月前
|
消息中间件 canal 关系型数据库
Maxwell:binlog 解析器,轻松同步 MySQL 数据
Maxwell:binlog 解析器,轻松同步 MySQL 数据
304 11

热门文章

最新文章