1. 前言
WebSocket 协议是基于 TCP 的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信,即允许服务器主动发送信息给客户端。WebSocket 在服务端有数据推送需求时,可以主动发送数据至客户端。而原有 HTTP 协议的服务端对于需推送的数据,仅能通过轮询或 long poll 的方式来让客户端获得。
传统业务如果想要实现Websocket是比较容易的一件事情,但是众所周知,Serverles架构中,部署在FaaS平台的函数通常情况下是事件驱动的,且并不支持长链接这样的操作,那么是不是就说明在Serverless架构下,WebScoket是一个很难实现的技术呢?
其实不然,在API网关触发器的加持下,Serverless架构是可以更简单的实现Websocket功能的。本文将会以阿里云函数计算为例,通过API网关触发器实现一个基于Websocket的聊天工具。
2. 原理解析
由于函数计算是无状态且以触发式运行,即在有事件到来时才会被触发,因此,为了实现 WebSocket,函数计算与 API 网关相结合,通过 API 网关承接及保持与客户端的连接,即 API 网关与函数计算一起实现了服务端。当客户端有消息发出时,会先传递给 API 网关,再由 API 网关触发函数执行。当服务端云函数要向客户端发送消息时,会先由云函数将消息 POST 到 API 网关的反向推送链接,再由 API 网关向客户端完成消息的推送。具体的实现架构如下:
在API网关处的业务简图:
整个流程为:
客户端在启动的时候和API网关建立了WebSocket连接,并且将自己的设备ID告知API网关;
客户端在WebSocket通道上发起注册信令;
API网关将注册信令转换成HTTP协议发送给用户后端服务,并且在注册信令上加上设备ID参数(增加在名称为x-ca-deviceid的header中);
用户后端服务验证注册信令,如果验证通过,记住用户设备ID,返回200应答;
用户后端服务通过HTTP/HTTPS/WebSocket三种协议中的任意一种向API网关发送下行通知信令,请求中携带接收请求的设备ID;
API网关解析下行通知信令,找到指定设备ID的连接,将下行通知信令通过WebSocket连接发送给指定客户端;
客户端在不想收到用户后端服务通知的时候,通过WebSocket连接发送注销信令给API网关,请求中不携带设备ID;
API网关将注销信令转换成HTTP协议发送给用户后端服务,并且在注册信令上加上设备ID参数;
用户后端服务删除设备ID,返回200应答。
在阿里云API网关中,如果需要实现Websocket,则需要了解三种管理信令:
注册信令:注册信令是客户端发送给用户后端服务的信令,起到两个作用:
将客户端的设备ID发送给用户后端服务,用户后端服务需要记住这个设备ID。用户不需要定义设备ID字段,设备ID字段由API网关的SDK自动生成。
用户可以将此信令定义为携带用户名和密码的API,用户后端服务在收到注册信令的验证客户端的合法性。用户后端服务在返回注册信令应答的时候,返回非200时,API网关会视此情况为注册失败。
客户端要想收到用户后端服务发送过来的通知,需要先发送注册信令给API网关,收到用户后端服务的200应答后正式注册成功。
下行通知信令:用户后端服务,在收到客户端发送的注册信令后,记住注册信令中的设备ID字段,然后就可以向API网关发送接收方为这个设备的下行通知信令了。只要这个设备在线,API网关就可以将此下行通知发送到端。
注销信令:客户端在不想收到用户后端服务的通知时发送注销信令发送给API网关,收到用户后端服务的200应答后注销成功,不再接受用户后端服务推送的下行消息。
整个流程:
开通分组绑定的域名的WebSocket通道;
创建注册、下行通知、注销三个API,给这三个API授权、并上线;
用户后端服务实现注册,注销信令逻辑,通过SDK发送下行通知;
下载SDK,嵌入到客户端,建立WebSocket连接,发送注册请求,监听下行通知。
3. 匿名聊天室
3.1 API网关配置
首先,我们需要在函数计算处新建三个事件函数分别对应三种信令:
然后我们需要在API网关处配置相关接口,创建一个API分组:
绑定域名,并且开通WebSocket通道:
四个API,分别用来实现三种信令,以及一个上行数据的接口:
其中Register是实现注册信令,对应后端的函数为register函数:
Notify为下行通知请求,协议为HTTP以及Websocket,无需配置后端函数:
Clean为注销请求,对应的后端函数计算中的clean函数:
最后一个为接收上行数据的普通请求,对应后端函数计算中的send函数:
创建完成之后,我们将这些API发布到线上,并且创建应用:
然后授权notify接口:
此时还需要创建AppKey:
完成之后,我们需要根据业务逻辑分别实现注册函数、传输函数以及清理函数。
3.2 函数计算配置
函数计算这里,我们需要实现三个函数:
register函数:注册函数,当函数注册时,将用户的ID/设备ID存储到对象存储中;
send函数:传输函数,当一个客户端发送消息后,通过send函数接收,并将消息通过API网关的下行通知请求发送给在线的其他客户端。判断在线的其他客户端的方法是通过对象存储中的object来进行判断;
clean函数:清理函数,用来断开连接,并清理链接对象存储在对象存储中的object信息;
这其中register函数为:
import oss2
import json
ossClient = oss2.Bucket(oss2.Auth('', ''),
'http://oss-cn-hongkong.aliyuncs.com',
'')
def register(event, context):
userId = json.loads(event.decode("utf-8"))['headers']['x-ca-deviceid']
# 注册的时候,将链接写入到对象存储
ossClient.put_object(userId, 'user-id')
# 返回客户端注册结果
return {
'isBase64Encoded': 'false',
'statusCode': '200',
'body': {
'userId': userId
},
}
send函数为:
import oss2
import json
import base64
from apigateway import client
from apigateway.http import request
from apigateway.common import constant
ossClient = oss2.Bucket(oss2.Auth('', ''),
'http://oss-cn-hongkong.aliyuncs.com',
'')
apigatewayClient = client.DefaultClient(app_key="",
app_secret="")
def send(event, context):
host = "http://websocket.serverless.fun"
url = "/notify"
userId = json.loads(event.decode("utf-8"))['headers']['x-ca-deviceid']
# 获取链接对象
for obj in oss2.ObjectIterator(ossClient):
if obj.key != userId:
req_post = request.Request(host=host,
protocol=constant.HTTP,
url=url,
method="POST",
time_out=30000,
headers={'x-ca-deviceid': obj.key})
req_post.set_body(json.dumps({
"from": userId,
"message": base64.b64decode(json.loads(event.decode("utf-8"))['body']).decode("utf-8")
}))
req_post.set_content_type(constant.CONTENT_TYPE_STREAM)
result = apigatewayClient.execute(req_post)
print(result)
if result[0] != 200:
# 删除链接记录
ossClient.delete_object(obj.key)
return {
'isBase64Encoded': 'false',
'statusCode': '200',
'body': {
'status': "ok"
},
}
在send函数中,需要引入对应的SDK,来向下行通知请求接口发起请求,可以参考控制台提供的依赖包:
clean函数为:
import oss2
import json
ossClient = oss2.Bucket(oss2.Auth('', ''),
'http://oss-cn-hongkong.aliyuncs.com',
'')
def clean(event, context):
userId = json.loads(event.decode("utf-8"))['headers']['x-ca-deviceid']
# 删除链接记录
ossClient.delete_object(userId)
至此,我们完成了匿名聊天似的服务端建设。
3 体验与测试
通过React开发前端代码,其中HTML代码为:
Serverless Devs ChatApp
body {
margin: 100px 0;
font-family: 'Inconsolata', monospace;
font-size: 14px;
color: #dfe1e8;
background: #343d46;
}
#app {
height: 400px;
width: 100%;
}
div#main {
max-width: 560px;
background: #2b303b;
border-radius: 0px 0px 5px 5px;
height: 400px;
-webkit-box-shadow: 0px 0px 13px 0px rgba(50, 50, 50, 0.59);
-moz-box-shadow: 0px 0px 13px 0px rgba(50, 50, 50, 0.59);
box-shadow: 0px 0px 13px 0px rgba(50, 50, 50, 0.59);
overflow: hidden;
}
.holder {
overflow: auto;
}
input[type=text], input[type=text]:focus {
border: none;
padding: 0;
margin: 0;
height: 22px;
background: #2b303b;
color: #ebcb8b;
width: 80%;
}
p {
margin-bottom: 0;
line-height: 21px;
}
#bar {
height: 30px;
max-width: 560px;
background: black;
border-radius: 5px 5px 0px 0px;
}
#content {
padding: 0px 0px 0px 4px;
height: 100%;
}
处理WebScoket的相关代码:
'use strict';
const uuid = require('uuid');
const util = require('util');
const register = function (editor, deviceId) {
const ws = new WebSocket('ws://websocket.serverless.fun:8080');
const now = new Date();
const reg = {
method: 'GET',
host: 'websocket.serverless.fun:8080',
querys: {},
headers: {
'x-ca-websocket_api_type': ['REGISTER'],
'x-ca-seq': ['0'],
'x-ca-nonce': [uuid.v4().toString()],
'date': [now.toUTCString()],
'x-ca-timestamp': [now.getTime().toString()],
'CA_VERSION': ['1'],
},
path: '/register',
body: '',
};
ws.onopen = function open() {
ws.send('RG#' + deviceId);
};
var registered = false;
var registerResp = false;
var hbStarted = false;
ws.onmessage = function incoming(event) {
if (event.data.startsWith('NF#')) {
const msg = JSON.parse(event.data.substr(3));
editor.addHistory(util.format('%s > %s', msg.from, msg.message));
editor.setState({'prompt': deviceId + " > "});
return;
}
if (!hbStarted && event.data.startsWith('RO#')) {
console.log('Login successfully');
if (!registered) {
registered = true;
ws.send(JSON.stringify(reg));
}
hbStarted = true;
setInterval(function () {
ws.send('H1');
}, 15 * 1000);
return;
}
};
ws.onclose = function (event) {
console.log('ws closed:', event);
};
};
module.exports = register;
上行数据以及初始化项目、状态等代码:
const React = require('react');
const axios = require('axios');
const uuid = require('uuid');
const register = require('./ws');
var deviceId = uuid.v4().replace(/-/g, '').substr(0, 8);
var Prompt = deviceId + ' > ';
var ShellApi = 'http://websocket.serverless.fun/send';
const App = React.createClass({
getInitialState: function () {
register(this, deviceId);
this.offset = 0
this.cmds = []
return {
history: [],
prompt: Prompt,
}
},
clearHistory: function () {
this.setState({history: []});
},
execShellCommand: function (cmd) {
const that = this;
that.setState({'prompt': ''})
that.offset = 0
that.cmds.push(cmd)
axios.post(ShellApi, cmd, {
headers: {
'Content-Type': 'application/octet-stream',
"x-ca-deviceid": deviceId
}
}).then(function (res) {
that.setState({'prompt': Prompt});
}).catch(function (err) {
const errText = err.response ? err.response.status + ' ' + err.response.statusText : err.toString();
that.addHistory(errText);
that.setState({'prompt': Prompt})
});
},
showWelcomeMsg: function () {
this.addHistory(deviceId + ', Welcome to Serverless Devs ChatApp! Have fun!');
},
openLink: function (link) {
return function () {
window.open(link, '_blank');
}
},
componentDidMount: function () {
const term = this.refs.term.getDOMNode();
this.showWelcomeMsg();
term.focus();
},
componentDidUpdate: function () {
var container = document.getElementById('holder')
container.scrollTop = container.scrollHeight
},
handleInput: function (e) {
switch (e.key) {
case "Enter":
var input_text = this.refs.term.getDOMNode().value;
if ((input_text.replace(/\s/g, '')).length < 1) {
return
}
if (input_text === 'clear') {
this.state.history = []
this.showWelcomeMsg()
this.clearInput()
this.offset = 0
this.cmds.length = 0
return
}
this.addHistory(this.state.prompt + " " + input_text);
this.execShellCommand(input_text);
this.clearInput();
break
case 'ArrowUp':
if (this.offset === 0) {
this.lastCmd = this.refs.term.getDOMNode().value
}
this.refs.term.getDOMNode().value = this.cmds[this.cmds.length - ++this.offset] || this.cmds[(this.offset = this.cmds.length, 0)] || this.lastCmd
return false
case 'ArrowDown':
this.refs.term.getDOMNode().value = this.cmds[this.cmds.length - --this.offset] || (this.offset = 0, this.lastCmd)
return false
}
},
clearInput: function () {
this.refs.term.getDOMNode().value = "";
},
addHistory: function (output) {
const history = this.state.history.slice(0)
if (output instanceof Array) {
history.push.apply(history, output)
} else {
history.push(output)
}
this.setState({
'history': history
});
},
handleClick: function () {
const term = this.refs.term.getDOMNode();
term.focus();
},
render: function () {
const output = this.state.history.map(function (op, i) {
return {op}
});
return (
{output}
{this.state.prompt}
)
}
});
const AppComponent = React.createFactory(App);
React.render(AppComponent(), document.getElementById('app'));
完成之后,我们可以查看并测试项目,此时我们可以打开两个前端窗口:
我们在左侧输入一个字符串,并测试:
可以看到在右侧,出现了左侧窗口的ID,并且出现了对应的信息,此时我们在右侧同样输出字符串,并发送:
可以看到左侧,也同样出现了相关的效果。至此,我们完成了基于函数计算与API网关的WebSocket的匿名聊天室应用。
4 总结
通过函数计算和API网关进行Websocket的实践,绝对不仅仅是一个聊天工具这么简单,他可以用在很多方面,例如通过Websocket进行实时日志系统的制作等。单独的函数计算,仅仅是一个计算平台,只有和周边的BaaS结合,才能展示出Serverless架构的价值和真正的能力,意义。这也是为什么很多人Serverless=FaaS+BaaS的一个原因。
期待更多人,可以通过Serverless架构,创造出更多有趣的应用。
说明