请问一下flink cdc 采集oracle rac是怎么配置的?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
配置 Flink CDC 采集 Oracle RAC 数据需要完成以下几个步骤,包括环境准备、连接器配置以及相关参数的设置。以下是详细的配置说明:
在开始配置之前,确保以下环境和资源已准备好: - 创建阿里云账号:完成实名认证。 - 创建专有网络 VPC 和安全组:根据需求创建 VPC 和安全组,确保网络通信的安全性。 - 选择合适的 ECS 实例:根据 Oracle RAC 的性能要求,选择适合的 ECS 实例规格。 - 配置共享存储:使用阿里云块存储服务为 Oracle RAC 配置共享存储,以保证数据一致性和可靠性。
为了支持 Flink CDC 采集 Oracle RAC 数据,需要对 Oracle RAC 进行以下配置: 1. 启用 LogMiner 支持: - 确保 Oracle 数据库启用了 LogMiner
功能,这是 Oracle CDC 的核心依赖。 - 如果使用 PDB(Pluggable Database),需要配置 debezium.database.pdb.name
参数,以便正确读取 PDB 的补偿日志。
配置数据库参数:
log-slave-updates = 1
,确保从库能够将主库同步的数据写入自身的 Binlog 文件中。gtid_mode = on
enforce_gtid_consistency = on
max_wal_senders
和 max_replication_slots
参数值大于当前已使用的复制槽数量与 Flink 作业所需的 slot 数量。创建高权限账号:
LOGIN
和 REPLICATION
权限的高权限账号,并确保该账号对订阅表具有 SELECT
权限。Flink CDC 连接器用于捕获 Oracle RAC 的变更数据,具体配置如下:
在 $FLINK_HOME/lib
目录下添加对应的 Flink CDC 依赖 JAR 包,例如:
flink-sql-connector-oracle-cdc-${version}.jar
确保版本与 Flink 版本兼容。
在 Flink SQL 中创建临时表,配置 Oracle CDC Source 参数。示例如下:
CREATE TEMPORARY TABLE oracle_source (
id INT,
name STRING,
update_time TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'oracle-cdc',
'hostname' = '<yourHostname>',
'port' = '<yourPort>',
'username' = '<yourUserName>',
'password' = '<yourPassWord>',
'database-name' = '<yourDatabaseName>',
'schema-name' = '<yourSchemaName>',
'table-name' = '<yourTableName>',
'debezium.database.pdb.name' = '<yourPdbName>', -- 如果使用 PDB
'scan.startup.mode' = 'initial'
);
重要参数说明: - hostname
:Oracle RAC 的主地址或从库地址。 - port
:Oracle 数据库监听端口,默认为 1521
。 - username
和 password
:高权限账号的用户名和密码。 - database-name
和 schema-name
:目标数据库和服务名。 - table-name
:需要采集的表名,格式为 schema.table
。 - debezium.database.pdb.name
:如果使用 PDB,需指定 PDB 名称。
在 Flink 中定义数据管道,将 Oracle RAC 的变更数据传输到目标系统。
通过 YAML 文件定义路由规则,指定源表和目标表之间的映射关系。示例如下:
source-table: flink_test.customers
sink-table: db.customers_o
description: sync customers table
说明: - source-table
:源表名称。 - sink-table
:目标表名称。 - description
:描述信息,用于解释路由规则。
通过 schema.change.behavior
参数配置 Schema 变更时的处理方式。可选值包括: - LENIENT
(默认值):忽略不支持的变更。 - EXCEPTION
:抛出异常。 - EVOLVE
:自动演化 Schema。 - TRY_EVOLVE
:尝试演化 Schema。 - IGNORE
:忽略所有变更。
示例配置:
schema.change.behavior: LENIENT
启动 Flink 作业: 使用以下命令提交 Flink 作业:
<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-1.5.2.jar \
oracle-sync-database \
-database test_db \
-oracle-conf hostname=127.0.0.1 \
-oracle-conf port=1521 \
-oracle-conf username=admin \
-oracle-conf password="password" \
-oracle-conf database-name=XE \
-oracle-conf schema-name=ADMIN \
-including-tables "tbl1|test.*"
验证数据同步:
通过以上步骤,您可以成功配置 Flink CDC 采集 Oracle RAC 数据,并实现高效的数据同步。