Flink mysql-cdc connector 源码解析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
云数据库 RDS MySQL Serverless,价值2615元额度,1个月
简介: 在 Flink 1.11 引入了 CDC 机制,CDC 的全称是 Change Data Capture,用于捕捉数据库表的增删改查操作,是目前非常成熟的同步数据库变更方案。Flink CDC Connectors 是 Apache Flink 的一组源连接器,是可以从 MySQL、PostgreSQL 数据直接读取全量数据和增量数据的 Source Connectors.

在 Flink 1.11 引入了 CDC 机制,CDC 的全称是 Change Data Capture,用于捕捉数据库表的增删改查操作,是目前非常成熟的同步数据库变更方案。Flink CDC Connectors 是 Apache Flink 的一组源连接器,是可以从 MySQL、PostgreSQL 数据直接读取全量数据和增量数据的 Source Connectors.


Flink SQL CDC 内置了 Debezium 引擎,利用其抽取日志获取变更的能力,将 changelog 转换为 Flink SQL 认识的 RowData 数据。RowData 代表了一行的数据,在 RowData 上面会有一个元数据的信息 RowKind,RowKind 里面包括了插入(+I)、更新前(-U)、更新后(+U)、删除(-D),这样和数据库里面的 binlog 概念十分类似。通过 Debezium 采集的数据,包含了旧数据(before)和新数据行(after)以及原数据信息(source),op 的 u表示是 update 更新操作标识符(op 字段的值c,u,d,r 分别对应 create,update,delete,reade),ts_ms 表示同步的时间戳。


下面就来深入 Flink 的源码分析一下 CDC 的实现原理


首先 mysql-cdc 作为 Flink SQL 的一个 connector,那就肯定会对应一个 TableFactory 类,我们就从这个工厂类入手分析一下源码的实现过程,先找到源码里面的 MySQLTableSourceFactory 这个类,然后来看一下它的 UML 类图.



从上图中可以看到 MySQLTableSourceFactory 只实现了 DynamicTableSourceFactory 这个接口,并没有实现 DynamicTableSinkFactory 的接口,所以 mysql-cdc 是只支持作为 source 不支持作为 sink 的,如果想要写入 mysql 的话,可以使用JDBC 的 connector.

然后直接来看 MySQLTableSourceFactory#createDynamicTableSource 方法实现,源码如下所示:


@Override
public DynamicTableSource createDynamicTableSource(Context context) {
    final FactoryUtil.TableFactoryHelper helper =
            FactoryUtil.createTableFactoryHelper(this, context);
    helper.validateExcept(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX);
    final ReadableConfig config = helper.getOptions();
    String hostname = config.get(HOSTNAME);
    String username = config.get(USERNAME);
    String password = config.get(PASSWORD);
    String databaseName = config.get(DATABASE_NAME);
    String tableName = config.get(TABLE_NAME);
    int port = config.get(PORT);
    Integer serverId = config.getOptional(SERVER_ID).orElse(null);
    ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE));
    StartupOptions startupOptions = getStartupOptions(config);
    TableSchema physicalSchema =
            TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
    return new MySQLTableSource(
            physicalSchema,
            port,
            hostname,
            databaseName,
            tableName,
            username,
            password,
            serverTimeZone,
            getDebeziumProperties(context.getCatalogTable().getOptions()),
            serverId,
            startupOptions);
}


这个方法的主要作用就构造 MySQLTableSource 对象,先会从 DDL 中获取 hostname,username,password 等数据库和表的信息,然后去构建 MySQLTableSource 对象.


接着来看一下 MySQLTableSource#getScanRuntimeProvider 这个方法,它会返回一个用于读取数据的运行实例对象


@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
    RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
    TypeInformation<RowData> typeInfo =
            scanContext.createTypeInformation(physicalSchema.toRowDataType());
    DebeziumDeserializationSchema<RowData> deserializer =
            new RowDataDebeziumDeserializeSchema(
                    rowType, typeInfo, ((rowData, rowKind) -> {}), serverTimeZone);
    MySQLSource.Builder<RowData> builder =
            MySQLSource.<RowData>builder()
                    .hostname(hostname)
                    .port(port)
                    .databaseList(database)
                    .tableList(database + "." + tableName)
                    .username(username)
                    .password(password)
                    .serverTimeZone(serverTimeZone.toString())
                    .debeziumProperties(dbzProperties)
                    .startupOptions(startupOptions)
                    .deserializer(deserializer);
    Optional.ofNullable(serverId).ifPresent(builder::serverId);
    DebeziumSourceFunction<RowData> sourceFunction = builder.build();
    return SourceFunctionProvider.of(sourceFunction, false);
}


这个方法里面先获取了 rowType 和 typeInfo 信息,然后构建了一个 DebeziumDeserializationSchema 反序列对象,这个对象的作用是把读取到的 SourceRecord 数据类型转换成 Flink 认识的 RowData 类型.接着来看一下 deserialize 方法.


public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
    // 获取 op 类型
    Operation op = Envelope.operationFor(record);
    // 获取数据
    Struct value = (Struct)record.value();
    // 获取 schema 信息
    Schema valueSchema = record.valueSchema();
    GenericRowData delete;
    // 根据 op 的不同类型走不同的操作
    if (op != Operation.CREATE && op != Operation.READ) {
        // delete
        if (op == Operation.DELETE) {
            delete = this.extractBeforeRow(value, valueSchema);
            this.validator.validate(delete, RowKind.DELETE);
            delete.setRowKind(RowKind.DELETE);
            out.collect(delete);
        } else {
            // update
            delete = this.extractBeforeRow(value, valueSchema);
            this.validator.validate(delete, RowKind.UPDATE_BEFORE);
            delete.setRowKind(RowKind.UPDATE_BEFORE);
            out.collect(delete);
            GenericRowData after = this.extractAfterRow(value, valueSchema);
            this.validator.validate(after, RowKind.UPDATE_AFTER);
            after.setRowKind(RowKind.UPDATE_AFTER);
            out.collect(after);
        }
    } else {
        // insert
        delete = this.extractAfterRow(value, valueSchema);
        this.validator.validate(delete, RowKind.INSERT);
        delete.setRowKind(RowKind.INSERT);
        out.collect(delete);
    }
}


这里主要会判断进来的数据类型,然后根据不同的类型走不同的操作逻辑,如果是 update 操作的话,会输出两条数据.最终都是会转换成 RowData 类型输出.


接着往下面看是 builder.build() 该方法构造了 DebeziumSourceFunction 对象,也就是说 Flink 的底层是采用 Debezium 来读取 mysql,PostgreSQL 数据库的变更日志的.为什么没有用 canal 感兴趣的同学自己可以对比一下这两个框架


然后来看一下 DebeziumSourceFunction 的 UML 类图



可以发现 DebeziumSourceF unction 不仅继承了 RichSourceFunction 这个抽象类,而且还实现了 checkpoint 相关的接口,所以 mysql-cdc 是支持 Exactly Once 语义的.这几个接口大家都非常熟悉了,这里不再过多介绍.

直接来看一下核心方法 open 和 run 方法的逻辑如下


public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ThreadFactory threadFactory = (new ThreadFactoryBuilder()).setNameFormat("debezium-engine").build();
        this.executor = Executors.newSingleThreadExecutor(threadFactory);
    }
public void run(SourceContext<T> sourceContext) throws Exception {
    this.properties.setProperty("name", "engine");
    this.properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
    if (this.restoredOffsetState != null) {
        this.properties.setProperty("offset.storage.flink.state.value", this.restoredOffsetState);
    }
    this.properties.setProperty("key.converter.schemas.enable", "false");
    this.properties.setProperty("value.converter.schemas.enable", "false");
    this.properties.setProperty("include.schema.changes", "false");
    this.properties.setProperty("offset.flush.interval.ms", String.valueOf(9223372036854775807L));
    this.properties.setProperty("tombstones.on.delete", "false");
    this.properties.setProperty("database.history", FlinkDatabaseHistory.class.getCanonicalName());
    if (this.engineInstanceName == null) {
        this.engineInstanceName = UUID.randomUUID().toString();
        FlinkDatabaseHistory.registerEmptyHistoryRecord(this.engineInstanceName);
    }
    this.properties.setProperty("database.history.instance.name", this.engineInstanceName);
    String dbzHeartbeatPrefix = this.properties.getProperty(Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(), Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());
    this.debeziumConsumer = new DebeziumChangeConsumer(sourceContext, this.deserializer, this.restoredOffsetState == null, this::reportError, dbzHeartbeatPrefix);
    this.engine = DebeziumEngine.create(Connect.class).using(this.properties).notifying(this.debeziumConsumer).using(OffsetCommitPolicy.always()).using((success, message, error) -> {
        if (!success && error != null) {
            this.reportError(error);
        }
    }).build();
    if (this.running) {
        this.executor.execute(this.engine);
        this.debeziumStarted = true;
        MetricGroup metricGroup = this.getRuntimeContext().getMetricGroup();
        metricGroup.gauge("currentFetchEventTimeLag", () -> {
            return this.debeziumConsumer.getFetchDelay();
        });
        metricGroup.gauge("currentEmitEventTimeLag", () -> {
            return this.debeziumConsumer.getEmitDelay();
        });
        metricGroup.gauge("sourceIdleTime", () -> {
            return this.debeziumConsumer.getIdleTime();
        });
        try {
            while(this.running && !this.executor.awaitTermination(5L, TimeUnit.SECONDS)) {
                if (this.error != null) {
                    this.running = false;
                    this.shutdownEngine();
                    ExceptionUtils.rethrow(this.error);
                }
            }
        } catch (InterruptedException var5) {
            Thread.currentThread().interrupt();
        }
    }
}


open 方法里面主要是创建了一个单线程化的线程池(它只会用唯一的工作线程来执行任务).所以 mysql-cdc 源是单线程读取的.


run 方法里先是设置了一大堆 debenium 的属性,比如 include.schema.changes 默认是 false ,也就是说不会捕获表结构的变更,所以如果有新增字段的话,目前 Flink 任务是不能动态感知 schema 变化的.主要关心的是 insert update delete 操作.


最核心的地方是构建 DebeziumEngine 对象,然后通过上面的线程池来执行 engine,后面还有一些 metric 相关的逻辑,最后是一个循环的判断任务的状态,如果程序有异常或者被打断就抛出异常中断线程关闭 DebeziumEngine 对象.

相关文章
|
9天前
|
SQL 存储 关系型数据库
数据库开发之mysql前言以及详细解析
数据库开发之mysql前言以及详细解析
18 0
|
9天前
|
运维 监控 安全
云HIS医疗管理系统源码——技术栈【SpringBoot+Angular+MySQL+MyBatis】
云HIS系统采用主流成熟技术,软件结构简洁、代码规范易阅读,SaaS应用,全浏览器访问前后端分离,多服务协同,服务可拆分,功能易扩展;支持多样化灵活配置,提取大量公共参数,无需修改代码即可满足不同客户需求;服务组织合理,功能高内聚,服务间通信简练。
26 4
|
2天前
|
缓存 Java 开发者
10个点介绍SpringBoot3工作流程与核心组件源码解析
Spring Boot 是Java开发中100%会使用到的框架,开发者不仅要熟练使用,对其中的核心源码也要了解,正所谓知其然知其所以然,V 哥建议小伙伴们在学习的过程中,一定要去研读一下源码,这有助于你在开发中游刃有余。欢迎一起交流学习心得,一起成长。
|
2天前
|
关系型数据库 MySQL 数据库
【MySQL】:约束全解析
【MySQL】:约束全解析
8 0
|
2天前
|
分布式计算 DataWorks 关系型数据库
DataWorks产品使用合集之在DataWorks中,使用JSON解析函数将MySQL表中的字段解析成多个字段将这些字段写入到ODPS(MaxCompute)中如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
11 3
|
3天前
|
消息中间件 缓存 前端开发
Netty消息编码及发送源码解析
Netty消息编码及发送源码解析
6 0
|
6天前
|
SQL 关系型数据库 MySQL
MySQL锁:解析隐式锁与显式锁
【4月更文挑战第20天】
28 0
|
7天前
|
XML 人工智能 Java
Spring Bean名称生成规则(含源码解析、自定义Spring Bean名称方式)
Spring Bean名称生成规则(含源码解析、自定义Spring Bean名称方式)
|
7天前
|
SQL 关系型数据库 MySQL
[AIGC] MySQL连接查询全面解析
[AIGC] MySQL连接查询全面解析
|
9天前
|
存储 SQL 关系型数据库
MySQL数据库:深入解析与应用实例
MySQL数据库:深入解析与应用实例
25 0

推荐镜像

更多