flinkcdc读Oracle 项目依赖和demo有吗?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
以下是关于如何使用 Flink CDC 读取 Oracle 数据库的项目依赖配置和示例代码,基于提供的知识库资料整理。
在使用 Flink CDC 读取 Oracle 数据时,需要将相关的连接器依赖添加到项目的 pom.xml
文件中。以下是一个典型的依赖配置示例:
<dependencies>
<!-- Flink CDC Connector for Oracle -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-oracle-cdc</artifactId>
<version>${flink-cdc.version}</version>
</dependency>
<!-- Flink Connector Base (公共依赖) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 其他必要的依赖(如Flink核心依赖) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
说明: - ${flink-cdc.version}
是 Flink CDC 连接器的版本号,请根据实际使用的 Flink 版本选择兼容的连接器版本。 - ${flink.version}
是 Flink 的版本号,例如 1.15.0
或 1.16.0
。 - 如果需要从 SNAPSHOT 仓库下载依赖,请确保 Maven 配置文件中已正确设置 SNAPSHOT 仓库地址。
以下是一个通过 Flink CDC 同步 Oracle 数据的完整示例代码:
<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-1.5.2.jar \
oracle-sync-database \
-database test_db \
-oracle-conf hostname=127.0.0.1 \
-oracle-conf port=1521 \
-oracle-conf username=admin \
-oracle-conf password="password" \
-oracle-conf database-name=XE \
-oracle-conf schema-name=ADMIN \
-including-tables "tbl1|test.*" \
-sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
-sink-conf username=admin \
-sink-conf password=****
参数说明: - -Dexecution.checkpointing.interval=10s
:设置检查点间隔为 10 秒。 - -Dparallelism.default=1
:设置默认并行度为 1。 - -c org.apache.doris.flink.tools.cdc.CdcTools
:指定主类。 - -database test_db
:目标数据库名称。 - -oracle-conf
:Oracle 数据库的连接配置,包括主机名、端口、用户名、密码、数据库名称和模式名称。 - -including-tables "tbl1|test.*"
:指定需要同步的表,支持正则表达式匹配。 - -sink-conf
:目标数据存储的配置信息。
SELECT
权限以及逻辑复制所需的权限。decoding.plugin.name
参数设置为 pgoutput
,以避免非 UTF-8 编码的数据库发生增量解析乱码问题。DATE
类型字段,在 Flink SQL 中的 source 和 sink 表中必须将其映射为 TIMESTAMP
类型,否则可能会因类型不匹配导致作业失败。上述内容基于以下参考资料整理: - PolarDB PostgreSQL 版(兼容 Oracle)的 Flink CDC 连接器文档。 - 通过 Flink 导入数据的示例文档。
如果您需要更详细的配置或遇到具体问题,请提供更多上下文信息以便进一步协助!
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。