在 MaxCompute UDF 中运行 Scipy

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
EMR Serverless StarRocks,5000CU*H 48000GB*H
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 新版 MaxCompute Isolation Session 支持 Python UDF。也就是说,Python UDF 中已经可以跑二进制包。刚才以 Scipy 为例踩了一下坑,把相关的过程分享出来。

新版 MaxCompute Isolation Session 支持 Python UDF。也就是说,Python UDF 中已经可以跑二进制包。刚才以 Scipy 为例踩了一下坑,把相关的过程分享出来。

下载 Scipy 包并上传资源

首先,从 PyPI 或其他镜像下载 Scipy 包。你需要下载后缀为“cp27-cp27m-manylinux1_x86_64.whl”的包,其他的包会无法加载,包括名为“cp27-cp27mu”的包。以下的截图来自 https://pypi.python.org/pypi/scipy ,仅有打勾的包可以直接使用:

image

下载 whl 后,将文件名更改为 scipy.zip。此后,在 MaxCompute Console 中执行

add archive scipy.zip;

此后,scipy.zip 即被创建为 MaxCompute Archive 资源。不建议使用其他类型的资源,因为在执行时,MaxCompute 会自动解压 Archive 类型的资源,从而省去手动解压的步骤。

从非 Whl 包生成 Whl 包

如果列出的包中包含 Whl,则可以直接上传并跳过此步骤。如果列出的包不包含 whl(如手中仅有图中的 scipy-0.19.0.zip),需要在 Linux 环境中手动编译并打包为 whl。打包前,需要确保下列命令返回“cp27m”而不是“cp27mu”:

python -c "import pip; print pip.pep425tags.get_abi_tag()"

如果返回值为“cp27mu”,你需要使用 “--enable-unicode=no" 选项编译一个可用的 Python 2.7,再使用编译得到的 Python。如果返回值正确,通常可以在该环境下使用

python setup.py bdist_wheel

完成,具体请参考各个包的编译/安装说明。

打包完成后,将生成的 whl 包上传。

编写和创建 UDF

我们需要编写一个 UDF 支持计算 psi。编写下列代码:

from odps.udf import annotate
from odps.distcache import get_cache_archive

def include_package_path(res_name):
    import os, sys
    archive_files = get_cache_archive(res_name)
    dir_names = sorted([os.path.dirname(os.path.normpath(f.name)) for f in archive_files
                       if '.dist_info' not in f.name], key=lambda v: len(v))
    sys.path.append(os.path.dirname(dir_names[0]))

@annotate("double->double")
class MyPsi(object):
    def __init__(self):        
        include_package_path('scipy.zip')

    def evaluate(self, arg0):
        from scipy.special import psi
        return float(psi(arg0))

这里有必要解释一下 include_package_path 这个函数。get_cache_archive 返回一个包含包中所有文件的文件对象。我们首先取出所有的文件名,此后获得最短的路径作为包的路径,并加入 sys.path。此后,便可以正常 import scipy 这个包。

需要注意的是,因为 MaxCompute 会在执行前通过原有的沙箱检查 UDF 的输入/输出,因而 include_package_path 和 import 在函数外调用会报错。

编写完成后,将代码保存为 my_psi.py,并在 MaxCompute Console 中执行

add py my_psi.py;

此后创建函数。在 MaxCompute Console 中输入

create function my_psi as my_psi.MyPsi using my_psi.py,scipy.zip;

注意在 create function 时,不要忘记加上刚才上传的包,例如上面的 scipy.zip。

执行

创建 UDF 后,便可以在 MaxCompute Console 中执行查询(暂不支持 pypy,因而需禁用 pypy):

set odps.pypy.enabled=false;
set odps.isolation.session.enable = true;
select my_psi(sepal_length) from iris;

其他

如果包依赖了其他 Python 包,需要一并上传并同时加入到 UDF 依赖中。

使用 0.7.4 以上的 PyODPS DataFrame 可以简化使用二进制包的 UDF 的编写,无需手动调用 include_package_path。

本人没有进行更深入的使用,相关问题请提工单提问,或者加入 MaxCompute 钉钉群讨论。

MaxCompute 钉钉群

image

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
7月前
|
分布式计算 大数据 调度
MaxCompute产品使用问题之为什么用python写的udf函数跑起来比本地还要慢
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
7月前
|
JSON 分布式计算 大数据
MaxCompute产品使用问题之pyODPS3如何引用udf资源的函数
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
6月前
|
分布式计算 大数据 数据处理
MaxCompute操作报错合集之在使用udf遇到报错,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
8月前
|
分布式计算 数据挖掘 数据处理
基于 MaxCompute MaxFrame 实现分布式 Pandas 处理
阿里云分布式计算框架 MaxCompute MaxFrame 兼容 Pandas 接口且自动进行分布式处理,在保证强大数据处理能力的同时,可以大幅度提高数据处理规模及计算效率。
571 1
|
8月前
|
分布式计算 DataWorks 大数据
maxcompute函数问题之自定义函数报错如何解决
MaxCompute函数包括内置函数和自定义函数(UDF),它们用于在MaxCompute平台上执行数据处理和分析任务;本合集将介绍MaxCompute函数的使用方法、函数编写和优化技巧,以及常见的函数错误和解决途径。
|
分布式计算 DataWorks MaxCompute
DataWorks中,您可以使用PyODPS库来获取ODPS表的行数
DataWorks中,您可以使用PyODPS库来获取ODPS表的行数
276 1
|
分布式计算 MaxCompute Python
在MaxCompute中使用pyodps的DataFrame
在MaxCompute中使用pyodps的DataFrame
292 2
|
SQL 编解码 分布式计算
【MaxCompute 常见问题】 UDF
查看资源信息 假设资源名称为 pyudf_test.py,在 odpscmd 客户端执行 desc resource pyudf_test.py;,或在 datastudio 中新建 SQL 节点后输入 desc resource pyudf_test.py;执行。
【MaxCompute 常见问题】 UDF
|
SQL 分布式计算 DataWorks
ODPS Python3开发UDF实践 dataworks平台
# 业务背景 花呗有一个生息产品叫做循环, 也就最低还款: 即每月进行最小还款, 剩下的金额产生利息. 用户每个月都可以进行最低还款的办理, 即还不掉的本金永远在里面滚着. 业务方想要知道一个业务指标, 就是用户连续办理了多少个月的循环, 然后针对这部分用户做精细化运营 # 解决思路 这个问题有两个解法 #### 1. ODPS SQL解法 每月月末跑一个数, 统计本月用户
4538 0
ODPS Python3开发UDF实践 dataworks平台
|
分布式计算 DataWorks MaxCompute

热门文章

最新文章

相关产品

  • 云原生大数据计算服务 MaxCompute