DTS 数据同步集成 MaxCompute 数仓最佳实践|学习笔记

本文涉及的产品
数据传输服务 DTS,数据迁移 small 3个月
推荐场景:
MySQL数据库上云
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
数据传输服务 DTS,数据同步 small 3个月
推荐场景:
数据库上云
简介: 快速学习 DTS 数据同步集成 MaxCompute 数仓最佳实践

开发者学堂课程【SaaS 模式云数据仓库系列课程 —— 2021数仓必修课:DTS 数据同步集成 MaxCompute 数仓最佳实践】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/55/detail/1056


DTS 数据同步集成 MaxCompute 数仓最佳实践


内容简介:

1.演示基础环境部署

2.MaxCompute 数仓搭建

 

场景描述

本文 Step by Step介绍了通过数据传输服务 DTS 实现从云数据库 RDS 到MaxCompute的数据同步集成,并介绍如何使用 DTS 和 MaxCompute 数仓联合实现数据 ETL 幂等和数据生命周期快速回溯。

 

解决问题-

1.实现大数据实时同步集成。

2.实现数据 ETL 幂等。

3.实现数据生命周期快速回溯。

 

产品列表

·MaxCompute

·数据传输服务 DTS

·DataWorks

·云数据库 RDS MySQL 版

 

最佳实践概述

客户 T+1 数仓传统 ETL 存在以下痛点:

1)数据抽取不幂等或容错率低,如凌晨0:00启动的 ETL 任务因为各种原因(数据库HA 切换、网络抖动或 MAXC 写入失败等)失败后,再次抽取无法获取0:00时的数据状态。

2)针对不规范设计表,如没有 create  time/update time 的历史遗留表,传统ETL需全量抽取。

3)实时性差,抽取数据+重试任务往往需要 1-3小 时。

另外数据库的数据生命周期回溯难,如客户想回溯一年内某些数据的增删改生命周期,需要从 OSS 拉取一年的 binlog 解析和分析,十分困难,费时费力。

本文 Step by Step 介绍了通过数据传输服务 DTS 实现从云数据库 RDS 到MaxCompute 的数据同步集成,并介绍如何使用 DTS 和 MaxCompute 数仓联合实现

·DTS 简便易用,可以通过控制台一键完成同步,也可以通过 OpenAPI 批量生成。

·DTS 将 binlog 作为大数据同步的手段,能够实现 ETL 幂等,大大提高数据仓库的数据质量。

·针对不规范设计的表,仍然可以通过 binlog 的时间来生成创建和修改时间。

·实时性提高,将凌晨的批量抽取改为准实时同步,凌晨 0:00 过后几分钟就可以开始批处理任务。

·通过 MaxCompute 的分布式计算能力,能够快速回溯数据的增删改生命周期。

 

1.演示基础环境部署

本章节在阿里云上搭建演示基础环境云数据库 RDS MySQL 版并构造相关数据。本最佳实践推荐客户使用云数据库 RDS MySQL  版,但 对 ECS 上的自建数据库、以及通过专线/VP N 网关/智能网关接入的自建数据库同样适用。

1.1.环境部署

1.1.1.创建专有网络 VPC

1.1.2.部署云数据库 RDS

1.2.模拟数据构造

本节模拟构造电商数据库 shopping-db。shopping-db 一共包含三个表,分别为用户表 user、库存表 inventory 和订单表 orders。

步骤1  在数据管理 DMS 控制台,选择导航 SQL操>SQL 窗口。

步骤2  复制并粘贴以下 SQL 语句创建用户表 user、库存表 inventory 和订单表orders,单击执行(F8)。

(示例 demo 代码可以通过执行以下命令获取:

git clone git@code.aliyun.com:best-practice/146.git)

--用户/账户表

create table user(

user id bigint not null auto increment comment ‘user id用户ID’,

user name varchar(3o) comment ‘用户名’,

phone num varchar(20) comment ‘手机号’,

email varchar(100) comment ‘email’,

acct decimal(18,2) comment '账户余额,

primary key (user_id),

arcs

key l1 (user_name)

);

--库存表

create table inventory(

inventory_id bigint not nult auto_increment comment 'inventory_id库存商品ID',

inventory_name varchar(30) comment ‘商品名’,

price_unit decimal(18,2) comment ‘商品单价’,

inventory_num bigint not null default 0 comment ’剩余库存数’,

primary key(inventory_id)

);

--订单表

create table orders(

order_id bigint not null auto_increment comment 'order_id订单ID',

user_id. bigint not null comment 'user_id用户ID',

orice_unit decimal(18,2) comment '商品单价’,

order_num bigint not null default 0 comment ‘订单购买数’,

create_time datetime not null default current timestamp,

update_time datetime not nuit default current_timestamp on update current_timestamp,

primary key(ordex_id),

key l1 (user_id),

key l2(inventory_id),

);

 

2.MaxCompute 数仓搭建

本章通过 DTS 实现从云数据库 RDS 到 MaxCompute,的数据同步集成,并介绍如何使用 DTS 和 MaxCompute 数仓联合实现数据ETL幂等和数据生命周期快速回溯。

2.1.开通 DataWorks

DataWorks:是一个提供了大数据 OS 能力、并以 all in one box 的方式提供专业高效、安全可靠的一站式大数据智能云研发平台。同时能满足用户对数据治理、质量管理需求,赋予用户对外提供数据服务的能力。

数据传输服务 DTS:数据传输服务 (Data Transmission Service)DTS 支持关系型数据库、NoSQL、大数据(OLAP)等数据源间的数据传输。它是一种集数据迁移、数据订阅及数据实时同步于一体的数据传输服务。数据传输致力于在公共云、混合云场景下,解决远距离、毫秒级异步数据传输难题。

MaxCompute:MaxCompute (原 ODPS)是一项大数据计算服务,它能提供快速、完全托管的 PB 级数据仓库解决方案,使您可以经济并高效的分析处理海量数据。-

2.3.创建 MaxCompute 项目

步骤1  登录 DataWorks 控制台。(https://workbench.data.aliyun.com/console)

步骤2  在左侧导航栏选择工作空间列表,切换地域为华东1(杭州),在工作空间列表页面,单击创建工作空间。

步骤3  在基本配置页,输入工作空间名称 bp_data_warehouse,其他配置保持默认,单击下一步。注:工作空间名称每 个 Region 内唯一,请自定义名称。 

步骤4  在选择引擎页里,在选择计算引擎服务区域,勾选 MaxCompute 按量付费,单机下一步。 

步骤5  在引擎详情页,输入实例显示名称和 MaxCompute 项目名称,单击创建工作空间。

l 实例显示名称:bp_data_warehouse

l MaxCompute 项目名称:bp_data_warehousee

l MaxCompute 访问身份:阿里云主账号

l Quota 组切换:按量付费默认资源

步骤6  在工作间列表页面,可以查看到状态为正常的工作空间

2.4.DTS 数据同步集成 MaxCompute

根据需要同步的数据在写入后是否发生变化,分为恒定的存量数据(通常是日志数据)和持续更新的数据(例如本示例库存表中,商品单价会发生变化)。

对恒定的存量数据进行增量同步,由于数据生成后不会发生变化,因此可以很方便地根据数据的生成规律进行分区。较常见的是根据日期进行分区,例如每天1个分区。

根据数据仓库反映历史变化的特点,需要对持续更新的数据进行增量同步。

传统 ETL 工具提供每日增量、每日全量的上传方式,但由于数据库数据的增量上传是通过数据库提供数据变更的日期字段来实现,要求您的数据库有数据变更的日期字段。

数据传输服务 DTS 数据同步功能通过 binlog 实现。对无数据变更日期的表,仍然可以通过 binlog 的时间来生成创建/修改时间,兼容性更好,同时使用DTS提高了数据集成的实时性,实现准实时同步。

2.4.1.1.配置 DTS 数据同步

注意事项:

1)DTS 在执行全量数据初始化时将占用源库和目标库一定的读写资源,可能会导致数据库的负载上升,在数据库性能较差、规格较低或业务量较大的情况下(例如源库有大量慢 SQL、存在无主键表或目标库存在死锁等),可能会加重数据库压力,甚至导致数据库服务不可用。因此您需要在执行数据同步前评估源库和目标库的性能,同时建议您在业务低峰期执行数据同步(例如源库和目标库的 CPU 负载在30%以下)。

2)仅支持表级别的数据同步。

3)在数据同步时,请勿对源库的同步对象使用 gh-ost 或 pt-online-schema-change 等类似工具执行在线 DDL 变 更,否则会导致同步失败。

4)由于 Maxcompute 不支持主键约束,当 DTS 在同步数据时因网络等原因触发重传,可能会导 致 MaxCompute 中出现重复记录。

步骤1  登录数据传输 DTS 控制台。(https://dts.console.aliyun.com/#/home/)

步骤2  在左侧导航栏选择数据同步,在数据同步页面,选择华东1(杭州),并单击创建同步作业。

步骤3  选择数据传输服务 DTS(后付费),完成以下配置,其他配置保持默认,并单机立即购买。

步骤4  在确认订单页面,确认各项参数信息。确认无误,阅读、同意并勾选《数据传输服务(后付费)服务协议》前的复选框,并单机去开通。

步骤5  开通任务提交成功后,单机管理控制台返回。

步骤6  定位至已购买的数据同步实例,单机配置网络链路。

步骤7  配置同步通道的源实例及目标实例信息

配置完成,单机页面右下角的授权白名单并进行下一步。

步骤8  单机页面右下角的下一步,允许将 MaxCompute 中的项目的下述权限授予给DTS同步账号,详情如下图所示。

步骤9  配置同步策略和同步对象。

上述配置完成后,单机页面右下角的预检查并启动

步骤10  等待同步作业的链路初始化完成,直至处于同步中状态。

可以在数据同步页面,查看数据同步作业的状态。

2.4.1.2.验证 DTS 数据实时同步

同步过程介绍:

1)结构初始化。

DTS 将源库中待同步表的结构定义信息同步至 MaxCompute 中,初始化时 DTS 会为表名增加_ base 后缀。例如源表为 user,那么 MaxCompute 中的表即为user_base.

2)全量数据初始化。

DTS将源库中待同步表的存量数据,全部同步至 MaxCompute 中的目标表名 _base表中(例如从源库的 user 表同步至 MaxCompute 的 user_base,表),作为后续增量同步数据的基线数据。

3)增量数据同步。

DTS 在 MaxCompute 中创建一个增量日志表,表名为同步的目标表名 _log,例如user_log,然后将源库产生的增量数据实时同步到该表中。

增量日志表结构定义说明请参考:

https://help.aliyun.com/document_detail/44547.html?#title-g2g-87l-hfi

步骤1  请参考1.1.2.2.创建数据库和账号的步骤8登录RDS数据库。

步骤2  在数据管理 DMS 控制台,选择导航 SQ 操作>SQL窗口,复制并粘贴如下SQL 语句写入测试数据,单击执行(F8)。

--写入测试用户

insert into user(user_name,phone_num,email,acct)

values(‘test_acct’,‘18866668888',‘test@test.com',300.99);

--写入测试商品及库存

insert into inventory(inventory_name,price_unit,inventory_num)

values(‘lPhone X’,4999,300);

步骤3  登录 DataWorks 控制台 (https://workbench.data.aliyun.com/console)

步骤4  在左侧导航栏选择工作空间列表,在工作空间列表页面,定位至已创建的工作空间,单击进入数据开发。

步骤5  在左侧导航栏选择公共表,单击 user_log,单击数据预览。

可以查看到刚在数据库 shopping-db 的 user 表写入的用户 test_acct,已经实时同步到了 bp_data_warehouse 的 user_log 表。

如果没有出现数据,请耐心等待 2~3 分钟后再查看。

举例调整 iphone 12  的价格

检查后发现数据已生成

2.4.2.新增表数据同步处理

DTS 支持在数据同步的过程中新增同步对象。

步骤1  登录 DTS 控制台。(https:/ldts.console.aliyun.com/#/home/)

步骤2  左侧导航栏,单击数据同步。

步骤3  在同步作业列表页面顶部,选择数据同步实例所属地域。

步骤4  找到目标同步作业,单击其操作列的更多>修改同步对象。

步骤5  在源库对象区域框中,单机需要新增的同步对象,并单机>向右小箭头移动至已选择对象区域框中,

步骤6  单机预检查并启动。

2.5.ETL幂等实现

根据等幂性原则,1个任务多次运行的结果是一致的。

如果数据抽取不幂等,例如凌晨0:00启动的 ETL 任务,因为各种原因(数据库HA切换、网络抖动或 MaxCompute 写入失败等)失败后,再次抽取将无法获取0:00时的数据状态。

DTS 使用 binlog 作为大数据同步的手段,将全量数据同步到 base 表,增量数据依赖binlog 实时同步到 log 表,统一做合表清洗可以拿到任意时间点的快照数据,从而实现 ETL 幂等,大大提高数据仓库的数据质量。

步骤1  请参考1.1.2.2.创建数据库和账号的步骤8登录 RDS 数据库。

步骤2  在数据管理 DMS 控制台,选择导航 SQL 操作>SQL 窗口,复制并粘贴如下SQL 语句写入测试数据,单击执行 (F8)。

注意:在相隔一分钟,两次调整了商品名为 IPhone X 的商品单价。

--多次调整价格 RDS

update inventory set price_unit=3999 where inventory_name=‘IPhone X’,

select sleep(60);

update inventory set price_unit=4999 where inventory name=‘IPhone X’,

步骤3  登录 DataWorks 控制台。(https://workbench.data.aliyun.com/console)

步骤4  在左侧导航栏选择工作空间列表,在工作空间列表页面,定位至已创建的工作空间,单击进入数据开发。

步骤5  在左侧导航栏选择临时查询,单击新建节点 >QDPS SQL。

步骤6  输入节点名称,单次提交。

步骤7  复制粘贴实 例 SQL 语句,运行。

通过结果可以验证,通过 MaxCompute 的 SQL 命 令,对全量基线表 _base 和增量日志表_log 执行合并操作,得到某个时间点的 snapshot 数据,保证抽取的幂等性。

举例如下,修改Iphone 12 价格。

点击执行,在临时查询中修改时间运行

将时间改为17:19

改为17:20,17:21等又出现不同的价格。

2.6.数据回溯实现

本节演示快速回溯数据的增删改生命周期。

步骤1  登录 DataWorks,控制台。(https://workbench.data.alivyun.com/console)

步骤2  在左侧导航栏选择工作空间列表,在工作空间列表页面,定位至已创建的工作空间,单击进入数据开发。

步骤3  在左侧导航栏选择临时查询,单击新建节点 >QDPS SQL。

步骤4  输入节点名称,单次提交。

步骤5  复制粘贴实例 SQL 语句,运行。

通过结果可以验证,即使原表没有创建和修改时间,也可以通过 binlog 生成时间new_dts_sync_utc_timestamp 实现商品名为 IPhone X 的库存数据回溯。

相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
2月前
|
分布式计算 DataWorks 调度
oss数据同步maxcompute报错
在使用阿里云DataWorks同步OSS数据至MaxCompute时,遇到“Input is not in the .gz format”的报错。问题源于目标目录中存在一个空文件,导致同步时识别错误。
|
5天前
|
缓存 监控 大数据
构建高可用AnalyticDB集群:最佳实践
【10月更文挑战第25天】在大数据时代,数据仓库和分析平台的高可用性变得尤为重要。作为阿里巴巴推出的一款完全托管的PB级实时数据仓库服务,AnalyticDB(ADB)凭借其高性能、易扩展和高可用的特点,成为众多企业的首选。本文将从我个人的角度出发,分享如何构建和维护高可用性的AnalyticDB集群,确保系统在各种情况下都能稳定运行。
12 0
|
8天前
|
数据采集 分布式计算 OLAP
最佳实践:AnalyticDB在企业级大数据分析中的应用案例
【10月更文挑战第22天】在数字化转型的大潮中,企业对数据的依赖程度越来越高。如何高效地处理和分析海量数据,从中提取有价值的洞察,成为企业竞争力的关键。作为阿里云推出的一款实时OLAP数据库服务,AnalyticDB(ADB)凭借其强大的数据处理能力和亚秒级的查询响应时间,已经在多个行业和业务场景中得到了广泛应用。本文将从个人的角度出发,分享多个成功案例,展示AnalyticDB如何助力企业在广告投放效果分析、用户行为追踪、财务报表生成等领域实现高效的数据处理与洞察发现。
28 0
|
5月前
|
消息中间件 关系型数据库 Kafka
深入理解数仓开发(二)数据技术篇之数据同步
深入理解数仓开发(二)数据技术篇之数据同步
|
3月前
|
Java jenkins Shell
jenkins学习笔记之五:Maven、Ant、Gradl、Node构建工具集成
jenkins学习笔记之五:Maven、Ant、Gradl、Node构建工具集成
|
3月前
|
jenkins 持续交付
jenkins学习笔记之六:共享库方式集成构建工具
jenkins学习笔记之六:共享库方式集成构建工具
|
3月前
|
jenkins 持续交付
jenkins学习笔记之九:jenkins认证集成github
jenkins学习笔记之九:jenkins认证集成github
|
3月前
|
安全 jenkins 持续交付
jenkins学习笔记之八:jenkins认证集成gitlab
jenkins学习笔记之八:jenkins认证集成gitlab
|
3月前
|
jenkins Devops 持续交付
jenkins学习笔记之七:jenkins集成LDAP用户认证
jenkins学习笔记之七:jenkins集成LDAP用户认证
|
4月前
|
分布式计算 关系型数据库 数据处理
美柚与MaxCompute的数据同步架构设计与实践
数据处理与分析 一旦数据同步到MaxCompute后,就可以使用MaxCompute SQL或者MapReduce进行复杂的数据处理和分析。