通过流计算与消息中间件实现Oracle到ADB的实时数据同步链路优化实践

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
阿里云百炼推荐规格 ADB PostgreSQL,4核16GB 100GB 1个月
日志服务 SLS,月写入数据量 50GB 1个月
简介: 介绍基于流计算与消息中间件链路(Oracle->OGG->DataHub->Flink/Blink->ADB)实现的Oracle到ADB的数据实时同步架构潜在问题及处理方案。

介绍基于流计算与消息中间件链路(Oracle->OGG->DataHub->Flink/Blink->ADB)实现的Oracle到ADB的数据实时同步架构潜在问题及处理方案。

数据流转架构

image

实时数仓架构及数据流转示意

如上图所示,某项目中基础全量数据从离线数仓的MaxCompute导入至ADB,而增量数据生产后经过了Oracle->OGG->DataHub->Flink/Blink->ADB 这样一个冗长的链路才最终复制到ADB中,由于链路串行,任意一个节点异常都可能导致数据复制的效率和稳定性出现问题,受多种原因影响,项目中各环节异常问题均有遭遇,我们花费了较多时间针对这些问题进行了逐一治理,接下来我们将介绍链路中主要组件的运行机制/限制及由此引发的问题,同时给出各问题的解决方案。

各组件的工作机制/限制

Oracle附加日志介绍&OGG订阅日志信息

Oracle附加日志开启后会在redo日志中添加额外信息到日志流中,以支持基于日志的复制工具,如逻辑standby、streams、GoldenGate、LogMiner等,附加日志支持按DB或Table级别设置生效范围,且附加日志一般分为MIN、PK/UK/FK、ALL等三个级别,不同级别下OGG拉取写入DataHub的日志记录内容差异如下:

image

Oracle不同附加日志级别下DML操作的日志数据差异

DataHub数据分布规则

DataHub多shard分布时可指定某个字段列作为shardkey用户数据hash分布,如未指定shardkey对应列则默认生成隐式列自增行号用户数据hash分布。

Flink/Blink写入ADB机制

Blink在实现DataHub源表数据拉取写入ADB目标表时只支持Insert/Replace语法,不支持Delete&update操作,因此无论OGG写入DataHub的增量数据是哪种操作类型,均只能转换为Insert/Replace into语法写入ADB3.0,可参考下图理解:

image

Flink/Blink写入ADB示意

潜在问题及解决方案

Oracle附加日志级别导致的数据冗余问题

基于前面介绍的不同附加级别下的数据内容差异及blink写入ADB3.0机制,实时报表改造项目中涉及的业务表客户的默认附加日志级别是MIN,这就导致了Delete/Update操作写入DataHub的信息大概率不包含PK/UK等唯一键信息,无法使用PK/UK做行定位,只能使用物理RowID做行定位,因此就会出现如下几个问题:

Case1:全量数据是从MaxCompute获取,MaxCompute也是早期基于Oracle复制,ODPS从Oracle逻辑复制全量时并无RowID信息,且全量复制后部分行数据始终未有更新操作,导致MaxCompute无法通过后续的数据合并等操作给该行附加上RowID信息,ADB从MaxCompute全量复制后对应数据行的RowID也是空,此时Oracle侧该行上发生了DML 操作,由于行数据在ADB全量数据中已存在,只是RowID列为空,Flink/Blink获取到增量操作数据尝试覆写进ADB时由于找不到对应RowID的行,因而直接新增写入,导致数据冗余。

Case2:Oracle的物理RowID一般情况下是不变的,但如果表为分区表且遭遇分区分裂或收缩等场景,引发行迁移,某一行的RowID会发生变更,而RowID变更后的增量操作无法在ADB找到对应行覆写,因而新增写入,导致数据冗余。

针对这个问题,解决方案如下:

Oracle附加日志级别调整,至少为PK/UK/FK,或为ALL,从而保证DML操作都会带上PK/UK列前镜像,之后Blink使用PK/UK做行定位,保证始终可以有效更新正确的行。

由于日志级别调整为ALL会记录完整的前镜像信息,导致日志体量增长较大,且有额外的IO压力,对Oracle性能可能有一定影响,故客户最终决定调整为PK/UK/FK级别。

Blink写入机制导致的数据质量问题

前文我们介绍了Flink/Blink支持Insert/Replace写入操作,以上图Flink/Blink写入ADB示意所示结构为例,假如ID列是PK,调整后附加日志级别后我们已经可以固定获取到DML操作行的ID信息,假如此时Oracle侧对ID为5的行company列做了Tencent-> Alibaba 的更新,那么Blink实际重放的操作是这样操作的后果是ADB中ID=5的这行不仅company列被更新成了Alibaba,其他业务列也被 null 值覆盖了,这和Update操作逻辑相悖了,数据质量自然也无法保证了。

这个问题理论上有两个解决方案:

Case1:Flink/Blink从DataHub获取数据写入ADB时,如果获取到Update类型操作,则先回查ADB数据,将所有字段当前值都拉取到,和更新列新值拼接,之后再覆写ADB。

Case2:Flink/Blink端最终的数据操作实际是由sink connector组件(该组件为可定制的Jar包)执行,同时ADB具有upsert特性(当插入行检测已存在时,可以转换为Update操作,只更新有数据的列,Upsert特性只能单行逐行处理),因此可以定制专属的ADB sink connector,实现对Update/Delete 操作的Upsert转换。

Case1由于Flink/Blink回查ADB需要以ADB作为源表,但目前Flink/Blink暂时只支持ADB作为目标表,故方案不可行;最终选择Case2,通过定制sink Connector包并调整参数修改batchsize为1,逐行处理写入,验证该方案可行,但由于逐行处理,验证最高RPS为1500,可满足日常非高峰期增量处理效率(约日300W行更新)。

ADB sink Connector定制包的写入性能问题

定制包性能开发后验证可满足日常非高峰期性能要求,但临近项目上线期伙伴才逐步了解到个别报表月初高峰期会有持续数日的每日千万行级别的数据更新,定制包在高峰期处理效率无法满足该报表需求,且该报表主要作用也是月初大量更新时实时统计汇算,ADB增量入库性能问题亟待解决。

针对该问题同样有两个解决方案:

Case1:由于会对数据质量产生影响的操作只有Update操作,且Update操作较Insert/Delete操作数量差距巨大,因此可以调整ADB sink Connector定制包逻辑,对于Insert/Delete操作正常攒批batchinsert,仅当处理到Update操作时自动退化batchsize=1单步处理

Case2:推动客户再次调整Oracle附加日志级别为ALL,保证DML操作有完整的更新前镜像信息,针对未更新列的空值用前镜像补齐,之后正常攒批batchinsert

由于该问题发生时项目进度紧张,定制包开发迭代需要时间,且开发后需要做效率和数据质量验证,为保证项目进度,两个方案同步进行并分别做性能验证,最终验证Case1调整后RPS达到3W,Case2RPS约3.5W,Case1性能略差于Case2,但均完全满足客户月初日千万行数据更新效率要求,且数据质量均可保证。

DataHub默认分布的坑

如前文DataHub数据分布规则所述,DataHub多shard分区时如不指定shardkey,则会创建隐式列自动hash分区,该项目由于复用离线报表DataHub数据,离线报表链路设计时选择了默认分布,数据是自动分区的,因此导致相同主键的数据无法落在同一个shard内;Flink/Blink多线程从DataHub取数据时,多线程间无法保证全局有序,因此如果出现某行数据在短时间内多次更新,先更新的操作A在shard1中,后更新的操作B在shard2,Flink/Blink线程2先从shard2取了操作B的记录并在ADB执行,线程1随后取了shard1的操作A在ADB执行,结果是ADB最终数据反而是操作A的状态,数据质量无法保证,如下图

image

DataHub默认分布导致Flink写入ADB乱序

该问题解决方案有两个:

Case1:DataHub调整shardkey为PK,使数据重分布,重分布后相同主键操作记录在同一shard内,blink拉取时可单shard保序。

Case2:DataHub不做调整,ADB侧定期和Oracle做数据质量校验订正,保证数据质量。

项目中由于调整DataHub的Shardkey可能影响客户其他业务,Case1暂不具备变更条件,最终使用Case2并增加定期校验订正机制保证数据质量。

相关实践学习
阿里云百炼xAnalyticDB PostgreSQL构建AIGC应用
通过该实验体验在阿里云百炼中构建企业专属知识库构建及应用全流程。同时体验使用ADB-PG向量检索引擎提供专属安全存储,保障企业数据隐私安全。
AnalyticDB PostgreSQL 企业智能数据中台:一站式管理数据服务资产
企业在数据仓库之上可构建丰富的数据服务用以支持数据应用及业务场景;ADB PG推出全新企业智能数据平台,用以帮助用户一站式的管理企业数据服务资产,包括创建, 管理,探索, 监控等; 助力企业在现有平台之上快速构建起数据服务资产体系
目录
打赏
0
0
0
0
2
分享
相关文章
Flink CDC + Hologres高性能数据同步优化实践
本文整理自阿里云高级技术专家胡一博老师在Flink Forward Asia 2024数据集成(二)专场的分享,主要内容包括:1. Hologres介绍:实时数据仓库,支持毫秒级写入和高QPS查询;2. 写入优化:通过改进缓冲队列、连接池和COPY模式提高吞吐量和降低延迟;3. 消费优化:优化离线场景和分区表的消费逻辑,提升性能和资源利用率;4. 未来展望:进一步简化用户操作,支持更多DDL操作及全增量消费。Hologres 3.0全新升级为一体化实时湖仓平台,提供多项新功能并降低使用成本。
145 1
Flink CDC + Hologres高性能数据同步优化实践
如何在 Oracle 中配置和使用 SQL Profiles 来优化查询性能?
在 Oracle 数据库中,SQL Profiles 是优化查询性能的工具,通过提供额外统计信息帮助生成更有效的执行计划。配置和使用步骤包括:1. 启用自动 SQL 调优;2. 手动创建 SQL Profile,涉及收集、执行调优任务、查看报告及应用建议;3. 验证效果;4. 使用 `DBA_SQL_PROFILES` 视图管理 Profile。
瑶池数据库微课堂|PolarDB/RDS+ADB Zero-ETL:一种免费、易用、高效的数据同步方式
瑶池数据库微课堂介绍阿里云PolarDB/RDS与ADB的Zero-ETL功能,实现免费、易用、高效的数据同步。内容涵盖OLTP与OLAP的区别、传统ETL存在的问题及Zero-ETL的优势(零成本、高效同步),并演示了从RDS MySQL到AnalyticDB MySQL的具体操作步骤。未来将优化和迭代此功能,提供更好的用户体验。
Oracle数据库优化方法
【10月更文挑战第25天】Oracle数据库优化方法
85 7
Oracle数据库优化策略
【10月更文挑战第25天】Oracle数据库优化策略
61 5
"深度剖析:Oracle SGA大小调整策略——从组件解析到动态优化,打造高效数据库性能"
【8月更文挑战第9天】在Oracle数据库性能优化中,系统全局区(SGA)的大小调整至关重要。SGA作为一组共享内存区域,直接影响数据库处理能力和响应速度。本文通过问答形式介绍SGA调整策略:包括SGA的组成(如数据缓冲区、共享池等),如何根据负载与物理内存确定初始大小,手动调整SGA的方法(如使用`ALTER SYSTEM`命令),以及利用自动内存管理(AMM)特性实现智能调整。调整过程中需注意监控与测试,确保稳定性和性能。
504 2
(二十四)全解MySQL之主从篇:死磕主从复制中数据同步原理与优化
兜兜转转,经过《全解MySQL专栏》前面二十多篇的内容讲解后,基本对MySQL单机模式下的各方面进阶知识做了详细阐述,同时在前面的《分库分表概念篇》、《分库分表隐患篇》两章中也首次提到了数据库的一些高可用方案,但前两章大多属于方法论,并未涵盖真正的实操过程。接下来的内容,会以目前这章作为分割点,开启MySQL高可用方案的落地实践分享的新章程!
2763 1
实时计算 Flink版产品使用问题之Oracle数据库是集群部署的,怎么进行数据同步
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
美柚与MaxCompute的数据同步架构设计与实践
数据处理与分析 一旦数据同步到MaxCompute后,就可以使用MaxCompute SQL或者MapReduce进行复杂的数据处理和分析。
MySQL超时参数优化与DataX高效数据同步实践
通过合理设置MySQL的超时参数,可以有效地提升数据库的稳定性和性能。而DataX作为一种高效的数据同步工具,可以帮助企业轻松实现不同数据源之间的数据迁移。无论是优化MySQL参数还是使用DataX进行数据同步,都需要根据具体的应用场景来进行细致的配置和测试,以达到最佳效果。

热门文章

最新文章

推荐镜像

更多
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等