如何将py文件资源传入ODPS DataFrame的map/apply方法的resources参数?
同步下目前我想到的2个解决方案:1、在pyodps3节点中,重写一遍py文件资源中的函数,然后在map的自定义函数中直接引用(已测试可行)。但缺点是py文件资源是公共资源,被多个pyodps3节点调用,该方法会增大代码重复性,且维护性差。2、将py文件资源打包成wheel包,上传到资源,然后在df的library中引用。该方法同样维护性差,每次变更py文件资源,都需要重复操作。3、问题再思考:通过o.get_resource('py_resource.py')的方法应该是可行,但为啥会报错:FileNotFoundError: [Errno 2] No such file or directory: './work/py_resource.py'。应该是传入方式不对。期待问题得到解决。
--------20250211 最终解决方案--------
from odps import DataFrame
def myfunc(resources): # resources按调用顺序传入。
# collection资源
collection = resources[0]
names = set()
for r in collection:
names.add(r.name) # 这里可以通过字段名或者偏移来取。
def h(x):
if x in names:
return test(1)
else:
return test(0)
return h
iris = DataFrame(o.get_table('pyodps_iris'))
iris_names_collection = iris.distinct('name')[:2]
df = iris.distinct('name')
# 读取py文件资源为字符串
with o.open_resource('py_resource.py') as fp:
content=fp.read()
# 执行py文件资源中的代码(自定义函数中可直接使用py文件资源中的函数)
exec(content)
# map自定义函数,资源传入collection对象
r=df.name.map(myfunc, resources=[iris_names_collection], rtype='int').rename('isin')
print(r.execute())
输出日志:
isin
0 2
1 2
2 1
2025-02-11 19:09:04 INFO =================================================================
2025-02-11 19:09:04 INFO Exit code of the Shell command 0
2025-02-11 19:09:04 INFO --- Invocation of Shell command completed ---
2025-02-11 19:09:04 INFO Shell run successfully!
2025-02-11 19:09:04 INFO Current task status: FINISH
2025-02-11 19:09:04 INFO Cost time is: 26.493s
其中,重点是在PyODPS3节点中执行如下代码:
# 读取py文件资源为字符串
with o.open_resource('py_resource.py') as fp:
content=fp.read()
# 执行py文件资源中的代码(自定义函数中可直接使用py文件资源中的函数)
exec(content)
则可在自定义函数myfunc中直接使用test函数。
赞139
踩0