MaxCompute-udf用于torch离线模型批量推理

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: odps-udf用于torch离线模型的批量推理实现以及踩坑

一. 在udf中引用torch

1. 参考文档

https://help.aliyun.com/document_detail/154431.html

https://help.aliyun.com/document_detail/189752.html

2. 踩坑

a. 资源archive上传大小受限

神经网络框架诸如pytorch,TensorFlow的压缩包均大于500M,odpscmd上传受限报错。

  1. gpu版 - 880MB

内部含有不少通过cuda编译代码到gpu执行的动态链接库so文件,不建议手动删除以减小压缩包大小方便上传,会报不知名的各种错

  1. cpu-only版 - 200MB

一般这种需求都会有一个gpu环境下训练好的神经网络模型,包含不同layer的weight,在做推理的时候仅需要导入参数进行类似加权求和即可得到结果,odps也仅提供cu用作计算,所以使用cpu版本即可。参考如下链接:

https://download.pytorch.org/whl/torch/

b. 多依赖

在执行的时候会报ModuleNotFoundError,这部分

  1. pkg_resources

在导依赖的时候这个包比较坑

这个包属于setuptools,可以解压看一下,拿出来单独zip一下,里面有Init.py是可以直接import的

或者在上传torch包里torch_version的源码处改一下

from setuptools.pkg_resources import packaging # type: ignore[attr-defined]

这块不知道在环境存在setuptools的情况下为什么python不能自行导入,不同环境下torch_version.py的代码是不同的,mac环境下这个脚本是不需要引用这个资源的。希望懂的大佬可以给讲一下[鞠躬]

  1. 利用脚本一键打包

这个问题比较常见

参考官方文档:https://help.aliyun.com/document_detail/118328.html

3. 引用代码

a. python脚本

fromodps.udfimportannotate@annotate("->string")
classTryImport(object):  # 类名为TryImport。def__init__(self):
importsyssys.path.insert(0, 'work/torch-1.10.2-cp37-cp37m-manylinux1_x86_64.zip')  # Numpy包,您只需要替换work/后边的包名即可。sys.path.insert(0, 'work/setuptools-45.2.0.zip')  # Numpy包,您只需要替换work/后边的包名即可。sys.path.insert(0, 'work/pkg_resources.zip')  # Numpy包,您只需要替换work/后边的包名即可。sys.path.insert(0, 'work/typing_extensions-3.10.0.2-py3-none-any.zip')  # Numpy包,您只需要替换work/后边的包名即可。defevaluate(self):
importtorchreturn"import succeed"



b. 资源上传及函数注册


/

-- 添加torch包
add archive /Users/adamsun/Downloads/torch/torch.zip;
-- 添加udf.py
add py /Users/adamsun/PycharmProjects/pythonProject/torch_iris_predict1.py -f;
-- 创建udf
CREATE function torch_predict1 as 'torch_iris_predict1.TorchPredict' USING 'torch_iris_predict1.py,torch.zip,setuptools-45.2.0.zip,pkg_resources.zip,typing_extensions-3.10.0.2-py3-none-any.zip';
-- 执行测试
set odps.sql.python.version=cp37;
select torch_predict1();

二、离线模型批量推理

1. 测试思路

这边简单的用torch定义了一个简单三层的神经网络,以iris数据集为基础,训练了几个epoch到收敛,并得到网络参数。将网络参数持久化到本地电脑,并上传到服务器端,在每次调用udf的时候加载训练好的参数用于推理。

2. udf-python脚本代码

fromodps.udfimportannotatefromodps.distcacheimportget_cache_fileimportsysimportiosys.path.insert(0, 'work/torch.zip')
sys.path.insert(0, 'work/setuptools-45.2.0.zip')
sys.path.insert(0, 'work/pkg_resources.zip')
sys.path.insert(0, 'work/typing_extensions-3.10.0.2-py3-none-any.zip')
importtorchclassmynet(torch.nn.Module):
def__init__(self):
super(mynet, self).__init__()
self.fc=torch.nn.Sequential(
torch.nn.Linear(4, 20),
torch.nn.ReLU(),
torch.nn.Linear(20, 30),
torch.nn.ReLU(),
torch.nn.Linear(30, 3)
        )
self.mse=torch.nn.CrossEntropyLoss()
self.optim=torch.optim.Adam(params=self.parameters(), lr=0.1)
defforward(self, inputs):
outputs=self.fc(inputs)
returnoutputsdeftrain(self, x, label):
out=self.forward(x)
loss=self.mse(out, label)
self.optim.zero_grad()
loss.backward()
self.optim.step()
deftest(self, test_):
returnself.fc(test_)
@annotate("DOUBLE,DOUBLE,DOUBLE,DOUBLE->BIGINT")
classTorchPredict(object):
def__init__(self):
# import sys# sys.path.insert(0, 'work/torch.zip')# sys.path.insert(0, 'work/setuptools-45.2.0.zip')# sys.path.insert(0, 'work/pkg_resources.zip')# sys.path.insert(0, 'work/typing_extensions-3.10.0.2-py3-none-any.zip')self.model=mynet()
withget_cache_file('model_param.pkl', 'b') asf:
buffer=io.BytesIO(f.read())
self.model.load_state_dict(torch.load(buffer))
defevaluate(self, col1, col2, col3, col4):
_mid=self.model.forward(torch.tensor([col1, col2, col3, col4]))
# print(_mid, flush=True) # 不debug先不打日志了returntorch.argmax(_mid, 0).item()

3. 踩坑

a. 模型序列化

模型保存的时候保存整个实例化后的object class在模型使用的时候torch.load会有几率报错:

AttributeError: Can't get attribute 'Generator' on )>

  • 尽量仅保存训练好网络所有layer的参数 iris_net.state_dict() 而不是iris_net
  • 然后在udf中定义网络的类,这个可以直接复制训练的时候的class - mynet
  • 使用load_state_dict导入模型参数的资源文件
# 本地保存模型torch.save(iris_net.state_dict(),"/Users/adamsun/PycharmProjects/pythonProject/model_param.pkl")
# 重新实例化一个新的iris_net=mynet()
# load进来iris_net.load_state_dict(torch.load("/Users/adamsun/PycharmProjects/pythonProject/model_param.pkl"))

b. torch.load不能直接加资源path

  • torch.load('资源名')会导致报错找不到该资源
  • 而在odps侧需要使用get_cache_file('资源名')对文件进行读取的操作
  • 通过看torch.load的文档可以看到可以上传 字符串类型的文件路径或者是file-like object但需要实现 readreadlinetellseek的方法

结合get_cache_file脚本和实现样例即可得到该问题的解决,在mac环境下get_cache_file本质上就是open,而open已经实现了上述四个接口,linux环境的包也同样,只不过在odps侧限定了file文件夹下读取。

如下代码为torch脚本中的example,参考即可导入离线模型

withopen('tensor.pt', 'rb') asf:
buffer=io.BytesIO(f.read())
torch.load(buffer)

4. 性能优化

odps与开源大数据平台的思想相同,计算向数据靠近,在sql任务中,每个mapper都会实例化python ufd脚本中的类,并反复调用evaluate方法,在变成语言性能无法优化的情况下,尽可能让该方法简要,仅做必要的计算。

所以将与硬盘交互的加载网络模型参数的部分放到了init中,前后性能差距37倍,优化前 264 records/s -> 优化后 9765 records/s,1400万记录数的推理仅需12分钟,平均每秒近2W的推理

优化前 264 records/s

优化后 9765 records/s

1400万记录数的推理仅需12分钟,平均每秒近2W的推理

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
相关文章
|
2月前
|
存储 缓存 分布式计算
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
44 4
|
2月前
|
存储 分布式计算 API
大数据-107 Flink 基本概述 适用场景 框架特点 核心组成 生态发展 处理模型 组件架构
大数据-107 Flink 基本概述 适用场景 框架特点 核心组成 生态发展 处理模型 组件架构
98 0
|
1月前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
ly~
|
2月前
|
供应链 搜索推荐 安全
大数据模型的应用
大数据模型在多个领域均有广泛应用。在金融领域,它可用于风险评估与预测、智能营销及反欺诈检测,助力金融机构做出更加精准的决策;在医疗领域,大数据模型能够协助疾病诊断与预测、优化医疗资源管理和加速药物研发;在交通领域,该技术有助于交通流量预测、智能交通管理和物流管理,从而提升整体交通效率;电商领域则借助大数据模型实现商品推荐、库存管理和价格优化,增强用户体验与企业效益;此外,在能源和制造业中,大数据模型的应用范围涵盖从需求预测到设备故障预测等多个方面,全面推动了行业的智能化转型与升级。
ly~
185 2
|
2月前
|
SQL 分布式计算 大数据
大数据平台的毕业设计01:Hadoop与离线分析
大数据平台的毕业设计01:Hadoop与离线分析
159 0
|
5月前
|
SQL 分布式计算 DataWorks
MaxCompute产品使用合集之整库离线同步至MC的配置中,是否可以清除原表所有分区数据的功能
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
5月前
|
数据采集 自然语言处理 大数据
​「Python大数据」LDA主题分析模型
使用Python进行文本聚类,流程包括读取VOC数据、jieba分词、去除停用词,应用LDA模型(n_components=5)进行主题分析,并通过pyLDAvis生成可视化HTML。关键代码涉及数据预处理、CountVectorizer、LatentDirichletAllocation以及HTML文件的本地化处理。停用词和业务术语列表用于优化分词效果。
296 0
​「Python大数据」LDA主题分析模型
|
6月前
|
机器学习/深度学习 自然语言处理 大数据
社交媒体的情感分析大数据模型
构建基于大数据的情感分析模型,利用Python和机器学习处理社交媒体数据。情感分析识别文本情感倾向,助力市场洞察和舆情监控。技术栈包括Python、NLP库(nltk, spaCy, TextBlob, VADER)、Scikit-learn、TensorFlow/PyTorch及大数据工具。数据收集(如Twitter API)、预处理(去除噪声、分词)、特征提取(TF-IDF、词嵌入)、模型训练(逻辑回归、BERT)是关键步骤。模型能捕捉文本情感,支持决策,随着技术进步,应用前景广阔。
551 10
|
6月前
|
数据采集 搜索推荐 安全
智慧城市的交通管理大数据模型
智慧城市交通管理系统借助大数据模型,通过全面收集交通数据(如监控、GPS、公共交通信息等),进行数据清洗和预处理,利用Python的Pandas进行数据管理。通过ARIMA等模型分析,预测交通流量、识别交通模式,支持智能信号控制、预测性维护和事件响应。这种集成分析与决策支持系统提升城市交通效率,确保出行安全,预示着未来交通管理的智能化和个性化趋势。【6月更文挑战第23天】
616 10
|
6月前
|
机器学习/深度学习 自然语言处理 监控
金融行业的大数据风控模型:构建安全高效的信用评估体系
金融机构借助大数据风控提升信贷效率,通过数据收集、清洗、特征工程、模型构建与评估来识别风险。关键技术涉及机器学习、深度学习、NLP和实时处理。以下是一个Python风控模型构建的简例,展示了从数据预处理到模型训练、评估的过程,并提及实时监控预警的重要性。该文旨在阐述大数据风控的核心要素和关键技术,并提供基础的代码实现概念。【6月更文挑战第23天】
1050 8
下一篇
DataWorks