【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山

简介: 本文介绍通过Flink CDC实现Oracle数据实时同步至崖山数据库(YashanDB)的方法,支持全量与增量同步,并涵盖新增、修改和删除的DML操作。内容包括环境准备(如JDK、Flink版本等)、Oracle日志归档启用、用户权限配置、增量日志记录设置、元数据迁移、Flink安装与配置、生成Flink SQL文件、Streampark部署,以及创建和启动实时同步任务的具体步骤。适合需要跨数据库实时同步方案的技术人员参考。

本文内容来自YashanDB官网,原文内容请见 https://www.yashandb.com/newsinfo/7396983.html?templateId=1718516

概述

本文主要介绍通过flink cdc实现oracle数据实时同步到崖山,支持全量和增量,DML支持新增、修改和删除。

环境

JDK版本:11

Flink版本:1.18.1

flink-sql-connector-oracle-cdc版本:3.1.1

flink-connector-yashandb版本:1.18.1.1

Streampark版本:2.1.4

YMP版本:23.2.1.5

源Oracle版本:11.2.0.2.0

目标YashanDB版本:23.2.2.100

操作步骤

Oracle启用日志归档

Step1:以DBA权限登录Oracle数据库

sqlplus /nolog

CONNECT sys/system AS SYSDBA

Step2:启用日志归档

-- 确认归档日志是否已开启,未开启则需开启

archive log list;

-- 查看db_recovery_file_dest参数

show parameter db_recovery_file_dest;

-- 设置数据库恢复文件目标大小为10G

alter system set db_recovery_file_dest_size = 10G;

-- 设置数据库恢复文件目标路径

alter system set db_recovery_file_dest = '/u01/app/oracle/fast_recovery_area' scope=spfile;

-- 立即关闭数据库

shutdown immediate;

-- 以mount模式启动数据库

startup mount;

-- 启用数据库归档日志模式

alter database archivelog;

-- 打开数据库,允许用户访问

alter database open;

-- 再次确认归档日志是否已开启

archive log list;

用户赋权

Step1:创建表空间

-- 创建一个名为"logminer_tbs"的表空间

CREATE TABLESPACE logminer_tbs DATAFILE '/u01/app/oracle/oradata/XE/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

Step2:创建用户并赋予权限

-- 创建一个名为"flinkuser"的用户,密码为"flinkpw",将其默认表空间设置为"LOGMINER_TBS",并在该表空间上设置无限配额。

CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINERTBS;

 

-- 允许"flinkuser"用户创建会话,即允许该用户连接到数据库。

GRANT CREATE SESSION TO flinkuser;

 

-- (不支持Oracle 11g)允许"flinkuser"用户在多租户数据库(CDB)中设置容器。

-- GRANT SET CONTAINER TO flinkuser;

 

-- 允许"flinkuser"用户查询V$DATABASE视图,该视图包含有关数据库实例的信息。

GRANT SELECT ON V_$DATABASE TO flinkuser;

 

-- 允许"flinkuser"用户执行任何表的闪回操作。

GRANT FLASHBACK ANY TABLE TO flinkuser;

 

-- 允许"flinkuser"用户查询任何表的数据。

GRANT SELECT ANY TABLE TO flinkuser;

 

-- 允许"flinkuser"用户拥有SELECT_CATALOG_ROLE角色,该角色允许查询数据字典和元数据。

GRANT SELECT_CATALOG_ROLE TO flinkuser;

 

-- 允许"flinkuser"用户拥有EXECUTE_CATALOG_ROLE角色,该角色允许执行一些数据字典中的过程和函数。

GRANT EXECUTE_CATALOG_ROLE TO flinkuser;

 

-- 允许"flinkuser"用户查询任何事务。

GRANT SELECT ANY TRANSACTION TO flinkuser;

 

-- (不支持Oracle 11g)允许"flinkuser"用户进行数据变更追踪(LogMiner)。

-- GRANT LOGMINING TO flinkuser;

 

-- 允许"flinkuser"用户创建表。

GRANT CREATE TABLE TO flinkuser;

 

-- 允许"flinkuser"用户锁定任何表。

GRANT LOCK ANY TABLE TO flinkuser;

 

-- 允许"flinkuser"用户修改任何表。

GRANT ALTER ANY TABLE TO flinkuser;

 

-- 允许"flinkuser"用户创建序列。

GRANT CREATE SEQUENCE TO flinkuser;

 

-- 允许"flinkuser"用户执行DBMS_LOGMNR包中的过程。

GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;

 

-- 允许"flinkuser"用户执行DBMS_LOGMNR_D包中的过程。

GRANT EXECUTE ON DBMS_LOGMNRD TO flinkuser;

 

-- 允许"flinkuser"用户查询V$LOG视图,该视图包含有关数据库日志文件的信息。

GRANT SELECT ON V$LOG TO flinkuser;

 

-- 允许"flinkuser"用户查询V$LOGHISTORY视图,该视图包含有关数据库历史日志文件的信息。

GRANT SELECT ON V$LOGHISTORY TO flinkuser;

 

-- 允许"flinkuser"用户查询V$LOGMNRLOGS视图,该视图包含有关LogMiner日志文件的信息。

GRANT SELECT ON V$LOGMNRLOGS TO flinkuser;

 

-- 允许"flinkuser"用户查询V$LOGMNRCONTENTS视图,该视图包含LogMiner日志文件的内容。

GRANT SELECT ON V$LOGMNRCONTENTS TO flinkuser;

 

-- 允许"flinkuser"用户查询V$LOGMNRPARAMETERS视图,该视图包含有关LogMiner的参数信息。

GRANT SELECT ON V$LOGMNRPARAMETERS TO flinkuser;

 

-- 允许"flinkuser"用户查询V$LOGFILE视图,该视图包含有关数据库日志文件的信息。

GRANT SELECT ON V$LOGFILE TO flinkuser;

 

-- 允许"flinkuser"用户查询V$ARCHIVEDLOG视图,该视图包含已归档的数据库日志文件的信息。

GRANT SELECT ON V$ARCHIVEDLOG TO flinkuser;

 

-- 允许"flinkuser"用户查询V$ARCHIVE_DESTSTATUS视图,该视图包含有关归档目标状态的信息。

GRANT SELECT ON V$ARCHIVE_DEST_STATUS TO flinkuser;

启用增量日志记录

-- 为数据库启用增强日志记录:

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

迁移oracle元数据到YashanDB

可通过崖山迁移平台YMP进行迁移,迁移范围只需选择“元数据迁移1 ”和“元数据迁移2”即可,“数据迁移”不用选。

image2024-7-2_16-27-56

安装flink

Step1:创建flink安装用户

adduser -d /home/flink -m flink

passwd flink

flink

Step2:授权

chown -R flink:flink /data/flink

Step3:设置免密

cd ~

ssh-keygen # 一直按回车,按默认设置创建密钥对

ssh-copy-id flink@192.168.133.18

Step4:解压flink安装包

cd /data/flink

tar -zxvf flink-1.18.1-bin-scala_2.12.tgz

Step5:修改flink-conf.yaml配置:

cd /data/flink/flink-1.8.1/conf

vi flink-conf.yaml

1) xxx.bind-host和xxx.bind-address都设置成0.0.0.0

2)taskmanager.numberOfTaskSlots修改为和CPU核数一致:

taskmanager.numberOfTaskSlots: 8

3) 去掉注释并修改checkpoint和savepoints路径配置:

state.checkpoints.dir: file:///data/flink/flink-checkpoints

state.savepoints.dir: file:///data/flink/flink-savepoints

4) 去掉注释并修改classloader.resolve-order配置:

classloader.resolve-order: parent-first

Step6:安装flink-oracle-cdc和flink-connector-yashandb相关的jar包到flink

cp /tmp/flink/flink-sql-connector-oracle-cdc-3.1.1.jar /data/flink/flink-1.18.1/lib

cp /tmp/flink/ojdbc8-19.3.0.0.jar /data/flink/flink-1.18.1/lib

cp /tmp/flink/xdb-19.3.0.0.jar.jar /data/flink/flink-1.18.1/lib

cp /tmp/flink/flink-connector-yashandb-1.18.1.1.jar /data/flink/flink-1.18.1/lib

cp /tmp/flink/yashandb-jdbc-1.7.1.jar /data/flink/flink-1.18.1/lib

Step7:设置环境变量

vi ~/.bashrc

export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-11.0.19.0.7-1.el7_9.x86_64

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tool.jar

export FLINK_HOME=/data/flink/flink-1.18.1

export PATH=$JAVA_HOME/bin:$FLINK_HOME/bin:$PATH

source ~/.bashrc

Step8:启动flink

cd /data/flink/flink-1.8.1/bin

./start-cluster.sh

生成flinksql文件

Step1:解压flinksql生成工具gen-flinksql-1.0-bin.zip

unzip gen-flinksql-1.0-bin.zip

Step2:修改gen-flinksql/conf/jdbc.properties配置文件

source.oracle.url = jdbc:oracle:thin:@//192.168.133.18:1521/xe

source.oracle.user = flinkuser

source.oracle.password = flinkpw

source.oracle.schema = SEARCHUSER #此处为需要同步的源库名

sink.yashandb.url = jdbc:yasdb://192.168.133.18:1688/yashandb

sink.yashandb.user = SEARCHUSER

sink.yashandb.password = yasdb_123

sink.yashandb.schema = SEARCHUSER #此处为需要同步的目标库名

Step3:执行生成flinksql文件命令:

cd gen-flinksql/bin

./gen-flinksql.sh oracle2yashandb /data/flink

执行完成后,会在/data/flink目录生成以schema命名的flink sql文件:SEARCHUSER.sql

安装streampark

Step1:解压streampark安装包

cd /data/flink

tar -zxvf apache-streampark_2.12-2.1.4-incubating-bin.tar.gz

Step2:启动streampark

cd /data/flink/apache-streampark_2.12-2.1.4-incubating-bin/bin

./startup.sh

访问地址:http://192.168.133.18/10000

admin/streampark

Step3:配置Flink Home

进入菜单setting - > Flink Home,点击Add New按钮:

image2024-7-2_16-48-48

Step4:配置Flink Cluster

进入菜单setting - > Flink Cluster,点击Add New按钮:

image2024-7-2_16-50-27

创建实时同步任务

Step1:进入菜单Apache Flink -> Application,Add New一个任务,Excution Mode选standalone,然后再选择对应的Flink Version和Flink Cluster,FlinkSQL输入gen-flinksql工具生成的sql内容,最后输入Job Name点submit按钮进行保存;

image2024-7-2_16-55-58

Step2:在任务列表界面Release Job进行job发布,再点Start Job按钮启动同步任务;

image2024-7-2_16-58-8

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
7月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
998 43
|
7月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
442 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
11月前
|
存储 消息中间件 Kafka
基于 Flink 的中国电信星海时空数据多引擎实时改造
本文整理自中国电信集团大数据架构师李新虎老师在Flink Forward Asia 2024的分享,围绕星海时空智能系统展开,涵盖四个核心部分:时空数据现状、实时场景多引擎化、典型应用及未来展望。系统日处理8000亿条数据,具备亚米级定位能力,通过Flink多引擎架构解决数据膨胀与响应时效等问题,优化资源利用并提升计算效率。应用场景包括运动状态识别、个体行为分析和群智感知,未来将推进湖仓一体改造与三维时空服务体系建设,助力数字化转型与智慧城市建设。
976 3
基于 Flink 的中国电信星海时空数据多引擎实时改造
|
7月前
|
SQL 关系型数据库 Apache
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
本文将深入解析 Flink-Doris-Connector 三大典型场景中的设计与实现,并结合 Flink CDC 详细介绍了整库同步的解决方案,助力构建更加高效、稳定的实时数据处理体系。
2822 0
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
|
8月前
|
存储 消息中间件 搜索推荐
京东零售基于Flink的推荐系统智能数据体系
摘要:本文整理自京东零售技术专家张颖老师,在 Flink Forward Asia 2024 生产实践(二)专场中的分享,介绍了基于Flink构建的推荐系统数据,以及Flink智能体系带来的智能服务功能。内容分为以下六个部分: 推荐系统架构 索引 样本 特征 可解释 指标 Tips:关注「公众号」回复 FFA 2024 查看会后资料~
508 1
京东零售基于Flink的推荐系统智能数据体系
|
7月前
|
存储 分布式计算 数据处理
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
阿里云实时计算Flink团队,全球领先的流计算引擎缔造者,支撑双11万亿级数据处理,推动Apache Flink技术发展。现招募Flink执行引擎、存储引擎、数据通道、平台管控及产品经理人才,地点覆盖北京、杭州、上海。技术深度参与开源核心,打造企业级实时计算解决方案,助力全球企业实现毫秒洞察。
687 0
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
4196 74
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
zdl
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
664 56

热门文章

最新文章

推荐镜像

更多