通过DataWorks使用PyODPS

本文涉及的产品
大数据开发治理平台 DataWorks,不限时长
简介: PyODPS为MaxCompute的Python版SDK,支持在DataWorks中开发运行PyODPS任务。本文为您介绍在DataWorks上使用PyODPS的使用限制、主要流程和部分简单应用示例

PyODPS为MaxCompute的Python版SDK,支持在DataWorks中开发运行PyODPS任务。本文为您介绍在DataWorks上使用PyODPS的使用限制、主要流程和部分简单应用示例。

使用限制
使用方式限制

如果您发现有Got killed报错,即表明内存使用超限,进程被中止。请避免在PyODPS节点中直接下载数据并在DataWorks中处理数据,建议将数据处理任务提交到MaxCompute进行分布式执行处理,两种方式的对比详情请参见注意事项:请勿下载全量数据到本地并运行PyODPS。

包支持限制

DataWorks的PyODPS节点缺少matplotlib等包,如下功能可能受限:

DataFrame的plot函数。

DataFrame自定义函数需要提交到MaxCompute执行。由于Python沙箱限制,第三方库只支持所有的纯粹Python库以及NumPy,因此不能直接使用Pandas。

DataWorks中执行的非自定义函数代码可以使用平台预装的NumPy和Pandas。不支持其他带有二进制代码的第三方包。

DataWorks的PyODPS节点不支持Python的atexit包,请使用try-finally结构实现相关功能。

读取数据记录数限制

DataWorks的PyODPS节点中,options.tunnel.use_instance_tunnel默认设置为False,即默认情况下,最多读取一万条数据记录。如果需要读取更多数据记录,需全局开启instance tunnel,即需要手动将options.tunnel.use_instance_tunnel设置为True。

主要流程
创建PyODPS节点。

您可以进入DataWorks的数据开发页面创建PyODPS节点。PyODPS节点分为PyODPS 2和PyODPS 3两种:

PyODPS 2底层Python语言版本为Python 2。

PyODPS 3底层Python语言版本为Python 3。

您可根据实际使用的Python语言版本创建PyODPS节点,创建PyODPS节点的详细操作步骤请参见开发PyODPS 2任务和开发PyODPS 3任务。新建节点

开发PyODPS任务代码。

创建完成后,您可参考下文内容进行简单示例的操作学习,了解PyODPS的主要能力维度。

ODPS入口

执行SQL

DataFrame

获取调度参数

设置运行参数hints

更多PyODPS的使用指导请参见基本操作概述、DataFrame概述。您也可以参考示例文档:使用PyODPS节点进行结巴中文分词,进行一个端到端的简单操作。

进行调度配置,完成后保存、提交、发布节点,后续即可周期性运行任务。

ODPS入口
DataWorks的PyODPS节点中,将会包含一个全局变量odps或者o,即为ODPS入口。您不需要手动定义ODPS入口。 命令示例如下。

查看表pyodps_iris是否存在

print(o.exist_table('pyodps_iris'))
返回True,表示表存在。

执行SQL
通用能力
在PyODPS节点中运行SQL命令:例如使用execute_sql()/run_sql()来执行SQL命令,当前主要支持运行DDL、DML类型的SQL命令。

说明
可以执行的SQL语句并非都可以通过入口对象的execute_sql()和run_sql()方法执行。在调用非DDL或非DML语句时,请使用其他方法。例如,调用GRANT或REVOKE语句时,请使用run_security_query方法;调用API命令时,请使用run_xflow或execute_xflow方法。

在PyODPS节点中读取SQL运行结果:例如使用open_reader()来读取SQL命令运行结果。

更多PyODPS节点的SQL相关操作详情请参见SQL。

注意事项:数据记录数量限制
DataWorks上默认没有开启instance tunnel,即instance.open_reader默认使用Result接口(存在limit限制,最多读取一万条数据记录)。如果您需要迭代获取全部数据,则需要开启instance tunnel并关闭limit限制。

全局关闭limit限制

您可以通过下列语句在全局范围内打开Instance Tunnel并关闭limit限制。

options.tunnel.use_instance_tunnel = True
options.tunnel.limit_instance_tunnel = False # 关闭limit限制,读取全部数据。

with instance.open_reader() as reader:

通过Instance Tunnel可读取全部数据。

您可以通过reader.count获取记录数。

仅本次运行关闭limit限制

您也可以通过在open_reader上添加tunnel=True,实现仅对本次open_reader开启instance tunnel。同时,您还可以添加 limit=False,实现仅对本次关闭limit限制。

重要
若您未开启Instance Tunnel,可能导致获取数据格式错误,解决方法请参见Python SDK常见问题。

with instance.open_reader(tunnel=True, limit=False) as reader:

本次open_reader使用Instance Tunnel接口,且能读取全部数据。

DataFrame
执行

在DataWorks的环境里,DataFrame的执行需要显式调用立即执行的方法(如execute、persist等)。示例代码如下。

调用立即执行的方法,处理每条Record,打印出表pyodps_iris中iris.sepalwidth小于3的所有数据。

from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))
for record in iris[iris.sepalwidth < 3].execute():
print(record)
打印详细信息

在DataWorks上默认打开options.verbose选项,即默认情况下,DataWorks的PyODPS节点运行过程会打印Logview等详细过程。您可以手动设置此选项,指定运行过程是否会打印Logview等详细过程。

更多DataFrame的操作示例请参见DataFrame概述。

获取调度参数
使用DataWorks的PyODPS节点开发任务代码时,您也可以使用调度参数,例如,需要通过调度参数获取任务运行的业务日期等场景。PyODPS节点与DataWorks中的SQL节点在调度参数的定义参数操作方面一致,但是在代码中的引用方式不同。

SQL节点会在代码中直接使用 ${param_name}这样的字符串。

为了避免影响代码,PyODPS节点在执行代码前,在全局变量中增加了一个名为args的dict,代码中使用args[param_name]的方式获取调度参数取值,而非在代码中替换 ${param_name}。

例如,在节点基本属性 > 参数中设置了调度参数ds=${yyyymmdd},则可以通过以下方式在代码中获取该参数。

获取参数ds的取值。

print('ds=' + args['ds'])

返回ds的时间,如ds=20161116

获取名为ds=${yyyymmdd}的分区的表数据。

o.get_table('table_name').get_partition('ds=' + args['ds'])

获取ds分区下表table_name的数据

更多调度参数详情可参见配置并使用调度参数。

设置运行参数hints
运行任务时如果需要设置运行时参数,可以通过设置hints参数来实现,参数类型是dict。

o.execute_sql('select * from pyodps_iris', hints={'odps.sql.mapper.split.size': 16})
您也可以对全局设置sql.setting,设置后后续每次运行时都会添加相关的运行时参数。

from odps import options
options.sql.settings = {'odps.sql.mapper.split.size': 16}
o.execute_sql('select * from pyodps_iris') #会根据全局配置添加hints
使用三方包
DataWorks节点预装了以下三方包,版本列表如下:

包名

Python 2节点版本

Python 3节点版本

requests

2.11.1

2.26.0

numpy

1.16.6

1.18.1

pandas

0.24.2

1.0.5

scipy

0.19.0

1.3.0

scikit_learn

0.18.1

0.22.1

pyarrow

0.16.0

2.0.0

lz4

2.1.4

3.1.10

zstandard

0.14.1

0.17.0

如果您需要使用上面列表中不存在的包,DataWorks节点提供了load_resource_package方法,支持从MaxCompute资源下载三方包。使用pyodps-pack打包后,可以直接使用load_resource_package方法加载三方包,之后就可以导入包中的内容。关于pyodps-pack的使用方法请参见PyODPS制作第三方包和PyODPS使用第三方包。

说明
如果为Python 2节点打包,请在打包时为pyodps-pack增加--dwpy27参数。

示例:

使用以下命令打包ipaddress。

pyodps-pack -o ipaddress-bundle.tar.gz ipaddress
上传并提交ipaddress-bundle.tar.gz为资源后,可以在PyODPS 3节点中按照下面的方法使用ipaddress包。

load_resource_package("ipaddress-bundle.tar.gz")
import ipaddress
DataWorks限制下载的包总大小为100 MB。如果您需要跳过预装包的打包,可以在打包时使用pyodps-pack提供的--exclude参数。例如:下面打包方法排除了DataWorks环境中存在的numpy包和pandas包。

pyodps-pack -o bundle.tar.gz --exclude numpy --exclude pandas
使用其他账号
某些场景下您可能希望使用其他账号(而非平台提供的账号)访问MaxCompute。此时,可以使用ODPS入口对象的as_account方法创建一个使用新账号的入口对象,该对象与系统默认提供的o实例彼此独立。

重要
as_account方法从PyODPS 0.11.3版本开始支持。如果您的DataWorks未部署该版本,则无法使用as_account方法。

主要流程
为其他用户授权项目相关权限,详情请参见附录:为其他用户授权。

在PyODPS节点中使用as_account方法切换账号,创建新的入口对象。

import os

确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为 Access Key ID,

ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为 Access Key Secret,

不建议直接使用 Access Key ID / Access Key Secret 字符串

new_odps = o.as_account(
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
)
检查账号切换结果。

查询当前用户信息:在要执行的代码中增加以下语句,如果运行结果中打印出的用户信息为您所传入的其他用户的UID,则表示您是使用其他账号访问的MaxCompute。

相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
一站式大数据开发治理平台DataWorks初级课程
DataWorks 从 2009 年开始,十ー年里一直支持阿里巴巴集团内部数据中台的建设,2019 年双 11 稳定支撑每日千万级的任务调度。每天阿里巴巴内部有数万名数据和算法工程师正在使用DataWorks,承了阿里巴巴 99%的据业务构建。本课程主要介绍了阿里巴巴大数据技术发展历程与 DataWorks 几大模块的基本能力。 课程目标 &nbsp;通过讲师的详细讲解与实际演示,学员可以一边学习一边进行实际操作,可以深入了解DataWorks各大模块的使用方式和具体功能,让学员对DataWorks数据集成、开发、分析、运维、安全、治理等方面有深刻的了解,加深对阿里云大数据产品体系的理解与认识。 适合人群 &nbsp;企业数据仓库开发人员 &nbsp;大数据平台开发人员 &nbsp;数据分析师 &nbsp;大数据运维人员 &nbsp;对于大数据平台、数据中台产品感兴趣的开发者
目录
相关文章
|
1月前
|
分布式计算 DataWorks 关系型数据库
DataWorks常见问题之pyodps报错如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
48 6
|
6月前
|
分布式计算 DataWorks MaxCompute
DataWorks中,您可以使用PyODPS库来获取ODPS表的行数
DataWorks中,您可以使用PyODPS库来获取ODPS表的行数
78 1
|
分布式计算 DataWorks MaxCompute
DataWorks百问百答13:如何使用pyodps引用资源文件?
DataWorks百问百答13:如何使用pyodps引用资源文件?
3961 0
|
DataWorks 机器人
使用Dataworks的Pyodps节点发送钉钉机器人消息
使用Dataworks的Pyodps节点发送钉钉机器人消息,需要开启沙箱白名单。否则网络不可达。 测试结果: 代码: import jsonimport requestsimport sys reminders 提醒 def send_msg(url,reminders,msg): headers.
|
自然语言处理 DataWorks 大数据
DataWorks PyODPS节点实现结巴中文分词
DataWorks PyODPS节点实现结巴中文分词
2649 0
|
SQL 分布式计算 DataWorks
DataWorks支持PyODPS类型任务
昨天,DataWorks推出了PYODPS任务类型,集成了Maxcompute的Python SDK,可在DataWorks的PYODPS节点上直接编辑Python代码操作Maxcompute,也可以设置调度任务来处理数据,提高数据开发效率。
6474 0
|
1月前
|
SQL 分布式计算 DataWorks
DataWorks常见问题之dataworks连接FTP服务器失败如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
1月前
|
SQL 分布式计算 DataWorks
DataWorks常见问题之dataworks自定义函数运行时报错如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
1月前
|
SQL DataWorks 关系型数据库
DataWorks常见问题之dataworks同步Rds任务失败如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。