1.7 Debezium和Flink CDC支持的数据库类型
Debezium 1.8支持以下数据库版本:
MySQL 5.7和8.0
PostgreSQL 9.4到13.x
MongoDB 3.6到4.4
SQL Server 2016、2017和2019
Oracle 11gR2、12cR1、12cR2和19c
请注意,这只是针对Debezium 1.8的支持列表。不同版本的Debezium可能支持不同的数据库版本。
Flink CDC 2.3.x 支持的数据库版本:
1.8 FlinkCDC 和Flink的版本对应关系
下表显示了Flink CDC连接器和Flink之间的版本映射:
第 2 章 Flink CDC 案例实操
2.1 DataStream 方式的应用
2.1.1 导入依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.atguigu</groupId> <artifactId>atguigu-flink-cdc</artifactId> <version>1.0-SNAPSHOT</version> <properties> <flink-version>1.13.0</flink-version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink-version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>${flink-version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>${flink-version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.1.3</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.49</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.12</artifactId> <version>${flink-version}</version> </dependency> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-connector-mysql-cdc</artifactId> <version>2.0.0</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.75</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.0.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
2.1.2 编写代码
package com.atguigu; import com.ververica.cdc.connectors.mysql.MySqlSource; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.debezium.DebeziumSourceFunction; import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class FlinkCDC { public static void main(String[] args) throws Exception { //1.获取Flink 执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);//设置并行度为1 //1.1 开启CK(CK就是Checkpoint) env.enableCheckpointing(5000);//5s一次断点,可以理解为5s备份一次 env.getCheckpointConfig().setCheckpointTimeout(10000); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //断点的文件存储的位置(在下面2.1.3的第七点创建) env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/cdc-test/ck")); //2.通过FlinkCDC构建SourceFunction DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder() .hostname("hadoop102") .port(3306) .username("root") .password("000000") .databaseList("cdc_test") .tableList("cdc_test.user_info") .deserializer(new StringDebeziumDeserializationSchema()) //设置序列化器 .startupOptions(StartupOptions.initial()) .build(); DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction); //3.数据打印 dataStreamSource.print(); //4.启动任务 env.execute("FlinkCDC"); } }
.tableList("cdc_test.user_info") //可选配置项,如果不指定该参数,则会读取上一个配置下的 .databaseList("cdc_test")数据库里的所有表的数据
注意:指定的时候需要使用"db.table"的方式,因为databaseList里面可以写多个库名,不指定数据库的话可能会出现歧义
.startupOptions(StartupOptions.initial())里面的StartupOptions的方法介绍
initial (default): 在第一次启动时对受监控的数据库表执行初始快照,并继续读取最新的binlog。其实就是能获取表中原本存在的数据,在监控后的操作也能获取到
latest-offset: 从不在第一次启动时对受监控的数据库表执行快照,只从binlog的末尾读取,这意味着只具有自连接器启动以来的更改。其实就是不能获取表中原本存在的数据,只能获取在监控后的操作
timestamp: 从不在第一次启动时对被监控的数据库表执行快照,直接从指定的时间戳读取binlog。消费者将从头开始遍历binlog,并忽略时间戳小于指定时间戳的更改事件
specific-offset: 从不在第一次启动时对被监视的数据库表执行快照,直接从指定的偏移量读取binlog。
2.1.3 案例测试
1)打包并上传至 Linux
2)开启 MySQL Binlog 并重启 MySQL
3)启动 Flink 集群
[atguigu@hadoop102 flink-standalone]$ bin/start-cluster.sh
4)启动 HDFS 集群
[atguigu@hadoop102 flink-standalone]$ start-dfs.sh
5)启动程序
[atguigu@hadoop102 flink-standalone]$ bin/flink run -c com.atguigu.FlinkCDC flink-1.0-
SNAPSHOT-jar-with-dependencies.jar
6)在 MySQL 的 gmall-flink.z_user_info 表中添加、修改或者删除数据
7)给当前的 Flink 程序创建 Savepoint
[atguigu@hadoop102 flink-standalone]$ bin/flink savepoint JobId
hdfs://hadoop102:8020/flink/save
8)关闭程序以后从 Savepoint 重启程序
[atguigu@hadoop102 flink-standalone]$ bin/flink run -s hdfs://hadoop102:8020/flink/save/... -c
com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar
2.2 FlinkSQL 方式的应用
2.2.1 代码实现
package com.atguigu; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; public class FlinkSQLCDC { public static void main(String[] args) throws Exception { //1.获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);//设置并行度为1 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //2.使用FLINKSQL DDL模式构建CDC 表 tableEnv.executeSql("CREATE TABLE user_info ( " + " id STRING primary key, " + " name STRING, " + " sex STRING " + ") WITH ( " + " 'connector' = 'mysql-cdc', " + " 'scan.startup.mode' = 'latest-offset', " + " 'hostname' = 'hadoop102', " + " 'port' = '3306', " + " 'username' = 'root', " + " 'password' = '000000', " + " 'database-name' = 'cdc_test', " + " 'table-name' = 'user_info' " + ")"); //3.查询数据并转换为流输出 Table table = tableEnv.sqlQuery("select * from user_info"); DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class); retractStream.print(); //4.启动 env.execute("FlinkSQLCDC"); } }
2.3 自定义反序列化器
2.3.1 代码实现
package com.atguigu.func; import com.alibaba.fastjson.JSONObject; import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import io.debezium.data.Envelope; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.util.Collector; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import java.util.List; public class CustomerDeserializationSchema implements DebeziumDeserializationSchema<String> { /** * { * "db":"", * "tableName":"", * "before":{"id":"1001","name":""...}, * "after":{"id":"1001","name":""...}, * "op":"" * } */ @Override public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception { //创建JSON对象用于封装结果数据 JSONObject result = new JSONObject(); //获取库名&表名 String topic = sourceRecord.topic(); String[] fields = topic.split("\\."); result.put("db", fields[1]); result.put("tableName", fields[2]); //获取before数据 Struct value = (Struct) sourceRecord.value(); Struct before = value.getStruct("before"); JSONObject beforeJson = new JSONObject(); if (before != null) { //获取列信息 Schema schema = before.schema(); List<Field> fieldList = schema.fields(); for (Field field : fieldList) { beforeJson.put(field.name(), before.get(field)); } } result.put("before", beforeJson); //获取after数据 Struct after = value.getStruct("after"); JSONObject afterJson = new JSONObject(); if (after != null) { //获取列信息 Schema schema = after.schema(); List<Field> fieldList = schema.fields(); for (Field field : fieldList) { afterJson.put(field.name(), after.get(field)); } } result.put("after", afterJson); //获取操作类型 Envelope.Operation operation = Envelope.operationFor(sourceRecord); result.put("op", operation); //输出数据 collector.collect(result.toJSONString()); } @Override public TypeInformation<String> getProducedType() { return BasicTypeInfo.STRING_TYPE_INFO; } }
第 3 章 Flink-CDC 2.0
本章图片来源于北京站 Flink Meetup 分享的《详解 Flink-CDC》
3.1 1.x 痛点
FlinkCDC1.0版本的缺点
全量 + 增量读取的过程需要保证所有数据的一致性,因此需要通过加锁保证,但是加锁在数据库层面上是一个十分高危的操作。底层 Debezium 在保证数据一致性时,需要对读取的库或表加锁,全局锁可能导致数据库锁住,表级锁会锁住表的读,DBA 一般不给锁权限。
不支持水平扩展,因为 Flink CDC 底层是基于 Debezium,起架构是单节点,所以Flink CDC 只支持单并发。在全量阶段读取阶段,如果表非常大 (亿级别),读取时间在小时甚至天级别,用户不能通过增加资源去提升作业速度。
全量读取阶段不支持 checkpoint:CDC 读取分为两个阶段,全量读取和增量读取,目前全量读取阶段是不支持 checkpoint 的,因此会存在一个问题:当我们同步全量数据时,假设需要 5 个小时,当我们同步了 4 小时的时候作业失败,这时候就需要重新开始,再读取 5 个小时。
3.2 Debezium 锁分析
Flink CDC 底层封装了 Debezium, Debezium 同步一张表分为两个阶段:
- 全量阶段:查询当前表中所有记录;
- 增量阶段:从 binlog 消费变更数据。
大部分用户使用的场景都是全量 + 增量同步,加锁是发生在全量阶段,目的是为了确定全量阶段的初始位点,保证增量 + 全量实现一条不多,一条不少,从而保证数据一致性。从下图中我们可以分析全局锁和表锁的一些加锁流程,左边红色线条是锁的生命周期,右边是 MySQL 开启可重复读事务的生命周期。
以全局锁为例,首先是获取一个锁,然后再去开启可重复读的事务。这里锁住操作是读取 binlog 的起始位置和当前表的 schema。这样做的目的是保证 binlog 的起始位置和读取到的当前 schema 是可以对应上的,因为表的 schema 是会改变的,比如如删除列或者增加列。在读取这两个信息后,SnapshotReader 会在可重复读事务里读取全量数据,在全量数据读取完成后,会启动 BinlogReader 从读取的 binlog 起始位置开始增量读取,从而保证全量数据 + 增量数据的无缝衔接。
表锁是全局锁的退化版,因为全局锁的权限会比较高,因此在某些场景,用户只有表锁。表锁锁的时间会更长。经过上面分析,接下来看看这些锁到底会造成怎样严重的后果:
Flink CDC 1.x 可以不加锁,能够满足大部分场景,但牺牲了一定的数据准确性。Flink CDC 1.x 默认加全局锁,虽然能保证数据一致性,但存在上述 hang 住数据的风险。