用手机写代码:基于Serverless的在线编程能力探索(下)

本文涉及的产品
简介: 但是,无论是那种类型,其背后的核心模块“代码执行器”/“判题机”等,都是值得被关注的。一方面,这类网站通常情况下都需要比要严格的“安全考虑”,例如程序会不会有恶意代码,出现死循环、破坏计算机系统等,程序是否需要隔离运行,运行时是否会获取到其他人提交的代码等;另一方面,这类平台通常情况下都会对资源消耗比较大,尤其是比赛来临时,更是需要突然间对相关机器进行扩容,必要时需要大规模集群来进行应对。同时这类网站通常情况下也都有一个比较大的特点,那就是触发式,即每个代码执行前后实际上并没有非常紧密的前后文关系等。

更贴近“本地”的代码执行器

本地代码执行与OJ模式的区别

众所周知,我们在本地执行以下代码:

import time

print("hello world")

time.sleep(10)

tempInput = input("please: ")

print("Input data: ", tempInput)

实际表现是:

  1. 系统输出hello world
  2. 系统等待10秒
  3. 系统提醒我们please,我们此时可以输入一个字符串
  4. 系统输出Input data以及我们刚刚输入的字符串

但是,这段代码如果应用于传统OJ或者上部分我们所实现的简单的在线代码执行器中,表现则有一些不同:

  1. 代码与我们要输入内容一同传给系统
  2. 系统等待10秒
  3. 输出hello world、please,以及最后输Input data和我们输入的内容

可以看到,OJ上的体验效果和本地是有非常大的差距的。

新执行器的流程

为了减少这种体验不统一的问题,可以将代码和结构进行进一步的升级:

在整个项目中,包括了两个函数,两个存储桶:

  • 业务逻辑函数:该函数的主要操作是业务逻辑,包括创建代码执行的任务(通过对象存储触发器进行异步函数执行),以及获取函数输出结果以及对任务函数的标准输入进行相关操作等;
  • 执行器函数:该函数的主要作用是执行用户的函数代码,这部分是通过对象存储触发,通过下载代码、执行代码、获取输入、输出结果等;代码获取从代码存储桶,输出结果和获取输入从业务存储桶;
  • 代码存储桶:该存储桶的作用是存储代码,当用户发起运行代码的请求, 业务逻辑函数收到用户代码后,会将代码存储到该存储桶,再由该存储桶处罚异步任务;
  • 业务存储桶:该存储桶的作用是中间量的输出,主要包括输出内容的缓存、输入内容的缓存;该部分数据可以通过对象存储的本身特性进行生命周期的制定;

代码实现

为了让代码在线执行起来,更加贴近本地体验,该方案的代码分为两个函数:

函数:业务逻辑函数

# -*- coding: utf-8 -*-


import os

import oss2

import json

import uuid

import random


# 基本配置信息

AccessKey = {

   "id": os.environ.get('AccessKeyId'),

   "secret": os.environ.get('AccessKeySecret')

}


OSSCodeConf = {

   'endPoint': os.environ.get('OSSConfEndPoint'),

   'bucketName': os.environ.get('OSSConfBucketCodeName'),

   'objectSignUrlTimeOut': int(os.environ.get('OSSConfObjectSignUrlTimeOut'))

}


OSSTargetConf = {

   'endPoint': os.environ.get('OSSConfEndPoint'),

   'bucketName': os.environ.get('OSSConfBucketTargetName'),

   'objectSignUrlTimeOut': int(os.environ.get('OSSConfObjectSignUrlTimeOut'))

}


# 获取获取/上传文件到OSS的临时地址

auth = oss2.Auth(AccessKey['id'], AccessKey['secret'])

codeBucket = oss2.Bucket(auth, OSSCodeConf['endPoint'], OSSCodeConf['bucketName'])

targetBucket = oss2.Bucket(auth, OSSTargetConf['endPoint'], OSSTargetConf['bucketName'])


# 随机字符串

randomStr = lambda num=5: "".join(random.sample('abcdefghijklmnopqrstuvwxyz', num))



# Response

class Response:

   def __init__(self, start_response, response, errorCode=None):

       self.start = start_response

       responseBody = {

           'Error': {"Code": errorCode, "Message": response},

       } if errorCode else {

           'Response': response

       }

       # 默认增加uuid,便于后期定位

       responseBody['ResponseId'] = str(uuid.uuid1())

       print("Response: ", json.dumps(responseBody))

       self.response = json.dumps(responseBody)


   def __iter__(self):

       status = '200'

       response_headers = [('Content-type', 'application/json; charset=UTF-8')]

       self.start(status, response_headers)

       yield self.response.encode("utf-8")



def handler(environ, start_response):

   try:

       request_body_size = int(environ.get('CONTENT_LENGTH', 0))

   except (ValueError):

       request_body_size = 0

   requestBody = json.loads(environ['wsgi.input'].read(request_body_size).decode("utf-8"))


   reqType = requestBody.get("type", None)


   if reqType == "run":

       # 运行代码

       code = requestBody.get("code", None)

       runId = randomStr(10)

       codeBucket.put_object(runId, code.encode("utf-8"))

       responseData = runId

   elif reqType == "input":

       # 输入内容

       inputData = requestBody.get("input", None)

       runId = requestBody.get("id", None)

       targetBucket.put_object(runId + "-input", inputData.encode("utf-8"))

       responseData = 'ok'

   elif reqType == "output":

       # 获取结果

       runId = requestBody.get("id", None)

       targetBucket.get_object_to_file(runId + "-output", '/tmp/' + runId)

       with open('/tmp/' + runId) as f:

           responseData = f.read()

   else:

       responseData = "Error"


   print(responseData)


   return Response(start_response, {"result": responseData})

函数:执行器函数

# -*- coding: utf-8 -*-


import os

import re

import oss2

import json

import time

import pexpect


# 基本配置信息

AccessKey = {

   "id": os.environ.get('AccessKeyId'),

   "secret": os.environ.get('AccessKeySecret')

}


OSSCodeConf = {

   'endPoint': os.environ.get('OSSConfEndPoint'),

   'bucketName': os.environ.get('OSSConfBucketCodeName'),

   'objectSignUrlTimeOut': int(os.environ.get('OSSConfObjectSignUrlTimeOut'))

}


OSSTargetConf = {

   'endPoint': os.environ.get('OSSConfEndPoint'),

   'bucketName': os.environ.get('OSSConfBucketTargetName'),

   'objectSignUrlTimeOut': int(os.environ.get('OSSConfObjectSignUrlTimeOut'))

}


# 获取获取/上传文件到OSS的临时地址

auth = oss2.Auth(AccessKey['id'], AccessKey['secret'])

codeBucket = oss2.Bucket(auth, OSSCodeConf['endPoint'], OSSCodeConf['bucketName'])

targetBucket = oss2.Bucket(auth, OSSTargetConf['endPoint'], OSSTargetConf['bucketName'])



def handler(event, context):

   event = json.loads(event.decode("utf-8"))


   for eveEvent in event["events"]:


       # 获取object

       print("获取object")

       code = eveEvent["oss"]["object"]["key"]

       localFileName = "/tmp/" + event["events"][0]["oss"]["object"]["eTag"]


       # 下载图片

       print("下载代码")

       print("code: ", code)

       codeBucket.get_object_to_file(code, localFileName)


       # 执行代码

       foo = pexpect.spawn('python %s' % localFileName)


       outputData = ""


       startTime = time.time()


       # timeout可以通过文件名来进行识别

       try:

           timeout = int(re.findall("timeout(.*?)s", code)[0])

       except:

           timeout = 60


       while (time.time() - startTime) / 1000 <= timeout:

           try:

               tempOutput = foo.read_nonblocking(size=999999, timeout=0.01)

               tempOutput = tempOutput.decode("utf-8", "ignore")


               if len(str(tempOutput)) > 0:

                   outputData = outputData + tempOutput


               # 输出数据存入oss

               targetBucket.put_object(code + "-output", outputData.encode("utf-8"))


           except Exception as e:


               print("Error: ", e)


               # 有输入请求被阻塞

               if str(e) == "Timeout exceeded.":


                   try:

                       # 从oss读取数据

                       targetBucket.get_object_to_file(code + "-input", localFileName + "-input")

    targetBucket.delete_object(code + "-input")

                       with open(localFileName + "-input") as f:

                           inputData = f.read()

                       if inputData:

                           foo.sendline(inputData)

                   except:

                       pass


               # 程序执行完成输出

               elif "End Of File (EOF)" in str(e):

                   targetBucket.put_object(code + "-output", outputData.encode("utf-8"))

                   return True


               # 程序抛出异常

               else:


                   outputData = outputData + "\n\nException: %s" % str(e)

                   targetBucket.put_object(code + "-output", outputData.encode("utf-8"))


                   return False


部署上线

可以通过Serverless Devs将代码部署到线上,设定全局变量:

Global:

 Access: release

 Region: cn-beijing

 AccessKeyId: ${Env(AccessKeyId)}

 AccessKeySecret: ${Env(AccessKeySecret)}

 OSSConfBucketCodeName: serverlessbook-runcode-code

 OSSConfBucketTargetName: serverlessbook-runcode-others

 OSSConfObjectSignUrlTimeOut: 1200

 Service:

   Name: ServerlessBook

   Description: Serverless图书案例

   Log:

     Project: aliyun-fc-cn-beijing-9bd4a65b-c9d1-529d-9ce0-c91adfb823ab

     LogStore: function-log

代码存储桶:

CodeBucket:

 Component: oss

 Provider: alibaba

 Access: ${Global.Access}

 Properties:

   Region: ${Global.Region}

   Bucket: ${Global.OSSConfBucketCodeName}

业务存储桶:

TargetBucket:

 Component: oss

 Provider: alibaba

 Access: ${Global.Access}

 Properties:

   Region: ${Global.Region}

   Bucket: ${Global.OSSConfBucketTargetName}

业务逻辑函数:

ServerlessBookRunCodeMain:

 Component: fc

 Provider: alibaba

 Access: ${Global.Access}

 Extends:

   deploy:

     - Hook: s ServerlessBookRunCodeMain install docker

       Src: ./main

       Pre: true

 Properties:

   Region: ${Global.Region}

   Service: ${Global.Service}

   Function:

     Name: serverless_runcode_main

     Description: 业务逻辑

     CodeUri: ./main

     Handler: index.handler

     MemorySize: 128

     Runtime: python3

     Timeout: 5

     Environment:

       - Key: AccessKeyId

         Value: ${Global.AccessKeyId}

       - Key: AccessKeySecret

         Value: ${Global.AccessKeySecret}

       - Key: OSSConfBucketCodeName

         Value: ${CodeBucket.Output.Bucket}

       - Key: OSSConfBucketTargetName

         Value: ${TargetBucket.Output.Bucket}

       - Key: OSSConfEndPoint

         Value: ${CodeBucket.Output.Endpoint.Publish}

       - Key: OSSConfObjectSignUrlTimeOut

         Value: '1200'

     Triggers:

       - Name: RunCodeMain

         Type: HTTP

         Parameters:

           AuthType: ANONYMOUS

           Methods:

             - GET

             - POST

             - PUT

           Domains:

             - Domain: Auto

执行器函数:

ServerlessBookRunCodeCompiler:

 Component: fc

 Provider: alibaba

 Access: ${Global.Access}

 Extends:

   deploy:

     - Hook: s ServerlessBookRunCodeCompiler install docker

       Src: ./compiler

       Pre: true

 Properties:

   Region: ${Global.Region}

   Service:  ${Global.Service}

   Function:

     Name: serverless_runcode_compiler

     Description: 代码执行器

     CodeUri: ./compiler

     Handler: index.handler

     MemorySize: 128

     Runtime: python3

     Timeout: 60

     Environment: ${ServerlessBookRunCodeMain.Properties.Function.Environment}

     Triggers:

       - Name: OSSTrigger

         Type: OSS

         Parameters:

           Bucket: ${CodeBucket.Output.Bucket}

           Events:

             - 'oss:ObjectCreated:*'

           Filter:

             Prefix: ''

             Suffix: ''

在函数部分的配置文档中,可以看到有一个叫做Extends的字段,这个字段实际上是Serverless Devs的行为描述字段,通过该字段可以进行行为操作,例如,在上面的例子中,这部分为:

Extends:

 deploy:

   - Hook: s ServerlessBookRunCodeCompiler install docker

     Src: ./compiler

     Pre: true

实际含义是,在项目进行deploy操作之前,在./compiler目录下,执行指令:

$ s ServerlessBookRunCodeCompiler install docker

当项目描述文档编写完成之后,可以通过:

$ s deploy

进行项目部署,系统会先对项目进行分析:

检测到您有以下项目 < CodeBucket,TargetBucket,ServerlessBookRunCodeMain,ServerlessBookRunCodeCompiler > 要执行

分析完成,就会自动进入部署阶段,稍等片刻,即可完成部署。

接口测试

同“简单的在线代码执行器”部分一样,在这部分也将会通过PostMan对接口进行测试。

测试代码:

import time

print('hello world')

time.sleep(10)

tempInput = input('please: ')

print('Input data: ', tempInput)

当我们通过PostMan发起请求执行这段代码之后:

我们可以看到系统会返回给我们一个token,该token将会作为我们整个请求任务的Id,此时,我们可以通过获取输出结果的接口,来获取结果:

由于代码中有:

time.sleep(10)

所以,迅速获得结果的时候是看不到后半部分的输出结果,我们可以设置一个轮训任务,不断通过该token对接口进行刷新:

我们可以看到,10秒钟后,代码执行到了输入部分:

tempInput = input('please: ')

此时,我们再通过输入接口,进行输入操作:

完成之后,我们可以看到输入成功(result: ok)的结果,此时,我们继续刷新之前的获取结果部分的请求:

可以看到,我们已经获得到了所有结果的输出。

相对于前一部分的结构,这种“更贴近本地的代码执行器“变得复杂了很多,但是在实际使用的过程中,却可以更好的模拟出本地执行代码时的一些现象,例如代码的休眠、阻塞、内容的输出等。

总结

无论是简单的在线代码执行器部分,还是更贴近“本地”的代码执行器部分,这篇文章在所应用的内容是相对广泛的。通过这篇文章你可以看到:

  • HTTP触发器的基本使用方法;对象存储触发器的基本使用方;
  • 函数计算组件、对象存储组件的基本使用方法,组件间依赖的实现方法;

同时,通过这篇文章,也可以从一个侧面看到这样一个常见问题的简单解答:我有一个项目,我是每个接口一个函数,还是多个接口复用一个函数?

针对这个问题,其实最主要的是看业务本身的诉求,如果多个接口表达的含义是一致的,或者是同类的,类似的,并且多个接口的资源消耗是类似的,那么放在一个函数中来通过不同的路径进行区分是完全可以的;如果出现资源消耗差距较大,或者函数类型、规模、类别区别过大的时候,将多个接口放在多个函数下也是没有问题的。

本文实际上是抛砖引玉,无论是OJ系统的“判题机”部分,还是在线编程工具的“执行器部分”,都可以很好的和Serverless架构有着比较有趣的结合点。这种结合点不仅仅可以解决传统在线编程所头疼的事情(安全问题,资源消耗问题,并发问题,流量不稳定问题),更可以将Serverless的价值在一个新的领域发挥出来

相关实践学习
基于函数计算一键部署掌上游戏机
本场景介绍如何使用阿里云计算服务命令快速搭建一个掌上游戏机。
建立 Serverless 思维
本课程包括: Serverless 应用引擎的概念, 为开发者带来的实际价值, 以及让您了解常见的 Serverless 架构模式
目录
相关文章
|
18天前
|
运维 监控 Serverless
Serverless 应用引擎产品使用之在阿里函数计算中,在response.send()之后继续执行其它代码如何解决
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
193 0
|
18天前
|
弹性计算 JSON 运维
Serverless 应用引擎产品使用之阿里云serverless的s deploy,本地的代码编译后的镜像无法推送上去如何解决
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
|
18天前
|
运维 监控 Serverless
【专栏】无服务器架构,一种云计算模型,让开发者专注编写代码而不必管理服务器(Serverless)
【4月更文挑战第28天】无服务器架构,一种云计算模型,让开发者专注编写代码而不必管理服务器。它基于事件驱动,自动扩展资源并按需计费。优势包括缩短开发周期、优化资源利用、降低成本、提高可用性及简化维护。然而,冷启动延迟、调试困难、性能监控、安全性和学习曲线等挑战仍需解决。随着技术进步,无服务器架构将在科技发展中发挥更大作用。
|
18天前
|
弹性计算 Serverless 应用服务中间件
Serverless 应用引擎操作报错合集之阿里函数计算中{"ErrorCode":"AccessDenied","ErrorMessage":"Current user is in debt."}出现这个代码如何解决
Serverless 应用引擎(SAE)是阿里云提供的Serverless PaaS平台,支持Spring Cloud、Dubbo、HSF等主流微服务框架,简化应用的部署、运维和弹性伸缩。在使用SAE过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
18天前
|
搜索推荐 前端开发 UED
html页面实现自动适应手机浏览器(一行代码搞定)
html页面实现自动适应手机浏览器(一行代码搞定)
35 0
|
18天前
|
监控 Serverless 对象存储
Serverless 应用引擎常见问题之代码和普通版本不一样如何解决
Serverless 应用引擎(Serverless Application Engine, SAE)是一种完全托管的应用平台,它允许开发者无需管理服务器即可构建和部署应用。以下是Serverless 应用引擎使用过程中的一些常见问题及其答案的汇总:
|
18天前
|
存储 安全 Serverless
在函数计算中,当您通过应用关联到 Gitee 仓库并部署代码时
在函数计算中,当您通过应用关联到 Gitee 仓库并部署代码时
369 6
|
8月前
|
Serverless
函数计算中,测试代码“failed to match interface”
函数计算中,测试代码“failed to match interface”
67 0
|
12天前
|
机器学习/深度学习 人工智能 监控
基于函数计算体验AIGC文生图应用
小陈在学习Serverless和函数计算后,计划通过阿里云函数计算服务实践AIGC应用。他发现阿里云提供了基于Stable Diffusion的文生图模型模板,可以快速创建AIGC应用。部署步骤包括开通函数计算服务,通过模板创建应用并部署,然后通过应用域名进行文字生图体验。用户还能查看和管理函数,进行版本和别名管理。实验完成后,应用可以被安全删除。
188 2
|
5天前
|
SQL 分布式计算 监控
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
本文演示了使用 EMR Serverless Spark 产品搭建一个日志分析应用的全流程,包括数据开发和生产调度以及交互式查询等场景。
127 1
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用

热门文章

最新文章

相关产品

  • 函数计算