我们在使用WebSocket
的时候,最头疼的一件事莫过于在处理onmessage
事件,通常我们会遇到这样的情况,向后端发送一个请求,后端返回一个结果;
我们如何将这个指令返回的数据,对应到这个指令上?这个时候大家都想了很多办法,比如在发送的时候,将指令和数据一起发送,后端返回的时候,将指令和数据一起返回,这样就可以通过指令来对应数据了;
可能有的小伙伴就说了这种情况为啥不直接用http
请求呢?当然是因为需求呀,不讲故事也不说废话,开始干货。
先看效果,不要太在意样式,懒得写,随便弄了点;
问题
标题是Websocket
,但是其实不只是Websocket
有这个问题,还有其他的可能都有这个问题,先看代码:
const ws = new WebSocket('ws://localhost:3000'); ws.onopen = function () { // 发送消息 ws.send('hello'); }; // 接收消息 ws.onmessage = function (e) { console.log(e.data); };
这个代码是一个最简单的Websocket
的使用,不难发现我们发送消息和接收消息并不在同一个作用域里面,这样就会导致我们无法将发送的消息和接收的消息对应起来;
更绝的是,我们这个没有任何标识,例如我这个例子里面,我发送了hello
,后端会返回world
;
模拟环境
上面是一个Websocket
的示例,其实我们还会有很多领域可能会出现这个问题,例如我写这个解决方案的时候,并不是为了解决Websocket
的问题,而是为了解决串口通信的问题;
我这里就模拟一个环境,因为我没办法提供Websocket
的服务,也不能提供串口的服务,所以我就模拟一个环境,也让大家更好的理解这个问题。
class Mock { constructor() { let count = 0; setInterval(() => { this.onmessage('hello world' + count); count++; }, 10000); } send(data) { // 随机延迟 const delay = Math.random() * 1000; setTimeout(() => { switch (data) { case 'hello': this.onmessage('world'); break; case 'world': this.onmessage('hello'); break; case '你好': // 随机是否返回 if (Math.random() > 0.5) { this.onmessage('好个der'); } break; default: this.onmessage(data); break; } }, delay); } onmessage = (data) => { } }
这个Mock
类,就是一个模拟的Websocket
,我们可以通过send
方法来发送数据,通过onmessage
方法来接收数据;
上面的mock
环境非常简单,就是你在发送消息的时候,立即会返回一个消息,但是会有延时,上面的模拟环境有下面几种情况:
- 我们发送
hello
,后端返回world
; - 我们发送
world
,后端返回hello
; - 我们发送
你好
,后端返回好个der
,但是这个返回是随机的,有可能不返回,模拟丢包的情况; - 每过10秒,后端返回
hello world${count}
;
解决方案
先上代码,后面讲解:
class MockHelper { constructor() { // 用于存储消息处理器 this.messageHandlers = []; // 模拟消息发送(websocket) this.mock = new Mock(); // 模拟消息接收 this.mock.onmessage = (data) => { // 熔断器 let fusing = false; const fused = () => { fusing = true; } // 执行消息处理 for (let i = 0; i < this.messageHandlers.length; i++) { const handler = this.messageHandlers[i]; try { // 执行消息处理器 handler.handle(data, fused); } catch (e) { // 消息处理器执行失败 handler.reject(e); this.messageHandlers.splice(i, 1); i--; } // 熔断 if (fusing) { this.messageHandlers.splice(i, 1); break; } } } } send(data, handler) { // 返回一个 Promise return new Promise((resolve, reject) => { // 创建消息处理器 const messageHandler = new MessageHandler(); messageHandler.callback = handler; messageHandler.resolve = resolve; messageHandler.reject = reject; messageHandler.data = data; // 设置超时,3s messageHandler.timer = setTimeout(() => { messageHandler.reject(new Error('timeout')); this.messageHandlers.splice(this.messageHandlers.indexOf(messageHandler), 1); }, 3000); // 添加到消息处理器列表 this.messageHandlers.push(messageHandler); // 发送消息 this.mock.send(data); }) } } // 消息处理器 class MessageHandler { constructor() { this.callback = null; this.timer = null; this.resolve = null; this.reject = null; this.data = null; } // 处理消息 handle(data, fused) { // 消息处理结果 let result = { sendData: this.data, receiveData: data, handleData: null }; // 熔断器 const _fused = () => { // 收到熔断信号,清除定时器 if (this.timer) { clearTimeout(this.timer); this.timer = null; } // 执行上层的熔断函数 fused(); // 执行 resolve if (this.resolve) { // 这里需要等待回调函数执行完毕,才能执行 resolve,这样才能拿到 handleData Promise.resolve().then(() => { this.resolve(result); this.resolve = null; }); } } // 执行回调函数 if (typeof this.callback === 'function') { try { result.handleData = this.callback(data, _fused); } catch (e) { this.reject(e); throw e; } } } }
MockHelper
上面我们定义了一个MockHelper
类,这个类可以理解为对Websocket
的封装,其他类似的处理机制都可以用这种方式来封装;
在实例化的过程中,会对Websocket
进行创建,然后监听对应的消息响应事件,当收到消息的时候,会执行对应的消息处理器:
// 模拟消息接收 this.mock.onmessage = (data) => { // 熔断器 let fusing = false; const fused = () => { fusing = true; } // 执行消息处理 for (let i = 0; i < this.messageHandlers.length; i++) { const handler = this.messageHandlers[i]; try { // 执行消息处理器 handler.handle(data, fused); } catch (e) { // 消息处理器执行失败 handler.reject(e); this.messageHandlers.splice(i, 1); i--; } // 熔断 if (fusing) { this.messageHandlers.splice(i, 1); break; } } }
方法很简单,每行代码都有注释,主要是思想,我们通过一个fusing
变量来标记是否熔断;
熔断的意思就是结束后续的处理流程,执行熔断器之后,就代表这个消息已经被处理了,后续的消息处理器就不需要再处理了;
消息执行出现异常会被捕获,并且执行reject
,然后从消息处理器列表中移除,这里不会被熔断,因为这个消息处理器已经执行失败了,可能是因为消息处理器的逻辑有问题,并不代表这个消息已经被处理了;
send
send
方法是用来发送消息的,这个方法返回一个Promise
,这个Promise
的resolve
和reject
是在消息处理器中执行的;
/** * 发送消息 * @param data 发送的消息内容 * @param handler 对消息的处理函数 * @return {Promise<unknown>} 返回一个 Promise */ function send(data, handler) { // 返回一个 Promise return new Promise((resolve, reject) => { // 创建消息处理器 const messageHandler = new MessageHandler(); messageHandler.callback = handler; messageHandler.resolve = resolve; messageHandler.reject = reject; messageHandler.data = data; // 设置超时,3s messageHandler.timer = setTimeout(() => { messageHandler.reject(new Error('timeout')); this.messageHandlers.splice(this.messageHandlers.indexOf(messageHandler), 1); }, 3000); // 添加到消息处理器列表 this.messageHandlers.push(messageHandler); // 发送消息 this.mock.send(data); }) }
send
方法接收两个参数,第一个参数是发送的消息内容,第二个参数是对消息的处理函数;
发送的消息内容就不多说了,对消息的处理函数就是我们自定义对返回的消息进行处理的逻辑;
send
方法的内容还是很简单的,最开始初始化一个MessageHandler
对象,并且设置一个超时定时器,如果出现超时就将这个处理器从this.messageHandlers
中删除;
然后,将这个处理器添加到this.messageHandlers
中,this.messageHandlers
最后会在onmessage
中使用,这样就就形成了一闭环;
MessageHandler
在send
中传入的处理函数,最后会被封装成一个MessageHandler
对象,这个对象就是消息处理器;
// 消息处理器 class MessageHandler { constructor() { this.callback = null; this.timer = null; this.resolve = null; this.reject = null; this.data = null; } // 处理消息 handle(data, fused) { // 消息处理结果 let result = { sendData: this.data, receiveData: data, handleData: null }; // 熔断器 const _fused = () => { // 收到熔断信号,清除定时器 if (this.timer) { clearTimeout(this.timer); this.timer = null; } // 执行上层的熔断函数 fused(); // 执行 resolve if (this.resolve) { // 这里需要等待回调函数执行完毕,才能执行 resolve,这样才能拿到 handleData Promise.resolve().then(() => { this.resolve(result); this.resolve = null; }); } } // 执行回调函数 if (typeof this.callback === 'function') { try { result.handleData = this.callback(data, _fused); } catch (e) { this.reject(e); throw e; } } } }
MessageHandler
对象中有五个属性,callback
是对消息的处理函数,timer
是用来设置超时的定时器,resolve
和reject
是用来执行Promise
的,data
是发送的消息内容;
handle
方法是用来处理消息的,这个方法接收两个参数,第一个参数是接收到的消息内容,第二个参数是熔断器;
这里的核心还是熔断器,上面的onmessage
的熔断器很简单,就是设置一个标记,然后在onmessage
中判断这个标记,如果是熔断状态,就不再执行后面的逻辑;
这里面的熔断器会执行一系列的逻辑:
// 熔断器 const _fused = () => { // 收到熔断信号,清除定时器 if (this.timer) { clearTimeout(this.timer); this.timer = null; } // 执行上层的熔断函数 fused(); // 执行 resolve if (this.resolve) { // 这里需要等待回调函数执行完毕,才能执行 resolve,这样才能拿到 handleData Promise.resolve().then(() => { this.resolve(result); this.resolve = null; }); } }
首先,收到熔断信号,清除定时器,这样就不会再执行超时的逻辑;
然后执行上层的熔断器,这样在onmessage
中就不会再执行后面的逻辑;
最后,执行resolve
,这里将resolve
放到了Promise.resolve().then()
中,这样就可以等待回调函数执行完毕,再执行resolve
,这样才能拿到handleData
;
使用
使用很简单,直接看代码:
// 实例化 const mockHelper = new MockHelper(); // 直接发送消息,并传入消息处理函数 mockHelper.send('hello', (data, fused) => { if (data === 'world') { fused(); return 'hello world'; } }).then((data) => { // 这里可以拿到 发送的消息内容,接收的消息内容,处理后的消息内容 console.log('then', data); }).catch((e) => { console.log('catch', e); });
看到这里不知道你会不会有一堆的疑问,比如:
- 为什么要在
send
中传入消息处理函数?我直接在这个消息处理函数中处理消息不就行了吗? - 为什么要用
Promise
来封装呢?你这里都有回调了,感觉没必要啊? - 你这个最后还不是会有一堆的
if
判断吗?而且我如果要发两次相同的消息,这个代码我还要写两遍吗? - 其他的问题...
这当然是为了方便大家使用,按照现在的封装,我们只需要在send
中传入消息处理函数,然后在这个函数中处理消息,就可以了;
但是实际情况下我们还是要尽可以的对代码进行复用,光说不练假把式,我们来看看怎么复用这个代码:
// 实例化 const mockHelper = new MockHelper(); // 发送 hello 消息 const sendHello = () => { return mockHelper.send('hello', (data, fused) => { if (data === 'world') { fused(); return 'hello world'; } }); }; // 发送 world 消息 const sendWorld = () => { return mockHelper.send('world', (data, fused) => { if (data === 'hello') { fused(); return 'hello world'; } }); }; // 使用 sendHello().then((data) => { console.log(data); }) sendHello().then((data) => { console.log(data); }) sendHello().then((data) => { console.log(data); }); sendWorld().then((data) => { console.log(data); }) sendWorld().then((data) => { console.log(data); }) sendWorld().then((data) => { console.log(data); }) // ...其他 // 最后才是直接使用 send mockHelper.send('xxx', (data, fused) => { if (data === 'xxx') { fused(); return '...'; } }).then((data) => { console.log(data); })
对比
在我最开始没有想到这种封装的时候,我就是用下面这种方式来封装的,屎山!!!
按照最开始没有封装的时候的逻辑,我们需要在onmessage
事件中,可能会写一堆的if
判断,然后再执行对应的回调函数,比如:
class WebsocketWrap { constructor(options) { this.options = options; this.ws = null; this.connect(); } connect() { this.ws = new WebSocket(this.options.url); this.ws.onmessage = (event) => { this.onmessage(event.data); }; } onmessage = (data) => { // 逻辑处理 a if (data === 'a') { this.options['onA'](data); return; } // 逻辑处理 b if (data.xxx === 'xxx') { this.options['onXxx'](data); return; } // 逻辑处理 c if (data.yyy === 'yyy') { this.options['onYyy'](data); return; } // 各种其他自定义处理... } send(data) { this.ws.send(data); } } // 使用 const ws = new WebsocketWrap({ url: 'ws://localhost:8080', onA: (data) => { console.log('onA', data); }, onXxx: (data) => { console.log('onXxx', data); }, onYyy: (data) => { console.log('onYyy', data); } });
这样的代码,如果逻辑处理的函数比较多,那么onmessage
函数就会很长,而且不好维护,也不好扩展;
如果现在来了一个新的需求,我们就需要在onmessage
函数中,再添加一个if
判断,然后再添加一个对应的处理函数,这样的代码就会变得越来越难维护;
而我上面的封装就是解决了这个问题,我将onmessage
事件中的if
进行了封装,在send
方法中传入的处理函数,就是对消息的处理函数,也就是这个if
判断的逻辑;
最后消息处理完毕,执行熔断器,然后将处理后的消息通过resolve
返回出去,这样就可以在then
中拿到处理后的消息了;
总结
上面只是一个很简单的封装,主要介绍的还是思想,其实里面还有很多可以优化的地方,比如:
- 如果发送的消息并没有回应应该怎么修改?
- 这里服务端主动推送的消息全都被丢弃了,应该怎么修改?
- 是否可以增加一些配置,比如超时时间,重试次数等等?
- 其他...
我能想到的问题就直接在码上掘金中解决了,如果还有什么其他的想法,可以自己尝试添加,感谢阅读,欢迎评论,如果觉得不错,可以点个赞,谢谢!