请问一下flink cdc 采集oracle rac是怎么配置的?

请问一下flink cdc 采集oracle rac是怎么配置的?

展开
收起
真的很搞笑 2023-06-04 20:01:00 118 分享 版权
阿里云 AI 助理回答

配置 Flink CDC 采集 Oracle RAC 数据需要完成以下几个步骤,包括环境准备、连接器配置以及相关参数的设置。以下是详细的配置说明:


1. 环境准备

在开始配置之前,确保以下环境和资源已准备好: - 创建阿里云账号:完成实名认证。 - 创建专有网络 VPC 和安全组:根据需求创建 VPC 和安全组,确保网络通信的安全性。 - 选择合适的 ECS 实例:根据 Oracle RAC 的性能要求,选择适合的 ECS 实例规格。 - 配置共享存储:使用阿里云块存储服务为 Oracle RAC 配置共享存储,以保证数据一致性和可靠性。


2. Oracle RAC 配置

为了支持 Flink CDC 采集 Oracle RAC 数据,需要对 Oracle RAC 进行以下配置: 1. 启用 LogMiner 支持: - 确保 Oracle 数据库启用了 LogMiner 功能,这是 Oracle CDC 的核心依赖。 - 如果使用 PDB(Pluggable Database),需要配置 debezium.database.pdb.name 参数,以便正确读取 PDB 的补偿日志。

  1. 配置数据库参数

    • 设置 log-slave-updates = 1,确保从库能够将主库同步的数据写入自身的 Binlog 文件中。
    • 如果主库启用了 GTID 模式,则从库也需要启用 GTID 模式:
      gtid_mode = on
      enforce_gtid_consistency = on
      
    • 确保 max_wal_sendersmax_replication_slots 参数值大于当前已使用的复制槽数量与 Flink 作业所需的 slot 数量。
  2. 创建高权限账号

    • 创建一个具有 LOGINREPLICATION 权限的高权限账号,并确保该账号对订阅表具有 SELECT 权限。

3. Flink CDC 连接器配置

Flink CDC 连接器用于捕获 Oracle RAC 的变更数据,具体配置如下:

3.1 添加依赖

$FLINK_HOME/lib 目录下添加对应的 Flink CDC 依赖 JAR 包,例如:

flink-sql-connector-oracle-cdc-${version}.jar

确保版本与 Flink 版本兼容。

3.2 配置 Oracle CDC Source

在 Flink SQL 中创建临时表,配置 Oracle CDC Source 参数。示例如下:

CREATE TEMPORARY TABLE oracle_source (
    id INT,
    name STRING,
    update_time TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'oracle-cdc',
    'hostname' = '<yourHostname>',
    'port' = '<yourPort>',
    'username' = '<yourUserName>',
    'password' = '<yourPassWord>',
    'database-name' = '<yourDatabaseName>',
    'schema-name' = '<yourSchemaName>',
    'table-name' = '<yourTableName>',
    'debezium.database.pdb.name' = '<yourPdbName>', -- 如果使用 PDB
    'scan.startup.mode' = 'initial'
);

重要参数说明: - hostname:Oracle RAC 的主地址或从库地址。 - port:Oracle 数据库监听端口,默认为 1521。 - usernamepassword:高权限账号的用户名和密码。 - database-nameschema-name:目标数据库和服务名。 - table-name:需要采集的表名,格式为 schema.table。 - debezium.database.pdb.name:如果使用 PDB,需指定 PDB 名称。


4. 数据管道配置

在 Flink 中定义数据管道,将 Oracle RAC 的变更数据传输到目标系统。

4.1 路由规则

通过 YAML 文件定义路由规则,指定源表和目标表之间的映射关系。示例如下:

source-table: flink_test.customers
sink-table: db.customers_o
description: sync customers table

说明: - source-table:源表名称。 - sink-table:目标表名称。 - description:描述信息,用于解释路由规则。

4.2 Schema 变更行为

通过 schema.change.behavior 参数配置 Schema 变更时的处理方式。可选值包括: - LENIENT(默认值):忽略不支持的变更。 - EXCEPTION:抛出异常。 - EVOLVE:自动演化 Schema。 - TRY_EVOLVE:尝试演化 Schema。 - IGNORE:忽略所有变更。

示例配置:

schema.change.behavior: LENIENT

5. 测试与验证

  1. 启动 Flink 作业: 使用以下命令提交 Flink 作业:

    <FLINK_HOME>/bin/flink run \
       -Dexecution.checkpointing.interval=10s \
       -Dparallelism.default=1 \
       -c org.apache.doris.flink.tools.cdc.CdcTools \
       lib/flink-doris-connector-1.16-1.5.2.jar \
       oracle-sync-database \
       -database test_db \
       -oracle-conf hostname=127.0.0.1 \
       -oracle-conf port=1521 \
       -oracle-conf username=admin \
       -oracle-conf password="password" \
       -oracle-conf database-name=XE \
       -oracle-conf schema-name=ADMIN \
       -including-tables "tbl1|test.*"
    
  2. 验证数据同步

    • 在源数据库执行 DML 操作(如插入、更新、删除)。
    • 检查目标系统是否实时同步了这些变更数据。

6. 注意事项

  • 网络连通性:确保 Flink 作业所在的网络能够访问 Oracle RAC 数据库。如果是跨 VPC 访问,需要配置 NAT 或 VPC 对等连接。
  • 日志监控:建议配置日志输出到 OSS 或 SLS,便于排查问题。
  • SLA 保障:自定义打包的连接器可能不提供 SLA 保障,请谨慎使用。

通过以上步骤,您可以成功配置 Flink CDC 采集 Oracle RAC 数据,并实现高效的数据同步。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答

大数据领域前沿技术分享与交流,这里不止有技术干货、学习心得、企业实践、社区活动,还有未来。

还有其他疑问?
咨询AI助理