如何使用 DATAX 以 UPSERT 语义更新下游 ORACLE 数据库中的数据?

简介: 如何使用 DATAX 以 UPSERT 语义更新下游 ORACLE 数据库中的数据?

如何使用 DATAX 以 UPSERT 语义更新下游 ORACLE 数据库中的数据?

1 业务背景

在数据架构上,很多大数据项目,都会将 HIVE/SPARK 等离线计算引擎计算获得的结果数据同步到下游业务系统的线上数据库,以对外提供服务,而且很多业务系统需要为客户提供稳定的 7*24小时的数据查询功能,要求底层数据库中的数据,需要是准确的,不能出现部分数据缺失的情况。

具体到数据同步工具的选型上,datax 是阿里开源的一款流行的数据集成工具,通过插件机制实现了多种异构数据之间的高效的离线数据同步,目前开源版本 datax 支持的插件已经将近 30 多种了,所以很多大数据项目都选用了 DATAX 来做离线数据的同步。

为保证数据的完整性与准确性,在使用 DATAX 进行数据同步时,目前很多项目都采用了先删除旧数据再插入计算生成的新数据的方式 (通过在作业中配置preSql执行旧数据的删除),此时当需要同步的数据量比较大时,旧数据的删除与新数据的插入,都需要一段时间,此时下游数据库中的表不可避免地会有一段时间的空档期,查询不到对应的数据。

怎么解决这个问题呢?

DATAX 官方推荐的一种方式是配置使用临时表,先向临时表导入数据,完成后再 rename 到线上表(可以通过在作业中配置postSql完成这类操作)。

除了临时表这种曲线救国的方式,也可以尝试以 UPSERT 语义直接更新下游数据库中线上的目标表数据。

那么 DATAX 中,不同数据库 WRITER 插件都是怎么实现 UPSERT 语义的呢?

2 DATAX 常见数据库 WRITER 插件是怎么实现 UPSERT 语义的?

  • datax 的 MysqlWriter 和 oceanbasev10writer, 支持配置 writeMode 参数为 insert/replace/update,可以通过该参数控制写入数据到目标表时,底层采用 insert into/replace into/INSERT INTO ... ON DUPLICATE KEY UPDATE 语句:
  • 其中 insert into 当主键/唯一性索引冲突时会写不进去冲突的行;
  • 后两者没有遇到主键/唯一性索引冲突时与 insert into 行为一致,遇到冲突时会用新行替换原有行所有字段;
  • datax原生的 OracleWriter 和PostgresqlWriter,不支持配置writeMode 参数,在底层实现上都是通过 JDBC 连接远程 Oracle/PG 数据库,并执行相应的 insert into ... sql 语句将数据写入 Oracle/pg,在内部会分批次提交入库。

那么,能不能更改原生的 OracleWriter 以支持 UPSERT 语义插入 ORALCE 呢?

3. ORACLE 的 MERGE INTO 语句

Oracle 9i 引入了对 merge语句的支持, 通过 merge 能够在一个SQL语句中对一个表同时执行 inserts 和 updates操作, Oracle 10g 对 MERGE 语句又做了如下增强:

  • UPDATE或INSERT子句是可选的
  • UPDATE和INSERT子句可以加WHERE子句
  • 在ON条件中可以使用常量过滤谓词来insert所有的行到目标表中,不需要连接源表和目标表
  • UPDATE子句后面可以跟DELETE子句来去除一些不需要的行。

merge into 语句语法如下:

MERGE INTO [target-table] A USING [source-table sql] B 
ON([conditional expression] and [...]...) 
WHEN MATCHED THEN
 [UPDATE sql] 
WHEN NOT MATCHED THEN 
[INSERT sql]

merge into 语句实例如下:

MERGE INTO member_staging x
USING (SELECT member_id, first_name, last_name, rank FROM members) y
ON (x.member_id  = y.member_id)
WHEN MATCHED THEN
    UPDATE SET x.first_name = y.first_name, 
                        x.last_name = y.last_name, 
                        x.rank = y.rank
    WHERE x.first_name <> y.first_name OR 
           x.last_name <> y.last_name OR 
           x.rank <> y.rank 
WHEN NOT MATCHED THEN
    INSERT(x.member_id, x.first_name, x.last_name, x.rank)  
    VALUES(y.member_id, y.first_name, y.last_name, y.rank);

所以,虽然 oracle 不支持类似 MYSQL的 REPLACE INTO 和 INSERT ... ON DUPLICATE KEY UPDATE,但由于 ORACLE 原生支持 MERGE INTO 语句,我们完全可以更改datax 的 OracleWriter 源码,通过 merge into 语句,实现 UPSERT 语义。

4. 更改 DATAX oracleWriter 以通过 MERGE INTO 语句实现 UPSERT 语义

涉及改动的 datax源码中类和方法的改动点主要有:

  • com.alibaba.datax.plugin.writer.oraclewriter.OracleWriter.Job#init:更改该方法以允许用户配置 writeMode;
  • com.alibaba.datax.plugin.rdbms.writer.util.OriginalConfPretreatmentUtil#dealWriteMode:更改该方法以获取用户配置的 uniqueKeys 并在调用 WriterUtil.getWriteTemplate 时传递 uniqueKeys;
  • com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil#getWriteTemplate: 更改该方法,以在用户配置 writeMode 使用 replace 且配置了uniqueKeys时,拼接获取 ORACLE MERGE INTO 语句对应的 preparedStatement 字符串;
  • com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter.Task#init:更改该方法以获取用户配置的 uniqueKeys;
  • com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter.Task#calcWriteRecordSql:更改该方法以在调用 WriterUtil.getWriteTemplate 时传递 uniqueKeys;
  • com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter.Task#fillPreparedStatementColumnType(PreparedStatement, int, int, String, Column): 更改该方法以在用户配置 writeMode 使用 replace 且配置了uniqueKeys时,对 ORACLE MERGE INTO 语句对应的 preparedStatement 的变量进行 setString 等赋值操作;

com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil#getWriteTemplate 方法拼接获取的 ORACLE MERGE INTO 语句对应的 preparedStatement 字符串,示例内容如下:

MERGE INTO %s x
USING (SELECT ? as member_id, ? as first_name, ? as last_name, ? as rank FROM dual) y
ON (x.member_id  = y.member_id and x.xxx = y.xx)
WHEN MATCHED THEN UPDATE SET 
                x.first_name = y.first_name, 
                x.last_name = y.last_name, 
                x.rank = y.rank
WHEN NOT MATCHED THEN INSERT(x.member_id, x.first_name, x.last_name, x.rank)  
    VALUES(?,?,?,?);


相关文章
|
3天前
|
关系型数据库 MySQL 数据库
实时计算 Flink版产品使用合集之支持将数据写入 OceanBase 数据库吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
23 5
|
3天前
|
关系型数据库 MySQL API
实时计算 Flink版产品使用合集之可以通过mysql-cdc动态监听MySQL数据库的数据变动吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
78 0
|
2天前
|
Oracle 关系型数据库 MySQL
实时计算 Flink版操作报错合集之采集oracle的时候报ORA-65040:不允许从可插入数据库内部执行该操作如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
20 3
|
3天前
|
NoSQL Shell MongoDB
NoSQL数据使用指令和引擎连接数据库实例
【5月更文挑战第8天】本文介绍了MongoDB的本地使用和常用操作,包括通过mongo shell连接数据库、显示数据库和集合,以及副本集设置。最后提到了MongoDB的日志功能和顶点集的使用,如capped collection的创建和管理。
41 3
|
3天前
|
SQL Oracle 关系型数据库
实时计算 Flink版产品使用合集之可以通过配置Oracle数据库的schema注册表来监测表结构的变化吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
10 1
|
3天前
|
存储 SQL Oracle
关系型数据库文件方式存储DATA FILE(数据文件)
【5月更文挑战第11天】关系型数据库文件方式存储DATA FILE(数据文件)
14 3
|
4天前
|
消息中间件 Java Kafka
实时计算 Flink版产品使用合集之可以将数据写入 ClickHouse 数据库中吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
15 1
|
4天前
|
Oracle 关系型数据库 MySQL
实时计算 Flink版产品使用合集之Flink CDC 2.3.0和Flink 1.17,无法从MySQL数据库中抽取数据,是什么原因导致的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
20 1
|
4天前
|
存储 JSON 前端开发
数据库中的数据
数据库中的数据
10 0
|
4天前
|
监控 安全 关系型数据库
关系型数据库数据完整性保障
【5月更文挑战第10天】关系型数据库数据完整性保障
15 4

推荐镜像

更多