使用Data Lake Analytics从OSS清洗数据到AnalyticDB for MySQL 2.0

简介: 前提 必须是同一阿里云region的Data Lake Analytics(DLA)到AnalyticDB的才能进行清洗操作; 开通并初始化了该region的DLA服务(目前仅支持上海region(华东2),后续会同步其他region); 开通并购买了AnalyticDB的实例,实例规模和数据清洗速度强相关,与AnalyticDB的实例资源规模基本成线性比例关系。

前提

  • 必须是同一阿里云region的Data Lake Analytics(DLA)到AnalyticDB的才能进行清洗操作;
  • 开通并初始化了该region的DLA服务;
  • 开通并购买了AnalyticDB的实例,实例规模和数据清洗速度强相关,与AnalyticDB的实例资源规模基本成线性比例关系。

整体执行流程示意图:

image.png | left | 600x533.16129032258067

步骤 1:在AnalyticDB中为DLA开通一个VPC访问点

image.png | left | 600x345.16129032258067

DLA在上海region的VPC参数信息:

  • 可用区:cn-shanghai-d
  • VPC id: vpc-uf6wxkgst74es59wqareb
  • VSwitch id: vsw-uf6m7k4fcq3pgd0yjfdnm
DLA Region 可用区 VPC id VSwitch id
华东1(杭州) cn-hangzhou-g vpc-bp1g66t4f0onrvbht2et5 vsw-bp1nh5ri8di2q7tkof474
华东2(上海) cn-shanghai-d vpc-uf6wxkgst74es59wqareb vsw-uf6m7k4fcq3pgd0yjfdnm
华北2(北京) cn-beijing-g vpc-2zeawsrpzbelyjko7i0ir vsw-2zea8ct4hy4hwsrcpd52d
华南1(深圳) cn-shenzhen-a vpc-wz9622zx341dy24ozifn3 vsw-wz91ov6gj2i4u2kenpe42
华北3(张家口) cn-zhangjiakou-a vpc-8vbpi1t7c0devxwfe19sn vsw-8vbjl32xkft0ewggef6g9
新加坡 ap-southeast-a vpc-t4n3sczhu5efvwo1gsupf vsw-t4npcrmzzk64r13e3nhhm
英国(伦敦) eu-west-1a vpc-d7ovzdful8490upm8b413 vsw-d7opmgixr2h34r1975s8a

在AnalyticDB中为DLA创建VPC的专有网络,注意,要使用MySQL命令行连接AnalyticDB的经典网络链接,执行:

alter database txk_cldsj set zone_id='xxx' vpc_id='xxx' vswitch_id='xxx';

其中,“zone_id”、“vpc_id”和“vswitch_id”分别填同region的DLA对应的VPC id和VSwitch id,见上表。

命令执行成功后,刷新DMS for AnalyticDB控制台页面,应该能看到一个VPC的URL。

步骤 2:在AnalyticDB中创建好目标的实时表

image.png | left | 600x290.80590238365494

具体AnalyticDB的建表文档请参考:https://help.aliyun.com/document_detail/26403.html

-- 例如:

-- 目标表为实时维度表:
CREATE DIMENSION TABLE etl_ads_db.etl_ads_dimension_table (
  col1 INT, 
  col2 STRING, 
  col3 INT, 
  col4 STRING,
  primary key (col1)
)
options (updateType='realtime');

-- 目标表为实时分区表:
CREATE TABLE etl_ads_db.etl_ads_partition_table (
  col1 INT, 
  col2 INT, 
  col3 INT, 
  col4 INT, 
  col5 DOUBLE, 
  col6 DOUBLE, 
  col7 DOUBLE
  primary key (col1, col2, col3, col4)
)
PARTITION BY HASH KEY(col1)
PARTITION NUM 32
TABLEGROUP xxx_group
options (updateType='realtime');

步骤 3:在DLA中创建好与AnalyticDB目标表映射的表

image.png | left | 600x533.0645161290322

DLA中的表名、列名与AnalyticDB目标表对应同名

这种情况下,建表语句会比较简单。
其中,如下参数需要指明:

-- 目标AnalyticDB
LOCATION = 'jdbc:mysql://etl_ads_db-e85fbfe8-vpc.cn-shanghai-1.ads.aliyuncs.com:10001/etl_ads_db'

-- 目标AnalyticDB的访问用户名
USER='xxx'

-- 目标AnalyticDB的访问密码
PASSWORD='xxx'
CREATE SCHEMA `etl_dla_schema` WITH DBPROPERTIES 
( 
  CATALOG = 'ads', 
  LOCATION = 'jdbc:mysql://etl_ads_db-e85fbfe8-vpc.cn-shanghai-1.ads.aliyuncs.com:10001/etl_ads_db',
  USER='xxx',
  PASSWORD='xxx'
);

USE etl_dla_schema;

CREATE EXTERNAL TABLE etl_ads_dimension_table (
  col1 INT, 
  col2 VARCHAR(200), 
  col3 INT, 
  col4 VARCHAR(200),
  primary key (col1)
);

CREATE EXTERNAL TABLE etl_ads_partition_table (
  col1 INT, 
  col2 INT, 
  col3 INT, 
  col4 INT, 
  col5 DOUBLE, 
  col6 DOUBLE, 
  col7 DOUBLE
  primary key (col1, col2, col3, col4)
)

步骤 4:在DLA中创建表指向源OSS数据

image.png | left | 600x533.0645161290322

CREATE SCHEMA oss_data_schema with DBPROPERTIES(
  LOCATION = 'oss://my_bucket/',
  catalog='oss'
);

CREATE EXTERNAL TABLE IF NOT EXISTS dla_table_1 (
    col_1 INT, 
    col_2 VARCHAR(200), 
    col_3 INT, 
    col_4 VARCHAR(200)
) 
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' 
STORED AS TEXTFILE 
LOCATION 'oss://my_bucket/oss_table_1';


CREATE EXTERNAL TABLE IF NOT EXISTS dla_table_2 (
  col_1 INT, 
  col_2 INT, 
  col_3 INT, 
  col_4 INT, 
  col_5 DOUBLE, 
  col_6 DOUBLE, 
  col_7 DOUBLE
) 
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' 
STORED AS TEXTFILE 
LOCATION 'oss://my_bucket/oss_table_2';

步骤 5:在DLA中执行INSERT FROM SELECT语句

image.png | left | 600x533.0645161290322

INSERT FROM SELECT通常为长时运行任务,建议通过异步执行方式:
注意:用MySQL命令行执行时,连接时,需要在命令行指定-c参数,用来识别MySQL语句前的hint:

mysql -hxxx -Pxxx -uxxx -pxxx db_name -c

示例:

-- 执行OSS到AnalyticDB的全量数据插入
/*+run-async=true*/
INSERT INTO etl_dla_schema.etl_dla_dimension_table 
SELECT * FROM oss_data_schema.dla_table_1;

-- 执行OSS到AnalyticDB的数据插入,包含对OSS数据的筛选逻辑
/*+run-async=true*/
INSERT INTO etl_dla_schema.etl_dla_partition_table (col_1, col_2, col_3, col_7)
SELECT col_1, col_2, col_3, col_7 
FROM oss_data_schema.dla_table_2 
WHERE col_1 > 1000 
LIMIT 10000;

注意:

  • 如果在INSERT INTO子句和SELECT子句中没有指定列信息,请确保源表和目标表的列定义顺序一致,且类型对应匹配;
  • 如果在INSERT INTO子句和SELECT子句中指定了列的信息,请确保两者中的列的顺序符合业务需要的匹配顺序,且类型对应匹配。

如果在DMS for Data Lake Analytics控制台(https://datalakeanalytics.console.aliyun.com/)执行,请选择“异步执行”。

image.png | left | 706x176

然后可以从“执行历史” 中,点击“刷新”,查看任务的执行状态。
异步执行INSERT FROM SELECT语句,会返回一个task id,通过这个task id,可以轮询任务执行情况,如果status为“SUCCESS”,则任务完成:

SHOW query_task WHERE id = '26c6b18b_1532588796832'

注意事项

  • AnalyticDB为主键覆盖逻辑,整个INSERT FROM SELECT的ETL任务失败,用户需要整体重试;
  • AnalyticDB消费数据有一定延时,在AnalyticDB端查询写入数据时,会有一定的延迟可见,具体延迟时间取决于AnalyticDB的资源规格;
  • 建议将ETL任务尽量切成小的单位批次执行,比如,OSS数据200GB,在业务允许的情况下,200GB的数据切成100个文件夹,每个文件夹2GB数据,对应DLA中建100张表,100张表分别做ETL,单个ETL任务失败,可以只重试单个ETL任务;
  • ETL任务结束后,视情况删除DLA中的表,包括映射AnalyticDB中的表、以及指向OSS数据的表。
相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
目录
相关文章
|
2月前
|
存储 关系型数据库 MySQL
MySQL——数据库备份上传到阿里云OSS存储
MySQL——数据库备份上传到阿里云OSS存储
96 0
|
3月前
|
机器学习/深度学习 人工智能 专有云
人工智能平台PAI使用问题之怎么将DLC的数据写入到另一个阿里云主账号的OSS中
阿里云人工智能平台PAI是一个功能强大、易于使用的AI开发平台,旨在降低AI开发门槛,加速创新,助力企业和开发者高效构建、部署和管理人工智能应用。其中包含了一系列相互协同的产品与服务,共同构成一个完整的人工智能开发与应用生态系统。以下是对PAI产品使用合集的概述,涵盖数据处理、模型开发、训练加速、模型部署及管理等多个环节。
|
2月前
|
SQL 关系型数据库 MySQL
实时数仓 Hologres操作报错合集之Flink CTAS Source(Mysql) 表字段从可空改为非空的原因是什么
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
2月前
|
DataWorks 负载均衡 Serverless
实时数仓 Hologres产品使用合集之如何导入大量数据
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
2月前
|
SQL 消息中间件 OLAP
OneSQL OLAP实践问题之实时数仓中数据的分层如何解决
OneSQL OLAP实践问题之实时数仓中数据的分层如何解决
47 1
|
2月前
|
SQL DataWorks 数据库连接
实时数仓 Hologres操作报错合集之如何将物理表数据写入临时表
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
2月前
|
SQL 分布式计算 关系型数据库
实时数仓 Hologres操作报错合集之指定主键更新模式报错主键数据重复,该如何处理
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
2月前
|
SQL 分布式计算 MaxCompute
实时数仓 Hologres产品使用合集之如何在插入数据后获取自增的id值
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
实时数仓 Hologres产品使用合集之如何在插入数据后获取自增的id值
|
2月前
|
存储 安全 大数据
对象存储的意义:探索数据新纪元的关键基石
在信息爆炸时代,数据成为核心资产,而高效安全的数据存储至关重要。对象存储作为一种新兴技术,起源于20世纪90年代,旨在解决传统文件系统的局限性。随着云计算和大数据技术的发展,它已成为关键技术之一。对象存储具备高可扩展性、高可靠性、低成本、易于管理和多协议支持等优点。它支撑大数据发展、推动云计算繁荣、助力企业数字化转型并保障数据安全。未来,对象存储将进一步提升性能,实现智能化管理,并与边缘计算融合,获得政策支持,成为数据新时代的关键基石。
104 3
|
2月前
|
存储 SQL 人工智能
AnalyticDB for MySQL:AI时代实时数据分析的最佳选择
阿里云云原生数据仓库AnalyticDB MySQL(ADB-M)与被OpenAI收购的实时分析数据库Rockset对比,两者在架构设计上有诸多相似点,例如存算分离、实时写入等,但ADB-M在多个方面展现出了更为成熟和先进的特性。ADB-M支持更丰富的弹性能力、强一致实时数据读写、全面的索引类型、高吞吐写入、完备的DML和Online DDL操作、智能的数据生命周期管理。在向量检索与分析上,ADB-M提供更高检索精度。ADB-M设计原理包括分布式表、基于Raft协议的同步层、支持DML和DDL的引擎层、高性能低成本的持久化层,这些共同确保了ADB-M在AI时代作为实时数据仓库的高性能与高性价比

热门文章

最新文章