如何使用函数计算实现流式处理大文件 — AWS S3 Select

本文涉及的产品
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
函数计算FC,每月15万CU 3个月
简介: 函数计算实现流式处理大文件

目前AWS 推出了S3 Select 预览版,对于带分隔符的文本文件(如csv)和 JSON数据,可使用简单的 SQL 表达式轻松检索对象内容中较小且具有针对性的数据集。

在某些场景中,用户需要对oss上的某个大文件抽取出某些信息。本教程展示如何结合oss和函数计算实现类似AWS S3 Select功能,以oss上的一个大小为400M,内含大约105万条记录的csv文件为例,讲解如果利用函数计算流式处理, 查找出包含'Lilly'的记录。

本demo做了什么?

1, 从一个包含100多万行记录的大csv文件中迅速(1s左右)查找到符合条件的记录,以查找出包含'Lilly'的记录为例,返回结果如下:

[
   "Lilly Patton,School,Weyerhauser Company,26",
   "Lilly Davidson,School,Bristol-Myers Squibb Company,50",
   "Lilly Drake,School,Federal Express Corp.,38",
   "Lilly Carlson,School,Pentair Inc,62",
   "Lilly Fisher,School,Progressive Corporation,42",
   "Lilly Morales,School,Mercury General Corporation,27",
   "Lilly Lewis,School,The Black & Decker Corporation,35",
   "Walter Rivera,School,Eli Lilly and Company,24",
   "Lilly Mills,School,Owens & Minor Inc.,60",
   "Lilly Jimenez,School,AOL Time Warner Inc.,42",
   ... 记录条目比较多,就不都展示了
]

2, 单线程分块读取处理与多线程分治处理方案效率比较,分别调用函数100次,耗时比较如下:

耗时 单线程分块读取处理 多线程分治处理
最小时间 6.69807887077 0.690281152725
最大时间 7.1105670929 0.946150064468
平均时间 6.85662093163 0.748612663746

具体步骤:

注:下面步骤中函数和oss的region是一致的,本例中都是华东2

1, 将例子的csv文件上传到oss中, 在本例子中,该csv文件上传到一个在华东2region中名叫demo-oss的oss bucket中

image

2, 创建好service,并且配置service的角色,该角色具有oss的读写权限和fc invoke权限
  • 2.1 用控制台创建service ApiFc, 控制台操作可以参考函数计算入门示例 - hello world
  • 2.2 用控制台创建一个具有oss读写权限和fc invoke权限的角色 fc-demo,并把角色赋予service ApiFc

2.2.1 用控制台创建角色并把角色赋予service ApiFc 的过程可以参考函数模板使用 中步骤6,7

  • 角色fc-demo,拥有两个policy,一个是对oss的读写,一个是函数调用
    image
  • 角色fc-demo 赋予 service ApiFc
    image
3, 在service ApiFc 下创建函数, 执行
3.1 单线程demo setup

从本文最后下载附件,解压以后可以看到三个zip文件, 用sing.zip创建函数single.

  • 创建函数
    image
  • 函数设置
    image
  • 创建成功后,进入代码编辑页面,配置一下event的参数,保存,直接按执行就可以执行函数了
    image
3.2 多线程demo setup
  • 3.2.1 参考single函数的创建方法,用query.zipquery_part.zipApiFc的service下面创建两个函数query和query_part
  • 3.2.2 主函数query会调用子函数query_part,如果你的service名字和子函数名不是'ApiFc'和'query_part',就需要修改query中相应的代码:
    image
  • 3.2.3 创建完成后,点击执行就行,如下图所示:
    image

具体代码实现:

以下是具体的两种解法:

  • 单线程分块读取处理 -> 速度比较慢, 适合小文件查询处理
  • 多线程分治处理 -> 速度快,适合大文件查询处理
一,单线程分块读取处理

配置event如下:

{
    "file_name": "sampleusers_10m_2.csv",
    "oss_bucket": "demo-oss",
    "mode": ".*Lilly.*"
}

注:目前mode的含义很简单,就是一条正则表达式,该例子就是寻找包含Lilly的记录

代码如下:

# coding=utf-8
import re
import json
import oss2
import logging
import time
import csv

CHUNK_SIZE = 1024*8
  
 # 函数服务主函数
def handler(event, context):
    start = time.time()
    evt = json.loads(event)
    endpoint = "oss-{}-internal.aliyuncs.com".format(context.region)
    oss_bucket, file_name, mode = evt['oss_bucket'], evt['file_name'],evt['mode']
    creds = context.credentials
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    bucket = oss2.Bucket(auth, endpoint, oss_bucket)
    start = time.time()
    r = bucket.get_object(file_name)
    last_str = ''
    result = []
    pattern = re.compile(mode, re.DOTALL)
    while 1:
    # 函数需要的内存跟CHUNK_SIZE有关,这边8k实验速度最快,当oss默认的chunk速度不合适的时候,可以尝试换下chunk大小
        data = r.read(CHUNK_SIZE) 
        if not data:
            break
        msg = (last_str + data) if last_str else data
        rows = msg.splitlines()
        if data[-1] != '\n':
            # 本数据块剩下的字符串,和下一个数据块首部一部分是完整的一行
            last_str = rows[-1]
            rows = rows[:-1]
        else:
            last_str = ''
            
        for row in rows:
            if pattern.match(row):
                result.append(row)
        # 如果使用csv库中可以转为可读性更好的dict的处理,可以支持更强的查询,但是性能很差,40s+
        # 如果真的需要强查询,这边可以结合'Lilly' in row,再csv.DictReader,能取个中间效果
        # 或者这边可以用正则来处理一些复杂查询
        # 或是基于querycsv进行一些改造,https://pythonhosted.org/querycsv/
        #f_csv = csv.DictReader(rows)
        #for row in f_csv:
        #    if row['name'] == 'Micheal Swanson':
        #        result.append(row)
        
     # 将查询结果写回oss
    csv_str = 'name,school,company,age\n' + '\n'.join(result)
    bucket.put_object('output.csv' , csv_str)
   
    # response这边显示100行
    if len(result) > 100:
        result = result[0:100]
        tips = "to many records, only show 100, you can see in oss output.csv"
        result = (tips, result)
    
    cost_time = time.time() - start
    print cost_time
    return result

上面这个例子调用100次数据,消耗时间如下:

avg: 6.85662093163

min: 6.69807887077

max: 7.1105670929

二,分治法

将大数据分割,使用尽量等分的规则将数据分片分别交给子函数query_part处理,然后将结果汇总

主函数query

配置event如下:

{
    "file_name": "sampleusers_10m_2.csv",
    "oss_bucket": "demo-oss",
    "mode": ".*Lilly.*",
    "part_num":16,
    "account_id":123456
}

注:上面的account_id替换成自己的阿里云account id,目前mode的含义很简单,就是一条正则表达式,该例子就是寻找包含Lilly的记录

代码如下:

# -*- coding: utf-8 -*-
import logging
import fc
import json
import oss2
import math
import threading
import time

result = []

# 文件一行最大为可能512bytes,这是一个尝试值
MAX_PER_ROW_BYTES = 512

class MyThread(threading.Thread):
    def run(self):
        # change your service, sub function
        r = self.client.invoke_function('ApiFc', 'query_part', self.payload) 
        global result
        result.extend(json.loads(r))

    def set_init_data(self, client, payload):
        self.client = client
        self.payload = payload
   
def handler(event, context):
    evt = json.loads(event)
    oss_endpoint = "oss-{}-internal.aliyuncs.com".format(context.region)
    oss_bucket, file_name, mode = evt['oss_bucket'], evt['file_name'],evt['mode']
    part_num = int(evt['part_num'])
    creds = context.credentials
    fc_endpoint = '{0}.{1}-internal.fc.aliyuncs.com'.format(evt['account_id'], context.region)
    client = fc.Client(
            endpoint= fc_endpoint,
            accessKeyID=creds.accessKeyId,
            accessKeySecret=creds.accessKeySecret,
            securityToken=creds.securityToken
        )
    
    # get bucket length
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    bucket = oss2.Bucket(auth, oss_endpoint, oss_bucket)
    r = bucket.get_object(file_name)
    bytes_length = int(r.stream.fileobj.response.headers['Content-Length'])
    
    # multiThread process
    global result 
    result = []
    threads = []
    PART_SIZE = int( math.ceil( float(bytes_length)/part_num ) )

    begin = 0
    end = PART_SIZE-1
    
    for i in xrange(part_num):
        # 这边分块一定要科学,即分出来的块是full lines
        off_set = -1
        if end < bytes_length - 1:
            for i in range(10000):
                if off_set == -1:
                    # 每次以MAX_PER_ROW_BYTES大小去找到本数据块的最后一个'\n'
                    r_s = bucket.get_object(file_name, byte_range=(end+MAX_PER_ROW_BYTES*i, end+MAX_PER_ROW_BYTES*(i+1)))
                    data = r_s.read()
                    off_set = data.find('\n')
                
                if off_set != -1:
                    break
                
            end = end + off_set
        
        payload={'begin':begin, 'end':end, 'oss_bucket':oss_bucket,'file_name':file_name, 'mode':mode}   
        t = MyThread()
        t.set_init_data(client, json.dumps(payload))
        t.start()
        threads.append(t)
        
        begin = end + 1
        end = end + PART_SIZE - 1
        if end >= bytes_length:
            end = bytes_length-1
        
        if begin >= bytes_length:
            break
        
    for t in threads:
        t.join()
        
    # 将查询结果写回oss
    csv_str = 'name,school,company,age\n' + '\n'.join(result)
    bucket.put_object('output.csv' , csv_str)
   
    # response这边显示100行
    if len(result) > 100:
        result = result[0:100]
        tips = "to many records, only show 100, you can see in oss output.csv"
        result = (tips, result)
     
    return result

具体的查询每个数据块的函数query_part代码如下:

# coding=utf-8
import re
import urllib
import json
import datetime
import oss2
import time
import csv

CHUNK_SIZE = 1024*8
  
 # 函数服务子函数
def handler(event, context):
    start = time.time()
    evt = json.loads(event)
    begin, end = evt['begin'], evt['end']
    file_name, oss_bucket = evt['file_name'], evt['oss_bucket']
    mode = evt['mode']
    oss_endpoint = "oss-{}-internal.aliyuncs.com".format(context.region)
    creds = context.credentials
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    bucket = oss2.Bucket(auth, oss_endpoint, oss_bucket)
    pattern = re.compile(mode, re.DOTALL)
    r = bucket.get_object(file_name, byte_range=(begin, end))
    last_str = ''
    result = []
    while 1:
        data = r.read(CHUNK_SIZE)
        if not data:
            break    
        msg = (last_str + data) if last_str else data
        rows = msg.splitlines()
        if data[-1] != '\n':
            last_str = rows[-1]
            rows = rows[:-1]
        else:
            last_str = ''
            
        for row in rows:
           if pattern.match(row):
                result.append(row)
    print "query_part #### ",  evt, time.time() - start, "last_str = ", last_str 
    return result

上面这个例子调用100次数据,分块的数目设置为16的时候,消耗时间如下:

avg: 0.748612663746

min: 0.690281152725

max: 0.946150064468

本地评估invoke的时间的代码如下:

注意:记得修改query主函数的返回值是函数执行时间

#coding=utf-8
import os

# account_id 改成自己的account_id
command = '''fcli function invoke -f query  -s ApiFc --event-str '{"file_name":  "sampleusers_10m_2.csv", "part_num": 16, "account_id": 12345, "oss_bucket": "demo-oss", "mode": ".*Lilly.*"}' '''

sum = 0
NUM = 100
time_lst = []
min_t = 10000000
max_t = 0

for i in xrange(NUM):
    r = os.popen(command) 
    info = r.readlines()  
    for line in info:  
        line = line.strip('\r\n')
        t = float(line)
        if t > max_t:
            max_t = t
        if t < min_t:
            min_t = t
        sum += t
        time_lst.append(t)

print "avg: ", sum/NUM
print "min: ", min_t
print "max: ", max_t
print "all: ", time_lst

总结:

从上面的方法和测试结果可以看出,利用分治的方法可以做到1秒左右从百万级别记录中查询到满足条件的记录。

相关实践学习
【文生图】一键部署Stable Diffusion基于函数计算
本实验教你如何在函数计算FC上从零开始部署Stable Diffusion来进行AI绘画创作,开启AIGC盲盒。函数计算提供一定的免费额度供用户使用。本实验答疑钉钉群:29290019867
建立 Serverless 思维
本课程包括: Serverless 应用引擎的概念, 为开发者带来的实际价值, 以及让您了解常见的 Serverless 架构模式
目录
相关文章
|
5月前
|
消息中间件 存储 Serverless
函数计算产品使用问题之怎么访问网络附加存储(NAS)存储模型文件
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
5月前
|
存储 人工智能 运维
函数计算产品使用问题之怎么识别并清理文件中转站中的无用文件
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
5月前
|
JavaScript Serverless 数据安全/隐私保护
函数计算产品使用问题之怎么动态设置.npmrc文件以配置私有仓库访问
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
4月前
|
消息中间件 弹性计算 关系型数据库
体验函数计算:高效处理多媒体文件的真实感受与实战总结
该方案在引导和文档方面做得较为详尽,仅在事件驱动机制部分略显简略。部署和代码示例实用,但需注意内存配置以避免超时。使用体验方面,函数计算表现出色,尤其在高并发场景下,显著提升了应用稳定性和成本效益。云产品如OSS、MNS等与函数计算配合流畅,ECS和RDS表现稳健。总体而言,这套方案弹性好、成本低,特别适合应对高并发或流量不确定的场景,值得推荐。
83 24
|
3月前
|
编解码 弹性计算 运维
AWS无服务器直播解决方案
AWS无服务器直播解决方案
|
5月前
|
消息中间件 关系型数据库 Serverless
【阿里云】一键部署创建函数计算服务以处理多媒体文件
通过阿里云的一键部署功能,轻松创建函数计算服务以处理多媒体文件。首先选择地域并配置资源栈名称及其他必要参数,如登录凭证、实例类型及数据库配置。过程中可能需开通相关服务如消息服务MNS,并确保账户有足够的余额。完成配置后,系统自动创建资源栈。当状态显示“创建成功”即部署完毕。最后,通过提供的URL及凭据访问应用,上传PPTX文件进行处理,并下载处理后的结果。
101 5
|
6月前
|
运维 Kubernetes Serverless
Serverless 应用引擎使用问题之s.yaml文件中如何使用外部环境变量
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
|
6月前
|
存储 缓存 运维
函数计算产品使用问题之如何将外部环境变量放到s.yaml文件中使用
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
|
5月前
|
存储 运维 Serverless
函数计算产品使用问题之OSS触发器是否可以只设置文件前缀
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
6月前
|
缓存 Serverless API
函数计算产品使用问题之没有s.yaml文件,修改代码如何重新部署
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

相关产品

  • 函数计算