函数计算实现 oss 上传超大 zip 压缩文件的自动解压处理

本文涉及的产品
函数计算FC,每月15万CU 3个月
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
简介: 在本文中,以用户上传超大压缩文件( zip 类型)到 oss, oss 系统捕获 PutObjec/PostObject 的事件, 自动触发函数执行, 函数将压缩文件解压,并将对应的解压文件放在oss 指定的 bucket 的某个目录下为例,展示 FC 的灵丹妙手

前言

阿里云函数计算(简称 FC )提供了一种事件驱动的计算模型。函数的执行是由事件驱动的,函数计算触发器描述了一组规则,当某个事件满足这些规则,事件源就会触发相应的函数。函数计算已经将 对象存储 作为一个事件源用于触发函数, 用于对存入oss的文件进行自动化处理:

image

如上图所示,阿里云对象存储和函数计算无缝集成。您可以为各种类型的事件设置处理函数,当OSS系统捕获到指定类型的事件后,会自动调用函数处理。例如,您可以设置函数来处理PutObject事件,当您调用OSS PutObject API上传图片到OSS后,相关联的函数会自动触发来处理该图片。

在本文中,以用户上传超大压缩文件( zip 类型)到 oss, oss 系统捕获 PutObjec/PostObject 的事件, 自动触发函数执行, 函数将压缩文件解压,并将对应的解压文件放在oss 指定的 bucket 的某个目录下。 比如在bucket myzipfilebucket source 目录是上传压缩文件的目录, processed 目录是解压后压缩文件存放的目录

方法

在本文中,以python3 runtime 为例,一步一步慢慢展示fc的能力

简单法

因为 FC 的临时目录的空间大小为512M,如果使用落盘,明显是下下策, 不仅增加 io 时间, 而且 512M 的限制基本让人敬而远之了。所以我们把一切在内存中完成, 于是有下面的解决方案, FC 从内网中拉取 OSS 中的压缩文件,然后一切在内存中完成,将解压后的文件的 bytes 上传到指定 bucket 中目录, 于是有了下面的代码:

# -*- coding: utf-8 -*-
import oss2, json
import zipfile
import os, io
import logging
import chardet

LOGGER = logging.getLogger()

def handler(event, context):
  """
  The object from OSS will be decompressed automatically .
  param: event:   The OSS event json string. Including oss object uri and other information.
      For detail info, please refer https://help.aliyun.com/document_detail/70140.html?spm=a2c4g.11186623.6.578.5eb8cc74AJCA9p#OSS
  
  param: context: The function context, including credential and runtime info.

      For detail info, please refer to https://help.aliyun.com/document_detail/56316.html#using-context
  """
  evt_lst = json.loads(event)
  creds = context.credentials
  auth=oss2.StsAuth(
     creds.access_key_id,
     creds.access_key_secret,
     creds.security_token)

  evt = evt_lst['events'][0]
  bucket_name = evt['oss']['bucket']['name']
  endpoint = 'oss-' +  evt['region'] + '-internal.aliyuncs.com'
  bucket = oss2.Bucket(auth, endpoint, bucket_name)
  object_name = evt['oss']['object']['key']

  """
  When a source/ prefix object is placed in an OSS, it is hoped that the object will be decompressed and then stored in the OSS as processed/ prefixed.
  For example, source/a.zip will be processed as processed/a/... 
  "source /", "processed/" can be changed according to the user's requirements."""

  file_type = os.path.splitext(object_name)[1]

  if file_type != ".zip":
    raise RuntimeError('{} filetype is not zip'.format(object_name))

  newKey = object_name.replace("source/", "processed/")
  remote_stream = bucket.get_object(object_name)
  if not remote_stream:
    raise RuntimeError('failed to get oss object. bucket: %s. object: %s' % (bucket_name, object_name))
  zip_buffer = io.BytesIO(remote_stream.read())

  LOGGER.info('download object from oss success: {}'.format(object_name))

  newKey = newKey.replace(".zip", "/")
  with zipfile.ZipFile(zip_buffer) as zip_file:
    for name in zip_file.namelist():
      with zip_file.open(name) as file_obj:
        # fix chinese directory name garbled problem
        try:
          name = name.encode(encoding='cp437')
        except:
          name = name.encode(encoding='utf-8')
        detect = chardet.detect( (name*100)[0:100] )
        confidence = detect["confidence"]
        if confidence >= 0.8:
          try:
            name = name.decode(encoding=detect["encoding"])
          except: 
            name = name.decode(encoding="gb2312")
        else:
           name = name.decode(encoding="gb2312")
          
        bucket.put_object(newKey +  name, file_obj.read())
  • 优点

    • 方法简单,一切在内存中完成,适合较小的压缩文件解压
  • 不足

    • 有内存的限制,假设压缩文件稍微大点,就很容易超出函数计算执行环境最大内存 3G 的限制
    • 假设压缩文件大小差异很大,以最大压缩文件消耗的内存设置函数内存规格, 增加函数执行费用

流式法

完整的代码示例请在附件下载

注:附件的流式法的代码中的入口函数 py 文件可能没有及时更新, 下载代码下来后, 直接copy 文章中的代码更新入口函数即可

很快,我们自然想到不能完全把压缩文件的内容全部通过 FC 作为 中转站来处理,如果我们先获取压缩文件的 meta 信息,比如我们先拉取压缩文件中的 meta 信息的字节流(很小), 分析出这个大的压缩文件里面有哪些文件,这些文件对应到压缩文件字节流中的起止位置信息;通过这些信息, 压缩文件里面的每个文件都能构造出一个 file-like object, 那么在 FC 这边, 只需要将 get 的 file-like object 进行解压,同时将解压后的内容 as a file-like object put 到指定的 bucket 目录。 完全没必要把所有内容一下子拖到这里统一加工。

  • 改造 zipfile
    在 python 中,我们继续使用 zipfile 这个lib,看起来是这个库支持参数是 file-like object, 但是这个库要求 file-like object 具有 seek 和 tell 接口, oss get_object 获得GetObjectResult 类型的对象虽然是一个 file-like object, 但是没有seektell 接口, 因此我们 zipfile 进行了一些改造:

  • 同时,构造出能被 zipfile 支持的file-like object

    # -*- coding: utf-8 -*-
    import oss2
    from oss2 import utils, models
    import ossZipfile as zipfile
    
    zipfile_support_oss = zipfile
    
    # support upload to oss as a file-like object
    def make_crc_adapter(data, init_crc=0):
      data = utils.to_bytes(data)
      # file-like object
      if hasattr(data,'read'): 
        return utils._FileLikeAdapter(data, crc_callback=utils.Crc64(init_crc))
    
    utils.make_crc_adapter = make_crc_adapter
    
    class OssStreamFileLikeObject(object):
      def __init__(self, bucket, key):
        super(OssStreamFileLikeObject, self).__init__()
        self._bucket = bucket
        self._key = key
        self._meta_data = self._bucket.get_object_meta(self._key)
    
      @property
      def bucket(self):
        return self._bucket
    
      @property
      def key(self):
        return self._key
    
      @property
      def filesize(self):
        return self._meta_data.content_length
    
      def get_reader(self, begin, end):
        begin = begin if begin >= 0 else 0
        end = end if end > 0 else self.filesize - 1
        end = end if end < self.filesize else self.filesize - 1 
        begin = begin if begin < end else end
        return self._bucket.get_object(self._key, byte_range=(begin, end))
    
      def get_content_bytes(self, begin, end):
        reader = self.get_reader(begin, end)
        return reader.read()
    
      def get_last_content_bytes(self, offset):
        return self.get_content_bytes(self.filesize-offset, self.filesize-1)
  • 入口函数
# -*- coding: utf-8 -*-
'''
声明:
这个函数针对文件和文件夹命名编码是如下格式:
1. mac/linux 系统, 默认是utf-8
2. windows 系统, 默认是gb2312, 也可以是utf-8

对于其他编码,我们这里尝试使用chardet这个库进行编码判断, 但是这个并不能保证100% 正确,
建议用户先调试函数,如果有必要改写这个函数,并保证调试通过

函数最新进展可以关注该blog: https://yq.aliyun.com/articles/680958

Statement:
This function names and encodes files and folders as follows:
1. MAC/Linux system, default is utf-8
2. For Windows, the default is gb2312 or utf-8

For other encodings, we try to use the chardet library for coding judgment here, 
but this is not guaranteed to be 100% correct. 
If necessary to rewrite this function, and ensure that the debugging pass
'''

import helper
import oss2, json
import os
import logging
import chardet

"""
When a source/ prefix object is placed in an OSS, it is hoped that the object will be decompressed and then stored in the OSS as processed/ prefixed.
For example, source/a.zip will be processed as processed/a/... 
"Source /", "processed/" can be changed according to the user's requirements.

detail: https://yq.aliyun.com/articles/680958
"""
# Close the info log printed by the oss SDK
logging.getLogger("oss2.api").setLevel(logging.ERROR)
logging.getLogger("oss2.auth").setLevel(logging.ERROR)

LOGGER = logging.getLogger()

def handler(event, context):
  """
  The object from OSS will be decompressed automatically .
  param: event:   The OSS event json string. Including oss object uri and other information.
      For detail info, please refer https://help.aliyun.com/document_detail/70140.html?spm=a2c4g.11186623.6.578.5eb8cc74AJCA9p#OSS
  
  param: context: The function context, including credential and runtime info.

      For detail info, please refer to https://help.aliyun.com/document_detail/56316.html#using-context
  """
  evt_lst = json.loads(event)
  creds = context.credentials
  auth=oss2.StsAuth(
    creds.access_key_id,
    creds.access_key_secret,
    creds.security_token)

  evt = evt_lst['events'][0]
  bucket_name = evt['oss']['bucket']['name']
  endpoint = 'oss-' +  evt['region'] + '-internal.aliyuncs.com'
  bucket = oss2.Bucket(auth, endpoint, bucket_name)
  object_name = evt['oss']['object']['key']

  if "ObjectCreated:PutSymlink" == evt['eventName']:
    object_name = bucket.get_symlink(object_name).target_key
    if object_name == "":
      raise RuntimeError('{} is invalid symlink file'.format(evt['oss']['object']['key']))

  file_type = os.path.splitext(object_name)[1]

  if file_type != ".zip":
    raise RuntimeError('{} filetype is not zip'.format(object_name))

  LOGGER.info("start to decompress zip file = {}".format(object_name))
  
  lst = object_name.split("/")
  zip_name = lst[-1]
  PROCESSED_DIR = os.environ.get("PROCESSED_DIR", "")
  if PROCESSED_DIR and PROCESSED_DIR[-1] != "/":
    PROCESSED_DIR += "/"
  newKey = PROCESSED_DIR + zip_name
  zip_fp = helper.OssStreamFileLikeObject(bucket, object_name)
  
  newKey = newKey.replace(".zip", "/")
  
  with helper.zipfile_support_oss.ZipFile(zip_fp) as zip_file:
    for name in zip_file.namelist():
      with zip_file.open(name) as file_obj:
        try:
          name = name.encode(encoding='cp437')
        except:
          name = name.encode(encoding='utf-8')
        
        # the string to be detect is long enough, the detection result accuracy is higher 
        detect = chardet.detect( (name*100)[0:100] )
        confidence = detect["confidence"]
        if confidence > 0.8:
          try:
            name = name.decode(encoding=detect["encoding"])
          except:
            name = name.decode(encoding='gb2312')
        else:
          name = name.decode(encoding="gb2312")
          
        bucket.put_object(newKey +  name, file_obj)
  • 优点

    • 可以突破内存限制,小内存的函数也可以干大压缩文件文件的自动解压存放工作
      image
  • 不足

    • 对于较小的压缩文件,不如简单的方法来的简洁和直接

总结

本文针对 oss 上传 zip 压缩文件进行自动解压进行了一些方案的探讨与尝试, 分析了两种方案各自的优点和不足。 但是这两种方法都有一个共同的限制,就是函数执行时间最大为15分钟,如果压缩文件足够大和足够复杂(里面有很多的小文件), 需要合理评估时间, 执行时间 = 解压时间(纯cpu计算,这个跟设置函数内存规格大小成线性关系,需要合理设置) + 网络io 时间(走内网); 一般来说本地(两核3G的配置)解压时间在10分钟以内的, FC 应该都有能力处理。

相关实践学习
【文生图】一键部署Stable Diffusion基于函数计算
本实验教你如何在函数计算FC上从零开始部署Stable Diffusion来进行AI绘画创作,开启AIGC盲盒。函数计算提供一定的免费额度供用户使用。本实验答疑钉钉群:29290019867
建立 Serverless 思维
本课程包括: Serverless 应用引擎的概念, 为开发者带来的实际价值, 以及让您了解常见的 Serverless 架构模式
目录
相关文章
|
4月前
|
监控 Java Serverless
函数计算产品使用问题之对于OSS打包的zip的保存目录,该如何操作
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
|
4月前
|
运维 Serverless 数据处理
函数计算产品使用问题之在对象存储服务(OSS)上创建ZIP包解压触发器后,触发器未按预期执行,一般是什么导致的
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
5月前
|
弹性计算 运维 网络协议
Serverless 应用引擎产品使用合集之部署WAR包后,将被解压到什么目录下
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
|
6月前
|
存储 弹性计算 安全
对象存储OSS产品常见问题之ZIP包解压缩失败异常如何解决
对象存储OSS是基于互联网的数据存储服务模式,让用户可以安全、可靠地存储大量非结构化数据,如图片、音频、视频、文档等任意类型文件,并通过简单的基于HTTP/HTTPS协议的RESTful API接口进行访问和管理。本帖梳理了用户在实际使用中可能遇到的各种常见问题,涵盖了基础操作、性能优化、安全设置、费用管理、数据备份与恢复、跨区域同步、API接口调用等多个方面。
258 0
|
6月前
|
JavaScript Java Serverless
函数计算中,这里是用的curl的方式,如何改用http的post方式请求?还有如何设置oss打包的zip的保存目录?
函数计算中,这里是用的curl的方式,如何改用http的post方式请求?还有如何设置oss打包的zip的保存目录?
198 0
|
4月前
|
机器学习/深度学习 人工智能 专有云
人工智能平台PAI使用问题之怎么将DLC的数据写入到另一个阿里云主账号的OSS中
阿里云人工智能平台PAI是一个功能强大、易于使用的AI开发平台,旨在降低AI开发门槛,加速创新,助力企业和开发者高效构建、部署和管理人工智能应用。其中包含了一系列相互协同的产品与服务,共同构成一个完整的人工智能开发与应用生态系统。以下是对PAI产品使用合集的概述,涵盖数据处理、模型开发、训练加速、模型部署及管理等多个环节。
|
18天前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
3月前
|
存储 机器学习/深度学习 弹性计算
阿里云EMR数据湖文件系统问题之OSS-HDFS全托管服务的问题如何解决
阿里云EMR数据湖文件系统问题之OSS-HDFS全托管服务的问题如何解决
|
4月前
|
消息中间件 分布式计算 DataWorks
DataWorks产品使用合集之如何使用Python和阿里云SDK读取OSS中的文件
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
4月前
|
存储 运维 安全
阿里云OSS的优势
【7月更文挑战第19天】阿里云OSS的优势
192 2

热门文章

最新文章

相关产品

  • 函数计算