【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 本文介绍通过Flink CDC实现Oracle数据实时同步至崖山数据库(YashanDB)的方法,支持全量与增量同步,并涵盖新增、修改和删除的DML操作。内容包括环境准备(如JDK、Flink版本等)、Oracle日志归档启用、用户权限配置、增量日志记录设置、元数据迁移、Flink安装与配置、生成Flink SQL文件、Streampark部署,以及创建和启动实时同步任务的具体步骤。适合需要跨数据库实时同步方案的技术人员参考。

本文内容来自YashanDB官网,原文内容请见 https://www.yashandb.com/newsinfo/7396983.html?templateId=1718516

概述

本文主要介绍通过flink cdc实现oracle数据实时同步到崖山,支持全量和增量,DML支持新增、修改和删除。

环境

JDK版本:11

Flink版本:1.18.1

flink-sql-connector-oracle-cdc版本:3.1.1

flink-connector-yashandb版本:1.18.1.1

Streampark版本:2.1.4

YMP版本:23.2.1.5

源Oracle版本:11.2.0.2.0

目标YashanDB版本:23.2.2.100

操作步骤

Oracle启用日志归档

Step1:以DBA权限登录Oracle数据库

sqlplus /nolog

CONNECT sys/system AS SYSDBA

Step2:启用日志归档

-- 确认归档日志是否已开启,未开启则需开启

archive log list;

-- 查看db_recovery_file_dest参数

show parameter db_recovery_file_dest;

-- 设置数据库恢复文件目标大小为10G

alter system set db_recovery_file_dest_size = 10G;

-- 设置数据库恢复文件目标路径

alter system set db_recovery_file_dest = '/u01/app/oracle/fast_recovery_area' scope=spfile;

-- 立即关闭数据库

shutdown immediate;

-- 以mount模式启动数据库

startup mount;

-- 启用数据库归档日志模式

alter database archivelog;

-- 打开数据库,允许用户访问

alter database open;

-- 再次确认归档日志是否已开启

archive log list;

用户赋权

Step1:创建表空间

-- 创建一个名为"logminer_tbs"的表空间

CREATE TABLESPACE logminer_tbs DATAFILE '/u01/app/oracle/oradata/XE/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

Step2:创建用户并赋予权限

-- 创建一个名为"flinkuser"的用户,密码为"flinkpw",将其默认表空间设置为"LOGMINER_TBS",并在该表空间上设置无限配额。

CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINERTBS;

 

-- 允许"flinkuser"用户创建会话,即允许该用户连接到数据库。

GRANT CREATE SESSION TO flinkuser;

 

-- (不支持Oracle 11g)允许"flinkuser"用户在多租户数据库(CDB)中设置容器。

-- GRANT SET CONTAINER TO flinkuser;

 

-- 允许"flinkuser"用户查询V$DATABASE视图,该视图包含有关数据库实例的信息。

GRANT SELECT ON V_$DATABASE TO flinkuser;

 

-- 允许"flinkuser"用户执行任何表的闪回操作。

GRANT FLASHBACK ANY TABLE TO flinkuser;

 

-- 允许"flinkuser"用户查询任何表的数据。

GRANT SELECT ANY TABLE TO flinkuser;

 

-- 允许"flinkuser"用户拥有SELECT_CATALOG_ROLE角色,该角色允许查询数据字典和元数据。

GRANT SELECT_CATALOG_ROLE TO flinkuser;

 

-- 允许"flinkuser"用户拥有EXECUTE_CATALOG_ROLE角色,该角色允许执行一些数据字典中的过程和函数。

GRANT EXECUTE_CATALOG_ROLE TO flinkuser;

 

-- 允许"flinkuser"用户查询任何事务。

GRANT SELECT ANY TRANSACTION TO flinkuser;

 

-- (不支持Oracle 11g)允许"flinkuser"用户进行数据变更追踪(LogMiner)。

-- GRANT LOGMINING TO flinkuser;

 

-- 允许"flinkuser"用户创建表。

GRANT CREATE TABLE TO flinkuser;

 

-- 允许"flinkuser"用户锁定任何表。

GRANT LOCK ANY TABLE TO flinkuser;

 

-- 允许"flinkuser"用户修改任何表。

GRANT ALTER ANY TABLE TO flinkuser;

 

-- 允许"flinkuser"用户创建序列。

GRANT CREATE SEQUENCE TO flinkuser;

 

-- 允许"flinkuser"用户执行DBMS_LOGMNR包中的过程。

GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;

 

-- 允许"flinkuser"用户执行DBMS_LOGMNR_D包中的过程。

GRANT EXECUTE ON DBMS_LOGMNRD TO flinkuser;

 

-- 允许"flinkuser"用户查询V$LOG视图,该视图包含有关数据库日志文件的信息。

GRANT SELECT ON V$LOG TO flinkuser;

 

-- 允许"flinkuser"用户查询V$LOGHISTORY视图,该视图包含有关数据库历史日志文件的信息。

GRANT SELECT ON V$LOGHISTORY TO flinkuser;

 

-- 允许"flinkuser"用户查询V$LOGMNRLOGS视图,该视图包含有关LogMiner日志文件的信息。

GRANT SELECT ON V$LOGMNRLOGS TO flinkuser;

 

-- 允许"flinkuser"用户查询V$LOGMNRCONTENTS视图,该视图包含LogMiner日志文件的内容。

GRANT SELECT ON V$LOGMNRCONTENTS TO flinkuser;

 

-- 允许"flinkuser"用户查询V$LOGMNRPARAMETERS视图,该视图包含有关LogMiner的参数信息。

GRANT SELECT ON V$LOGMNRPARAMETERS TO flinkuser;

 

-- 允许"flinkuser"用户查询V$LOGFILE视图,该视图包含有关数据库日志文件的信息。

GRANT SELECT ON V$LOGFILE TO flinkuser;

 

-- 允许"flinkuser"用户查询V$ARCHIVEDLOG视图,该视图包含已归档的数据库日志文件的信息。

GRANT SELECT ON V$ARCHIVEDLOG TO flinkuser;

 

-- 允许"flinkuser"用户查询V$ARCHIVE_DESTSTATUS视图,该视图包含有关归档目标状态的信息。

GRANT SELECT ON V$ARCHIVE_DEST_STATUS TO flinkuser;

启用增量日志记录

-- 为数据库启用增强日志记录:

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

迁移oracle元数据到YashanDB

可通过崖山迁移平台YMP进行迁移,迁移范围只需选择“元数据迁移1 ”和“元数据迁移2”即可,“数据迁移”不用选。

image2024-7-2_16-27-56

安装flink

Step1:创建flink安装用户

adduser -d /home/flink -m flink

passwd flink

flink

Step2:授权

chown -R flink:flink /data/flink

Step3:设置免密

cd ~

ssh-keygen # 一直按回车,按默认设置创建密钥对

ssh-copy-id flink@192.168.133.18

Step4:解压flink安装包

cd /data/flink

tar -zxvf flink-1.18.1-bin-scala_2.12.tgz

Step5:修改flink-conf.yaml配置:

cd /data/flink/flink-1.8.1/conf

vi flink-conf.yaml

1) xxx.bind-host和xxx.bind-address都设置成0.0.0.0

2)taskmanager.numberOfTaskSlots修改为和CPU核数一致:

taskmanager.numberOfTaskSlots: 8

3) 去掉注释并修改checkpoint和savepoints路径配置:

state.checkpoints.dir: file:///data/flink/flink-checkpoints

state.savepoints.dir: file:///data/flink/flink-savepoints

4) 去掉注释并修改classloader.resolve-order配置:

classloader.resolve-order: parent-first

Step6:安装flink-oracle-cdc和flink-connector-yashandb相关的jar包到flink

cp /tmp/flink/flink-sql-connector-oracle-cdc-3.1.1.jar /data/flink/flink-1.18.1/lib

cp /tmp/flink/ojdbc8-19.3.0.0.jar /data/flink/flink-1.18.1/lib

cp /tmp/flink/xdb-19.3.0.0.jar.jar /data/flink/flink-1.18.1/lib

cp /tmp/flink/flink-connector-yashandb-1.18.1.1.jar /data/flink/flink-1.18.1/lib

cp /tmp/flink/yashandb-jdbc-1.7.1.jar /data/flink/flink-1.18.1/lib

Step7:设置环境变量

vi ~/.bashrc

export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-11.0.19.0.7-1.el7_9.x86_64

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tool.jar

export FLINK_HOME=/data/flink/flink-1.18.1

export PATH=$JAVA_HOME/bin:$FLINK_HOME/bin:$PATH

source ~/.bashrc

Step8:启动flink

cd /data/flink/flink-1.8.1/bin

./start-cluster.sh

生成flinksql文件

Step1:解压flinksql生成工具gen-flinksql-1.0-bin.zip

unzip gen-flinksql-1.0-bin.zip

Step2:修改gen-flinksql/conf/jdbc.properties配置文件

source.oracle.url = jdbc:oracle:thin:@//192.168.133.18:1521/xe

source.oracle.user = flinkuser

source.oracle.password = flinkpw

source.oracle.schema = SEARCHUSER #此处为需要同步的源库名

sink.yashandb.url = jdbc:yasdb://192.168.133.18:1688/yashandb

sink.yashandb.user = SEARCHUSER

sink.yashandb.password = yasdb_123

sink.yashandb.schema = SEARCHUSER #此处为需要同步的目标库名

Step3:执行生成flinksql文件命令:

cd gen-flinksql/bin

./gen-flinksql.sh oracle2yashandb /data/flink

执行完成后,会在/data/flink目录生成以schema命名的flink sql文件:SEARCHUSER.sql

安装streampark

Step1:解压streampark安装包

cd /data/flink

tar -zxvf apache-streampark_2.12-2.1.4-incubating-bin.tar.gz

Step2:启动streampark

cd /data/flink/apache-streampark_2.12-2.1.4-incubating-bin/bin

./startup.sh

访问地址:http://192.168.133.18/10000

admin/streampark

Step3:配置Flink Home

进入菜单setting - > Flink Home,点击Add New按钮:

image2024-7-2_16-48-48

Step4:配置Flink Cluster

进入菜单setting - > Flink Cluster,点击Add New按钮:

image2024-7-2_16-50-27

创建实时同步任务

Step1:进入菜单Apache Flink -> Application,Add New一个任务,Excution Mode选standalone,然后再选择对应的Flink Version和Flink Cluster,FlinkSQL输入gen-flinksql工具生成的sql内容,最后输入Job Name点submit按钮进行保存;

image2024-7-2_16-55-58

Step2:在任务列表界面Release Job进行job发布,再点Start Job按钮启动同步任务;

image2024-7-2_16-58-8

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
23天前
|
SQL 存储 关系型数据库
【YashanDB知识库】共享从 MySQL异常处理CONTINUE HANDLER的改写方法
【YashanDB知识库】共享从 MySQL异常处理CONTINUE HANDLER的改写方法
|
11天前
|
SQL API Apache
Dinky 和 Flink CDC 在实时整库同步的探索之路
本次分享围绕 Dinky 的整库同步技术演进,从传统数据集成方案的痛点出发,探讨了 Flink CDC Yaml 作业的探索历程。内容分为三个部分:起源、探索、未来。在起源部分,分析了传统数据集成方案中全量与增量割裂、时效性低等问题,引出 Flink CDC 的优势;探索部分详细对比了 Dinky CDC Source 和 Flink CDC Pipeline 的架构与能力,深入讲解了 YAML 作业的细节,如模式演变、数据转换等;未来部分则展望了 Dinky 对 Flink CDC 的支持与优化方向,包括 Pipeline 转换功能、Transform 扩展及实时湖仓治理等。
282 12
Dinky 和 Flink CDC 在实时整库同步的探索之路
|
6天前
|
SQL 测试技术 数据库
【YashanDB知识库】IMP跨网络导入慢问题
问题现象:290M数据,本地导入2分钟,跨机导入耗时显著增加(最高30分钟)。 原因分析:`imp`逐条SQL通过网络传输至yashanDB执行,交互频繁导致性能下降。 影响版本:客户测试环境22.2.8.3。 解决方法:将导入文件上传至与yashanDB同机后使用`imp`,减少网络延迟。 经验总结:优化`imp`工具,支持直接上传文件至服务器端执行,降低网络依赖。
|
6天前
|
监控 数据库
【YashanDB 知识库】ycm 托管数据库时报错 OM host ip:127.0.0.1 is not support join to YCM
在托管数据库时,若 OM 的 IP 被设置为 127.0.0.1,将导致无法托管至 YCM,并使数据库失去监控。此问题源于安装时修改了 OM 的监听 IP。解决方法包括:将 OM 的 IP 修改为本机实际 IP 或 0.0.0.0,同时更新 env 文件及 yasom 后台数据库中的相关配置。经验总结指出,应避免非必要的后台 IP 修改,且数据库安装需遵循规范,不使用仅限本机访问的 IP(如 127.0.0.1)。
|
6天前
|
监控 网络安全 数据库
YashanDB 知识库:ycm 纳管主机安装 YCM-AGENT 时报错 “任务提交失败,无法连接主机”
在安装 ycm-agent 纳管主机时,可能出现因端口未开放导致的报错问题。此问题会阻止 YCM 对主机和数据库的监控功能,影响版本为 `yashandb-cloud-manager-23.2.1.100-linux-aarch64.tar`。原因是目标主机(如 10.149.223.121)未开放 9070 或 9071 端口。解决方法包括关闭防火墙、添加白名单或开放指定端口,需与管理员确认操作。处理过程涉及网络检查、端口测试等步骤。端口问题解决后,若再次安装报唯一键错误,需先移除失败主机再重试。
|
7天前
|
监控 Java Shell
【YashanDB 知识库】ycm 托管数据库时,数据库非 OM 安装无法托管
本文主要介绍了因数据库未按规范使用 yasboot 安装导致的问题及解决方法。问题表现为无 yasom 和 yasagent 进程,且目录结构缺失,致使 ycm 无法托管与监控。分析发现可能是数据库版本旧或安装不规范引起。解决方法为先生成配置文件,安装 yasom 和 yasagent,再生成并修改托管配置模板,最终通过命令完成托管至 yasom 和 ycm。总结强调了按规范安装数据库的重要性以避免类似问题。
|
23天前
|
SQL 关系型数据库 MySQL
【YashanDB知识库】MySQL field 函数的改写方法
【YashanDB知识库】MySQL field 函数的改写方法
|
23天前
|
数据库
【YashanDB知识库】服务器重启后启动yasom和yasagent进程时有告警
【YashanDB知识库】服务器重启后启动yasom和yasagent进程时有告警
|
23天前
|
SQL Oracle 关系型数据库
【YashanDB知识库】共享利用Python脚本解决Oracle的SQL脚本@@用法
【YashanDB知识库】共享利用Python脚本解决Oracle的SQL脚本@@用法
|
26天前
|
数据库
【YashanDB知识库】服务器重启后启动yasom和yasagent进程时有告警
本文介绍了YashanDB在特定场景下的问题分析与解决方法。当使用yasboot重启数据库后,yasom和yasagent进程虽启动成功但出现告警,原因是缺少libnsl.so.1库文件或环境变量配置错误。解决步骤包括:检查系统中是否存在该库文件,若不存在则根据操作系统类型安装(有外网时通过yum或apt,无外网时创建符号链接),若存在则调整环境变量配置,并重新启动相关进程验证问题是否解决。

推荐镜像

更多