如何使用 DATAX 以 UPSERT 语义更新下游 ORACLE 数据库中的数据?

简介: 如何使用 DATAX 以 UPSERT 语义更新下游 ORACLE 数据库中的数据?

如何使用 DATAX 以 UPSERT 语义更新下游 ORACLE 数据库中的数据?

1 业务背景

在数据架构上,很多大数据项目,都会将 HIVE/SPARK 等离线计算引擎计算获得的结果数据同步到下游业务系统的线上数据库,以对外提供服务,而且很多业务系统需要为客户提供稳定的 7*24小时的数据查询功能,要求底层数据库中的数据,需要是准确的,不能出现部分数据缺失的情况。

具体到数据同步工具的选型上,datax 是阿里开源的一款流行的数据集成工具,通过插件机制实现了多种异构数据之间的高效的离线数据同步,目前开源版本 datax 支持的插件已经将近 30 多种了,所以很多大数据项目都选用了 DATAX 来做离线数据的同步。

为保证数据的完整性与准确性,在使用 DATAX 进行数据同步时,目前很多项目都采用了先删除旧数据再插入计算生成的新数据的方式 (通过在作业中配置preSql执行旧数据的删除),此时当需要同步的数据量比较大时,旧数据的删除与新数据的插入,都需要一段时间,此时下游数据库中的表不可避免地会有一段时间的空档期,查询不到对应的数据。

怎么解决这个问题呢?

DATAX 官方推荐的一种方式是配置使用临时表,先向临时表导入数据,完成后再 rename 到线上表(可以通过在作业中配置postSql完成这类操作)。

除了临时表这种曲线救国的方式,也可以尝试以 UPSERT 语义直接更新下游数据库中线上的目标表数据。

那么 DATAX 中,不同数据库 WRITER 插件都是怎么实现 UPSERT 语义的呢?

2 DATAX 常见数据库 WRITER 插件是怎么实现 UPSERT 语义的?

  • datax 的 MysqlWriter 和 oceanbasev10writer, 支持配置 writeMode 参数为 insert/replace/update,可以通过该参数控制写入数据到目标表时,底层采用 insert into/replace into/INSERT INTO ... ON DUPLICATE KEY UPDATE 语句:
  • 其中 insert into 当主键/唯一性索引冲突时会写不进去冲突的行;
  • 后两者没有遇到主键/唯一性索引冲突时与 insert into 行为一致,遇到冲突时会用新行替换原有行所有字段;
  • datax原生的 OracleWriter 和PostgresqlWriter,不支持配置writeMode 参数,在底层实现上都是通过 JDBC 连接远程 Oracle/PG 数据库,并执行相应的 insert into ... sql 语句将数据写入 Oracle/pg,在内部会分批次提交入库。

那么,能不能更改原生的 OracleWriter 以支持 UPSERT 语义插入 ORALCE 呢?

3. ORACLE 的 MERGE INTO 语句

Oracle 9i 引入了对 merge语句的支持, 通过 merge 能够在一个SQL语句中对一个表同时执行 inserts 和 updates操作, Oracle 10g 对 MERGE 语句又做了如下增强:

  • UPDATE或INSERT子句是可选的
  • UPDATE和INSERT子句可以加WHERE子句
  • 在ON条件中可以使用常量过滤谓词来insert所有的行到目标表中,不需要连接源表和目标表
  • UPDATE子句后面可以跟DELETE子句来去除一些不需要的行。

merge into 语句语法如下:

MERGE INTO [target-table] A USING [source-table sql] B 
ON([conditional expression] and [...]...) 
WHEN MATCHED THEN
 [UPDATE sql] 
WHEN NOT MATCHED THEN 
[INSERT sql]

merge into 语句实例如下:

MERGE INTO member_staging x
USING (SELECT member_id, first_name, last_name, rank FROM members) y
ON (x.member_id  = y.member_id)
WHEN MATCHED THEN
    UPDATE SET x.first_name = y.first_name, 
                        x.last_name = y.last_name, 
                        x.rank = y.rank
    WHERE x.first_name <> y.first_name OR 
           x.last_name <> y.last_name OR 
           x.rank <> y.rank 
WHEN NOT MATCHED THEN
    INSERT(x.member_id, x.first_name, x.last_name, x.rank)  
    VALUES(y.member_id, y.first_name, y.last_name, y.rank);

所以,虽然 oracle 不支持类似 MYSQL的 REPLACE INTO 和 INSERT ... ON DUPLICATE KEY UPDATE,但由于 ORACLE 原生支持 MERGE INTO 语句,我们完全可以更改datax 的 OracleWriter 源码,通过 merge into 语句,实现 UPSERT 语义。

4. 更改 DATAX oracleWriter 以通过 MERGE INTO 语句实现 UPSERT 语义

涉及改动的 datax源码中类和方法的改动点主要有:

  • com.alibaba.datax.plugin.writer.oraclewriter.OracleWriter.Job#init:更改该方法以允许用户配置 writeMode;
  • com.alibaba.datax.plugin.rdbms.writer.util.OriginalConfPretreatmentUtil#dealWriteMode:更改该方法以获取用户配置的 uniqueKeys 并在调用 WriterUtil.getWriteTemplate 时传递 uniqueKeys;
  • com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil#getWriteTemplate: 更改该方法,以在用户配置 writeMode 使用 replace 且配置了uniqueKeys时,拼接获取 ORACLE MERGE INTO 语句对应的 preparedStatement 字符串;
  • com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter.Task#init:更改该方法以获取用户配置的 uniqueKeys;
  • com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter.Task#calcWriteRecordSql:更改该方法以在调用 WriterUtil.getWriteTemplate 时传递 uniqueKeys;
  • com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter.Task#fillPreparedStatementColumnType(PreparedStatement, int, int, String, Column): 更改该方法以在用户配置 writeMode 使用 replace 且配置了uniqueKeys时,对 ORACLE MERGE INTO 语句对应的 preparedStatement 的变量进行 setString 等赋值操作;

com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil#getWriteTemplate 方法拼接获取的 ORACLE MERGE INTO 语句对应的 preparedStatement 字符串,示例内容如下:

MERGE INTO %s x
USING (SELECT ? as member_id, ? as first_name, ? as last_name, ? as rank FROM dual) y
ON (x.member_id  = y.member_id and x.xxx = y.xx)
WHEN MATCHED THEN UPDATE SET 
                x.first_name = y.first_name, 
                x.last_name = y.last_name, 
                x.rank = y.rank
WHEN NOT MATCHED THEN INSERT(x.member_id, x.first_name, x.last_name, x.rank)  
    VALUES(?,?,?,?);


相关文章
|
2天前
|
存储 人工智能 Cloud Native
云栖重磅|从数据到智能:Data+AI驱动的云原生数据库
在9月20日2024云栖大会上,阿里云智能集团副总裁,数据库产品事业部负责人,ACM、CCF、IEEE会士(Fellow)李飞飞发表《从数据到智能:Data+AI驱动的云原生数据库》主题演讲。他表示,数据是生成式AI的核心资产,大模型时代的数据管理系统需具备多模处理和实时分析能力。阿里云瑶池将数据+AI全面融合,构建一站式多模数据管理平台,以数据驱动决策与创新,为用户提供像“搭积木”一样易用、好用、高可用的使用体验。
云栖重磅|从数据到智能:Data+AI驱动的云原生数据库
|
4天前
|
SQL 关系型数据库 数据库
国产数据实战之docker部署MyWebSQL数据库管理工具
【10月更文挑战第23天】国产数据实战之docker部署MyWebSQL数据库管理工具
21 3
国产数据实战之docker部署MyWebSQL数据库管理工具
|
1天前
|
关系型数据库 分布式数据库 数据库
云栖大会|从数据到决策:AI时代数据库如何实现高效数据管理?
在2024云栖大会「海量数据的高效存储与管理」专场,阿里云瑶池讲师团携手AMD、FunPlus、太美医疗科技、中石化、平安科技以及小赢科技、迅雷集团的资深技术专家深入分享了阿里云在OLTP方向的最新技术进展和行业最佳实践。
|
4天前
|
SQL Oracle 关系型数据库
Oracle数据库优化方法
【10月更文挑战第25天】Oracle数据库优化方法
17 7
|
4天前
|
Oracle 关系型数据库 数据库
oracle数据库技巧
【10月更文挑战第25天】oracle数据库技巧
11 6
|
4天前
|
存储 Oracle 关系型数据库
Oracle数据库优化策略
【10月更文挑战第25天】Oracle数据库优化策略
11 5
|
9天前
|
人工智能 Cloud Native 容灾
云数据库“再进化”,OB Cloud如何打造云时代的数据底座?
云数据库“再进化”,OB Cloud如何打造云时代的数据底座?
|
11天前
|
存储 Oracle 关系型数据库
数据库数据恢复—Oracle ASM磁盘组故障数据恢复案例
Oracle数据库数据恢复环境&故障: Oracle ASM磁盘组由4块磁盘组成。Oracle ASM磁盘组掉线 ,ASM实例不能mount。 Oracle数据库故障分析&恢复方案: 数据库数据恢复工程师对组成ASM磁盘组的磁盘进行分析。对ASM元数据进行分析发现ASM存储元数据损坏,导致磁盘组无法挂载。
|
10天前
|
NoSQL 前端开发 MongoDB
前端的全栈之路Meteor篇(三):运行在浏览器端的NoSQL数据库副本-MiniMongo介绍及其前后端数据实时同步示例
MiniMongo 是 Meteor 框架中的客户端数据库组件,模拟了 MongoDB 的核心功能,允许前端开发者使用类似 MongoDB 的 API 进行数据操作。通过 Meteor 的数据同步机制,MiniMongo 与服务器端的 MongoDB 实现实时数据同步,确保数据一致性,支持发布/订阅模型和响应式数据源,适用于实时聊天、项目管理和协作工具等应用场景。
|
3月前
|
数据采集 DataWorks 监控
DataWorks产品使用合集之数据集成并发数不支持批量修改,该怎么办
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。

推荐镜像

更多