目前AWS 推出了S3 Select 预览版,对于带分隔符的文本文件(如csv)和 JSON数据,可使用简单的 SQL 表达式轻松检索对象内容中较小且具有针对性的数据集。
在某些场景中,用户需要对oss上的某个大文件抽取出某些信息。本教程展示如何结合oss和函数计算实现类似AWS S3 Select功能,以oss上的一个大小为400M,内含大约105万条记录的csv文件为例,讲解如果利用函数计算流式处理, 查找出包含'Lilly'的记录。
本demo做了什么?
1, 从一个包含100多万行记录的大csv文件中迅速(1s左右)查找到符合条件的记录,以查找出包含'Lilly'的记录为例,返回结果如下:
[
"Lilly Patton,School,Weyerhauser Company,26",
"Lilly Davidson,School,Bristol-Myers Squibb Company,50",
"Lilly Drake,School,Federal Express Corp.,38",
"Lilly Carlson,School,Pentair Inc,62",
"Lilly Fisher,School,Progressive Corporation,42",
"Lilly Morales,School,Mercury General Corporation,27",
"Lilly Lewis,School,The Black & Decker Corporation,35",
"Walter Rivera,School,Eli Lilly and Company,24",
"Lilly Mills,School,Owens & Minor Inc.,60",
"Lilly Jimenez,School,AOL Time Warner Inc.,42",
... 记录条目比较多,就不都展示了
]
2, 单线程分块读取处理与多线程分治处理方案效率比较,分别调用函数100次,耗时比较如下:
耗时 | 单线程分块读取处理 | 多线程分治处理 |
---|---|---|
最小时间 | 6.69807887077 | 0.690281152725 |
最大时间 | 7.1105670929 | 0.946150064468 |
平均时间 | 6.85662093163 | 0.748612663746 |
具体步骤:
注:下面步骤中函数和oss的region是一致的,本例中都是华东2
1, 将例子的csv文件上传到oss中, 在本例子中,该csv文件上传到一个在华东2region中名叫demo-oss
的oss bucket中
2, 创建好service,并且配置service的角色,该角色具有oss的读写权限和fc invoke权限
- 2.1 用控制台创建service ApiFc, 控制台操作可以参考函数计算入门示例 - hello world
- 2.2 用控制台创建一个具有oss读写权限和fc invoke权限的角色 fc-demo,并把角色赋予service
ApiFc
2.2.1 用控制台创建角色并把角色赋予service
ApiFc
的过程可以参考函数模板使用 中步骤6,7
- 角色
fc-demo
,拥有两个policy,一个是对oss的读写,一个是函数调用
- 角色
fc-demo
赋予 serviceApiFc
3, 在service ApiFc
下创建函数, 执行
3.1 单线程demo setup
从本文最后下载附件,解压以后可以看到三个zip文件, 用sing.zip创建函数single.
- 创建函数
- 函数设置
- 创建成功后,进入代码编辑页面,配置一下event的参数,保存,直接按执行就可以执行函数了
3.2 多线程demo setup
- 3.2.1 参考single函数的创建方法,用
query.zip
和query_part.zip
在ApiFc
的service下面创建两个函数query和query_part - 3.2.2 主函数query会调用子函数query_part,如果你的service名字和子函数名不是'ApiFc'和'query_part',就需要修改query中相应的代码:
- 3.2.3 创建完成后,点击执行就行,如下图所示:
具体代码实现:
以下是具体的两种解法:
- 单线程分块读取处理 -> 速度比较慢, 适合小文件查询处理
- 多线程分治处理 -> 速度快,适合大文件查询处理
一,单线程分块读取处理
配置event如下:
{
"file_name": "sampleusers_10m_2.csv",
"oss_bucket": "demo-oss",
"mode": ".*Lilly.*"
}
注:目前mode的含义很简单,就是一条正则表达式,该例子就是寻找包含Lilly的记录
代码如下:
# coding=utf-8
import re
import json
import oss2
import logging
import time
import csv
CHUNK_SIZE = 1024*8
# 函数服务主函数
def handler(event, context):
start = time.time()
evt = json.loads(event)
endpoint = "oss-{}-internal.aliyuncs.com".format(context.region)
oss_bucket, file_name, mode = evt['oss_bucket'], evt['file_name'],evt['mode']
creds = context.credentials
auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
bucket = oss2.Bucket(auth, endpoint, oss_bucket)
start = time.time()
r = bucket.get_object(file_name)
last_str = ''
result = []
pattern = re.compile(mode, re.DOTALL)
while 1:
# 函数需要的内存跟CHUNK_SIZE有关,这边8k实验速度最快,当oss默认的chunk速度不合适的时候,可以尝试换下chunk大小
data = r.read(CHUNK_SIZE)
if not data:
break
msg = (last_str + data) if last_str else data
rows = msg.splitlines()
if data[-1] != '\n':
# 本数据块剩下的字符串,和下一个数据块首部一部分是完整的一行
last_str = rows[-1]
rows = rows[:-1]
else:
last_str = ''
for row in rows:
if pattern.match(row):
result.append(row)
# 如果使用csv库中可以转为可读性更好的dict的处理,可以支持更强的查询,但是性能很差,40s+
# 如果真的需要强查询,这边可以结合'Lilly' in row,再csv.DictReader,能取个中间效果
# 或者这边可以用正则来处理一些复杂查询
# 或是基于querycsv进行一些改造,https://pythonhosted.org/querycsv/
#f_csv = csv.DictReader(rows)
#for row in f_csv:
# if row['name'] == 'Micheal Swanson':
# result.append(row)
# 将查询结果写回oss
csv_str = 'name,school,company,age\n' + '\n'.join(result)
bucket.put_object('output.csv' , csv_str)
# response这边显示100行
if len(result) > 100:
result = result[0:100]
tips = "to many records, only show 100, you can see in oss output.csv"
result = (tips, result)
cost_time = time.time() - start
print cost_time
return result
上面这个例子调用100次数据,消耗时间如下:
avg: 6.85662093163
min: 6.69807887077
max: 7.1105670929
二,分治法
将大数据分割,使用尽量等分的规则将数据分片分别交给子函数query_part处理,然后将结果汇总
主函数query
配置event如下:
{
"file_name": "sampleusers_10m_2.csv",
"oss_bucket": "demo-oss",
"mode": ".*Lilly.*",
"part_num":16,
"account_id":123456
}
注:上面的account_id替换成自己的阿里云account id,目前mode的含义很简单,就是一条正则表达式,该例子就是寻找包含Lilly的记录
代码如下:
# -*- coding: utf-8 -*-
import logging
import fc
import json
import oss2
import math
import threading
import time
result = []
# 文件一行最大为可能512bytes,这是一个尝试值
MAX_PER_ROW_BYTES = 512
class MyThread(threading.Thread):
def run(self):
# change your service, sub function
r = self.client.invoke_function('ApiFc', 'query_part', self.payload)
global result
result.extend(json.loads(r))
def set_init_data(self, client, payload):
self.client = client
self.payload = payload
def handler(event, context):
evt = json.loads(event)
oss_endpoint = "oss-{}-internal.aliyuncs.com".format(context.region)
oss_bucket, file_name, mode = evt['oss_bucket'], evt['file_name'],evt['mode']
part_num = int(evt['part_num'])
creds = context.credentials
fc_endpoint = '{0}.{1}-internal.fc.aliyuncs.com'.format(evt['account_id'], context.region)
client = fc.Client(
endpoint= fc_endpoint,
accessKeyID=creds.accessKeyId,
accessKeySecret=creds.accessKeySecret,
securityToken=creds.securityToken
)
# get bucket length
auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
bucket = oss2.Bucket(auth, oss_endpoint, oss_bucket)
r = bucket.get_object(file_name)
bytes_length = int(r.stream.fileobj.response.headers['Content-Length'])
# multiThread process
global result
result = []
threads = []
PART_SIZE = int( math.ceil( float(bytes_length)/part_num ) )
begin = 0
end = PART_SIZE-1
for i in xrange(part_num):
# 这边分块一定要科学,即分出来的块是full lines
off_set = -1
if end < bytes_length - 1:
for i in range(10000):
if off_set == -1:
# 每次以MAX_PER_ROW_BYTES大小去找到本数据块的最后一个'\n'
r_s = bucket.get_object(file_name, byte_range=(end+MAX_PER_ROW_BYTES*i, end+MAX_PER_ROW_BYTES*(i+1)))
data = r_s.read()
off_set = data.find('\n')
if off_set != -1:
break
end = end + off_set
payload={'begin':begin, 'end':end, 'oss_bucket':oss_bucket,'file_name':file_name, 'mode':mode}
t = MyThread()
t.set_init_data(client, json.dumps(payload))
t.start()
threads.append(t)
begin = end + 1
end = end + PART_SIZE - 1
if end >= bytes_length:
end = bytes_length-1
if begin >= bytes_length:
break
for t in threads:
t.join()
# 将查询结果写回oss
csv_str = 'name,school,company,age\n' + '\n'.join(result)
bucket.put_object('output.csv' , csv_str)
# response这边显示100行
if len(result) > 100:
result = result[0:100]
tips = "to many records, only show 100, you can see in oss output.csv"
result = (tips, result)
return result
具体的查询每个数据块的函数query_part代码如下:
# coding=utf-8
import re
import urllib
import json
import datetime
import oss2
import time
import csv
CHUNK_SIZE = 1024*8
# 函数服务子函数
def handler(event, context):
start = time.time()
evt = json.loads(event)
begin, end = evt['begin'], evt['end']
file_name, oss_bucket = evt['file_name'], evt['oss_bucket']
mode = evt['mode']
oss_endpoint = "oss-{}-internal.aliyuncs.com".format(context.region)
creds = context.credentials
auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
bucket = oss2.Bucket(auth, oss_endpoint, oss_bucket)
pattern = re.compile(mode, re.DOTALL)
r = bucket.get_object(file_name, byte_range=(begin, end))
last_str = ''
result = []
while 1:
data = r.read(CHUNK_SIZE)
if not data:
break
msg = (last_str + data) if last_str else data
rows = msg.splitlines()
if data[-1] != '\n':
last_str = rows[-1]
rows = rows[:-1]
else:
last_str = ''
for row in rows:
if pattern.match(row):
result.append(row)
print "query_part #### ", evt, time.time() - start, "last_str = ", last_str
return result
上面这个例子调用100次数据,分块的数目设置为16的时候,消耗时间如下:
avg: 0.748612663746
min: 0.690281152725
max: 0.946150064468
本地评估invoke的时间的代码如下:
注意:记得修改query主函数的返回值是函数执行时间
#coding=utf-8
import os
# account_id 改成自己的account_id
command = '''fcli function invoke -f query -s ApiFc --event-str '{"file_name": "sampleusers_10m_2.csv", "part_num": 16, "account_id": 12345, "oss_bucket": "demo-oss", "mode": ".*Lilly.*"}' '''
sum = 0
NUM = 100
time_lst = []
min_t = 10000000
max_t = 0
for i in xrange(NUM):
r = os.popen(command)
info = r.readlines()
for line in info:
line = line.strip('\r\n')
t = float(line)
if t > max_t:
max_t = t
if t < min_t:
min_t = t
sum += t
time_lst.append(t)
print "avg: ", sum/NUM
print "min: ", min_t
print "max: ", max_t
print "all: ", time_lst
总结:
从上面的方法和测试结果可以看出,利用分治的方法可以做到1秒左右从百万级别记录中查询到满足条件的记录。