1、使用场景:
在PyODPS3节点中使用了DataFrame的map和apply方法,编写自定义函数,传入一个collection资源和一个py文件资源,并在自定义函数中使用collection和py文件资源中的函数进行数据处理。
2、问题:
通过map/apply方法的resources参数,可以传入collection资源和py文件资源,目前传入collection资源没有问题,但是如何正确传入py文件资源?并在自定义函数中使用py文件资源中的函数?
已经查看过官网文档中的相关内容介绍(链接:https://help.aliyun.com/zh/maxcompute/user-guide/use-udfs-and-the-third-party-python-libraries?spm=a2c4g.11186623.help-menu-27797.d_2_4_0_4_8.6d0e4426edEzPR&scm=20140722.H_90716._.OR_help-T_cn~zh-V_1):
官网介绍的方法是通过o.create_resource方法创建一个文件资源,再传入resources参数。
我的场景区别是:需要传入py文件资源(py文件资源已提前上传至MaxCompute资源目录下,且已提交),且需要调用py文件资源中的函数。
3、问题复现代码(PyODPS3节点):
from odps import DataFrame
def myfunc(resources): # resources按调用顺序传入。
# collection资源
collection = resources[0]
for r in collection:
names.add(r.name) # 这里可以通过字段名或者偏移来取。
# py文件资源
py_file=resources[1]
# 调用py文件资源中的函数计算?
# ...
def h(x):
if x in names:
return True
else:
return False
return h
iris = DataFrame(o.get_table('pyodps_iris'))
iris_names_collection = iris.distinct('name')[:2]
df = iris.distinct('name')
# map方法,使用自定义函数,resources参数接收一个collection资源和一个py资源文件
r=df.name.map(myfunc, resources=[iris_names_collection,o.get_resource('py_resource.py')], rtype='boolean').rename('isin')
print(r.execute())
4、报错信息:
Traceback (most recent call last):
File "<pyodps_user_code>", line 19, in <module>
print(r.execute())
File "/home/tops/lib/python3.7/site-packages/odps/df/expr/expressions.py", line 43, in __call__
return self._func(*args, **kwargs)
File "/home/tops/lib/python3.7/site-packages/odps/df/expr/expressions.py", line 204, in execute
return self._handle_delay_call('execute', self, wrapper=wrapper, **kwargs)
File "/home/tops/lib/python3.7/site-packages/odps/df/expr/expressions.py", line 43, in __call__
return self._func(*args, **kwargs)
File "/home/tops/lib/python3.7/site-packages/odps/df/expr/expressions.py", line 158, in _handle_delay_call
result = getattr(engine, method)(*args, **kwargs)
File "/home/tops/lib/python3.7/site-packages/odps/df/backends/core.py", line 734, in execute
return self._action(*exprs_args_kwargs, **kwargs)
File "/home/tops/lib/python3.7/site-packages/odps/df/backends/core.py", line 555, in _action
timeout=timeout, progress_proportion=progress_proportion)
File "/home/tops/lib/python3.7/site-packages/odps/df/backends/core.py", line 799, in _execute_dag
close_and_notify=close_and_notify, progress_proportion=progress_proportion)
File "/home/tops/lib/python3.7/site-packages/odps/df/backends/core.py", line 337, in execute
results = self._run(ui, progress_proportion)
File "/home/tops/lib/python3.7/site-packages/odps/df/backends/core.py", line 212, in _run
res = call(ui=ui, progress_proportion=progress_proportion / len(calls))
File "/home/tops/lib/python3.7/site-packages/odps/df/backends/core.py", line 191, in __call__
res = self.run(ui=ui, progress_proportion=progress_proportion)
File "/home/tops/lib/python3.7/site-packages/odps/df/backends/core.py", line 588, in run
result = engine._do_execute(expr_dag, expr, **kw)
File "/home/tops/lib/python3.7/site-packages/odps/df/backends/odpssql/engine.py", line 404, in _do_execute
libraries=libraries, image=image, schema=self._ctx.default_schema)
File "/home/tops/lib/python3.7/site-packages/odps/df/backends/odpssql/engine.py", line 206, in _run
instance.wait_for_success()
File "/home/tops/lib/python3.7/site-packages/odps/models/instance.py", line 729, in wait_for_success
raise exc
odps.errors.ScriptError: ODPS-0123055: InstanceId: 20250208064416304gjk0j4c51gk4
ODPS-0123055:User script exception - Traceback (most recent call last):
File "/home/admin/pyodps_udf_1738997055_de2aafca_6634_4dbe_8a8a_fc6327c6eb51.py", line 3354, in __init__
resources.append(get_cache_file(str(n)))
File "/home/admin/lib/python3/odps/distcache.py", line 64, in get_cache_file
return open(filepath, 'r')
FileNotFoundError: [Errno 2] No such file or directory: './work/py_resource.py'
5、py文件资源
py_resource.py资源已经通过Dataworks界面上传到MaxCompute资源目录下,且已提交。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
同步下目前我想到的2个解决方案:
1、在pyodps3节点中,重写一遍py文件资源中的函数,然后在map的自定义函数中直接引用(已测试可行)。但缺点是py文件资源是公共资源,被多个pyodps3节点调用,该方法会增大代码重复性,且维护性差。
2、将py文件资源打包成wheel包,上传到资源,然后在df的library中引用。该方法同样维护性差,每次变更py文件资源,都需要重复操作。
3、问题再思考:通过o.get_resource('py_resource.py')的方法应该是可行,但为啥会报错:FileNotFoundError: [Errno 2] No such file or directory: './work/py_resource.py'。应该是传入方式不对。期待问题得到解决。
--------20250211 最终解决方案--------
from odps import DataFrame
def myfunc(resources): # resources按调用顺序传入。
# collection资源
collection = resources[0]
names = set()
for r in collection:
names.add(r.name) # 这里可以通过字段名或者偏移来取。
def h(x):
if x in names:
return test(1)
else:
return test(0)
return h
iris = DataFrame(o.get_table('pyodps_iris'))
iris_names_collection = iris.distinct('name')[:2]
df = iris.distinct('name')
# 读取py文件资源为字符串
with o.open_resource('py_resource.py') as fp:
content=fp.read()
# 执行py文件资源中的代码(自定义函数中可直接使用py文件资源中的函数)
exec(content)
# map自定义函数,资源传入collection对象
r=df.name.map(myfunc, resources=[iris_names_collection], rtype='int').rename('isin')
print(r.execute())
输出日志:
isin
0 2
1 2
2 1
2025-02-11 19:09:04 INFO =================================================================
2025-02-11 19:09:04 INFO Exit code of the Shell command 0
2025-02-11 19:09:04 INFO --- Invocation of Shell command completed ---
2025-02-11 19:09:04 INFO Shell run successfully!
2025-02-11 19:09:04 INFO Current task status: FINISH
2025-02-11 19:09:04 INFO Cost time is: 26.493s
其中,重点是在PyODPS3节点中执行如下代码:
# 读取py文件资源为字符串
with o.open_resource('py_resource.py') as fp:
content=fp.read()
# 执行py文件资源中的代码(自定义函数中可直接使用py文件资源中的函数)
exec(content)
则可在自定义函数myfunc中直接使用test函数。
你可以通过o.get_resource('py_resource.py')
获取你的py文件资源,然后确保在自定义函数内部通过Python的importlib
或exec
动态加载该文件。如果还是不理解的话,可以直接询问阿里云客服,或者提交工单给阿里云的技术顾问,回答不易,麻烦大佬给个采纳,谢谢。
MaxCompute(原ODPS)是一项面向分析的大数据计算服务,它以Serverless架构提供快速、全托管的在线数据仓库服务,消除传统数据平台在资源扩展性和弹性方面的限制,最小化用户运维投入,使您经济并高效的分析处理海量数据。