背景知识
DataWorks
DataWorks(大数据开发治理平台)是阿里云重要的PaaS(Platform-as-a-Service)平台产品,为您提供数据集成、数据开发、数据地图、数据质量和数据服务等全方位的产品服务,一站式开发管理的界面,帮助企业专注于数据价值的挖掘和探索。
DataWorks支持多种计算和存储引擎服务,包括离线计算MaxCompute、开源大数据引擎E-MapReduce、实时计算(基于Flink)、机器学习PAI、实时数仓Hologres,云原生数据仓库 AnalyticDB for PostgreSQL,云原生数据仓库AnalyticDB for MySQL,并且支持用户自定义接入计算和存储服务。DataWorks为您提供全链路智能大数据及AI开发和治理服务。
云原生大数据计算服务MaxCompute
MaxCompute是面向分析的企业级SaaS(Software-as-a-Service)模式云数据仓库,以Serverless架构提供快速、全托管的在线数据仓库服务,消除了传统数据平台在资源扩展性和弹性方面的限制,最小化用户运维投入,使您可以经济并高效的分析处理海量数据。数以万计的企业正基于MaxCompute进行数据计算与分析,将数据高效转换为业务洞察。
1.采集数据
1.1 新建OSS数据源。
本步骤将通过DataWorks采集日志数据至MaxCompute。
复制下方地址,在Chromium网页浏览器打开新页签,粘贴并访问DataWorks控制台。
https://workbench.data.aliyun.com/
在工作空间列表页面,找到您的资源,单击右侧操作列下的数据集成。
在数据集成首页,单击右上角的图标。
在左侧导航栏中,单击数据源管理。
在数据源管理页面,单击右上方的新增数据源。
在新增数据源对话框中,选择数据源类型为OSS。
在新增OSS数据源对话框中,配置各项参数,单击更多选项。
数据源名称:输入oss_workshop_log。
Endpoint:输入http://oss-cn-shanghai-internal.aliyuncs.com。
Bucket: 输入new-dataworks-workshop。
AccessKey ID:输入xxxxxx。
AccessKey Secret:输入xxxxxx。
在资源组列表,单击公共资源组右侧的测试连通性。
返回如下结果,连通状态为可连通,表示数据集成资源组能够与数据源连通。
在新增OSS数据源对话框中,单击完成。
1.2 新建OSS数据源。
在新增MySQL数据源对话框中,配置各项参数,单击更多选项。
参数说明:
数据源类型:选择连接串模式。
数据源名称:输入rds_workshop_log。
JDBC URL:输入jdbc:mysql://rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com:3306/workshop。
用户名:输入workshop。
密码:输入workshop#2017。
认证选项:选择无认证。
返回如下结果,连通状态为可连通,表示数据集成资源组能够与数据源连通。
1.3 创建业务流程。
在数据开发页面,右键单击业务流程,选择新建业务流程。
在新建业务流程对话框中,输入业务名称,例如test,单击新建
在业务流程开发面板,单击虚拟节点并拖拽至右侧的编辑页面
在新建节点对话框中,节点名称输入为workshop_start,单击提交。
在业务流程开发面板,单击离线同步并拖拽至右侧的编辑页面。
在新建节点对话框中,节点名称输入为OSS_数据同步,单击提交。
在业务流程开发面板,单击离线同步并拖拽至右侧的编辑页面。
在新建节点对话框中,节点名称输入为rds_数据同步,单击提交。
在右侧的编辑页面,通过拖拽连线,将workshop_start节点设置为两个离线同步节点的上游节点。
1.4 配置workshop_start节点。
在数据开发页面左侧,选择业务流程>您的业务流程>通用,双击虚拟节点workshop_start。
在该节点的编辑页面,单击右侧的调度配置。
在调度配置面板的时间属性区域,重跑属性选择为运行成功或失败后皆可重跑。在调度配置面板的调度依赖区域,单击使用工作空间根节点,设置workshop_start节点的上游节点为工作空间根节点。
由于新版本给每个节点都设置了输入输出节点,所以需要给workshop_start节点设置一个输入。此处设置其上游节点为工作空间根节点,通常命名为工作空间名称_root。
在该节点的编辑页面左上方,单击图标保存配置。
1.5 新建表
在数据开发页面,选择业务流程>MaxCompute,右键单击表,单击新建表。
在新建表对话框中,表名输入ods_raw_log_d,单击新建。
在表ods_raw_log_d的编辑页面上方,单击DDL。
在DDL模式对话框中,输入如下创建OSS日志对应目标表的建表语句,单击生成表结构。
CREATE TABLE IF NOT EXISTS ods_raw_log_d (
col STRING
)
PARTITIONED BY (
dt STRING
);
在表ods_raw_log_d的编辑页面的基本属性区域,中文名输入为OSS日志对应目标表,单击提交到生产环境。
在数据开发页面,选择业务流程>MaxCompute,右键单击表,单击新建表。
在新建表对话框中,表名输入ods_user_info_d,单击提交。
在表ods_user_info_d的编辑页面上方,单击DDL。在DDL模式对话框中,输入如下创建RDS对应目标表的建表语句,单击生成表结构。
CREATE TABLE IF NOT EXISTS ods_user_info_d (
uid STRING COMMENT 'uid',
gender STRING COMMENT 'gender',
age_range STRING COMMENT 'age_range',
zodiac STRING COMMENT 'zodiac'
)
PARTITIONED BY (
dt STRING
);
在表ods_user_info_d的编辑页面的基本属性区域,中文名输入为RDS对应目标表,单击提交到生产环境。
1.6 配置离线同步节点
在数据开发页面左侧,选择业务流程>您的业务流程>数据集成,双击oss_数据同步。
在oss_数据同步页面的数据来源中,配置如下参数,其他配置保持默认。
参数说明:
数据源:选择OSS>oss_workshop_log数据源。
文件名(含路径):输入user_log.txt。
文件类型:选择text类型。
列分隔符:输入列分隔符为|。
在oss_数据同步页面的数据去向中,配置如下参数,其他配置保存默认。
参数说明:
数据源:选择ODPS>odps_first数据源。
表:选择数据源中的ods_raw_log_d表。
odps_first数据源是工作空间绑定MaxCompute实例时,系统自动生成的默认数据源。odps_first数据源写入至当前工作空间下的MaxCompute项目中。
在页面右侧,单击调度配置。
在调度配置面板的时间属性区域,重跑属性选择为运行成功或失败后皆可重跑。在调度配置面板的调度依赖区域的本节点的输出中,输入本节点的输出名称为工作空间名称.ods_raw_log_d,单击添加。
在页面右侧,单击数据集成资源组配置。
在数据集成资源组配置面板,单击更多选项。
在警告对话框中,单击确认。
在数据集成资源组配置面板,方案选择调试资源组。
在数据开发页面左侧,选择业务流程>数据集成,双击rds_数据同步。
在rds_数据同步页面的数据来源中,配置如下参数,其他配置保持默认。
参数说明:
数据源:选择MySQL>rds_workshop_log数据源。
表:选择数据源中的ods_user_info_d表。
在rds_数据同步页面的数据去向中,配置如下参数,其他配置保存默认。
参数说明:
数据源:选择ODPS>odps_first数据源。
表:选择数据源中的ods_user_info_d表。
在页面右侧,单击调度配置。
在调度配置面板的时间属性区域,重跑属性选择为运行成功或失败后皆可重跑。在调度配置面板的调度依赖区域的本节点的输出中,输入本节点的输出名称为工作空间名称.ods_user_info_d,单击添加。
在数据集成资源组配置面板,单击更多选项。
在数据集成资源组配置面板,方案选择调试资源组。
1.7 提交业务流程
在数据开发页面左侧,双击您的业务流程。
在业务流程的编辑页面上方的菜单栏中,单击 提交图标。
在提交对话框中,选择所有的节点,输入备注,选择忽略输入输出不一致的告警,单击提交。
返回如下页面, 等待所有节点提交成功,单击 图标关闭即可。
1.8运行业务流程
在上方菜单栏中,单击图标运行。
在业务流程的编辑页面,右键单击rds_数据同步。
在功能列表,单击查看日志。
返回结果如下,表示rds_数据同步节点运行成功,并成功同步数据。
在日志页面右上角,单击图标关闭日志。
在业务流程的编辑页面,右键单击oss_di(即下图中的oss_数据同步)。
在功能列表,单击查看日志。返回结果如下,表示oss_数据同步节点运行成功,并成功同步数据。
1.9 确认数据是否成功导入MaxCompute
在数据开发页面的左侧导航栏,单击临时查询。
在临时查询页面左侧,右键单击临时查询,选择新建节点>ODPS SQL。
在新建节点对话框,单击提交。
在SQL查询页签,输入如下SQL语句,单击 图标运行,查看导入ods_raw_log_d和ods_user_info_d的记录数。
SQL语句中字段dt的参数${bdp.system.bizdate}表示业务日期,例如,任务运行的日期为20220916,则业务日期为20220915,即任务运行日期的前一天。
select count(*) from ods_raw_log_d where dt=${bdp.system.bizdate};
select count(*) from ods_user_info_d where dt=${bdp.system.bizdate};
在参数对话框中, 单击确定。
在MaxCompute计算成本估计对话框中, 单击运行。
返回如下结果,您可以查看到导入ods_raw_log_d表和ods_user_info_d表的记录数,表示OSS和RDS中的数据成功导入到MaxCompute。