函数计算与对象存储实现WordCount

本文涉及的产品
对象存储 OSS,20GB 3个月
函数计算FC,每月15万CU 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: Serverless架构可以在很多领域发挥极具价值的作用。包括监控告警、人工智能、图像处理、音视频处理等,同样,在大数据领域,Serverless架构仍然可以具有良好的表现,以大数据常见的入门案例:WordCount为例,可以依靠Serverless架构实现一个“Serverless版本的MapReduce”。

前言

Serverless架构可以在很多领域发挥极具价值的作用。包括监控告警、人工智能、图像处理、音视频处理等,同样,在大数据领域,Serverless架构仍然可以具有良好的表现,以大数据常见的入门案例:WordCount为例,可以依靠Serverless架构实现一个“Serverless版本的MapReduce”。

MapReduce在百度百科中的解释如下:

MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)"和"Reduce(归约)",是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。 当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。

通过这段描述,可以明确MapReduce是面向大数据并行处理的计算模型、框架和平台,在传统学习中,通常会在Hadoop等分布式框架下进行MapReduce相关工作,随着云计算的逐渐发展,各个云厂商也都先后推出了在线的MapReduce业务。

本节,通过MapReduce模型实现一个简单的WordCount算法,区别于传统使用Hadoop等大数据框架,我们本节内容使用的是对象存储与函数计算的结合。

理论基础

在开始之前,我们根据MapReduce的要求,我们绘制一个简单的流程图:

在这个结构中,我们可以看到,我们需要2个函数分别作Mapper和Reducer,以及3个对象存储的存储桶,分别作为输入的存储桶、中间临时缓存的存储桶以及结果存储桶。以阿里云为例,在项目开始之前,我们现在杭州区准备3个对象存储:


对象存储1 serverless-book-mr-origin

对象存储2 serverless-book-mr-middle

对象存储3 serverless-book-mr-target


为了让整个Mapper和Reducer逻辑更加清晰,在开始之前先对传统的WordCount结构进行改造,使其更加适合云函数,同时合理分配

Mapper和Reducer的工作:

功能实现


编写Mapper相关逻辑,Mapper相关逻辑代码如下:


# -*- coding: utf8 -*-

import datetime

import oss2

import re

import os

import sys

import json

import logging


logging.basicConfig(level=logging.INFO, stream=sys.stdout)

logger = logging.getLogger()

logger.setLevel(level=logging.INFO)

auth = oss2.Auth('<AccessKeyID>', '<AccessKeySecret>')

source_bucket = oss2.Bucket(auth, 'http://oss-cn-hangzhou.aliyuncs.com', 'serverless-book-mr-origin')

middle_bucket = oss2.Bucket(auth, 'http://oss-cn-hangzhou.aliyuncs.com', 'serverless-book-mr-middle')



def delete_file_folder(src):

   if os.path.isfile(src):

       try:

           os.remove(src)

       except:

           pass

   elif os.path.isdir(src):

       for item in os.listdir(src):

           itemsrc = os.path.join(src, item)

           delete_file_folder(itemsrc)

       try:

           os.rmdir(src)

       except:

           pass



def download_file(key, download_path):

   logger.info("Download file [%s]" % (key))

   try:

       source_bucket.get_object_to_file(key, download_path)

   except Exception as e:

       print(e)

       return -1

   return 0



def upload_file(key, local_file_path):

   logger.info("Start to upload file to oss")

   try:

       middle_bucket.put_object_from_file(key, local_file_path)

   except Exception as e:

       print(e)

       return -1

   logger.info("Upload data map file [%s] Success" % key)

   return 0



def do_mapping(key, middle_file_key):

   src_file_path = u'/tmp/' + key.split('/')[-1]

   middle_file_path = u'/tmp/' + u'mapped_' + key.split('/')[-1]

   download_ret = download_file(key, src_file_path)  # download src file

   if download_ret == 0:

       inputfile = open(src_file_path, 'r')  # open local /tmp file

       mapfile = open(middle_file_path, 'w')  # open a new file write stream

       for line in inputfile:

           line = re.sub('[^a-zA-Z0-9]', ' ', line)  # replace non-alphabetic/number characters

           words = line.split()

           for word in words:

               mapfile.write('%s\t%s' % (word, 1))  # count for 1

               mapfile.write('\n')

       inputfile.close()

       mapfile.close()

       upload_ret = upload_file(middle_file_key, middle_file_path)  # upload the file's each word

       delete_file_folder(src_file_path)

       delete_file_folder(middle_file_path)

       return upload_ret

   else:

       return -1



def map_caller(event):

   key = event["events"][0]["oss"]["object"]["key"]

   logger.info("Key is " + key)

   middle_file_key = 'middle_' + key.split('/')[-1]

   return do_mapping(key, middle_file_key)



def handler(event, context):

   logger.info("start main handler")

   start_time = datetime.datetime.now()

   res = map_caller(json.loads(event.decode("utf-8")))

   end_time = datetime.datetime.now()

   print("data mapping duration: " + str((end_time - start_time).microseconds / 1000) + "ms")

   if res == 0:

       return "Data mapping SUCCESS"

   else:

       return "Data mapping FAILED"


同样的方法,建立reducer.py文件,编写Reducer逻辑,代码如下:


# -*- coding: utf8 -*-

import oss2

from operator import itemgetter

import os

import sys

import json

import datetime

import logging


logging.basicConfig(level=logging.INFO, stream=sys.stdout)

logger = logging.getLogger()

logger.setLevel(level=logging.INFO)

auth = oss2.Auth('<AccessKeyID>', '<AccessKeySecret>')

middle_bucket = oss2.Bucket(auth, 'http://oss-cn-hangzhou.aliyuncs.com', 'serverless-book-mr-middle')

target_bucket = oss2.Bucket(auth, 'http://oss-cn-hangzhou.aliyuncs.com', 'serverless-book-mr-target')



def delete_file_folder(src):

   if os.path.isfile(src):

       try:

           os.remove(src)

       except:

           pass

   elif os.path.isdir(src):

       for item in os.listdir(src):

           itemsrc = os.path.join(src, item)

           delete_file_folder(itemsrc)

       try:

           os.rmdir(src)

       except:

           pass



def download_file(key, download_path):

   logger.info("Download file [%s]" % (key))

   try:

       middle_bucket.get_object_to_file(key, download_path)

   except Exception as e:

       print(e)

       return -1

   return 0



def upload_file(key, local_file_path):

   logger.info("Start to upload file to oss")

   try:

       target_bucket.put_object_from_file(key, local_file_path)

   except Exception as e:

       print(e)

       return -1

   logger.info("Upload data map file [%s] Success" % key)

   return 0



def alifc_reducer(key, result_key):

   word2count = {}

   src_file_path = u'/tmp/' + key.split('/')[-1]

   result_file_path = u'/tmp/' + u'result_' + key.split('/')[-1]

   download_ret = download_file(key, src_file_path)

   if download_ret == 0:

       map_file = open(src_file_path, 'r')

       result_file = open(result_file_path, 'w')

       for line in map_file:

           line = line.strip()

           word, count = line.split('\t', 1)

           try:

               count = int(count)

               word2count[word] = word2count.get(word, 0) + count

           except ValueError:

               logger.error("error value: %s, current line: %s" % (ValueError, line))

               continue

       map_file.close()

       delete_file_folder(src_file_path)

       sorted_word2count = sorted(word2count.items(), key=itemgetter(1))[::-1]

       for wordcount in sorted_word2count:

           res = '%s\t%s' % (wordcount[0], wordcount[1])

           result_file.write(res)

           result_file.write('\n')

       result_file.close()

       upload_ret = upload_file(result_key, result_file_path)

       delete_file_folder(result_file_path)

       return upload_ret

   else:

       return -1



def reduce_caller(event):

   key = event["events"][0]["oss"]["object"]["key"]

   logger.info("Key is " + key)

   result_key = 'result_' + key.split('/')[-1]

   return alifc_reducer(key, result_key)



def handler(event, context):

   logger.info("start main handler")

   start_time = datetime.datetime.now()

   res = reduce_caller(json.loads(event.decode("utf-8")))

   end_time = datetime.datetime.now()

   print("data reducing duration: " + str((end_time - start_time).microseconds / 1000) + "ms")

   if res == 0:

       return "Data reducing SUCCESS"

   else:

       return "Data reducing FAILED"


创建完成之后,可以在控制台看到两个函数:

以及三个存储桶:

测试体验

此时,我们准备一个英文短文:

我们将短文上传到存储桶serverless-book-mr-origin:

稍等片刻,我们可以在mapper函数执行之后,在存储桶serverless-book-mr-middle中看到文档:

在reducer函数执行完成之后,在存储桶serverless-book-mr-target中看到文档:

至此,可以看到,我们完成了简单的词频统计功能。

总结

其实Serverless架构相对来说比较容易做大数据处理的,通过本实例,希望读者可以对Serverless架构的应用场景有更多的启发,不仅仅在监控告警方面有着很好的表现,在大数据领域,也是不甘落后的。本实例,将多个函数部署在一个服务下,通过三个存储桶和两个函数联动,完成一个mapreduce的功能,我们在实际生产中,每个项目都不会是单个函数单打独斗的,而是多个函数组合应用,形成一个Service体系。希望通过该应用场景,读者们可以对Serverless架构有更深入的了解,并且可以有所启发,将云函数和不同触发器进行组合,应用在更多的领域以及业务中

相关实践学习
【文生图】一键部署Stable Diffusion基于函数计算
本实验教你如何在函数计算FC上从零开始部署Stable Diffusion来进行AI绘画创作,开启AIGC盲盒。函数计算提供一定的免费额度供用户使用。本实验答疑钉钉群:29290019867
建立 Serverless 思维
本课程包括: Serverless 应用引擎的概念, 为开发者带来的实际价值, 以及让您了解常见的 Serverless 架构模式
目录
相关文章
|
5月前
|
监控 Java Serverless
函数计算产品使用问题之对于OSS打包的zip的保存目录,该如何操作
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
|
4月前
|
存储 运维 Serverless
函数计算产品使用问题之OSS触发器是否可以只设置文件前缀
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
5月前
|
Java Serverless 数据库连接
函数计算操作报错合集之调用打包的OSS函数时发生报错,该怎么办
Serverless 应用引擎(SAE)是阿里云提供的Serverless PaaS平台,支持Spring Cloud、Dubbo、HSF等主流微服务框架,简化应用的部署、运维和弹性伸缩。在使用SAE过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
5月前
|
域名解析 Serverless API
函数计算产品使用问题之如何配置自定义域名访问OSS中的内容
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
运维 Serverless 数据处理
函数计算产品使用问题之在对象存储服务(OSS)上创建ZIP包解压触发器后,触发器未按预期执行,一般是什么导致的
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
5月前
|
运维 Serverless 对象存储
函数计算产品使用问题之如何配合OSS实现接口收到的图片或文件直接存入OSS
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
6月前
|
JSON Serverless 对象存储
函数计算产品使用问题之如何创建一个同时具有HTTP触发器和OSS触发器的函数
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
7月前
|
弹性计算 监控 Serverless
Serverless 应用引擎操作报错合集之阿里函数计算中调用zip-oss-fc函数返回时候出现错误代码如何解决
Serverless 应用引擎(SAE)是阿里云提供的Serverless PaaS平台,支持Spring Cloud、Dubbo、HSF等主流微服务框架,简化应用的部署、运维和弹性伸缩。在使用SAE过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
Serverless 应用引擎操作报错合集之阿里函数计算中调用zip-oss-fc函数返回时候出现错误代码如何解决
|
6月前
|
JSON 运维 Serverless
函数计算产品使用问题之如何实现数据的读取和修改,而不需要每次都从OSS下载完整的数据
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
6月前
|
运维 网络协议 Serverless
函数计算产品使用问题之怎么将生成的图片保存到oss上
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
下一篇
DataWorks