在这篇博客里,我们最终希望构建一个Websocket集群来实现与客户端的实时通信,比如聊天室。我们当然可以通过简单的demo构建一个Websocket服务器并让所有客户端连接这台机器,但当这个聊天室的交互量非常庞大呢?比如斗鱼的直播弹幕,我去斗鱼看了下请求,从命名也可以看到其建立了一个ws连接,叫做danmuproxy.douyu.com,如下图。
那么问题来了,如果我只使用一台服务器,如何去支持可能有10万人同时加入的这个聊天室呢?显然我们需要一个解决方案,比如将流量负载均衡到不同的服务器上并提供一种通信机制让各个服务器能进行消息同步(不然用户A连上服务器A,用户B脸上服务器B,它们发消息的时候对方都没法收到)。
其实从上图的名字来看就知道斗鱼连接的这个danmuproxy.douyu.com中的proxy就大致能推断出他们也是把流量做了一个分发。
Websocket集群
由于和普通的HTTP服务器的负载均衡不同,上一节也说到了这些Websocket服务器需要共享信息(当然,需要做Session共享的服务器也一样)。这意味着客户端与Websocket服务器的交互是有状态(stateful)的,我们需要把每个客户端的连接数据保存在内存中。而当我们要实现分布式的时候,我们则需要在各个机器上共享这些信息,所以我们需要一个Publish/Subscribe broker(其实broker以前上学讲软件设计体系结构的时候学过,但当时太萌新了没理解)。接下来举个例子。
假设我们现在使用Redis作为我们的解决方案,然后我们现在有三台Websocket服务器WS1,WS2和WS3。然后每台服务器上连了三个用户。WS1机器上的其中一个用户发送了某个消息到聊天室,在你的Websocket服务器的逻辑中,你首先会把这个消息存入数据库做一个持久化(比如做历史消息),然后将这个消息根据channelId之类的东西推送至这个聊天室的channel(Websocket的channel的实现会在下一篇中详细讲),我们假设这个channelId叫“The☆World”。
现在你把数据安全的存入了DB里,并且你发布了一个事件给你的Pub/Sub broker(Redis channel)来通知其他对此感兴趣的部分(其他Websocket或者API服务器等)。所以之前的另外两个服务器WS2和WS3因为对这部分感兴趣所以他们也通过脚本监听了这一个Redis channel,它们就会得到通知,然后每个服务器就会对DB请求query获取更新然后emit消息给Websocket上对应channel。
这就是你们可以看到的,使用Pub/Sub brooker来实现了一个横向扩展的Websocket集群。
从这里也可以看到集群具有的有点,高扩展性以及高可用性。
实现
这次实现使用了我的一台高配阿里云国内服务器和一台比较low的阿里云9元学生服务器以及高配服务器上的redis。
Nginx负载均衡
首先配置Nginx做负载均衡,下图是我的配置,只是个Demo没做wss相关的。
服务器端实现
代码都在github上。
Demo的代码也很短
const WebSocket = require('ws');
const publicIp = require('public-ip');
const uuidv1 = require('uuid/v1');
const redis = require("redis");
const config = require('./config');
const sub = redis.createClient(config.DB.REDIS_PORT, config.DB.REDIS_HOST);
const pub = redis.createClient(config.DB.REDIS_PORT, config.DB.REDIS_HOST);
if (config.DB.REDIS_PASSWORD) {
sub.auth(config.DB.REDIS_PASSWORD);
pub.auth(config.DB.REDIS_PASSWORD);
}
const wss = new WebSocket.Server({ port: 2333 });
const ip2name = {
'47.94.233.234': '梁王的高配据点',
'115.28.68.89': '梁王的9块服务器',
}
let sockets = {};
wss.on('connection', function connection(ws) {
const uuid = uuidv1();
ws.uuid = uuid;
sockets[uuid] = ws;
ws.on('message', function incoming(message) {
// publish消息给其他服务器
pub.publish('channel', `${ws.uuid}>${message}`);
console.log(`publish to channel: ${ws.uuid}>${message}`)
// 向本服务器的socket广播
wss.clients.forEach(function each(client) {
if (client !== ws && client.readyState === WebSocket.OPEN) {
client.send(`来自${ws.from || '???'}的用户${ws.uuid}发送了: ${message}`);
}
});
});
publicIp.v4().then(ip => {
console.log(ip);
ws.from = ip2name[ip] ? ip2name[ip] : '未知';
ws.send(`你连接的服务器为${ws.from}`);
});
});
// 监听其他服务器发送的消息
sub.on('message', function(channel, message) {
console.log(channel ${channel}, ${message}
)
if (channel == 'channel')
{
var messageArr = message.split('>');
var uuid = messageArr[0]
var wsFrom = sockets[uuid];
var content = messageArr[1];
// 如果socket是非本服务器的
if(!wsFrom) {
wss.clients.forEach(function each(client) {
client.send(`来自其他服务器的用户${uuid}发送了: ${content}`);
});
}
}
});
sub.subscribe('channel');
复制代码效果
可以用以下代码在控制台中尝试,服务器后期可能会关。
var socket = new WebSocket('ws://websocket-demo.lwio.me');
// Listen for messages
socket.addEventListener('message', function (event) {
console.log('收到了', event.data);
});
// socket.send('keke')
复制代码
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。