告别屎山!!!WebSocket 的极致封装, 写好代码竟如此简单

简介: 告别屎山!!!WebSocket 的极致封装, 写好代码竟如此简单

我们在使用WebSocket的时候,最头疼的一件事莫过于在处理onmessage事件,通常我们会遇到这样的情况,向后端发送一个请求,后端返回一个结果;


我们如何将这个指令返回的数据,对应到这个指令上?这个时候大家都想了很多办法,比如在发送的时候,将指令和数据一起发送,后端返回的时候,将指令和数据一起返回,这样就可以通过指令来对应数据了;


可能有的小伙伴就说了这种情况为啥不直接用http请求呢?当然是因为需求呀,不讲故事也不说废话,开始干货。


先看效果,不要太在意样式,懒得写,随便弄了点;

image.png

image.png

问题


标题是Websocket,但是其实不只是Websocket有这个问题,还有其他的可能都有这个问题,先看代码:

const ws = new WebSocket('ws://localhost:3000');
ws.onopen = function () {
  // 发送消息
  ws.send('hello');
};
// 接收消息
ws.onmessage = function (e) {
  console.log(e.data);
};

这个代码是一个最简单的Websocket的使用,不难发现我们发送消息和接收消息并不在同一个作用域里面,这样就会导致我们无法将发送的消息和接收的消息对应起来;


更绝的是,我们这个没有任何标识,例如我这个例子里面,我发送了hello,后端会返回world


模拟环境


上面是一个Websocket的示例,其实我们还会有很多领域可能会出现这个问题,例如我写这个解决方案的时候,并不是为了解决Websocket的问题,而是为了解决串口通信的问题;


我这里就模拟一个环境,因为我没办法提供Websocket的服务,也不能提供串口的服务,所以我就模拟一个环境,也让大家更好的理解这个问题。

   class Mock {
    constructor() {
        let count = 0;
        setInterval(() => {
            this.onmessage('hello world' + count);
            count++;
        }, 10000);
    }
    send(data) {
        // 随机延迟
        const delay = Math.random() * 1000;
        setTimeout(() => {
            switch (data) {
                case 'hello':
                    this.onmessage('world');
                    break;
                case 'world':
                    this.onmessage('hello');
                    break;
                case '你好':
                    // 随机是否返回
                    if (Math.random() > 0.5) {
                        this.onmessage('好个der');
                    }
                    break;
                default:
                    this.onmessage(data);
                    break;
            }
        }, delay);
    }
    onmessage = (data) => {
    }
}

这个Mock类,就是一个模拟的Websocket,我们可以通过send方法来发送数据,通过onmessage方法来接收数据;


上面的mock环境非常简单,就是你在发送消息的时候,立即会返回一个消息,但是会有延时,上面的模拟环境有下面几种情况:


  1. 我们发送hello,后端返回world
  2. 我们发送world,后端返回hello
  3. 我们发送你好,后端返回好个der,但是这个返回是随机的,有可能不返回,模拟丢包的情况;
  4. 每过10秒,后端返回hello world${count}


解决方案


先上代码,后面讲解:

class MockHelper {
    constructor() {
        // 用于存储消息处理器
        this.messageHandlers = [];
        // 模拟消息发送(websocket)
        this.mock = new Mock();
        // 模拟消息接收
        this.mock.onmessage = (data) => {
            // 熔断器
            let fusing = false;
            const fused = () => {
                fusing = true;
            }
            // 执行消息处理
            for (let i = 0; i < this.messageHandlers.length; i++) {
                const handler = this.messageHandlers[i];
                try {
                    // 执行消息处理器
                    handler.handle(data, fused);
                } catch (e) {
                    // 消息处理器执行失败
                    handler.reject(e);
                    this.messageHandlers.splice(i, 1);
                    i--;
                }
                // 熔断
                if (fusing) {
                    this.messageHandlers.splice(i, 1);
                    break;
                }
            }
        }
    }
    send(data, handler) {
        // 返回一个 Promise
        return new Promise((resolve, reject) => {
            // 创建消息处理器
            const messageHandler = new MessageHandler();
            messageHandler.callback = handler;
            messageHandler.resolve = resolve;
            messageHandler.reject = reject;
            messageHandler.data = data;
            // 设置超时,3s
            messageHandler.timer = setTimeout(() => {
                messageHandler.reject(new Error('timeout'));
                this.messageHandlers.splice(this.messageHandlers.indexOf(messageHandler), 1);
            }, 3000);
            // 添加到消息处理器列表
            this.messageHandlers.push(messageHandler);
            // 发送消息
            this.mock.send(data);
        })
    }
}
// 消息处理器
class MessageHandler {
    constructor() {
        this.callback = null;
        this.timer = null;
        this.resolve = null;
        this.reject = null;
        this.data = null;
    }
    // 处理消息
    handle(data, fused) {
        // 消息处理结果
        let result = {
            sendData: this.data,
            receiveData: data,
            handleData: null
        };
        // 熔断器
        const _fused = () => {
            // 收到熔断信号,清除定时器
            if (this.timer) {
                clearTimeout(this.timer);
                this.timer = null;
            }
            // 执行上层的熔断函数
            fused();
            // 执行 resolve
            if (this.resolve) {
                // 这里需要等待回调函数执行完毕,才能执行 resolve,这样才能拿到 handleData
                Promise.resolve().then(() => {
                    this.resolve(result);
                    this.resolve = null;
                });
            }
        }
        // 执行回调函数
        if (typeof this.callback === 'function') {
            try {
                result.handleData = this.callback(data, _fused);
            } catch (e) {
                this.reject(e);
                throw e;
            }
        }
    }
}

MockHelper


上面我们定义了一个MockHelper类,这个类可以理解为对Websocket的封装,其他类似的处理机制都可以用这种方式来封装;


在实例化的过程中,会对Websocket进行创建,然后监听对应的消息响应事件,当收到消息的时候,会执行对应的消息处理器:

// 模拟消息接收
this.mock.onmessage = (data) => {
    // 熔断器
    let fusing = false;
    const fused = () => {
        fusing = true;
    }
    // 执行消息处理
    for (let i = 0; i < this.messageHandlers.length; i++) {
        const handler = this.messageHandlers[i];
        try {
            // 执行消息处理器
            handler.handle(data, fused);
        } catch (e) {
            // 消息处理器执行失败
            handler.reject(e);
            this.messageHandlers.splice(i, 1);
            i--;
        }
        // 熔断
        if (fusing) {
            this.messageHandlers.splice(i, 1);
            break;
        }
    }
}

方法很简单,每行代码都有注释,主要是思想,我们通过一个fusing变量来标记是否熔断;


熔断的意思就是结束后续的处理流程,执行熔断器之后,就代表这个消息已经被处理了,后续的消息处理器就不需要再处理了;


消息执行出现异常会被捕获,并且执行reject,然后从消息处理器列表中移除,这里不会被熔断,因为这个消息处理器已经执行失败了,可能是因为消息处理器的逻辑有问题,并不代表这个消息已经被处理了;


send


send方法是用来发送消息的,这个方法返回一个Promise,这个Promiseresolvereject是在消息处理器中执行的;

/**
 * 发送消息
 * @param data                  发送的消息内容
 * @param handler               对消息的处理函数
 * @return {Promise<unknown>}   返回一个 Promise
 */
function send(data, handler) {
    // 返回一个 Promise
    return new Promise((resolve, reject) => {
        // 创建消息处理器
        const messageHandler = new MessageHandler();
        messageHandler.callback = handler;
        messageHandler.resolve = resolve;
        messageHandler.reject = reject;
        messageHandler.data = data;
        // 设置超时,3s
        messageHandler.timer = setTimeout(() => {
            messageHandler.reject(new Error('timeout'));
            this.messageHandlers.splice(this.messageHandlers.indexOf(messageHandler), 1);
        }, 3000);
        // 添加到消息处理器列表
        this.messageHandlers.push(messageHandler);
        // 发送消息
        this.mock.send(data);
    })
}

send方法接收两个参数,第一个参数是发送的消息内容,第二个参数是对消息的处理函数;


发送的消息内容就不多说了,对消息的处理函数就是我们自定义对返回的消息进行处理的逻辑;


send方法的内容还是很简单的,最开始初始化一个MessageHandler对象,并且设置一个超时定时器,如果出现超时就将这个处理器从this.messageHandlers中删除;


然后,将这个处理器添加到this.messageHandlers中,this.messageHandlers最后会在onmessage中使用,这样就就形成了一闭环;


MessageHandler


send中传入的处理函数,最后会被封装成一个MessageHandler对象,这个对象就是消息处理器;

// 消息处理器
class MessageHandler {
    constructor() {
        this.callback = null;
        this.timer = null;
        this.resolve = null;
        this.reject = null;
        this.data = null;
    }
    // 处理消息
    handle(data, fused) {
        // 消息处理结果
        let result = {
            sendData: this.data,
            receiveData: data,
            handleData: null
        };
        // 熔断器
        const _fused = () => {
            // 收到熔断信号,清除定时器
            if (this.timer) {
                clearTimeout(this.timer);
                this.timer = null;
            }
            // 执行上层的熔断函数
            fused();
            // 执行 resolve
            if (this.resolve) {
                // 这里需要等待回调函数执行完毕,才能执行 resolve,这样才能拿到 handleData
                Promise.resolve().then(() => {
                    this.resolve(result);
                    this.resolve = null;
                });
            }
        }
        // 执行回调函数
        if (typeof this.callback === 'function') {
            try {
                result.handleData = this.callback(data, _fused);
            } catch (e) {
                this.reject(e);
                throw e;
            }
        }
    }
}

MessageHandler对象中有五个属性,callback是对消息的处理函数,timer是用来设置超时的定时器,resolvereject是用来执行Promise的,data是发送的消息内容;


handle方法是用来处理消息的,这个方法接收两个参数,第一个参数是接收到的消息内容,第二个参数是熔断器;


这里的核心还是熔断器,上面的onmessage的熔断器很简单,就是设置一个标记,然后在onmessage中判断这个标记,如果是熔断状态,就不再执行后面的逻辑;


这里面的熔断器会执行一系列的逻辑:

  // 熔断器
const _fused = () => {
    // 收到熔断信号,清除定时器
    if (this.timer) {
        clearTimeout(this.timer);
        this.timer = null;
    }
    // 执行上层的熔断函数
    fused();
    // 执行 resolve
    if (this.resolve) {
        // 这里需要等待回调函数执行完毕,才能执行 resolve,这样才能拿到 handleData
        Promise.resolve().then(() => {
            this.resolve(result);
            this.resolve = null;
        });
    }
}

首先,收到熔断信号,清除定时器,这样就不会再执行超时的逻辑;


然后执行上层的熔断器,这样在onmessage中就不会再执行后面的逻辑;


最后,执行resolve,这里将resolve放到了Promise.resolve().then()中,这样就可以等待回调函数执行完毕,再执行resolve,这样才能拿到handleData


使用


使用很简单,直接看代码:

// 实例化
const mockHelper = new MockHelper();
// 直接发送消息,并传入消息处理函数
mockHelper.send('hello', (data, fused) => {
    if (data === 'world') {
        fused();
        return 'hello world';
    }
}).then((data) => {
    // 这里可以拿到 发送的消息内容,接收的消息内容,处理后的消息内容
    console.log('then', data);
}).catch((e) => {
    console.log('catch', e);
});

看到这里不知道你会不会有一堆的疑问,比如:


  • 为什么要在send中传入消息处理函数?我直接在这个消息处理函数中处理消息不就行了吗?
  • 为什么要用Promise来封装呢?你这里都有回调了,感觉没必要啊?
  • 你这个最后还不是会有一堆的if判断吗?而且我如果要发两次相同的消息,这个代码我还要写两遍吗?
  • 其他的问题...


这当然是为了方便大家使用,按照现在的封装,我们只需要在send中传入消息处理函数,然后在这个函数中处理消息,就可以了;


但是实际情况下我们还是要尽可以的对代码进行复用,光说不练假把式,我们来看看怎么复用这个代码:

// 实例化
const mockHelper = new MockHelper();
// 发送 hello 消息
const sendHello = () => {
    return mockHelper.send('hello', (data, fused) => {
        if (data === 'world') {
            fused();
            return 'hello world';
        }
    });
};
// 发送 world 消息
const sendWorld = () => {
    return mockHelper.send('world', (data, fused) => {
        if (data === 'hello') {
            fused();
            return 'hello world';
        }
    });
};
// 使用
sendHello().then((data) => {
    console.log(data);
})
sendHello().then((data) => {
    console.log(data);
})
sendHello().then((data) => {
    console.log(data);
});
sendWorld().then((data) => {
    console.log(data);
})
sendWorld().then((data) => {
    console.log(data);
})
sendWorld().then((data) => {
    console.log(data);
})
// ...其他
// 最后才是直接使用 send
mockHelper.send('xxx', (data, fused) => {
    if (data === 'xxx') {
        fused();
        return '...';
    }
}).then((data) => {
    console.log(data);
})

对比


在我最开始没有想到这种封装的时候,我就是用下面这种方式来封装的,屎山!!!


按照最开始没有封装的时候的逻辑,我们需要在onmessage事件中,可能会写一堆的if判断,然后再执行对应的回调函数,比如:

class WebsocketWrap {
    constructor(options) {
        this.options = options;
        this.ws = null;
        this.connect();
    }
    connect() {
        this.ws = new WebSocket(this.options.url);
        this.ws.onmessage = (event) => {
            this.onmessage(event.data);
        };
    }
    onmessage = (data) => {
        // 逻辑处理 a
        if (data === 'a') {
            this.options['onA'](data);
            return;
        }
        // 逻辑处理 b
        if (data.xxx === 'xxx') {
            this.options['onXxx'](data);
            return;
        }
        // 逻辑处理 c
        if (data.yyy === 'yyy') {
            this.options['onYyy'](data);
            return;
        }
        // 各种其他自定义处理...
    }
    send(data) {
        this.ws.send(data);
    }
}
// 使用
const ws = new WebsocketWrap({
    url: 'ws://localhost:8080',
    onA: (data) => {
        console.log('onA', data);
    },
    onXxx: (data) => {
        console.log('onXxx', data);
    },
    onYyy: (data) => {
        console.log('onYyy', data);
    }
});

这样的代码,如果逻辑处理的函数比较多,那么onmessage函数就会很长,而且不好维护,也不好扩展;


如果现在来了一个新的需求,我们就需要在onmessage函数中,再添加一个if判断,然后再添加一个对应的处理函数,这样的代码就会变得越来越难维护;


而我上面的封装就是解决了这个问题,我将onmessage事件中的if进行了封装,在send方法中传入的处理函数,就是对消息的处理函数,也就是这个if判断的逻辑;


最后消息处理完毕,执行熔断器,然后将处理后的消息通过resolve返回出去,这样就可以在then中拿到处理后的消息了;


总结


上面只是一个很简单的封装,主要介绍的还是思想,其实里面还有很多可以优化的地方,比如:


  1. 如果发送的消息并没有回应应该怎么修改?
  2. 这里服务端主动推送的消息全都被丢弃了,应该怎么修改?
  3. 是否可以增加一些配置,比如超时时间,重试次数等等?
  4. 其他...


我能想到的问题就直接在码上掘金中解决了,如果还有什么其他的想法,可以自己尝试添加,感谢阅读,欢迎评论,如果觉得不错,可以点个赞,谢谢!



目录
相关文章
|
3月前
|
移动开发 缓存 网络协议
Websocket协议原理及Ws服务器代码实现
Websocket协议原理及Ws服务器代码实现
websocket封装带心跳和重连机制(vue3+ts+vite)
websocket封装带心跳和重连机制(vue3+ts+vite)
1524 0
|
6月前
|
JavaScript
websocket | 浅浅的封装一下
在写websocket的时候,很多页面都需要使用,每个页面重复写,太麻烦,于是先浅浅的封装一下。
|
7月前
|
前端开发 API
websocket使用实践代码指南
websocket使用实践代码指南
237 0
|
9月前
|
JavaScript
vue websocket组件封装
vue websocket组件封装
193 0
|
9月前
|
JSON 关系型数据库 MySQL
php使用webSocket实现Echarts长连接自动刷新的解决方案(2):后端服务端代码返回json数据
php使用webSocket实现Echarts长连接自动刷新的解决方案(2):后端服务端代码返回json数据
113 0
|
开发框架 前端开发 网络协议
服务器开发- Asp.Net Core中的websocket,并封装一个简单的中间件
服务器开发- Asp.Net Core中的websocket,并封装一个简单的中间件
421 1
|
前端开发 测试技术 应用服务中间件
接口并发测试之:WebSocket从原理到代码实战,我没草率~
接口并发测试之:WebSocket从原理到代码实战,我没草率~
520 0
|
Java Maven
java WebSocket客户端断线重连 | 实用代码框架
java WebSocket客户端断线重连 | 实用代码框架
|
存储 缓存 移动开发
H5 新增内容大全(包括Web Workers、SSE、WebSocket的详细使用代码)
H5 新增内容大全(包括Web Workers、SSE、WebSocket的详细使用代码)
H5 新增内容大全(包括Web Workers、SSE、WebSocket的详细使用代码)