DTS 数据同步集成 MaxCompute 数仓最佳实践|学习笔记

本文涉及的产品
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
数据传输服务 DTS,同步至DuckDB 3个月
RDS AI 助手,专业版
简介: 快速学习 DTS 数据同步集成 MaxCompute 数仓最佳实践

开发者学堂课程【SaaS 模式云数据仓库系列课程 —— 2021数仓必修课:DTS 数据同步集成 MaxCompute 数仓最佳实践】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/55/detail/1056


DTS 数据同步集成 MaxCompute 数仓最佳实践


内容简介:

1.演示基础环境部署

2.MaxCompute 数仓搭建

 

场景描述

本文 Step by Step介绍了通过数据传输服务 DTS 实现从云数据库 RDS 到MaxCompute的数据同步集成,并介绍如何使用 DTS 和 MaxCompute 数仓联合实现数据 ETL 幂等和数据生命周期快速回溯。

 

解决问题-

1.实现大数据实时同步集成。

2.实现数据 ETL 幂等。

3.实现数据生命周期快速回溯。

 

产品列表

·MaxCompute

·数据传输服务 DTS

·DataWorks

·云数据库 RDS MySQL 版

 

最佳实践概述

客户 T+1 数仓传统 ETL 存在以下痛点:

1)数据抽取不幂等或容错率低,如凌晨0:00启动的 ETL 任务因为各种原因(数据库HA 切换、网络抖动或 MAXC 写入失败等)失败后,再次抽取无法获取0:00时的数据状态。

2)针对不规范设计表,如没有 create  time/update time 的历史遗留表,传统ETL需全量抽取。

3)实时性差,抽取数据+重试任务往往需要 1-3小 时。

另外数据库的数据生命周期回溯难,如客户想回溯一年内某些数据的增删改生命周期,需要从 OSS 拉取一年的 binlog 解析和分析,十分困难,费时费力。

本文 Step by Step 介绍了通过数据传输服务 DTS 实现从云数据库 RDS 到MaxCompute 的数据同步集成,并介绍如何使用 DTS 和 MaxCompute 数仓联合实现

·DTS 简便易用,可以通过控制台一键完成同步,也可以通过 OpenAPI 批量生成。

·DTS 将 binlog 作为大数据同步的手段,能够实现 ETL 幂等,大大提高数据仓库的数据质量。

·针对不规范设计的表,仍然可以通过 binlog 的时间来生成创建和修改时间。

·实时性提高,将凌晨的批量抽取改为准实时同步,凌晨 0:00 过后几分钟就可以开始批处理任务。

·通过 MaxCompute 的分布式计算能力,能够快速回溯数据的增删改生命周期。

 

1.演示基础环境部署

本章节在阿里云上搭建演示基础环境云数据库 RDS MySQL 版并构造相关数据。本最佳实践推荐客户使用云数据库 RDS MySQL  版,但 对 ECS 上的自建数据库、以及通过专线/VP N 网关/智能网关接入的自建数据库同样适用。

1.1.环境部署

1.1.1.创建专有网络 VPC

1.1.2.部署云数据库 RDS

1.2.模拟数据构造

本节模拟构造电商数据库 shopping-db。shopping-db 一共包含三个表,分别为用户表 user、库存表 inventory 和订单表 orders。

步骤1  在数据管理 DMS 控制台,选择导航 SQL操>SQL 窗口。

步骤2  复制并粘贴以下 SQL 语句创建用户表 user、库存表 inventory 和订单表orders,单击执行(F8)。

(示例 demo 代码可以通过执行以下命令获取:

git clone git@code.aliyun.com:best-practice/146.git)

--用户/账户表

create table user(

user id bigint not null auto increment comment ‘user id用户ID’,

user name varchar(3o) comment ‘用户名’,

phone num varchar(20) comment ‘手机号’,

email varchar(100) comment ‘email’,

acct decimal(18,2) comment '账户余额,

primary key (user_id),

arcs

key l1 (user_name)

);

--库存表

create table inventory(

inventory_id bigint not nult auto_increment comment 'inventory_id库存商品ID',

inventory_name varchar(30) comment ‘商品名’,

price_unit decimal(18,2) comment ‘商品单价’,

inventory_num bigint not null default 0 comment ’剩余库存数’,

primary key(inventory_id)

);

--订单表

create table orders(

order_id bigint not null auto_increment comment 'order_id订单ID',

user_id. bigint not null comment 'user_id用户ID',

orice_unit decimal(18,2) comment '商品单价’,

order_num bigint not null default 0 comment ‘订单购买数’,

create_time datetime not null default current timestamp,

update_time datetime not nuit default current_timestamp on update current_timestamp,

primary key(ordex_id),

key l1 (user_id),

key l2(inventory_id),

);

 

2.MaxCompute 数仓搭建

本章通过 DTS 实现从云数据库 RDS 到 MaxCompute,的数据同步集成,并介绍如何使用 DTS 和 MaxCompute 数仓联合实现数据ETL幂等和数据生命周期快速回溯。

2.1.开通 DataWorks

DataWorks:是一个提供了大数据 OS 能力、并以 all in one box 的方式提供专业高效、安全可靠的一站式大数据智能云研发平台。同时能满足用户对数据治理、质量管理需求,赋予用户对外提供数据服务的能力。

数据传输服务 DTS:数据传输服务 (Data Transmission Service)DTS 支持关系型数据库、NoSQL、大数据(OLAP)等数据源间的数据传输。它是一种集数据迁移、数据订阅及数据实时同步于一体的数据传输服务。数据传输致力于在公共云、混合云场景下,解决远距离、毫秒级异步数据传输难题。

MaxCompute:MaxCompute (原 ODPS)是一项大数据计算服务,它能提供快速、完全托管的 PB 级数据仓库解决方案,使您可以经济并高效的分析处理海量数据。-

2.3.创建 MaxCompute 项目

步骤1  登录 DataWorks 控制台。(https://workbench.data.aliyun.com/console)

步骤2  在左侧导航栏选择工作空间列表,切换地域为华东1(杭州),在工作空间列表页面,单击创建工作空间。

步骤3  在基本配置页,输入工作空间名称 bp_data_warehouse,其他配置保持默认,单击下一步。注:工作空间名称每 个 Region 内唯一,请自定义名称。 

步骤4  在选择引擎页里,在选择计算引擎服务区域,勾选 MaxCompute 按量付费,单机下一步。 

步骤5  在引擎详情页,输入实例显示名称和 MaxCompute 项目名称,单击创建工作空间。

l 实例显示名称:bp_data_warehouse

l MaxCompute 项目名称:bp_data_warehousee

l MaxCompute 访问身份:阿里云主账号

l Quota 组切换:按量付费默认资源

步骤6  在工作间列表页面,可以查看到状态为正常的工作空间

2.4.DTS 数据同步集成 MaxCompute

根据需要同步的数据在写入后是否发生变化,分为恒定的存量数据(通常是日志数据)和持续更新的数据(例如本示例库存表中,商品单价会发生变化)。

对恒定的存量数据进行增量同步,由于数据生成后不会发生变化,因此可以很方便地根据数据的生成规律进行分区。较常见的是根据日期进行分区,例如每天1个分区。

根据数据仓库反映历史变化的特点,需要对持续更新的数据进行增量同步。

传统 ETL 工具提供每日增量、每日全量的上传方式,但由于数据库数据的增量上传是通过数据库提供数据变更的日期字段来实现,要求您的数据库有数据变更的日期字段。

数据传输服务 DTS 数据同步功能通过 binlog 实现。对无数据变更日期的表,仍然可以通过 binlog 的时间来生成创建/修改时间,兼容性更好,同时使用DTS提高了数据集成的实时性,实现准实时同步。

2.4.1.1.配置 DTS 数据同步

注意事项:

1)DTS 在执行全量数据初始化时将占用源库和目标库一定的读写资源,可能会导致数据库的负载上升,在数据库性能较差、规格较低或业务量较大的情况下(例如源库有大量慢 SQL、存在无主键表或目标库存在死锁等),可能会加重数据库压力,甚至导致数据库服务不可用。因此您需要在执行数据同步前评估源库和目标库的性能,同时建议您在业务低峰期执行数据同步(例如源库和目标库的 CPU 负载在30%以下)。

2)仅支持表级别的数据同步。

3)在数据同步时,请勿对源库的同步对象使用 gh-ost 或 pt-online-schema-change 等类似工具执行在线 DDL 变 更,否则会导致同步失败。

4)由于 Maxcompute 不支持主键约束,当 DTS 在同步数据时因网络等原因触发重传,可能会导 致 MaxCompute 中出现重复记录。

步骤1  登录数据传输 DTS 控制台。(https://dts.console.aliyun.com/#/home/)

步骤2  在左侧导航栏选择数据同步,在数据同步页面,选择华东1(杭州),并单击创建同步作业。

步骤3  选择数据传输服务 DTS(后付费),完成以下配置,其他配置保持默认,并单机立即购买。

步骤4  在确认订单页面,确认各项参数信息。确认无误,阅读、同意并勾选《数据传输服务(后付费)服务协议》前的复选框,并单机去开通。

步骤5  开通任务提交成功后,单机管理控制台返回。

步骤6  定位至已购买的数据同步实例,单机配置网络链路。

步骤7  配置同步通道的源实例及目标实例信息

配置完成,单机页面右下角的授权白名单并进行下一步。

步骤8  单机页面右下角的下一步,允许将 MaxCompute 中的项目的下述权限授予给DTS同步账号,详情如下图所示。

步骤9  配置同步策略和同步对象。

上述配置完成后,单机页面右下角的预检查并启动

步骤10  等待同步作业的链路初始化完成,直至处于同步中状态。

可以在数据同步页面,查看数据同步作业的状态。

2.4.1.2.验证 DTS 数据实时同步

同步过程介绍:

1)结构初始化。

DTS 将源库中待同步表的结构定义信息同步至 MaxCompute 中,初始化时 DTS 会为表名增加_ base 后缀。例如源表为 user,那么 MaxCompute 中的表即为user_base.

2)全量数据初始化。

DTS将源库中待同步表的存量数据,全部同步至 MaxCompute 中的目标表名 _base表中(例如从源库的 user 表同步至 MaxCompute 的 user_base,表),作为后续增量同步数据的基线数据。

3)增量数据同步。

DTS 在 MaxCompute 中创建一个增量日志表,表名为同步的目标表名 _log,例如user_log,然后将源库产生的增量数据实时同步到该表中。

增量日志表结构定义说明请参考:

https://help.aliyun.com/document_detail/44547.html?#title-g2g-87l-hfi

步骤1  请参考1.1.2.2.创建数据库和账号的步骤8登录RDS数据库。

步骤2  在数据管理 DMS 控制台,选择导航 SQ 操作>SQL窗口,复制并粘贴如下SQL 语句写入测试数据,单击执行(F8)。

--写入测试用户

insert into user(user_name,phone_num,email,acct)

values(‘test_acct’,‘18866668888',‘test@test.com',300.99);

--写入测试商品及库存

insert into inventory(inventory_name,price_unit,inventory_num)

values(‘lPhone X’,4999,300);

步骤3  登录 DataWorks 控制台 (https://workbench.data.aliyun.com/console)

步骤4  在左侧导航栏选择工作空间列表,在工作空间列表页面,定位至已创建的工作空间,单击进入数据开发。

步骤5  在左侧导航栏选择公共表,单击 user_log,单击数据预览。

可以查看到刚在数据库 shopping-db 的 user 表写入的用户 test_acct,已经实时同步到了 bp_data_warehouse 的 user_log 表。

如果没有出现数据,请耐心等待 2~3 分钟后再查看。

举例调整 iphone 12  的价格

检查后发现数据已生成

2.4.2.新增表数据同步处理

DTS 支持在数据同步的过程中新增同步对象。

步骤1  登录 DTS 控制台。(https:/ldts.console.aliyun.com/#/home/)

步骤2  左侧导航栏,单击数据同步。

步骤3  在同步作业列表页面顶部,选择数据同步实例所属地域。

步骤4  找到目标同步作业,单击其操作列的更多>修改同步对象。

步骤5  在源库对象区域框中,单机需要新增的同步对象,并单机>向右小箭头移动至已选择对象区域框中,

步骤6  单机预检查并启动。

2.5.ETL幂等实现

根据等幂性原则,1个任务多次运行的结果是一致的。

如果数据抽取不幂等,例如凌晨0:00启动的 ETL 任务,因为各种原因(数据库HA切换、网络抖动或 MaxCompute 写入失败等)失败后,再次抽取将无法获取0:00时的数据状态。

DTS 使用 binlog 作为大数据同步的手段,将全量数据同步到 base 表,增量数据依赖binlog 实时同步到 log 表,统一做合表清洗可以拿到任意时间点的快照数据,从而实现 ETL 幂等,大大提高数据仓库的数据质量。

步骤1  请参考1.1.2.2.创建数据库和账号的步骤8登录 RDS 数据库。

步骤2  在数据管理 DMS 控制台,选择导航 SQL 操作>SQL 窗口,复制并粘贴如下SQL 语句写入测试数据,单击执行 (F8)。

注意:在相隔一分钟,两次调整了商品名为 IPhone X 的商品单价。

--多次调整价格 RDS

update inventory set price_unit=3999 where inventory_name=‘IPhone X’,

select sleep(60);

update inventory set price_unit=4999 where inventory name=‘IPhone X’,

步骤3  登录 DataWorks 控制台。(https://workbench.data.aliyun.com/console)

步骤4  在左侧导航栏选择工作空间列表,在工作空间列表页面,定位至已创建的工作空间,单击进入数据开发。

步骤5  在左侧导航栏选择临时查询,单击新建节点 >QDPS SQL。

步骤6  输入节点名称,单次提交。

步骤7  复制粘贴实 例 SQL 语句,运行。

通过结果可以验证,通过 MaxCompute 的 SQL 命 令,对全量基线表 _base 和增量日志表_log 执行合并操作,得到某个时间点的 snapshot 数据,保证抽取的幂等性。

举例如下,修改Iphone 12 价格。

点击执行,在临时查询中修改时间运行

将时间改为17:19

改为17:20,17:21等又出现不同的价格。

2.6.数据回溯实现

本节演示快速回溯数据的增删改生命周期。

步骤1  登录 DataWorks,控制台。(https://workbench.data.alivyun.com/console)

步骤2  在左侧导航栏选择工作空间列表,在工作空间列表页面,定位至已创建的工作空间,单击进入数据开发。

步骤3  在左侧导航栏选择临时查询,单击新建节点 >QDPS SQL。

步骤4  输入节点名称,单次提交。

步骤5  复制粘贴实例 SQL 语句,运行。

通过结果可以验证,即使原表没有创建和修改时间,也可以通过 binlog 生成时间new_dts_sync_utc_timestamp 实现商品名为 IPhone X 的库存数据回溯。

相关实践学习
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
7月前
|
数据可视化 Java BI
将 Spring 微服务与 BI 工具集成:最佳实践
本文探讨了 Spring 微服务与商业智能(BI)工具集成的潜力与实践。随着微服务架构和数据分析需求的增长,Spring Boot 和 Spring Cloud 提供了构建可扩展、弹性服务的框架,而 BI 工具则增强了数据可视化与实时分析能力。文章介绍了 Spring 微服务的核心概念、BI 工具在企业中的作用,并深入分析了两者集成带来的优势,如实时数据处理、个性化报告、数据聚合与安全保障。同时,文中还总结了集成过程中的最佳实践,包括事件驱动架构、集中配置管理、数据安全控制、模块化设计与持续优化策略,旨在帮助企业构建高效、智能的数据驱动系统。
356 1
将 Spring 微服务与 BI 工具集成:最佳实践
|
6月前
|
存储 Prometheus 监控
136_生产监控:Prometheus集成 - 设置警报与指标选择与LLM部署监控最佳实践
在大语言模型(LLM)部署的生产环境中,有效的监控系统是确保服务稳定性、可靠性和性能的关键。随着LLM模型规模的不断扩大和应用场景的日益复杂,传统的监控手段已难以满足需求。Prometheus作为当前最流行的开源监控系统之一,凭借其强大的时序数据收集、查询和告警能力,已成为LLM部署监控的首选工具。
709 6
|
7月前
|
人工智能 安全 API
Dify平台集成安全护栏最佳实践
Dify平台提供低代码构建AI大模型应用的解决方案,支持云服务与私有化部署。本文介绍了在工作流和Agent中集成安全护栏的最佳实践,包括插件和扩展API两种方案。插件方式适用于工作流,一键安装实现输入输出防控;扩展API方式适用于Agent和工作流私有化部署场景,通过本地服务适配安全护栏API。文中还详细说明了操作步骤、前提条件及常见问题处理方法,帮助用户快速实现内容安全控制。
|
9月前
|
Java 关系型数据库 MySQL
springboot项目集成dolphinscheduler调度器 实现datax数据同步任务
springboot项目集成dolphinscheduler调度器 实现datax数据同步任务
852 2
|
9月前
|
存储 SQL 分布式计算
MaxCompute x 聚水潭:基于近实时数仓解决方案构建统一增全量一体化数据链路
聚水潭作为中国领先的电商SaaS ERP服务商,致力于为88,400+客户提供全链路数字化解决方案。其核心ERP产品助力企业实现数据驱动的智能决策。为应对业务扩展带来的数据处理挑战,聚水潭采用MaxCompute近实时数仓Delta Table方案,有效提升数据新鲜度和计算效率,提效比例超200%,资源消耗显著降低。未来,聚水潭将进一步优化数据链路,结合MaxQA实现实时分析,赋能商家快速响应市场变化。
388 0
|
12月前
|
人工智能 运维 关系型数据库
云服务API与MCP深度集成,RDS MCP最佳实践
近日,阿里云数据库RDS发布开源RDS MCP Server,将复杂的技术操作转化为自然语言交互,实现"对话即运维"的流畅体验。通过将RDS OpenAPI能力封装为MCP协议工具,用户只需像聊天一样描述需求,即可完成数据库实例创建、性能调优、故障排查等专业操作。本文介绍了RDS MCP(Model Context Protocol)的最佳实践及其应用,0代码,两步即可轻松完成RDS实例选型与创建,快来体验!
云服务API与MCP深度集成,RDS MCP最佳实践
|
Devops 测试技术 持续交付
软件测试中的自动化与持续集成:最佳实践与挑战
在快速迭代的软件开发周期中,自动化测试和持续集成(CI)已成为提高软件质量和加速产品上市的关键策略。本文探讨了自动化测试和CI的实施如何帮助开发团队提前发现缺陷、缩短反馈循环,并确保代码质量。我们将深入分析自动化测试的策略选择、工具应用以及面临的挑战,同时提供一些克服这些挑战的最佳实践。
580 27
|
SQL 存储 分布式计算
MaxCompute近实时数仓能力升级
本文介绍了阿里云自研的离线实时一体化数仓,重点涵盖MaxCompute和Hologres两大产品。首先阐述了两者在ETL处理、AP分析及Serverless场景中的核心定位与互补关系。接着详细描述了MaxCompute在近实时能力上的升级,包括Delta Table形态、增量计算与查询支持、MCQ 2.0的优化等关键技术,并展示了其性能提升的效果。最后展望了未来在秒级数据导入、多引擎融合及更高效资源利用方面的改进方向。
|
分布式计算 DataWorks 搜索推荐
DataWorks产品评测:大数据开发治理平台的最佳实践与体验
DataWorks是阿里云推出的一款大数据开发治理平台,集成了多种大数据引擎,支持数据集成、开发、分析和任务调度。本文通过用户画像分析的最佳实践,评测了DataWorks的功能和使用体验,并提出了优化建议。通过实践,DataWorks在数据整合、清洗及可视化方面表现出色,适合企业高效管理和分析数据。
552 0