阿里云DTS作为数据世界高速传输通道的建造者,每周为您分享一个避坑技巧,助力数据之旅更加快捷、便利、安全。
一、DTS链路内ETL介绍
DTS是一个数据迁移和同步服务,通常用于数据搬迁或实时数据传输。DTS在数据同步链路基础上,提供流式数据ETL数据处理功能,支持使用DSL(Domain Specific Language)脚本语言灵活地定义数据处理逻辑。
二、DTS链路内ETL适用场景
2.1 数据脱敏
对敏感信息如电话号码,身份证号,地址等进行打码等方式脱敏, 如: 针对电话号码脱敏
原始数据: 18700001111
ETL处理之后的数据: 187****1111
2.2 数据归一化
根据规则将一组数据映射到另一组数据,如根据成绩将成绩 >=60分的归类为A, 将成绩小于60分的时归类为B
原始数据 |
59 |
99 |
ETL处理之后的数据 |
B |
A |
2.3 同步过程中附加列
业务A有以下用户表
CREATE TABLE USER_INFO ( USER_ID INT NOT NULL, USER_NAME VARCHAR(32), PRIMARY KEY(USER_ID) )
在使用DTS数据同步时,希望在不改变源库表结构的前提下同步到其他的分析数据库上,并记录数据更时间 UPDATE_TIME。
原始数据:
USER_ID |
USER_NAME |
31846731 |
张三 |
ETL处理之后的数据:
USER_ID |
USER_NAME |
UPDATE_TIME |
31846731 |
张三 |
2023-04-01 12:51:46 |
2.4 数据过滤
"ETL数据过滤"和"DTS SQL条件过滤任务数据" 功能对比
ETL数据过滤 |
SQL条件过滤 |
|
作用域 |
DTS实例级 |
表级别 |
语法 |
DSL 语法 |
SQL 语法 |
字段内容 |
数据库表字段内容,DTS附加的字段(如DML 类型,日志提交时间等) |
数据库表字段内容 |
条件表达式 |
DTS附加属性和数据表达式组成过滤条件( 如过滤值ID=10的DELETE日志同步) |
SQL表达式 |
推荐使用方法:
- DTS SQL条件过滤任务数据 适用于依据表内容进行数据过滤;
- ETL数据过滤适用于实例级配置,以及结合增量日志属性进行数据过滤
2.5 数据类型转换
根据规则将数据从一种类型映射到另一种类型,如将VARCHAR类型映射到INT类型
原始数据 |
123456 |
99.99 |
ETL处理之后的数据 |
123456 |
100 |
2.6 非法值处理
再异构数据库同步过程中, 存在一系列不兼容数据类型,例如Oracle同步到MySQL的链路,Oracle TIMESTAMP('4712BC-01-01 00:00:00' to '9999-12-31 23:59:59')与MySQL TIMESTAMP(0000-00-00 00:00:00, 1970-01-01 00:00:00 to 2038-01-19 03:14:07) 数据类型范围不兼容,需要进行数据转换
原始数据 |
0001-01-01 00:00:01 |
ETL处理之后的数据 |
1970-01-01 00:00:01 |
三、适用DTS控制台配置链路内ETL的方法
3.1 登陆DTS新版控制台并配置任务
参考公有云DTS文档:https://help.aliyun.com/document_detail/211600.html 配置任务。DTS任务配置有 配置源库及目标库信息、配置任务对象及高级配置、高级配置、预检查、购买 五个步骤,配置ETL功能在 高级配置 步骤,参考3.2
3.2 配置ETL功能
参考公有云DTS文档: https://help.aliyun.com/document_detail/411259.htm
在配置任务对象及高级配置步骤的高级配置中,配置ETL功能选择是,在输入框中按照数据处理DSL语法填写数据处理语句
单击下一步保存任务并预检查,完成后续步骤。
四、链路内ETL DSL语言介绍
4.1 链路内ETL DSL概览
4.2 DTS Record 概念介绍如下:
DTS Record是DTS数据同步进程使用的中间数据存储,一个DTS Record代表一行记录;关系数据库中代表一行记录,MongoDB代表一个Document。所有DTS ETL处理都是基于DTS Record进行的。关系型数据库链路中 DTS Record包含表结构信息,在编写ETL DSL脚本时需要考虑到DTS Record结构的稳定,也就是针对同一个表经过不同DSL分支处理产生的表结构相同
4.2.1 INSERT DTS Record
全量数据同步阶段的都是INSERT DTS Record,增量数据同步阶段,源库日志中的INSERT日志 对应INSERT DTS Record。INSERT DTS Record内容如下,不包含前镜像值(插入之前的值),只包含后镜像值(插入的值)。只能获取后镜像值,只能设置后镜像值。
操作类型 |
值类型 |
业务字段: USER_ID |
业务字段: USER_NAME |
DTS附加字段 |
INSERT |
前镜像(__BEFORE__) |
|||
后镜像(__AFTER__) |
1000 |
李四 |
4.2.2 UPDATE DTS Record
增量数据同步阶段,源库日志中的UPDATE日志对应UPDATE DTS Record,UPDATE DTS Record不会出现在全量同步阶段。UPDATE DTS Record内容如下,包含前镜像值(修改之前的值)和后镜像值(修改之后的值)。可以获取前镜像值和后镜像值,可以设置前镜像和后镜像值。
操作类型 |
值类型 |
业务字段: USER_ID |
业务字段: USER_NAME |
DTS附加字段 |
UPDATE |
前镜像(__BEFORE__) |
1000 |
张三 |
|
后镜像(__AFTER__) |
1000 |
李四 |
4.2.3 DELETE DTS Record
增量数据同步阶段,源库日志中的DELETE日志对应DELETE DTS Record,DELETE DTS Record不会出现在全量同步阶段。UPDATE DTS Record内容如下,只包含前镜像值(删除之前的值),无后镜像值。只能获取前镜像值,只能设置前镜像值。
操作类型 |
值类型 |
业务字段: USER_ID |
业务字段: USER_NAME |
DTS附加字段 |
DELETE |
前镜像(__BEFORE__) |
1000 |
李四 |
|
后镜像(__AFTER__) |
4.3 ETL DSL语法介绍
DTS链路内ETL语言是由基础的操作(operator)和值表达式(column_exp,value_exp)组成。operator有e_set、e_if、e_if_else、e_switch、e_compose、e_split、record_function组成,值表达式由列引用、基础数据类型、基础函数组成,组合参考如下语言描述
operator: e_set(column_exp,value_exp,[,NEW] [,column_exp,value_exp,[NEW]]*) | e_if(condition_exp, operator) | e_if_else(condition_exp,operator,operator) | e_switch(condition_exp,operator[,condition_exp,operator]* [default=operate]?) | e_compose(operator [, operator]* ) | e_split(column_exp) | e_drop() | record_function()
一个完整的ETL配置,对应一个完整的operator, 可以是e_set、e_if、e_if_else、e_switch、e_compose、e_split、record_function其中的一种,也可以是符合语法的嵌套组合,当前不限制嵌套层次
4.3.1 基础数据类型
数据类型 |
数据范围 |
例子 |
8字节有符号数 |
e_set(id,1000) |
|
DECIMAL类型 |
e_set(dollar, 6.88) |
|
字符串 |
e_set(user_name, '张三') |
|
DATETIME类型 |
0000-00-01 00:00:00 ~9999-12-30 23:59:59 |
e_set(gmt_modify, DATETIME("2018-01-01 11:11:11")) |
BOOLEAN类型 |
TRUE FALSE |
e_set(del_flag, true)) e_set(del_flag, false)) |
NULL值 |
NULL |
e_set(del_flag, null)) |
4.3.2 值表达式
数据类型 |
需求 |
ETL脚本 |
列引用 |
使用id列填充sequence列 |
e_set(sequence, id) |
表达式 |
使用当前时间填充CURRENT列 使用dollar * 6.88填充rmb列 |
e_set(current, dts_now()) e_set(rmb, dollar*6.88) |
4.3.3 DTS附加字段
附加列名称 |
值 |
__DB__ |
源库名称 |
__TB__ |
源表名称 |
__OPERATION__ |
DML类型(__OP_INSERT__, __OP_UPDATE__, __OP_DELETE__)中的一个,全量迁移只有__OP_INSERT__ |
__COMMIT_TIMESTAMP__ |
源库日志提交时间,全量迁移为时间戳0(1970-01-01 08:00:00) |
__BEFORE__ |
DTS Record 的前镜像 |
__AFTER__ |
DTS Record 的后镜像 |
4.3.4 设置字段的值
4.3.5.1
设置字段的值使用 e_set(column_exp,value_exp [,column_exp,value_exp]*), 可以设置一个字段的值
e_set(id, 1000)
也可以设置多个字段的值, [,column_exp,value_exp]* 可以重复一次或者多次
e_set(id, 1000, user_name, '张三')
4.3.4.2 类型转换
字段类型转换,使用 e_set(column_exp,value_exp, NEW, [,column_exp,value_exp, NEW]*), NEW关键子表示使用新类型, 如把字符串字段 age: '1000' 转换为Int类型
e_set(age, cast_string_toint('1000'))
4.3.5 条件控制语句
4.3.5.1 e_if(condition_exp, operator)
当表达式condition_exp的值为true是,执行operator。如:忽略user_name是张三的记录
e_if(user_name =='张三', e_drop())
4.3.5.2 e_if_else(condition_exp, operator_true, operator_false)
当表达式condition_exp的值为true是,执行operator_true, 否则执行operator_false。如:当user_name是张三时,将dollar字段改成1000000,否则将dollar改成99
e_if(user_name =='张三', e_set(dollar, 1000000), e_set(dollar,99))
4.3.5.3 e_switch(condition_exp,operator[,condition_exp,operator]* [default=operate]?)
当condition_exp为true时执行紧跟该condition_exp的operator, 只会执行第一个condition_exp为true的分支。如:user_name是张三的记录的dollar字段改成1000000, user_name是李四的记录的dollar字段改成8888,user_name是其他值的记录的dollar改成0
e_switch( user_name=='张三', e_set(dollar, 1000000), user_name=='李四', e_set(dollar,99), default=e_set(dollar,0) )
4.3.5.4 e_compose(operator [, operator]* )
e_compose是一个辅助operator,用来将多个operator包装在一起。e_compose包装起来的每一个operator都会执行如
e_compose( e_set(id, id*1000), e_switch( user_name=='张三', e_set(dollar, 1000000), user_name=='李四', e_set(dollar,99), default=e_set(dollar,0) ) )
4.3.6 基础函数
4.3.6.1 数值运算
功能 |
语法 |
取值范围 |
返回值 |
示例 |
加法 |
op_sum(value1, value2) |
value1:整数或浮点数 value2:整数或浮点数 |
若参数均为整数,则返回整数,否则返回浮点数。 |
op_sum(`col1`, 1.0) |
op_add(value1, value2,...) |
value1:整数或浮点数 value2:整数或浮点数 value...:整数或浮点数 |
若参数均为整数,则返回整数,否则返回浮点数。 |
op_add(`col1`, col2,...) |
|
减法 |
op_sub(value1,value2) |
value1:整数或浮点数 value2:整数或浮点数 |
若参数均为整数,则返回整数,否则返回浮点数。 |
op_sub(`col1`, 1.0) |
乘法 |
op_mul(value1,value2) |
value1:整数或浮点数 value2:整数或浮点数 |
若参数均为整数,则返回整数,否则返回浮点数。 |
op_mul(`col1`, 1.0) |
除法 |
op_div_true(value1, value2) |
value1:整数或浮点数 value2:整数或浮点数 |
若参数均为整数,则返回整数,否则返回浮点数。 |
op_div_true(`col1`, 2.0), 若col1=15,则返回7.5。 |
取模 |
op_mod(value1, value2) |
value1:整数或浮点数 value2:整数或浮点数 |
若参数均为整数,则返回整数,否则返回浮点数。 |
op_mod(`col1`, 10),若col1=23,则返回3 |
4.3.6.2 逻辑运算
功能 |
语法 |
取值范围 |
返回值 |
示例 |
是否相等 |
op_eq(value1, value2) |
value1:整数、浮点数、字符串 value2:整数、浮点数、字符串 |
boolean类型,true或false |
op_eq(`col1`, 23) |
是否大于 |
op_gt(value1, value2) |
value1:整数、浮点数、字符串 value2:整数、浮点数、字符串 |
boolean类型,true或false |
op_gt(`col1`, 1.0) |
是否小于 |
op_lt(value1, value2) |
value1:整数、浮点数、字符串 value2:整数、浮点数、字符串 |
boolean类型,true或false |
op_lt(`col1`, 1.0) |
是否大于等于 |
op_ge(value1, value2) |
value1:整数、浮点数、字符串 value2:整数、浮点数、字符串 |
boolean类型,true或false |
op_ge(`col1`, 1.0) |
是否小于等于 |
op_le(value1, value2) |
value1:整数、浮点数、字符串 value2:整数、浮点数、字符串 |
boolean类型,true或false |
op_le(`col1`, 1.0) |
AND运算 |
op_and(value1, value2[,*]) |
value1:boolean类型 value2:boolean类型 [,*]: 可变字段,任意多个boolean类型值 |
boolean类型,true或false |
op_and(`is_male`, `is_student`) op_and(`is_male`, `is_student`,`is_x`) |
OR运算 |
op_or(value1, value2[,*]) |
value1:boolean类型 value2:boolean类型 [,*]: 可变字段,任意多个boolean类型值 |
boolean类型,true或false |
op_or(`is_male`, `is_student`) op_or(`is_male`, `is_student`,`is_x`) |
值是NULL |
op_is_null(value) |
value: 任意类型 |
boolean类型,true或false |
op_is_null(`name`) |
值不是NULL |
op_is_not_null(value) |
value: 任意类型 |
boolean类型,true或false |
op_is_not_null(`name`) |
是否存在 |
op_in(json_array) |
value: json_array('JSON格式字符串') |
boolean类型,true或false |
op_in(id, json_array('["0","1","2","3","4","5","6","7","8"]')) |
4.3.6.3 字符串函数
功能 |
语法 |
取值范围 |
返回值 |
示例 |
字符串拼接 |
op_add(value1,value2,...,value_n) |
value1: 字符串 value2: 字符串 value_n: 字符串 |
字符串 |
op_add(`col`,'hangzhou','dts') |
将字符串中所有大写字符转换为小写字符 |
str_lower(value) |
value:字符串 |
字符串 |
str_lower(`col1`) |
将字符串中所有小写字符转换为大写字符 |
str_upper(value) |
value:字符串 |
字符串 |
str_upper(`col1`) |
删除字符串中指定的字符 |
str_strip(value1, value2) |
value1: 字符串 value2: 字符串 |
字符串 |
str_strip(`col1`, 'abc') |
截取字符串cond之后的部分 |
substring_after(value, cond) |
value: 字符串 cond: 字符串 |
字符串 |
substring_after(`col`, 'abc') |
截取字符串cond之前的部分 |
substring_before(value, cond) |
value1: 字符串 cond: 字符串 |
字符串 |
substring_before(`col`, 'efg') |
截取字符串cond1和cond2之间的部分 |
substring_between(value, cond1, cond2) |
value: 字符串 cond1: 字符串 cond2: 字符串 |
字符串 |
substring_between(`col`, 'abc','efg') |
字符串转换数字 |
cast_string_to_long(value) |
value:字符串 |
整数 |
cast_string_to_long(`col`) |
数字转换字符串 |
cast_long_to_string(value) |
value:整数 |
字符串 |
cast_long_to_string(`col`) |
是否是字符串类型 |
is_string_value(value) |
value:字符串或者列名 |
boolean类型,true或false |
is_string_value(`col1`) |
4.3.6.4 时间函数
功能 |
语法 |
取值范围 |
返回值 |
示例 |
当前系统时间 |
dt_now() |
无 |
DATETIME,精确到秒 |
dts_now() |
dt_now_millis() |
无 |
DATETIME,精确到毫秒 |
dt_now_millis() |
|
UTC时间戳转DATETIME |
dt_fromtimestamp(value,[timezone]) |
value:整数 timezone:可选参数 |
DATETIME,精确到秒 |
dt_fromtimestamp(1626837629) dt_fromtimestamp(1626837629,'GMT+08') |
UTC毫秒时间戳转DATETIME |
dt_fromtimestamp_millis(value,[timezone]) |
value:整数 timezone:可选参数 |
DATETIME,精确到毫秒 |
dt_fromtimestamp_millis(1626837629123); dt_fromtimestamp_millis(1626837629123,'GMT+08') |
DATETIME转UTC时间戳 |
dt_parsetimestamp(value,[timezone]) |
value: DATETIME timezone:可选参数 |
整数 |
dt_parsetimestamp(`datetime_col`) dt_parsetimestamp(`datetime_col`,'GMT+08') |
DATETIME转UTC毫秒时间戳 |
dt_parsetimestamp_millis(value,[timezone]) |
value: DATETIME timezone:可选参数 |
整数 |
dt_parsetimestamp_millis(`datetime_col`) dt_parsetimestamp_millis(`datetime_col`,'GMT+08') |
DATETIME转字符串 |
dt_str(value, format) |
value:DATETIME format:字符串, yyyy-MM-dd HH:mm:ss 格式表示 |
字符串 |
dt_str(`col1`, 'yyyy-MM-dd HH:mm:ss') |
字符串转DATETIME |
dt_strptime(value,format) |
value:字符串 format:字符串, yyyy-MM-dd HH:mm:ss 格式表示 |
DATETIME |
dt_strptime('2021-07-21 03:20:29', 'yyyy-MM-dd hh:mm:ss') |
修改时间: 年-月-日 时:分:秒针对其中一个或几个部分增加或减少一个值 |
dt_add(value, [years=intVal], [months=intVal], [days=intVal], [hours=intVal], [minutes=intVal] ) |
value: DATETIME intVal: 整数 |
DATETIME |
dt_add(datatime_col, years=1) dt_add(datatime_col, years=1,months=1) |
4.3.6.4 字段函数
功能 |
语法 |
取值范围 |
返回值 |
示例 |
字符串类型字段内容替换; 从尾部开始 |
tail_replace_string_field(search, replace, all) |
search: 将被替换的字符串 replace: 用与替换的字符串 all: 是否替换所有,true或者false |
字符串 |
将所有字符串字段类型值的 "\u000f"替换成空格 tail_replace_string_field('\u000f','',true) |
五、链路内ETL典型场景
5.1 数据脱敏
5.1.1 隐藏电话号码,身份证号等
e_set(phone, str_marsk(phone,3,6,'*'))
原始数据 |
18700001111 |
ETL处理之后的数据 |
187****1111 |
5.2 使用表过滤
使用条件控制语句和DTS附加字段 __DB__和__TB__组合,可以在指定表上执行ETL逻辑。如针对user_db.user_info表进行脱敏
e_if ( __DB__ == 'user_db' AND 'user_info' == __TB__, e_set(phone, str_marsk(phone,3,6,'*')) )
DTS附加列__DB__,__TB__和普通字符串一样,可以直接使用字符串函数。如下表名满足正则表达式user_[0-9]+,
进行数据脱敏
e_if( regex_match(__TB__, 'user_[0-9]+'), e_set(phone, str_marsk(phone,3,6,'*')) )
5.2 数据过滤
过滤符合条件的DML,如过滤user_name是张三的记录
e_if(user_name == '张三', e_drop())
过滤增量迁移中的DELETE操作
e_if(__OPERATION__ == __OP_DELETE__, e_drop())
过滤表user_info 区间id>100 and id<1000的数据, (也可以使用 e_keep操作符,与e_drop相反,保留e_keep条件为真的记录)
e_if(__TB__ == 'user_info' AND id>100 and id<1000, e_drop()) #或者 e_keep(op_not(op_eq(__TB__,'user_info') AND op_gt(id,100) AND op_lt(id,1000)))
过滤表user_info id in (1,2,3,4)的记录(可以使用 op_in函数)
e_keep(__TB__ == 'user_info' AND op_in(id, json_array('[1,2,3,4]'))) #或者 e_if(__TB__ == 'user_info' AND op_not(op_in(id, json_array('[1,2,3,4]'))), e_drop())
过滤表名是字符串"user_"开头的 id<1000的记录
e_if(regex_match(__db__,'^user_.*$') AND op_lt(id,1000), e_drop())
5.3 非法数据处理
使用 e_if(condition, e_set) 语句,通过condition条件提出到非法记录,再通过e_set语句修改非法记录
将表user_info里字段name的非法值NULL改为空格
e_if(__TB__ == 'user_info' AND op_is_null(`name`), e_set(`name`,''))
替换所有字符串中的 "\u000f"为空格
tail_replace_string_field('\u000f','',true)
5.4 多表汇聚场景主键冲突解决方案
多个单元表的数据同步汇聚到中心表时,各单元表存在相同主键,汇聚存在主键冲突丢数据的问题
解决方法:
新增字段region_id
create table user_info( id int not null auto_increment primary key, user_name varchar(32) )
create table global_user_info( id int not null auto_increment, user_name varchar(32), region_id varchar(32), primary key(id,region_id) )
e_set(`region_id`,'cn-hangzhou')
通过ETL重新分配各个单元的id到不冲突的值域
create table user_info_0001( id int not null auto_increment primary key, user_name varchar(32) )
create table global_user_info( id int not null auto_increment, user_name varchar(32), primary key(id,region_id) )
e_set(`id`,case_string_to_long(substring_after_last(`id`,'_')) * 10000000 + id)
5.5 将JSON展开为表的字段
参数名称 |
类型 |
默认值 |
介绍 |
例子 |
field_name |
字符串 |
无 |
需要展开的字段名称,区分大小写 |
'user_info' |
fmt='simple' |
字符串 |
fmt='simple' |
JSON展开名称规则,可选simple,full |
fmt='full' |
sep='_' |
字符串 |
sep='_' |
Json字段名称层级之间的链接符, 如{"userInfo":{"userName:"hy"}} 展开后userInfo_userName |
sep='_' |
mapping |
字符串 |
无 |
需要展开的字段名称映射规则 |
mapping='{"username":"user_name"}' |
如下将 user_info: {"user_info":{"username":"hy","password":"******"}} 展开到字段 user_name和user_password
fmt='simple': 使用当前JSON字段名称
e_expand_json('user_info',fmt='simple',mapping='{"username":"user_name","password":"user_password"}')
fmt='full': 使用全路径JSON字段名称
e_expand_json('user_info',fmt='full',seq=':',mapping='{"user_info:username":"user_name","user_info:password":"user_password"}')
5.6 数据路由
根据计算规则,将数据路由到不同的表中,使用e_set(__DB__, new_db_name), e_set(__TB__, new_table_name); new_db_name和new_table_name 可以是字符串、返回值是字符串结果的函数调用、以及表达式
e_set(`__TB__`,`hy_0`) e_set(`__DB__`, `hy_0`)
5.7 物理删除改为逻辑删除
目的表新增 del_flag varchar(2)字段,使用'Y'标识删除记录,使用'N'标识为删除记录,并转换DELETE SQL为UPDATE SQL;ETL脚本如下
e_if_else( op_eq(__OPERATION__,__OP_DELETE__), e_set(`del_flag`,'Y', __OPERATION__, __OP_UPDATE), e_set(`def_flag`, 'N') )
5.8 时间函数应用
5.8.1 修改时间
针对库user_db下的表user_info的字段login_time, 加8小时
e_if( op_and(op_eq('user_db',__DB__),op_eq('user_info',__TB__)), e_set(login_time,dt_add(login_time,hours=8)) )
针对库user_db下的表user_info的字段login_time, 减少8小时
e_if( op_and(op_eq('user_db',__DB__),op_eq('user_info',__TB__)), e_set(login_time,dt_add(login_time,hours=-8)) )
5.8 TP到Redis
5.8.1 TP到Redis概览
TP到Redis的基本ETL数据处理过程如图, 分为两个步骤
第一步: 使用ETL 算子e_split(__OPERATION__), 将UPDATE日志拆分成DELETE和INSERT日志(只有增量阶段有效)
第二步: 使用ETL算子构建DTS Record(Redis),其格式如下
最重要的是DTS Record(Redis)有两个特殊的字段__DTS_TP_TO_REDIS_KEY__和__DTS_TP_TO_REDIS_VALUE__, 意义和名称对应;
__DTS_TP_TO_REDIS_KEY__: 是写入目的Redis对应的key值,类型为String
__DTS_TP_TO_REDIS_VALUE__: 是写入目的Redis对应的Value值,类型可以为String或者Hash;
综上所述,ETL_2 这个阶段主要就是通过各种ETL算子设置字段__DTS_TP_TO_REDIS_KEY__和__DTS_TP_TO_REDIS_VALUE_的值
5.8.2 TP到Redis常见配置
以下案例参考源库为MySQL表: dts_db.dts_user,其建表SQL如下
create table dts_db.dts_user( user_id int primary key, user_name varchar(32), user_address varchar(256) )
表中的数据如下
user_id |
user_name |
user_address |
1 |
hy |
Shan'Xi |
2 |
yh |
ZheJiang |
5.8.2.1 库-表-主键的KV模型
5.8.2.1.1 采用Json编码Value的值
ETL脚本如下
e_set(`_DTS_TP_TO_REDIS_KEY__`, __DB__+'.'+__TB__+'.'+pk_str_with_name_value('=','.'),NEW, `__DTS_TP_TO_REDIS_VALUE__`, tp2redis_json_value(), NEW)
生成结果
KEY |
Value类型 |
Value的值 |
dts_db.dts_user.user_id=1 |
String |
{"user_id":"1", "user_name":"hy","user_address":"Shan'Xi"} |
dts_db.dts_user.user_id=2 |
String |
{"user_id":"2", "user_name":"yh","user_address":"ZheJiang"} |
5.8.2.1.2 采用String编码Value的值,包含列名
ETL脚本如下
e_set(`_DTS_TP_TO_REDIS_KEY__`, __DB__+'.'+__TB__+'.'+pk_str_with_name_value('=','.'),NEW, `__DTS_TP_TO_REDIS_VALUE__`, tp2redis_kv_commas_str_value(), NEW)
生成结果
KEY |
Value类型 |
Value的值 |
dts_db.dts_user.user_id=1 |
String |
user_id:1,user_name:hy,user_address:Shan'Xi |
dts_db.dts_user.user_id=2 |
String |
user_id:1,user_name:yh,user_address:ZheJiang |
5.8.2.1.3 采用String编码Value的值,不包含列名
ETL脚本如下
e_set(`_DTS_TP_TO_REDIS_KEY__`, __DB__+'.'+__TB__+'.'+pk_str_with_name_value('=','.'),NEW, `__DTS_TP_TO_REDIS_VALUE__`, tp2redis_value_commas_str_value(), NEW)
生成结果
KEY |
Value类型 |
Value的值 |
dts_db.dts_user.user_id=1 |
String |
1,hy,Shan'Xi |
dts_db.dts_user.user_id=2 |
String |
1,yh,ZheJiang |
5.8.2.2 库-表-主键的Hash模型
ETL脚本如下
e_set(`_DTS_TP_TO_REDIS_KEY__`, __DB__+'.'+__TB__+'.'+pk_str_with_name_value('=','.'),NEW, `__DTS_TP_TO_REDIS_VALUE__`, tp2redis_kv_hash_value(), NEW)
生成结果
KEY |
Value类型 |
Value的值 |
dts_db.dts_user.user_id=1 |
Hash |
{user_id:1, user_name:hy,user_address:Shan'Xi} |
dts_db.dts_user.user_id=2 |
Hash |
{user_id:2, user_name:yh,user_address:ZheJiang} |
5.8.2.3 库-表的Hash模型
基于库-表的HASH模型:
ETL脚本如下
e_set(`_DTS_TP_TO_REDIS_KEY__`, __DB__+'.'+__TB__,NEW, `__DTS_TP_TO_REDIS_VALUE__`, tp2redis_kv_hash_value(`user_id`,`user_address`), NEW)
生成结果
KEY |
Value类型 |
Value的值 |
dts_db.dts_user |
Hash |
{1: Shan'Xi, 2: ZheJiang} |
快来关注
- 数据传输服务(Data Transmission Service,简称DTS)支持关系型数据库、NoSQL、大数据(OLAP)等数据源,集数据迁移、订阅、实时同步、校验功能于一体,能够解决公共云、混合云场景下,远距离、秒级异步数据传输难题。其底层基础设施采用阿里双11异地多活架构,为数千下游应用提供实时数据流,已在线上稳定运行7年之久,是一款沉淀了丰富实践经验的可靠产品。点击了解更多DTS相关信息
- 欢迎加入钉群讨论交流: