一、DataWorks概述与对接场景
阿里云DataWorks是一站式大数据开发治理平台,提供数据集成、数据开发、数据治理、数据分析和数据服务等全链路数据开发能力。DataWorks深度集成MaxCompute、EMR、Hologres、Flink等主流计算引擎,支持跨引擎协同开发。在实际业务中,DataWorks的对接场景主要包括:将业务数据库(如MySQL、PostgreSQL)的数据批量同步至数仓进行分析;在Data Studio中创建数据库节点进行SQL任务开发与周期性调度;基于数据表快速生成API对外提供数据服务;以及通过OpenAPI以编程方式查询数据血缘、管理任务等。
需要先登录阿里云控制台,点击:阿里云控制台
二、环境准备:工作空间创建与计算资源绑定
工作空间是DataWorks中进行任务开发和成员权限管理的基本单元,所有开发工作都在DataWorks自定义的工作空间内完成。一个工作空间支持创建多种数据源,同时支持绑定多种计算资源。
2.1 创建工作空间
登录DataWorks控制台,在左上角切换至目标地域(地域创建后不可更改)。单击左侧导航栏中的"工作空间列表",进入空间列表页面后单击"创建工作空间"。在创建页面完成以下关键配置:
- 工作空间名称:输入一个唯一且符合团队规范的名称
- 生产、开发环境隔离:建议开启,创建标准模式工作空间,确保开发与生产环境隔离
- 使用新版数据开发(Data Studio):推荐开启
执行创建操作的账号需为阿里云主账号,或已授予AliyunDataWorksFullAccess等权限策略的RAM用户。DataWorks工作空间分为简单模式与标准模式:标准模式提供独立的开发与生产环境,是保障数据安全和流程规范的最佳选择;简单模式仅有生产环境,适用于个人测试或快速验证场景。
2.2 绑定计算资源
创建工作空间后,需要绑定计算资源才能进行任务开发。以MaxCompute为例,需要新建两个MaxCompute项目分别绑定到开发环境和生产环境。DataWorks构建了开放的计算引擎生态,深度集成MaxCompute、EMR、Hologres、Flink等主流引擎。
2.3 资源组规划
DataWorks当前推荐使用Serverless资源组,它涵盖旧版资源组(独享调度资源组、独享数据集成资源组、独享数据服务资源组、公共资源组)的核心功能,通过一个资源组即可完成数据同步、任务调度运行、调用及管理API服务等操作。对于历史项目仍可使用独享资源组:独享调度资源主要用于任务调度,适合高并发或对执行时间有严格要求的场景;独享集成资源主要用于数据的抽取、转换和加载过程。
三、数据源配置:对接各类数据源
使用DataWorks的数据集成、数据开发或数据分析功能前,需要先在DataWorks中添加数据源。
3.1 进入数据源配置页面
登录DataWorks控制台,切换至目标地域后,在左侧导航栏选择对应工作空间,单击进入"管理中心"。进入工作空间管理中心页面后,单击左侧导航栏的"数据源",进入数据源页面。单击"新增数据源",选择对应的数据源类型进行配置。
3.2 网络连通方案
配置数据源前需完成网络连通与白名单配置。DataWorks资源组需要能访问数据源实例。私网连接(推荐):当DataWorks资源组与数据源在同一VPC内时,使用VPC地址进行连接。公网连接:需要将DataWorks资源组的弹性公网IP添加至数据源的白名单中。如果数据源和独享数据集成资源组处于不同的网络环境中,需要通过VPN网关等方式将网络打通。
3.3 以MySQL数据源为例
DataWorks的数据集成实时同步MySQL数据是基于实时订阅MySQL实现的,目前仅支持配置MySQL数据源为RDS的5.x或8.x版本的MySQL。配置MySQL数据源时,关键参数包括:数据源名称、连接串模式或阿里云实例模式、主机地址/IP、端口号(默认3306)、数据库名称、用户名和密码。建议为DataWorks创建一个专用的MySQL账号来访问数据源。测试连通性通过后即可完成创建。
3.4 其他数据源配置要点
DataWorks数据集成支持的数据源类型非常丰富,包括RDS(MySQL、PostgreSQL、SQL Server)、ClickHouse、PolarDB-X(DRDS)、AnalyticDB、Tablestore(OTS)、MongoDB、SelectDB、StarRocks、Hologres等。不同数据源类型的配置方式有所差异:RDS数据源需配置为阿里云实例模式;PolarDB-X、Oracle、Tablestore等需选择连接串模式。以SelectDB为例,需配置主机地址/IP、端口号(MySQL协议端口9030)、HTTP连接地址、数据库名称、用户名和密码。
四、数据集成:离线同步任务配置
数据集成是DataWorks的核心模块,支持通过离线同步任务将数据从源端同步至目标端。
4.1 向导模式配置离线同步任务
向导模式是DataWorks提供的可视化配置方式,适合大多数同步场景。配置流程如下:
- 进入数据集成页面:登录DataWorks控制台,切换至目标地域,选择对应工作空间后单击进入"数据集成"
- 新建离线同步节点:在数据开发(Data Studio)中创建工作流,将"数据集成"目录下的"离线同步"节点拖拽至业务流程编辑面板
- 配置数据来源与去向:选择已配置好的源端数据源和目标端数据源,测试连通性
- 选择同步对象:选择需要同步的表或通过正则表达式匹配多张表
- 配置字段映射:配置源端字段与目标端字段的映射关系
- 配置作业速率:设置同步速率上限、脏数据检查规则等
- 配置调度属性:设置调度周期、依赖关系等
4.2 脚本模式配置离线同步任务
当需要实现更精细化的离线任务配置时,可以使用脚本模式,通过编写数据同步的JSON脚本并结合DataWorks调度参数。脚本模式适用于数据源不支持向导模式、或需要精细化控制同步逻辑的场景。
以下是一个典型的MySQL同步至MaxCompute的离线同步任务JSON脚本示例:
{ "configuration": { "reader": { "plugin": "mysql", "parameter": { "connection": [ { "jdbcUrl": ["jdbc:mysql://rm-xxxxxx.mysql.rds.aliyuncs.com:3306/test_db"], "table": ["user_table"] } ], "username": "dataworks_user", "password": "your_password", "column": ["id", "name", "created_at"], "where": "created_at >= '${bizdate}'" } }, "writer": { "plugin": "odps", "parameter": { "accessId": "${accessId}", "accessKey": "${accessKey}", "project": "your_project", "table": "target_user_table", "partition": "dt=${bizdate}", "column": ["id", "name", "created_at"], "truncate": true } }, "setting": { "speed": { "channel": 3, "mbps": 10 }, "errorLimit": { "record": 0 } } }, "type": "job", "version": "2.0" }
在脚本中,${bizdate}是调度参数,会在任务运行时被替换为实际的业务日期。reader的where条件用于实现增量同步,只同步创建时间大于等于业务日期的数据。
4.3 分库分表同步
DataWorks支持将分库分表数据同步至目标单表。有两种实现方式:分库分表数据源+向导模式,以及普通数据源+向导模式。分库分表数据源支持引用最大5000个数据源,支持正则表达式配置源表名。配置时需选择一个数据源作为Meta数据源,作为默认库表结构的模板。
五、任务调度配置
DataWorks支持分钟、小时、日、周、月、年六种调度周期。调度任务会根据调度类型及周期数生成相应的周期实例,并通过周期实例的方式自动调度运行。
5.1 调度周期与时间配置
在数据开发(Data Studio)的节点编辑页面,单击右侧的"调度配置",在"调度时间"区域设置调度周期。可配置调度频率(如每天0点执行)、生效时间范围等。
5.2 调度依赖配置
调度依赖是保障任务按正确顺序执行的关键机制。配置完成后,上游节点运行成功,下游节点才会启动运行。DataWorks支持两种依赖方式:
- 同周期依赖:当前任务依赖上游任务同周期产出的表数据时使用。需在"依赖的上游节点"区域定义当前任务依赖哪些上游任务
- 跨周期依赖:支持不同调度周期的节点互相依赖
DataWorks支持通过节点代码中的表血缘快速设置节点依赖,单击"从代码解析输入输出"可根据编辑区的最新代码自动解析依赖关系。
5.3 调度参数
调度参数是DataWorks实现动态任务配置的重要能力。数据集成任务配置过程中,支持在数据过滤相关参数中使用调度参数,实现增量同步。常见的调度参数包括bizdate(业务日期)、{bizdate}(业务日期)、{cyctime}(定时时间)等。各周期代码入参与调度参数配置与替换的关系需要根据实际调度类型进行设置。
六、数据服务API的生成与调用
DataWorks的数据服务模块支持快速将数据表生成数据API,并注册现有API至数据服务平台,帮助统一管理和发布API服务。
6.1 生成API
登录DataWorks控制台,切换至目标地域后选择对应工作空间,单击进入"数据服务"。生成API的方式有向导模式和脚本模式两种。向导模式通过可视化界面选择数据源、配置请求参数和返回参数,适合大多数场景。脚本模式通过编写SQL脚本和配置JSON来生成API,适合复杂查询场景。
6.2 API认证方式
DataWorks数据服务API支持两种认证方式:
- 简单身份认证:在请求头中添加AppCode进行认证
- 加密签名身份认证:在请求头中添加AppKey和AppSecret,通过签名算法验证请求者身份
签名认证需要经过复杂的签名算法,调用API时需要拼接签名字符串,并将签名计算后的字符串放在请求的Header传入。
6.3 API调用示例
API发布并授权后即可调用。以下是一个使用Python调用DataWorks数据服务API的示例:
import requests import json import time import hashlib import hmac import base64 # 简单身份认证方式 def call_api_with_appcode(appcode, url, params=None): headers = { 'Authorization': 'APPCODE ' + appcode, 'Content-Type': 'application/json' } response = requests.get(url, headers=headers, params=params) return response.json() # 签名认证方式 def call_api_with_signature(appkey, appsecret, url, params=None): timestamp = str(int(time.time() * 1000)) sign_str = appsecret + timestamp signature = base64.b64encode(hmac.new(appsecret.encode('utf-8'), sign_str.encode('utf-8'), hashlib.sha256).digest()).decode('utf-8') headers = { 'X-Ca-Key': appkey, 'X-Ca-Timestamp': timestamp, 'X-Ca-Signature': signature, 'Content-Type': 'application/json' } response = requests.get(url, headers=headers, params=params) return response.json() # 调用示例 if __name__ == '__main__': url = 'https://xxxxx-cn-hangzhou.data.aliyun.com/api/v1/your_api' appcode = 'your_appcode' result = call_api_with_appcode(appcode, url, {'page': 1, 'size': 10}) print(json.dumps(result, indent=2, ensure_ascii=False))
七、OpenAPI编程调用
DataWorks提供了完整的OpenAPI(2024-05-18版本),支持以编程方式管理任务、查询血缘等。
7.1 安装Python SDK
可以通过pip命令安装阿里云Python SDK:
pip install aliyun-python-sdk-core pip install aliyun-python-sdk-dataworks-public
7.2 调用OpenAPI示例
以下是一个使用Python SDK调用DataWorks OpenAPI查询表血缘的示例:
from aliyunsdkcore.client import AcsClient from aliyunsdkdataworks_public.request.v20240518 import ListLineagesRequest import json # 初始化客户端 client = AcsClient( access_key_id='your_access_key_id', access_secret='your_access_secret', region_id='cn-hangzhou' ) # 构造请求 request = ListLineagesRequest.ListLineagesRequest() request.set_accept_format('json') # 设置参数 - 实体ID格式为 maxcompute-table:::<项目名>::<表名> request.set_EntityId('maxcompute-table:::your_project::your_table') request.set_Direction('UPSTREAM') # 查询上游血缘 request.set_Depth(3) # 查询深度 # 发起请求 response = client.do_action_with_exception(request) result = json.loads(response) print(json.dumps(result, indent=2, ensure_ascii=False))
实体ID是调用元数据和血缘相关API的核心凭据。可以通过DataWorks数据地图模块的表详情页获取实体ID,或通过ListTables、ListColumns API批量获取。
八、数据质量监控与数据治理
8.1 数据质量监控
DataWorks的数据质量监控节点通过配置数据质量监控规则,监控相关数据源表的数据质量。数据质量监控在执行时会扫描监控对象中的数据,依次计算每个规则所指定的指标值,并与期望的阈值做比较,判断规则是否通过。DataWorks提供多种预设表层级、字段层级的监控模板。在Data Studio中,支持为数据开发节点配置数据质量监控规则,用以校验该节点产出的数据表。
8.2 数据血缘
DataWorks会自动解析并记录各种计算任务产生的数据血缘关系。清晰的数据血缘能够实现数据溯源与问题排查、影响分析、数据治理与可信度评估、成本优化与资产盘点等核心价值。
九、常见对接问题与解决方案
问题一:添加数据源时测试连通性失败怎么办?
这通常是网络连通性问题。建议依次检查:网络是否可达、防火墙是否对IP或端口有限制、安全组是否已配置对IP或端口放通。RDS数据源需先为RDS添加白名单;ECS自建数据库需为ECS添加安全组。
问题二:实时同步MySQL数据时支持哪些版本?
DataWorks实时同步MySQL数据目前仅支持配置MySQL数据源为RDS的5.x或8.x版本的MySQL,不支持配置为DRDS的MySQL。如果使用DRDS,应参考配置DRDS数据源文档直接配置为DRDS数据源。
问题三:配置数据源时选择了阿里云子账号,运行任务时使用的是哪个AK/SK?
产品不会使用永久性的AccessKey。在任务运行时,系统会为每次任务动态生成临时的AK/SK,这些临时凭证具有最小化权限,并在任务完成后失效。
问题四:分库分表同步时表结构不一致怎么办?
分库分表同步要求所有数据源内的库表结构保持一致。如果部分表缺失字段,可以配置"字段缺失策略",允许缺失字段作为NULL值往下输出。
问题五:如何选择资源组类型?
当前推荐使用Serverless资源组,一个资源组即可完成数据同步、任务调度运行、API服务调用等所有操作。如果仍使用旧版资源组,独享调度资源适用于高并发任务调度场景,独享集成资源适用于大量数据迁移同步场景。
问题六:数据服务API调用时报签名错误怎么办?
首先确认AppKey和AppSecret是否正确。签名认证需要拼接签名字符串并进行HMAC-SHA256加密。检查时间戳是否在有效期内(通常为15分钟),以及签名算法是否与服务端要求一致。
十、总结
阿里云DataWorks作为一个一站式大数据开发治理平台,其对接使用的核心流程可概括为:创建工作空间并绑定计算资源、配置数据源并打通网络、通过向导模式或脚本模式配置数据集成任务、设置任务调度周期与依赖关系、通过数据服务模块生成API或使用OpenAPI进行编程调用。掌握这些核心能力,即可高效地利用DataWorks完成数据集成、开发、调度、服务和治理的全链路工作。