用手机写代码:基于Serverless的在线编程能力探索

本文涉及的产品
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
函数计算FC,每月15万CU 3个月
简介: 随着计算机科学与技术的发展,越来越多的人开始接触编程,也有越来越多的在线编程平台诞生。以Python语言的在线编程平台为例,大致可以分为两类,一类是OJ类型的,即在线评测的编程平台,这一类的平台特点是阻塞类型的执行,即用户需要一次性将代码和标准输入内容提交,当程序执行完成会一次性将结果返回;另一类则是学习、工具类的在线编程平台,例如Anycodes在线编程等网站,这一类平台的特点是非阻塞类型的执行,即用户可以实时看到代码执行的结果,以及可以实时内容进行内容的输入。

前言

随着计算机科学与技术的发展,越来越多的人开始接触编程,也有越来越多的在线编程平台诞生。以Python语言的在线编程平台为例,大致可以分为两类,一类是OJ类型的,即在线评测的编程平台,这一类的平台特点是阻塞类型的执行,即用户需要一次性将代码和标准输入内容提交,当程序执行完成会一次性将结果返回;另一类则是学习、工具类的在线编程平台,例如Anycodes在线编程等网站,这一类平台的特点是非阻塞类型的执行,即用户可以实时看到代码执行的结果,以及可以实时内容进行内容的输入。

但是,无论是那种类型的在线编程平台,其背后的核心模块( “代码执行器”或“判题机”)都是极具有研究价值,一方面,这类网站通常情况下都需要比要严格的“安全机制”,例如程序会不会有恶意代码,出现死循环、破坏计算机系统等,程序是否需要隔离运行,运行时是否会获取到其他人提交的代码等;另一方面,这类平台通常情况下都会对资源消耗比较大,尤其是比赛来临时,更是需要突然间对相关机器进行扩容,必要时需要大规模集群来进行应对。同时这类网站通常情况下也都有一个比较大的特点,那就是触发式,即每个代码执行前后实际上并没有非常紧密的前后文关系等。

随着Serverless架构的不断发展,很多人发现Serverless架构的请求级隔离和极致弹性等特性可以解决传统在线编程平台所遇到的安全问题和资源消耗问题,Serverless架构的按量付费模式,可以在保证在线编程功能性能的前提下,进一步降低成本。所以,通过Serverless架构实现在线编程功能的开发就逐渐的被更多人所关注和研究。本文将会以阿里云函数计算为例,通过Serverless架构实现一个Python语言的在线编程功能,并对该功能进一步的优化,使其更加贴近本地本地代码执行体验。

在线编程功能开发

一个比较简单的、典型的在线编程功能,在线执行模块通常情况下是需要以下几个能力:

  • 在线执行代码
  • 用户可以输入内容
  • 可以返回结果(标准输出、标准错误等)

除了在线编程所需要实现的功能之外,在线编程在Serverless架构下,所需要实现的业务逻辑,也仅仅被收敛到关注代码执行模块即可:获取客户端发送的程序信息(包括代码、标准输入等),将代码缓存到本地,执行代码,获取结果,但会给客户端,整个架构的流程简图为:

关于执行代码部分,可以通过Python语言的subprocess依赖中的Popen()方法实现,在使用Popen()方法时,有几个比较重要的概念,需要明确:

subprocess.PIPE: 一个可以被用于Popen的stdin 、stdout 和stderr 3个参数的特殊值,表示需要创建一个新的管道;

subprocess.STDOUT:一个可以被用于Popen的stderr参数的输出值,表示子程序的标准错误汇合到标准输出;

所以,当我们想要实现可以进行标准输入(stdin),获取标准输出(stdout)以及标准错误(stderr)的功能,可以简化代码实现为:

# -*- coding: utf-8 -*-

import subprocess

child = subprocess.Popen("python %s" % (fileName),
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT,
                         shell=True)
output = child.communicate(input=input_data.encode("utf-8"))
print(output)
AI 代码解读

除代码执行部分之外,在Serverless架构下,获取到用户代码并将其存储过程中,需要额外注意函数实例中目录的读写权限。通常情况下,在函数计算中,如果不进行硬盘挂载,只有/tmp/目录是有可写入权限的。所以在该项目中,我们将用户传递到服务端的代码进行临时存储时,需要将其写入临时目录/tmp/,在临时存储代码的时候,还需要额外考虑实例复用的情况,所以此时,可以为临时代码提供临时的文件名,例如:

# -*- coding: utf-8 -*-

import random

randomStr = lambda num=5: "".join(random.sample('abcdefghijklmnopqrstuvwxyz', num))

path = "/tmp/%s"% randomStr(5)
AI 代码解读

完整的代码实现为:

# -*- coding: utf-8 -*-

import json
import uuid
import random
import subprocess

# 随机字符串
randomStr = lambda num=5: "".join(random.sample('abcdefghijklmnopqrstuvwxyz', num))


# Response
class Response:
    def __init__(self, start_response, response, errorCode=None):
        self.start = start_response
        responseBody = {
            'Error': {"Code": errorCode, "Message": response},
        } if errorCode else {
            'Response': response
        }
        # 默认增加uuid,便于后期定位
        responseBody['ResponseId'] = str(uuid.uuid1())
        self.response = json.dumps(responseBody)

    def __iter__(self):
        status = '200'
        response_headers = [('Content-type', 'application/json; charset=UTF-8')]
        self.start(status, response_headers)
        yield self.response.encode("utf-8")


def WriteCode(code, fileName):
    try:
        with open(fileName, "w") as f:
            f.write(code)
        return True
    except Exception as e:
        print(e)
        return False


def RunCode(fileName, input_data=""):
    child = subprocess.Popen("python %s" % (fileName),
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT,
                             shell=True)
    output = child.communicate(input=input_data.encode("utf-8"))
    return output[0].decode("utf-8")


def handler(environ, start_response):
    try:
        request_body_size = int(environ.get('CONTENT_LENGTH', 0))
    except (ValueError):
        request_body_size = 0
    requestBody = json.loads(environ['wsgi.input'].read(request_body_size).decode("utf-8"))

    code = requestBody.get("code", None)
    inputData = requestBody.get("input", "")
    fileName = "/tmp/" + randomStr(5)
    responseData = RunCode(fileName, inputData) if code and WriteCode(code, fileName) else "Error"

    return Response(start_response, {"result": responseData})
AI 代码解读

完成核心的业务逻辑编写之后,我们可以将代码部署到阿里云函数计算中。部署完成之后,我们可以获得到接口的临时测试地址。通过PostMan对该接口进行测试,以Python语言的输出语句为例:

print('HELLO WORLD')
AI 代码解读

可以看到,当我们通过POST方法,携带代码等作为参数,发起请求后,获得到的响应为:

我们通过响应结果,可以看到,系统是可以正常输出我们的预期结果:HELLO WORLD。至此我们完成了标准输出功能的测试,接下来我们对标准错误等功能进行测试,此时我们将刚刚的输出代码进行破坏:

print('HELLO WORLD)
AI 代码解读

使用同样的方法,再次进行代码执行,可以看到结果:

结果中,我们可以看到Python的报错信息,是符合我们的预期的,至此完成了在线编程功能的标准错误功能的测试,接下来,我们进行标准输入功能的测试,由于我们使用的subprocess.Popen()方法,是一种阻塞方法,所以此时我们需要将代码和标准输入内容一同放到服务端。测试的代码为:

tempInput = input('please input: ')
print('Output: ', tempInput)
AI 代码解读

测试的标准输入内容为:“serverless devs”。
当我们使用同样的方法,发起请求之后,我们可以看到:

系统是正常输出预期的结果。至此我们完成了一个非常简单的在线编程服务的接口。该接口目前只是初级版本,仅用于学习使用,其具有极大的优化空间:

  • 超时时间的处理
  • 代码执行完成,可以进行清理

当然,通过这个接口也可以看到这样一个问题:那就是代码执行过程中是阻塞的,我们没办法进行持续性的输入,也没有办法实时输出,即使需要输入内容也是需要将代码和输入内容一并发送到服务端。这种模式和目前市面上常见的OJ模式很类似,但是就单纯的在线编程而言,还需要进一步对项目优化,使其可以通过非阻塞方法,实现代码的执行,并且可以持续性的进行输入操作,持续性的进行内容输出。

更贴近“本地”的代码执行器

我们以一段代码为例:

import time
print("hello world")
time.sleep(10)
tempInput = input("please: ")
print("Input data: ", tempInput)
AI 代码解读

当我们在本地的执行这段Python代码时,整体的用户侧的实际表现是:

  • 系统输出hello world
  • 系统等待10秒
  • 系统提醒我们please,我们此时可以输入一个字符串
  • 系统输出Input data以及我们刚刚输入的字符串

但是,这段代码如果应用于传统OJ或者刚刚我们所实现的在线编程系统中,表现则大不相同:

  • 代码与我们要输入内容一同传给系统
  • 系统等待10秒
  • 输出hello world、please,以及最后输Input data和我们输入的内容

可以看到,OJ模式上的在线编程功能和本地是有非常大的差距的,至少在体验层面,这个差距是比较大的。为了减少这种体验不统一的问题,我们可以将上上述的架构进一步升级,通过函数的异步触发,以及Python语言的pexpect.spawn()方法实现一款更贴近本地体验的在线编程功能:

在整个项目中,包括了两个函数,两个存储桶:

  • 业务逻辑函数:该函数的主要操作是业务逻辑,包括创建代码执行的任务(通过对象存储触发器进行异步函数执行),以及获取函数输出结果以及对任务函数的标准输入进行相关操作等;
  • 执行器函数:该函数的主要作用是执行用户的函数代码,这部分是通过对象存储触发,通过下载代码、执行代码、获取输入、输出结果等;代码获取从代码存储桶,输出结果和获取输入从业务存储桶;
  • 代码存储桶:该存储桶的作用是存储代码,当用户发起运行代码的请求, 业务逻辑函数收到用户代码后,会将代码存储到该存储桶,再由该存储桶处罚异步任务;
  • 业务存储桶:该存储桶的作用是中间量的输出,主要包括输出内容的缓存、输入内容的缓存;该部分数据可以通过对象存储的本身特性进行生命周期的制定;

为了让代码在线执行起来,更加贴近本地体验,该方案的代码分为两个函数,分别进行业务逻辑处理和在线编程核心功能。

其中业务逻辑处理函数,主要是:

  • 获取用户的代码信息,生成代码执行ID,并将代码存到对象存储,异步触发在线编程函数的执行,返回生成代码执行ID;
  • 获取用户的输入信息和代码执行ID,并将内容存储到对应的对象存储中;
  • 获取代码的输出结果,根据用户指定的代码执行ID,将执行结果从对象存储中读取出来,并返回给用户;

整体的业务逻辑为:

实现的代码为:

# -*- coding: utf-8 -*-

import os
import oss2
import json
import uuid
import random

# 基本配置信息
AccessKey = {
    "id": os.environ.get('AccessKeyId'),
    "secret": os.environ.get('AccessKeySecret')
}

OSSCodeConf = {
    'endPoint': os.environ.get('OSSConfEndPoint'),
    'bucketName': os.environ.get('OSSConfBucketCodeName'),
    'objectSignUrlTimeOut': int(os.environ.get('OSSConfObjectSignUrlTimeOut'))
}

OSSTargetConf = {
    'endPoint': os.environ.get('OSSConfEndPoint'),
    'bucketName': os.environ.get('OSSConfBucketTargetName'),
    'objectSignUrlTimeOut': int(os.environ.get('OSSConfObjectSignUrlTimeOut'))
}

# 获取获取/上传文件到OSS的临时地址
auth = oss2.Auth(AccessKey['id'], AccessKey['secret'])
codeBucket = oss2.Bucket(auth, OSSCodeConf['endPoint'], OSSCodeConf['bucketName'])
targetBucket = oss2.Bucket(auth, OSSTargetConf['endPoint'], OSSTargetConf['bucketName'])

# 随机字符串
randomStr = lambda num=5: "".join(random.sample('abcdefghijklmnopqrstuvwxyz', num))


# Response
class Response:
    def __init__(self, start_response, response, errorCode=None):
        self.start = start_response
        responseBody = {
            'Error': {"Code": errorCode, "Message": response},
        } if errorCode else {
            'Response': response
        }
        # 默认增加uuid,便于后期定位
        responseBody['ResponseId'] = str(uuid.uuid1())
        self.response = json.dumps(responseBody)

    def __iter__(self):
        status = '200'
        response_headers = [('Content-type', 'application/json; charset=UTF-8')]
        self.start(status, response_headers)
        yield self.response.encode("utf-8")


def handler(environ, start_response):
    try:
        request_body_size = int(environ.get('CONTENT_LENGTH', 0))
    except (ValueError):
        request_body_size = 0
    requestBody = json.loads(environ['wsgi.input'].read(request_body_size).decode("utf-8"))

    reqType = requestBody.get("type", None)

    if reqType == "run":
        # 运行代码
        code = requestBody.get("code", None)
        runId = randomStr(10)
        codeBucket.put_object(runId, code.encode("utf-8"))
        responseData = runId
    elif reqType == "input":
        # 输入内容
        inputData = requestBody.get("input", None)
        runId = requestBody.get("id", None)
        targetBucket.put_object(runId + "-input", inputData.encode("utf-8"))
        responseData = 'ok'
    elif reqType == "output":
        # 获取结果
        runId = requestBody.get("id", None)
        targetBucket.get_object_to_file(runId + "-output", '/tmp/' + runId)
        with open('/tmp/' + runId) as f:
            responseData = f.read()
    else:
        responseData = "Error"

    return Response(start_response, {"result": responseData})
AI 代码解读

执行器函数,主要是通过代码存储桶触发,从而进行代码执行的模块,这一部分主要包括:

  • 从存储桶获取代码,并通过pexpect.spawn()进行代码执行;
  • 通过pexpect.spawn().read_nonblocking()非阻塞的获取间断性的执行结果,并写入到对象存储;
  • 通过pexpect.spawn().sendline()进行内容输入;

整体流程为:

代码实现为:

# -*- coding: utf-8 -*-

import os
import re
import oss2
import json
import time
import pexpect

# 基本配置信息
AccessKey = {
    "id": os.environ.get('AccessKeyId'),
    "secret": os.environ.get('AccessKeySecret')
}

OSSCodeConf = {
    'endPoint': os.environ.get('OSSConfEndPoint'),
    'bucketName': os.environ.get('OSSConfBucketCodeName'),
    'objectSignUrlTimeOut': int(os.environ.get('OSSConfObjectSignUrlTimeOut'))
}

OSSTargetConf = {
    'endPoint': os.environ.get('OSSConfEndPoint'),
    'bucketName': os.environ.get('OSSConfBucketTargetName'),
    'objectSignUrlTimeOut': int(os.environ.get('OSSConfObjectSignUrlTimeOut'))
}

# 获取获取/上传文件到OSS的临时地址
auth = oss2.Auth(AccessKey['id'], AccessKey['secret'])
codeBucket = oss2.Bucket(auth, OSSCodeConf['endPoint'], OSSCodeConf['bucketName'])
targetBucket = oss2.Bucket(auth, OSSTargetConf['endPoint'], OSSTargetConf['bucketName'])


def handler(event, context):
    event = json.loads(event.decode("utf-8"))

    for eveEvent in event["events"]:

        # 获取object
        code = eveEvent["oss"]["object"]["key"]
        localFileName = "/tmp/" + event["events"][0]["oss"]["object"]["eTag"]

        # 下载代码
        codeBucket.get_object_to_file(code, localFileName)

        # 执行代码
        foo = pexpect.spawn('python %s' % localFileName)

        outputData = ""

        startTime = time.time()

        # timeout可以通过文件名来进行识别
        try:
            timeout = int(re.findall("timeout(.*?)s", code)[0])
        except:
            timeout = 60

        while (time.time() - startTime) / 1000 <= timeout:
            try:
                tempOutput = foo.read_nonblocking(size=999999, timeout=0.01)
                tempOutput = tempOutput.decode("utf-8", "ignore")

                if len(str(tempOutput)) > 0:
                    outputData = outputData + tempOutput

                # 输出数据存入oss
                targetBucket.put_object(code + "-output", outputData.encode("utf-8"))

            except Exception as e:

                print("Error: ", e)

                # 有输入请求被阻塞
                if str(e) == "Timeout exceeded.":

                    try:
                        # 从oss读取数据
                        targetBucket.get_object_to_file(code + "-input", localFileName + "-input")
                        targetBucket.delete_object(code + "-input")
                        with open(localFileName + "-input") as f:
                            inputData = f.read()
                        if inputData:
                            foo.sendline(inputData)
                    except:
                        pass

                # 程序执行完成输出
                elif "End Of File (EOF)" in str(e):
                    targetBucket.put_object(code + "-output", outputData.encode("utf-8"))
                    return True

                # 程序抛出异常
                else:

                    outputData = outputData + "\n\nException: %s" % str(e)
                    targetBucket.put_object(code + "-output", outputData.encode("utf-8"))

                    return False
AI 代码解读

当我们完成核心的业务逻辑编写之后,我们可以将项目部署到线上。

项目部署完成之后,和上文的测试方法一样,在这里也通过PostMan对接口进行测试。此时,我们需要设定一个覆盖能较全的测试代码,包括输出打印、输入、一些sleep()等方法:

import time
print('hello world')
time.sleep(10)
tempInput = input('please: ')
print('Input data: ', tempInput)
AI 代码解读

当我们通过PostMan发起请求执行这段代码之后,我们可以看到系统为我们返回了预期的代码执行ID:

我们可以看到系统会返回给我们一个代码执行ID,该执行ID将会作为我们整个请求任务的Id,此时,我们可以通过获取输出结果的接口,来获取结果:

由于代码中有:

time.sleep(10)
AI 代码解读

所以,迅速获得结果的时候是看不到后半部分的输出结果,我们可以设置一个轮训任务,不断通过该Id对接口进行刷新:

我们可以看到,10秒钟后,代码执行到了输入部分:

tempInput = input('please: ')
AI 代码解读

此时,我们再通过输入接口,进行输入操作:

完成之后,我们可以看到输入成功(result: ok)的结果,此时,我们继续刷新之前的获取结果部分的请求:

可以看到,我们已经获得到了所有结果的输出。

相对于上文的在线编程功能,这种“更贴近本地的代码执行器“变得复杂了很多,但是在实际使用的过程中,却可以更好的模拟出本地执行代码时的一些现象,例如代码的休眠、阻塞、内容的输出等。

总结

无论是简单的在线代码执行器部分,还是更贴近“本地”的代码执行器部分,这篇文章在所应用的内容是相对广泛的。通过这篇文章你可以看到:

  • HTTP触发器的基本使用方法;对象存储触发器的基本使用方;
  • 函数计算组件、对象存储组件的基本使用方法,组件间依赖的实现方法;

同时,通过这篇文章,也可以从一个侧面看到这样一个常见问题的简单解答:我有一个项目,我是每个接口一个函数,还是多个接口复用一个函数?

针对这个问题,其实最主要的是看业务本身的诉求,如果多个接口表达的含义是一致的,或者是同类的,类似的,并且多个接口的资源消耗是类似的,那么放在一个函数中来通过不同的路径进行区分是完全可以的;如果出现资源消耗差距较大,或者函数类型、规模、类别区别过大的时候,将多个接口放在多个函数下也是没有问题的。

本文实际上是抛砖引玉,无论是OJ系统的“判题机”部分,还是在线编程工具的“执行器部分”,都可以很好的和Serverless架构有着比较有趣的结合点。这种结合点不仅仅可以解决传统在线编程所头疼的事情(安全问题,资源消耗问题,并发问题,流量不稳定问题),更可以将Serverless的价值在一个新的领域发挥出来。

相关实践学习
【文生图】一键部署Stable Diffusion基于函数计算
本实验教你如何在函数计算FC上从零开始部署Stable Diffusion来进行AI绘画创作,开启AIGC盲盒。函数计算提供一定的免费额度供用户使用。本实验答疑钉钉群:29290019867
建立 Serverless 思维
本课程包括: Serverless 应用引擎的概念, 为开发者带来的实际价值, 以及让您了解常见的 Serverless 架构模式
目录
打赏
0
0
0
0
1882
分享
相关文章
函数计算产品使用问题之WebIDE编写的Node.js代码是否会自动进行打包部署
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
函数计算产品使用问题之如何解决代码需要多个gpu的问题
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
大模型代码能力体验报告之贪吃蛇小游戏《一》:Claude.ai篇 - 生成、预览和快速部署的serverless一条龙
本文介绍了通过Claude.ai生成并优化Web版贪吃蛇游戏的过程,展示了其强大的代码生成功能及用户友好的界面设计。从初始版本的快速生成到根据用户反馈调整游戏速度,再到提供多种实用工具如文件管理、版本控制和一键部署,Claude.ai不仅是一个代码助手,更像是一个全面的serverless开发平台。文中还呼吁国内厂商关注此类技术的发展。
200 1
电话号码正则表达式 代码 javascript+html,JS正则表达式判断11位手机号码
电话号码正则表达式 代码 javascript+html,JS正则表达式判断11位手机号码
171 2
函数计算产品使用问题之代码上传记录如何查看
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
函数计算产品使用问题之如何在函数代码的根目录中执行命令
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
114 7
云大使 X 函数计算 FC 专属活动上线!享返佣,一键打造 AI 应用
如今,AI 技术已经成为推动业务创新和增长的重要力量。但对于许多企业和开发者来说,如何高效、便捷地部署和管理 AI 应用仍然是一个挑战。阿里云函数计算 FC 以其免运维的特点,大大降低了 AI 应用部署的复杂性。用户无需担心底层资源的管理和运维问题,可以专注于应用的创新和开发,并且用户可以通过一键部署功能,迅速将 AI 大模型部署到云端,实现快速上线和迭代。函数计算目前推出了多种规格的云资源优惠套餐,用户可以根据实际需求灵活选择。
Serverless + AI 让应用开发更简单,加速应用智能化
Serverless + AI 让应用开发更简单,加速应用智能化
阿里云 EMR Serverless Spark 在微财机器学习场景下的应用
面对机器学习场景下的训练瓶颈,微财选择基于阿里云 EMR Serverless Spark 建立数据平台。通过 EMR Serverless Spark,微财突破了单机训练使用的数据规模瓶颈,大幅提升了训练效率,解决了存算分离架构下 Shuffle 稳定性和性能困扰,为智能风控等业务提供了强有力的技术支撑。
178 15
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用

热门文章

最新文章

相关产品

  • 函数计算
  • AI助理

    你好,我是AI助理

    可以解答问题、推荐解决方案等