PyODPS DataFrame 的代码在哪里跑

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 在使用 PyODPS DataFrame 编写数据应用时,尽管编写的是同一个脚本文件,但其中的代码会在不同位置执行,这可能导致一些无法预期的问题,本文介绍当出现相关问题时,如何确定代码在何处执行,以及提供部分场景下解决问题的方法。

在使用 PyODPS DataFrame 编写数据应用时,尽管编写的是同一个脚本文件,但其中的代码会在不同位置执行,这可能导致一些无法预期的问题,本文介绍当出现相关问题时,如何确定代码在何处执行,以及提供部分场景下解决问题的方法。

概述

假定我们要执行下面的代码:

from odps import ODPS, options
import numpy as np

o = ODPS(access_id, access_key, project, endpoint)
df = o.get_table('pyodps_iris').to_df()

coeffs = [0.1, 0.2, 0.4]

def handle(v):
    import numpy as np
    return float(np.cosh(v)) * sum(coeffs)

options.df.supersede_libraries = True
val = df.sepal_length.map(handle).sum().execute(libraries=['numpy.zip', 'other.zip'])
print(np.sinh(val))

在开始分析之前,首先需要指出的是,PyODPS 是一个 Python 包而非 Python Implementation,PyODPS 的运行环境均为未经修改的 Python,因而并不会出现与正常 Python 解释器不一致的行为。亦即,你所写的每一条语句不会有与标准 Python 语句不同的行为,例如自动变成分布式代码,等等。

下面解释该代码的执行过程。

image.png

上图是执行上述代码时可能涉及的系统。代码本身执行的位置在图中用紫色表示,这些系统都位于 MaxCompute 外部,为了方便表述,下文称为“本地”。在本地执行的代码包括 handle 函数之外的部分(注意 handle 传入 map 时仅传入了函数本身而并未执行)。因而,这些代码在执行时,行为与普通 Python code 的执行行为类似,import 第三方包时,引用的是本地的包。因而,上面的代码中,libraries=['numpy.zip', 'other.zip']引用的other.zip因为并没有在本地安装,因而如果代码中有诸如 import other 这样的语句,会导致执行报错。即便 other.zip 已被上传到 MaxCompute 资源也是如此,因为本地根本没有这个包。理论上,本地代码如果不涉及 PyODPS 包,则与 PyODPS 无关,用户需要自行排查。

对于 handle 函数,情况发生了变化。handle 函数传入 map 方法时,如果使用的后端是 MaxCompute 后端,会先被 cloudpickle 模块提取闭包和字节码,此后 PyODPS DataFrame 会使用闭包和字节码生成一个 Python UDF,提交到 MaxCompute。最后,作业以 SQL 的形式在 MaxCompute 执行时,会调用这个 Python UDF,其中的字节码和闭包内容会被 unpickle,此后在 MaxCompute Executor 执行。由此可见,在上述代码中,

  1. 在 handle 函数体中的代码都不会在本地执行,而会在 MaxCompute Executor 中执行;
  2. handle 函数体中无法引用本地安装的包,只有在 MaxCompute Executor 中存在的包才有效;
  3. 上传的第三方包必须能够在 MaxCompute Executor 中的 Python 版本(目前为 Python 2.7,UCS2)中调用;
  4. handle 函数体中修改引用的外部变量(上述代码中的 coeffs)不会导致本地的 coeffs 值被修改;
  5. 如果在 handle 中引用在 handle 外 import 的包,在 handle 中调用可能会报错,因为在不同环境中,包的结构可能不同,而 cloudpickle 会将本地包的引用带到 MaxCompute Executor,导致报错,因而建议 import 在 handle 中进行;
  6. 由于使用 cloudpickle,如果在 handle 中调用了其他文件中的代码,该文件所在的包必须存在于 MaxCompute Executor 中。如果你不想使用第三方包的形式解决该问题,请将所有引用的个人代码放在同一个文件中。

上述对 handle 函数的解释对于自定义聚合、apply 和 map_reduce 中调用的自定义方法 / Agg 类均适用。如果使用的后端是 Pandas 后端,则所有代码都会在本地运行,因而本地也需要安装相关的包。但鉴于 Pandas 后端调试完毕后通常会转移到 MaxCompute 运行,建议在本地装包的同时,参照 MaxCompute 后端的惯例进行开发。

使用第三方包

  1. 个人电脑 / 自有服务器在本地使用第三方包 / 其他文件中的代码

在相应的 Python 版本上安装即可。

  1. DataWorks 中本地使用其他文件中的代码

该部分功能由 DataWorks 提供,请参考 DataWorks 文档

  1. map / apply / map_reduce / 自定义聚合中使用第三方包 / 其他文件中的代码

参考 https://yq.aliyun.com/articles/591508 。需要补充的是,在 DataWorks 上上传资源后,需要点击“提交”确保资源被正确上传到 MaxCompute。如果需要使用自己的 Numpy 版本,在上传正确版本的 wheel 包的同时,需要配置 odps.df.supersede_libraries = True,同时确保你上传的 numpy 包名位于 libraries 的最前面,如果指定了 options.df.libraries,则 numpy 包名需要位于 options.df.libraries 的最前面。

引用其他 MaxCompute 表中的数据

  1. 个人电脑 / 自有服务器在本地访问 MaxCompute 表

如果 Endpoint 可以连接,使用 PyODPS / DataFrame 访问。

  1. map / apply / map_reduce / 自定义聚合中访问其他 MaxCompute 表

MaxCompute Executor 中通常不支持访问 Endpoint / Tunnel Endpoint,其上也没有 PyODPS 包可用,因而不能直接使用 ODPS 入口对象或者 PyODPS DataFrame,也不能从自定义函数外部传入这些对象。如果表的数据量不大,建议将 DataFrame 作为资源传入(见 https://pyodps.readthedocs.io/zh_CN/latest/df-element.html#function-resource )。如果数据量较大,建议改写成 join。

访问其他服务

  1. 个人电脑 / 自有服务器在本地访问其他服务

保证自己的环境中可以正常访问相关服务,生产服务器可以联系 PE。

  1. DataWorks 上的本地代码中访问其他服务

请咨询 DataWorks。

  1. map / apply / map_reduce / 自定义聚合中访问其他服务

参考 https://yq.aliyun.com/articles/591508 启用 Isolation,如果仍然遇到网络报错,请联系 MaxCompute 用户群,找售后帮忙解决。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
一站式大数据开发治理平台DataWorks初级课程
DataWorks 从 2009 年开始,十ー年里一直支持阿里巴巴集团内部数据中台的建设,2019 年双 11 稳定支撑每日千万级的任务调度。每天阿里巴巴内部有数万名数据和算法工程师正在使用DataWorks,承了阿里巴巴 99%的据业务构建。本课程主要介绍了阿里巴巴大数据技术发展历程与 DataWorks 几大模块的基本能力。 课程目标  通过讲师的详细讲解与实际演示,学员可以一边学习一边进行实际操作,可以深入了解DataWorks各大模块的使用方式和具体功能,让学员对DataWorks数据集成、开发、分析、运维、安全、治理等方面有深刻的了解,加深对阿里云大数据产品体系的理解与认识。 适合人群  企业数据仓库开发人员  大数据平台开发人员  数据分析师  大数据运维人员  对于大数据平台、数据中台产品感兴趣的开发者
目录
相关文章
|
分布式计算 MaxCompute Python
在PyODPS DataFrame自定义函数中使用pandas、scipy和scikit-learn
背景 [PyODPS DataFrame]http://pyodps.readthedocs.io/zh_CN/latest/) 提供了类似 pandas 的接口,来操作 ODPS 数据,同时也支持在本地使用 pandas,和使用数据库来执行。
14465 2
|
SQL 分布式计算 MaxCompute
PyODPS学习:使用DataFrame实现SQL的IF判断
使用DataFrame实现SQL的IF判断
3461 0
|
分布式计算 关系型数据库 数据库
PyODPS DataFrame:统一的数据查询语言
前几天,PyODPS发布了0.7版本,这篇文章给大家介绍下PyODPS新版本带来的重要特性。 之前也有若干篇文章介绍过了,我们PyODPS DataFrame是延迟执行的,在调用立即执行的方法,比如execute、persist等之前,都只是构建了表达式。
6599 0
|
分布式计算 数据挖掘 API
PyOdps DataFrame来临,数据分析从未如此简单!
PyOdps正式发布DataFrame框架(此处应掌声经久不息),DTer的福音!有了它,就像卷福有了花生,比翼双飞,哦不,如虎添翼。 快过年了,大家一定没心情看长篇大论的分析文章。作为介绍PyOdps DataFrame的开篇文章,我只说说其用起来爽的地方。其余的部分,从使用、问题到实现原理,我
11525 0
PyODPS DataFrame 处理笛卡尔积的几种方式
PyODPS 提供了 DataFrame API 来用类似 pandas 的接口进行大规模数据分析以及预处理,本文主要介绍如何使用 PyODPS 执行笛卡尔积的操作。 笛卡尔积最常出现的场景是两两之间需要比较或者运算。
13294 0
|
SQL 分布式计算 MaxCompute
什么是PyODPS DataFrame
这篇文章解释了PyODPS DataFrame是什么,能做什么事情,以及简单介绍一下实现的原理。
7379 0
|
4月前
|
DataWorks Kubernetes 大数据
飞天大数据平台产品问题之DataWorks提供的商业化服务如何解决
飞天大数据平台产品问题之DataWorks提供的商业化服务如何解决
|
4月前
|
SQL DataWorks 安全
DataWorks产品使用合集之如何实现分钟级调度
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
4月前
|
运维 DataWorks 监控
DataWorks产品使用合集之如何自定义UDTF
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。

相关实验场景

更多