如何使用 DATAX 以 UPSERT 语义更新下游 ORACLE 数据库中的数据?
1 业务背景
在数据架构上,很多大数据项目,都会将 HIVE/SPARK 等离线计算引擎计算获得的结果数据同步到下游业务系统的线上数据库,以对外提供服务,而且很多业务系统需要为客户提供稳定的 7*24小时的数据查询功能,要求底层数据库中的数据,需要是准确的,不能出现部分数据缺失的情况。
具体到数据同步工具的选型上,datax 是阿里开源的一款流行的数据集成工具,通过插件机制实现了多种异构数据之间的高效的离线数据同步,目前开源版本 datax 支持的插件已经将近 30 多种了,所以很多大数据项目都选用了 DATAX 来做离线数据的同步。
为保证数据的完整性与准确性,在使用 DATAX 进行数据同步时,目前很多项目都采用了先删除旧数据再插入计算生成的新数据的方式 (通过在作业中配置preSql执行旧数据的删除),此时当需要同步的数据量比较大时,旧数据的删除与新数据的插入,都需要一段时间,此时下游数据库中的表不可避免地会有一段时间的空档期,查询不到对应的数据。
怎么解决这个问题呢?
DATAX 官方推荐的一种方式是配置使用临时表,先向临时表导入数据,完成后再 rename 到线上表(可以通过在作业中配置postSql完成这类操作)。
除了临时表这种曲线救国的方式,也可以尝试以 UPSERT 语义直接更新下游数据库中线上的目标表数据。
那么 DATAX 中,不同数据库 WRITER 插件都是怎么实现 UPSERT 语义的呢?
2 DATAX 常见数据库 WRITER 插件是怎么实现 UPSERT 语义的?
- datax 的 MysqlWriter 和 oceanbasev10writer, 支持配置 writeMode 参数为 insert/replace/update,可以通过该参数控制写入数据到目标表时,底层采用 insert into/replace into/INSERT INTO ... ON DUPLICATE KEY UPDATE 语句:
- 其中 insert into 当主键/唯一性索引冲突时会写不进去冲突的行;
- 后两者没有遇到主键/唯一性索引冲突时与 insert into 行为一致,遇到冲突时会用新行替换原有行所有字段;
- datax原生的 OracleWriter 和PostgresqlWriter,不支持配置writeMode 参数,在底层实现上都是通过 JDBC 连接远程 Oracle/PG 数据库,并执行相应的 insert into ... sql 语句将数据写入 Oracle/pg,在内部会分批次提交入库。
那么,能不能更改原生的 OracleWriter 以支持 UPSERT 语义插入 ORALCE 呢?
3. ORACLE 的 MERGE INTO 语句
Oracle 9i 引入了对 merge语句的支持, 通过 merge 能够在一个SQL语句中对一个表同时执行 inserts 和 updates操作, Oracle 10g 对 MERGE 语句又做了如下增强:
- UPDATE或INSERT子句是可选的
- UPDATE和INSERT子句可以加WHERE子句
- 在ON条件中可以使用常量过滤谓词来insert所有的行到目标表中,不需要连接源表和目标表
- UPDATE子句后面可以跟DELETE子句来去除一些不需要的行。
merge into 语句语法如下:
MERGE INTO [target-table] A USING [source-table sql] B ON([conditional expression] and [...]...) WHEN MATCHED THEN [UPDATE sql] WHEN NOT MATCHED THEN [INSERT sql]
merge into 语句实例如下:
MERGE INTO member_staging x USING (SELECT member_id, first_name, last_name, rank FROM members) y ON (x.member_id = y.member_id) WHEN MATCHED THEN UPDATE SET x.first_name = y.first_name, x.last_name = y.last_name, x.rank = y.rank WHERE x.first_name <> y.first_name OR x.last_name <> y.last_name OR x.rank <> y.rank WHEN NOT MATCHED THEN INSERT(x.member_id, x.first_name, x.last_name, x.rank) VALUES(y.member_id, y.first_name, y.last_name, y.rank);
所以,虽然 oracle 不支持类似 MYSQL的 REPLACE INTO 和 INSERT ... ON DUPLICATE KEY UPDATE,但由于 ORACLE 原生支持 MERGE INTO 语句,我们完全可以更改datax 的 OracleWriter 源码,通过 merge into 语句,实现 UPSERT 语义。
4. 更改 DATAX oracleWriter 以通过 MERGE INTO 语句实现 UPSERT 语义
涉及改动的 datax源码中类和方法的改动点主要有:
- com.alibaba.datax.plugin.writer.oraclewriter.OracleWriter.Job#init:更改该方法以允许用户配置 writeMode;
- com.alibaba.datax.plugin.rdbms.writer.util.OriginalConfPretreatmentUtil#dealWriteMode:更改该方法以获取用户配置的 uniqueKeys 并在调用 WriterUtil.getWriteTemplate 时传递 uniqueKeys;
- com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil#getWriteTemplate: 更改该方法,以在用户配置 writeMode 使用 replace 且配置了uniqueKeys时,拼接获取 ORACLE MERGE INTO 语句对应的 preparedStatement 字符串;
- com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter.Task#init:更改该方法以获取用户配置的 uniqueKeys;
- com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter.Task#calcWriteRecordSql:更改该方法以在调用 WriterUtil.getWriteTemplate 时传递 uniqueKeys;
- com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter.Task#fillPreparedStatementColumnType(PreparedStatement, int, int, String, Column): 更改该方法以在用户配置 writeMode 使用 replace 且配置了uniqueKeys时,对 ORACLE MERGE INTO 语句对应的 preparedStatement 的变量进行 setString 等赋值操作;
com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil#getWriteTemplate 方法拼接获取的 ORACLE MERGE INTO 语句对应的 preparedStatement 字符串,示例内容如下:
MERGE INTO %s x USING (SELECT ? as member_id, ? as first_name, ? as last_name, ? as rank FROM dual) y ON (x.member_id = y.member_id and x.xxx = y.xx) WHEN MATCHED THEN UPDATE SET x.first_name = y.first_name, x.last_name = y.last_name, x.rank = y.rank WHEN NOT MATCHED THEN INSERT(x.member_id, x.first_name, x.last_name, x.rank) VALUES(?,?,?,?);