众所周知,阿里巴巴开源了Canal,支持通过订阅MySQL Binlog从而实现将MySQL增量同步到诸如MQ、ElasticSearch的能力,但对Oracle却无能为力,那如何实现Oracle数据的增量同步呢?
本文将关注CDC领域又一后起之秀:Flink-CDC,用SQL语句定义同步任务,并且支持Oracle增量同步,本篇将一步一步如何使用flink-cdc实现Oracle增量同步,本系列的后续文章会重点剖析其实现原理。
1、准备Oracle环境
本文假设大家成功安装好了Oracle,并不打算介绍如何安装Oracle,如果您还未安装Oracle数据库,可先百度之。
1.1 开启归档日志
首先想要实现Oracle增量同步,必须开启Oracle的归档日志,查看Oracle是否开启归档日志的方法如下图所示:
如果还未开启归档日志,可以使用如下命令开启Oracle归档日志:
sqlplus /nolog SQL> conn/as sysdba; SQL> shutdown immediate; SQL> startup mount; SQL> alter database archivelog; SQL> alter database open;
1.2 开启数据库级别的supplemental log
可以通过如下命令开启数据库级别的supplemental日志。
alter database add supplemental log data (all) columns;
可以通过如下命令查看是否成功开启数据库级别的supplemental:
alter database add supplemental log data (all) columns; -- 可以通过如下命令查看是否开启 SELECT supplemental_log_data_min min, supplemental_log_data_pk pk, supplemental_log_data_ui ui, supplemental_log_data_fk fk, supplemental_log_data_all allc from v$database; 显示结果: MIN PK UI FK ALL -------- --- --- --- --- IMPLICIT NO NO NO YES
其中 supplemental_log_data_all 为 yes 表示开启了所有字段的 supplemental机制。
1.3 创建测试用户与表
可以通过如下命令简单创建测试的用户与表结构:
-- 创建用户 create user YOUR_NAME identified by YOUR_PASSWORD; -- 给用户授权 grant connect,resource,dba to YOUR_NAME; -- 创建表 create table T_TEST_01 ( ID NUMBER not null, NAME VARCHAR2(100) ) / create unique index T_TEST_01_ID_UINDEX on T_TEST_01 (ID) / alter table T_TEST_01 add constraint T_TEST_01_PK primary key (ID) /
1.4 开启表级别的supplemental机制
可以通过如下命令开启表级别的supplemental
ALTER TABLE {用户名}.{表名} ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
2、Flink-CDC测试代码
首先在java中要使用flink-cdc,需要在pom文件中添加如下依赖:
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <flink.version>1.13.3</flink.version> <target.java.version>1.8</target.java.version> <scala.binary.version>2.11</scala.binary.version> <maven.compiler.source>${target.java.version}</maven.compiler.source> <maven.compiler.target>${target.java.version}</maven.compiler.target> <log4j.version>2.12.1</log4j.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-connector-oracle-cdc</artifactId> <version>2.1.0</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-slf4j-impl</artifactId> <version>${log4j.version}</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> <version>${log4j.version}</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>${log4j.version}</version> </dependency> </dependencies>
与之对应的Java的测试代码如下:
package net.codingw; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class Oracle2ConsoleTest { public static void main(String[] args) { try { StreamExecutionEnvironment blinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); int parallelism = 1; blinkStreamEnv.setParallelism(parallelism); EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(blinkStreamEnv, settings); String sourceSql = "CREATE TABLE test (ID INT,NAME STRING) WITH ('connector' = 'oracle- cdc','hostname' = 'localhost','port' = '1521','username' = 'CDC','password' = '123456','database- name' = 'helowin','schema-nae' = 'CDC','table-name' = 't_test_01', 'scan.startup.mode' = 'latest- offset', 'debezium.log.mining.strategy'='online_catalog', 'debezium.log.mining.continuous.mine'='true' )"; String sinkSql = "CREATE TABLE sink_table (ID INT, NAME STRING) WITH ( 'connector' = 'print')"; String insertSql = "INSERT INTO sink_table SELECT ID, NAME FROM test"; tableEnv.executeSql(sourceSql); tableEnv.executeSql(sinkSql); tableEnv.executeSql(insertSql); } catch (Throwable e) { e.printStackTrace(); } } }
启动后,如果在数据库中添加一条记录,运行效果如下:
更新一条记录,运行效果如下:
删除一条记录,运行效果如下:
3、遇到的坑
在测试Flink-CDC实现Oracle增量同步的过程中,主要碰到两个比较大的坑。
3.1 同步延迟较大
默认的配置同步延迟需要耗费3-5分钟,目前官方提供的解决方法,在创建flink schema时,指定如下两个参数:
- 'debezium.log.mining.strategy'='online_catalog'
- 'debezium.log.mining.continuous.mine'='true'
上述参数设定后,同步延迟能降低到秒级别。
至于背后的原理,将在本系列的后续文章进行详细探讨与分析。
3.2 存在锁表风险
由于测试过程中出现了好几次的锁表,分析查看原因发现在 RelationalSnapshotChangeEventSource 的 doExecute 中会调用 lockTablesForSchemaSnapshot 中会锁表,对应的实现类为OracleSnapshotChangeEventSource:
但其释放的地方,感觉有些问题,具体在RelationalSnapshotChangeEventSource 的 doExecute中,部分截图如下所示:
温馨提示:后续会对其机制进行详细解读,后续再来思考是否可以优化,如何优化该段代码,因为锁表在生产环境是一个风险极大的操作。
4、Oracle增量同步背后的理论基础
Oracle CDC 连接器支持捕获并记录 Oracle 数据库服务器中发生的行级变更,其原理是使用 Oracle 提供的 LogMiner 工具或者原生的 XStream API [3] 从 Oracle 中获取变更数据。
LogMiner 是 Oracle 数据库提供的一个分析工具,该工具可以解析 Oracle Redo 日志文件,从而将数据库的数据变更日志解析成变更事件输出。通过 LogMiner 方式时,Oracle 服务器对解析日志文件的进程做了严格的资源限制,所以对规模特别大的表,数据解析会比较慢,优点是 LogMiner 是可以免费使用的。
基本原理如下图所示: