DTS 数据同步集成 MaxCompute 数仓最佳实践|学习笔记

本文涉及的产品
数据传输服务 DTS,数据迁移 small 3个月
推荐场景:
MySQL数据库上云
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
数据传输服务 DTS,数据同步 small 3个月
推荐场景:
数据库上云
简介: 快速学习 DTS 数据同步集成 MaxCompute 数仓最佳实践

开发者学堂课程【SaaS 模式云数据仓库系列课程 —— 2021数仓必修课:DTS 数据同步集成 MaxCompute 数仓最佳实践】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/55/detail/1056


DTS 数据同步集成 MaxCompute 数仓最佳实践


内容简介:

1.演示基础环境部署

2.MaxCompute 数仓搭建

 

场景描述

本文 Step by Step介绍了通过数据传输服务 DTS 实现从云数据库 RDS 到MaxCompute的数据同步集成,并介绍如何使用 DTS 和 MaxCompute 数仓联合实现数据 ETL 幂等和数据生命周期快速回溯。

 

解决问题-

1.实现大数据实时同步集成。

2.实现数据 ETL 幂等。

3.实现数据生命周期快速回溯。

 

产品列表

·MaxCompute

·数据传输服务 DTS

·DataWorks

·云数据库 RDS MySQL 版

 

最佳实践概述

客户 T+1 数仓传统 ETL 存在以下痛点:

1)数据抽取不幂等或容错率低,如凌晨0:00启动的 ETL 任务因为各种原因(数据库HA 切换、网络抖动或 MAXC 写入失败等)失败后,再次抽取无法获取0:00时的数据状态。

2)针对不规范设计表,如没有 create  time/update time 的历史遗留表,传统ETL需全量抽取。

3)实时性差,抽取数据+重试任务往往需要 1-3小 时。

另外数据库的数据生命周期回溯难,如客户想回溯一年内某些数据的增删改生命周期,需要从 OSS 拉取一年的 binlog 解析和分析,十分困难,费时费力。

本文 Step by Step 介绍了通过数据传输服务 DTS 实现从云数据库 RDS 到MaxCompute 的数据同步集成,并介绍如何使用 DTS 和 MaxCompute 数仓联合实现

·DTS 简便易用,可以通过控制台一键完成同步,也可以通过 OpenAPI 批量生成。

·DTS 将 binlog 作为大数据同步的手段,能够实现 ETL 幂等,大大提高数据仓库的数据质量。

·针对不规范设计的表,仍然可以通过 binlog 的时间来生成创建和修改时间。

·实时性提高,将凌晨的批量抽取改为准实时同步,凌晨 0:00 过后几分钟就可以开始批处理任务。

·通过 MaxCompute 的分布式计算能力,能够快速回溯数据的增删改生命周期。

 

1.演示基础环境部署

本章节在阿里云上搭建演示基础环境云数据库 RDS MySQL 版并构造相关数据。本最佳实践推荐客户使用云数据库 RDS MySQL  版,但 对 ECS 上的自建数据库、以及通过专线/VP N 网关/智能网关接入的自建数据库同样适用。

1.1.环境部署

1.1.1.创建专有网络 VPC

1.1.2.部署云数据库 RDS

1.2.模拟数据构造

本节模拟构造电商数据库 shopping-db。shopping-db 一共包含三个表,分别为用户表 user、库存表 inventory 和订单表 orders。

步骤1  在数据管理 DMS 控制台,选择导航 SQL操>SQL 窗口。

步骤2  复制并粘贴以下 SQL 语句创建用户表 user、库存表 inventory 和订单表orders,单击执行(F8)。

(示例 demo 代码可以通过执行以下命令获取:

git clone git@code.aliyun.com:best-practice/146.git)

--用户/账户表

create table user(

user id bigint not null auto increment comment ‘user id用户ID’,

user name varchar(3o) comment ‘用户名’,

phone num varchar(20) comment ‘手机号’,

email varchar(100) comment ‘email’,

acct decimal(18,2) comment '账户余额,

primary key (user_id),

arcs

key l1 (user_name)

);

--库存表

create table inventory(

inventory_id bigint not nult auto_increment comment 'inventory_id库存商品ID',

inventory_name varchar(30) comment ‘商品名’,

price_unit decimal(18,2) comment ‘商品单价’,

inventory_num bigint not null default 0 comment ’剩余库存数’,

primary key(inventory_id)

);

--订单表

create table orders(

order_id bigint not null auto_increment comment 'order_id订单ID',

user_id. bigint not null comment 'user_id用户ID',

orice_unit decimal(18,2) comment '商品单价’,

order_num bigint not null default 0 comment ‘订单购买数’,

create_time datetime not null default current timestamp,

update_time datetime not nuit default current_timestamp on update current_timestamp,

primary key(ordex_id),

key l1 (user_id),

key l2(inventory_id),

);

 

2.MaxCompute 数仓搭建

本章通过 DTS 实现从云数据库 RDS 到 MaxCompute,的数据同步集成,并介绍如何使用 DTS 和 MaxCompute 数仓联合实现数据ETL幂等和数据生命周期快速回溯。

2.1.开通 DataWorks

DataWorks:是一个提供了大数据 OS 能力、并以 all in one box 的方式提供专业高效、安全可靠的一站式大数据智能云研发平台。同时能满足用户对数据治理、质量管理需求,赋予用户对外提供数据服务的能力。

数据传输服务 DTS:数据传输服务 (Data Transmission Service)DTS 支持关系型数据库、NoSQL、大数据(OLAP)等数据源间的数据传输。它是一种集数据迁移、数据订阅及数据实时同步于一体的数据传输服务。数据传输致力于在公共云、混合云场景下,解决远距离、毫秒级异步数据传输难题。

MaxCompute:MaxCompute (原 ODPS)是一项大数据计算服务,它能提供快速、完全托管的 PB 级数据仓库解决方案,使您可以经济并高效的分析处理海量数据。-

2.3.创建 MaxCompute 项目

步骤1  登录 DataWorks 控制台。(https://workbench.data.aliyun.com/console)

步骤2  在左侧导航栏选择工作空间列表,切换地域为华东1(杭州),在工作空间列表页面,单击创建工作空间。

步骤3  在基本配置页,输入工作空间名称 bp_data_warehouse,其他配置保持默认,单击下一步。注:工作空间名称每 个 Region 内唯一,请自定义名称。 

步骤4  在选择引擎页里,在选择计算引擎服务区域,勾选 MaxCompute 按量付费,单机下一步。 

步骤5  在引擎详情页,输入实例显示名称和 MaxCompute 项目名称,单击创建工作空间。

l 实例显示名称:bp_data_warehouse

l MaxCompute 项目名称:bp_data_warehousee

l MaxCompute 访问身份:阿里云主账号

l Quota 组切换:按量付费默认资源

步骤6  在工作间列表页面,可以查看到状态为正常的工作空间

2.4.DTS 数据同步集成 MaxCompute

根据需要同步的数据在写入后是否发生变化,分为恒定的存量数据(通常是日志数据)和持续更新的数据(例如本示例库存表中,商品单价会发生变化)。

对恒定的存量数据进行增量同步,由于数据生成后不会发生变化,因此可以很方便地根据数据的生成规律进行分区。较常见的是根据日期进行分区,例如每天1个分区。

根据数据仓库反映历史变化的特点,需要对持续更新的数据进行增量同步。

传统 ETL 工具提供每日增量、每日全量的上传方式,但由于数据库数据的增量上传是通过数据库提供数据变更的日期字段来实现,要求您的数据库有数据变更的日期字段。

数据传输服务 DTS 数据同步功能通过 binlog 实现。对无数据变更日期的表,仍然可以通过 binlog 的时间来生成创建/修改时间,兼容性更好,同时使用DTS提高了数据集成的实时性,实现准实时同步。

2.4.1.1.配置 DTS 数据同步

注意事项:

1)DTS 在执行全量数据初始化时将占用源库和目标库一定的读写资源,可能会导致数据库的负载上升,在数据库性能较差、规格较低或业务量较大的情况下(例如源库有大量慢 SQL、存在无主键表或目标库存在死锁等),可能会加重数据库压力,甚至导致数据库服务不可用。因此您需要在执行数据同步前评估源库和目标库的性能,同时建议您在业务低峰期执行数据同步(例如源库和目标库的 CPU 负载在30%以下)。

2)仅支持表级别的数据同步。

3)在数据同步时,请勿对源库的同步对象使用 gh-ost 或 pt-online-schema-change 等类似工具执行在线 DDL 变 更,否则会导致同步失败。

4)由于 Maxcompute 不支持主键约束,当 DTS 在同步数据时因网络等原因触发重传,可能会导 致 MaxCompute 中出现重复记录。

步骤1  登录数据传输 DTS 控制台。(https://dts.console.aliyun.com/#/home/)

步骤2  在左侧导航栏选择数据同步,在数据同步页面,选择华东1(杭州),并单击创建同步作业。

步骤3  选择数据传输服务 DTS(后付费),完成以下配置,其他配置保持默认,并单机立即购买。

步骤4  在确认订单页面,确认各项参数信息。确认无误,阅读、同意并勾选《数据传输服务(后付费)服务协议》前的复选框,并单机去开通。

步骤5  开通任务提交成功后,单机管理控制台返回。

步骤6  定位至已购买的数据同步实例,单机配置网络链路。

步骤7  配置同步通道的源实例及目标实例信息

配置完成,单机页面右下角的授权白名单并进行下一步。

步骤8  单机页面右下角的下一步,允许将 MaxCompute 中的项目的下述权限授予给DTS同步账号,详情如下图所示。

步骤9  配置同步策略和同步对象。

上述配置完成后,单机页面右下角的预检查并启动

步骤10  等待同步作业的链路初始化完成,直至处于同步中状态。

可以在数据同步页面,查看数据同步作业的状态。

2.4.1.2.验证 DTS 数据实时同步

同步过程介绍:

1)结构初始化。

DTS 将源库中待同步表的结构定义信息同步至 MaxCompute 中,初始化时 DTS 会为表名增加_ base 后缀。例如源表为 user,那么 MaxCompute 中的表即为user_base.

2)全量数据初始化。

DTS将源库中待同步表的存量数据,全部同步至 MaxCompute 中的目标表名 _base表中(例如从源库的 user 表同步至 MaxCompute 的 user_base,表),作为后续增量同步数据的基线数据。

3)增量数据同步。

DTS 在 MaxCompute 中创建一个增量日志表,表名为同步的目标表名 _log,例如user_log,然后将源库产生的增量数据实时同步到该表中。

增量日志表结构定义说明请参考:

https://help.aliyun.com/document_detail/44547.html?#title-g2g-87l-hfi

步骤1  请参考1.1.2.2.创建数据库和账号的步骤8登录RDS数据库。

步骤2  在数据管理 DMS 控制台,选择导航 SQ 操作>SQL窗口,复制并粘贴如下SQL 语句写入测试数据,单击执行(F8)。

--写入测试用户

insert into user(user_name,phone_num,email,acct)

values(‘test_acct’,‘18866668888',‘test@test.com',300.99);

--写入测试商品及库存

insert into inventory(inventory_name,price_unit,inventory_num)

values(‘lPhone X’,4999,300);

步骤3  登录 DataWorks 控制台 (https://workbench.data.aliyun.com/console)

步骤4  在左侧导航栏选择工作空间列表,在工作空间列表页面,定位至已创建的工作空间,单击进入数据开发。

步骤5  在左侧导航栏选择公共表,单击 user_log,单击数据预览。

可以查看到刚在数据库 shopping-db 的 user 表写入的用户 test_acct,已经实时同步到了 bp_data_warehouse 的 user_log 表。

如果没有出现数据,请耐心等待 2~3 分钟后再查看。

举例调整 iphone 12  的价格

检查后发现数据已生成

2.4.2.新增表数据同步处理

DTS 支持在数据同步的过程中新增同步对象。

步骤1  登录 DTS 控制台。(https:/ldts.console.aliyun.com/#/home/)

步骤2  左侧导航栏,单击数据同步。

步骤3  在同步作业列表页面顶部,选择数据同步实例所属地域。

步骤4  找到目标同步作业,单击其操作列的更多>修改同步对象。

步骤5  在源库对象区域框中,单机需要新增的同步对象,并单机>向右小箭头移动至已选择对象区域框中,

步骤6  单机预检查并启动。

2.5.ETL幂等实现

根据等幂性原则,1个任务多次运行的结果是一致的。

如果数据抽取不幂等,例如凌晨0:00启动的 ETL 任务,因为各种原因(数据库HA切换、网络抖动或 MaxCompute 写入失败等)失败后,再次抽取将无法获取0:00时的数据状态。

DTS 使用 binlog 作为大数据同步的手段,将全量数据同步到 base 表,增量数据依赖binlog 实时同步到 log 表,统一做合表清洗可以拿到任意时间点的快照数据,从而实现 ETL 幂等,大大提高数据仓库的数据质量。

步骤1  请参考1.1.2.2.创建数据库和账号的步骤8登录 RDS 数据库。

步骤2  在数据管理 DMS 控制台,选择导航 SQL 操作>SQL 窗口,复制并粘贴如下SQL 语句写入测试数据,单击执行 (F8)。

注意:在相隔一分钟,两次调整了商品名为 IPhone X 的商品单价。

--多次调整价格 RDS

update inventory set price_unit=3999 where inventory_name=‘IPhone X’,

select sleep(60);

update inventory set price_unit=4999 where inventory name=‘IPhone X’,

步骤3  登录 DataWorks 控制台。(https://workbench.data.aliyun.com/console)

步骤4  在左侧导航栏选择工作空间列表,在工作空间列表页面,定位至已创建的工作空间,单击进入数据开发。

步骤5  在左侧导航栏选择临时查询,单击新建节点 >QDPS SQL。

步骤6  输入节点名称,单次提交。

步骤7  复制粘贴实 例 SQL 语句,运行。

通过结果可以验证,通过 MaxCompute 的 SQL 命 令,对全量基线表 _base 和增量日志表_log 执行合并操作,得到某个时间点的 snapshot 数据,保证抽取的幂等性。

举例如下,修改Iphone 12 价格。

点击执行,在临时查询中修改时间运行

将时间改为17:19

改为17:20,17:21等又出现不同的价格。

2.6.数据回溯实现

本节演示快速回溯数据的增删改生命周期。

步骤1  登录 DataWorks,控制台。(https://workbench.data.alivyun.com/console)

步骤2  在左侧导航栏选择工作空间列表,在工作空间列表页面,定位至已创建的工作空间,单击进入数据开发。

步骤3  在左侧导航栏选择临时查询,单击新建节点 >QDPS SQL。

步骤4  输入节点名称,单次提交。

步骤5  复制粘贴实例 SQL 语句,运行。

通过结果可以验证,即使原表没有创建和修改时间,也可以通过 binlog 生成时间new_dts_sync_utc_timestamp 实现商品名为 IPhone X 的库存数据回溯。

相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
16天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
48 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2月前
|
分布式计算 DataWorks 调度
oss数据同步maxcompute报错
在使用阿里云DataWorks同步OSS数据至MaxCompute时,遇到“Input is not in the .gz format”的报错。问题源于目标目录中存在一个空文件,导致同步时识别错误。
|
3月前
|
分布式计算 DataWorks 关系型数据库
MaxCompute 生态系统中的数据集成工具
【8月更文第31天】在大数据时代,数据集成对于构建高效的数据处理流水线至关重要。阿里云的 MaxCompute 是一个用于处理大规模数据集的服务平台,它提供了强大的计算能力和丰富的生态系统工具来帮助用户管理和处理数据。本文将详细介绍如何使用 DataWorks 这样的工具将 MaxCompute 整合到整个数据处理流程中,以便更有效地管理数据生命周期。
120 0
|
17天前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
48 1
|
20天前
|
数据采集 分布式计算 OLAP
最佳实践:AnalyticDB在企业级大数据分析中的应用案例
【10月更文挑战第22天】在数字化转型的大潮中,企业对数据的依赖程度越来越高。如何高效地处理和分析海量数据,从中提取有价值的洞察,成为企业竞争力的关键。作为阿里云推出的一款实时OLAP数据库服务,AnalyticDB(ADB)凭借其强大的数据处理能力和亚秒级的查询响应时间,已经在多个行业和业务场景中得到了广泛应用。本文将从个人的角度出发,分享多个成功案例,展示AnalyticDB如何助力企业在广告投放效果分析、用户行为追踪、财务报表生成等领域实现高效的数据处理与洞察发现。
47 0
|
3月前
|
消息中间件 分布式计算 大数据
RabbitMQ与大数据平台的集成
【8月更文第28天】在现代的大数据处理架构中,消息队列作为数据传输的关键组件扮演着重要的角色。RabbitMQ 是一个开源的消息代理软件,它支持多种消息协议,能够为分布式系统提供可靠的消息传递服务。本篇文章将探讨如何使用 RabbitMQ 与 Hadoop 和 Spark 进行集成,以实现高效的数据处理和分析。
36 1
|
3月前
|
分布式计算 大数据 数据处理
【大数据管理新纪元】EMR Delta Lake 与 DLF 深度集成:解锁企业级数据湖的无限潜能!
【8月更文挑战第26天】随着大数据技术的发展,Apache Spark已成为处理大规模数据集的首选工具。亚马逊的EMR服务简化了Spark集群的搭建和运行流程。结合使用Delta Lake(提供ACID事务保证和数据版本控制)与DLF(加强数据访问控制及管理),可以显著提升数据湖的可靠性和性能。本文通过一个电商公司的具体案例展示了如何在EMR上部署集成Delta Lake和DLF的环境,以及这一集成方案带来的几大优势:增强的可靠性、细粒度访问控制、性能优化以及易于管理的特性。这为数据工程师提供了一个高效且灵活的数据湖平台,简化了数据湖的建设和维护工作。
59 1
|
3月前
|
消息中间件 存储 大数据
大数据-数据仓库-实时数仓架构分析
大数据-数据仓库-实时数仓架构分析
137 1
|
3月前
|
机器学习/深度学习 设计模式 人工智能
面向对象方法在AIGC和大数据集成项目中的应用
【8月更文第12天】随着人工智能生成内容(AIGC)和大数据技术的快速发展,企业面临着前所未有的挑战和机遇。AIGC技术能够自动产生高质量的内容,而大数据技术则能提供海量数据的支持,两者的结合为企业提供了强大的竞争优势。然而,要充分利用这些技术,就需要构建一个既能处理大规模数据又能高效集成机器学习模型的集成框架。面向对象编程(OOP)以其封装性、继承性和多态性等特点,在构建这样的复杂系统中扮演着至关重要的角色。
66 3
|
3月前
|
分布式计算 关系型数据库 Serverless
实时数仓 Hologres产品使用合集之如何将ODPS视图表数据导入到Hologres内表
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。