DataWorks是一站式智能大数据开发治理平台,深度适配阿里云MaxCompute、EMR、Hologres、Flink、PAI 等数十种大数据和AI计算服务,为数据仓库、数据湖、OpenLake湖仓一体、Data+AI解决方案提供全链路智能化的数据集成、大数据AI一体化开发、数据分析与主动式数据资产治理服务,帮助企业进行Data+AI全生命周期数据管理。自2009年以来,DataWorks以阿里巴巴集团大数据建设方法论为基础,不断沉淀数据治理最佳实践,现已广泛应用于包括政务、金融、零售、互联网、汽车、制造等众多行业,数以万计的客户信赖并选择DataWorks进行数字化升级和价值创造。
案例说明
DataWorks为您提供了一个公共的MySQL数据源,存储来自Github的公开实时数据,本案例将此数据进行同步分析,最终将分析结果通过邮件发送至指定邮箱。主要业务过程如下:
- 通过DataWorks的数据集成功能,将MySQL上的Github实时数据同步至MaxCompute。
- 将同步至MaxCompute的数据进行分析处理,查询获取过去1小时Github中Top10的代码语言及提交次数,并将处理结果存储于阿里云OSS。
- 在函数计算中开发一个Python函数,函数逻辑为将OSS中的处理结果发送至指定邮箱。
- 通过DataWorks的任务调度能力,实现过去1小时Github热门编程语言数据自动更新,并将数据处理结果发送至指定邮箱。
操作步骤
1. ETL模板配置
本实验中的,任务代码可以通过ETL工作流模板一键导入,直接体验。在导入模板后,您可以前往目标工作空间,并自行完成任务运维等后续操作。
- 仅空间管理员角色可导入ETL模板至目标工作空间,为账号授权空间管理员角色详情请参见空间级模块权限管控。
- 导入ETL工作流模板,详情请参见ETL工作流快速体验。
- ETL工作流模板快捷入口,请点击GitHub十大热门编程语言。
前置准备
进行本案例前,请务必确保已完成以下操作:
说明
- 建议开通以下云产品时选择同一地域,本文以上海地域为例。
- 如果您此前未使用过阿里云以下产品,可前往DataWorks免费试用、MaxCompute免费试用、函数计算免费试用、OSS免费试用申请免费试用资源。
- 开通大数据开发治理平台DataWorks并创建工作空间。本文以创建标准模式工作空间为例,标准模式和简单模式工作区间区别请参见简单模式和标准模式的区别。
- 新增数据集成资源组并绑定已创建的工作空间。
- 开通云原生大数据计算服务MaxCompute。
- 开通函数计算FC。
- 开通对象存储OSS服务并创建OSS Bucket。
- 确保当前账号已授予
AliyunOSSFullAccess
(对象存储OSS)和AliyunFCFullAccess
(函数计算FC)权限。详细操作,请参见查看RAM用户的权限、为RAM用户授权。
步骤一:登录并进入指定工作区间
- 登录DataWorks控制台。
- 在顶部左上角根据实际情况选择地域。
- 在左侧导航栏选择工作空间,单击指定工作空间名称,进入工作空间详情页。
步骤二:创建MaxCompute数据源并绑定到数据开发
- 在左侧导航栏选择管理中心,进入管理中心配置页面。
- 在左侧导航栏选择数据源。
- 单击新增数据源,选择数据源类型为MaxCompute,根据界面提示创建MaxCompute数据源。
- 进入数据开发 > 数据源>,找到已创建的MaxCompute数据源,进行绑定操作。
步骤三:创建OSS数据源
- 在左侧导航栏选择管理中心,进入管理中心配置页面。
- 在左侧导航栏选择数据源。
- 单击新增数据源,选择数据源类型为OSS,创建OSS数据源。
参数 |
说明 |
数据源名称 |
输入数据源名称,以字母开头,由大小写字母、数字、下划线(_)组成,最多60个字符。 |
数据源描述 |
对数据源进行简单描述,最多80个字符。 |
适用环境 |
本文以使用标准模式工作空间为例,此处选中开发和生产。 |
Endpoint |
本文以上海地域为例,此处输入 |
Bucket |
输入OSS Bucket。如果没有可用的Bucket,请创建Bucket。 |
访问模式 |
|
选择角色 |
当访问模式为RAM角色授权模式,选择RAM角色,详情请参见通过RAM角色授权模式配置数据源。 |
AccessKey ID |
当访问模式为Access Key模式,需输入AccessKey信息,详情请参见查看RAM用户的AccessKey信息。 |
AccessKey Secret |
|
资源组连通性 |
在数据集成页签下,单击已绑定的数据集成资源组操作列测试连通性,等待界面提示测试完成,连通状态列显示为可连通。 |
步骤四:配置案例
在DataWorks控制台左侧导航栏选择大数据体验 > ETL工作流模板,单击Github十大热门编程语言模板,单击载入模板,配置模板参数。
参数 |
说明 |
模板名称 |
显示当前模板名称,即“Github十大热门编程语言”。 |
工作空间 |
选择前置准备中创建的DataWorks工作空间。 |
服务开通 |
|
服务开通 |
本文涉及以下产品服务,请确保已全部开通。如果显示未开通,请参见前置准备开通。
|
MaxCompute配置 |
|
数据源类型 |
显示当前数据源类型,即MaxCompute。 |
数据源名称 |
选择步骤二中创建的MaxCompute数据源。 |
OSS配置 |
|
数据源类型 |
显示当前数据源类型,即OSS。 |
数据源名称 |
选择步骤三中创建的OSS数据源。 |
Bucket名称 |
显示步骤三创建OSS数据源时配置的Bucket。 |
选择文件夹 |
选择上述Bucket下的目录,用于存放加工后的数据。
|
完整路径 |
显示存放加工后数据的OSS路径。 |
函数计算配置 |
|
函数计算配置 |
首次配置时,单击一键创建应用,本案例将在以下产品中创建对应的数据:
|
案例参数配置 |
|
服务器地址 |
发送端服务器,格式为smtp.***.com,例如:smtp.163.com。 说明 以163邮箱为例,获取服务器地址和端口号,请参见客户端设置。 |
端口号 |
发送端服务器端口,例如:上述发送邮件服务器端口号为465。 |
用户名 |
发送端用户名,例如:发送邮箱为dw***_sender@163.com,则用户名为dw***_sender。 |
密码 |
发送端密码,即发送端邮箱密码。 |
邮件发送地址 |
发送端邮箱,例如:dw***_sender@163.com。 |
邮件接收地址 |
接收端邮箱,例如:dw***_receiver@126.com。 |
载入方式 |
首次配置时,本文将为您创建“Github十大热门编程语言”业务流程,如果后续配置且该业务流程已经存在,则按以下方式载入:
|
参数配置完成后,单击确认,进入业务流程页面。
步骤五:调试工作流
- 单击业务流程页面顶部的运行按钮,调试运行整个业务流程。
- 当界面提示运行完成后,您可登录收取数据处理结果的邮箱查看邮件。
2. 手动配置
资源准备
进行本实践前,您需先开通涉及的阿里云产品并完成以下准备工作。
说明
- 建议将以下涉及的云产品开通在同一地域,本文以均开通在上海地域为例。
- 如果您此前未使用过阿里云的以下产品,可申请免费试用资源免费使用,详情请前往DataWorks免费试用、MaxCompute免费试用、函数计算免费试用、OSS免费试用。
- 开通大数据开发治理平台DataWorks并创建工作空间(本实践以使用标准模式工作空间为例,简单模式的操作类似)。操作详情请参见开通DataWorks服务、创建工作空间。
- 开通云原生大数据计算服务MaxCompute,并创建MaxCompute项目。操作详情请参见开通MaxCompute和DataWorks。
- 开通函数计算FC。操作详情请参见步骤一:开通函数计算服务。
- 开通对象存储OSS并创建OSS Bucket。操作详情请参见控制台快速入门、步骤一:创建存储空间。
OSS侧操作:创建OSS Bucket
登录OSS控制台,在Bucket列表页面单击创建Bucket,配置Bucket名称和地域后单击确定,创建OSS Bucket。
函数计算侧操作:创建并开发函数
- 登录函数计算控制台创建服务并为服务添加OSS权限。由于后续开发的函数代码逻辑需要读取OSS Bucket中的数据并将数据发送至指定邮箱,因此需给函数计算的服务授予OSS的权限。
- 在右上角单击返回函数计算2.0,返回至函数计算2.0的工作台页面。
- 在函数计算2.0页面,切换至服务及函数页面,并在左上角切换地域,单击创建服务,配置服务名称后单击确定。
- 单击创建好的服务,单击左侧服务详情,在角色配置区域单击编辑,选择服务角色AliyunFcDefaultRole,单击保存,回到服务详情页签,在角色配置区域单击服务角色AliyunFcDefaultRole,进入RAM访问控制的角色页面。
- 单击新增授权,选择系统策略中的AliyunOSSReadOnlyAccess权限,根据界面提示进行添加。完成后即给函数计算服务授予了OSS的读权限。
- 创建函数并开发函数逻辑。
- 创建函数。
回到函数计算中创建的服务页面,单击左侧函数管理页签,单击创建函数,配置函数名称,并选择运行环境为Python3.9,其他参数可保持默认值,完成后单击创建。 - 为函数环境安装相关依赖包。
说明 : 本实践需使用oss2阿里云二方包和pandas开源三方包。其中oss2包 python3.9 runtime内置支持无需手动安装,您需参考以下步骤手动安装panadas包。
单击创建好的函数,在函数页面的函数配置页签中,单击层配置区域后的编辑,单击添加层,选择添加官方公共层后,选择Pandas1.x,完成后单击确定。
c. 回到函数页面后,单击函数代码页签,当WebIDE的Python环境加载完成后,复制以下代码至Index.py文件中,并修改其中的OSS内网Endpoint参数、邮箱相关参数。
# -*- coding: utf-8 -*- import logging import json import smtplib import oss2 import pandas as pd from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.mime.base import MIMEBase from email.mime.text import MIMEText from email.utils import COMMASPACE from email import encoders def handler(event, context): evts = json.loads(event) bucket_name = evts["bucketName"] file_path = evts["filePath"] auth = oss2.StsAuth(context.credentials.access_key_id, context.credentials.access_key_secret, context.credentials.security_token) endpoint = 'https://oss-{}-internal.aliyuncs.com'.format(context.region) bucket = oss2.Bucket(auth, endpoint, bucket_name) file_name = file_path for obj in oss2.ObjectIteratorV2(bucket, prefix=file_path): if not obj.key.endswith('/'): file_name = obj.key csv_file = bucket.get_object(file_name) logger = logging.getLogger() logger.info('event: %s', evts) mail_host = 'smtp.***.com' ## 邮箱服务地址 mail_port = '465'; ## 邮箱smtp协议端口号 mail_username = 'sender_****@163.com' ## 身份认证用户名:填完整的邮箱名 mail_password = 'EWEL******KRU' ## 身份认证密码:填邮箱 SMTP 授权码 mail_sender = 'sender_****@163.com' ## 发件人邮箱地址 mail_receivers = ['receiver_****@163.com'] ## 收件人邮箱地址 message = MIMEMultipart('alternative') message['Subject'] = 'Github数据加工结果' message['From'] = mail_sender message['To'] = mail_receivers[0] html_message = generate_mail_content(evts, csv_file) message.attach(html_message) # Send email smtpObj = smtplib.SMTP_SSL(mail_host + ':' + mail_port) smtpObj.login(mail_username,mail_password) smtpObj.sendmail(mail_sender,mail_receivers,message.as_string()) smtpObj.quit() return 'mail send success' def generate_mail_title(evt): mail_title='' if 'mailTitle' in evt.keys(): mail_content=evt['mailTitle'] else: logger = logging.getLogger() logger.error('msg not present in event') return mail_title def generate_mail_content(evts, csv_file): headerList = ['Github Repos', 'Stars'] # Read csv file content dumped_file = pd.read_csv(csv_file, names=headerList) # Convert DataFrame to HTML table table_html = dumped_file.to_html(header=headerList,index=False) # Convert DataFrame to HTML table table_html = dumped_file.to_html(index=False) mail_title=generate_mail_title(evts) # Email body html = f""" <html> <body> <h2>{mail_title}</h2> <p>Here are the top 10 languages on GitHub in the past hour:</p> {table_html} </body> </html> """ # Attach HTML message html_message = MIMEText(html, 'html') return html_message
说明 : 示例代码中使用到了bucketName、filePath、mailTitle这三个变量,此三个变量的取值后续通过DataWorks的函数计算节点同步取值,无需在代码中修改。
待修改参数 |
配置指导 |
OSS内网Endpoint (第20行) |
根据您当前操作的地域,将其中的 以上海地域为例,需修改参数为 各地域的OSS内网Endpoint信息请参见OSS地域和访问域名。 |
邮箱相关参数 (31~36行) |
根据实际业务需要:
说明 您可在您使用的邮箱帮助文档中查看如何获取相关取值。以163邮箱为例,您可以参考SMTP服务器是什么、如何使用授权码获取相关信息。 |
d. 完成代码开发后,单击部署代码。
DataWorks侧操作:创建数据源并绑定计算引擎
- 创建MySQL数据源。本实践使用的公共Github数据存储在公共的MySQL数据库中,您需要先创建一个MySQL数据源,用于后续同步数据至MaxCompute时对接MySQL数据库。
- 进入数据源页面。
- 登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的更多 > 管理中心,在下拉框中选择对应工作空间后单击进入管理中心。
- 进入工作空间管理中心页面后,单击左侧导航栏的数据源 > 数据源列表,进入数据源页面。
- 单击新增数据源,选择数据源类型为MySQL,根据界面提示配置数据源名称等参数,核心参数如下。
参数 |
说明 |
数据源类型 |
选择连接串模式。 |
数据源名称 |
自定义。本文以github_events_share为例。 |
JDBC URL |
配置为:jdbc:mysql://rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com:3306/github_events_share 重要 :该数据源仅支持数据同步场景去读取使用,其他模块不支持。 |
用户名 |
配置为:workshop |
密码 |
配置为:workshop#2017 此密码仅为本教程示例,请勿在实际业务中使用。 |
认证选项 |
无认证。 |
资源组连通性 |
单击数据集成公共资源组后的测试连通性,等待界面提示测试完成,连通状态为可连通。 |
- 创建MaxCompute数据源。后续需将Github数据同步至MaxCompute,因此您需创建一个MaxCompute数据源。
- 进入数据源页面。
- 登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的更多 > 管理中心,在下拉框中选择对应工作空间后单击进入管理中心。
- 进入工作空间管理中心页面后,单击左侧导航栏的数据源 > 数据源列表,进入数据源页面。
- 单击新增数据源,选择数据源类型为MaxCompute,根据界面提示配置数据源名称、对应的MaxCompute项目等参数,详细请参见创建MaxCompute数据源。
- 绑定MaxCompute数据源为计算引擎。后续需创建一个MaxCompute的SQL任务进行数据处理,因此您需要将MaxCompute数据源绑定为DataWorks的计算引擎,便于后续创建ODPS SQL节点进行SQL任务开发。
- 进入管理中心页面。
登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的管理中心,在下拉框中选择对应工作空间后单击进入管理中心。 - 单击计算引擎信息页签,在MaxCompute页签下单击数据开发-数据源,选择上述步骤中创建好的MaxCompute数据源。进行绑定操作。绑定后,才能基于数据源的连接信息读取该数据源的数据,进行后续操作。
说明 :当数据源信息发生变更时,若当前界面数据更新不及时,请刷新当前页面更新缓存数据。
DataWorks侧操作:创建业务流程并开发数据处理任务
- 进入数据开发页面。
登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据开发与治理 > 数据开发,在下拉框中选择对应工作空间后单击进入数据开发。
- 创建业务流程。
单击左上角的新建 > 新建业务流程,配置业务名称后单击新建。
- 创建业务节点并配置依赖关系。
- 双击创建的业务名称,打开业务流程页面。
- 在业务流程页面单击新建节点,拖拽离线同步节点进业务流程页面,配置节点名称后单击确认,创建一个离线同步节点。
- 重复上述步骤,再创建一个ODPS SQL节点、函数计算节点。
- 配置离线同步节点。
- 双击业务流程中创建的离线同步节点,进入离线同步节点页面。
- 配置离线同步任务的网络与资源。
配置项 |
配置说明 |
数据来源 |
选择数据来源为MySQL,数据源选择上述步骤创建的MySQL数据源。 |
数据去向 |
选择数据去向为MaxCompute,数据源选择已创建的MaxCompute数据源。 |
我的资源 |
选择右下角的更多选项 > 公共资源组(调试资源组)。 |
完成后单击下一步,根据界面提示完成网络连通测试。
c. 配置离线同步任务,核心参数如下,其他参数可保持默认。
配置项 |
配置说明 |
数据来源 |
|
数据去向 |
|
d. 单击页面右侧的调度配置,配置调度参数,核心参数如下,其他参数可保持默认。
配置项 |
配置说明 |
调度参数 |
单击加载代码中的参数,新增以下五个参数并配置参数的取值逻辑如下:
|
时间属性 |
|
调度依赖 |
勾选使用工作空间根节点 |
e. 单击右上角的保存按钮,保存节点配置。
- 配置ODPS SQL节点。
- 双击业务流程中创建的ODPS SQL节点,进入ODPS SQL节点页面。
- 将以下示例代码贴入节点中。
重要 :以下示例代码创建了一个OSS外部表,用于存储处理后的数据。如果您是首次使用OSS外部表,您还需对当前操作账号进行授权,否则后续业务流程运行会报错,授权操作请参见OSS的STS模式授权。
-- 1. 创建odps的oss外部表用于接收Github公共数据集数据加工结果。 -- 本案例创建的oss外表为odps_external,存放于在步骤1创建的OSS Bucket,本案例OSS Bucket名为xc-bucket-demo2,您需要根据实际情况进行修改。 CREATE EXTERNAL TABLE IF NOT EXISTS odps_external( language STRING COMMENT 'repo全名:owner/Repository_name', num STRING COMMENT '提交次数' ) partitioned by ( direction string ) STORED BY 'com.aliyun.odps.CsvStorageHandler' WITH SERDEPROPERTIES( 'odps.text.option.header.lines.count'='0', 'odps.text.option.encoding'='UTF-8', 'odps.text.option.ignore.empty.lines'='false', 'odps.text.option.null.indicator'='') LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/${YOUR_BUCKET_NAME}/odps_external/'; -- 2. 对同步MaxCompute的GitHub数据加工后写入MaxCompute的oss外表。 -- 查询获取过去1小时Github中Top10的代码语言及提交次数 SET odps.sql.unstructured.oss.commit.mode=true; INSERT INTO TABLE odps_external partition (direction='${day_hour}') SELECT language, COUNT(*) AS num FROM github_public_event WHERE language IS NOT NULL AND pt='${day_hour}' GROUP BY language ORDER BY num DESC limit 10;
c. 单击页面右侧的调度配置,配置调度参数,核心参数如下,其他参数可保持默认。
配置项 |
配置说明 |
调度参数 |
单击加载代码中的参数,新增以下几个参数并配置参数的取值逻辑:
|
时间属性 |
|
d. 单击右上角的保存按钮,保存节点配置。
- 配置函数计算节点。
- 双击业务流程中创建的函数计算节点,进入函数计算节点页面。
- 配置函数计算节点任务。
配置项 |
配置说明 |
选择服务 |
选择上述步骤在函数计算控制台中创建的服务。 |
选择函数 |
选择上述步骤在函数计算控制台中创建的函数。 |
调用方式 |
选择同步。 |
变量 |
配置为以下内容。
{ "bucketName": "${YOUR_BUCKET_NAME}", "filePath": "odps_external/direction=${day_hour}/", "mailTitle":"过去1小时Github中Top10的代码语言及其提交次数" } |
c. 单击页面右侧的调度配置,配置调度参数,核心参数如下,其他参数可保持默认。
配置项 |
配置说明 |
调度参数 |
单击新增参数,新增以下几个参数并配置参数的取值逻辑如下:
|
时间属性 |
|
d. 单击右上角的保存按钮,保存节点配置。
DataWorks侧:调试工作流
- 在DataWorks数据开发页面双击创建的业务名称,打开业务流程页面。
- 单击顶部的运行按钮,调试运行整个业务流程。
- 当界面提示运行完成后,您可登录收取数据处理结果的邮箱查看邮件。
DataWorks侧:提交发布工作流
(可选)后续如果您希望周期性同步数据至MaxCompute进行处理,并周期性发送处理结果到指定邮箱,您需要将业务流程提交发布至DataWorks的运维中心。
- 在数据开发页面,双击创建的业务名称,打开业务流程页面。
- 单击业务流程页面的提交按钮,根据界面提示将业务流程提交发布至运维中心,操作详情请参见发布任务。
后续业务流程即会根据配置的调度周期,周期性运行。
后续步骤:释放资源
如果您使用的是免费试用资源,或后续您不需要继续使用此实践的云产品,可释放对应的云产品资源,避免产生额外费用。
- 释放OSS资源:前往OSS控制台删除本案例使用的Bucket。
- 释放函数计算资源:前往函数计算控制台删除服务。
- 释放DataWorks资源:前往DataWorks控制台删除DataWorks工作空间。
- 释放MaxCompute资源:前往MaxCompute控制台删除MaxCompute项目。