本文介绍了如何在Golang中使用WebSocket实现一个消息推送系统,通过建立与用户ID关联的WebSocket连接,并提供HTTP接口向特定用户推送消息。作者详细讲解了Client和Hub结构体的调整,以及如何处理连接、消息发送和接收等关键步骤。
前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。
有了前两篇的铺垫,相信大家已经对 Golang 中 WebSocket
的使用有一定的了解了,
今天我们以一个更加真实的例子来学习如何在 Golang 中使用 WebSocket
。
需求背景
在实际的项目中,往往有一些任务耗时比较长,然后我们会把这些任务做异步的处理,但是又要及时给客户端反馈任务的处理进度。
对于这种场景,我们可以使用 WebSocket
来实现。其他可以使用 WebSocket
进行通知的场景还有像管理后台一些通知(比如新订单通知)等。
在本篇文章中,就是要实现一个这样的消息推送系统,具体来说,它会有以下功能:
- 可以给特定的用户推送:建立连接的时候,就建立起
WebSocket
连接与用户 ID 之间的关联 - 断开连接的时候,移除
WebSocket
连接与用户的关联,并且关闭这个WebSocket
连接 - 业务系统可以通过 HTTP 接口来给特定的用户推送
WebSocket
消息:只要传递用户 ID 以及需要推送的消息即可
基础框架
下面是一个最简单版本的框架图:
它包含如下几个角色:
Client
客户端,也就是实际中接收消息通知的浏览器Server
服务端,在我们的例子中,服务端实际不处理业务逻辑,只处理跟客户端的消息交互:维持WebSocket
连接,推送消息到特定的WebSocket
连接- 业务逻辑:这个实际上不属于 demo 的一部分,但是
Server
推送的数据是来自业务逻辑处理的结果
设计成这样的目的是为了将技术跟业务进行分离,业务逻辑上的变化不影响到底层技术,同样的,WebSocket
推送中心的技术上的变动也不会影响到实际的业务。
开始开发
一些结构体变动
Client
结构体的变化
type Client struct { hub *Hub conn *websocket.Conn send chan []byte // 新增字段 uid int }
因为我们需要建立起 WebSocket
连接与用户之间的关联,因此我们需要一个额外的字段来记录用户 ID,也就是上面的 uid
字段。
这个字段会在客户端建立连接后写入。
Hub
结构体的变化
type Hub struct { clients map[*Client]bool register chan *Client unregister chan *Client // 记录 uid 跟 client 的对应关系 userClients map[int]*Client // 读写锁,保护 userClients 以及 clients 的读写 sync.RWMutex }
- 因为我们不再需要做广播,所以会移除
Hub
中的broadcast
字段。
取而代之的是,我们会直接在消息推送接口中写入到 uid
对应的 Client
的 send
通道。
当然我们也可以在 Hub
中另外加一个字段来记录要推送给不同 uid
的消息,但是我们的 Hub
的 run
方法是一个协程处理的,当需要推送的数据较多或者其中有
网络延迟的时候,会直接影响到推送给其他用户的消息。当然我们也可以改造一下 run
方法,启动多个协程来处理,不过这样比较复杂,本文会在 writePump
中处理。
(也就是建立 WebSocket
连接时的那个写操作协程)
- 同时为了更加快速地通过
uid
来获取对应的WebSocket
连接,新增了一个userClients
字段。
这是一个 map
类型的字段,key
是 uid
,值是对应的 Client
指针。
- 最后新增了一个
Mutex
互斥锁
因为,在用户实际进行登录的时候需要写入 userClients
字段,而这是一个 map
类型字段,并不支持并发读写。
如果我们在接受并发连接的时候同时修改 userClients
的时候会导致 panic
,因此我们使用了一个互斥锁来保证 userClients
的读写安全。
同时,clients
也是一个 map
,但上一篇文章中没有使用 sync.Mutex
来保护它的读写,在并发操作的时候也是会有问题的,
所以 Mutex
同时也需要保护 clients
的读写。
func (h *Hub) run() { for { select { case client := <-h.register: h.Lock() h.clients[client] = true h.Unlock() case client := <-h.unregister: if _, ok := h.clients[client]; ok { h.Lock() delete(h.userClients, client.uid) delete(h.clients, client) h.Unlock() close(client.send) } } } }
最后,我们会在 Hub
的 run
方法中写 userClients
或者 clients
字段的时候,先获取锁,写成功的时候释放锁。
建立连接
在本篇中,将会继续沿用上一篇的代码,只是其中一些细节会有所改动。建立连接这步操作,跟上一篇的一样:
// 将 HTTP 转换为 WebSocket 连接的 Upgrader var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, } // 处理 WebSocket 连接请求 func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) { // 升级为 WebSocket 连接 conn, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Println(err) return } // 新建一个 Client client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)} // 注册到 Hub client.hub.register <- client // 推送消息的协程 go client.writePump() // 结束消息的协程 go client.readPump() }
接收消息
由于我们要做的只是一个推送消息的系统,所以我们只处理用户发来的登录请求,其他的消息会全部丢弃:
func (c *Client) readPump() { defer func() { c.hub.unregister <- c _ = c.conn.Close() }() c.conn.SetReadLimit(maxMessageSize) c.conn.SetReadDeadline(time.Time{}) // 永不超时 for { // 从客户端接收消息 _, message, err := c.conn.ReadMessage() if err != nil { log.Println("readPump error: ", err) break } // 只处理登录消息 var data = make(map[string]string) err = json.Unmarshal(message, &data) if err != nil { break } // 写入 uid 以及 Hub 的 userClients if uid, ok := data["uid"]; ok { c.uid = uid c.hub.Lock() c.hub.userClients[uid] = c c.hub.Unlock() } } }
在本文中,假设客户端的登录消息格式为 {"uid": "123456"}
这种 json
格式。
在这里也操作了
userClients
字段,同样需要使用互斥锁来保证操作的安全性。
发送消息
- 在我们的系统中,可以提供一个 HTTP 接口来跟业务系统进行交互:
// 发送消息的接口 // 参数: // 1. uid:接收消息的用户 ID // 2. message:需要发送给这个用户的消息 http.HandleFunc("/send", func(w http.ResponseWriter, r *http.Request) { send(hub, w, r) }) // 发送消息的方法 func send(hub *Hub, w http.ResponseWriter, r *http.Request) { uid := r.FormValue("uid") // 参数错误 if uid == "" { w.WriteHeader(http.StatusBadRequest) return } // 从 hub 中获取 client hub.Lock() client, ok := hub.userClients[uid] hub.Unlock() // 尚未建立连接 if !ok { w.WriteHeader(http.StatusBadRequest) return } // 发送消息 message := r.FormValue("message") client.send <- []byte(message) }
- 实际发送消息的操作
在 writePump
方法中,我们会将从 /send
接收到的数据发送给对应的用户:
// 发送消息的协程 func (c *Client) writePump() { defer func() { _ = c.conn.Close() }() for { select { case message, ok := <-c.send: // 设置写超时时间 c.conn.SetWriteDeadline(time.Now().Add(writeWait)) // 连接已经被关闭了 if !ok { c.conn.WriteMessage(websocket.CloseMessage, []byte{}) return } // 获取一个发送消息的 Writer w, err := c.conn.NextWriter(websocket.TextMessage) if err != nil { return } // 写入消息到 Writer w.Write(message) // 关闭 Writer if err := w.Close(); err != nil { return } } } }
在这个方法中,我们会从 c.send
这个 chan
中获取需要发送给客户端的消息,然后进行发送操作。
测试
- 启动
main
程序
go run main.go
- 打开一个浏览器的控制台,执行以下代码
ws = new WebSocket('ws://127.0.0.1:8181/ws') ws.send('{"uid": "123"}')
这两行代码的作用是与 WebSocket
服务器建立连接,然后发送一个登录信息。
然后我们打开控制台的 Network -> WS -> Message
就可以看到浏览器发给服务端的消息:
- 使用 HTTP 客户端发送消息给 uid 为 123 的用户
假设我们的
WebSocket
服务器绑定的端口为8181
打开终端,执行以下命令:
curl "http://localhost:8181/send?uid=123&message=Hello%20World"
然后我们可以在 Network -> WS -> Message
看到接收到了消息 Hello World
。
结束了
到此为止,我们已经实现了一个初步可工作的 WebSocket
应用,当然还有很多可以优化的地方,
比如:
- 错误处理
Hub
状态目前对外部来说是一个黑盒子,我们可以加个接口返回一下Hub
的当前状态,比如当前连接数- 日志:出错的时候,日志可以帮助我们快速定位问题
这些功能会在后续继续完善,今天就到此为止了。