Serverless与Websocket的聊天工具

简介: 本文以阿里云函数计算为例,通过API网关触发器实现一个基于Websocket的聊天工具。


1. 前言

WebSocket 协议是基于 TCP 的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信,即允许服务器主动发送信息给客户端。WebSocket 在服务端有数据推送需求时,可以主动发送数据至客户端。而原有 HTTP 协议的服务端对于需推送的数据,仅能通过轮询或 long poll 的方式来让客户端获得。

传统业务如果想要实现Websocket是比较容易的一件事情,但是众所周知,Serverles架构中,部署在FaaS平台的函数通常情况下是事件驱动的,且并不支持长链接这样的操作,那么是不是就说明在Serverless架构下,WebScoket是一个很难实现的技术呢?

其实不然,在API网关触发器的加持下,Serverless架构是可以更简单的实现Websocket功能的。本文将会以阿里云函数计算为例,通过API网关触发器实现一个基于Websocket的聊天工具。

2. 原理解析

由于函数计算是无状态且以触发式运行,即在有事件到来时才会被触发,因此,为了实现 WebSocket,函数计算与 API 网关相结合,通过 API 网关承接及保持与客户端的连接,即 API 网关与函数计算一起实现了服务端。当客户端有消息发出时,会先传递给 API 网关,再由 API 网关触发函数执行。当服务端云函数要向客户端发送消息时,会先由云函数将消息 POST 到 API 网关的反向推送链接,再由 API 网关向客户端完成消息的推送。具体的实现架构如下:

原理图

在API网关处的业务简图:

流程图

整个流程为:

  1. 客户端在启动的时候和API网关建立了WebSocket连接,并且将自己的设备ID告知API网关;

  2. 客户端在WebSocket通道上发起注册信令;

  3. API网关将注册信令转换成HTTP协议发送给用户后端服务,并且在注册信令上加上设备ID参数(增加在名称为x-ca-deviceid的header中);

  4. 用户后端服务验证注册信令,如果验证通过,记住用户设备ID,返回200应答;

  5. 用户后端服务通过HTTP/HTTPS/WebSocket三种协议中的任意一种向API网关发送下行通知信令,请求中携带接收请求的设备ID;

  6. API网关解析下行通知信令,找到指定设备ID的连接,将下行通知信令通过WebSocket连接发送给指定客户端;

  7. 客户端在不想收到用户后端服务通知的时候,通过WebSocket连接发送注销信令给API网关,请求中不携带设备ID;

  8. API网关将注销信令转换成HTTP协议发送给用户后端服务,并且在注册信令上加上设备ID参数;

  9. 用户后端服务删除设备ID,返回200应答。

在阿里云API网关中,如果需要实现Websocket,则需要了解三种管理信令:

  1. 注册信令:注册信令是客户端发送给用户后端服务的信令,起到两个作用:

  • 将客户端的设备ID发送给用户后端服务,用户后端服务需要记住这个设备ID。用户不需要定义设备ID字段,设备ID字段由API网关的SDK自动生成。

  • 用户可以将此信令定义为携带用户名和密码的API,用户后端服务在收到注册信令的验证客户端的合法性。用户后端服务在返回注册信令应答的时候,返回非200时,API网关会视此情况为注册失败。

客户端要想收到用户后端服务发送过来的通知,需要先发送注册信令给API网关,收到用户后端服务的200应答后正式注册成功。

  1. 下行通知信令:用户后端服务,在收到客户端发送的注册信令后,记住注册信令中的设备ID字段,然后就可以向API网关发送接收方为这个设备的下行通知信令了。只要这个设备在线,API网关就可以将此下行通知发送到端。

  2. 注销信令:客户端在不想收到用户后端服务的通知时发送注销信令发送给API网关,收到用户后端服务的200应答后注销成功,不再接受用户后端服务推送的下行消息。

整个流程:

image.png

  1. 开通分组绑定的域名的WebSocket通道;

  2. 创建注册、下行通知、注销三个API,给这三个API授权、并上线;

  3. 用户后端服务实现注册,注销信令逻辑,通过SDK发送下行通知;

  4. 下载SDK,嵌入到客户端,建立WebSocket连接,发送注册请求,监听下行通知。

3. 匿名聊天室

3.1 API网关配置

首先,我们需要在函数计算处新建三个事件函数分别对应三种信令:

image.png

然后我们需要在API网关处配置相关接口,创建一个API分组:

image.png

绑定域名,并且开通WebSocket通道:

image.png

四个API,分别用来实现三种信令,以及一个上行数据的接口:

image.png

其中Register是实现注册信令,对应后端的函数为register函数:

image.png

Notify为下行通知请求,协议为HTTP以及Websocket,无需配置后端函数:

image.png

Clean为注销请求,对应的后端函数计算中的clean函数:

image.png

最后一个为接收上行数据的普通请求,对应后端函数计算中的send函数:

image.png

创建完成之后,我们将这些API发布到线上,并且创建应用:

image.png

然后授权notify接口:

image.png

此时还需要创建AppKey:

image.png

完成之后,我们需要根据业务逻辑分别实现注册函数、传输函数以及清理函数。

3.2 函数计算配置

函数计算这里,我们需要实现三个函数:

register函数:注册函数,当函数注册时,将用户的ID/设备ID存储到对象存储中;

send函数:传输函数,当一个客户端发送消息后,通过send函数接收,并将消息通过API网关的下行通知请求发送给在线的其他客户端。判断在线的其他客户端的方法是通过对象存储中的object来进行判断;

clean函数:清理函数,用来断开连接,并清理链接对象存储在对象存储中的object信息;

这其中register函数为:

import oss2
import json
ossClient = oss2.Bucket(oss2.Auth('', ''),
                        'http://oss-cn-hongkong.aliyuncs.com',
                        '')

def register(event, context):
    userId = json.loads(event.decode("utf-8"))['headers']['x-ca-deviceid']
    # 注册的时候,将链接写入到对象存储
    ossClient.put_object(userId, 'user-id')
    # 返回客户端注册结果
    return {
        'isBase64Encoded': 'false',
        'statusCode': '200',
        'body': {
            'userId': userId
        },
    }

send函数为:

import oss2
import json
import base64
from apigateway import client
from apigateway.http import request
from apigateway.common import constant

ossClient = oss2.Bucket(oss2.Auth('', ''),
                        'http://oss-cn-hongkong.aliyuncs.com',
                        '')
apigatewayClient = client.DefaultClient(app_key="",
                                        app_secret="")

def send(event, context):

    host = "http://websocket.serverless.fun"
    url = "/notify"
    userId = json.loads(event.decode("utf-8"))['headers']['x-ca-deviceid']

    # 获取链接对象
    for obj in oss2.ObjectIterator(ossClient):
        if obj.key != userId:
            req_post = request.Request(host=host,
                                       protocol=constant.HTTP,
                                       url=url,
                                       method="POST",
                                       time_out=30000,
                                       headers={'x-ca-deviceid': obj.key})
            req_post.set_body(json.dumps({
                "from": userId,
                "message": base64.b64decode(json.loads(event.decode("utf-8"))['body']).decode("utf-8")
            }))
            req_post.set_content_type(constant.CONTENT_TYPE_STREAM)
            result = apigatewayClient.execute(req_post)
            print(result)
            if result[0] != 200:
                # 删除链接记录
                ossClient.delete_object(obj.key)
    return {
        'isBase64Encoded': 'false',
        'statusCode': '200',
        'body': {
            'status': "ok"
        },
    }

在send函数中,需要引入对应的SDK,来向下行通知请求接口发起请求,可以参考控制台提供的依赖包:

image.png

clean函数为:

import oss2
import json
ossClient = oss2.Bucket(oss2.Auth('', ''),
                        'http://oss-cn-hongkong.aliyuncs.com',
                        '')

def clean(event, context):
    userId = json.loads(event.decode("utf-8"))['headers']['x-ca-deviceid']
    # 删除链接记录
    ossClient.delete_object(userId)

至此,我们完成了匿名聊天似的服务端建设。

3 体验与测试

通过React开发前端代码,其中HTML代码为:




    Serverless Devs ChatApp
    
    
    
    
        body {
            margin: 100px 0;
            font-family: 'Inconsolata', monospace;
            font-size: 14px;
            color: #dfe1e8;
            background: #343d46;
        }

        #app {
            height: 400px;
            width: 100%;
        }

        div#main {
            max-width: 560px;
            background: #2b303b;
            border-radius: 0px 0px 5px 5px;
            height: 400px;
            -webkit-box-shadow: 0px 0px 13px 0px rgba(50, 50, 50, 0.59);
            -moz-box-shadow: 0px 0px 13px 0px rgba(50, 50, 50, 0.59);
            box-shadow: 0px 0px 13px 0px rgba(50, 50, 50, 0.59);
            overflow: hidden;
        }

        .holder {
            overflow: auto;
        }

        input[type=text], input[type=text]:focus {
            border: none;
            padding: 0;
            margin: 0;
            height: 22px;
            background: #2b303b;
            color: #ebcb8b;
            width: 80%;
        }

        p {
            margin-bottom: 0;
            line-height: 21px;
        }

        #bar {
            height: 30px;
            max-width: 560px;
            background: black;
            border-radius: 5px 5px 0px 0px;
        }

        #content {
            padding: 0px 0px 0px 4px;
            height: 100%;
        }
    


处理WebScoket的相关代码:

'use strict';

const uuid = require('uuid');
const util = require('util');

const register = function (editor, deviceId) {
    const ws = new WebSocket('ws://websocket.serverless.fun:8080');
    const now = new Date();

    const reg = {
        method: 'GET',
        host: 'websocket.serverless.fun:8080',
        querys: {},
        headers: {
            'x-ca-websocket_api_type': ['REGISTER'],
            'x-ca-seq': ['0'],
            'x-ca-nonce': [uuid.v4().toString()],
            'date': [now.toUTCString()],
            'x-ca-timestamp': [now.getTime().toString()],
            'CA_VERSION': ['1'],
        },
        path: '/register',
        body: '',
    };

    ws.onopen = function open() {
        ws.send('RG#' + deviceId);
    };

    var registered = false;
    var registerResp = false;
    var hbStarted = false;

    ws.onmessage = function incoming(event) {
        if (event.data.startsWith('NF#')) {
            const msg = JSON.parse(event.data.substr(3));
            editor.addHistory(util.format('%s > %s', msg.from, msg.message));
            editor.setState({'prompt': deviceId + " > "});
            return;
        }

        if (!hbStarted && event.data.startsWith('RO#')) {
            console.log('Login successfully');

            if (!registered) {
                registered = true;
                ws.send(JSON.stringify(reg));
            }

            hbStarted = true;
            setInterval(function () {
                ws.send('H1');
            }, 15 * 1000);
            return;
        }

    };

    ws.onclose = function (event) {
        console.log('ws closed:', event);
    };
};

module.exports = register;

上行数据以及初始化项目、状态等代码:

const React = require('react');
const axios = require('axios');
const uuid = require('uuid');
const register = require('./ws');

var deviceId = uuid.v4().replace(/-/g, '').substr(0, 8);
var Prompt = deviceId + ' > ';
var ShellApi = 'http://websocket.serverless.fun/send';

const App = React.createClass({
    getInitialState: function () {
        register(this, deviceId);
        this.offset = 0
        this.cmds = []
        return {
            history: [],
            prompt: Prompt,
        }
    },
    clearHistory: function () {
        this.setState({history: []});
    },
    execShellCommand: function (cmd) {
        const that = this;
        that.setState({'prompt': ''})
        that.offset = 0
        that.cmds.push(cmd)
        axios.post(ShellApi, cmd, {
            headers: {
                'Content-Type': 'application/octet-stream',
                "x-ca-deviceid": deviceId
            }
        }).then(function (res) {
            that.setState({'prompt': Prompt});
        }).catch(function (err) {
            const errText = err.response ? err.response.status + ' ' + err.response.statusText : err.toString();
            that.addHistory(errText);
            that.setState({'prompt': Prompt})
        });
    },
    showWelcomeMsg: function () {
        this.addHistory(deviceId + ', Welcome to Serverless Devs ChatApp! Have fun!');
    },
    openLink: function (link) {
        return function () {
            window.open(link, '_blank');
        }
    },
    componentDidMount: function () {
        const term = this.refs.term.getDOMNode();

        this.showWelcomeMsg();
        term.focus();
    },
    componentDidUpdate: function () {
        var container = document.getElementById('holder')
        container.scrollTop = container.scrollHeight
    },
    handleInput: function (e) {
        switch (e.key) {
            case "Enter":
                var input_text = this.refs.term.getDOMNode().value;

                if ((input_text.replace(/\s/g, '')).length < 1) {
                    return
                }

                if (input_text === 'clear') {
                    this.state.history = []
                    this.showWelcomeMsg()
                    this.clearInput()
                    this.offset = 0
                    this.cmds.length = 0
                    return
                }

                this.addHistory(this.state.prompt + " " + input_text);
                this.execShellCommand(input_text);
                this.clearInput();
                break
            case 'ArrowUp':
                if (this.offset === 0) {
                    this.lastCmd = this.refs.term.getDOMNode().value
                }

                this.refs.term.getDOMNode().value = this.cmds[this.cmds.length - ++this.offset] || this.cmds[(this.offset = this.cmds.length, 0)] || this.lastCmd
                return false
            case 'ArrowDown':
                this.refs.term.getDOMNode().value = this.cmds[this.cmds.length - --this.offset] || (this.offset = 0, this.lastCmd)
                return false
        }
    },
    clearInput: function () {
        this.refs.term.getDOMNode().value = "";
    },
    addHistory: function (output) {
        const history = this.state.history.slice(0)

        if (output instanceof Array) {
            history.push.apply(history, output)
        } else {
            history.push(output)
        }

        this.setState({
            'history': history
        });
    },
    handleClick: function () {
        const term = this.refs.term.getDOMNode();
        term.focus();
    },
    render: function () {
        const output = this.state.history.map(function (op, i) {
            return 

{op}

       });        return (            

               {output}                

{this.state.prompt}

       )    } }); const AppComponent = React.createFactory(App); React.render(AppComponent(), document.getElementById('app'));

完成之后,我们可以查看并测试项目,此时我们可以打开两个前端窗口:

image.png

我们在左侧输入一个字符串,并测试:

image.png

可以看到在右侧,出现了左侧窗口的ID,并且出现了对应的信息,此时我们在右侧同样输出字符串,并发送:

image.png

可以看到左侧,也同样出现了相关的效果。至此,我们完成了基于函数计算与API网关的WebSocket的匿名聊天室应用。

4 总结

通过函数计算和API网关进行Websocket的实践,绝对不仅仅是一个聊天工具这么简单,他可以用在很多方面,例如通过Websocket进行实时日志系统的制作等。单独的函数计算,仅仅是一个计算平台,只有和周边的BaaS结合,才能展示出Serverless架构的价值和真正的能力,意义。这也是为什么很多人Serverless=FaaS+BaaS的一个原因。

期待更多人,可以通过Serverless架构,创造出更多有趣的应用。

说明

配套代码可以参考Github相关仓库

作者介绍
目录