Introduction
Serverless architecture can deliver significant value in many fields, including monitoring and alerting, artificial intelligence, image processing, and audio/video processing. The same holds in big data. Taking WordCount, the classic introductory big data example, we can use a Serverless architecture to build a "Serverless MapReduce".
Baidu Baike describes MapReduce as follows:
MapReduce is a programming model for parallel computation over large data sets (larger than 1 TB). Its central ideas, "Map" and "Reduce", are borrowed from functional programming languages, together with features taken from vector programming languages. It makes it dramatically easier for programmers to run their code on distributed systems without any knowledge of distributed parallel programming. Current software implementations specify a Map function that transforms a set of key/value pairs into a new set of key/value pairs, and a concurrent Reduce function that ensures that all mapped key/value pairs sharing the same key are grouped together.
From this description it is clear that MapReduce is a computing model, framework, and platform for parallel processing of big data. Traditionally it is practiced on distributed frameworks such as Hadoop; as cloud computing has developed, the major cloud vendors have each launched hosted MapReduce services.
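The model itself is easy to demonstrate before any cloud services are involved. The following is a minimal, illustrative sketch in plain Python (no framework assumed): the map phase emits a (word, 1) pair per word, and the reduce phase groups the pairs by word and sums the counts:
from itertools import groupby
from operator import itemgetter

def map_phase(text):
    # emit one (word, 1) pair for every word in the input
    return [(word, 1) for word in text.split()]

def reduce_phase(pairs):
    # group the pairs by key (the word) and sum the counts per group
    pairs.sort(key=itemgetter(0))
    return {word: sum(count for _, count in group)
            for word, group in groupby(pairs, key=itemgetter(0))}

print(reduce_phase(map_phase("the quick brown fox jumps over the lazy dog the end")))
# e.g. {'brown': 1, 'dog': 1, 'end': 1, ..., 'the': 3}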
In this section, we implement a simple WordCount algorithm on the MapReduce model. Unlike the traditional approach built on big data frameworks such as Hadoop, here we combine object storage (OSS) with Function Compute.
Theoretical Basis
Before starting, let's draw a simple flow diagram that follows the MapReduce requirements:
In this structure, we need two functions, one acting as the Mapper and one as the Reducer, as well as three object storage buckets: one for input, one for intermediate temporary caching, and one for results. Taking Alibaba Cloud as an example, before the project begins we prepare three OSS buckets in the Hangzhou region:
Bucket 1 (input): serverless-book-mr-origin
Bucket 2 (intermediate): serverless-book-mr-middle
Bucket 3 (results): serverless-book-mr-target
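The three buckets can be created in the OSS console; if you prefer code, a rough sketch with the oss2 SDK looks like this (the credential placeholders match the functions below):
import oss2

auth = oss2.Auth('<AccessKeyID>', '<AccessKeySecret>')
endpoint = 'http://oss-cn-hangzhou.aliyuncs.com'  # Hangzhou region endpoint

for name in ('serverless-book-mr-origin',
             'serverless-book-mr-middle',
             'serverless-book-mr-target'):
    # create each bucket with a private ACL
    oss2.Bucket(auth, endpoint, name).create_bucket(oss2.BUCKET_ACL_PRIVATE)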
To keep the overall Mapper and Reducer logic clear, before we begin we restructure the traditional WordCount flow so that it better suits cloud functions, dividing the work sensibly between the Mapper and the Reducer, as outlined below:
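Concretely, the contract between the two stages is simple: the Mapper converts raw text into one "word<TAB>1" line per word and stores that file in the middle bucket; the Reducer sums those lines into "word<TAB>total" and stores the sorted result in the target bucket. For example, for the input text "hello world hello", the intermediate file would contain:
hello	1
world	1
hello	1
and the result file would contain:
hello	2
world	1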
Implementation
Start by creating a mapper.py file and writing the Mapper logic, as follows:
# -*- coding: utf8 -*-
import datetime
import oss2
import re
import os
import sys
import json
import logging
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
logger = logging.getLogger()
logger.setLevel(level=logging.INFO)
# OSS clients: the Mapper reads from the input bucket and writes to the middle bucket
auth = oss2.Auth('<AccessKeyID>', '<AccessKeySecret>')
source_bucket = oss2.Bucket(auth, 'http://oss-cn-hangzhou.aliyuncs.com', 'serverless-book-mr-origin')
middle_bucket = oss2.Bucket(auth, 'http://oss-cn-hangzhou.aliyuncs.com', 'serverless-book-mr-middle')
def delete_file_folder(src):
    # recursively remove a file or directory, used to clean up /tmp
    if os.path.isfile(src):
        try:
            os.remove(src)
        except OSError:
            pass
    elif os.path.isdir(src):
        for item in os.listdir(src):
            itemsrc = os.path.join(src, item)
            delete_file_folder(itemsrc)
        try:
            os.rmdir(src)
        except OSError:
            pass
def download_file(key, download_path):
    # download an object from the input bucket to local /tmp
    logger.info("Download file [%s]" % key)
    try:
        source_bucket.get_object_to_file(key, download_path)
    except Exception as e:
        print(e)
        return -1
    return 0
def upload_file(key, local_file_path):
    # upload the intermediate (mapped) file to the middle bucket
    logger.info("Start to upload file to oss")
    try:
        middle_bucket.put_object_from_file(key, local_file_path)
    except Exception as e:
        print(e)
        return -1
    logger.info("Upload data map file [%s] Success" % key)
    return 0
def do_mapping(key, middle_file_key):
    src_file_path = u'/tmp/' + key.split('/')[-1]
    middle_file_path = u'/tmp/' + u'mapped_' + key.split('/')[-1]
    download_ret = download_file(key, src_file_path)  # download the source file
    if download_ret == 0:
        inputfile = open(src_file_path, 'r')  # open the local /tmp file
        mapfile = open(middle_file_path, 'w')  # open a new file write stream
        for line in inputfile:
            line = re.sub('[^a-zA-Z0-9]', ' ', line)  # replace non-alphanumeric characters with spaces
            words = line.split()
            for word in words:
                mapfile.write('%s\t%s' % (word, 1))  # emit the pair (word, 1)
                mapfile.write('\n')
        inputfile.close()
        mapfile.close()
        upload_ret = upload_file(middle_file_key, middle_file_path)  # upload the per-word file
        delete_file_folder(src_file_path)
        delete_file_folder(middle_file_path)
        return upload_ret
    else:
        return -1
def map_caller(event):
    key = event["events"][0]["oss"]["object"]["key"]
    logger.info("Key is " + key)
    middle_file_key = 'middle_' + key.split('/')[-1]
    return do_mapping(key, middle_file_key)
def handler(event, context):
    logger.info("start main handler")
    start_time = datetime.datetime.now()
    res = map_caller(json.loads(event.decode("utf-8")))
    end_time = datetime.datetime.now()
    # use total_seconds() so durations over one second are reported correctly
    print("data mapping duration: " + str((end_time - start_time).total_seconds() * 1000) + "ms")
    if res == 0:
        return "Data mapping SUCCESS"
    else:
        return "Data mapping FAILED"
Using the same approach, create a reducer.py file and write the Reducer logic, as follows:
# -*- coding: utf8 -*-
import oss2
from operator import itemgetter
import os
import sys
import json
import datetime
import logging
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
logger = logging.getLogger()
logger.setLevel(level=logging.INFO)
# OSS clients: the Reducer reads from the middle bucket and writes to the target bucket
auth = oss2.Auth('<AccessKeyID>', '<AccessKeySecret>')
middle_bucket = oss2.Bucket(auth, 'http://oss-cn-hangzhou.aliyuncs.com', 'serverless-book-mr-middle')
target_bucket = oss2.Bucket(auth, 'http://oss-cn-hangzhou.aliyuncs.com', 'serverless-book-mr-target')
def delete_file_folder(src):
    # recursively remove a file or directory, used to clean up /tmp
    if os.path.isfile(src):
        try:
            os.remove(src)
        except OSError:
            pass
    elif os.path.isdir(src):
        for item in os.listdir(src):
            itemsrc = os.path.join(src, item)
            delete_file_folder(itemsrc)
        try:
            os.rmdir(src)
        except OSError:
            pass
def download_file(key, download_path):
    # download the mapped file from the middle bucket to local /tmp
    logger.info("Download file [%s]" % key)
    try:
        middle_bucket.get_object_to_file(key, download_path)
    except Exception as e:
        print(e)
        return -1
    return 0
def upload_file(key, local_file_path):
    # upload the aggregated result file to the target bucket
    logger.info("Start to upload file to oss")
    try:
        target_bucket.put_object_from_file(key, local_file_path)
    except Exception as e:
        print(e)
        return -1
    logger.info("Upload data reduce file [%s] Success" % key)
    return 0
def alifc_reducer(key, result_key):
    word2count = {}
    src_file_path = u'/tmp/' + key.split('/')[-1]
    result_file_path = u'/tmp/' + u'result_' + key.split('/')[-1]
    download_ret = download_file(key, src_file_path)
    if download_ret == 0:
        map_file = open(src_file_path, 'r')
        result_file = open(result_file_path, 'w')
        for line in map_file:
            line = line.strip()
            try:
                # unpacking raises ValueError on blank or malformed lines,
                # so the split is kept inside the try block
                word, count = line.split('\t', 1)
                word2count[word] = word2count.get(word, 0) + int(count)
            except ValueError as e:
                logger.error("error value: %s, current line: %s" % (e, line))
                continue
        map_file.close()
        delete_file_folder(src_file_path)
        # sort words by count in descending order before writing the result
        sorted_word2count = sorted(word2count.items(), key=itemgetter(1), reverse=True)
        for word, count in sorted_word2count:
            result_file.write('%s\t%s' % (word, count))
            result_file.write('\n')
        result_file.close()
        upload_ret = upload_file(result_key, result_file_path)
        delete_file_folder(result_file_path)
        return upload_ret
    else:
        return -1
def reduce_caller(event):
    key = event["events"][0]["oss"]["object"]["key"]
    logger.info("Key is " + key)
    result_key = 'result_' + key.split('/')[-1]
    return alifc_reducer(key, result_key)
def handler(event, context):
    logger.info("start main handler")
    start_time = datetime.datetime.now()
    res = reduce_caller(json.loads(event.decode("utf-8")))
    end_time = datetime.datetime.now()
    # use total_seconds() so durations over one second are reported correctly
    print("data reducing duration: " + str((end_time - start_time).total_seconds() * 1000) + "ms")
    if res == 0:
        return "Data reducing SUCCESS"
    else:
        return "Data reducing FAILED"
After both functions are created, with an OSS trigger bound to each (the origin bucket triggering the Mapper and the middle bucket triggering the Reducer), the two functions are visible in the console:
as are the three buckets:
Testing
Now we prepare a short English passage:
We upload the passage to the serverless-book-mr-origin bucket:
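The upload can be done in the OSS console or with a couple of lines of oss2 (article.txt is a placeholder file name):
import oss2

auth = oss2.Auth('<AccessKeyID>', '<AccessKeySecret>')
origin_bucket = oss2.Bucket(auth, 'http://oss-cn-hangzhou.aliyuncs.com', 'serverless-book-mr-origin')
# writing the object fires the OSS trigger bound to the Mapper function
origin_bucket.put_object_from_file('article.txt', 'article.txt')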
After a short wait, once the Mapper function has executed, we can see the intermediate document in the serverless-book-mr-middle bucket:
And once the Reducer function has finished, the result document appears in the serverless-book-mr-target bucket:
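To inspect the final counts without the console, you can read the result object back directly; the key follows the result_middle_<file> naming chain described earlier:
import oss2

auth = oss2.Auth('<AccessKeyID>', '<AccessKeySecret>')
target_bucket = oss2.Bucket(auth, 'http://oss-cn-hangzhou.aliyuncs.com', 'serverless-book-mr-target')
# one 'word<TAB>count' line per word, sorted by count in descending order
print(target_bucket.get_object('result_middle_article.txt').read().decode('utf-8'))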
With that, we have completed a simple word-frequency counting feature.
Summary
In fact, Serverless architecture lends itself quite naturally to big data processing. Through this example, we hope readers take away a broader view of where Serverless applies: it performs well not only in monitoring and alerting, but holds its own in the big data field too. In this example, several functions are deployed under a single service, and three buckets plus two functions work together to deliver a MapReduce workflow. In real production systems, a project is rarely a single function acting alone; rather, multiple functions are combined into a service. We hope this scenario gives readers a deeper understanding of Serverless architecture and inspires them to combine cloud functions with different triggers in many more domains and businesses.