函数计算实现 oss 上传超大 zip 压缩文件的自动解压处理

本文涉及的产品
函数计算FC,每月15万CU 3个月
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
简介: 在本文中,以用户上传超大压缩文件( zip 类型)到 oss, oss 系统捕获 PutObjec/PostObject 的事件, 自动触发函数执行, 函数将压缩文件解压,并将对应的解压文件放在oss 指定的 bucket 的某个目录下为例,展示 FC 的灵丹妙手

前言

阿里云函数计算(简称 FC )提供了一种事件驱动的计算模型。函数的执行是由事件驱动的,函数计算触发器描述了一组规则,当某个事件满足这些规则,事件源就会触发相应的函数。函数计算已经将 对象存储 作为一个事件源用于触发函数, 用于对存入oss的文件进行自动化处理:

image

如上图所示,阿里云对象存储和函数计算无缝集成。您可以为各种类型的事件设置处理函数,当OSS系统捕获到指定类型的事件后,会自动调用函数处理。例如,您可以设置函数来处理PutObject事件,当您调用OSS PutObject API上传图片到OSS后,相关联的函数会自动触发来处理该图片。

在本文中,以用户上传超大压缩文件( zip 类型)到 oss, oss 系统捕获 PutObjec/PostObject 的事件, 自动触发函数执行, 函数将压缩文件解压,并将对应的解压文件放在oss 指定的 bucket 的某个目录下。 比如在bucket myzipfilebucket source 目录是上传压缩文件的目录, processed 目录是解压后压缩文件存放的目录

方法

在本文中,以python3 runtime 为例,一步一步慢慢展示fc的能力

简单法

因为 FC 的临时目录的空间大小为512M,如果使用落盘,明显是下下策, 不仅增加 io 时间, 而且 512M 的限制基本让人敬而远之了。所以我们把一切在内存中完成, 于是有下面的解决方案, FC 从内网中拉取 OSS 中的压缩文件,然后一切在内存中完成,将解压后的文件的 bytes 上传到指定 bucket 中目录, 于是有了下面的代码:

# -*- coding: utf-8 -*-
import oss2, json
import zipfile
import os, io
import logging
import chardet

LOGGER = logging.getLogger()

def handler(event, context):
  """
  The object from OSS will be decompressed automatically .
  param: event:   The OSS event json string. Including oss object uri and other information.
      For detail info, please refer https://help.aliyun.com/document_detail/70140.html?spm=a2c4g.11186623.6.578.5eb8cc74AJCA9p#OSS

  param: context: The function context, including credential and runtime info.

      For detail info, please refer to https://help.aliyun.com/document_detail/56316.html#using-context
  """
  evt_lst = json.loads(event)
  creds = context.credentials
  auth=oss2.StsAuth(
     creds.access_key_id,
     creds.access_key_secret,
     creds.security_token)

  evt = evt_lst['events'][0]
  bucket_name = evt['oss']['bucket']['name']
  endpoint = 'oss-' +  evt['region'] + '-internal.aliyuncs.com'
  bucket = oss2.Bucket(auth, endpoint, bucket_name)
  object_name = evt['oss']['object']['key']

  """
  When a source/ prefix object is placed in an OSS, it is hoped that the object will be decompressed and then stored in the OSS as processed/ prefixed.
  For example, source/a.zip will be processed as processed/a/... 
  "source /", "processed/" can be changed according to the user's requirements."""

  file_type = os.path.splitext(object_name)[1]

  if file_type != ".zip":
    raise RuntimeError('{} filetype is not zip'.format(object_name))

  newKey = object_name.replace("source/", "processed/")
  remote_stream = bucket.get_object(object_name)
  if not remote_stream:
    raise RuntimeError('failed to get oss object. bucket: %s. object: %s' % (bucket_name, object_name))
  zip_buffer = io.BytesIO(remote_stream.read())

  LOGGER.info('download object from oss success: {}'.format(object_name))

  newKey = newKey.replace(".zip", "/")
  with zipfile.ZipFile(zip_buffer) as zip_file:
    for name in zip_file.namelist():
      with zip_file.open(name) as file_obj:
        # fix chinese directory name garbled problem
        try:
          name = name.encode(encoding='cp437')
        except:
          name = name.encode(encoding='utf-8')
        detect = chardet.detect( (name*100)[0:100] )
        confidence = detect["confidence"]
        if confidence >= 0.8:
          try:
            name = name.decode(encoding=detect["encoding"])
          except: 
            name = name.decode(encoding="gb2312")
        else:
           name = name.decode(encoding="gb2312")

        bucket.put_object(newKey +  name, file_obj.read())
  • 优点

    • 方法简单,一切在内存中完成,适合较小的压缩文件解压
  • 不足

    • 有内存的限制,假设压缩文件稍微大点,就很容易超出函数计算执行环境最大内存 3G 的限制

    • 假设压缩文件大小差异很大,以最大压缩文件消耗的内存设置函数内存规格, 增加函数执行费用

流式法

完整的代码示例请在附件下载

注:附件的流式法的代码中的入口函数 py 文件可能没有及时更新, 下载代码下来后, 直接copy 文章中的代码更新入口函数即可

很快,我们自然想到不能完全把压缩文件的内容全部通过 FC 作为 中转站来处理,如果我们先获取压缩文件的 meta 信息,比如我们先拉取压缩文件中的 meta 信息的字节流(很小), 分析出这个大的压缩文件里面有哪些文件,这些文件对应到压缩文件字节流中的起止位置信息;通过这些信息, 压缩文件里面的每个文件都能构造出一个 file-like object, 那么在 FC 这边, 只需要将 get 的 file-like object 进行解压,同时将解压后的内容 as a file-like object put 到指定的 bucket 目录。 完全没必要把所有内容一下子拖到这里统一加工。

  • 改造 zipfile
    在 python 中,我们继续使用 zipfile 这个lib,看起来是这个库支持参数是 file-like object, 但是这个库要求 file-like object 具有 seek 和 tell 接口, oss get_object 获得GetObjectResult 类型的对象虽然是一个 file-like object, 但是没有seektell 接口, 因此我们 zipfile 进行了一些改造:
  • 同时,构造出能被 zipfile 支持的file-like object

    # -*- coding: utf-8 -*-
    import oss2
    from oss2 import utils, models
    import ossZipfile as zipfile
    
    zipfile_support_oss = zipfile
    
    # support upload to oss as a file-like object
    def make_crc_adapter(data, init_crc=0):
      data = utils.to_bytes(data)
      # file-like object
      if hasattr(data,'read'): 
        return utils._FileLikeAdapter(data, crc_callback=utils.Crc64(init_crc))
    
    utils.make_crc_adapter = make_crc_adapter
    
    class OssStreamFileLikeObject(object):
      def __init__(self, bucket, key):
        super(OssStreamFileLikeObject, self).__init__()
        self._bucket = bucket
        self._key = key
        self._meta_data = self._bucket.get_object_meta(self._key)
    
      @property
      def bucket(self):
        return self._bucket
    
      @property
      def key(self):
        return self._key
    
      @property
      def filesize(self):
        return self._meta_data.content_length
    
      def get_reader(self, begin, end):
        begin = begin if begin >= 0 else 0
        end = end if end > 0 else self.filesize - 1
        end = end if end < self.filesize else self.filesize - 1 
        begin = begin if begin < end else end
        return self._bucket.get_object(self._key, byte_range=(begin, end))
    
      def get_content_bytes(self, begin, end):
        reader = self.get_reader(begin, end)
        return reader.read()
    
      def get_last_content_bytes(self, offset):
        return self.get_content_bytes(self.filesize-offset, self.filesize-1)
    
  • 入口函数

# -*- coding: utf-8 -*-
'''
声明:
这个函数针对文件和文件夹命名编码是如下格式:
1. mac/linux 系统, 默认是utf-8
2. windows 系统, 默认是gb2312, 也可以是utf-8

对于其他编码,我们这里尝试使用chardet这个库进行编码判断, 但是这个并不能保证100% 正确,
建议用户先调试函数,如果有必要改写这个函数,并保证调试通过

函数最新进展可以关注该blog: https://yq.aliyun.com/articles/680958

Statement:
This function names and encodes files and folders as follows:
1. MAC/Linux system, default is utf-8
2. For Windows, the default is gb2312 or utf-8

For other encodings, we try to use the chardet library for coding judgment here, 
but this is not guaranteed to be 100% correct. 
If necessary to rewrite this function, and ensure that the debugging pass
'''

import helper
import oss2, json
import os
import logging
import chardet

"""
When a source/ prefix object is placed in an OSS, it is hoped that the object will be decompressed and then stored in the OSS as processed/ prefixed.
For example, source/a.zip will be processed as processed/a/... 
"Source /", "processed/" can be changed according to the user's requirements.

detail: https://yq.aliyun.com/articles/680958
"""
# Close the info log printed by the oss SDK
logging.getLogger("oss2.api").setLevel(logging.ERROR)
logging.getLogger("oss2.auth").setLevel(logging.ERROR)

LOGGER = logging.getLogger()

def handler(event, context):
  """
  The object from OSS will be decompressed automatically .
  param: event:   The OSS event json string. Including oss object uri and other information.
      For detail info, please refer https://help.aliyun.com/document_detail/70140.html?spm=a2c4g.11186623.6.578.5eb8cc74AJCA9p#OSS

  param: context: The function context, including credential and runtime info.

      For detail info, please refer to https://help.aliyun.com/document_detail/56316.html#using-context
  """
  evt_lst = json.loads(event)
  creds = context.credentials
  auth=oss2.StsAuth(
    creds.access_key_id,
    creds.access_key_secret,
    creds.security_token)

  evt = evt_lst['events'][0]
  bucket_name = evt['oss']['bucket']['name']
  endpoint = 'oss-' +  evt['region'] + '-internal.aliyuncs.com'
  bucket = oss2.Bucket(auth, endpoint, bucket_name)
  object_name = evt['oss']['object']['key']

  if "ObjectCreated:PutSymlink" == evt['eventName']:
    object_name = bucket.get_symlink(object_name).target_key
    if object_name == "":
      raise RuntimeError('{} is invalid symlink file'.format(evt['oss']['object']['key']))

  file_type = os.path.splitext(object_name)[1]

  if file_type != ".zip":
    raise RuntimeError('{} filetype is not zip'.format(object_name))

  LOGGER.info("start to decompress zip file = {}".format(object_name))

  lst = object_name.split("/")
  zip_name = lst[-1]
  PROCESSED_DIR = os.environ.get("PROCESSED_DIR", "")
  if PROCESSED_DIR and PROCESSED_DIR[-1] != "/":
    PROCESSED_DIR += "/"
  newKey = PROCESSED_DIR + zip_name
  zip_fp = helper.OssStreamFileLikeObject(bucket, object_name)

  newKey = newKey.replace(".zip", "/")

  with helper.zipfile_support_oss.ZipFile(zip_fp) as zip_file:
    for name in zip_file.namelist():
      with zip_file.open(name) as file_obj:
        try:
          name = name.encode(encoding='cp437')
        except:
          name = name.encode(encoding='utf-8')

        # the string to be detect is long enough, the detection result accuracy is higher 
        detect = chardet.detect( (name*100)[0:100] )
        confidence = detect["confidence"]
        if confidence > 0.8:
          try:
            name = name.decode(encoding=detect["encoding"])
          except:
            name = name.decode(encoding='gb2312')
        else:
          name = name.decode(encoding="gb2312")

        bucket.put_object(newKey +  name, file_obj)
  • 优点
    • 可以突破内存限制,小内存的函数也可以干大压缩文件文件的自动解压存放工作
      image
  • 不足
    • 对于较小的压缩文件,不如简单的方法来的简洁和直接

总结

本文针对 oss 上传 zip 压缩文件进行自动解压进行了一些方案的探讨与尝试, 分析了两种方案各自的优点和不足。 但是这两种方法都有一个共同的限制,就是函数执行时间最大为15分钟,如果压缩文件足够大和足够复杂(里面有很多的小文件), 需要合理评估时间, 执行时间 = 解压时间(纯cpu计算,这个跟设置函数内存规格大小成线性关系,需要合理设置) + 网络io 时间(走内网); 一般来说本地(两核3G的配置)解压时间在10分钟以内的, FC 应该都有能力处理。

相关实践学习
【文生图】一键部署Stable Diffusion基于函数计算
本实验教你如何在函数计算FC上从零开始部署Stable Diffusion来进行AI绘画创作,开启AIGC盲盒。函数计算提供一定的免费额度供用户使用。本实验答疑钉钉群:29290019867
建立 Serverless 思维
本课程包括: Serverless 应用引擎的概念, 为开发者带来的实际价值, 以及让您了解常见的 Serverless 架构模式
目录
相关文章
|
5月前
|
监控 Java Serverless
函数计算产品使用问题之对于OSS打包的zip的保存目录,该如何操作
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
|
4月前
|
存储 运维 Serverless
函数计算产品使用问题之OSS触发器是否可以只设置文件前缀
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
5月前
|
Java Serverless 数据库连接
函数计算操作报错合集之调用打包的OSS函数时发生报错,该怎么办
Serverless 应用引擎(SAE)是阿里云提供的Serverless PaaS平台,支持Spring Cloud、Dubbo、HSF等主流微服务框架,简化应用的部署、运维和弹性伸缩。在使用SAE过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
5月前
|
域名解析 Serverless API
函数计算产品使用问题之如何配置自定义域名访问OSS中的内容
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
运维 Serverless 数据处理
函数计算产品使用问题之在对象存储服务(OSS)上创建ZIP包解压触发器后,触发器未按预期执行,一般是什么导致的
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
3月前
|
人工智能 自然语言处理 Serverless
阿里云函数计算 x NVIDIA 加速企业 AI 应用落地
阿里云函数计算与 NVIDIA TensorRT/TensorRT-LLM 展开合作,通过结合阿里云的无缝计算体验和 NVIDIA 的高性能推理库,开发者能够以更低的成本、更高的效率完成复杂的 AI 任务,加速技术落地和应用创新。
178 13
|
5天前
|
人工智能 Serverless API
尽享红利,Serverless构建企业AI应用方案与实践
本次课程由阿里云云原生架构师计缘分享,主题为“尽享红利,Serverless构建企业AI应用方案与实践”。课程分为四个部分:1) Serverless技术价值,介绍其发展趋势及优势;2) Serverless函数计算与AI的结合,探讨两者融合的应用场景;3) Serverless函数计算AIGC应用方案,展示具体的技术实现和客户案例;4) 业务初期如何降低使用门槛,提供新用户权益和免费资源。通过这些内容,帮助企业和开发者快速构建高效、低成本的AI应用。
42 12
|
4月前
|
Serverless API 异构计算
函数计算产品使用问题之修改SD模版应用的运行环境
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
4月前
|
运维 Serverless 网络安全
函数计算产品使用问题之通过仓库导入应用时无法配置域名外网访问,该如何排查
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
2月前
|
存储 消息中间件 人工智能
ApsaraMQ Serverless 能力再升级,事件驱动架构赋能 AI 应用
本文整理自2024年云栖大会阿里云智能集团高级技术专家金吉祥的演讲《ApsaraMQ Serverless 能力再升级,事件驱动架构赋能 AI 应用》。

热门文章

最新文章

相关产品

  • 函数计算