Golang 搭建 WebSocket 应用(三) - 实现一个消息推送中心

简介: Golang 搭建 WebSocket 应用(三) - 实现一个消息推送中心

本文介绍了如何在Golang中使用WebSocket实现一个消息推送系统,通过建立与用户ID关联的WebSocket连接,并提供HTTP接口向特定用户推送消息。作者详细讲解了Client和Hub结构体的调整,以及如何处理连接、消息发送和接收等关键步骤。

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站

有了前两篇的铺垫,相信大家已经对 Golang 中 WebSocket 的使用有一定的了解了,

今天我们以一个更加真实的例子来学习如何在 Golang 中使用 WebSocket

需求背景

在实际的项目中,往往有一些任务耗时比较长,然后我们会把这些任务做异步的处理,但是又要及时给客户端反馈任务的处理进度。

对于这种场景,我们可以使用 WebSocket 来实现。其他可以使用 WebSocket 进行通知的场景还有像管理后台一些通知(比如新订单通知)等。

在本篇文章中,就是要实现一个这样的消息推送系统,具体来说,它会有以下功能:

  1. 可以给特定的用户推送:建立连接的时候,就建立起 WebSocket 连接与用户 ID 之间的关联
  2. 断开连接的时候,移除 WebSocket 连接与用户的关联,并且关闭这个 WebSocket 连接
  3. 业务系统可以通过 HTTP 接口来给特定的用户推送 WebSocket 消息:只要传递用户 ID 以及需要推送的消息即可

基础框架

下面是一个最简单版本的框架图:

它包含如下几个角色:

  1. Client 客户端,也就是实际中接收消息通知的浏览器
  2. Server 服务端,在我们的例子中,服务端实际不处理业务逻辑,只处理跟客户端的消息交互:维持 WebSocket 连接,推送消息到特定的 WebSocket 连接
  3. 业务逻辑:这个实际上不属于 demo 的一部分,但是 Server 推送的数据是来自业务逻辑处理的结果

设计成这样的目的是为了将技术跟业务进行分离,业务逻辑上的变化不影响到底层技术,同样的,WebSocket 推送中心的技术上的变动也不会影响到实际的业务。

开始开发

一些结构体变动

  1. Client 结构体的变化
type Client struct {
  hub *Hub
  conn *websocket.Conn
  send chan []byte
    // 新增字段
    uid int
}

因为我们需要建立起 WebSocket 连接与用户之间的关联,因此我们需要一个额外的字段来记录用户 ID,也就是上面的 uid 字段。

这个字段会在客户端建立连接后写入。

  1. Hub 结构体的变化
type Hub struct {
  clients map[*Client]bool
  register chan *Client
  unregister chan *Client

  // 记录 uid 跟 client 的对应关系
  userClients map[int]*Client

    // 读写锁,保护 userClients 以及 clients 的读写
  sync.RWMutex
}
  1. 因为我们不再需要做广播,所以会移除 Hub 中的 broadcast 字段。

取而代之的是,我们会直接在消息推送接口中写入到 uid 对应的 Clientsend 通道。

当然我们也可以在 Hub 中另外加一个字段来记录要推送给不同 uid 的消息,但是我们的 Hubrun 方法是一个协程处理的,当需要推送的数据较多或者其中有

网络延迟的时候,会直接影响到推送给其他用户的消息。当然我们也可以改造一下 run 方法,启动多个协程来处理,不过这样比较复杂,本文会在 writePump 中处理。

(也就是建立 WebSocket 连接时的那个写操作协程)

  1. 同时为了更加快速地通过 uid 来获取对应的 WebSocket 连接,新增了一个 userClients 字段。

这是一个 map 类型的字段,keyuid,值是对应的 Client 指针。

  1. 最后新增了一个 Mutex 互斥锁

因为,在用户实际进行登录的时候需要写入 userClients 字段,而这是一个 map 类型字段,并不支持并发读写。

如果我们在接受并发连接的时候同时修改 userClients 的时候会导致 panic,因此我们使用了一个互斥锁来保证 userClients 的读写安全。

同时,clients 也是一个 map,但上一篇文章中没有使用 sync.Mutex 来保护它的读写,在并发操作的时候也是会有问题的,

所以 Mutex 同时也需要保护 clients 的读写。

func (h *Hub) run() {
  for {
    select {
    case client := <-h.register:
      h.Lock()
      h.clients[client] = true
      h.Unlock()
    case client := <-h.unregister:
      if _, ok := h.clients[client]; ok {
        h.Lock()
        delete(h.userClients, client.uid)
        delete(h.clients, client)
        h.Unlock()
        close(client.send)
      }
    }
  }
}

最后,我们会在 Hubrun 方法中写 userClients 或者 clients 字段的时候,先获取锁,写成功的时候释放锁。

建立连接

在本篇中,将会继续沿用上一篇的代码,只是其中一些细节会有所改动。建立连接这步操作,跟上一篇的一样:

// 将 HTTP 转换为 WebSocket 连接的 Upgrader
var upgrader = websocket.Upgrader{
  ReadBufferSize:  1024,
  WriteBufferSize: 1024,
}

// 处理 WebSocket 连接请求
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
    // 升级为 WebSocket 连接
  conn, err := upgrader.Upgrade(w, r, nil)
  if err != nil {
    log.Println(err)
    return
  }
    // 新建一个 Client
  client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)}
    // 注册到 Hub
  client.hub.register <- client

    // 推送消息的协程
  go client.writePump()
    // 结束消息的协程
  go client.readPump()
}

接收消息

由于我们要做的只是一个推送消息的系统,所以我们只处理用户发来的登录请求,其他的消息会全部丢弃:

func (c *Client) readPump() {
  defer func() {
    c.hub.unregister <- c
    _ = c.conn.Close()
  }()
  c.conn.SetReadLimit(maxMessageSize)
  c.conn.SetReadDeadline(time.Time{}) // 永不超时
  for {
    // 从客户端接收消息
    _, message, err := c.conn.ReadMessage()
    if err != nil {
      log.Println("readPump error: ", err)
      break
    }

    // 只处理登录消息
    var data = make(map[string]string)
    err = json.Unmarshal(message, &data)
    if err != nil {
      break
    }

    // 写入 uid 以及 Hub 的 userClients
    if uid, ok := data["uid"]; ok {
      c.uid = uid
      c.hub.Lock()
      c.hub.userClients[uid] = c
      c.hub.Unlock()
    }
  }
}

在本文中,假设客户端的登录消息格式为 {"uid": "123456"} 这种 json 格式。

在这里也操作了 userClients 字段,同样需要使用互斥锁来保证操作的安全性。

发送消息

  1. 在我们的系统中,可以提供一个 HTTP 接口来跟业务系统进行交互:
// 发送消息的接口
// 参数:
// 1. uid:接收消息的用户 ID
// 2. message:需要发送给这个用户的消息
http.HandleFunc("/send", func(w http.ResponseWriter, r *http.Request) {
    send(hub, w, r)
})

// 发送消息的方法
func send(hub *Hub, w http.ResponseWriter, r *http.Request) {
  uid := r.FormValue("uid")
  // 参数错误
  if uid == "" {
    w.WriteHeader(http.StatusBadRequest)
    return
  }

  // 从 hub 中获取 client
  hub.Lock()
  client, ok := hub.userClients[uid]
  hub.Unlock()
  // 尚未建立连接
  if !ok {
    w.WriteHeader(http.StatusBadRequest)
    return
  }

  // 发送消息
  message := r.FormValue("message")
  client.send <- []byte(message)
}
  1. 实际发送消息的操作

writePump 方法中,我们会将从 /send 接收到的数据发送给对应的用户:

// 发送消息的协程
func (c *Client) writePump() {
  defer func() {
    _ = c.conn.Close()
  }()
  for {
    select {
    case message, ok := <-c.send:
      // 设置写超时时间
      c.conn.SetWriteDeadline(time.Now().Add(writeWait))
      // 连接已经被关闭了
      if !ok {
        c.conn.WriteMessage(websocket.CloseMessage, []byte{})
        return
      }

      // 获取一个发送消息的 Writer
      w, err := c.conn.NextWriter(websocket.TextMessage)
      if err != nil {
        return
      }
      // 写入消息到 Writer
      w.Write(message)

            // 关闭 Writer
      if err := w.Close(); err != nil {
        return
      }
    }
  }
}

在这个方法中,我们会从 c.send 这个 chan 中获取需要发送给客户端的消息,然后进行发送操作。

测试

  1. 启动 main 程序
go run main.go
  1. 打开一个浏览器的控制台,执行以下代码
ws = new WebSocket('ws://127.0.0.1:8181/ws')
ws.send('{"uid": "123"}')

这两行代码的作用是与 WebSocket 服务器建立连接,然后发送一个登录信息。

然后我们打开控制台的 Network -> WS -> Message 就可以看到浏览器发给服务端的消息:

  1. 使用 HTTP 客户端发送消息给 uid 为 123 的用户

假设我们的 WebSocket 服务器绑定的端口为 8181

打开终端,执行以下命令:

curl "http://localhost:8181/send?uid=123&message=Hello%20World"

然后我们可以在 Network -> WS -> Message 看到接收到了消息 Hello World

结束了

到此为止,我们已经实现了一个初步可工作的 WebSocket 应用,当然还有很多可以优化的地方,

比如:

  1. 错误处理
  2. Hub 状态目前对外部来说是一个黑盒子,我们可以加个接口返回一下 Hub 的当前状态,比如当前连接数
  3. 日志:出错的时候,日志可以帮助我们快速定位问题

这些功能会在后续继续完善,今天就到此为止了。


目录
相关文章
|
19天前
|
前端开发 JavaScript UED
探索Python Django中的WebSocket集成:为前后端分离应用添加实时通信功能
通过在Django项目中集成Channels和WebSocket,我们能够为前后端分离的应用添加实时通信功能,实现诸如在线聊天、实时数据更新等交互式场景。这不仅增强了应用的功能性,也提升了用户体验。随着实时Web应用的日益普及,掌握Django Channels和WebSocket的集成将为开发者开启新的可能性,推动Web应用的发展迈向更高层次的实时性和交互性。
43 1
|
27天前
|
算法 安全 测试技术
golang 栈数据结构的实现和应用
本文详细介绍了“栈”这一数据结构的特点,并用Golang实现栈。栈是一种FILO(First In Last Out,即先进后出或后进先出)的数据结构。文章展示了如何用slice和链表来实现栈,并通过golang benchmark测试了二者的性能差异。此外,还提供了几个使用栈结构解决的实际算法问题示例,如有效的括号匹配等。
golang 栈数据结构的实现和应用
|
16天前
|
负载均衡 网络协议 C#
C#实现WebSocket实时消息推送技术详解
C#实现WebSocket实时消息推送技术详解
23 1
|
1月前
|
JavaScript 前端开发 UED
WebSocket在Python Web开发中的革新应用:解锁实时通信的新可能
在快速发展的Web应用领域中,实时通信已成为许多现代应用不可或缺的功能。传统的HTTP请求/响应模式在处理实时数据时显得力不从心,而WebSocket技术的出现,为Python Web开发带来了革命性的变化,它允许服务器与客户端之间建立持久的连接,从而实现了数据的即时传输与交换。本文将通过问题解答的形式,深入探讨WebSocket在Python Web开发中的革新应用及其实现方法。
36 3
|
8天前
|
中间件 Go 数据处理
应用golang的管道-过滤器架构风格
【10月更文挑战第1天】本文介绍了一种面向数据流的软件架构设计模式——管道-过滤器(Pipe and Filter),并通过Go语言的Gin框架实现了一个Web应用示例。该模式通过将数据处理流程分解为一系列独立的组件(过滤器),并利用管道连接这些组件,实现了模块化、可扩展性和高效的分布式处理。文中详细讲解了Gin框架的基本使用、中间件的应用以及性能优化方法,展示了如何构建高性能的Web服务。
24 0
|
10天前
|
消息中间件 网络协议 安全
C# 一分钟浅谈:WebSocket 协议应用
【10月更文挑战第6天】在过去的一年中,我参与了一个基于 WebSocket 的实时通信系统项目,该项目不仅提升了工作效率,还改善了用户体验。本文将分享在 C# 中应用 WebSocket 协议的经验和心得,包括基础概念、C# 实现示例、常见问题及解决方案等内容,希望能为广大开发者提供参考。
48 0
|
1月前
|
存储 监控 Go
面向OpenTelemetry的Golang应用无侵入插桩技术
文章主要讲述了阿里云 ARMS 团队与程序语言与编译器团队合作研发的面向OpenTelemetry的Golang应用无侵入插桩技术解决方案,旨在解决Golang应用监控的挑战。
|
2月前
|
存储 Prometheus 监控
Golang 搭建 WebSocket 应用(六) - 监控
Golang 搭建 WebSocket 应用(六) - 监控
33 3
|
2月前
|
Go 开发者
|
2月前
|
人工智能 安全 Go
Golang 搭建 WebSocket 应用(八) - 完整代码
Golang 搭建 WebSocket 应用(八) - 完整代码
33 0