如何将py文件资源传入ODPS DataFrame的map/apply方法的resources参数?

1、使用场景:
在PyODPS3节点中使用了DataFrame的map和apply方法,编写自定义函数,传入一个collection资源和一个py文件资源,并在自定义函数中使用collection和py文件资源中的函数进行数据处理。
2、问题:
通过map/apply方法的resources参数,可以传入collection资源和py文件资源,目前传入collection资源没有问题,但是如何正确传入py文件资源?并在自定义函数中使用py文件资源中的函数?

已经查看过官网文档中的相关内容介绍(链接:https://help.aliyun.com/zh/maxcompute/user-guide/use-udfs-and-the-third-party-python-libraries?spm=a2c4g.11186623.help-menu-27797.d_2_4_0_4_8.6d0e4426edEzPR&scm=20140722.H_90716._.OR_help-T_cn~zh-V_1):
image.png
官网介绍的方法是通过o.create_resource方法创建一个文件资源,再传入resources参数。
我的场景区别是:需要传入py文件资源(py文件资源已提前上传至MaxCompute资源目录下,且已提交),且需要调用py文件资源中的函数。

3、问题复现代码(PyODPS3节点):

from odps import DataFrame

def myfunc(resources):  # resources按调用顺序传入。
    # collection资源
    collection = resources[0]
    for r in collection:
        names.add(r.name)  # 这里可以通过字段名或者偏移来取。
    # py文件资源
    py_file=resources[1]
    # 调用py文件资源中的函数计算?
    # ...
    def h(x):
        if x in names:
            return True
        else:
            return False
    return h

iris = DataFrame(o.get_table('pyodps_iris'))
iris_names_collection = iris.distinct('name')[:2]
df = iris.distinct('name')
# map方法,使用自定义函数,resources参数接收一个collection资源和一个py资源文件
r=df.name.map(myfunc, resources=[iris_names_collection,o.get_resource('py_resource.py')], rtype='boolean').rename('isin')
print(r.execute())

4、报错信息:

Traceback (most recent call last):
  File "<pyodps_user_code>", line 19, in <module>
    print(r.execute())
  File "/home/tops/lib/python3.7/site-packages/odps/df/expr/expressions.py", line 43, in __call__
    return self._func(*args, **kwargs)
  File "/home/tops/lib/python3.7/site-packages/odps/df/expr/expressions.py", line 204, in execute
    return self._handle_delay_call('execute', self, wrapper=wrapper, **kwargs)
  File "/home/tops/lib/python3.7/site-packages/odps/df/expr/expressions.py", line 43, in __call__
    return self._func(*args, **kwargs)
  File "/home/tops/lib/python3.7/site-packages/odps/df/expr/expressions.py", line 158, in _handle_delay_call
    result = getattr(engine, method)(*args, **kwargs)
  File "/home/tops/lib/python3.7/site-packages/odps/df/backends/core.py", line 734, in execute
    return self._action(*exprs_args_kwargs, **kwargs)
  File "/home/tops/lib/python3.7/site-packages/odps/df/backends/core.py", line 555, in _action
    timeout=timeout, progress_proportion=progress_proportion)
  File "/home/tops/lib/python3.7/site-packages/odps/df/backends/core.py", line 799, in _execute_dag
    close_and_notify=close_and_notify, progress_proportion=progress_proportion)
  File "/home/tops/lib/python3.7/site-packages/odps/df/backends/core.py", line 337, in execute
    results = self._run(ui, progress_proportion)
  File "/home/tops/lib/python3.7/site-packages/odps/df/backends/core.py", line 212, in _run
    res = call(ui=ui, progress_proportion=progress_proportion / len(calls))
  File "/home/tops/lib/python3.7/site-packages/odps/df/backends/core.py", line 191, in __call__
    res = self.run(ui=ui, progress_proportion=progress_proportion)
  File "/home/tops/lib/python3.7/site-packages/odps/df/backends/core.py", line 588, in run
    result = engine._do_execute(expr_dag, expr, **kw)
  File "/home/tops/lib/python3.7/site-packages/odps/df/backends/odpssql/engine.py", line 404, in _do_execute
    libraries=libraries, image=image, schema=self._ctx.default_schema)
  File "/home/tops/lib/python3.7/site-packages/odps/df/backends/odpssql/engine.py", line 206, in _run
    instance.wait_for_success()
  File "/home/tops/lib/python3.7/site-packages/odps/models/instance.py", line 729, in wait_for_success
    raise exc
odps.errors.ScriptError: ODPS-0123055: InstanceId: 20250208064416304gjk0j4c51gk4
ODPS-0123055:User script exception - Traceback (most recent call last):
  File "/home/admin/pyodps_udf_1738997055_de2aafca_6634_4dbe_8a8a_fc6327c6eb51.py", line 3354, in __init__
    resources.append(get_cache_file(str(n)))
  File "/home/admin/lib/python3/odps/distcache.py", line 64, in get_cache_file
    return open(filepath, 'r')
FileNotFoundError: [Errno 2] No such file or directory: './work/py_resource.py'

5、py文件资源
py_resource.py资源已经通过Dataworks界面上传到MaxCompute资源目录下,且已提交。
image.png

展开
收起
游客2ae7cazeorf7o 2025-02-08 15:30:09 288 分享 版权
2 条回答
写回答
取消 提交回答
  • 同步下目前我想到的2个解决方案:
    1、在pyodps3节点中,重写一遍py文件资源中的函数,然后在map的自定义函数中直接引用(已测试可行)。但缺点是py文件资源是公共资源,被多个pyodps3节点调用,该方法会增大代码重复性,且维护性差。
    2、将py文件资源打包成wheel包,上传到资源,然后在df的library中引用。该方法同样维护性差,每次变更py文件资源,都需要重复操作。
    3、问题再思考:通过o.get_resource('py_resource.py')的方法应该是可行,但为啥会报错:FileNotFoundError: [Errno 2] No such file or directory: './work/py_resource.py'。应该是传入方式不对。期待问题得到解决。

    --------20250211 最终解决方案--------

    from odps import DataFrame
    
    def myfunc(resources):  # resources按调用顺序传入。
        # collection资源
        collection = resources[0]
        names = set()
        for r in collection:
            names.add(r.name)  # 这里可以通过字段名或者偏移来取。
        def h(x):
            if x in names:
                return test(1)
            else:
                return test(0)
        return h
    
    iris = DataFrame(o.get_table('pyodps_iris'))
    iris_names_collection = iris.distinct('name')[:2]
    df = iris.distinct('name')
    # 读取py文件资源为字符串
    with o.open_resource('py_resource.py') as fp:
        content=fp.read()
    # 执行py文件资源中的代码(自定义函数中可直接使用py文件资源中的函数)
    exec(content)
    # map自定义函数,资源传入collection对象
    r=df.name.map(myfunc, resources=[iris_names_collection], rtype='int').rename('isin')
    print(r.execute())
    

    输出日志:

       isin
    0     2
    1     2
    2     1
    2025-02-11 19:09:04 INFO =================================================================
    2025-02-11 19:09:04 INFO Exit code of the Shell command 0
    2025-02-11 19:09:04 INFO --- Invocation of Shell command completed ---
    2025-02-11 19:09:04 INFO Shell run successfully!
    2025-02-11 19:09:04 INFO Current task status: FINISH
    2025-02-11 19:09:04 INFO Cost time is: 26.493s
    

    其中,重点是在PyODPS3节点中执行如下代码:

    # 读取py文件资源为字符串
    with o.open_resource('py_resource.py') as fp:
        content=fp.read()
    # 执行py文件资源中的代码(自定义函数中可直接使用py文件资源中的函数)
    exec(content)
    

    则可在自定义函数myfunc中直接使用test函数。

    2025-02-10 17:20:58
    赞同 128 展开评论
  • 你可以通过o.get_resource('py_resource.py')获取你的py文件资源,然后确保在自定义函数内部通过Python的importlibexec动态加载该文件。如果还是不理解的话,可以直接询问阿里云客服,或者提交工单给阿里云的技术顾问,回答不易,麻烦大佬给个采纳,谢谢。

    2025-02-09 20:36:20
    赞同 135 展开评论

MaxCompute(原ODPS)是一项面向分析的大数据计算服务,它以Serverless架构提供快速、全托管的在线数据仓库服务,消除传统数据平台在资源扩展性和弹性方面的限制,最小化用户运维投入,使您经济并高效的分析处理海量数据。

还有其他疑问?
咨询AI助理