利用函数计算多实例并发实现秒级转存超大文件

本文涉及的产品
简介: # 场景 从指定 URL 下载文件,并将其转存到指定的 [OSS](https://www.aliyun.com/product/oss) Bucket 中。 # 问题难点 * 文件非常大,可能有几个 G 甚至十几 G。

场景

从指定 URL 下载文件,并将其转存到指定的 OSS Bucket 中。

问题难点

  • 文件非常大,可能有几个 G 甚至十几 G。
  • 转存速度要快,转存时间要控制在秒级。
  • 需要走公网。

以往的实现方法

1. 利用服务器转存

ECS 实例上多进程流式下载、流式上传。

缺点:

  • 需要人工操作、人工监控,消耗人力。
  • 无意义地重复工作,无法处理同时的大量请求,也无法集成事件源。
  • 价格昂贵。
  • 带宽有限制,扩大带宽需要支付更多的钱。
  • 难以多实例并发。

2. 利用 OSS API 转存

利用 OSS 提供 的 CopyObject API 进行转存。

缺点:

  • 只能处理源文件和目标文件都在同一个 region 的情况。
  • 不能处理源文件为 URL 的情况。

现在的实现方法

函数计算(FC)上编写函数。

优点:

  • 可以集成事件源,比如通过 OSS 在文件上传后自动触发函数进行转存。
  • 无需运维、无需搭建环境,只需提供函数代码。
  • 价格低廉,几乎免费。

限制与挑战:

  • 跨区域转存需要走公网,带宽有限。
  • 函数计算单次执行时间最多为 $600$ 秒。如何把转存任务拆分为细粒度的子任务,使得它既能在 FC 限定的时间内完成,又能大规模并行,显著加速转存任务。

1. 流式处理

要转存的文件太大,无法全部存储在内存里,因此需要采用流式读取/上传的方式。

2. 文件分片

将超大文件平均分成若干片,交由多个函数实例并行执行,从而显著加快传输速度。此外,出现网络问题时,分片的方式可以让函数只对失败的分片进行重试,减少冗余的工作。

3. 多实例并发

这是最为关键的一点。函数在接到转存任务之后,将超大文件进行分片,对于每个分片都启动一个子进程。该子进程通过函数计算的 SDK 同步调用该函数本身,将上传完成后用于最后合并分片的信息通过返回值传递给主函数。主函数在所有子进程运行完毕之后,将这些分片合并,完成整个转存任务。

large-file-m.png

4. 优化单实例效率

通过参数调优,选择最佳参数等手段,使得单实例转存效率最高。

效果

文件大小 $200$ MB $2$ GB $10.774$ GB $14.68$ GB
单实例 $15$ 秒 $155$ 秒 $>600$ 秒 $>600$ 秒
$100$ 实例并发 $1$ 秒 $4$ 秒 $11$ 秒 $14$ 秒

示例

1. 创建服务

函数计算控制台创建服务,并将权限配置为 AliyunOSSFullAccessAliyunFCInvocationAccess

1.png

2. 创建函数

在第一步创建的服务中创建函数,运行环境设置为 python3,函数执行内存设置为 1536MB,超时时间设置为 600 秒,代码如下:

# coding=utf-8

import time, json, requests, traceback, oss2, fc2
from requests.exceptions import *
from fc2.fc_exceptions import *
from oss2.models import PartInfo
from oss2.exceptions import *
from multiprocessing import Pool
from contextlib import closing

MAX_SUBTASKS = 99 # The number of worker processes to do subtasks
BLOCK_SIZE = 6 * 1024 * 1024 # The size of each part
CHUNK_SIZE = 8 * 1024 # The size of each chunk
SLEEP_TIME = 0.1 # The initial seconds to wait for retrying
MAX_RETRY_TIME = 10 # The maximum retry times

def retry(func):
    """
    Return the result of the lambda function func with retry.
    :param func: (required, lambda) the function.
    :return: The result of func.
    """
    wait_time = SLEEP_TIME
    retry_cnt = 1
    while True:
        if retry_cnt > MAX_RETRY_TIME:
            return func()
        try:
            return func()
        except (ConnectionError, SSLError, ConnectTimeout, Timeout) as e:
            print(traceback.format_exc())
        except (OssError) as e:
            if 500 <= e.status < 600:
                print(traceback.format_exc())
            else:
                raise Exception(e)
        except (FcError) as e:
            if (500 <= e.status_code < 600) or (e.status_code == 429):
                print(traceback.format_exc())
            else:
                raise Exception(e)
        print('Retry %d times...' % retry_cnt)
        time.sleep(wait_time)
        wait_time *= 2
        retry_cnt += 1

def get_info(url):
    """
    Get the CRC64 and total length of the file.
    :param url: (required, string) the url address of the file.
    :return: CRC64, length
    """
    with retry(lambda : requests.get(url, {}, stream = True)) as r:
        return r.headers['x-oss-hash-crc64ecma'], int(r.headers['content-length'])

class Response(object):
    """
    The response class to support reading by chunks.
    """
    def __init__(self, response):
        self.response = response
        self.status = response.status_code
        self.headers = response.headers

    def read(self, amt = None):
        if amt is None:
            content = b''
            for chunk in self.response.iter_content(CHUNK_SIZE):
                content += chunk
            return content
        else:
            try:
                return next(self.response.iter_content(amt))
            except StopIteration:
                return b''

    def __iter__(self):
        return self.response.iter_content(CHUNK_SIZE)

def do_subtask(event, context):
    """
    Download a range of the file from url and then upload it to OSS.
    :param event: (required, json) the json format of event.
    :param context: (required, FCContext) the context of handler.
    :return: ([integer, string]) the part_number and etag of this part.
    """
    oss_endpoint = event['target_endpoint']
    oss_bucket_name = event['target_bucket_name']
    access_key_id = context.credentials.access_key_id
    access_key_secret = context.credentials.access_key_secret
    security_token = context.credentials.security_token
    auth = oss2.StsAuth(access_key_id, access_key_secret, security_token)
    bucket = oss2.Bucket(auth, oss_endpoint, oss_bucket_name)
    object_name = event['target_object_name']
    upload_id = event['upload_id']
    part_number = event['part_number']
    url = event['source_url']
    st = event['st']
    en = event['en']
    try:
        headers = {'Range' : 'bytes=%d-%d' % (st, en)}
        resp = Response(retry(lambda : requests.get(url, headers = headers, stream = True)))
        result = retry(lambda : bucket.upload_part(object_name, upload_id, part_number, resp))
        return [part_number, result.etag]
    except Exception as e:
        print(traceback.format_exc())
        raise Exception(e)

def invoke_subtask(args):
    """
    Invoke the same function synchronously to start a subtask.
    :param args: (object_name, upload_id, part_number, url, st, en, context)
    :object_name: (required, string) the goal object_name.
    :oss_endpoint: (required, string) the goal oss endpoint.
    :oss_bucket_name: (required, string) the goal bucket_name.
    :upload_id: (required, integer) the upload_id of this upload task.
    :part_number: (integer) the part_number of the first part in this subtask.
    :url: (required, string) the url address of the file.
    :st, en: (required, integer) the byte range of this subtask, denoting [st, en].
    :context: (required, FCContext) the context of handler.
    :return: the return of the invoked function.
    """
    object_name = args[0]
    oss_endpoint = args[1]
    oss_bucket_name = args[2]
    upload_id = args[3]
    part_number = args[4]
    url = args[5]
    st = args[6]
    en = args[7]
    context = args[8]
    account_id = context.account_id
    access_key_id = context.credentials.access_key_id
    access_key_secret = context.credentials.access_key_secret
    security_token = context.credentials.security_token
    region = context.region
    service_name = context.service.name
    function_name = context.function.name
    endpoint = 'http://%s.%s-internal.fc.aliyuncs.com' % (account_id, region)
    client = fc2.Client(
        endpoint = endpoint,
        accessKeyID = access_key_id,
        accessKeySecret = access_key_secret,
        securityToken = security_token
    )
    payload = {
        'target_object_name' : object_name,
        'target_endpoint' : oss_endpoint,
        'target_bucket_name' : oss_bucket_name,
        'upload_id' : upload_id,
        'part_number' : part_number,
        'source_url' : url,
        'st' : st,
        'en' : en,
        'is_children' : True
    }
    ret = retry(lambda : client.invoke_function(service_name, function_name, payload = json.dumps(payload)))
    return ret.data

def migrate_file(url, oss_object_name, oss_endpoint, oss_bucket_name, context):
    """
    Download the file from url and then upload it to OSS.
    :param url: (required, string) the url address of the file.
    :param oss_object_name: (required, string) the goal object_name.
    :param oss_endpoint: (required, string) the goal oss endpoint.
    :param oss_bucket_name: (required, string) the goal bucket_name.
    :param context: (required, FCContext) the context of handler.
    :return: actual_crc64, expect_crc64
    :actual_crc64: (string) the CRC64 of upload.
    :expect_crc64: (string) the CRC64 of source file.
    """
    crc64, total_size = get_info(url)
    access_key_id = context.credentials.access_key_id
    access_key_secret = context.credentials.access_key_secret
    security_token = context.credentials.security_token
    auth = oss2.StsAuth(access_key_id, access_key_secret, security_token)
    bucket = oss2.Bucket(auth, oss_endpoint, oss_bucket_name)
    upload_id = retry(lambda : bucket.init_multipart_upload(oss_object_name)).upload_id
    pool = Pool(MAX_SUBTASKS)
    st = 0
    part_number = 1
    tasks = []
    while st < total_size:
        en = min(total_size - 1, st + BLOCK_SIZE - 1)
        tasks.append((oss_object_name, oss_endpoint, oss_bucket_name, upload_id, part_number, url, st, en, context))
        part_number += 1
        st = en + 1
    subtasks = pool.map(invoke_subtask, tasks)
    pool.close()
    pool.join()
    parts = []
    for it in subtasks:
        tmp = json.loads(it)
        parts.append(PartInfo(tmp[0], tmp[1]))
    res = retry(lambda : bucket.complete_multipart_upload(oss_object_name, upload_id, parts))
    return str(res.crc), str(crc64)

def get_oss_endpoint(oss_region, fc_region):
    """
    Get the oss endpoint.
    :param oss_region: (required, string) the region of the target oss.
    :param fc_region: (required, string) the region of the fc function.
    :return: (string) the best oss endpoint.
    """
    endpoint = 'http://oss-' + oss_region
    if oss_region == fc_region:
        endpoint += '-internal'
    return endpoint + '.aliyuncs.com'

def handler(event, context):
    evt = json.loads(event)
    if list(evt.keys()).count('is_children'):
        return json.dumps(do_subtask(evt, context))
    url = evt['source_url']
    oss_object_name = evt['target_object_name']
    oss_endpoint = get_oss_endpoint(evt['target_region'], context.region)
    oss_bucket_name = evt['target_bucket_name']
    st_time = int(time.time())
    wait_time = SLEEP_TIME
    retry_cnt = 1
    while True:
        actual_crc64, expect_crc64 = migrate_file(url, oss_object_name, oss_endpoint, oss_bucket_name, context)
        if actual_crc64 == expect_crc64:
            break
        print('Migration object CRC64 not matched, expected: %s, actual: %s' % (expect_crc64, actual_crc64))
        if retry_cnt > MAX_RETRY_TIME:
            raise Exception('Maximum retry time exceeded.')
        print('Retry %d times...' % retry_cnt)
        time.sleep(wait_time)
        wait_time *= 2
        retry_cnt += 1
    print('Success! Total time: %d s.' % (int(time.time()) - st_time))

3. 编写 event

newnew.png

点击 触发事件,填入以下格式的 json:

{
    "source_url" : "<source_url>",
    "target_object_name" : "<target_object_name>",
    "target_region" : "<target_region>",
    "target_bucket_name" : "<target_bucket_name>"
}

4. 执行函数

点击 执行,等待函数执行完毕,转存成功。

new.png

5. 查看转存文件

前往 OSS 控制台,在对应 bucket 下确认转存成功。

4.png

实际应用

相关实践学习
基于函数计算一键部署掌上游戏机
本场景介绍如何使用阿里云计算服务命令快速搭建一个掌上游戏机。
建立 Serverless 思维
本课程包括: Serverless 应用引擎的概念, 为开发者带来的实际价值, 以及让您了解常见的 Serverless 架构模式
目录
相关文章
|
2月前
|
弹性计算 监控 Serverless
Serverless 应用引擎常见问题之相同的配置(4U8G)需要多个实例来扛如何解决
Serverless 应用引擎(Serverless Application Engine, SAE)是一种完全托管的应用平台,它允许开发者无需管理服务器即可构建和部署应用。以下是Serverless 应用引擎使用过程中的一些常见问题及其答案的汇总:
134 5
|
2月前
|
Java Serverless 对象存储
Serverless 应用引擎常见问题之实例文件下载到本地如何解决
Serverless 应用引擎(Serverless Application Engine, SAE)是一种完全托管的应用平台,它允许开发者无需管理服务器即可构建和部署应用。以下是Serverless 应用引擎使用过程中的一些常见问题及其答案的汇总:
173 5
|
2天前
|
存储 Serverless 对象存储
通过FC运行脚本下载文件到OSS
本文介绍了在阿里云中使用函数计算服务(Function Compute)从URL下载文件并存储到OSS(Object Storage Service)的步骤。首先,需开通函数计算服务并创建RAM角色,授权函数计算访问OSS权限。费用详情参考官方计费概述。操作步骤包括:登录OSS控制台,使用公共模板创建执行,配置参数并运行Python脚本,脚本负责从URL下载文件并上传至指定OSS Bucket。执行成功后,文件将出现在目标OSS Bucket中。
19 0
|
4天前
|
消息中间件 Kafka Serverless
小红书黄章衡:AutoMQ Serverless 基石-秒级分区迁移
Apache Kafka的分区迁移通常需要大量数据同步,耗时较长,但在AutoMQ中,由于存算分离架构,迁移时间缩短至秒级。本文深入解析了AutoMQ秒级迁移的原理和源码,包括构建迁移命令、Broker同步变更、元数据持久化、数据上传、选主以及数据恢复等六个步骤。这种高效迁移能力适用于高峰期快速扩容和Serverless按需扩容场景,提升了系统的弹性和运维效率。AutoMQ由Apache RocketMQ和Linux LVS团队创建,旨在提供成本优化和高弹性消息队列服务。
30 3
小红书黄章衡:AutoMQ Serverless 基石-秒级分区迁移
|
12天前
|
运维 Serverless API
Serverless 应用引擎产品使用之在阿里函数计算中,设置单实例并发1如何解决
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
140 6
Serverless 应用引擎产品使用之在阿里函数计算中,设置单实例并发1如何解决
|
12天前
|
运维 文字识别 Serverless
Serverless 应用引擎产品使用之在阿里云函数计算中,需要处理的文件大于100MB如何解决
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
129 5
|
12天前
|
运维 Dubbo Java
Serverless 应用引擎产品使用之在 Serverless 应用引擎中,查看镜像文件中的 JAR 文件如何解决
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
143 2
|
12天前
|
弹性计算 运维 Serverless
Serverless 应用引擎产品使用之在阿里函数计算中,使用阿里云API或SDK从函数计算调用ECS实例的服务如何解决
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
40 4
|
12天前
|
缓存 运维 Serverless
Serverless 应用引擎产品使用之阿里函数计算中。将本地电脑上的项目文件部署到阿里云函数计算(FC)上并实现对外提供API和WebUI如何解决
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
32 1
|
12天前
|
消息中间件 运维 Serverless
Serverless 应用引擎产品使用之在阿里云函数计算FC中,函数的执行时间是根据实例的存活时间进行计算如何解决
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
30 0

热门文章

最新文章

相关产品

  • 函数计算