如何使用 DATAX 以 UPSERT 语义更新下游 ORACLE 数据库中的数据?

简介: 如何使用 DATAX 以 UPSERT 语义更新下游 ORACLE 数据库中的数据?

如何使用 DATAX 以 UPSERT 语义更新下游 ORACLE 数据库中的数据?

1 业务背景

在数据架构上,很多大数据项目,都会将 HIVE/SPARK 等离线计算引擎计算获得的结果数据同步到下游业务系统的线上数据库,以对外提供服务,而且很多业务系统需要为客户提供稳定的 7*24小时的数据查询功能,要求底层数据库中的数据,需要是准确的,不能出现部分数据缺失的情况。

具体到数据同步工具的选型上,datax 是阿里开源的一款流行的数据集成工具,通过插件机制实现了多种异构数据之间的高效的离线数据同步,目前开源版本 datax 支持的插件已经将近 30 多种了,所以很多大数据项目都选用了 DATAX 来做离线数据的同步。

为保证数据的完整性与准确性,在使用 DATAX 进行数据同步时,目前很多项目都采用了先删除旧数据再插入计算生成的新数据的方式 (通过在作业中配置preSql执行旧数据的删除),此时当需要同步的数据量比较大时,旧数据的删除与新数据的插入,都需要一段时间,此时下游数据库中的表不可避免地会有一段时间的空档期,查询不到对应的数据。

怎么解决这个问题呢?

DATAX 官方推荐的一种方式是配置使用临时表,先向临时表导入数据,完成后再 rename 到线上表(可以通过在作业中配置postSql完成这类操作)。

除了临时表这种曲线救国的方式,也可以尝试以 UPSERT 语义直接更新下游数据库中线上的目标表数据。

那么 DATAX 中,不同数据库 WRITER 插件都是怎么实现 UPSERT 语义的呢?

2 DATAX 常见数据库 WRITER 插件是怎么实现 UPSERT 语义的?

  • datax 的 MysqlWriter 和 oceanbasev10writer, 支持配置 writeMode 参数为 insert/replace/update,可以通过该参数控制写入数据到目标表时,底层采用 insert into/replace into/INSERT INTO ... ON DUPLICATE KEY UPDATE 语句:
  • 其中 insert into 当主键/唯一性索引冲突时会写不进去冲突的行;
  • 后两者没有遇到主键/唯一性索引冲突时与 insert into 行为一致,遇到冲突时会用新行替换原有行所有字段;
  • datax原生的 OracleWriter 和PostgresqlWriter,不支持配置writeMode 参数,在底层实现上都是通过 JDBC 连接远程 Oracle/PG 数据库,并执行相应的 insert into ... sql 语句将数据写入 Oracle/pg,在内部会分批次提交入库。

那么,能不能更改原生的 OracleWriter 以支持 UPSERT 语义插入 ORALCE 呢?

3. ORACLE 的 MERGE INTO 语句

Oracle 9i 引入了对 merge语句的支持, 通过 merge 能够在一个SQL语句中对一个表同时执行 inserts 和 updates操作, Oracle 10g 对 MERGE 语句又做了如下增强:

  • UPDATE或INSERT子句是可选的
  • UPDATE和INSERT子句可以加WHERE子句
  • 在ON条件中可以使用常量过滤谓词来insert所有的行到目标表中,不需要连接源表和目标表
  • UPDATE子句后面可以跟DELETE子句来去除一些不需要的行。

merge into 语句语法如下:

MERGE INTO [target-table] A USING [source-table sql] B 
ON([conditional expression] and [...]...) 
WHEN MATCHED THEN
 [UPDATE sql] 
WHEN NOT MATCHED THEN 
[INSERT sql]

merge into 语句实例如下:

MERGE INTO member_staging x
USING (SELECT member_id, first_name, last_name, rank FROM members) y
ON (x.member_id  = y.member_id)
WHEN MATCHED THEN
    UPDATE SET x.first_name = y.first_name, 
                        x.last_name = y.last_name, 
                        x.rank = y.rank
    WHERE x.first_name <> y.first_name OR 
           x.last_name <> y.last_name OR 
           x.rank <> y.rank 
WHEN NOT MATCHED THEN
    INSERT(x.member_id, x.first_name, x.last_name, x.rank)  
    VALUES(y.member_id, y.first_name, y.last_name, y.rank);

所以,虽然 oracle 不支持类似 MYSQL的 REPLACE INTO 和 INSERT ... ON DUPLICATE KEY UPDATE,但由于 ORACLE 原生支持 MERGE INTO 语句,我们完全可以更改datax 的 OracleWriter 源码,通过 merge into 语句,实现 UPSERT 语义。

4. 更改 DATAX oracleWriter 以通过 MERGE INTO 语句实现 UPSERT 语义

涉及改动的 datax源码中类和方法的改动点主要有:

  • com.alibaba.datax.plugin.writer.oraclewriter.OracleWriter.Job#init:更改该方法以允许用户配置 writeMode;
  • com.alibaba.datax.plugin.rdbms.writer.util.OriginalConfPretreatmentUtil#dealWriteMode:更改该方法以获取用户配置的 uniqueKeys 并在调用 WriterUtil.getWriteTemplate 时传递 uniqueKeys;
  • com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil#getWriteTemplate: 更改该方法,以在用户配置 writeMode 使用 replace 且配置了uniqueKeys时,拼接获取 ORACLE MERGE INTO 语句对应的 preparedStatement 字符串;
  • com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter.Task#init:更改该方法以获取用户配置的 uniqueKeys;
  • com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter.Task#calcWriteRecordSql:更改该方法以在调用 WriterUtil.getWriteTemplate 时传递 uniqueKeys;
  • com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter.Task#fillPreparedStatementColumnType(PreparedStatement, int, int, String, Column): 更改该方法以在用户配置 writeMode 使用 replace 且配置了uniqueKeys时,对 ORACLE MERGE INTO 语句对应的 preparedStatement 的变量进行 setString 等赋值操作;

com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil#getWriteTemplate 方法拼接获取的 ORACLE MERGE INTO 语句对应的 preparedStatement 字符串,示例内容如下:

MERGE INTO %s x
USING (SELECT ? as member_id, ? as first_name, ? as last_name, ? as rank FROM dual) y
ON (x.member_id  = y.member_id and x.xxx = y.xx)
WHEN MATCHED THEN UPDATE SET 
                x.first_name = y.first_name, 
                x.last_name = y.last_name, 
                x.rank = y.rank
WHEN NOT MATCHED THEN INSERT(x.member_id, x.first_name, x.last_name, x.rank)  
    VALUES(?,?,?,?);


相关文章
|
2月前
|
存储 JSON 关系型数据库
【干货满满】解密 API 数据解析:从 JSON 到数据库存储的完整流程
本文详解电商API开发中JSON数据解析与数据库存储的全流程,涵盖数据提取、清洗、转换及优化策略,结合Python实战代码与主流数据库方案,助开发者构建高效、可靠的数据处理管道。
|
12天前
|
存储 数据管理 数据库
数据字典是什么?和数据库、数据仓库有什么关系?
在数据处理中,你是否常困惑于字段含义、指标计算或数据来源?数据字典正是解答这些问题的关键工具,它清晰定义数据的名称、类型、来源、计算方式等,服务于开发者、分析师和数据管理者。本文详解数据字典的定义、组成及其与数据库、数据仓库的关系,助你夯实数据基础。
数据字典是什么?和数据库、数据仓库有什么关系?
|
5月前
|
存储 缓存 数据库
数据库数据删除策略:硬删除vs软删除的最佳实践指南
在项目开发中,“删除”操作常见但方式多样,主要分为硬删除与软删除。硬删除直接从数据库移除数据,操作简单、高效,但不可恢复;适用于临时或敏感数据。软删除通过标记字段保留数据,支持恢复和审计,但增加查询复杂度与数据量;适合需追踪历史或可恢复的场景。两者各有优劣,实际开发中常结合使用以满足不同需求。
356 4
|
22天前
|
存储 关系型数据库 数据库
【赵渝强老师】PostgreSQL数据库的WAL日志与数据写入的过程
PostgreSQL中的WAL(预写日志)是保证数据完整性的关键技术。在数据修改前,系统会先将日志写入WAL,确保宕机时可通过日志恢复数据。它减少了磁盘I/O,提升了性能,并支持手动切换日志文件。WAL文件默认存储在pg_wal目录下,采用16进制命名规则。此外,PostgreSQL提供pg_waldump工具解析日志内容。
|
6月前
|
存储 Oracle 关系型数据库
【YashanDB 知识库】YMP 校验从 yashandb 同步到 oracle 的数据时,字段 timestamp(0) 出现不一致
在YMP校验过程中,从yashandb同步至Oracle的数据出现timestamp(0)字段不一致问题。原因是yashandb的timestamp(x)存储为固定6位小数,而Oracle的timestamp(0)无小数位,同步时会截断yashandb的6位小数,导致数据差异。受影响版本:yashandb 23.2.7.101、YMP 23.3.1.3、YDS联调版本。此问题会导致YMP校验数据内容不一致。
|
3月前
|
存储 SQL Java
数据存储使用文件还是数据库,哪个更合适?
数据库和文件系统各有优劣:数据库读写性能较低、结构 rigid,但具备计算能力和数据一致性保障;文件系统灵活易管理、读写高效,但缺乏计算能力且无法保证一致性。针对仅需高效存储与灵活管理的场景,文件系统更优,但其计算短板可通过开源工具 SPL(Structured Process Language)弥补。SPL 提供独立计算语法及高性能文件格式(如集文件、组表),支持复杂计算与多源混合查询,甚至可替代数据仓库。此外,SPL 易集成、支持热切换,大幅提升开发运维效率,是后数据库时代文件存储的理想补充方案。
|
6月前
|
Oracle 关系型数据库 Java
【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山
本文介绍通过Flink CDC实现Oracle数据实时同步至崖山数据库(YashanDB)的方法,支持全量与增量同步,并涵盖新增、修改和删除的DML操作。内容包括环境准备(如JDK、Flink版本等)、Oracle日志归档启用、用户权限配置、增量日志记录设置、元数据迁移、Flink安装与配置、生成Flink SQL文件、Streampark部署,以及创建和启动实时同步任务的具体步骤。适合需要跨数据库实时同步方案的技术人员参考。
【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山
|
6月前
|
存储 Oracle 关系型数据库
【YashanDB 知识库】YMP 校验从 yashandb 同步到 oracle 的数据时,字段 timestamp(0) 出现不一致
【YashanDB 知识库】YMP 校验从 yashandb 同步到 oracle 的数据时,字段 timestamp(0) 出现不一致
|
6月前
|
数据库 Python
【YashanDB知识库】python驱动查询gbk字符集崖山数据库CLOB字段,数据被驱动截断
【YashanDB知识库】python驱动查询gbk字符集崖山数据库CLOB字段,数据被驱动截断
|
5月前
|
人工智能 关系型数据库 分布式数据库
让数据与AI贴得更近,阿里云瑶池数据库系列产品焕新升级
4月9日阿里云AI势能大会上,阿里云瑶池数据库发布重磅新品及一系列产品能力升级。「推理加速服务」Tair KVCache全新上线,实现KVCache动态分层存储,显著提高内存资源利用率,为大模型推理降本提速。

热门文章

最新文章

推荐镜像

更多