利用函数计算流式 gz 打包 ECS 上的单个 超大文件

本文涉及的产品
函数计算FC,每月15万CU 3个月
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
简介: 在某些业务场景下,生成超大的日志文件或者其他文件, 这些文件需要及时移出并 gz 压缩保存到 OSS,但是压缩文件可能会大于 3G 超出函数计算执行环境的最大内存限制, 本文提供流式解决这个问题的方案

背景

在某些业务场景下,生成超大的日志文件或者其他文件, 这些文件需要及时移出并 gz 压缩保存到 OSS,但是压缩文件可能会大于 3G 超出函数计算执行环境的最大内存限制, 本文提供流式解决这个问题的方案

  • 函数计算配置VPC, 内网打通ecs
  • OSS 和 函数计算在相同 region, 内网传输

示例代码

依赖使用第三方库 paramiko, 但是默认的库在传输大文件上有传输速率限制, 需要做如下改造, 同时构造 paramiko.SFTPClient 的时候需要设置好 window_size 和 max_packet_size 这两个参数

image

import paramiko
import gzip
import oss2
import logging
import os
import time

logging.getLogger("oss2.api").setLevel(logging.ERROR)
logging.getLogger("oss2.auth").setLevel(logging.ERROR)

# config
BUCKET_NAME = "oss-demo"
ECS_INNER_IP = "192.168.22.3"
SSH_PORT = 22
USR_NAME = 'xiaoming'
USR_PWD = '123456'

def handler(event, context):
    start = time.time()
    region = context.region
    creds = context.credentials
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    bucket = oss2.Bucket(auth, 'oss-' + region + '-internal.aliyuncs.com', BUCKET_NAME)

    scp=paramiko.Transport((ECS_INNER_IP, SSH_PORT))
    scp.connect(username=USR_NAME, password=USR_PWD)
    window_size = 2 ** 28
    max_packet_size = 2 ** 26
    sftp=paramiko.SFTPClient.from_transport(scp,window_size=window_size, max_packet_size=max_packet_size)
    
    pos = 0
    CHUNK_SIZE = 1024*1024
    APPEND_SIZE = 128
     
    LOG_FILE = "test-1.log"
    DST_OBJ = 'dst/' + LOG_FILE + '.gz'
    
    data_out = []
    with sftp.open("/root/" + LOG_FILE, "r") as f:
        while 1:
            data=f.read(CHUNK_SIZE)      
            if not data:
                if len(data_out) > 0 and len(data_out) < APPEND_SIZE:
                  result = bucket.append_object(DST_OBJ, pos, b"".join(data_out))
                break    
            data_out.append(gzip.compress(data))
            if len(data_out) == APPEND_SIZE:
                upload_data = b"".join(data_out)
                result = bucket.append_object(DST_OBJ, pos, b"".join(data_out))
                pos += len(upload_data)
                data_out = []
    
    print("total time = ", time.time() - start)
    return "OK"

这个方案, 虽然解决了函数计算内存限制, 但是对于某些超大文件, 比如15G 以上的文件, 10分钟的时间限制又是一个limit

ImproveMent

OSS 支持分片上传功能,那基于分片上传,配合 FC 的弹性伸缩功能, 可以有如下方案:

image

示例代码:

master:

# -*- coding: utf-8 -*-
import logging
import fc2
import oss2
import paramiko
import math
import json
import time
from threading import Thread

logging.getLogger("oss2.api").setLevel(logging.ERROR)
logging.getLogger("oss2.auth").setLevel(logging.ERROR)

SRC_LOG_FILE =  "/root/huge.log"
DST_FILE = "jiahe/push_log_50000_5G.gz"
BUCKET_NAME = "oss-demo"

ECS_INNER_IP = "192.168.2.12"
SSH_PORT = 22
USR_NAME = 'xiaoming'
USR_PWD = '123456'

SUB_LOG = 5*1024*1024*1024  # 交给每个work函数处理的日志大小
PART_SIZE = 16 * 1024 * 1024  # work 函数上传分片的大小, 和work函数大小必须相同

SERVICE_NMAE = "fc_demo"    # work函数所在的service, 最好在同一个service
SUB_FUNCTION_NAME = "worker"  # work函数的名字

parts = []
def sub_gz(fcClient, subevent):
    content=fcClient.invoke_function(SERVICE_NMAE, SUB_FUNCTION_NAME, json.dumps(subevent)).data
    global parts
    #print(content)
    parts.extend(json.loads(content))

def handler(event, context):
    #start = time.time()
    global parts
    parts = []
    region = context.region
    creds = context.credentials
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    bucket = oss2.Bucket(auth, 'oss-' + region + '-internal.aliyuncs.com', BUCKET_NAME)

    scp=paramiko.Transport((ECS_INNER_IP, SSH_PORT))
    scp.connect(username=USR_NAME, password=USR_PWD)
    sftp=paramiko.SFTPClient.from_transport(scp)
    info = sftp.stat(SRC_LOG_FILE)
    total_size = info.st_size
    print(total_size)
    sftp.close()

    endpoint = "http://{0}.{1}-internal.fc.aliyuncs.com".format(context.account_id, context.region)
    fcClient = fc2.Client(endpoint=endpoint,accessKeyID=creds.accessKeyId,accessKeySecret=creds.accessKeySecret, securityToken=creds.securityToken,Timeout=900)

    threadNum = int(math.ceil(float(total_size)/SUB_LOG))
    key = DST_FILE
    upload_id = bucket.init_multipart_upload(key).upload_id

    part_step = int(math.ceil(float(SUB_LOG)/PART_SIZE))
    ts = []
    left_size = total_size
    for i in range(threadNum):
        part_start = part_step * i + 1
        size = SUB_LOG if SUB_LOG < left_size else left_size
        subEvt = {
          "src": SRC_LOG_FILE,
          "dst": DST_FILE,
          "offset": i * SUB_LOG,
          "size": size,
          "part_number":  part_start, 
          "upload_id" : upload_id,
        }
        print(i, subEvt)
        t = Thread(target=sub_gz, args=(fcClient, subEvt,))
        left_size = left_size - SUB_LOG
        t.start()
        ts.append(t)

    for t in ts:
        t.join()

    parts.sort(key=lambda k: (k.get('part_number', 0)))
    #print(parts)
    part_objs = []
    for part in parts:
        part_objs.append(oss2.models.PartInfo(part["part_number"], part["etag"], size = part["size"], part_crc = part["part_crc"]))
    bucket.complete_multipart_upload(key, upload_id, part_objs)
    #print(time.time() - start)
    return "ok"

worker:

import paramiko
import gzip
import oss2
import logging
import os
import time
import json

logging.getLogger("oss2.api").setLevel(logging.ERROR)
logging.getLogger("oss2.auth").setLevel(logging.ERROR)

# config
BUCKET_NAME = "oss-demo"
ECS_INNER_IP = "192.168.2.12"
SSH_PORT = 22
USR_NAME = 'xiaoming'
USR_PWD = '123456'

CHUNK_SIZE = 1024*1024
PART_SIZE = 16 * CHUNK_SIZE

def handler(event, context):
    region = context.region
    creds = context.credentials
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    bucket = oss2.Bucket(auth, 'oss-' + region + '-internal.aliyuncs.com', BUCKET_NAME)

    scp=paramiko.Transport((ECS_INNER_IP, SSH_PORT))
    scp.connect(username=USR_NAME, password=USR_PWD)
    window_size = 2 ** 30
    max_packet_size = 2 ** 28
    sftp=paramiko.SFTPClient.from_transport(scp,window_size=window_size, max_packet_size=max_packet_size)

    evt = json.loads(event)
    src_log_file = evt['src']
    offset = evt['offset']
    size = evt['size']
    dst_gz_file = evt['dst']
    part_number = int(evt['part_number'])
    upload_id = evt['upload_id']

    key =  dst_gz_file
    data_out = []
    partsInfo = []

    with sftp.open(src_log_file, "r") as f:
        f.seek(offset)
        while 1:
            data=f.read(CHUNK_SIZE)
            cur = f.tell()
            if not data or (cur > offset + size):
                if data_out:
                    upload_data = b"".join(data_out)
                    size_to_upload = len(upload_data)
                    # 这里有可能出现分片不足100K的情况, 比如你的文件是 15G+1k, 这个时候出现1K漏单的情况或者即使大于100K但是压缩之后小于100K的情况
                    # 对于日志文件, 可以考虑填充点无效的字符在后面
                    if size_to_upload < 100 * 1024:
                        # fill_data=b"\n\n\n\n\n\n\n\n\n\nAliyunFCFill" + os.urandom(102400)
                        fill_data=b"\t\t\t\t\t\t\t\t\t\t\t\n"*1024*1024*5
                        upload_data += gzip.compress(fill_data) # fill_data 压缩后的结果为>100K
                        size_to_upload = len(upload_data)
                    result = bucket.upload_part(key, upload_id, part_number, upload_data)
                    partsInfo.append({
                        "part_number":part_number,
                        "etag":result.etag,
                        "size":size_to_upload,
                        "part_crc":result.crc})
                break
            data_out.append(gzip.compress(data))
            # 16M gz压缩的结果生成一个分片, oss要求一个分片最小为100K(102400), 通常16M压缩后的文件应该大于100K
            if (cur - offset) % PART_SIZE == 0: 
                upload_data = b"".join(data_out)
                size_to_upload = len(upload_data)
                result = bucket.upload_part(key, upload_id, part_number, upload_data)
                partsInfo.append({
                      "part_number":part_number,
                      "etag":result.etag,
                      "size":size_to_upload,
                      "part_crc":result.crc})
                part_number += 1
                data_out = []
    
    return json.dumps(partsInfo)
相关实践学习
2分钟自动化部署人生模拟器
本场景将带你借助云效流水线Flow实现人生模拟器小游戏的自动化部署
7天玩转云服务器
云服务器ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,可降低 IT 成本,提升运维效率。本课程手把手带你了解ECS、掌握基本操作、动手实操快照管理、镜像管理等。了解产品详情:&nbsp;https://www.aliyun.com/product/ecs
目录
相关文章
|
2月前
|
Java
java小工具util系列5:java文件相关操作工具,包括读取服务器路径下文件,删除文件及子文件,删除文件夹等方法
java小工具util系列5:java文件相关操作工具,包括读取服务器路径下文件,删除文件及子文件,删除文件夹等方法
84 9
|
4月前
|
存储 UED Windows
Windows服务器上大量文件迁移方案
Windows服务器上大量文件迁移方案
241 1
|
5月前
|
JavaScript Serverless 数据安全/隐私保护
函数计算产品使用问题之怎么动态设置.npmrc文件以配置私有仓库访问
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
3月前
|
Python
Flask学习笔记(三):基于Flask框架上传特征值(相关数据)到服务器端并保存为txt文件
这篇博客文章是关于如何使用Flask框架上传特征值数据到服务器端,并将其保存为txt文件的教程。
39 0
Flask学习笔记(三):基于Flask框架上传特征值(相关数据)到服务器端并保存为txt文件
|
3月前
|
Java Shell Maven
Flink-11 Flink Java 3分钟上手 打包Flink 提交任务至服务器执行 JobSubmit Maven打包Ja配置 maven-shade-plugin
Flink-11 Flink Java 3分钟上手 打包Flink 提交任务至服务器执行 JobSubmit Maven打包Ja配置 maven-shade-plugin
162 4
|
4月前
|
Java
java小工具util系列5:java文件相关操作工具,包括读取服务器路径下文件,删除文件及子文件,删除文件夹等方法
java小工具util系列5:java文件相关操作工具,包括读取服务器路径下文件,删除文件及子文件,删除文件夹等方法
61 4
|
3月前
|
前端开发 Docker 容器
主机host服务器和Docker容器之间的文件互传方法汇总
Docker 成为前端工具,可实现跨设备兼容。本文介绍主机与 Docker 容器/镜像间文件传输的三种方法:1. 构建镜像时使用 `COPY` 或 `ADD` 指令;2. 启动容器时使用 `-v` 挂载卷;3. 运行时使用 `docker cp` 命令。每种方法适用于不同场景,如静态文件打包、开发时文件同步及临时文件传输。注意权限问题、容器停止后的文件传输及性能影响。
705 0
|
4月前
|
消息中间件 弹性计算 关系型数据库
体验函数计算:高效处理多媒体文件的真实感受与实战总结
该方案在引导和文档方面做得较为详尽,仅在事件驱动机制部分略显简略。部署和代码示例实用,但需注意内存配置以避免超时。使用体验方面,函数计算表现出色,尤其在高并发场景下,显著提升了应用稳定性和成本效益。云产品如OSS、MNS等与函数计算配合流畅,ECS和RDS表现稳健。总体而言,这套方案弹性好、成本低,特别适合应对高并发或流量不确定的场景,值得推荐。
81 24
|
3月前
|
前端开发 Java Shell
后端项目打包上传服务器部署运行记录
后端项目打包上传服务器部署运行记录
63 0
|
3月前
|
Java Linux Maven
服务器部署之项目打包及命令行输出
服务器部署之项目打包及命令行输出
55 0

相关产品

  • 函数计算