DTS链路内ETL使用手册

简介: DTS是一个数据迁移和同步服务,通常用于数据搬迁或实时数据传输。DTS在数据同步链路基础上,提供流式数据ETL数据处理功能,支持使用DSL(Domain Specific Language)脚本语言灵活地定义数据处理逻辑
+关注继续查看

一、DTS链路内ETL介绍

DTS是一个数据迁移和同步服务,通常用于数据搬迁或实时数据传输。DTS在数据同步链路基础上,提供流式数据ETL数据处理功能,支持使用DSL(Domain Specific Language)脚本语言灵活地定义数据处理逻辑

二、DTS链路内ETL适用场景

2.1 数据脱敏

对敏感信息如电话号码,身份证号,地址等进行打码等方式脱敏, 如: 针对电话号码脱敏

原始数据: 18700001111

ETL处理之后的数据: 187****1111

2.2 数据归一化

根据规则将一组数据映射到另一组数据,如根据成绩将成绩 >=60分的归类为A, 将成绩小于60分的时归类为B

原始数据

59

99

ETL处理之后的数据

B

A

2.3 同步过程中附加列

业务A有以下用户表

CREATE TABLE USER_INFO (
    USER_ID INT NOT NULL,
    USER_NAME VARCHAR(32),
    PRIMARY KEY(USER_ID)
)

在使用DTS数据同步时,希望在不改变源库表结构的前提下同步到其他的分析数据库上,并记录数据更时间 UPDATE_TIME

原始数据:

USER_ID

USER_NAME

31846731

张三

ETL处理之后的数据:

USER_ID

USER_NAME

UPDATE_TIME

31846731

张三

2023-04-01 12:51:46

2.4 数据过滤

"ETL数据过滤"和"DTS SQL条件过滤任务数据" 功能对比

ETL数据过滤

SQL条件过滤

作用域

DTS实例级

表级别

语法

DSL 语法

SQL 语法

字段内容

数据库表字段内容,DTS附加的字段(如DML 类型,日志提交时间等)

数据库表字段内容

条件表达式

DTS附加属性和数据表达式组成过滤条件( 如过滤值ID=10的DELETE日志同步)

SQL表达式

推荐使用方法:

  • DTS SQL条件过滤任务数据 适用于依据表内容进行数据过滤;
  • ETL数据过滤适用于实例级配置,以及结合增量日志属性进行数据过滤

2.5 数据类型转换

根据规则将数据从一种类型映射到另一种类型,如将VARCHAR类型映射到INT类型

原始数据

123456

99.99

ETL处理之后的数据

123456

100

2.6 非法值处理

再异构数据库同步过程中, 存在一系列不兼容数据类型,例如Oracle同步到MySQL的链路,Oracle TIMESTAMP('4712BC-01-01 00:00:00' to '9999-12-31 23:59:59')与MySQL TIMESTAMP(0000-00-00 00:00:00, 1970-01-01 00:00:00 to 2038-01-19 03:14:07) 数据类型范围不兼容,需要进行数据转换

原始数据

0001-01-01 00:00:01

ETL处理之后的数据

1970-01-01 00:00:01

三、适用DTS控制台配置链路内ETL的方法

3.1 登陆DTS新版控制台并配置任务

参考公有云DTS文档:https://help.aliyun.com/document_detail/211600.html 配置任务。DTS任务配置有 配置源库及目标库信息、配置任务对象及高级配置、高级配置预检查购买 五个步骤,配置ETL功能在 高级配置 步骤,参考3.2

3.2 配置ETL功能

参考公有云DTS文档:   https://help.aliyun.com/document_detail/411259.htm 

配置任务对象及高级配置步骤的高级配置中,配置ETL功能选择,在输入框中按照数据处理DSL语法填写数据处理语句

image

单击下一步保存任务并预检查,完成后续步骤

四、链路内ETL DSL语言介绍

4.1  链路内ETL DSL概览

DTS.jpg

4.2  DTS Record 概念介绍如下:

DTS Record是DTS数据同步进程使用的中间数据存储,一个DTS Record代表一行记录;关系数据库中代表一行记录,MongoDB代表一个Document。所有DTS ETL处理都是基于DTS Record进行的。关系型数据库链路中 DTS Record包含表结构信息,在编写ETL DSL脚本时需要考虑到DTS Record结构的稳定,也就是针对同一个表经过不同DSL分支处理产生的表结构相同

4.2.1 INSERT DTS Record

全量数据同步阶段的都是INSERT DTS Record,增量数据同步阶段,源库日志中的INSERT日志 对应INSERT DTS Record。INSERT DTS Record内容如下,不包含前镜像值(插入之前的值),只包含后镜像值(插入的值)。只能获取后镜像值,只能设置后镜像值。

操作类型

值类型

业务字段: USER_ID

业务字段: USER_NAME

DTS附加字段

INSERT

前镜像(__BEFORE__

后镜像(__AFTER__

1000

李四

4.2.2  UPDATE DTS Record

增量数据同步阶段,源库日志中的UPDATE日志对应UPDATE DTS Record,UPDATE DTS Record不会出现在全量同步阶段。UPDATE DTS Record内容如下,包含前镜像值(修改之前的值)和后镜像值(修改之后的值)。可以获取前镜像值和后镜像值,可以设置前镜像和后镜像值。

操作类型

值类型

业务字段: USER_ID

业务字段: USER_NAME

DTS附加字段

UPDATE

前镜像(__BEFORE__

1000

张三

后镜像(__AFTER__

1000

李四

4.2.3  DELETE DTS Record

增量数据同步阶段,源库日志中的DELETE日志对应DELETE DTS Record,DELETE DTS Record不会出现在全量同步阶段。UPDATE DTS Record内容如下,只包含前镜像值(删除之前的值),无后镜像值。只能获取前镜像值,只能设置前镜像值。

操作类型

值类型

业务字段: USER_ID

业务字段: USER_NAME

DTS附加字段

DELETE

前镜像(__BEFORE__

1000

李四

后镜像(__AFTER__

4.3 ETL DSL语法介绍

DTS链路内ETL语言是由基础的操作(operator)和值表达式(column_exp,value_exp)组成。operator有e_set、e_if、e_if_else、e_switch、e_compose、e_split、record_function组成,值表达式由列引用、基础数据类型、基础函数组成,组合参考如下语言描述

operator:
e_set(column_exp,value_exp [,column_exp,value_exp]*)
| e_if(condition_exp, operator)
| e_if_else(condition_exp,operator,operator)
| e_switch(condition_exp,operator[,condition_exp,operator]* [default=operate]?)
| e_compose(operator [, operator]* )
| e_split(column_exp)
| e_drop()
| record_function()

一个完整的ETL配置,对应一个完整的operator, 可以是e_set、e_if、e_if_else、e_switch、e_compose、e_split、record_function其中的一种,也可以是符合语法的嵌套组合,当前不限制嵌套层次

4.3.1 基础数据类型

数据类型

数据范围

例子

8字节有符号数

e_set(id,1000)

DECIMAL类型

e_set(dollar, 6.88)

字符串

e_set(user_name, '张三')

DATETIME类型

0000-00-01 00:00:00

~9999-12-30 23:59:59

e_set(gmt_modify, DATETIME("2018-01-01 11:11:11"))

BOOLEAN类型

TRUE

FALSE

e_set(del_flag, true))

e_set(del_flag, false))

NULL值

NULL

e_set(del_flag, null))

4.3.2 值表达式

数据类型

需求

ETL脚本

列引用

使用id列填充sequence列

e_set(sequence, id)

表达式

使用当前时间填充CURRENT列

使用dollar * 6.88填充rmb列

e_set(current, dts_now())

e_set(rmb, dollar*6.88)

4.3.3 DTS附加字段

附加列名称

__DB__

源库名称

__TB__

源表名称

__OPERATION__

DML类型(__OP_INSERT__, __OP_UPDATE__, __OP_DELETE__)中的一个,全量迁移只有__OP_INSERT__

__COMMIT_TIMESTAMP__

源库日志提交时间,全量迁移为时间戳0(1970-01-01 08:00:00)

__BEFORE__

DTS Record 的前镜像

__AFTER__

DTS Record 的后镜像

4.3.4 设置字段的值

设置字段的值使用 e_set(column_exp,value_exp [,column_exp,value_exp]*), 可以设置一个字段的值

e_set(id, 1000)

也可以设置多个字段的值, [,column_exp,value_exp]* 可以重复一次或者多次

e_set(id, 1000, user_name, '张三')

4.3.5 条件控制语句

4.3.5.1 e_if(condition_exp, operator)

当表达式condition_exp的值为true是,执行operator。如:忽略user_name是张三的记录

e_if(user_name =='张三', e_drop())

4.3.5.2 e_if_else(condition_exp, operator_true,  operator_false)

当表达式condition_exp的值为true是,执行operator_true, 否则执行operator_false。如:当user_name是张三时,将dollar字段改成1000000,否则将dollar改成99

e_if(user_name =='张三', e_set(dollar, 1000000), e_set(dollar,99))

4.3.5.3 e_switch(condition_exp,operator[,condition_exp,operator]* [default=operate]?)

当condition_exp为true时执行紧跟该condition_exp的operator, 只会执行第一个condition_exp为true的分支。如:user_name是张三的记录的dollar字段改成1000000, user_name是李四的记录的dollar字段改成8888,user_name是其他值的记录的dollar改成0

e_switch(
  user_name=='张三', e_set(dollar, 1000000), 
  user_name=='李四', e_set(dollar,99), 
  default=e_set(dollar,0)
)

4.3.5.4 e_compose(operator [, operator]* )

e_compose是一个辅助operator,用来将多个operator包装在一起。e_compose包装起来的每一个operator都会执行如

e_compose(
    e_set(id, id*1000),
    e_switch(
      user_name=='张三', e_set(dollar, 1000000), 
      user_name=='李四', e_set(dollar,99), 
      default=e_set(dollar,0)
    )
)

4.3.6 基础函数

4.3.6.1 数值运算

功能

语法

取值范围

返回值

示例

加法

op_sum(value1, value2)

value1:整数或浮点数

value2:整数或浮点数

若参数均为整数,则返回整数,否则返回浮点数。

op_sum(`col1`, 1.0)

减法

op_sub(value1,value2)

value1:整数或浮点数

value2:整数或浮点数

若参数均为整数,则返回整数,否则返回浮点数。

op_sub(`col1`, 1.0)

乘法

op_mul(value1,value2)

value1:整数或浮点数

value2:整数或浮点数

若参数均为整数,则返回整数,否则返回浮点数。

op_mul(`col1`, 1.0)

除法

op_div_true(value1, value2)

value1:整数或浮点数

value2:整数或浮点数

若参数均为整数,则返回整数,否则返回浮点数。

op_div_true(`col1`, 2.0), 若col1=15,则返回7.5。

取模

op_mod(value1, value2)

value1:整数或浮点数

value2:整数或浮点数

若参数均为整数,则返回整数,否则返回浮点数。

op_mod(`col1`, 10),若col1=23,则返回3

4.3.6.2 逻辑运算

功能

语法

取值范围

返回值

示例

是否相等

op_eq(value1, value2)

value1:整数、浮点数、字符串

value2:整数、浮点数、字符串

boolean类型,true或false

op_eq(`col1`, 23)

是否大于

op_gt(value1, value2)

value1:整数、浮点数、字符串

value2:整数、浮点数、字符串

boolean类型,true或false

op_gt(`col1`, 1.0)

是否小于

op_lt(value1, value2)

value1:整数、浮点数、字符串

value2:整数、浮点数、字符串

boolean类型,true或false

op_lt(`col1`, 1.0)

是否大于等于

op_ge(value1, value2)

value1:整数、浮点数、字符串

value2:整数、浮点数、字符串

boolean类型,true或false

op_ge(`col1`, 1.0)

是否小于等于

op_le(value1, value2)

value1:整数、浮点数、字符串

value2:整数、浮点数、字符串

boolean类型,true或false

op_le(`col1`, 1.0)

AND运算

op_and(value1, value2)

value1:boolean类型

value2:boolean类型

boolean类型,true或false

op_and(`is_male`, `is_student`)

OR运算

op_or(value1, value2)

value1:boolean类型

value2:boolean类型

boolean类型,true或false

op_or(`is_male`, `is_student`)

4.3.6.3 字符串函数

功能

语法

取值范围

返回值

示例

将字符串中所有大写字符转换为小写字符

str_lower(value)

value:字符串

字符串

str_lower(`col1`)

将字符串中所有小写字符转换为大写字符

str_upper(value)

value:字符串

字符串

str_upper(`col1`)

删除字符串中指定的字符

str_strip(value1, value2)

value1:  字符串

value2:  字符串

字符串

str_strip(`col1`, 'abc')

截断字符串cond之后的部分

substring_after(value, cond)

value:  字符串

cond:  字符串

字符串

substring_after(`col`, 'abc')

截断字符串cond之前的部分

substring_before(value, cond)

value1:  字符串

cond:  字符串

字符串

substring_before(`col`, 'efg')

截断字符串cond1和cond2之间的部分

substring_between(value, cond1, cond2)

value:  字符串

cond1:  字符串

cond2:  字符串

字符串

substring_between(`col`, 'abc','efg')

字符串转换数字

cast_string_to_long(value)

value:字符串

整数

cast_string_to_long(`col`)

数字转换字符串

cast_long_to_string(value)

value:整数

字符串

cast_long_to_string(`col`)

4.3.6.4 时间函数

功能

语法

取值范围

返回值

示例

当前系统时间

dt_now()

DATETIME,精确到秒

dts_now()

dt_now_millis()

DATETIME,精确到毫秒

dt_now_millis()

UTC时间戳转DATETIME

dt_fromtimestamp(value)

value:整数

DATETIME,精确到毫秒

dt_fromtimestamp(1626837629)

DATETIME转UTC时间戳

dt_totimestamp(value)

value: DATETIME

整数

dt_totimestamp(`col`)

DATETIME转字符串

dt_str(value, format)

value:DATETIME

format:字符串, yyyy-MM-dd HH:mm:ss 格式表示

字符串

dt_str(`col1`, 'yyyy-MM-dd HH:mm:ss')

字符串转DATETIME

dt_strptime(value,format)

value:字符串

format:字符串, yyyy-MM-dd HH:mm:ss 格式表示

DATETIME

dt_strptime('2021-07-21 03:20:29', 'yyyy-MM-dd hh:mm:ss')

五、链路内ETL典型场景

5.1 数据脱敏

5.1.1 隐藏电话号码,身份证号等

e_set(phone, str_marsk(phone,3,6,'*'))

原始数据

18700001111

ETL处理之后的数据

187****1111

5.2 使用表过滤

使用条件控制语句和DTS附加字段 __DB__和__TB__组合,可以在指定表上执行ETL逻辑。如针对user_db.user_info表进行脱敏

e_if (
    __DB__ == 'user_db' AND 'user_info' == __TB__,
    e_set(phone, str_marsk(phone,3,6,'*'))
)

DTS附加列__DB__,__TB__和普通字符串一样,可以直接使用字符串函数。如下表名满足正则表达式user_[0-9]+,

进行数据脱敏

e_if(
    regex_match(__TB__, 'user_[0-9]+'), 
    e_set(phone, str_marsk(phone,3,6,'*'))
)

5.2 数据过滤

过滤符合条件的DML,如过滤user_name是张三的记录

e_if(user_name == '张三', e_drop())

过滤增量迁移中的DELETE操作

e_if(__OPERATION__ == __OP_DELETE__, e_drop())

5.3 非法数据处理

使用 e_if(condition, e_set) 语句,通过condition条件提出到非法记录,再通过e_set语句修改非法记录

5.4 多表汇聚场景主键冲突解决方案

多个单元表的数据同步汇聚到中心表时,各单元表存在相同主键,汇聚存在主键冲突丢数据的问题

解决方法:

新增字段region_id

create table user_info(
  id int not null auto_increment primary key,
  user_name varchar(32)
)
create table global_user_info(
  id int not null auto_increment,
  user_name varchar(32),
  region_id varchar(32),
  primary key(id,region_id)
)
e_set(`region_id`,'cn-hangzhou')

通过ETL重新分配各个单元的id到不冲突的值域

create table user_info_0001(
  id int not null auto_increment primary key,
  user_name varchar(32)
)
create table global_user_info(
  id int not null auto_increment,
  user_name varchar(32),
  primary key(id,region_id)
)
e_set(`id`,case_string_to_long(substring_after_last(`id`,'_')) * 10000000 + id)

5.5 将JSON展开为表的字段

参数名称

类型

默认值

介绍

例子

field_name

字符串

需要展开的字段名称,区分大小写

'user_info'

fmt='simple'

字符串

fmt='simple'

JSON展开名称规则,可选simple,full

fmt='full'

sep='_'

字符串

sep='_'

Json字段名称层级之间的链接符, 如{"userInfo":{"userName:"hy"}} 展开后userInfo_userName

sep='_'

mapping

字符串

需要展开的字段名称映射规则

mapping='{"username":"user_name"}'

如下将 user_info: {"user_info":{"username":"hy","password":"******"}} 展开到字段 user_name和user_password

fmt='simple': 使用当前JSON字段名称

e_expand_json('user_info',fmt='simple',mapping='{"username":"user_name","password":"user_password"}')

fmt='full': 使用全路径JSON字段名称

e_expand_json('user_info',fmt='full',seq=':',mapping='{"user_info:username":"user_name","user_info:password":"user_password"}')

5.6 数据路由

根据计算规则,将数据路由到不同的表中,使用e_set(__DB__, new_db_name)e_set(__TB__, new_table_name); new_db_name和new_table_name 可以是字符串、返回值是字符串结果的函数调用、以及表达式

e_set(`__TB__`,`hy_0`)
e_set(`__DB__`, `hy_0`)

5.7 物理删除改为逻辑删除

目的表新增 del_flag varchar(2)字段,使用'Y'标识删除记录,使用'N'标识为删除记录,并转换DELETE SQL为UPDATE SQL;ETL脚本如下

e_if_else(
    op_eq(__OPERATION__,__OP_DELETE__), e_set(`del_flag`,'Y'), 
    e_set(`def_flag`, 'N', __OPERATION__, __OP_UPDATE)
)

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
Sqoop 企业级大数据迁移方案实战
Sqoop是一个用于在Hadoop和关系数据库服务器之间传输数据的工具。它用于从关系数据库(如MySQL,Oracle)导入数据到Hadoop HDFS,并从Hadoop文件系统导出到关系数据库。 本课程主要讲解了Sqoop的设计思想及原理、部署安装及配置、详细具体的使用方法技巧与实操案例、企业级任务管理等。结合日常工作实践,培养解决实际问题的能力。本课程由黑马程序员提供。
相关文章
|
1月前
|
数据采集 DataWorks 数据挖掘
DataWorks可以支持数据迁移的功能
DataWorks可以支持数据迁移的功能
37 1
|
10月前
|
机器学习/深度学习 运维 关系型数据库
数据集成-整库迁移 | 学习笔记
快速学习数据集成-整库迁移
108 0
数据集成-整库迁移 | 学习笔记
|
分布式计算 DataWorks 关系型数据库
最佳实践:使用DTS进行数据同步练习题
最佳实践:使用DTS进行数据同步练习题 最佳实践:使用Dataworks集成 AnalyticDB PostgreSQL 最佳实践:Hadoop数据同步
172 0
|
SQL 数据采集 分布式计算
DTS 数据同步集成 MaxCompute 数仓最佳实践|学习笔记
快速学习 DTS 数据同步集成 MaxCompute 数仓最佳实践
407 0
|
SQL JSON 分布式计算
【MaxCompute 常见问题】 数据同步
当需要新增数据源时,首先要确认自己的数据源类型、网络类型、是否支持测试连通性。当新增数据源无法支持测试连通性时,可以尝试用独享资源组来解决数据集成问题。
【MaxCompute 常见问题】 数据同步
|
canal 数据可视化 关系型数据库
可视化数据同步迁移工具 CloudCanal
可视化数据同步迁移工具 CloudCanal
2268 0
可视化数据同步迁移工具 CloudCanal
|
SQL 分布式计算 关系型数据库
PolarDB-X 1.0-用户指南-数据导入导出-数据迁移或同步方案概览
本文汇总了PolarDB-X 1.0支持的数据迁移或同步的方案。
223 0
|
分布式计算 DataWorks 关系型数据库
DTS数据同步集成MaxCompute数仓
介绍通过DTS实现从RDS到MaxCompute的数据同步集成, 并介绍如何使用DTS和MaxCompute联合实现数据ETL幂等和快速数据回溯。
DTS数据同步集成MaxCompute数仓
|
SQL 监控 关系型数据库
相关产品
数据传输
推荐文章
更多