开发者学堂课程【SaaS 模式云数据仓库系列课程 —— 2021数仓必修课:DTS 数据同步集成 MaxCompute 数仓最佳实践】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/55/detail/1056
DTS 数据同步集成 MaxCompute 数仓最佳实践
内容简介:
1.演示基础环境部署
2.MaxCompute 数仓搭建
场景描述
本文 Step by Step介绍了通过数据传输服务 DTS 实现从云数据库 RDS 到MaxCompute的数据同步集成,并介绍如何使用 DTS 和 MaxCompute 数仓联合实现数据 ETL 幂等和数据生命周期快速回溯。
解决问题-
1.实现大数据实时同步集成。
2.实现数据 ETL 幂等。
3.实现数据生命周期快速回溯。
产品列表
·MaxCompute
·数据传输服务 DTS
·DataWorks
·云数据库 RDS MySQL 版
最佳实践概述
客户 T+1 数仓传统 ETL 存在以下痛点:
1)数据抽取不幂等或容错率低,如凌晨0:00启动的 ETL 任务因为各种原因(数据库HA 切换、网络抖动或 MAXC 写入失败等)失败后,再次抽取无法获取0:00时的数据状态。
2)针对不规范设计表,如没有 create time/update time 的历史遗留表,传统ETL需全量抽取。
3)实时性差,抽取数据+重试任务往往需要 1-3小 时。
另外数据库的数据生命周期回溯难,如客户想回溯一年内某些数据的增删改生命周期,需要从 OSS 拉取一年的 binlog 解析和分析,十分困难,费时费力。
本文 Step by Step 介绍了通过数据传输服务 DTS 实现从云数据库 RDS 到MaxCompute 的数据同步集成,并介绍如何使用 DTS 和 MaxCompute 数仓联合实现
·DTS 简便易用,可以通过控制台一键完成同步,也可以通过 OpenAPI 批量生成。
·DTS 将 binlog 作为大数据同步的手段,能够实现 ETL 幂等,大大提高数据仓库的数据质量。
·针对不规范设计的表,仍然可以通过 binlog 的时间来生成创建和修改时间。
·实时性提高,将凌晨的批量抽取改为准实时同步,凌晨 0:00 过后几分钟就可以开始批处理任务。
·通过 MaxCompute 的分布式计算能力,能够快速回溯数据的增删改生命周期。
1.演示基础环境部署
本章节在阿里云上搭建演示基础环境云数据库 RDS MySQL 版并构造相关数据。本最佳实践推荐客户使用云数据库 RDS MySQL 版,但 对 ECS 上的自建数据库、以及通过专线/VP N 网关/智能网关接入的自建数据库同样适用。
1.1.环境部署
1.1.1.创建专有网络 VPC
1.1.2.部署云数据库 RDS
1.2.模拟数据构造
本节模拟构造电商数据库 shopping-db。shopping-db 一共包含三个表,分别为用户表 user、库存表 inventory 和订单表 orders。
步骤1 在数据管理 DMS 控制台,选择导航 SQL操>SQL 窗口。
步骤2 复制并粘贴以下 SQL 语句创建用户表 user、库存表 inventory 和订单表orders,单击执行(F8)。
(示例 demo 代码可以通过执行以下命令获取:
git clone git@code.aliyun.com:best-practice/146.git)
--用户/账户表
create table user(
user id bigint not null auto increment comment ‘user id用户ID’,
user name varchar(3o) comment ‘用户名’,
phone num varchar(20) comment ‘手机号’,
email varchar(100) comment ‘email’,
acct decimal(18,2) comment '账户余额,
primary key (user_id),
arcs
key l1 (user_name)
);
--库存表
create table inventory(
inventory_id bigint not nult auto_increment comment 'inventory_id库存商品ID',
inventory_name varchar(30) comment ‘商品名’,
price_unit decimal(18,2) comment ‘商品单价’,
inventory_num bigint not null default 0 comment ’剩余库存数’,
primary key(inventory_id)
);
--订单表
create table orders(
order_id bigint not null auto_increment comment 'order_id订单ID',
user_id. bigint not null comment 'user_id用户ID',
orice_unit decimal(18,2) comment '商品单价’,
order_num bigint not null default 0 comment ‘订单购买数’,
create_time datetime not null default current timestamp,
update_time datetime not nuit default current_timestamp on update current_timestamp,
primary key(ordex_id),
key l1 (user_id),
key l2(inventory_id),
);
2.MaxCompute 数仓搭建
本章通过 DTS 实现从云数据库 RDS 到 MaxCompute,的数据同步集成,并介绍如何使用 DTS 和 MaxCompute 数仓联合实现数据ETL幂等和数据生命周期快速回溯。
2.1.开通 DataWorks
DataWorks:是一个提供了大数据 OS 能力、并以 all in one box 的方式提供专业高效、安全可靠的一站式大数据智能云研发平台。同时能满足用户对数据治理、质量管理需求,赋予用户对外提供数据服务的能力。
数据传输服务 DTS:数据传输服务 (Data Transmission Service)DTS 支持关系型数据库、NoSQL、大数据(OLAP)等数据源间的数据传输。它是一种集数据迁移、数据订阅及数据实时同步于一体的数据传输服务。数据传输致力于在公共云、混合云场景下,解决远距离、毫秒级异步数据传输难题。
MaxCompute:MaxCompute (原 ODPS)是一项大数据计算服务,它能提供快速、完全托管的 PB 级数据仓库解决方案,使您可以经济并高效的分析处理海量数据。-
2.3.创建 MaxCompute 项目
步骤1 登录 DataWorks 控制台。(https://workbench.data.aliyun.com/console)
步骤2 在左侧导航栏选择工作空间列表,切换地域为华东1(杭州),在工作空间列表页面,单击创建工作空间。
步骤3 在基本配置页,输入工作空间名称 bp_data_warehouse,其他配置保持默认,单击下一步。注:工作空间名称每 个 Region 内唯一,请自定义名称。
步骤4 在选择引擎页里,在选择计算引擎服务区域,勾选 MaxCompute 按量付费,单机下一步。
步骤5 在引擎详情页,输入实例显示名称和 MaxCompute 项目名称,单击创建工作空间。
l 实例显示名称:bp_data_warehouse
l MaxCompute 项目名称:bp_data_warehousee
l MaxCompute 访问身份:阿里云主账号
l Quota 组切换:按量付费默认资源
步骤6 在工作间列表页面,可以查看到状态为正常的工作空间
2.4.DTS 数据同步集成 MaxCompute
根据需要同步的数据在写入后是否发生变化,分为恒定的存量数据(通常是日志数据)和持续更新的数据(例如本示例库存表中,商品单价会发生变化)。
对恒定的存量数据进行增量同步,由于数据生成后不会发生变化,因此可以很方便地根据数据的生成规律进行分区。较常见的是根据日期进行分区,例如每天1个分区。
根据数据仓库反映历史变化的特点,需要对持续更新的数据进行增量同步。
传统 ETL 工具提供每日增量、每日全量的上传方式,但由于数据库数据的增量上传是通过数据库提供数据变更的日期字段来实现,要求您的数据库有数据变更的日期字段。
数据传输服务 DTS 数据同步功能通过 binlog 实现。对无数据变更日期的表,仍然可以通过 binlog 的时间来生成创建/修改时间,兼容性更好,同时使用DTS提高了数据集成的实时性,实现准实时同步。
2.4.1.1.配置 DTS 数据同步
注意事项:
1)DTS 在执行全量数据初始化时将占用源库和目标库一定的读写资源,可能会导致数据库的负载上升,在数据库性能较差、规格较低或业务量较大的情况下(例如源库有大量慢 SQL、存在无主键表或目标库存在死锁等),可能会加重数据库压力,甚至导致数据库服务不可用。因此您需要在执行数据同步前评估源库和目标库的性能,同时建议您在业务低峰期执行数据同步(例如源库和目标库的 CPU 负载在30%以下)。
2)仅支持表级别的数据同步。
3)在数据同步时,请勿对源库的同步对象使用 gh-ost 或 pt-online-schema-change 等类似工具执行在线 DDL 变 更,否则会导致同步失败。
4)由于 Maxcompute 不支持主键约束,当 DTS 在同步数据时因网络等原因触发重传,可能会导 致 MaxCompute 中出现重复记录。
步骤1 登录数据传输 DTS 控制台。(https://dts.console.aliyun.com/#/home/)
步骤2 在左侧导航栏选择数据同步,在数据同步页面,选择华东1(杭州),并单击创建同步作业。
步骤3 选择数据传输服务 DTS(后付费),完成以下配置,其他配置保持默认,并单机立即购买。
步骤4 在确认订单页面,确认各项参数信息。确认无误,阅读、同意并勾选《数据传输服务(后付费)服务协议》前的复选框,并单机去开通。
步骤5 开通任务提交成功后,单机管理控制台返回。
步骤6 定位至已购买的数据同步实例,单机配置网络链路。
步骤7 配置同步通道的源实例及目标实例信息
配置完成,单机页面右下角的授权白名单并进行下一步。
步骤8 单机页面右下角的下一步,允许将 MaxCompute 中的项目的下述权限授予给DTS同步账号,详情如下图所示。
步骤9 配置同步策略和同步对象。
上述配置完成后,单机页面右下角的预检查并启动
步骤10 等待同步作业的链路初始化完成,直至处于同步中状态。
可以在数据同步页面,查看数据同步作业的状态。
2.4.1.2.验证 DTS 数据实时同步
同步过程介绍:
1)结构初始化。
DTS 将源库中待同步表的结构定义信息同步至 MaxCompute 中,初始化时 DTS 会为表名增加_ base 后缀。例如源表为 user,那么 MaxCompute 中的表即为user_base.
2)全量数据初始化。
DTS将源库中待同步表的存量数据,全部同步至 MaxCompute 中的目标表名 _base表中(例如从源库的 user 表同步至 MaxCompute 的 user_base,表),作为后续增量同步数据的基线数据。
3)增量数据同步。
DTS 在 MaxCompute 中创建一个增量日志表,表名为同步的目标表名 _log,例如user_log,然后将源库产生的增量数据实时同步到该表中。
增量日志表结构定义说明请参考:
https://help.aliyun.com/document_detail/44547.html?#title-g2g-87l-hfi
步骤1 请参考1.1.2.2.创建数据库和账号的步骤8登录RDS数据库。
步骤2 在数据管理 DMS 控制台,选择导航 SQ 操作>SQL窗口,复制并粘贴如下SQL 语句写入测试数据,单击执行(F8)。
--写入测试用户
insert into user(user_name,phone_num,email,acct)
values(‘test_acct’,‘18866668888',‘test@test.com',300.99);
--写入测试商品及库存
insert into inventory(inventory_name,price_unit,inventory_num)
values(‘lPhone X’,4999,300);
步骤3 登录 DataWorks 控制台 (https://workbench.data.aliyun.com/console)
步骤4 在左侧导航栏选择工作空间列表,在工作空间列表页面,定位至已创建的工作空间,单击进入数据开发。
步骤5 在左侧导航栏选择公共表,单击 user_log,单击数据预览。
可以查看到刚在数据库 shopping-db 的 user 表写入的用户 test_acct,已经实时同步到了 bp_data_warehouse 的 user_log 表。
如果没有出现数据,请耐心等待 2~3 分钟后再查看。
举例调整 iphone 12 的价格
检查后发现数据已生成
2.4.2.新增表数据同步处理
DTS 支持在数据同步的过程中新增同步对象。
步骤1 登录 DTS 控制台。(https:/ldts.console.aliyun.com/#/home/)
步骤2 左侧导航栏,单击数据同步。
步骤3 在同步作业列表页面顶部,选择数据同步实例所属地域。
步骤4 找到目标同步作业,单击其操作列的更多>修改同步对象。
步骤5 在源库对象区域框中,单机需要新增的同步对象,并单机>向右小箭头移动至已选择对象区域框中,
步骤6 单机预检查并启动。
2.5.ETL幂等实现
根据等幂性原则,1个任务多次运行的结果是一致的。
如果数据抽取不幂等,例如凌晨0:00启动的 ETL 任务,因为各种原因(数据库HA切换、网络抖动或 MaxCompute 写入失败等)失败后,再次抽取将无法获取0:00时的数据状态。
DTS 使用 binlog 作为大数据同步的手段,将全量数据同步到 base 表,增量数据依赖binlog 实时同步到 log 表,统一做合表清洗可以拿到任意时间点的快照数据,从而实现 ETL 幂等,大大提高数据仓库的数据质量。
步骤1 请参考1.1.2.2.创建数据库和账号的步骤8登录 RDS 数据库。
步骤2 在数据管理 DMS 控制台,选择导航 SQL 操作>SQL 窗口,复制并粘贴如下SQL 语句写入测试数据,单击执行 (F8)。
注意:在相隔一分钟,两次调整了商品名为 IPhone X 的商品单价。
--多次调整价格 RDS
update inventory set price_unit=3999 where inventory_name=‘IPhone X’,
select sleep(60);
update inventory set price_unit=4999 where inventory name=‘IPhone X’,
步骤3 登录 DataWorks 控制台。(https://workbench.data.aliyun.com/console)
步骤4 在左侧导航栏选择工作空间列表,在工作空间列表页面,定位至已创建的工作空间,单击进入数据开发。
步骤5 在左侧导航栏选择临时查询,单击新建节点 >QDPS SQL。
步骤6 输入节点名称,单次提交。
步骤7 复制粘贴实 例 SQL 语句,运行。
通过结果可以验证,通过 MaxCompute 的 SQL 命 令,对全量基线表 _base 和增量日志表_log 执行合并操作,得到某个时间点的 snapshot 数据,保证抽取的幂等性。
举例如下,修改Iphone 12 价格。
点击执行,在临时查询中修改时间运行
将时间改为17:19
改为17:20,17:21等又出现不同的价格。
2.6.数据回溯实现
本节演示快速回溯数据的增删改生命周期。
步骤1 登录 DataWorks,控制台。(https://workbench.data.alivyun.com/console)
步骤2 在左侧导航栏选择工作空间列表,在工作空间列表页面,定位至已创建的工作空间,单击进入数据开发。
步骤3 在左侧导航栏选择临时查询,单击新建节点 >QDPS SQL。
步骤4 输入节点名称,单次提交。
步骤5 复制粘贴实例 SQL 语句,运行。
通过结果可以验证,即使原表没有创建和修改时间,也可以通过 binlog 生成时间new_dts_sync_utc_timestamp 实现商品名为 IPhone X 的库存数据回溯。