用手机写代码:基于Serverless的在线编程能力探索

简介: 本文通过“Serverless角度”,实现简单的Python语言的在线编程能力,并对其进行进一步探索。


1. 前言

随着计算机科学与技术的发展,越来越多的人开始接触编程,也有越来越多的在线编程平台诞生。以Python语言的在线编程平台为例,关于Python语言的在线编程平台大致分为两类,一类是OJ类型的,即在线评测的编程平台,另一类则是学习、工具类的在线编程平台,例如Anycodes在线编程等网站。

但是,无论是那种类型,其背后的核心模块“代码执行器”/“判题机”等,都是值得被关注的。一方面,这类网站通常情况下都需要比要严格的“安全考虑”,例如程序会不会有恶意代码,出现死循环、破坏计算机系统等,程序是否需要隔离运行,运行时是否会获取到其他人提交的代码等;另一方面,这类平台通常情况下都会对资源消耗比较大,尤其是比赛来临时,更是需要突然间对相关机器进行扩容,必要时需要大规模集群来进行应对。同时这类网站通常情况下也都有一个比较大的特点,那就是触发式,即每个代码执行前后实际上并没有非常紧密的前后文关系等。

综合上面的问题和特点,结合Serverless架构,本文将会通过“Serverless角度”,实现简单的Python语言的在线编程能力,并对其进行进一步探索。

2. 简单的在线代码执行器

2.1 基本流程

这一部分将会Python语言实现一个简单的在线代码执行工具,该工具拥有以下特点:

  • 可以执行Python代码;

  • 支持标准输入;

  • 可以返回标准输出和标准错误。

该简单工具的整个过程包括:

image.png

2.2 代码实现

其中执行代码部分,我们可以通过一些常见的Python依赖库,例如subprocess来进行实现:

child = subprocess.Popen("python %s" % (fileName),
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT,
                         shell=True)
output = child.communicate(input=input_data.encode("utf-8"))
print(output)

在函数计算中(无论阿里云函数计算还是腾讯云云函数),通常情况下,再不进行硬盘挂载的前提下,只有/tmp/目录是有可写入权限的,所以函数计算部分,可以将代码写入临时目录/tmp/,并进行代码运行,获得结果:

# -*- coding: utf-8 -*-

import os
import json
import uuid
import random
import subprocess

# 随机字符串
randomStr = lambda num=5: "".join(random.sample('abcdefghijklmnopqrstuvwxyz', num))


# Response
class Response:
    def __init__(self, start_response, response, errorCode=None):
        self.start = start_response
        responseBody = {
            'Error': {"Code": errorCode, "Message": response},
        } if errorCode else {
            'Response': response
        }
        # 默认增加uuid,便于后期定位
        responseBody['ResponseId'] = str(uuid.uuid1())
        print("Response: ", json.dumps(responseBody))
        self.response = json.dumps(responseBody)

    def __iter__(self):
        status = '200'
        response_headers = [('Content-type', 'application/json; charset=UTF-8')]
        self.start(status, response_headers)
        yield self.response.encode("utf-8")


def WriteCode(code, fileName):
    try:
        with open(fileName, "w") as f:
            f.write(code)
        return True
    except Exception as e:
        print(e)
        return False


def RunCode(fileName, input_data=""):
    child = subprocess.Popen("python %s" % (fileName),
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT,
                             shell=True)
    output = child.communicate(input=input_data.encode("utf-8"))
    print(output)
    return output[0].decode("utf-8")


def handler(environ, start_response):
    try:
        request_body_size = int(environ.get('CONTENT_LENGTH', 0))
    except (ValueError):
        request_body_size = 0
    requestBody = json.loads(environ['wsgi.input'].read(request_body_size).decode("utf-8"))

    code = requestBody.get("code", None)
    inputData = requestBody.get("input", "")
    fileName = "/tmp/" + randomStr(5)

    if code and WriteCode(code, fileName):
        output = RunCode(fileName, inputData)
        responseData = output
    else:
        responseData = "Error"

    print(responseData)

    return Response(start_response, {"result": responseData})

2.3 部署上线

当代码编写完成,可以直接通过Serverless-Devs工具进行项目部署,首先编写s.yaml文件:

edition: 1.0.0
name: web-framework-app

services:
  ServerlessBookRunCodeDemo: # 业务名称/模块名称
    component:  fc
    props: # 组件的属性值
      region: cn-beijing
      service:
        name: ServerlessBook
        description: Serverless图书案例
        logConfig:
          project: aliyun-fc-cn-beijing-9bd4a65b-c9d1-529d-9ce0-c91adfb823ab
          logStore: function-log
      function:
        name: serverless_run_code_ordinary
        description: 简单版代码在线执行器
        codeUri: ./
        handler: index.handler
        memorySize: 128
        runtime: python3
        timeout: 5
      triggers:
        - name: httpTrigger
          type: http
          config:
            authType: anonymous
            methods:
              - GET
      customDomains:
        - domainName: auto
          protocol: HTTP
          routeConfigs:
            - path: '/*'

完成编写之后,可以通过:

s deploy

进行项目部署:

image.png

完成项目部署之后,可以通过:

s logs -t

对日志系统进行监听,当函数被触发时,可以看到我们的一些日志内容。

2.4 接口测试

接下来,可以通过PostMan进行接口测试,以Python语言的输出语句为例:

print('HELLO WORLD')

测试结果:

image.png

可以看到,系统是可以正常输出我们的预期结果:HELLO WORLD,当我们将该部分代码进行破坏:

print('HELLO WORLD)

再次执行,可以看到错误结果:

image.png

当我们将带有输入内容的代码传入:

tempInput = input('please input: ')
print('Output: ', tempInput)

并输入“serverless devs”,可以看到:

image.png

至此我们完成了一个非常简单的在线代码执行的接口。针对这个接口,是有蛮大的优化空间的:

  • 超时时间的处理;

  • 代码执行完成,可以进行清理。

当然,通过这个接口也可以看到这样一个问题:那就是代码执行过程中是阻塞的,我们没办法进行持续性的输入,就算是想要输入内容也是需要将代码和输入内容一并发送到服务端。这种模式和目前市面上常见的OJ模式很类似。所以,针对上述的简单的在线代码执行器而言,是可以进行部分完善与OJ系统进行结合使用。

3. 更贴近“本地”的代码执行器

3.1 本地代码执行与OJ模式的区别

众所周知,我们在本地执行以下代码:

import time
print("hello world")
time.sleep(10)
tempInput = input("please: ")
print("Input data: ", tempInput)

实际表现是:

  1. 系统输出hello world;

  2. 系统等待10秒;

  3. 系统提醒我们please,我们此时可以输入一个字符串;

  4. 系统输出Input data以及我们刚刚输入的字符串。

但是,这段代码如果应用于传统OJ或者上部分我们所实现的简单的在线代码执行器中,表现则有一些不同:

  1. 代码与我们要输入内容一同传给系统;

  2. 系统等待10秒;

  3. 输出hello world、please,以及最后输Input data和我们输入的内容;

可以看到,OJ上的体验效果和本地是有非常大的差距的。

3.2 新执行器的流程

为了减少这种体验不统一的问题,可以将代码和结构进行进一步的升级:

image.png

在整个项目中,包括了两个函数,两个存储桶:

  • 业务逻辑函数:该函数的主要操作是业务逻辑,包括创建代码执行的任务(通过对象存储触发器进行异步函数执行),以及获取函数输出结果以及对任务函数的标准输入进行相关操作等;

  • 执行器函数:该函数的主要作用是执行用户的函数代码,这部分是通过对象存储触发,通过下载代码、执行代码、获取输入、输出结果等;代码获取从代码存储桶,输出结果和获取输入从业务存储桶;

  • 代码存储桶:该存储桶的作用是存储代码,当用户发起运行代码的请求, 业务逻辑函数收到用户代码后,会将代码存储到该存储桶,再由该存储桶处罚异步任务;

  • 业务存储桶:该存储桶的作用是中间量的输出,主要包括输出内容的缓存、输入内容的缓存;该部分数据可以通过对象存储的本身特性进行生命周期的制定。

3.3 代码实现

为了让代码在线执行起来,更加贴近本地体验,该方案的代码分为两个函数:

函数:业务逻辑函数

# -*- coding: utf-8 -*-

import os
import oss2
import json
import uuid
import random

# 基本配置信息
AccessKey = {
    "id": os.environ.get('AccessKeyId'),
    "secret": os.environ.get('AccessKeySecret')
}

OSSCodeConf = {
    'endPoint': os.environ.get('OSSConfEndPoint'),
    'bucketName': os.environ.get('OSSConfBucketCodeName'),
    'objectSignUrlTimeOut': int(os.environ.get('OSSConfObjectSignUrlTimeOut'))
}

OSSTargetConf = {
    'endPoint': os.environ.get('OSSConfEndPoint'),
    'bucketName': os.environ.get('OSSConfBucketTargetName'),
    'objectSignUrlTimeOut': int(os.environ.get('OSSConfObjectSignUrlTimeOut'))
}

# 获取获取/上传文件到OSS的临时地址
auth = oss2.Auth(AccessKey['id'], AccessKey['secret'])
codeBucket = oss2.Bucket(auth, OSSCodeConf['endPoint'], OSSCodeConf['bucketName'])
targetBucket = oss2.Bucket(auth, OSSTargetConf['endPoint'], OSSTargetConf['bucketName'])

# 随机字符串
randomStr = lambda num=5: "".join(random.sample('abcdefghijklmnopqrstuvwxyz', num))


# Response
class Response:
    def __init__(self, start_response, response, errorCode=None):
        self.start = start_response
        responseBody = {
            'Error': {"Code": errorCode, "Message": response},
        } if errorCode else {
            'Response': response
        }
        # 默认增加uuid,便于后期定位
        responseBody['ResponseId'] = str(uuid.uuid1())
        print("Response: ", json.dumps(responseBody))
        self.response = json.dumps(responseBody)

    def __iter__(self):
        status = '200'
        response_headers = [('Content-type', 'application/json; charset=UTF-8')]
        self.start(status, response_headers)
        yield self.response.encode("utf-8")


def handler(environ, start_response):
    try:
        request_body_size = int(environ.get('CONTENT_LENGTH', 0))
    except (ValueError):
        request_body_size = 0
    requestBody = json.loads(environ['wsgi.input'].read(request_body_size).decode("utf-8"))

    reqType = requestBody.get("type", None)

    if reqType == "run":
        # 运行代码
        code = requestBody.get("code", None)
        runId = randomStr(10)
        codeBucket.put_object(runId, code.encode("utf-8"))
        responseData = runId
    elif reqType == "input":
        # 输入内容
        inputData = requestBody.get("input", None)
        runId = requestBody.get("id", None)
        targetBucket.put_object(runId + "-input", inputData.encode("utf-8"))
        responseData = 'ok'
    elif reqType == "output":
        # 获取结果
        runId = requestBody.get("id", None)
        targetBucket.get_object_to_file(runId + "-output", '/tmp/' + runId)
        with open('/tmp/' + runId) as f:
            responseData = f.read()
    else:
        responseData = "Error"

    print(responseData)

    return Response(start_response, {"result": responseData})

函数:执行器函数

# -*- coding: utf-8 -*-

import os
import re
import oss2
import json
import time
import pexpect

# 基本配置信息
AccessKey = {
    "id": os.environ.get('AccessKeyId'),
    "secret": os.environ.get('AccessKeySecret')
}

OSSCodeConf = {
    'endPoint': os.environ.get('OSSConfEndPoint'),
    'bucketName': os.environ.get('OSSConfBucketCodeName'),
    'objectSignUrlTimeOut': int(os.environ.get('OSSConfObjectSignUrlTimeOut'))
}

OSSTargetConf = {
    'endPoint': os.environ.get('OSSConfEndPoint'),
    'bucketName': os.environ.get('OSSConfBucketTargetName'),
    'objectSignUrlTimeOut': int(os.environ.get('OSSConfObjectSignUrlTimeOut'))
}

# 获取获取/上传文件到OSS的临时地址
auth = oss2.Auth(AccessKey['id'], AccessKey['secret'])
codeBucket = oss2.Bucket(auth, OSSCodeConf['endPoint'], OSSCodeConf['bucketName'])
targetBucket = oss2.Bucket(auth, OSSTargetConf['endPoint'], OSSTargetConf['bucketName'])


def handler(event, context):
    event = json.loads(event.decode("utf-8"))

    for eveEvent in event["events"]:

        # 获取object
        print("获取object")
        code = eveEvent["oss"]["object"]["key"]
        localFileName = "/tmp/" + event["events"][0]["oss"]["object"]["eTag"]

        # 下载图片
        print("下载代码")
        print("code: ", code)
        codeBucket.get_object_to_file(code, localFileName)

        # 执行代码
        foo = pexpect.spawn('python %s' % localFileName)

        outputData = ""

        startTime = time.time()

        # timeout可以通过文件名来进行识别
        try:
            timeout = int(re.findall("timeout(.*?)s", code)[0])
        except:
            timeout = 60

        while (time.time() - startTime) / 1000 <= timeout:
            try:
                tempOutput = foo.read_nonblocking(size=999999, timeout=0.01)
                tempOutput = tempOutput.decode("utf-8", "ignore")

                if len(str(tempOutput)) > 0:
                    outputData = outputData + tempOutput

                # 输出数据存入oss
                targetBucket.put_object(code + "-output", outputData.encode("utf-8"))

            except Exception as e:

                print("Error: ", e)

                # 有输入请求被阻塞
                if str(e) == "Timeout exceeded.":

                    try:
                        # 从oss读取数据
                        targetBucket.get_object_to_file(code + "-input", localFileName + "-input")
                                                targetBucket.delete_object(code + "-input")
                        with open(localFileName + "-input") as f:
                            inputData = f.read()
                        if inputData:
                            foo.sendline(inputData)
                    except:
                        pass

                # 程序执行完成输出
                elif "End Of File (EOF)" in str(e):
                    targetBucket.put_object(code + "-output", outputData.encode("utf-8"))
                    return True

                # 程序抛出异常
                else:

                    outputData = outputData + "\n\nException: %s" % str(e)
                    targetBucket.put_object(code + "-output", outputData.encode("utf-8"))

                    return False

3.4 部署上线

可以通过Serverless Devs将代码部署到线上,设定全局变量:

vars:
  region: cn-beijing
  AccessKeyId: ${Env(AccessKeyId)}
  AccessKeySecret: ${Env(AccessKeySecret)}
  OSSConfBucketCodeName: serverlessbook-runcode-code
  OSSConfBucketTargetName: serverlessbook-runcode-others
  CodeBucketEndpoint: ******
  OSSConfObjectSignUrlTimeOut: 1200
  service:
    name: ServerlessBook
    description: Serverless图书案例
    logConfig:
      project: aliyun-fc-cn-beijing-9bd4a65b-c9d1-529d-9ce0-c91adfb823ab
      logStore: function-log

业务逻辑函数:

ServerlessBookRunCodeMain:
  component: fc
  actions:
    pre-deploy:
      - run: s ServerlessBookRunCodeMain build --use-docker
        path: ./main
  props:
    region: ${vars.region}
    service: ${vars.service}
    function:
      name: serverless_runcode_main
      description: 业务逻辑
      codeUri: ./main
      handler: index.handler
      memorySize: 128
      runtime: python3
      timeout: 5
      environmentVariables:
        AccessKeyId: ${vars.AccessKeyId}
        AccessKeySecret: ${vars.AccessKeySecret}
        OSSConfBucketCodeName: ${vars.OSSConfBucketCodeName}
        OSSConfBucketTargetName: ${vars.OSSConfBucketTargetName}
        OSSConfEndPoint: ${vars.CodeBucketEndpoint}
        OSSConfObjectSignUrlTimeOut: 1200
    triggers:
      - name: httpTrigger
        type: http
        config:
          authType: anonymous
          methods:
            - GET
    customDomains:
      - domainName: auto
        protocol: HTTP
        routeConfigs:
          - path: '/*'

执行器函数:

ServerlessBookRunCodeCompiler:
  component: fc
  actions:
    pre-deploy:
      - run: s ServerlessBookRunCodeCompiler build --use-docker
        path: ./compiler
  Properties:
    region: ${vars.region}
    service:  ${vars.service}
    function:
      name: serverless_runcode_compiler
      description: 代码执行器
      codeUri: ./compiler
      handler: index.handler
      memorySize: 128
      runtime: python3
      timeout: 60
      environmentVariables: ${ServerlessBookRunCodeMain.props.function.environmentVariables}
      Triggers:
        - Name: OSSTrigger
          Type: OSS
          Parameters:
            Bucket: 
            Events:
              - 'oss:ObjectCreated:*'
            Filter:
              Prefix: ''
              Suffix: ''
    triggers:
      - name: ossTrigger
        type: oss
        config:
          bucketName: ${vars.OSSConfBucketCodeName}
          events:
            - 'oss:ObjectCreated:*'
          filter:
            Key:
              Prefix: ''
              Suffix: ''

在函数部分的配置文档中,可以看到有一个叫做Extends的字段,这个字段实际上是Serverless Devs的行为描述字段,通过该字段可以进行行为操作,例如,在上面的例子中,这部分为:

actions:
  pre-deploy:
    - run: s ServerlessBookRunCodeCompiler build --use-docker
      path: ./compiler

实际含义是,在项目进行deploy操作之前,在./compiler目录下,执行指令:

$ s ServerlessBookRunCodeCompiler install --use-docker

当项目描述文档编写完成之后,可以通过:

$ s deploy

进行项目部署。

3.5 接口测试

同“简单的在线代码执行器”部分一样,在这部分也将会通过PostMan对接口进行测试。

测试代码:

import time
print('hello world')
time.sleep(10)
tempInput = input('please: ')
print('Input data: ', tempInput)

当我们通过PostMan发起请求执行这段代码之后:

image.png

我们可以看到系统会返回给我们一个token,该token将会作为我们整个请求任务的Id,此时,我们可以通过获取输出结果的接口,来获取结果:

image.png

由于代码中有:

time.sleep(10)

所以,迅速获得结果的时候是看不到后半部分的输出结果,我们可以设置一个轮训任务,不断通过该token对接口进行刷新:

image.png

我们可以看到,10秒钟后,代码执行到了输入部分:

tempInput = input('please: ')

此时,我们再通过输入接口,进行输入操作:

image.png

完成之后,我们可以看到输入成功(result: ok)的结果,此时,我们继续刷新之前的获取结果部分的请求:

image.png

可以看到,我们已经获得到了所有结果的输出。

相对于前一部分的结构,这种“更贴近本地的代码执行器“变得复杂了很多,但是在实际使用的过程中,却可以更好的模拟出本地执行代码时的一些现象,例如代码的休眠、阻塞、内容的输出等。

4. 总结

无论是简单的在线代码执行器部分,还是更贴近“本地”的代码执行器部分,这篇文章在所应用的内容是相对广泛的。通过这篇文章你可以看到:

  • HTTP触发器的基本使用方法;对象存储触发器的基本使用方;

  • 函数计算组件、对象存储组件的基本使用方法,组件间依赖的实现方法;

同时,通过这篇文章,也可以从一个侧面看到这样一个常见问题的简单解答:我有一个项目,我是每个接口一个函数,还是多个接口复用一个函数?

针对这个问题,其实最主要的是看业务本身的诉求,如果多个接口表达的含义是一致的,或者是同类的,类似的,并且多个接口的资源消耗是类似的,那么放在一个函数中来通过不同的路径进行区分是完全可以的;如果出现资源消耗差距较大,或者函数类型、规模、类别区别过大的时候,将多个接口放在多个函数下也是没有问题的。

本文实际上是抛砖引玉,无论是OJ系统的“判题机”部分,还是在线编程工具的“执行器部分”,都可以很好的和Serverless架构有着比较有趣的结合点。这种结合点不仅仅可以解决传统在线编程所头疼的事情(安全问题,资源消耗问题,并发问题,流量不稳定问题),更可以将Serverless的价值在一个新的领域发挥出来。

作者介绍
目录