【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 本文介绍通过Flink CDC实现Oracle数据实时同步至崖山数据库(YashanDB)的方法,支持全量与增量同步,并涵盖新增、修改和删除的DML操作。内容包括环境准备(如JDK、Flink版本等)、Oracle日志归档启用、用户权限配置、增量日志记录设置、元数据迁移、Flink安装与配置、生成Flink SQL文件、Streampark部署,以及创建和启动实时同步任务的具体步骤。适合需要跨数据库实时同步方案的技术人员参考。

本文内容来自YashanDB官网,原文内容请见 https://www.yashandb.com/newsinfo/7396983.html?templateId=1718516

概述

本文主要介绍通过flink cdc实现oracle数据实时同步到崖山,支持全量和增量,DML支持新增、修改和删除。

环境

JDK版本:11

Flink版本:1.18.1

flink-sql-connector-oracle-cdc版本:3.1.1

flink-connector-yashandb版本:1.18.1.1

Streampark版本:2.1.4

YMP版本:23.2.1.5

源Oracle版本:11.2.0.2.0

目标YashanDB版本:23.2.2.100

操作步骤

Oracle启用日志归档

Step1:以DBA权限登录Oracle数据库

sqlplus /nolog

CONNECT sys/system AS SYSDBA

Step2:启用日志归档

-- 确认归档日志是否已开启,未开启则需开启

archive log list;

-- 查看db_recovery_file_dest参数

show parameter db_recovery_file_dest;

-- 设置数据库恢复文件目标大小为10G

alter system set db_recovery_file_dest_size = 10G;

-- 设置数据库恢复文件目标路径

alter system set db_recovery_file_dest = '/u01/app/oracle/fast_recovery_area' scope=spfile;

-- 立即关闭数据库

shutdown immediate;

-- 以mount模式启动数据库

startup mount;

-- 启用数据库归档日志模式

alter database archivelog;

-- 打开数据库,允许用户访问

alter database open;

-- 再次确认归档日志是否已开启

archive log list;

用户赋权

Step1:创建表空间

-- 创建一个名为"logminer_tbs"的表空间

CREATE TABLESPACE logminer_tbs DATAFILE '/u01/app/oracle/oradata/XE/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

Step2:创建用户并赋予权限

-- 创建一个名为"flinkuser"的用户,密码为"flinkpw",将其默认表空间设置为"LOGMINER_TBS",并在该表空间上设置无限配额。

CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINERTBS;

 

-- 允许"flinkuser"用户创建会话,即允许该用户连接到数据库。

GRANT CREATE SESSION TO flinkuser;

 

-- (不支持Oracle 11g)允许"flinkuser"用户在多租户数据库(CDB)中设置容器。

-- GRANT SET CONTAINER TO flinkuser;

 

-- 允许"flinkuser"用户查询V$DATABASE视图,该视图包含有关数据库实例的信息。

GRANT SELECT ON V_$DATABASE TO flinkuser;

 

-- 允许"flinkuser"用户执行任何表的闪回操作。

GRANT FLASHBACK ANY TABLE TO flinkuser;

 

-- 允许"flinkuser"用户查询任何表的数据。

GRANT SELECT ANY TABLE TO flinkuser;

 

-- 允许"flinkuser"用户拥有SELECT_CATALOG_ROLE角色,该角色允许查询数据字典和元数据。

GRANT SELECT_CATALOG_ROLE TO flinkuser;

 

-- 允许"flinkuser"用户拥有EXECUTE_CATALOG_ROLE角色,该角色允许执行一些数据字典中的过程和函数。

GRANT EXECUTE_CATALOG_ROLE TO flinkuser;

 

-- 允许"flinkuser"用户查询任何事务。

GRANT SELECT ANY TRANSACTION TO flinkuser;

 

-- (不支持Oracle 11g)允许"flinkuser"用户进行数据变更追踪(LogMiner)。

-- GRANT LOGMINING TO flinkuser;

 

-- 允许"flinkuser"用户创建表。

GRANT CREATE TABLE TO flinkuser;

 

-- 允许"flinkuser"用户锁定任何表。

GRANT LOCK ANY TABLE TO flinkuser;

 

-- 允许"flinkuser"用户修改任何表。

GRANT ALTER ANY TABLE TO flinkuser;

 

-- 允许"flinkuser"用户创建序列。

GRANT CREATE SEQUENCE TO flinkuser;

 

-- 允许"flinkuser"用户执行DBMS_LOGMNR包中的过程。

GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;

 

-- 允许"flinkuser"用户执行DBMS_LOGMNR_D包中的过程。

GRANT EXECUTE ON DBMS_LOGMNRD TO flinkuser;

 

-- 允许"flinkuser"用户查询V$LOG视图,该视图包含有关数据库日志文件的信息。

GRANT SELECT ON V$LOG TO flinkuser;

 

-- 允许"flinkuser"用户查询V$LOGHISTORY视图,该视图包含有关数据库历史日志文件的信息。

GRANT SELECT ON V$LOGHISTORY TO flinkuser;

 

-- 允许"flinkuser"用户查询V$LOGMNRLOGS视图,该视图包含有关LogMiner日志文件的信息。

GRANT SELECT ON V$LOGMNRLOGS TO flinkuser;

 

-- 允许"flinkuser"用户查询V$LOGMNRCONTENTS视图,该视图包含LogMiner日志文件的内容。

GRANT SELECT ON V$LOGMNRCONTENTS TO flinkuser;

 

-- 允许"flinkuser"用户查询V$LOGMNRPARAMETERS视图,该视图包含有关LogMiner的参数信息。

GRANT SELECT ON V$LOGMNRPARAMETERS TO flinkuser;

 

-- 允许"flinkuser"用户查询V$LOGFILE视图,该视图包含有关数据库日志文件的信息。

GRANT SELECT ON V$LOGFILE TO flinkuser;

 

-- 允许"flinkuser"用户查询V$ARCHIVEDLOG视图,该视图包含已归档的数据库日志文件的信息。

GRANT SELECT ON V$ARCHIVEDLOG TO flinkuser;

 

-- 允许"flinkuser"用户查询V$ARCHIVE_DESTSTATUS视图,该视图包含有关归档目标状态的信息。

GRANT SELECT ON V$ARCHIVE_DEST_STATUS TO flinkuser;

启用增量日志记录

-- 为数据库启用增强日志记录:

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

迁移oracle元数据到YashanDB

可通过崖山迁移平台YMP进行迁移,迁移范围只需选择“元数据迁移1 ”和“元数据迁移2”即可,“数据迁移”不用选。

image2024-7-2_16-27-56

安装flink

Step1:创建flink安装用户

adduser -d /home/flink -m flink

passwd flink

flink

Step2:授权

chown -R flink:flink /data/flink

Step3:设置免密

cd ~

ssh-keygen # 一直按回车,按默认设置创建密钥对

ssh-copy-id flink@192.168.133.18

Step4:解压flink安装包

cd /data/flink

tar -zxvf flink-1.18.1-bin-scala_2.12.tgz

Step5:修改flink-conf.yaml配置:

cd /data/flink/flink-1.8.1/conf

vi flink-conf.yaml

1) xxx.bind-host和xxx.bind-address都设置成0.0.0.0

2)taskmanager.numberOfTaskSlots修改为和CPU核数一致:

taskmanager.numberOfTaskSlots: 8

3) 去掉注释并修改checkpoint和savepoints路径配置:

state.checkpoints.dir: file:///data/flink/flink-checkpoints

state.savepoints.dir: file:///data/flink/flink-savepoints

4) 去掉注释并修改classloader.resolve-order配置:

classloader.resolve-order: parent-first

Step6:安装flink-oracle-cdc和flink-connector-yashandb相关的jar包到flink

cp /tmp/flink/flink-sql-connector-oracle-cdc-3.1.1.jar /data/flink/flink-1.18.1/lib

cp /tmp/flink/ojdbc8-19.3.0.0.jar /data/flink/flink-1.18.1/lib

cp /tmp/flink/xdb-19.3.0.0.jar.jar /data/flink/flink-1.18.1/lib

cp /tmp/flink/flink-connector-yashandb-1.18.1.1.jar /data/flink/flink-1.18.1/lib

cp /tmp/flink/yashandb-jdbc-1.7.1.jar /data/flink/flink-1.18.1/lib

Step7:设置环境变量

vi ~/.bashrc

export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-11.0.19.0.7-1.el7_9.x86_64

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tool.jar

export FLINK_HOME=/data/flink/flink-1.18.1

export PATH=$JAVA_HOME/bin:$FLINK_HOME/bin:$PATH

source ~/.bashrc

Step8:启动flink

cd /data/flink/flink-1.8.1/bin

./start-cluster.sh

生成flinksql文件

Step1:解压flinksql生成工具gen-flinksql-1.0-bin.zip

unzip gen-flinksql-1.0-bin.zip

Step2:修改gen-flinksql/conf/jdbc.properties配置文件

source.oracle.url = jdbc:oracle:thin:@//192.168.133.18:1521/xe

source.oracle.user = flinkuser

source.oracle.password = flinkpw

source.oracle.schema = SEARCHUSER #此处为需要同步的源库名

sink.yashandb.url = jdbc:yasdb://192.168.133.18:1688/yashandb

sink.yashandb.user = SEARCHUSER

sink.yashandb.password = yasdb_123

sink.yashandb.schema = SEARCHUSER #此处为需要同步的目标库名

Step3:执行生成flinksql文件命令:

cd gen-flinksql/bin

./gen-flinksql.sh oracle2yashandb /data/flink

执行完成后,会在/data/flink目录生成以schema命名的flink sql文件:SEARCHUSER.sql

安装streampark

Step1:解压streampark安装包

cd /data/flink

tar -zxvf apache-streampark_2.12-2.1.4-incubating-bin.tar.gz

Step2:启动streampark

cd /data/flink/apache-streampark_2.12-2.1.4-incubating-bin/bin

./startup.sh

访问地址:http://192.168.133.18/10000

admin/streampark

Step3:配置Flink Home

进入菜单setting - > Flink Home,点击Add New按钮:

image2024-7-2_16-48-48

Step4:配置Flink Cluster

进入菜单setting - > Flink Cluster,点击Add New按钮:

image2024-7-2_16-50-27

创建实时同步任务

Step1:进入菜单Apache Flink -> Application,Add New一个任务,Excution Mode选standalone,然后再选择对应的Flink Version和Flink Cluster,FlinkSQL输入gen-flinksql工具生成的sql内容,最后输入Job Name点submit按钮进行保存;

image2024-7-2_16-55-58

Step2:在任务列表界面Release Job进行job发布,再点Start Job按钮启动同步任务;

image2024-7-2_16-58-8

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
11月前
|
Oracle 关系型数据库 Java
实时计算 Flink版操作报错之读取Oracle数据库时遇到找不到驱动,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错之读取Oracle数据库时遇到找不到驱动,是什么原因
|
数据安全/隐私保护 iOS开发
Apple Music中的DRM保护
苹果音乐(Apple Music)是一种流媒体音乐服务,为用户提供了广泛的音乐内容。然而,为了保护音乐版权,Apple Music使用数字版权管理(DRM)技术对其音乐进行保护。DRM保护是一种加密技术,旨在防止用户未经授权地复制、传播或修改受版权保护的音乐。
1077 1
|
SQL 流计算
(5)Flink CEP SQL四种匹配模式效果演示
Flink CEP SQL中提供了四种匹配策略: (1)skip to next row 从匹配成功的事件序列中的第一个事件的下一个事件开始进行下一次匹配 (2)skip past last row 从匹配成功的事件序列中的最后一个事件的下一个事件开始进行下一次匹配 (3)skip to first pattern Item 从匹配成功的事件序列中第一个对应于patternItem的事件开始进行下一次匹配 (4)skip to last pattern Item 从匹配成功的事件序列中最后一个对应于patternItem的事件开始进行下一次匹配
(5)Flink CEP SQL四种匹配模式效果演示
|
1月前
|
机器学习/深度学习 人工智能 运维
基于AI的自动化服务器管理:解锁运维的未来
基于AI的自动化服务器管理:解锁运维的未来
177 0
|
3月前
|
存储 监控 算法
Flink 四大基石之 Checkpoint 使用详解
Flink 的 Checkpoint 机制通过定期插入 Barrier 将数据流切分并进行快照,确保故障时能从最近的 Checkpoint 恢复,保障数据一致性。Checkpoint 分为精确一次和至少一次两种语义,前者确保每个数据仅处理一次,后者允许重复处理但不会丢失数据。此外,Flink 提供多种重启策略,如固定延迟、失败率和无重启策略,以应对不同场景。SavePoint 是手动触发的 Checkpoint,用于作业升级和迁移。Checkpoint 执行流程包括 Barrier 注入、算子状态快照、Barrier 对齐和完成 Checkpoint。
426 20
|
搜索推荐 算法 前端开发
商品购物管理与推荐系统Python+Django网页界面+协同过滤推荐算法
商品购物管理与推荐系统Python+Django网页界面+协同过滤推荐算法
202 0
|
10月前
|
关系型数据库 MySQL 分布式数据库
PolarDB操作报错合集之在进行批量导出数据时,如何过滤掉视图并只导出表
在使用阿里云的PolarDB(包括PolarDB-X)时,用户可能会遇到各种操作报错。下面汇总了一些常见的报错情况及其可能的原因和解决办法:1.安装PolarDB-X报错、2.PolarDB安装后无法连接、3.PolarDB-X 使用rpm安装启动卡顿、4.PolarDB执行UPDATE/INSERT报错、5.DDL操作提示“Lock conflict”、6.数据集成时联通PolarDB报错、7.编译DN报错(RockyLinux)、8.CheckStorage报错(源数据库实例被删除)、9.嵌套事务错误(TDDL-4604)。
148 0
|
8月前
|
Oracle 网络协议 安全
Oracle 11g DataGuard搭建保姆级教程
Oracle 11g DataGuard搭建保姆级教程
556 4
|
9月前
|
分布式计算 Hadoop 关系型数据库
实时计算 Flink版操作报错合集之Hadoop在将文件写入HDFS时,无法在所有指定的数据节点上进行复制,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
11月前
|
Oracle 关系型数据库 MySQL
实时计算 Flink版产品使用合集之如何保存savepoint
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

推荐镜像

更多