大数据和机器学习 > 大数据计算 MaxCompute > 正文

DataWorks支持PyODPS类型任务

简介: 昨天,DataWorks推出了PYODPS任务类型,集成了Maxcompute的Python SDK,可在DataWorks的PYODPS节点上直接编辑Python代码操作Maxcompute,也可以设置调度任务来处理数据,提高数据开发效率。
+关注继续查看

昨天,DataWorks推出了PYODPS任务类型,集成了Maxcompute的Python SDK,可在DataWorks的PYODPS节点上直接编辑Python代码操作Maxcompute,也可以设置调度任务来处理数据,提高数据开发效率。


效果如下图

c281ee3afd54a38962d39713cb19d6fa55e2c06b



适用region

只有华东2(上海)region 支持了 PYODPS 节点。

注:底层的 Python 版本为 2.7 。

新建 PYODPS 节点

新建 PYODPS 节点具体操作如下:

1) 单击数据开发页面工具栏中的 新建 > 新建任务。2) 填写新建任务弹出框中的各配置项。

新建pyodps

3) 单击创建

编辑 PYODPS 节点

ODPS入口

DataWorks 的 PyODPS 节点中,将会包含一个全局的变量 odps 或者 o ,即 ODPS 入口。用户不需要手动定义 ODPS 入口。


print(odps.exist_table('pyodps_iris'))


执行SQL

PyODPS支持ODPS SQL的查询,并可以读取执行的结果。 execute_sql 或者 run_sql 方法的返回值是 运行实例 。

注解:并非所有在 ODPS Console 中可以执行的命令都是 ODPS 可以接受的 SQL 语句。 在调用非 DDL / DML 语句时,请使用其他方法,例如 GRANT / REVOKE 等语句请使用 run_security_query 方法,PAI 命令请使用 run_xflow 或 execute_xflow 方法。


>>> o.execute_sql('select * from dual')  #  同步的方式执行,会阻塞直到SQL执行完成

>>>

>>> instance = o.run_sql('select * from dual')  # 异步的方式执行

>>> print(instance.get_logview_address())  # 获取logview地址

>>> instance.wait_for_success()  # 阻塞直到完成


设置运行参数

有时,我们在运行时,需要设置运行时参数,我们可以通过设置 hints 参数,参数类型是dict。


>>> o.execute_sql('select * from pyodps_iris', hints={'odps.sql.mapper.split.size': 16})


我们可以对于全局配置设置sql.settings后,每次运行时则都会添加相关的运行时参数。


>>> from odps import options

>>> options.sql.settings = {'odps.sql.mapper.split.size': 16}

>>> o.execute_sql('select * from pyodps_iris')  # 会根据全局配置添加hints

读取SQL执行结果

运行 SQL 的 instance 能够直接执行 open_reader 的操作,一种情况是SQL返回了结构化的数据。


>>> with o.execute_sql('select * from dual').open_reader() as reader:

>>>     for record in reader:

>>>         # 处理每一个record


另一种情况是 SQL 可能执行的比如 desc,这时通过 reader.raw 属性取到原始的SQL执行结果。


>>> with o.execute_sql('desc dual').open_reader() as reader:

>>>     print(reader.raw)

使用调度参数

PYODPS节点使用调度参数需要注意一下,系统定义的调度参数,可以直接通过此方法获取。

使用调度参数

自定义参数的使用,需要使用单独的方法获取。

在全局包括一个 args 对象,可以在这个中获取,它是一个dict类型。

自定义参数配置

测试运行结果如下:

自定义调度参数结果

请注意:在数据开发下,使用了自定义调度参数,页面上直接触发运行PYODPS节点时,需要写死时间,PYODPS节点无法像SQL一样直接替换。

调度请参考:https://help.aliyun.com/document_detail/30298.html

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
如何将 Studio 模型部署到 EAS 和 Dataworks 调度的任务|学习笔记
快速学习如何将 Studio 模型部署到 EAS 和 Dataworks 调度的任务。
73 0
Odps 同步任务异常处理|学习笔记
快速学习 Odps 同步任务异常处理
203 0
使用钉钉机器人监控DataWorks/消息队列Kakfa/实时计算Flink任务
我们写的数仓任务、Flink任务提交任务缺少对任务运行情况监控报警,这里可以使用钉钉自带的机器人实现钉钉群任务告警功能
573 0
DataWorks公共云优先级和离线同步任务实时同步任务速度
1.资源优先级 2.数据集成离线同步提速 3.数据集成实时同步任务提速 4.相关引擎调优
690 0
DataWorks 批量生成同步任务|学习笔记
快速学习 DataWorks 批量生成同步任务
153 0
阿里分布式任务调度SchedulerX2.0支持Dataworks任务
在实际业务场景中业务处理往往依赖前置数据准备,目前在分布式任务调度平台上可进行dataworks任务数据处理与业务数据处理任务依赖编排定时调度。
792 0
DataWorks熟能生巧系列直播第五期:数据集成批量生成同步任务
本文介绍了数据集成的整库迁移,批量上云和分库分表的操作,并进行了实操演示,对各种规则进行了解释,提出了需要注意的问题和解决方案。
443 0
DataWorks OpenAPI-表结构变更时触发任务变更
随着大数据在企业内部使用越来越广泛,很多业务落地慢慢的都依赖大数据的产出结果做为基础,业务的发展遍地开花产生了越来越多的任务,这些任务之间相互依赖也就越来越复杂,慢慢的超过人为可视、可梳理范围;当上游的数据口径发生变更的时候,对下游业务的影响就会形成放射性,影响难以预料,这时候就急需有一种功能来辅助我们的运维人员一起梳理出整个受影响的业务范围,以防数据口径不一致引起资损、或提供错误的数据导致决策失误。
4147 0
DataWorks百问百答39:VPC网络下adb pg实例引擎如何绑定dataworks工作空间执行任务?
VPC网络下adb pg实例引擎绑定dataworks工作空间任务执行执行指导
1208 0
DataWorks任务实例错误分析
总结Dataworks产品使用过程中遇到的常见问题——任务实例错误
457 0
DataWorks百问百答17:如何排查周期任务取不到数据(产出数据为空)的问题?
如何排查周期任务取不到数据(产出数据为空)的问题?
1296 0
DataWorks百问百答12:定时时间到了,任务为什么还不运行呢?
DataWorks百问百答12:定时时间到了,任务为什么还不运行呢?
1995 0
MaxCompute(原ODPS)是一项面向分析的大数据计算服务,它以Serverless架构提供快速、全托管的在线数据仓库服务,消除传统数据平台在资源扩展性和弹性方面的限制,最小化用户运维投入,使您经济并高效的分析处理海量数据。
热门文章
热门讨论
+关注
隐林
阿里云大数据产品专家,擅长MaxCompute、机器学习、分布式、可视化、人工智能等大数据领域;
文章
问答
视频
相关电子书
更多
DataWorks调度任务迁移最佳实践-2020飞天大数据平台实战应用第一季
立即下载
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载