新的增量数据同步工具闪亮登场,完美支持Oracle增量同步

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 新的增量数据同步工具闪亮登场,完美支持Oracle增量同步

众所周知,阿里巴巴开源了Canal,支持通过订阅MySQL Binlog从而实现将MySQL增量同步到诸如MQ、ElasticSearch的能力,但对Oracle却无能为力,那如何实现Oracle数据的增量同步呢?


本文将关注CDC领域又一后起之秀:Flink-CDC,用SQL语句定义同步任务,并且支持Oracle增量同步,本篇将一步一步如何使用flink-cdc实现Oracle增量同步,本系列的后续文章会重点剖析其实现原理。


1、准备Oracle环境


本文假设大家成功安装好了Oracle,并不打算介绍如何安装Oracle,如果您还未安装Oracle数据库,可先百度之。


1.1 开启归档日志


首先想要实现Oracle增量同步,必须开启Oracle的归档日志,查看Oracle是否开启归档日志的方法如下图所示:

bb137d033abe9c581e740b3bc7d2a133.png

如果还未开启归档日志,可以使用如下命令开启Oracle归档日志:

sqlplus /nolog
SQL> conn/as sysdba;
SQL> shutdown immediate;
SQL> startup mount;
SQL> alter database  archivelog;
SQL> alter database open;


1.2 开启数据库级别的supplemental log


可以通过如下命令开启数据库级别的supplemental日志。

alter database add supplemental log data (all) columns;

可以通过如下命令查看是否成功开启数据库级别的supplemental:

alter database add supplemental log data (all) columns;
-- 可以通过如下命令查看是否开启
SELECT supplemental_log_data_min min,
       supplemental_log_data_pk pk,
       supplemental_log_data_ui ui,
       supplemental_log_data_fk fk,
       supplemental_log_data_all allc from v$database;
显示结果:
MIN      PK  UI  FK  ALL
-------- --- --- --- ---
IMPLICIT NO  NO  NO  YES

其中 supplemental_log_data_all 为 yes 表示开启了所有字段的 supplemental机制。


1.3 创建测试用户与表


可以通过如下命令简单创建测试的用户与表结构:

-- 创建用户
create user YOUR_NAME identified by YOUR_PASSWORD;
-- 给用户授权
grant connect,resource,dba to YOUR_NAME;
-- 创建表
create table T_TEST_01
(
    ID   NUMBER not null,
    NAME VARCHAR2(100)
)
/
create unique index T_TEST_01_ID_UINDEX
    on T_TEST_01 (ID)
/
alter table T_TEST_01
    add constraint T_TEST_01_PK
        primary key (ID)
/


1.4 开启表级别的supplemental机制


可以通过如下命令开启表级别的supplemental

ALTER TABLE {用户名}.{表名} ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;


2、Flink-CDC测试代码


首先在java中要使用flink-cdc,需要在pom文件中添加如下依赖:

<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <flink.version>1.13.3</flink.version>
  <target.java.version>1.8</target.java.version>
  <scala.binary.version>2.11</scala.binary.version>
  <maven.compiler.source>${target.java.version}</maven.compiler.source>
  <maven.compiler.target>${target.java.version}</maven.compiler.target>
  <log4j.version>2.12.1</log4j.version>
</properties>
<dependencies>
  <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-java</artifactId>
   <version>${flink.version}</version>
  </dependency>
  <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-table-api-java</artifactId>
   <version>${flink.version}</version>
  </dependency>
  <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-table-common</artifactId>
   <version>${flink.version}</version>
  </dependency>
  <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-core</artifactId>
   <version>${flink.version}</version>
  </dependency>
  <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
  </dependency>
  <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
  </dependency>
  <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
  </dependency>
  <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-runtime_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
  </dependency>
  <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
  </dependency>
  <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-clients_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
  </dependency>
  <dependency>
   <groupId>com.ververica</groupId>
   <artifactId>flink-connector-oracle-cdc</artifactId>
   <version>2.1.0</version>
  </dependency>
  <dependency>
   <groupId>org.apache.logging.log4j</groupId>
   <artifactId>log4j-slf4j-impl</artifactId>
   <version>${log4j.version}</version>
  </dependency>
  <dependency>
   <groupId>org.apache.logging.log4j</groupId>
   <artifactId>log4j-api</artifactId>
   <version>${log4j.version}</version>
  </dependency>
  <dependency>
   <groupId>org.apache.logging.log4j</groupId>
   <artifactId>log4j-core</artifactId>
   <version>${log4j.version}</version>
  </dependency>
</dependencies>

与之对应的Java的测试代码如下:

package net.codingw;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class Oracle2ConsoleTest {
    public static void main(String[] args) {
        try {
            StreamExecutionEnvironment blinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
            int parallelism = 1;
            blinkStreamEnv.setParallelism(parallelism);
            EnvironmentSettings settings = 
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(blinkStreamEnv, settings);
            String sourceSql = "CREATE TABLE test (ID INT,NAME STRING) WITH ('connector' = 'oracle-
              cdc','hostname' = 'localhost','port' = '1521','username' = 'CDC','password' = '123456','database-
              name' = 'helowin','schema-nae' = 'CDC','table-name' = 't_test_01', 'scan.startup.mode' = 'latest-
              offset', 'debezium.log.mining.strategy'='online_catalog',  
                  'debezium.log.mining.continuous.mine'='true' )";
            String sinkSql = "CREATE TABLE sink_table (ID INT, NAME STRING) WITH (    'connector' = 'print')";
            String insertSql = "INSERT INTO sink_table SELECT ID, NAME FROM test";
            tableEnv.executeSql(sourceSql);
            tableEnv.executeSql(sinkSql);
            tableEnv.executeSql(insertSql);
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }
}

启动后,如果在数据库中添加一条记录,运行效果如下:

587a9fe642c5be1241899953a9e2d7b4.png

更新一条记录,运行效果如下:

713219cd466d10153602b95a29e17aff.png

删除一条记录,运行效果如下:

d802535303ceb49969803d181c62835b.png


3、遇到的坑


在测试Flink-CDC实现Oracle增量同步的过程中,主要碰到两个比较大的坑。


3.1 同步延迟较大


默认的配置同步延迟需要耗费3-5分钟,目前官方提供的解决方法,在创建flink schema时,指定如下两个参数:


  • 'debezium.log.mining.strategy'='online_catalog'
  • 'debezium.log.mining.continuous.mine'='true'


上述参数设定后,同步延迟能降低到秒级别


至于背后的原理,将在本系列的后续文章进行详细探讨与分析。


3.2 存在锁表风险


由于测试过程中出现了好几次的锁表,分析查看原因发现在 RelationalSnapshotChangeEventSource 的 doExecute 中会调用 lockTablesForSchemaSnapshot 中会锁表,对应的实现类为OracleSnapshotChangeEventSource:

ca1f00e19365fd9441e8d84352388686.png

但其释放的地方,感觉有些问题,具体在RelationalSnapshotChangeEventSource 的 doExecute中,部分截图如下所示:

3f593ba3a974c09a8c2f2a63438d5943.png

温馨提示:后续会对其机制进行详细解读,后续再来思考是否可以优化,如何优化该段代码,因为锁表在生产环境是一个风险极大的操作。

4、Oracle增量同步背后的理论基础


Oracle CDC 连接器支持捕获并记录 Oracle 数据库服务器中发生的行级变更,其原理是使用 Oracle 提供的 LogMiner 工具或者原生的 XStream API [3] 从 Oracle 中获取变更数据。


LogMiner 是 Oracle 数据库提供的一个分析工具,该工具可以解析 Oracle Redo 日志文件,从而将数据库的数据变更日志解析成变更事件输出。通过 LogMiner 方式时,Oracle 服务器对解析日志文件的进程做了严格的资源限制,所以对规模特别大的表,数据解析会比较慢,优点是 LogMiner 是可以免费使用的。


基本原理如下图所示:


2a7705b9a468ad9b301c296e538dbfb7.png


相关文章
|
1月前
|
SQL 存储 DataWorks
DataWorks数据同步功能支持全量更新和增量更新两种方式
【4月更文挑战第3天】DataWorks数据同步功能支持全量更新和增量更新两种方式
33 3
|
2月前
练习手动立即同步,将深圳的数据同步到北京
练习手动立即同步,将深圳的数据同步到北京
18 0
|
2月前
|
关系型数据库 MySQL Java
flink cdc 同步问题之多表数据如何同步
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
111 0
|
2月前
|
存储 关系型数据库 MySQL
数据同步大事务同步延迟
数据同步大事务同步延迟
26 6
|
2月前
|
SQL 分布式计算 DataWorks
DataWorks提供的数据同步类型不仅包括整库离线同步
【2月更文挑战第31天】DataWorks提供的数据同步类型不仅包括整库离线同步
23 8
|
7天前
|
DataWorks Oracle 关系型数据库
DataWorks操作报错合集之尝试从Oracle数据库同步数据到TDSQL的PG版本,并遇到了与RAW字段相关的语法错误,该怎么处理
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
26 0
|
7天前
|
缓存 DataWorks 监控
DataWorks操作报错合集之在DataWorks中进行数据同步时,遇到数据量大的表同步时报链接异常,该怎么办
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
18 0
|
2月前
|
存储 编解码 算法
【ffmpeg音视频同步】解决ffmpeg音视频中多线程之间的数据同步问题
【ffmpeg音视频同步】解决ffmpeg音视频中多线程之间的数据同步问题
45 2
|
2月前
|
SQL API 数据库
flink cdc 同步问题之将Flink CDC 4.x中的数据同步到Doris如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
3月前
|
SQL Oracle 关系型数据库
Flink CDC数据同步问题之同步数据减少如何解决
Flink CDC数据同步是指利用Flink CDC实现不同数据源之间的实时数据同步任务;本合集旨在提供Flink CDC数据同步的操作指南、性能优化建议和常见问题处理,助力用户高效实施数据同步。

热门文章

最新文章

推荐镜像

更多