flink oracle cdc实时同步(超详细)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: flink oracle cdc实时同步(超详细)

01 引言

官方文档:https://github.com/ververica/flink-cdc-connectors/blob/release-master/docs/content/connectors/oracle-cdc.md

本文参照官方文档来记录Oracle CDC 的配置。

在本文开始前,需要先安装Oracle,有兴趣的同学可以参考博主之前写的《docker下安装oracle11g(一次安装成功)》

02 前提条件

如果要做oracle的实时同步,Oracle数据库配置必须满足如下:

  1. Oracle数据库启用日志归档;
  2. 定义具有适当权限的Oracle用户;
  3. 被捕获的表或数据库上必须启用增量日志记录;

在官网的安装教程中,可以看到有两种数据库类型的配置,分别是:

类型 描述 版本
独立的数据库架构(Non-CDB database) 是传统的独立数据库架构,每个数据库实例包含所有的对象和数据 Oracle 11g以及之前的版本中使用
多租户数据库架构(CDB database) 多租户架构的数据库,包含一个根容器和多个子容器(PDB),每个PDB可以看作是一个独立的数据库 Oracle 12c开始

因为 之前安装 的是Oracle11g,所以本文以独立的数据库架构(Non-CDB database)的配置来讲解。

03 配置

注意:以下执行的条件是基于之前安装好的oracle环境来执行的,详情参阅:《docker下安装oracle11g(一次安装成功)》

3.1 启用日志归档

Step1:进入容器:

docker exec -it oracle_11g bash

Step2:以DBA的权限登录数据库:

sqlplus /nolog
CONNECT sys/system AS SYSDBA

Step3:启用日志归档:

-- 设置数据库恢复文件目标大小为10G
alter system set db_recovery_file_dest_size = 10G;
-- 设置数据库恢复文件目标路径
alter system set db_recovery_file_dest = '/home/oracle/app/oracle/product/11.2.0' scope=spfile;
-- 立即关闭数据库
shutdown immediate;
-- 以mount模式启动数据库
startup mount;
-- 启用数据库归档日志模式
alter database archivelog;
-- 打开数据库,允许用户访问
alter database open;

操作记录如下:

Step4:查看日志归档是否启用(如果显示“Archive Mode”表示已经启用)

archive log list;

/home/oracle/app/oracle/product/11.2.0目录也能看到数据库恢复文件(按日期分目录):


备注:

  • 启用日志归档需要重启数据库。
  • 归档日志将占用大量的磁盘空间,因此需要定期清理过期的日志。

3.2 用户赋权

Step1创建表空间(创建表空间是为了提供一个独立、可控、可扩展的存储区域,以供CDC工具捕获和管理数据库的增量数据,这对于实时同步和数据变更追踪非常重要,并为数据流和数据仓库等应用提供可靠的数据源。)

-- 以DBA的权限登录数据库
sqlplus /nolog
CONNECT sys/system AS SYSDBA
-- 创建一个名为"logminer_tbs"的表空间
-- 指定表空间的数据文件路径为"/home/oracle/app/oracle/product/11.2.0/logminer_tbs.dbf",其中"/home/oracle/app/oracle/product/11.2.0"是数据文件存储的目录,"logminer_tbs.dbf"是数据文件的文件名
-- 设置表空间的初始大小为25MB
-- 如果数据文件已经存在且可重用,将其重用,否则创建一个新的数据文件
-- 启用表空间的自动扩展功能,即当表空间空间不足时,自动增加数据文件的大小
-- 设置表空间的最大允许大小为无限,即表空间可以无限制地自动扩展
CREATE TABLESPACE logminer_tbs DATAFILE '/home/oracle/app/oracle/product/11.2.0/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

可以看到在“/home/oracle/app/oracle/product/11.2.0”目录里已经创建了logminer_tbs.dbf文件:

Step2:创建用户并赋予权限

-- 创建一个名为"flinkuser"的用户,密码为"flinkpw",将其默认表空间设置为"LOGMINER_TBS",并在该表空间上设置无限配额。
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
-- 允许"flinkuser"用户创建会话,即允许该用户连接到数据库。
GRANT CREATE SESSION TO flinkuser;
-- (不支持Oracle 11g)允许"flinkuser"用户在多租户数据库(CDB)中设置容器。
-- GRANT SET CONTAINER TO flinkuser;
-- 允许"flinkuser"用户查询V_$DATABASE视图,该视图包含有关数据库实例的信息。
GRANT SELECT ON V_$DATABASE TO flinkuser;
-- 允许"flinkuser"用户执行任何表的闪回操作。
GRANT FLASHBACK ANY TABLE TO flinkuser;
-- 允许"flinkuser"用户查询任何表的数据。
GRANT SELECT ANY TABLE TO flinkuser;
-- 允许"flinkuser"用户拥有SELECT_CATALOG_ROLE角色,该角色允许查询数据字典和元数据。
GRANT SELECT_CATALOG_ROLE TO flinkuser;
-- 允许"flinkuser"用户拥有EXECUTE_CATALOG_ROLE角色,该角色允许执行一些数据字典中的过程和函数。
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
-- 允许"flinkuser"用户查询任何事务。
GRANT SELECT ANY TRANSACTION TO flinkuser;
-- (不支持Oracle 11g)允许"flinkuser"用户进行数据变更追踪(LogMiner)。
-- GRANT LOGMINING TO flinkuser;
-- 允许"flinkuser"用户创建表。
GRANT CREATE TABLE TO flinkuser;
-- 允许"flinkuser"用户锁定任何表。
GRANT LOCK ANY TABLE TO flinkuser;
-- 允许"flinkuser"用户修改任何表。
GRANT ALTER ANY TABLE TO flinkuser;
-- 允许"flinkuser"用户创建序列。
GRANT CREATE SEQUENCE TO flinkuser;
-- 允许"flinkuser"用户执行DBMS_LOGMNR包中的过程。
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
-- 允许"flinkuser"用户执行DBMS_LOGMNR_D包中的过程。
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
-- 允许"flinkuser"用户查询V_$LOG视图,该视图包含有关数据库日志文件的信息。
GRANT SELECT ON V_$LOG TO flinkuser;
-- 允许"flinkuser"用户查询V_$LOG_HISTORY视图,该视图包含有关数据库历史日志文件的信息。
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
-- 允许"flinkuser"用户查询V_$LOGMNR_LOGS视图,该视图包含有关LogMiner日志文件的信息。
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
-- 允许"flinkuser"用户查询V_$LOGMNR_CONTENTS视图,该视图包含LogMiner日志文件的内容。
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
-- 允许"flinkuser"用户查询V_$LOGMNR_PARAMETERS视图,该视图包含有关LogMiner的参数信息。
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
-- 允许"flinkuser"用户查询V_$LOGFILE视图,该视图包含有关数据库日志文件的信息。
GRANT SELECT ON V_$LOGFILE TO flinkuser;
-- 允许"flinkuser"用户查询V_$ARCHIVED_LOG视图,该视图包含已归档的数据库日志文件的信息。
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
-- 允许"flinkuser"用户查询V_$ARCHIVE_DEST_STATUS视图,该视图包含有关归档目标状态的信息。
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;

3.3 表或数据库上启用增量日志记录(supplemental log)

在讲解Oracle之前,很有必要先了解Oracle的逻辑结构。

3.3.1 Oracle 逻辑结构

Oracle数据库的物理结构与MySQL以及SQLServer有着很大的不同,在使用MySQL或SQLServer时,我们不需要去关心它们的逻辑结构和物理结构。

Oracle在逻辑结构中,分别是如下的结构:数据库实例 => 表空间 => 数据段(表) => 区 => 块

  • 数据库实例:前面的《docker下安装oracle11g(一次安装成功)》,在启动容器时,已经指定了Oracle的数据库实例的唯一ID,每个Oracle数据库实例都有一个唯一的SID,也就是说,安装的时候,已经创建好了一个“helowin”数据库实例了。
  • 表空间:在 “用户赋权”的第1步骤,可以看出已经创建了“logminer_tbs”表空间(CREATE TABLESPACE logminer_tbs…)

相关的查询SQL:

-- 以DBA的权限登录数据库
sqlplus /nolog
CONNECT sys/system AS SYSDBA
-- 查询数据库实例名称
SELECT NAME FROM V$DATABASE;
-- 查询所有表空间名称
SELECT TABLESPACE_NAME FROM DBA_TABLESPACES;

结果如下:

ok,接下来就可以进入“LOGMINER_TBS”表空间去创建表了。

3.3.2 创建表

LOGMINER_TBS表空间下的flinkuser用户下创建customers表:

-- 切换至flinkuser用户
sqlplus /nolog
CONNECT flinkuser/flinkpw
-- 创建customers表
CREATE TABLE customers (
    customer_id NUMBER PRIMARY KEY,
    customer_name VARCHAR2(50),
    email VARCHAR2(100),
    phone VARCHAR2(20)
) TABLESPACE LOGMINER_TBS;

查看表是否创建成功:

-- 查看LOGMINER_TBS表空间下的所有表
select tablespace_name, table_name from user_tables
where tablespace_name = 'LOGMINER_TBS';

可以看到表创建成功:

3.3.3 启用增量日志

切换至SYS用户,以DBA的权限登录数据库,为表和数据库启用增量日志:

-- 以DBA的权限登录数据库
sqlplus /nolog
CONNECT sys/system AS SYSDBA
-- 为LOGMINER_TBS表空间下的customers表启用增强日志记录
ALTER TABLE FLINKUSER.CUSTOMERS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS
-- 为数据库启用增强日志记录:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

操作成功:

04 flink sql

注意:源表的字段定义、schema-name以及table-name都要大写,否则无法同步。

-- 创建Oracle CDC源表table_source_oracle,从Oracle数据库中读取数据
CREATE TABLE table_source_oracle (
        CUSTOMER_ID INT,
        CUSTOMER_NAME STRING,
        EMAIL STRING,
        PHONE STRING,
        PRIMARY KEY (CUSTOMER_ID) NOT ENFORCED
        )
WITH (
    'connector' = 'oracle-cdc',
    'hostname' = '10.194.183.120',
    'port' = '30026',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database-name' = 'helowin',
    'schema-name' = 'FLINKUSER',
    'table-name' = 'CUSTOMERS'
)
-- 创建MySQL JDBC接收表table_sink_mysql,将数据写入到MySQL数据库
CREATE TABLE table_sink_mysql (
    customer_id INT,
    customer_name STRING,
    email STRING,
    phone STRING,
    PRIMARY KEY (customer_id) NOT ENFORCED
)
WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.194.183.120:30025/test',
    'username' = 'root',
    'password' = 'root',
    'table-name' = 'customers'
);
-- 将table_source_oracle表的数据插入到table_sink_mysql表中
INSERT INTO table_sink_mysql SELECT * FROM table_source_oracle;

执行flinksql后,可以看到控制台没有报错,程序已经正常启动:

往customers插入一条数据:

-- 切换至flinkuser用户
sqlplus /nolog
CONNECT flinkuser/flinkpw
-- 插入数据
INSERT INTO customers (customer_id, customer_name, email, phone)
VALUES (1, 'Dumas', 'Dumas@example.com', '123-456-7890');

可以看到,已经写入成功了:

05 其它问题

问题解决参考:https://flink-learning.org.cn/article/detail/bf01dd4ff3ed8a11d6d38f365bc2a15d

虽然能做同步了,但是感觉还是有很多坑的,比如控制台会报错:

解决方式:在 create 语句中加上

'debezium.database.tablename.case.insensitive'='false'

还有数据延迟较大,也是在create语句加上:

'debezium.log.mining.strategy'='online_catalog',
'debezium.log.mining.continuous.mine'='true'

所以,最终的Flink SQL如下:

-- 创建Oracle CDC源表table_source_oracle,从Oracle数据库中读取数据
CREATE TABLE table_source_oracle (
        CUSTOMER_ID INT,
        CUSTOMER_NAME STRING,
        EMAIL STRING,
        PHONE STRING,
        PRIMARY KEY (CUSTOMER_ID) NOT ENFORCED
        )
WITH (
        'connector' = 'oracle-cdc',
        'hostname' = '10.194.183.120',
        'port' = '30026',
        'username' = 'flinkuser',
        'password' = 'flinkpw',
        'database-name' = 'HELOWIN',
        'schema-name' = 'FLINKUSER',
        'table-name' = 'CUSTOMERS',
        'debezium.database.tablename.case.insensitive'='false',
        'debezium.log.mining.strategy'='online_catalog',
        'debezium.log.mining.continuous.mine'='true'
);
-- 创建MySQL JDBC接收表table_sink_mysql,将数据写入到MySQL数据库
CREATE TABLE table_sink_mysql (
    customer_id INT,
    customer_name STRING,
    email STRING,
    phone STRING,
    PRIMARY KEY (customer_id) NOT ENFORCED
)
WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.194.183.120:30025/test',
    'username' = 'root',
    'password' = 'root',
    'table-name' = 'customers'
);
-- 将table_source_oracle表的数据插入到table_sink_mysql表中
INSERT INTO table_sink_mysql SELECT * FROM table_source_oracle;

06 文末

本文主要讲解了Flink Oracle CDC实时同步的所有步骤,希望能帮助到大家,谢谢大家的阅读,本文完!

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
6天前
|
Java 关系型数据库 MySQL
SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动
通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。
80 43
|
4天前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
本文整理自阿里云智能集团 Flink PMC Member & Committer 徐榜江(雪尽)在 FFA 2024 分论坛的分享,涵盖四大主题:Flink CDC、YAML API、Transform + AI 和 Community。文章详细介绍了 Flink CDC 的发展历程及其优势,特别是 YAML API 的设计与实现,以及如何通过 Transform 和 AI 模型集成提升数据处理能力。最后,分享了社区动态和未来规划,欢迎更多开发者加入开源社区,共同推动 Flink CDC 的发展。
250 12
Flink CDC YAML:面向数据集成的 API 设计
|
27天前
|
消息中间件 关系型数据库 MySQL
Flink CDC 在阿里云实时计算Flink版的云上实践
本文整理自阿里云高级开发工程师阮航在Flink Forward Asia 2024的分享,重点介绍了Flink CDC与实时计算Flink的集成、CDC YAML的核心功能及应用场景。主要内容包括:Flink CDC的发展及其在流批数据处理中的作用;CDC YAML支持的同步链路、Transform和Route功能、丰富的监控指标;典型应用场景如整库同步、Binlog原始数据同步、分库分表同步等;并通过两个Demo展示了MySQL整库同步到Paimon和Binlog同步到Kafka的过程。最后,介绍了未来规划,如脏数据处理、数据限流及扩展数据源支持。
181 0
Flink CDC 在阿里云实时计算Flink版的云上实践
|
2月前
|
监控 关系型数据库 MySQL
Flink CDC MySQL同步MySQL错误记录
在使用Flink CDC同步MySQL数据时,常见的错误包括连接错误、权限错误、表结构变化、数据类型不匹配、主键冲突和
174 17
|
3月前
|
消息中间件 资源调度 关系型数据库
如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理
本文介绍了如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理。主要内容包括安装Debezium、配置Kafka Connect、创建Flink任务以及启动任务的具体步骤,为构建实时数据管道提供了详细指导。
199 9
|
5月前
|
算法 API Apache
Flink CDC:新一代实时数据集成框架
本文源自阿里云实时计算团队 Apache Flink Committer 任庆盛在 Apache Asia CommunityOverCode 2024 的分享,涵盖 Flink CDC 的概念、版本历程、内部实现及社区未来规划。Flink CDC 是一种基于数据库日志的 CDC 技术实现的数据集成框架,能高效完成全量和增量数据的实时同步。自 2020 年以来,Flink CDC 经过多次迭代,已成为功能强大的实时数据集成工具,支持多种数据库和数据湖仓系统。未来将进一步扩展生态并提升稳定性。
799 2
Flink CDC:新一代实时数据集成框架
|
5月前
|
消息中间件 canal 数据采集
Flink CDC 在货拉拉的落地与实践
陈政羽在Apache Asia Community Over Code 2024上分享了《货拉拉在Flink CDC生产实践落地》。文章介绍了货拉拉业务背景、技术选型及其在实时数据采集中的挑战与解决方案,详细阐述了Flink CDC的技术优势及在稳定性、兼容性等方面的应用成果。通过实际案例展示了Flink CDC在提升数据采集效率、降低延迟等方面的显著成效,并展望了未来发展方向。
635 14
Flink CDC 在货拉拉的落地与实践
|
5月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
3月前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
1676 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
zdl
|
3月前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
204 56

推荐镜像

更多