一、DTS链路内ETL介绍
DTS是一个数据迁移和同步服务,通常用于数据搬迁或实时数据传输。DTS在数据同步链路基础上,提供流式数据ETL数据处理功能,支持使用DSL(Domain Specific Language)脚本语言灵活地定义数据处理逻辑
二、DTS链路内ETL适用场景
2.1 数据脱敏
对敏感信息如电话号码,身份证号,地址等进行打码等方式脱敏, 如: 针对电话号码脱敏
原始数据: 18700001111
ETL处理之后的数据: 187****1111
2.2 数据归一化
根据规则将一组数据映射到另一组数据,如根据成绩将成绩 >=60分的归类为A, 将成绩小于60分的时归类为B
2.3 同步过程中附加列
业务A有以下用户表
CREATE TABLE USER_INFO (
USER_ID INT NOT NULL,
USER_NAME VARCHAR(32),
PRIMARY KEY(USER_ID)
)
在使用DTS数据同步时,希望在不改变源库表结构的前提下同步到其他的分析数据库上,并记录数据更时间 UPDATE_TIME
原始数据:
USER_ID | USER_NAME |
31846731 | 张三 |
ETL处理之后的数据:
USER_ID | USER_NAME | UPDATE_TIME |
31846731 | 张三 | 2023-04-01 12:51:46 |
2.4 数据过滤
"ETL数据过滤"和"DTS SQL条件过滤任务数据" 功能对比
| ETL数据过滤 | SQL条件过滤 |
作用域 | DTS实例级 | 表级别 |
语法 | DSL 语法 | SQL 语法 |
字段内容 | 数据库表字段内容,DTS附加的字段(如DML 类型,日志提交时间等) | 数据库表字段内容 |
条件表达式 | DTS附加属性和数据表达式组成过滤条件( 如过滤值ID=10的DELETE日志同步) | SQL表达式 |
推荐使用方法:
- DTS SQL条件过滤任务数据 适用于依据表内容进行数据过滤;
- ETL数据过滤适用于实例级配置,以及结合增量日志属性进行数据过滤
2.5 数据类型转换
根据规则将数据从一种类型映射到另一种类型,如将VARCHAR类型映射到INT类型
原始数据 | 123456 | 99.99 |
ETL处理之后的数据 | 123456 | 100 |
2.6 非法值处理
再异构数据库同步过程中, 存在一系列不兼容数据类型,例如Oracle同步到MySQL的链路,Oracle TIMESTAMP('4712BC-01-01 00:00:00' to '9999-12-31 23:59:59')与MySQL TIMESTAMP(0000-00-00 00:00:00, 1970-01-01 00:00:00 to 2038-01-19 03:14:07) 数据类型范围不兼容,需要进行数据转换
原始数据 | 0001-01-01 00:00:01 |
ETL处理之后的数据 | 1970-01-01 00:00:01 |
三、适用DTS控制台配置链路内ETL的方法
3.1 登陆DTS新版控制台并配置任务
参考公有云DTS文档:https://help.aliyun.com/document_detail/211600.html 配置任务。DTS任务配置有 配置源库及目标库信息、配置任务对象及高级配置、高级配置、预检查、购买 五个步骤,配置ETL功能在 高级配置 步骤,参考3.2
3.2 配置ETL功能
参考公有云DTS文档: https://help.aliyun.com/document_detail/411259.htm
在配置任务对象及高级配置步骤的高级配置中,配置ETL功能选择是,在输入框中按照数据处理DSL语法填写数据处理语句

单击下一步保存任务并预检查,完成后续步骤
四、链路内ETL DSL语言介绍
4.1 链路内ETL DSL概览

4.2 DTS Record 概念介绍如下:
DTS Record是DTS数据同步进程使用的中间数据存储,一个DTS Record代表一行记录;关系数据库中代表一行记录,MongoDB代表一个Document。所有DTS ETL处理都是基于DTS Record进行的。关系型数据库链路中 DTS Record包含表结构信息,在编写ETL DSL脚本时需要考虑到DTS Record结构的稳定,也就是针对同一个表经过不同DSL分支处理产生的表结构相同
4.2.1 INSERT DTS Record
全量数据同步阶段的都是INSERT DTS Record,增量数据同步阶段,源库日志中的INSERT日志 对应INSERT DTS Record。INSERT DTS Record内容如下,不包含前镜像值(插入之前的值),只包含后镜像值(插入的值)。只能获取后镜像值,只能设置后镜像值。
操作类型 | 值类型 | 业务字段: USER_ID | 业务字段: USER_NAME | DTS附加字段 |
INSERT | 前镜像(__BEFORE__) | |
后镜像(__AFTER__) | 1000 | 李四 | |
4.2.2 UPDATE DTS Record
增量数据同步阶段,源库日志中的UPDATE日志对应UPDATE DTS Record,UPDATE DTS Record不会出现在全量同步阶段。UPDATE DTS Record内容如下,包含前镜像值(修改之前的值)和后镜像值(修改之后的值)。可以获取前镜像值和后镜像值,可以设置前镜像和后镜像值。
操作类型 | 值类型 | 业务字段: USER_ID | 业务字段: USER_NAME | DTS附加字段 |
UPDATE | 前镜像(__BEFORE__) | 1000 | 张三 | |
后镜像(__AFTER__) | 1000 | 李四 | |
4.2.3 DELETE DTS Record
增量数据同步阶段,源库日志中的DELETE日志对应DELETE DTS Record,DELETE DTS Record不会出现在全量同步阶段。UPDATE DTS Record内容如下,只包含前镜像值(删除之前的值),无后镜像值。只能获取前镜像值,只能设置前镜像值。
操作类型 | 值类型 | 业务字段: USER_ID | 业务字段: USER_NAME | DTS附加字段 |
DELETE | 前镜像(__BEFORE__) | 1000 | 李四 | |
后镜像(__AFTER__) | |
4.3 ETL DSL语法介绍
DTS链路内ETL语言是由基础的操作(operator)和值表达式(column_exp,value_exp)组成。operator有e_set、e_if、e_if_else、e_switch、e_compose、e_split、record_function组成,值表达式由列引用、基础数据类型、基础函数组成,组合参考如下语言描述
operator:
e_set(column_exp,value_exp [,column_exp,value_exp]*)
| e_if(condition_exp, operator)
| e_if_else(condition_exp,operator,operator)
| e_switch(condition_exp,operator[,condition_exp,operator]* [default=operate]?)
| e_compose(operator [, operator]* )
| e_split(column_exp)
| e_drop()
| record_function()
一个完整的ETL配置,对应一个完整的operator, 可以是e_set、e_if、e_if_else、e_switch、e_compose、e_split、record_function其中的一种,也可以是符合语法的嵌套组合,当前不限制嵌套层次
4.3.1 基础数据类型
数据类型 | 数据范围 | 例子 |
8字节有符号数 | | e_set(id,1000) |
DECIMAL类型 | | e_set(dollar, 6.88) |
字符串 | | e_set(user_name, '张三') |
DATETIME类型 | 0000-00-01 00:00:00 ~9999-12-30 23:59:59 | e_set(gmt_modify, DATETIME("2018-01-01 11:11:11")) |
BOOLEAN类型 | TRUE FALSE | e_set(del_flag, true)) e_set(del_flag, false)) |
NULL值 | NULL | e_set(del_flag, null)) |
4.3.2 值表达式
数据类型 | 需求 | ETL脚本 |
列引用 | 使用id列填充sequence列 | e_set(sequence, id) |
表达式 | 使用当前时间填充CURRENT列 使用dollar * 6.88填充rmb列 | e_set(current, dts_now()) e_set(rmb, dollar*6.88) |
4.3.3 DTS附加字段
附加列名称 | 值 |
__DB__ | 源库名称 |
__TB__ | 源表名称 |
__OPERATION__ | DML类型(__OP_INSERT__, __OP_UPDATE__, __OP_DELETE__)中的一个,全量迁移只有__OP_INSERT__ |
__COMMIT_TIMESTAMP__ | 源库日志提交时间,全量迁移为时间戳0(1970-01-01 08:00:00) |
__BEFORE__ | DTS Record 的前镜像 |
__AFTER__ | DTS Record 的后镜像 |
4.3.4 设置字段的值
设置字段的值使用 e_set(column_exp,value_exp [,column_exp,value_exp]*), 可以设置一个字段的值
也可以设置多个字段的值, [,column_exp,value_exp]* 可以重复一次或者多次
e_set(id, 1000, user_name, '张三')
4.3.5 条件控制语句
4.3.5.1 e_if(condition_exp, operator)
当表达式condition_exp的值为true是,执行operator。如:忽略user_name是张三的记录
e_if(user_name =='张三', e_drop())
4.3.5.2 e_if_else(condition_exp, operator_true, operator_false)
当表达式condition_exp的值为true是,执行operator_true, 否则执行operator_false。如:当user_name是张三时,将dollar字段改成1000000,否则将dollar改成99
e_if(user_name =='张三', e_set(dollar, 1000000), e_set(dollar,99))
4.3.5.3 e_switch(condition_exp,operator[,condition_exp,operator]* [default=operate]?)
当condition_exp为true时执行紧跟该condition_exp的operator, 只会执行第一个condition_exp为true的分支。如:user_name是张三的记录的dollar字段改成1000000, user_name是李四的记录的dollar字段改成8888,user_name是其他值的记录的dollar改成0
e_switch(
user_name=='张三', e_set(dollar, 1000000),
user_name=='李四', e_set(dollar,99),
default=e_set(dollar,0)
)
4.3.5.4 e_compose(operator [, operator]* )
e_compose是一个辅助operator,用来将多个operator包装在一起。e_compose包装起来的每一个operator都会执行如
e_compose(
e_set(id, id*1000),
e_switch(
user_name=='张三', e_set(dollar, 1000000),
user_name=='李四', e_set(dollar,99),
default=e_set(dollar,0)
)
)
4.3.6 基础函数
4.3.6.1 数值运算
功能 | 语法 | 取值范围 | 返回值 | 示例 |
加法 | op_sum(value1, value2) | value1:整数或浮点数 value2:整数或浮点数 | 若参数均为整数,则返回整数,否则返回浮点数。 | op_sum(`col1`, 1.0) |
减法 | op_sub(value1,value2) | value1:整数或浮点数 value2:整数或浮点数 | 若参数均为整数,则返回整数,否则返回浮点数。 | op_sub(`col1`, 1.0) |
乘法 | op_mul(value1,value2) | value1:整数或浮点数 value2:整数或浮点数 | 若参数均为整数,则返回整数,否则返回浮点数。 | op_mul(`col1`, 1.0) |
除法 | op_div_true(value1, value2) | value1:整数或浮点数 value2:整数或浮点数 | 若参数均为整数,则返回整数,否则返回浮点数。 | op_div_true(`col1`, 2.0), 若col1=15,则返回7.5。 |
取模 | op_mod(value1, value2) | value1:整数或浮点数 value2:整数或浮点数 | 若参数均为整数,则返回整数,否则返回浮点数。 | op_mod(`col1`, 10),若col1=23,则返回3 |
4.3.6.2 逻辑运算
功能 | 语法 | 取值范围 | 返回值 | 示例 |
是否相等 | op_eq(value1, value2) | value1:整数、浮点数、字符串 value2:整数、浮点数、字符串 | boolean类型,true或false | op_eq(`col1`, 23) |
是否大于 | op_gt(value1, value2) | value1:整数、浮点数、字符串 value2:整数、浮点数、字符串 | boolean类型,true或false | op_gt(`col1`, 1.0) |
是否小于 | op_lt(value1, value2) | value1:整数、浮点数、字符串 value2:整数、浮点数、字符串 | boolean类型,true或false | op_lt(`col1`, 1.0) |
是否大于等于 | op_ge(value1, value2) | value1:整数、浮点数、字符串 value2:整数、浮点数、字符串 | boolean类型,true或false | op_ge(`col1`, 1.0) |
是否小于等于 | op_le(value1, value2) | value1:整数、浮点数、字符串 value2:整数、浮点数、字符串 | boolean类型,true或false | op_le(`col1`, 1.0) |
AND运算 | op_and(value1, value2) | value1:boolean类型 value2:boolean类型 | boolean类型,true或false | op_and(`is_male`, `is_student`) |
OR运算 | op_or(value1, value2) | value1:boolean类型 value2:boolean类型 | boolean类型,true或false | op_or(`is_male`, `is_student`) |
4.3.6.3 字符串函数
功能 | 语法 | 取值范围 | 返回值 | 示例 |
将字符串中所有大写字符转换为小写字符 | str_lower(value) | value:字符串 | 字符串 | str_lower(`col1`) |
将字符串中所有小写字符转换为大写字符 | str_upper(value) | value:字符串 | 字符串 | str_upper(`col1`) |
删除字符串中指定的字符 | str_strip(value1, value2) | value1: 字符串 value2: 字符串 | 字符串 | str_strip(`col1`, 'abc') |
截断字符串cond之后的部分 | substring_after(value, cond) | value: 字符串 cond: 字符串 | 字符串 | substring_after(`col`, 'abc') |
截断字符串cond之前的部分 | substring_before(value, cond) | value1: 字符串 cond: 字符串 | 字符串 | substring_before(`col`, 'efg') |
截断字符串cond1和cond2之间的部分 | substring_between(value, cond1, cond2) | value: 字符串 cond1: 字符串 cond2: 字符串 | 字符串 | substring_between(`col`, 'abc','efg') |
字符串转换数字 | cast_string_to_long(value) | value:字符串 | 整数 | cast_string_to_long(`col`) |
数字转换字符串 | cast_long_to_string(value) | value:整数 | 字符串 | cast_long_to_string(`col`) |
4.3.6.4 时间函数
功能 | 语法 | 取值范围 | 返回值 | 示例 |
当前系统时间 | dt_now() | 无 | DATETIME,精确到秒 | dts_now() |
dt_now_millis() | 无 | DATETIME,精确到毫秒 | dt_now_millis() |
UTC时间戳转DATETIME | dt_fromtimestamp(value) | value:整数 | DATETIME,精确到毫秒 | dt_fromtimestamp(1626837629) |
DATETIME转UTC时间戳 | dt_totimestamp(value) | value: DATETIME | 整数 | dt_totimestamp(`col`) |
DATETIME转字符串 | dt_str(value, format) | value:DATETIME format:字符串, yyyy-MM-dd HH:mm:ss 格式表示 | 字符串 | dt_str(`col1`, 'yyyy-MM-dd HH:mm:ss') |
字符串转DATETIME | dt_strptime(value,format) | value:字符串 format:字符串, yyyy-MM-dd HH:mm:ss 格式表示 | DATETIME | dt_strptime('2021-07-21 03:20:29', 'yyyy-MM-dd hh:mm:ss') |
五、链路内ETL典型场景
5.1 数据脱敏
5.1.1 隐藏电话号码,身份证号等
e_set(phone, str_marsk(phone,3,6,'*'))
原始数据 | 18700001111 |
ETL处理之后的数据 | 187****1111 |
5.2 使用表过滤
使用条件控制语句和DTS附加字段 __DB__和__TB__组合,可以在指定表上执行ETL逻辑。如针对user_db.user_info表进行脱敏
e_if (
__DB__ == 'user_db' AND 'user_info' == __TB__,
e_set(phone, str_marsk(phone,3,6,'*'))
)
DTS附加列__DB__,__TB__和普通字符串一样,可以直接使用字符串函数。如下表名满足正则表达式user_[0-9]+,
进行数据脱敏
e_if(
regex_match(__TB__, 'user_[0-9]+'),
e_set(phone, str_marsk(phone,3,6,'*'))
)
5.2 数据过滤
过滤符合条件的DML,如过滤user_name是张三的记录
e_if(user_name == '张三', e_drop())
过滤增量迁移中的DELETE操作
e_if(__OPERATION__ == __OP_DELETE__, e_drop())
5.3 非法数据处理
使用 e_if(condition, e_set) 语句,通过condition条件提出到非法记录,再通过e_set语句修改非法记录
5.4 多表汇聚场景主键冲突解决方案
多个单元表的数据同步汇聚到中心表时,各单元表存在相同主键,汇聚存在主键冲突丢数据的问题
解决方法:
新增字段region_id
create table user_info(
id int not null auto_increment primary key,
user_name varchar(32)
)
create table global_user_info(
id int not null auto_increment,
user_name varchar(32),
region_id varchar(32),
primary key(id,region_id)
)
e_set(`region_id`,'cn-hangzhou')
通过ETL重新分配各个单元的id到不冲突的值域
create table user_info_0001(
id int not null auto_increment primary key,
user_name varchar(32)
)
create table global_user_info(
id int not null auto_increment,
user_name varchar(32),
primary key(id,region_id)
)
e_set(`id`,case_string_to_long(substring_after_last(`id`,'_')) * 10000000 + id)
5.5 将JSON展开为表的字段
参数名称 | 类型 | 默认值 | 介绍 | 例子 |
field_name | 字符串 | 无 | 需要展开的字段名称,区分大小写 | 'user_info' |
fmt='simple' | 字符串 | fmt='simple' | JSON展开名称规则,可选simple,full | fmt='full' |
sep='_' | 字符串 | sep='_' | Json字段名称层级之间的链接符, 如{"userInfo":{"userName:"hy"}} 展开后userInfo_userName | sep='_' |
mapping | 字符串 | 无 | 需要展开的字段名称映射规则 | mapping='{"username":"user_name"}' |
如下将 user_info: {"user_info":{"username":"hy","password":"******"}} 展开到字段 user_name和user_password
fmt='simple': 使用当前JSON字段名称
e_expand_json('user_info',fmt='simple',mapping='{"username":"user_name","password":"user_password"}')
fmt='full': 使用全路径JSON字段名称
e_expand_json('user_info',fmt='full',seq=':',mapping='{"user_info:username":"user_name","user_info:password":"user_password"}')
5.6 数据路由
根据计算规则,将数据路由到不同的表中,使用e_set(__DB__, new_db_name), e_set(__TB__, new_table_name); new_db_name和new_table_name 可以是字符串、返回值是字符串结果的函数调用、以及表达式
e_set(`__TB__`,`hy_0`)
e_set(`__DB__`, `hy_0`)
5.7 物理删除改为逻辑删除
目的表新增 del_flag varchar(2)字段,使用'Y'标识删除记录,使用'N'标识为删除记录,并转换DELETE SQL为UPDATE SQL;ETL脚本如下
e_if_else(
op_eq(__OPERATION__,__OP_DELETE__), e_set(`del_flag`,'Y'),
e_set(`def_flag`, 'N', __OPERATION__, __OP_UPDATE)
)