阿里云DTS踩坑经验分享系列|链路内ETL使用手册

本文涉及的产品
数据传输服务 DTS,数据迁移 small 3个月
推荐场景:
MySQL数据库上云
数据传输服务 DTS,数据同步 small 3个月
推荐场景:
数据库上云
数据管理 DMS,安全协同 3个实例 3个月
推荐场景:
学生管理系统数据库
简介: DTS是一个数据迁移和同步服务,通常用于数据搬迁或实时数据传输。DTS在数据同步链路基础上,提供流式数据ETL数据处理功能,支持使用DSL(Domain Specific Language)脚本语言灵活地定义数据处理逻辑。

阿里云DTS作为数据世界高速传输通道的建造者,每周为您分享一个避坑技巧,助力数据之旅更加快捷、便利、安全。


一、DTS链路内ETL介绍

DTS是一个数据迁移和同步服务,通常用于数据搬迁或实时数据传输。DTS在数据同步链路基础上,提供流式数据ETL数据处理功能,支持使用DSL(Domain Specific Language)脚本语言灵活地定义数据处理逻辑。

二、DTS链路内ETL适用场景

2.1 数据脱敏

对敏感信息如电话号码,身份证号,地址等进行打码等方式脱敏, 如: 针对电话号码脱敏

原始数据: 18700001111

ETL处理之后的数据: 187****1111

2.2 数据归一化

根据规则将一组数据映射到另一组数据,如根据成绩将成绩 >=60分的归类为A, 将成绩小于60分的时归类为B

原始数据

59

99

ETL处理之后的数据

B

A


2.3 同步过程中附加列

业务A有以下用户表

CREATE TABLE USER_INFO (
    USER_ID INT NOT NULL,
    USER_NAME VARCHAR(32),
    PRIMARY KEY(USER_ID)
)

在使用DTS数据同步时,希望在不改变源库表结构的前提下同步到其他的分析数据库上,并记录数据更时间 UPDATE_TIME。


原始数据:

USER_ID

USER_NAME

31846731

张三


ETL处理之后的数据:

USER_ID

USER_NAME

UPDATE_TIME

31846731

张三

2023-04-01 12:51:46

2.4 数据过滤

"ETL数据过滤"和"DTS SQL条件过滤任务数据" 功能对比

ETL数据过滤

SQL条件过滤

作用域

DTS实例级

表级别

语法

DSL 语法

SQL 语法

字段内容

数据库表字段内容,DTS附加的字段(如DML 类型,日志提交时间等)

数据库表字段内容

条件表达式

DTS附加属性和数据表达式组成过滤条件( 如过滤值ID=10的DELETE日志同步)

SQL表达式

推荐使用方法:

  • DTS SQL条件过滤任务数据 适用于依据表内容进行数据过滤;
  • ETL数据过滤适用于实例级配置,以及结合增量日志属性进行数据过滤

2.5 数据类型转换

根据规则将数据从一种类型映射到另一种类型,如将VARCHAR类型映射到INT类型

原始数据

123456

99.99

ETL处理之后的数据

123456

100

2.6 非法值处理

再异构数据库同步过程中, 存在一系列不兼容数据类型,例如Oracle同步到MySQL的链路,Oracle TIMESTAMP('4712BC-01-01 00:00:00' to '9999-12-31 23:59:59')与MySQL TIMESTAMP(0000-00-00 00:00:00, 1970-01-01 00:00:00 to 2038-01-19 03:14:07) 数据类型范围不兼容,需要进行数据转换

原始数据

0001-01-01 00:00:01

ETL处理之后的数据

1970-01-01 00:00:01


三、适用DTS控制台配置链路内ETL的方法

3.1 登陆DTS新版控制台并配置任务

参考公有云DTS文档:https://help.aliyun.com/document_detail/211600.html 配置任务。DTS任务配置有 配置源库及目标库信息、配置任务对象及高级配置、高级配置预检查购买 五个步骤,配置ETL功能在 高级配置 步骤,参考3.2

3.2 配置ETL功能

参考公有云DTS文档:   https://help.aliyun.com/document_detail/411259.htm 

配置任务对象及高级配置步骤的高级配置中,配置ETL功能选择,在输入框中按照数据处理DSL语法填写数据处理语句

单击下一步保存任务并预检查,完成后续步骤。


四、链路内ETL DSL语言介绍

4.1  链路内ETL DSL概览

image.png

4.2  DTS Record 概念介绍如下:

DTS Record是DTS数据同步进程使用的中间数据存储,一个DTS Record代表一行记录;关系数据库中代表一行记录,MongoDB代表一个Document。所有DTS ETL处理都是基于DTS Record进行的。关系型数据库链路中 DTS Record包含表结构信息,在编写ETL DSL脚本时需要考虑到DTS Record结构的稳定,也就是针对同一个表经过不同DSL分支处理产生的表结构相同


4.2.1 INSERT DTS Record

全量数据同步阶段的都是INSERT DTS Record,增量数据同步阶段,源库日志中的INSERT日志 对应INSERT DTS Record。INSERT DTS Record内容如下,不包含前镜像值(插入之前的值),只包含后镜像值(插入的值)。只能获取后镜像值,只能设置后镜像值。

操作类型

值类型

业务字段: USER_ID

业务字段: USER_NAME

DTS附加字段

INSERT

前镜像(__BEFORE__

后镜像(__AFTER__

1000

李四

4.2.2  UPDATE DTS Record

增量数据同步阶段,源库日志中的UPDATE日志对应UPDATE DTS Record,UPDATE DTS Record不会出现在全量同步阶段。UPDATE DTS Record内容如下,包含前镜像值(修改之前的值)和后镜像值(修改之后的值)。可以获取前镜像值和后镜像值,可以设置前镜像和后镜像值。

操作类型

值类型

业务字段: USER_ID

业务字段: USER_NAME

DTS附加字段

UPDATE

前镜像(__BEFORE__

1000

张三

后镜像(__AFTER__

1000

李四

4.2.3  DELETE DTS Record

增量数据同步阶段,源库日志中的DELETE日志对应DELETE DTS Record,DELETE DTS Record不会出现在全量同步阶段。UPDATE DTS Record内容如下,只包含前镜像值(删除之前的值),无后镜像值。只能获取前镜像值,只能设置前镜像值。

操作类型

值类型

业务字段: USER_ID

业务字段: USER_NAME

DTS附加字段

DELETE

前镜像(__BEFORE__

1000

李四

后镜像(__AFTER__


4.3 ETL DSL语法介绍


DTS链路内ETL语言是由基础的操作(operator)和值表达式(column_exp,value_exp)组成。operator有e_set、e_if、e_if_else、e_switch、e_compose、e_split、record_function组成,值表达式由列引用、基础数据类型、基础函数组成,组合参考如下语言描述

operator:
e_set(column_exp,value_exp,[,NEW] [,column_exp,value_exp,[NEW]]*)
| e_if(condition_exp, operator)
| e_if_else(condition_exp,operator,operator)
| e_switch(condition_exp,operator[,condition_exp,operator]* [default=operate]?)
| e_compose(operator [, operator]* )
| e_split(column_exp)
| e_drop()
| record_function()

一个完整的ETL配置,对应一个完整的operator, 可以是e_set、e_if、e_if_else、e_switch、e_compose、e_split、record_function其中的一种,也可以是符合语法的嵌套组合,当前不限制嵌套层次


4.3.1 基础数据类型

数据类型

数据范围

例子

8字节有符号数

e_set(id,1000)

DECIMAL类型

e_set(dollar, 6.88)

字符串

e_set(user_name, '张三')

DATETIME类型

0000-00-01 00:00:00

~9999-12-30 23:59:59

e_set(gmt_modify, DATETIME("2018-01-01 11:11:11"))

BOOLEAN类型

TRUE

FALSE

e_set(del_flag, true))

e_set(del_flag, false))

NULL值

NULL

e_set(del_flag, null))

4.3.2 值表达式

数据类型

需求

ETL脚本

列引用

使用id列填充sequence列

e_set(sequence, id)

表达式

使用当前时间填充CURRENT列

使用dollar * 6.88填充rmb列

e_set(current, dts_now())

e_set(rmb, dollar*6.88)

4.3.3 DTS附加字段

附加列名称

__DB__

源库名称

__TB__

源表名称

__OPERATION__

DML类型(__OP_INSERT__, __OP_UPDATE__, __OP_DELETE__)中的一个,全量迁移只有__OP_INSERT__

__COMMIT_TIMESTAMP__

源库日志提交时间,全量迁移为时间戳0(1970-01-01 08:00:00)

__BEFORE__

DTS Record 的前镜像

__AFTER__

DTS Record 的后镜像

4.3.4 设置字段的值

4.3.5.1  

设置字段的值使用 e_set(column_exp,value_exp [,column_exp,value_exp]*), 可以设置一个字段的值

e_set(id, 1000)

也可以设置多个字段的值, [,column_exp,value_exp]* 可以重复一次或者多次

e_set(id, 1000, user_name, '张三')

4.3.4.2 类型转换

字段类型转换,使用 e_set(column_exp,value_exp, NEW, [,column_exp,value_exp, NEW]*), NEW关键子表示使用新类型, 如把字符串字段  age: '1000' 转换为Int类型

e_set(age, cast_string_toint('1000'))


4.3.5 条件控制语句

4.3.5.1 e_if(condition_exp, operator)

当表达式condition_exp的值为true是,执行operator。如:忽略user_name是张三的记录

e_if(user_name =='张三', e_drop())

4.3.5.2 e_if_else(condition_exp, operator_true,  operator_false)

当表达式condition_exp的值为true是,执行operator_true, 否则执行operator_false。如:当user_name是张三时,将dollar字段改成1000000,否则将dollar改成99

e_if(user_name =='张三', e_set(dollar, 1000000), e_set(dollar,99))

4.3.5.3 e_switch(condition_exp,operator[,condition_exp,operator]* [default=operate]?)

当condition_exp为true时执行紧跟该condition_exp的operator, 只会执行第一个condition_exp为true的分支。如:user_name是张三的记录的dollar字段改成1000000, user_name是李四的记录的dollar字段改成8888,user_name是其他值的记录的dollar改成0

e_switch(
  user_name=='张三', e_set(dollar, 1000000), 
  user_name=='李四', e_set(dollar,99), 
  default=e_set(dollar,0)
)

4.3.5.4 e_compose(operator [, operator]* )

e_compose是一个辅助operator,用来将多个operator包装在一起。e_compose包装起来的每一个operator都会执行如

e_compose(
    e_set(id, id*1000),
    e_switch(
      user_name=='张三', e_set(dollar, 1000000), 
      user_name=='李四', e_set(dollar,99), 
      default=e_set(dollar,0)
    )
)


4.3.6 基础函数

4.3.6.1 数值运算

功能

语法

取值范围

返回值

示例

加法

op_sum(value1, value2)

value1:整数或浮点数

value2:整数或浮点数

若参数均为整数,则返回整数,否则返回浮点数。

op_sum(`col1`, 1.0)

op_add(value1, value2,...)

value1:整数或浮点数

value2:整数或浮点数

value...:整数或浮点数

若参数均为整数,则返回整数,否则返回浮点数。

op_add(`col1`, col2,...)

减法

op_sub(value1,value2)

value1:整数或浮点数

value2:整数或浮点数

若参数均为整数,则返回整数,否则返回浮点数。

op_sub(`col1`, 1.0)

乘法

op_mul(value1,value2)

value1:整数或浮点数

value2:整数或浮点数

若参数均为整数,则返回整数,否则返回浮点数。

op_mul(`col1`, 1.0)

除法

op_div_true(value1, value2)

value1:整数或浮点数

value2:整数或浮点数

若参数均为整数,则返回整数,否则返回浮点数。

op_div_true(`col1`, 2.0), 若col1=15,则返回7.5。

取模

op_mod(value1, value2)

value1:整数或浮点数

value2:整数或浮点数

若参数均为整数,则返回整数,否则返回浮点数。

op_mod(`col1`, 10),若col1=23,则返回3

4.3.6.2 逻辑运算

功能

语法

取值范围

返回值

示例

是否相等

op_eq(value1, value2)

value1:整数、浮点数、字符串

value2:整数、浮点数、字符串

boolean类型,true或false

op_eq(`col1`, 23)

是否大于

op_gt(value1, value2)

value1:整数、浮点数、字符串

value2:整数、浮点数、字符串

boolean类型,true或false

op_gt(`col1`, 1.0)

是否小于

op_lt(value1, value2)

value1:整数、浮点数、字符串

value2:整数、浮点数、字符串

boolean类型,true或false

op_lt(`col1`, 1.0)

是否大于等于

op_ge(value1, value2)

value1:整数、浮点数、字符串

value2:整数、浮点数、字符串

boolean类型,true或false

op_ge(`col1`, 1.0)

是否小于等于

op_le(value1, value2)

value1:整数、浮点数、字符串

value2:整数、浮点数、字符串

boolean类型,true或false

op_le(`col1`, 1.0)

AND运算

op_and(value1, value2[,*])

value1:boolean类型

value2:boolean类型

[,*]: 可变字段,任意多个boolean类型值

boolean类型,true或false

op_and(`is_male`, `is_student`)

op_and(`is_male`, `is_student`,`is_x`)

OR运算

op_or(value1, value2[,*])

value1:boolean类型

value2:boolean类型

[,*]: 可变字段,任意多个boolean类型值

boolean类型,true或false

op_or(`is_male`, `is_student`)

op_or(`is_male`, `is_student`,`is_x`)

值是NULL

op_is_null(value)

value: 任意类型

boolean类型,true或false

op_is_null(`name`)

值不是NULL

op_is_not_null(value)

value: 任意类型

boolean类型,true或false

op_is_not_null(`name`)

是否存在

op_in(json_array)

value: json_array('JSON格式字符串')

boolean类型,true或false

op_in(id, json_array('["0","1","2","3","4","5","6","7","8"]'))

4.3.6.3 字符串函数

功能

语法

取值范围

返回值

示例

字符串拼接

op_add(value1,value2,...,value_n)

value1:  字符串

value2:  字符串

value_n:  字符串

字符串

op_add(`col`,'hangzhou','dts')

将字符串中所有大写字符转换为小写字符

str_lower(value)

value:字符串

字符串

str_lower(`col1`)

将字符串中所有小写字符转换为大写字符

str_upper(value)

value:字符串

字符串

str_upper(`col1`)

删除字符串中指定的字符

str_strip(value1, value2)

value1:  字符串

value2:  字符串

字符串

str_strip(`col1`, 'abc')

截取字符串cond之后的部分

substring_after(value, cond)

value:  字符串

cond:  字符串

字符串

substring_after(`col`, 'abc')

截取字符串cond之前的部分

substring_before(value, cond)

value1:  字符串

cond:  字符串

字符串

substring_before(`col`, 'efg')

截取字符串cond1和cond2之间的部分

substring_between(value, cond1, cond2)

value:  字符串

cond1:  字符串

cond2:  字符串

字符串

substring_between(`col`, 'abc','efg')

字符串转换数字

cast_string_to_long(value)

value:字符串

整数

cast_string_to_long(`col`)

数字转换字符串

cast_long_to_string(value)

value:整数

字符串

cast_long_to_string(`col`)

是否是字符串类型

is_string_value(value)

value:字符串或者列名

boolean类型,true或false

is_string_value(`col1`)


4.3.6.4 时间函数

功能

语法

取值范围

返回值

示例

当前系统时间

dt_now()

DATETIME,精确到秒

dts_now()

dt_now_millis()

DATETIME,精确到毫秒

dt_now_millis()

UTC时间戳转DATETIME

dt_fromtimestamp(value,[timezone])

value:整数

timezone:可选参数

DATETIME,精确到秒

dt_fromtimestamp(1626837629)

dt_fromtimestamp(1626837629,'GMT+08')

UTC毫秒时间戳转DATETIME

dt_fromtimestamp_millis(value,[timezone])

value:整数

timezone:可选参数

DATETIME,精确到毫秒

dt_fromtimestamp_millis(1626837629123);

dt_fromtimestamp_millis(1626837629123,'GMT+08')

DATETIME转UTC时间戳

dt_parsetimestamp(value,[timezone])

value: DATETIME

timezone:可选参数

整数

dt_parsetimestamp(`datetime_col`)

dt_parsetimestamp(`datetime_col`,'GMT+08')

DATETIME转UTC毫秒时间戳

dt_parsetimestamp_millis(value,[timezone])

value: DATETIME

timezone:可选参数

整数

dt_parsetimestamp_millis(`datetime_col`)

dt_parsetimestamp_millis(`datetime_col`,'GMT+08')

DATETIME转字符串

dt_str(value, format)

value:DATETIME

format:字符串, yyyy-MM-dd HH:mm:ss 格式表示

字符串

dt_str(`col1`, 'yyyy-MM-dd HH:mm:ss')

字符串转DATETIME

dt_strptime(value,format)

value:字符串

format:字符串, yyyy-MM-dd HH:mm:ss 格式表示

DATETIME

dt_strptime('2021-07-21 03:20:29', 'yyyy-MM-dd hh:mm:ss')

修改时间:

年-月-日 时:分:秒针对其中一个或几个部分增加或减少一个值

dt_add(value, [years=intVal],

[months=intVal],

[days=intVal],

[hours=intVal],

[minutes=intVal]

)

value: DATETIME

intVal: 整数

DATETIME

dt_add(datatime_col, years=1)

dt_add(datatime_col, years=1,months=1)

4.3.6.4 字段函数

功能

语法

取值范围

返回值

示例

字符串类型字段内容替换; 从尾部开始

tail_replace_string_field(search, replace, all)

search: 将被替换的字符串

replace: 用与替换的字符串

all: 是否替换所有,true或者false

字符串

将所有字符串字段类型值的 "\u000f"替换成空格

tail_replace_string_field('\u000f','',true)


五、链路内ETL典型场景

5.1 数据脱敏

5.1.1 隐藏电话号码,身份证号等

e_set(phone, str_marsk(phone,3,6,'*'))

原始数据

18700001111

ETL处理之后的数据

187****1111

5.2 使用表过滤

使用条件控制语句和DTS附加字段 __DB__和__TB__组合,可以在指定表上执行ETL逻辑。如针对user_db.user_info表进行脱敏

e_if (
    __DB__ == 'user_db' AND 'user_info' == __TB__,
    e_set(phone, str_marsk(phone,3,6,'*'))
)


DTS附加列__DB__,__TB__和普通字符串一样,可以直接使用字符串函数。如下表名满足正则表达式user_[0-9]+,

进行数据脱敏

e_if(
    regex_match(__TB__, 'user_[0-9]+'), 
    e_set(phone, str_marsk(phone,3,6,'*'))
)

5.2 数据过滤

过滤符合条件的DML,如过滤user_name是张三的记录

e_if(user_name == '张三', e_drop())

过滤增量迁移中的DELETE操作

e_if(__OPERATION__ == __OP_DELETE__, e_drop())

过滤表user_info 区间id>100 and id<1000的数据, (也可以使用 e_keep操作符,与e_drop相反,保留e_keep条件为真的记录)

e_if(__TB__ == 'user_info' AND id>100 and id<1000, e_drop())
#或者
e_keep(op_not(op_eq(__TB__,'user_info') AND op_gt(id,100) AND op_lt(id,1000)))


过滤表user_info id in (1,2,3,4)的记录(可以使用 op_in函数)

e_keep(__TB__ == 'user_info' AND op_in(id, json_array('[1,2,3,4]')))
#或者
e_if(__TB__ == 'user_info' AND op_not(op_in(id, json_array('[1,2,3,4]'))), e_drop())


过滤表名是字符串"user_"开头的 id<1000的记录

e_if(regex_match(__db__,'^user_.*$') AND op_lt(id,1000), e_drop())


5.3 非法数据处理

使用 e_if(condition, e_set) 语句,通过condition条件提出到非法记录,再通过e_set语句修改非法记录


将表user_info里字段name的非法值NULL改为空格

e_if(__TB__ == 'user_info' AND op_is_null(`name`), e_set(`name`,''))


替换所有字符串中的 "\u000f"为空格

tail_replace_string_field('\u000f','',true)


5.4 多表汇聚场景主键冲突解决方案

多个单元表的数据同步汇聚到中心表时,各单元表存在相同主键,汇聚存在主键冲突丢数据的问题

解决方法:

新增字段region_id

create table user_info(
  id int not null auto_increment primary key,
  user_name varchar(32)
)
create table global_user_info(
  id int not null auto_increment,
  user_name varchar(32),
  region_id varchar(32),
  primary key(id,region_id)
)
e_set(`region_id`,'cn-hangzhou')


通过ETL重新分配各个单元的id到不冲突的值域

create table user_info_0001(
  id int not null auto_increment primary key,
  user_name varchar(32)
)
create table global_user_info(
  id int not null auto_increment,
  user_name varchar(32),
  primary key(id,region_id)
)
e_set(`id`,case_string_to_long(substring_after_last(`id`,'_')) * 10000000 + id)

5.5 将JSON展开为表的字段

参数名称

类型

默认值

介绍

例子

field_name

字符串

需要展开的字段名称,区分大小写

'user_info'

fmt='simple'

字符串

fmt='simple'

JSON展开名称规则,可选simple,full

fmt='full'

sep='_'

字符串

sep='_'

Json字段名称层级之间的链接符, 如{"userInfo":{"userName:"hy"}} 展开后userInfo_userName

sep='_'

mapping

字符串

需要展开的字段名称映射规则

mapping='{"username":"user_name"}'

如下将 user_info: {"user_info":{"username":"hy","password":"******"}} 展开到字段 user_name和user_password

fmt='simple': 使用当前JSON字段名称

e_expand_json('user_info',fmt='simple',mapping='{"username":"user_name","password":"user_password"}')

fmt='full': 使用全路径JSON字段名称

e_expand_json('user_info',fmt='full',seq=':',mapping='{"user_info:username":"user_name","user_info:password":"user_password"}')

5.6 数据路由

根据计算规则,将数据路由到不同的表中,使用e_set(__DB__, new_db_name), e_set(__TB__, new_table_name); new_db_name和new_table_name 可以是字符串、返回值是字符串结果的函数调用、以及表达式

e_set(`__TB__`,`hy_0`)
e_set(`__DB__`, `hy_0`)

5.7 物理删除改为逻辑删除

目的表新增 del_flag varchar(2)字段,使用'Y'标识删除记录,使用'N'标识为删除记录,并转换DELETE SQL为UPDATE SQL;ETL脚本如下

e_if_else(    
  op_eq(__OPERATION__,__OP_DELETE__), e_set(`del_flag`,'Y', __OPERATION__, __OP_UPDATE),
  e_set(`def_flag`, 'N')
)


5.8 时间函数应用

5.8.1 修改时间

针对库user_db下的表user_info的字段login_time, 加8小时

e_if(
    op_and(op_eq('user_db',__DB__),op_eq('user_info',__TB__)), 
    e_set(login_time,dt_add(login_time,hours=8))
)

针对库user_db下的表user_info的字段login_time, 减少8小时

e_if(
    op_and(op_eq('user_db',__DB__),op_eq('user_info',__TB__)), 
    e_set(login_time,dt_add(login_time,hours=-8))
)



5.8 TP到Redis

5.8.1 TP到Redis概览

image.png

TP到Redis的基本ETL数据处理过程如图, 分为两个步骤

第一步: 使用ETL 算子e_split(__OPERATION__), 将UPDATE日志拆分成DELETE和INSERT日志(只有增量阶段有效)

第二步: 使用ETL算子构建DTS Record(Redis),其格式如下

image.png


最重要的是DTS Record(Redis)有两个特殊的字段__DTS_TP_TO_REDIS_KEY__和__DTS_TP_TO_REDIS_VALUE__, 意义和名称对应;

__DTS_TP_TO_REDIS_KEY__:  是写入目的Redis对应的key值,类型为String

__DTS_TP_TO_REDIS_VALUE__:   是写入目的Redis对应的Value值,类型可以为String或者Hash;

综上所述,ETL_2 这个阶段主要就是通过各种ETL算子设置字段__DTS_TP_TO_REDIS_KEY__和__DTS_TP_TO_REDIS_VALUE_的值


5.8.2 TP到Redis常见配置

以下案例参考源库为MySQL表: dts_db.dts_user,其建表SQL如下

create table dts_db.dts_user(
  user_id int primary key,
  user_name varchar(32),
  user_address varchar(256)
)

表中的数据如下

user_id

user_name

user_address

1

hy

Shan'Xi

2

yh

ZheJiang

5.8.2.1 库-表-主键的KV模型

5.8.2.1.1 采用Json编码Value的值

ETL脚本如下

e_set(`_DTS_TP_TO_REDIS_KEY__`, 
      __DB__+'.'+__TB__+'.'+pk_str_with_name_value('=','.'),NEW,
      `__DTS_TP_TO_REDIS_VALUE__`, tp2redis_json_value(), NEW)

生成结果

KEY

Value类型

Value的值

dts_db.dts_user.user_id=1

String

{"user_id":"1", "user_name":"hy","user_address":"Shan'Xi"}

dts_db.dts_user.user_id=2

String

{"user_id":"2", "user_name":"yh","user_address":"ZheJiang"}

5.8.2.1.2 采用String编码Value的值,包含列名

ETL脚本如下

e_set(`_DTS_TP_TO_REDIS_KEY__`, 
      __DB__+'.'+__TB__+'.'+pk_str_with_name_value('=','.'),NEW,
      `__DTS_TP_TO_REDIS_VALUE__`, tp2redis_kv_commas_str_value(), NEW)

生成结果

KEY

Value类型

Value的值

dts_db.dts_user.user_id=1

String

user_id:1,user_name:hy,user_address:Shan'Xi

dts_db.dts_user.user_id=2

String

user_id:1,user_name:yh,user_address:ZheJiang

5.8.2.1.3 采用String编码Value的值,不包含列名

ETL脚本如下

e_set(`_DTS_TP_TO_REDIS_KEY__`, 
      __DB__+'.'+__TB__+'.'+pk_str_with_name_value('=','.'),NEW,
      `__DTS_TP_TO_REDIS_VALUE__`, tp2redis_value_commas_str_value(), NEW)

生成结果

KEY

Value类型

Value的值

dts_db.dts_user.user_id=1

String

1,hy,Shan'Xi

dts_db.dts_user.user_id=2

String

1,yh,ZheJiang


5.8.2.2 库-表-主键的Hash模型

ETL脚本如下

e_set(`_DTS_TP_TO_REDIS_KEY__`, 
      __DB__+'.'+__TB__+'.'+pk_str_with_name_value('=','.'),NEW,
      `__DTS_TP_TO_REDIS_VALUE__`, tp2redis_kv_hash_value(), NEW)

生成结果

KEY

Value类型

Value的值

dts_db.dts_user.user_id=1

Hash

{user_id:1, user_name:hy,user_address:Shan'Xi}

dts_db.dts_user.user_id=2

Hash

{user_id:2, user_name:yh,user_address:ZheJiang}

5.8.2.3 库-表的Hash模型

基于库-表的HASH模型:

ETL脚本如下

e_set(`_DTS_TP_TO_REDIS_KEY__`, 
      __DB__+'.'+__TB__,NEW,
      `__DTS_TP_TO_REDIS_VALUE__`, 
      tp2redis_kv_hash_value(`user_id`,`user_address`), NEW)

生成结果

KEY

Value类型

Value的值

dts_db.dts_user

Hash

{1: Shan'Xi, 2: ZheJiang}


快来关注

  1. 数据传输服务(Data Transmission Service,简称DTS)支持关系型数据库、NoSQL、大数据(OLAP)等数据源,集数据迁移、订阅、实时同步、校验功能于一体,能够解决公共云、混合云场景下,远距离、秒级异步数据传输难题。其底层基础设施采用阿里双11异地多活架构,为数千下游应用提供实时数据流,已在线上稳定运行7年之久,是一款沉淀了丰富实践经验的可靠产品。点击了解更多DTS相关信息
  2. 欢迎加入钉群讨论交流:

image.png


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
7月前
|
关系型数据库 MySQL 数据挖掘
阿里云 SelectDB 携手 DTS ,一键实现 TP 数据实时入仓
DTS 作为阿里云核心的数据交互引擎,以其高效的实时数据流处理能力和广泛的数据源兼容性,为用户构建了一个安全可靠、可扩展、高可用的数据架构桥梁。阿里云数据库 SelectDB 通过与 DTS 联合,为用户提供了简单、实时、极速且低成本的事务数据分析方案。用户可以通过 DTS 数据传输服务,一键将自建 MySQL / RDS MySQL / PolarDB for MySQL 数据库,迁移或同步至阿里云数据库 SelectDB 的实例中,帮助企业在短时间内完成数据迁移或同步,并即时获得深度洞察。
阿里云 SelectDB 携手 DTS ,一键实现 TP 数据实时入仓
|
7天前
|
存储 数据采集 监控
阿里云DTS踩坑经验分享系列|SLS同步至ClickHouse集群
作为强大的日志服务引擎,SLS 积累了用户海量的数据。为了实现数据的自由流通,DTS 开发了以 SLS 为源的数据同步插件。目前,该插件已经支持将数据从 SLS 同步到 ClickHouse。通过这条高效的同步链路,客户不仅能够利用 SLS 卓越的数据采集和处理能力,还能够充分发挥 ClickHouse 在数据分析和查询性能方面的优势,帮助企业显著提高数据查询速度,同时有效降低存储成本,从而在数据驱动决策和资源优化配置上取得更大成效。
92 9
|
14天前
|
弹性计算 安全 容灾
阿里云DTS踩坑经验分享系列|使用VPC数据通道解决网络冲突问题
阿里云DTS作为数据世界高速传输通道的建造者,每周为您分享一个避坑技巧,助力数据之旅更加快捷、便利、安全。本文介绍如何使用VPC数据通道解决网络冲突问题。
64 0
|
3月前
|
NoSQL 安全 容灾
阿里云DTS踩坑经验分享系列|Redis迁移、同步
阿里云数据传输服务DTS在帮助用户迁移Redis数据、同步数据时,在某些复杂场景下会出现报错,或者源库与目标库数据不一致的问题,给用户带来困扰。本文介绍了DTS Redis到Redis迁移、同步过程中的典型问题,以帮助用户更好地使用DTS。
237 2
|
5月前
|
SQL 负载均衡 安全
阿里云DTS踩坑经验分享系列|全量迁移加速方法指南
阿里云数据传输服务DTS是一个便捷、高效的数据迁移和数据同步服务。一般而言,一个完整的DTS数据迁移任务主要包括预检查、结构迁移,全量迁移,增量迁移等阶段,其中全量迁移会将源数据库的存量数据全部迁移到目标数据库。面对各种各样的用户场景, 本文将重点介绍如何使用阿里云DTS实现全量数据迁移加速,以缩短迁移时间,确保数据迁移的效率和稳定性。
570 0
|
7月前
|
SQL 运维 关系型数据库
阿里云DTS踩坑经验分享系列|数据不一致修复大法
阿里云数据传输服务DTS在帮助用户迁移数据、同步数据时,在某些复杂场景下会出现源库与目标库数据不一致的问题,造成数据错误,给用户带来困扰。由于数据不一致的问题很难完全避免,为了及时修复不一致的数据,DTS产品推出数据订正功能,保障用户在同步\迁移数据时的数据一致性。本文介绍了产生数据不一致的一些典型场景,并重点阐述了如何使用DTS数据订正功能来修复不一致的数据。
588 4
|
7月前
|
NoSQL 关系型数据库 数据库
数据传输服务DTS(Data Transmission Service)是阿里云提供的实时数据流服务
【2月更文挑战第29天】数据传输服务DTS(Data Transmission Service)是阿里云提供的实时数据流服务
96 5
|
7月前
|
关系型数据库 MySQL 数据库
使用阿里云的数据传输服务DTS(Data Transmission Service)进行MySQL 5.6到MySQL 8.0的迁移
【2月更文挑战第29天】使用阿里云的数据传输服务DTS(Data Transmission Service)进行MySQL 5.6到MySQL 8.0的迁移
350 2
|
7月前
|
SQL 分布式计算 监控
在数据传输服务(DTS)中,要查看每个小时源端产生了多少条数据
【2月更文挑战第32天】在数据传输服务(DTS)中,要查看每个小时源端产生了多少条数据
71 6
|
7月前
DTS数据传输延迟可能有多种原因
【1月更文挑战第16天】【1月更文挑战第79篇】DTS数据传输延迟可能有多种原因
294 2