如何使用 DATAX 以 UPSERT 语义更新下游 ORACLE 数据库中的数据?

简介: 如何使用 DATAX 以 UPSERT 语义更新下游 ORACLE 数据库中的数据?

如何使用 DATAX 以 UPSERT 语义更新下游 ORACLE 数据库中的数据?

1 业务背景

在数据架构上,很多大数据项目,都会将 HIVE/SPARK 等离线计算引擎计算获得的结果数据同步到下游业务系统的线上数据库,以对外提供服务,而且很多业务系统需要为客户提供稳定的 7*24小时的数据查询功能,要求底层数据库中的数据,需要是准确的,不能出现部分数据缺失的情况。

具体到数据同步工具的选型上,datax 是阿里开源的一款流行的数据集成工具,通过插件机制实现了多种异构数据之间的高效的离线数据同步,目前开源版本 datax 支持的插件已经将近 30 多种了,所以很多大数据项目都选用了 DATAX 来做离线数据的同步。

为保证数据的完整性与准确性,在使用 DATAX 进行数据同步时,目前很多项目都采用了先删除旧数据再插入计算生成的新数据的方式 (通过在作业中配置preSql执行旧数据的删除),此时当需要同步的数据量比较大时,旧数据的删除与新数据的插入,都需要一段时间,此时下游数据库中的表不可避免地会有一段时间的空档期,查询不到对应的数据。

怎么解决这个问题呢?

DATAX 官方推荐的一种方式是配置使用临时表,先向临时表导入数据,完成后再 rename 到线上表(可以通过在作业中配置postSql完成这类操作)。

除了临时表这种曲线救国的方式,也可以尝试以 UPSERT 语义直接更新下游数据库中线上的目标表数据。

那么 DATAX 中,不同数据库 WRITER 插件都是怎么实现 UPSERT 语义的呢?

2 DATAX 常见数据库 WRITER 插件是怎么实现 UPSERT 语义的?

  • datax 的 MysqlWriter 和 oceanbasev10writer, 支持配置 writeMode 参数为 insert/replace/update,可以通过该参数控制写入数据到目标表时,底层采用 insert into/replace into/INSERT INTO ... ON DUPLICATE KEY UPDATE 语句:
  • 其中 insert into 当主键/唯一性索引冲突时会写不进去冲突的行;
  • 后两者没有遇到主键/唯一性索引冲突时与 insert into 行为一致,遇到冲突时会用新行替换原有行所有字段;
  • datax原生的 OracleWriter 和PostgresqlWriter,不支持配置writeMode 参数,在底层实现上都是通过 JDBC 连接远程 Oracle/PG 数据库,并执行相应的 insert into ... sql 语句将数据写入 Oracle/pg,在内部会分批次提交入库。

那么,能不能更改原生的 OracleWriter 以支持 UPSERT 语义插入 ORALCE 呢?

3. ORACLE 的 MERGE INTO 语句

Oracle 9i 引入了对 merge语句的支持, 通过 merge 能够在一个SQL语句中对一个表同时执行 inserts 和 updates操作, Oracle 10g 对 MERGE 语句又做了如下增强:

  • UPDATE或INSERT子句是可选的
  • UPDATE和INSERT子句可以加WHERE子句
  • 在ON条件中可以使用常量过滤谓词来insert所有的行到目标表中,不需要连接源表和目标表
  • UPDATE子句后面可以跟DELETE子句来去除一些不需要的行。

merge into 语句语法如下:

MERGE INTO [target-table] A USING [source-table sql] B 
ON([conditional expression] and [...]...) 
WHEN MATCHED THEN
 [UPDATE sql] 
WHEN NOT MATCHED THEN 
[INSERT sql]

merge into 语句实例如下:

MERGE INTO member_staging x
USING (SELECT member_id, first_name, last_name, rank FROM members) y
ON (x.member_id  = y.member_id)
WHEN MATCHED THEN
    UPDATE SET x.first_name = y.first_name, 
                        x.last_name = y.last_name, 
                        x.rank = y.rank
    WHERE x.first_name <> y.first_name OR 
           x.last_name <> y.last_name OR 
           x.rank <> y.rank 
WHEN NOT MATCHED THEN
    INSERT(x.member_id, x.first_name, x.last_name, x.rank)  
    VALUES(y.member_id, y.first_name, y.last_name, y.rank);

所以,虽然 oracle 不支持类似 MYSQL的 REPLACE INTO 和 INSERT ... ON DUPLICATE KEY UPDATE,但由于 ORACLE 原生支持 MERGE INTO 语句,我们完全可以更改datax 的 OracleWriter 源码,通过 merge into 语句,实现 UPSERT 语义。

4. 更改 DATAX oracleWriter 以通过 MERGE INTO 语句实现 UPSERT 语义

涉及改动的 datax源码中类和方法的改动点主要有:

  • com.alibaba.datax.plugin.writer.oraclewriter.OracleWriter.Job#init:更改该方法以允许用户配置 writeMode;
  • com.alibaba.datax.plugin.rdbms.writer.util.OriginalConfPretreatmentUtil#dealWriteMode:更改该方法以获取用户配置的 uniqueKeys 并在调用 WriterUtil.getWriteTemplate 时传递 uniqueKeys;
  • com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil#getWriteTemplate: 更改该方法,以在用户配置 writeMode 使用 replace 且配置了uniqueKeys时,拼接获取 ORACLE MERGE INTO 语句对应的 preparedStatement 字符串;
  • com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter.Task#init:更改该方法以获取用户配置的 uniqueKeys;
  • com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter.Task#calcWriteRecordSql:更改该方法以在调用 WriterUtil.getWriteTemplate 时传递 uniqueKeys;
  • com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter.Task#fillPreparedStatementColumnType(PreparedStatement, int, int, String, Column): 更改该方法以在用户配置 writeMode 使用 replace 且配置了uniqueKeys时,对 ORACLE MERGE INTO 语句对应的 preparedStatement 的变量进行 setString 等赋值操作;

com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil#getWriteTemplate 方法拼接获取的 ORACLE MERGE INTO 语句对应的 preparedStatement 字符串,示例内容如下:

MERGE INTO %s x
USING (SELECT ? as member_id, ? as first_name, ? as last_name, ? as rank FROM dual) y
ON (x.member_id  = y.member_id and x.xxx = y.xx)
WHEN MATCHED THEN UPDATE SET 
                x.first_name = y.first_name, 
                x.last_name = y.last_name, 
                x.rank = y.rank
WHEN NOT MATCHED THEN INSERT(x.member_id, x.first_name, x.last_name, x.rank)  
    VALUES(?,?,?,?);


相关文章
|
15天前
|
存储 人工智能 Cloud Native
云栖重磅|从数据到智能:Data+AI驱动的云原生数据库
在9月20日2024云栖大会上,阿里云智能集团副总裁,数据库产品事业部负责人,ACM、CCF、IEEE会士(Fellow)李飞飞发表《从数据到智能:Data+AI驱动的云原生数据库》主题演讲。他表示,数据是生成式AI的核心资产,大模型时代的数据管理系统需具备多模处理和实时分析能力。阿里云瑶池将数据+AI全面融合,构建一站式多模数据管理平台,以数据驱动决策与创新,为用户提供像“搭积木”一样易用、好用、高可用的使用体验。
云栖重磅|从数据到智能:Data+AI驱动的云原生数据库
|
7天前
|
存储 Oracle 关系型数据库
【赵渝强老师】Oracle的还原数据
Oracle数据库中的还原数据(也称为undo数据或撤销数据)存储在还原表空间中,主要用于支持查询的一致性读取、实现闪回技术和恢复失败的事务。文章通过示例详细介绍了还原数据的工作原理和应用场景。
【赵渝强老师】Oracle的还原数据
|
17天前
|
SQL 关系型数据库 数据库
国产数据实战之docker部署MyWebSQL数据库管理工具
【10月更文挑战第23天】国产数据实战之docker部署MyWebSQL数据库管理工具
56 4
国产数据实战之docker部署MyWebSQL数据库管理工具
|
14天前
|
关系型数据库 分布式数据库 数据库
云栖大会|从数据到决策:AI时代数据库如何实现高效数据管理?
在2024云栖大会「海量数据的高效存储与管理」专场,阿里云瑶池讲师团携手AMD、FunPlus、太美医疗科技、中石化、平安科技以及小赢科技、迅雷集团的资深技术专家深入分享了阿里云在OLTP方向的最新技术进展和行业最佳实践。
|
22天前
|
人工智能 Cloud Native 容灾
云数据库“再进化”,OB Cloud如何打造云时代的数据底座?
云数据库“再进化”,OB Cloud如何打造云时代的数据底座?
|
30天前
|
SQL 存储 关系型数据库
数据储存数据库管理系统(DBMS)
【10月更文挑战第11天】
85 3
|
7天前
|
SQL Oracle 关系型数据库
【赵渝强老师】Oracle的联机重做日志文件与数据写入过程
在Oracle数据库中,联机重做日志文件记录了数据库的变化,用于实例恢复。每个数据库有多组联机重做日志,每组建议至少有两个成员。通过SQL语句可查看日志文件信息。视频讲解和示意图进一步解释了这一过程。
|
7天前
|
SQL Oracle 关系型数据库
【赵渝强老师】Oracle的数据文件
在Oracle数据库中,数据库由多个表空间组成,每个表空间包含多个数据文件。数据文件存储实际的数据库数据。查询时,如果内存中没有所需数据,Oracle会从数据文件中读取并加载到内存。可通过SQL语句查看和管理数据文件。附有视频讲解及示例。
|
1月前
|
SQL 存储 关系型数据库
添加数据到数据库的SQL语句详解与实践技巧
在数据库管理中,添加数据是一个基本操作,它涉及到向表中插入新的记录
|
1月前
|
SQL 监控 数据处理
SQL数据库数据修改操作详解
数据库是现代信息系统的重要组成部分,其中SQL(StructuredQueryLanguage)是管理和处理数据库的重要工具之一。在日常的业务运营过程中,数据的准确性和及时性对企业来说至关重要,这就需要掌握如何在数据库中正确地进行数据修改操作。本文将详细介绍在SQL数据库中如何修改数据,帮助读者更好
188 4

推荐镜像

更多