一. 在udf中引用torch
1. 参考文档
https://help.aliyun.com/document_detail/154431.html
https://help.aliyun.com/document_detail/189752.html
2. 踩坑
a. 资源archive上传大小受限
神经网络框架诸如pytorch,TensorFlow的压缩包均大于500M,odpscmd上传受限报错。
- gpu版 - 880MB
内部含有不少通过cuda编译代码到gpu执行的动态链接库so文件,不建议手动删除以减小压缩包大小方便上传,会报不知名的各种错
- cpu-only版 - 200MB
一般这种需求都会有一个gpu环境下训练好的神经网络模型,包含不同layer的weight,在做推理的时候仅需要导入参数进行类似加权求和即可得到结果,odps也仅提供cu用作计算,所以使用cpu版本即可。参考如下链接:
https://download.pytorch.org/whl/torch/
b. 多依赖
在执行的时候会报ModuleNotFoundError,这部分
- pkg_resources
在导依赖的时候这个包比较坑
这个包属于setuptools,可以解压看一下,拿出来单独zip一下,里面有Init.py是可以直接import的
或者在上传torch包里torch_version的源码处改一下
from setuptools.pkg_resources import packaging # type: ignore[attr-defined]
这块不知道在环境存在setuptools的情况下为什么python不能自行导入,不同环境下torch_version.py的代码是不同的,mac环境下这个脚本是不需要引用这个资源的。希望懂的大佬可以给讲一下[鞠躬]
- 利用脚本一键打包
这个问题比较常见
参考官方文档:https://help.aliyun.com/document_detail/118328.html
3. 引用代码
a. python脚本
fromodps.udfimportannotate"->string") (classTryImport(object): # 类名为TryImport。def__init__(self): importsyssys.path.insert(0, 'work/torch-1.10.2-cp37-cp37m-manylinux1_x86_64.zip') # Numpy包,您只需要替换work/后边的包名即可。sys.path.insert(0, 'work/setuptools-45.2.0.zip') # Numpy包,您只需要替换work/后边的包名即可。sys.path.insert(0, 'work/pkg_resources.zip') # Numpy包,您只需要替换work/后边的包名即可。sys.path.insert(0, 'work/typing_extensions-3.10.0.2-py3-none-any.zip') # Numpy包,您只需要替换work/后边的包名即可。defevaluate(self): importtorchreturn"import succeed"
b. 资源上传及函数注册
/
-- 添加torch包 add archive /Users/adamsun/Downloads/torch/torch.zip; -- 添加udf.py add py /Users/adamsun/PycharmProjects/pythonProject/torch_iris_predict1.py -f; -- 创建udf CREATE function torch_predict1 as 'torch_iris_predict1.TorchPredict' USING 'torch_iris_predict1.py,torch.zip,setuptools-45.2.0.zip,pkg_resources.zip,typing_extensions-3.10.0.2-py3-none-any.zip'; -- 执行测试 set odps.sql.python.version=cp37; select torch_predict1();
二、离线模型批量推理
1. 测试思路
这边简单的用torch定义了一个简单三层的神经网络,以iris数据集为基础,训练了几个epoch到收敛,并得到网络参数。将网络参数持久化到本地电脑,并上传到服务器端,在每次调用udf的时候加载训练好的参数用于推理。
2. udf-python脚本代码
fromodps.udfimportannotatefromodps.distcacheimportget_cache_fileimportsysimportiosys.path.insert(0, 'work/torch.zip') sys.path.insert(0, 'work/setuptools-45.2.0.zip') sys.path.insert(0, 'work/pkg_resources.zip') sys.path.insert(0, 'work/typing_extensions-3.10.0.2-py3-none-any.zip') importtorchclassmynet(torch.nn.Module): def__init__(self): super(mynet, self).__init__() self.fc=torch.nn.Sequential( torch.nn.Linear(4, 20), torch.nn.ReLU(), torch.nn.Linear(20, 30), torch.nn.ReLU(), torch.nn.Linear(30, 3) ) self.mse=torch.nn.CrossEntropyLoss() self.optim=torch.optim.Adam(params=self.parameters(), lr=0.1) defforward(self, inputs): outputs=self.fc(inputs) returnoutputsdeftrain(self, x, label): out=self.forward(x) loss=self.mse(out, label) self.optim.zero_grad() loss.backward() self.optim.step() deftest(self, test_): returnself.fc(test_) "DOUBLE,DOUBLE,DOUBLE,DOUBLE->BIGINT") (classTorchPredict(object): def__init__(self): # import sys# sys.path.insert(0, 'work/torch.zip')# sys.path.insert(0, 'work/setuptools-45.2.0.zip')# sys.path.insert(0, 'work/pkg_resources.zip')# sys.path.insert(0, 'work/typing_extensions-3.10.0.2-py3-none-any.zip')self.model=mynet() withget_cache_file('model_param.pkl', 'b') asf: buffer=io.BytesIO(f.read()) self.model.load_state_dict(torch.load(buffer)) defevaluate(self, col1, col2, col3, col4): _mid=self.model.forward(torch.tensor([col1, col2, col3, col4])) # print(_mid, flush=True) # 不debug先不打日志了returntorch.argmax(_mid, 0).item()
3. 踩坑
a. 模型序列化
模型保存的时候保存整个实例化后的object class在模型使用的时候torch.load会有几率报错:
AttributeError: Can't get attribute 'Generator' on )>
- 尽量仅保存训练好网络所有layer的参数 iris_net.state_dict() 而不是iris_net
- 然后在udf中定义网络的类,这个可以直接复制训练的时候的class - mynet
- 使用load_state_dict导入模型参数的资源文件
# 本地保存模型torch.save(iris_net.state_dict(),"/Users/adamsun/PycharmProjects/pythonProject/model_param.pkl") # 重新实例化一个新的iris_net=mynet() # load进来iris_net.load_state_dict(torch.load("/Users/adamsun/PycharmProjects/pythonProject/model_param.pkl"))
b. torch.load不能直接加资源path
- torch.load('资源名')会导致报错找不到该资源
- 而在odps侧需要使用get_cache_file('资源名')对文件进行读取的操作
- 通过看torch.load的文档可以看到可以上传 字符串类型的文件路径或者是file-like object但需要实现
read
readline
tell
seek
的方法
结合get_cache_file脚本和实现样例即可得到该问题的解决,在mac环境下get_cache_file本质上就是open,而open已经实现了上述四个接口,linux环境的包也同样,只不过在odps侧限定了file文件夹下读取。
如下代码为torch脚本中的example,参考即可导入离线模型
withopen('tensor.pt', 'rb') asf: buffer=io.BytesIO(f.read()) torch.load(buffer)
4. 性能优化
odps与开源大数据平台的思想相同,计算向数据靠近,在sql任务中,每个mapper都会实例化python ufd脚本中的类,并反复调用evaluate
方法,在变成语言性能无法优化的情况下,尽可能让该方法简要,仅做必要的计算。
所以将与硬盘交互的加载网络模型参数的部分放到了init中,前后性能差距37倍,优化前 264 records/s -> 优化后 9765 records/s,1400万记录数的推理仅需12分钟,平均每秒近2W的推理
优化前 264 records/s
优化后 9765 records/s
1400万记录数的推理仅需12分钟,平均每秒近2W的推理