Background

In some business scenarios, very large log files (or other large files) are generated on an ECS instance. These files need to be moved off the machine promptly, gzip-compressed, and stored in OSS, but the compressed file may be larger than 3 GB, exceeding the maximum memory available to a Function Compute execution environment. This article presents a streaming solution to that problem, under two assumptions:
- Function Compute is configured with a VPC, so it can reach the ECS instance over the internal network
- OSS and Function Compute are in the same region, so data is transferred over the internal network

Sample code

The code depends on the third-party library paramiko. The stock library throttles throughput when transferring large files, so it needs the modification sketched below; in addition, when constructing paramiko.SFTPClient, the window_size and max_packet_size parameters should be set explicitly.
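The exact library change is not reproduced in this article. As a hedged sketch only (an assumption, not necessarily the author's patch), one commonly used tweak is to raise paramiko's per-request SFTP read size, which defaults to 32 KB and can cap read throughput; how much it helps also depends on the SSH server's limits:

    # Sketch only: enlarge paramiko's per-request SFTP read size before opening files.
    # SFTPFile.MAX_REQUEST_SIZE defaults to 32768 bytes; the value below is illustrative.
    import paramiko.sftp_file

    paramiko.sftp_file.SFTPFile.MAX_REQUEST_SIZE = 1024 * 1024  # hypothetical: 1 MB per request

With that in place, the handler streams the file in 1 MB chunks, gzip-compresses each chunk, and appends the result to an OSS object: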
import paramiko
import gzip
import oss2
import logging
import os
import time
logging.getLogger("oss2.api").setLevel(logging.ERROR)
logging.getLogger("oss2.auth").setLevel(logging.ERROR)
# config
BUCKET_NAME = "oss-demo"
ECS_INNER_IP = "192.168.22.3"
SSH_PORT = 22
USR_NAME = 'xiaoming'
USR_PWD = '123456'
def handler(event, context):
    start = time.time()
    region = context.region
    creds = context.credentials
    # Use the temporary STS credentials injected by Function Compute
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    bucket = oss2.Bucket(auth, 'oss-' + region + '-internal.aliyuncs.com', BUCKET_NAME)

    # SFTP connection to the ECS instance over the VPC internal network
    scp = paramiko.Transport((ECS_INNER_IP, SSH_PORT))
    scp.connect(username=USR_NAME, password=USR_PWD)
    window_size = 2 ** 28
    max_packet_size = 2 ** 26
    sftp = paramiko.SFTPClient.from_transport(scp, window_size=window_size, max_packet_size=max_packet_size)

    pos = 0                    # current append offset in the OSS object
    CHUNK_SIZE = 1024 * 1024   # read the source file 1 MB at a time
    APPEND_SIZE = 128          # flush to OSS every 128 compressed chunks
    LOG_FILE = "test-1.log"
    DST_OBJ = 'dst/' + LOG_FILE + '.gz'
    data_out = []
    with sftp.open("/root/" + LOG_FILE, "r") as f:
        while 1:
            data = f.read(CHUNK_SIZE)
            if not data:
                # flush whatever compressed chunks remain (always fewer than APPEND_SIZE here)
                if data_out:
                    result = bucket.append_object(DST_OBJ, pos, b"".join(data_out))
                break
            data_out.append(gzip.compress(data))
            if len(data_out) == APPEND_SIZE:
                upload_data = b"".join(data_out)
                result = bucket.append_object(DST_OBJ, pos, upload_data)
                pos += len(upload_data)
                data_out = []
    sftp.close()
    scp.close()
    print("total time = ", time.time() - start)
    return "OK"
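One design detail worth calling out: each 1 MB chunk is compressed independently with gzip.compress and the outputs are simply concatenated in OSS. A sequence of concatenated gzip members is still a valid gzip stream, so the resulting object decompresses normally. A minimal verification sketch (the endpoint, credentials, and local path are illustrative placeholders; the bucket and object key match the code above):

    import gzip
    import oss2

    # Download the appended object and decompress it locally to confirm it is readable.
    auth = oss2.Auth('<access-key-id>', '<access-key-secret>')
    bucket = oss2.Bucket(auth, 'oss-cn-hangzhou.aliyuncs.com', 'oss-demo')
    bucket.get_object_to_file('dst/test-1.log.gz', '/tmp/test-1.log.gz')

    with gzip.open('/tmp/test-1.log.gz', 'rb') as gz:
        # gzip.open transparently reads across concatenated gzip members
        print(gz.readline()[:80])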
Although this approach gets around the Function Compute memory limit, for very large files, say 15 GB and above, the 10-minute execution time limit becomes the next bottleneck.

Improvement

OSS supports multipart upload. Combining multipart upload with the elastic scale-out of Function Compute gives the following scheme: a master function stats the source file over SFTP, splits it into fixed-size ranges, and invokes one worker function per range in parallel; each worker reads its range over SFTP, gzip-compresses it chunk by chunk, and uploads the compressed data as multipart parts; the master then completes the multipart upload once all workers return. The bare multipart flow is sketched right after this paragraph, followed by the full master and worker code.
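For reference, here is the plain OSS multipart-upload flow in one place, using the same oss2 calls the master and worker below rely on (a minimal sketch; the endpoint, credentials, key, and data are placeholders):

    import oss2

    auth = oss2.Auth('<access-key-id>', '<access-key-secret>')
    bucket = oss2.Bucket(auth, 'oss-cn-hangzhou-internal.aliyuncs.com', 'oss-demo')

    key = 'dst/example.gz'
    upload_id = bucket.init_multipart_upload(key).upload_id

    parts = []
    # Parts may be uploaded in any order, and even from different processes, as long as
    # each carries a unique part number; every part except the last must be >= 100 KB.
    for part_number, chunk in enumerate([b'x' * 200 * 1024, b'tail'], start=1):
        result = bucket.upload_part(key, upload_id, part_number, chunk)
        parts.append(oss2.models.PartInfo(part_number, result.etag))

    bucket.complete_multipart_upload(key, upload_id, parts)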
Sample code:
master:
# -*- coding: utf-8 -*-
import logging
import fc2
import oss2
import paramiko
import math
import json
import time
from threading import Thread
logging.getLogger("oss2.api").setLevel(logging.ERROR)
logging.getLogger("oss2.auth").setLevel(logging.ERROR)
SRC_LOG_FILE = "/root/huge.log"
DST_FILE = "jiahe/push_log_50000_5G.gz"
BUCKET_NAME = "oss-demo"
ECS_INNER_IP = "192.168.2.12"
SSH_PORT = 22
USR_NAME = 'xiaoming'
USR_PWD = '123456'
SUB_LOG = 5*1024*1024*1024   # size of the log range handled by each worker function
PART_SIZE = 16 * 1024 * 1024 # part size used by the worker for uploads; must equal the worker's PART_SIZE
SERVICE_NAME = "fc_demo"     # service containing the worker function; preferably the same service as the master
SUB_FUNCTION_NAME = "worker" # name of the worker function

parts = []

def sub_gz(fcClient, subevent):
    # Synchronously invoke one worker and collect the part info it returns
    content = fcClient.invoke_function(SERVICE_NAME, SUB_FUNCTION_NAME, json.dumps(subevent)).data
    global parts
    parts.extend(json.loads(content))

def handler(event, context):
    global parts
    parts = []
    region = context.region
    creds = context.credentials
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    bucket = oss2.Bucket(auth, 'oss-' + region + '-internal.aliyuncs.com', BUCKET_NAME)

    # Stat the source file over SFTP to get its total size
    scp = paramiko.Transport((ECS_INNER_IP, SSH_PORT))
    scp.connect(username=USR_NAME, password=USR_PWD)
    sftp = paramiko.SFTPClient.from_transport(scp)
    info = sftp.stat(SRC_LOG_FILE)
    total_size = info.st_size
    print(total_size)
    sftp.close()

    # Invoke the worker functions over the Function Compute internal endpoint
    endpoint = "http://{0}.{1}-internal.fc.aliyuncs.com".format(context.account_id, context.region)
    fcClient = fc2.Client(endpoint=endpoint, accessKeyID=creds.accessKeyId,
                          accessKeySecret=creds.accessKeySecret,
                          securityToken=creds.securityToken, Timeout=900)

    threadNum = int(math.ceil(float(total_size)/SUB_LOG))
    key = DST_FILE
    upload_id = bucket.init_multipart_upload(key).upload_id
    # Each worker gets a block of part_step consecutive part numbers so parts never collide
    part_step = int(math.ceil(float(SUB_LOG)/PART_SIZE))
    ts = []
    left_size = total_size
    for i in range(threadNum):
        part_start = part_step * i + 1
        size = SUB_LOG if SUB_LOG < left_size else left_size
        subEvt = {
            "src": SRC_LOG_FILE,
            "dst": DST_FILE,
            "offset": i * SUB_LOG,
            "size": size,
            "part_number": part_start,
            "upload_id": upload_id,
        }
        print(i, subEvt)
        t = Thread(target=sub_gz, args=(fcClient, subEvt,))
        left_size = left_size - SUB_LOG
        t.start()
        ts.append(t)
    for t in ts:
        t.join()

    # Complete the multipart upload with the parts reported by all workers
    parts.sort(key=lambda k: (k.get('part_number', 0)))
    part_objs = []
    for part in parts:
        part_objs.append(oss2.models.PartInfo(part["part_number"], part["etag"],
                                              size=part["size"], part_crc=part["part_crc"]))
    bucket.complete_multipart_upload(key, upload_id, part_objs)
    return "ok"
worker:
import paramiko
import gzip
import oss2
import logging
import os
import time
import json
logging.getLogger("oss2.api").setLevel(logging.ERROR)
logging.getLogger("oss2.auth").setLevel(logging.ERROR)
# config
BUCKET_NAME = "oss-demo"
ECS_INNER_IP = "192.168.2.12"
SSH_PORT = 22
USR_NAME = 'xiaoming'
USR_PWD = '123456'
CHUNK_SIZE = 1024*1024
PART_SIZE = 16 * CHUNK_SIZE
def handler(event, context):
    region = context.region
    creds = context.credentials
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    bucket = oss2.Bucket(auth, 'oss-' + region + '-internal.aliyuncs.com', BUCKET_NAME)

    scp = paramiko.Transport((ECS_INNER_IP, SSH_PORT))
    scp.connect(username=USR_NAME, password=USR_PWD)
    window_size = 2 ** 30
    max_packet_size = 2 ** 28
    sftp = paramiko.SFTPClient.from_transport(scp, window_size=window_size, max_packet_size=max_packet_size)

    evt = json.loads(event)
    src_log_file = evt['src']
    offset = evt['offset']                  # start of the byte range this worker handles
    size = evt['size']                      # length of the byte range
    dst_gz_file = evt['dst']
    part_number = int(evt['part_number'])   # first part number assigned to this worker
    upload_id = evt['upload_id']
    key = dst_gz_file

    data_out = []
    partsInfo = []
    with sftp.open(src_log_file, "r") as f:
        f.seek(offset)
        while 1:
            data = f.read(CHUNK_SIZE)
            cur = f.tell()
            if not data or (cur > offset + size):
                if data_out:
                    upload_data = b"".join(data_out)
                    size_to_upload = len(upload_data)
                    # The final part may be smaller than 100 KB, e.g. when the file is 15 GB + 1 KB
                    # and the trailing 1 KB is left over, or when the leftover is larger than 100 KB
                    # but compresses to less than 100 KB. For log files, one option is to pad the end
                    # with harmless filler characters.
                    if size_to_upload < 100 * 1024:
                        # fill_data = b"\n\n\n\n\n\n\n\n\n\nAliyunFCFill" + os.urandom(102400)
                        fill_data = b"\t\t\t\t\t\t\t\t\t\t\t\n" * 1024 * 1024 * 5
                        upload_data += gzip.compress(fill_data)  # intended to push the compressed part above 100 KB
                        # NOTE: highly repetitive filler compresses extremely well, so verify the padded
                        # part really exceeds 100 KB; the incompressible os.urandom variant above is a
                        # safer guarantee.
                        size_to_upload = len(upload_data)
                    result = bucket.upload_part(key, upload_id, part_number, upload_data)
                    partsInfo.append({
                        "part_number": part_number,
                        "etag": result.etag,
                        "size": size_to_upload,
                        "part_crc": result.crc})
                break
            data_out.append(gzip.compress(data))
            # The gzip output of every 16 MB of source data becomes one part. OSS requires each part
            # except the last to be at least 100 KB (102400 bytes); 16 MB of log data normally
            # compresses to well over 100 KB.
            if (cur - offset) % PART_SIZE == 0:
                upload_data = b"".join(data_out)
                size_to_upload = len(upload_data)
                result = bucket.upload_part(key, upload_id, part_number, upload_data)
                partsInfo.append({
                    "part_number": part_number,
                    "etag": result.etag,
                    "size": size_to_upload,
                    "part_crc": result.crc})
                part_number += 1
                data_out = []
    sftp.close()
    scp.close()
    return json.dumps(partsInfo)
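Finally, a hedged example of kicking off the whole job from outside Function Compute, assuming the master code above is deployed as a function named master in the fc_demo service (the function name, endpoint, and credentials are placeholders):

    import fc2

    # Invoke the master synchronously; it fans out to the workers and completes the
    # multipart upload before returning.
    client = fc2.Client(
        endpoint='http://<account-id>.<region>.fc.aliyuncs.com',
        accessKeyID='<access-key-id>',
        accessKeySecret='<access-key-secret>',
        Timeout=900)

    resp = client.invoke_function('fc_demo', 'master', '{}')
    print(resp.data)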