Golang 搭建 WebSocket 应用(三) - 实现一个消息推送中心

简介: Golang 搭建 WebSocket 应用(三) - 实现一个消息推送中心

本文介绍了如何在Golang中使用WebSocket实现一个消息推送系统,通过建立与用户ID关联的WebSocket连接,并提供HTTP接口向特定用户推送消息。作者详细讲解了Client和Hub结构体的调整,以及如何处理连接、消息发送和接收等关键步骤。

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站

有了前两篇的铺垫,相信大家已经对 Golang 中 WebSocket 的使用有一定的了解了,

今天我们以一个更加真实的例子来学习如何在 Golang 中使用 WebSocket

需求背景

在实际的项目中,往往有一些任务耗时比较长,然后我们会把这些任务做异步的处理,但是又要及时给客户端反馈任务的处理进度。

对于这种场景,我们可以使用 WebSocket 来实现。其他可以使用 WebSocket 进行通知的场景还有像管理后台一些通知(比如新订单通知)等。

在本篇文章中,就是要实现一个这样的消息推送系统,具体来说,它会有以下功能:

  1. 可以给特定的用户推送:建立连接的时候,就建立起 WebSocket 连接与用户 ID 之间的关联
  2. 断开连接的时候,移除 WebSocket 连接与用户的关联,并且关闭这个 WebSocket 连接
  3. 业务系统可以通过 HTTP 接口来给特定的用户推送 WebSocket 消息:只要传递用户 ID 以及需要推送的消息即可

基础框架

下面是一个最简单版本的框架图:

它包含如下几个角色:

  1. Client 客户端,也就是实际中接收消息通知的浏览器
  2. Server 服务端,在我们的例子中,服务端实际不处理业务逻辑,只处理跟客户端的消息交互:维持 WebSocket 连接,推送消息到特定的 WebSocket 连接
  3. 业务逻辑:这个实际上不属于 demo 的一部分,但是 Server 推送的数据是来自业务逻辑处理的结果

设计成这样的目的是为了将技术跟业务进行分离,业务逻辑上的变化不影响到底层技术,同样的,WebSocket 推送中心的技术上的变动也不会影响到实际的业务。

开始开发

一些结构体变动

  1. Client 结构体的变化
type Client struct {
  hub *Hub
  conn *websocket.Conn
  send chan []byte
    // 新增字段
    uid int
}

因为我们需要建立起 WebSocket 连接与用户之间的关联,因此我们需要一个额外的字段来记录用户 ID,也就是上面的 uid 字段。

这个字段会在客户端建立连接后写入。

  1. Hub 结构体的变化
type Hub struct {
  clients map[*Client]bool
  register chan *Client
  unregister chan *Client

  // 记录 uid 跟 client 的对应关系
  userClients map[int]*Client

    // 读写锁,保护 userClients 以及 clients 的读写
  sync.RWMutex
}
  1. 因为我们不再需要做广播,所以会移除 Hub 中的 broadcast 字段。

取而代之的是,我们会直接在消息推送接口中写入到 uid 对应的 Clientsend 通道。

当然我们也可以在 Hub 中另外加一个字段来记录要推送给不同 uid 的消息,但是我们的 Hubrun 方法是一个协程处理的,当需要推送的数据较多或者其中有

网络延迟的时候,会直接影响到推送给其他用户的消息。当然我们也可以改造一下 run 方法,启动多个协程来处理,不过这样比较复杂,本文会在 writePump 中处理。

(也就是建立 WebSocket 连接时的那个写操作协程)

  1. 同时为了更加快速地通过 uid 来获取对应的 WebSocket 连接,新增了一个 userClients 字段。

这是一个 map 类型的字段,keyuid,值是对应的 Client 指针。

  1. 最后新增了一个 Mutex 互斥锁

因为,在用户实际进行登录的时候需要写入 userClients 字段,而这是一个 map 类型字段,并不支持并发读写。

如果我们在接受并发连接的时候同时修改 userClients 的时候会导致 panic,因此我们使用了一个互斥锁来保证 userClients 的读写安全。

同时,clients 也是一个 map,但上一篇文章中没有使用 sync.Mutex 来保护它的读写,在并发操作的时候也是会有问题的,

所以 Mutex 同时也需要保护 clients 的读写。

func (h *Hub) run() {
  for {
    select {
    case client := <-h.register:
      h.Lock()
      h.clients[client] = true
      h.Unlock()
    case client := <-h.unregister:
      if _, ok := h.clients[client]; ok {
        h.Lock()
        delete(h.userClients, client.uid)
        delete(h.clients, client)
        h.Unlock()
        close(client.send)
      }
    }
  }
}

最后,我们会在 Hubrun 方法中写 userClients 或者 clients 字段的时候,先获取锁,写成功的时候释放锁。

建立连接

在本篇中,将会继续沿用上一篇的代码,只是其中一些细节会有所改动。建立连接这步操作,跟上一篇的一样:

// 将 HTTP 转换为 WebSocket 连接的 Upgrader
var upgrader = websocket.Upgrader{
  ReadBufferSize:  1024,
  WriteBufferSize: 1024,
}

// 处理 WebSocket 连接请求
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
    // 升级为 WebSocket 连接
  conn, err := upgrader.Upgrade(w, r, nil)
  if err != nil {
    log.Println(err)
    return
  }
    // 新建一个 Client
  client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)}
    // 注册到 Hub
  client.hub.register <- client

    // 推送消息的协程
  go client.writePump()
    // 结束消息的协程
  go client.readPump()
}

接收消息

由于我们要做的只是一个推送消息的系统,所以我们只处理用户发来的登录请求,其他的消息会全部丢弃:

func (c *Client) readPump() {
  defer func() {
    c.hub.unregister <- c
    _ = c.conn.Close()
  }()
  c.conn.SetReadLimit(maxMessageSize)
  c.conn.SetReadDeadline(time.Time{}) // 永不超时
  for {
    // 从客户端接收消息
    _, message, err := c.conn.ReadMessage()
    if err != nil {
      log.Println("readPump error: ", err)
      break
    }

    // 只处理登录消息
    var data = make(map[string]string)
    err = json.Unmarshal(message, &data)
    if err != nil {
      break
    }

    // 写入 uid 以及 Hub 的 userClients
    if uid, ok := data["uid"]; ok {
      c.uid = uid
      c.hub.Lock()
      c.hub.userClients[uid] = c
      c.hub.Unlock()
    }
  }
}

在本文中,假设客户端的登录消息格式为 {"uid": "123456"} 这种 json 格式。

在这里也操作了 userClients 字段,同样需要使用互斥锁来保证操作的安全性。

发送消息

  1. 在我们的系统中,可以提供一个 HTTP 接口来跟业务系统进行交互:
// 发送消息的接口
// 参数:
// 1. uid:接收消息的用户 ID
// 2. message:需要发送给这个用户的消息
http.HandleFunc("/send", func(w http.ResponseWriter, r *http.Request) {
    send(hub, w, r)
})

// 发送消息的方法
func send(hub *Hub, w http.ResponseWriter, r *http.Request) {
  uid := r.FormValue("uid")
  // 参数错误
  if uid == "" {
    w.WriteHeader(http.StatusBadRequest)
    return
  }

  // 从 hub 中获取 client
  hub.Lock()
  client, ok := hub.userClients[uid]
  hub.Unlock()
  // 尚未建立连接
  if !ok {
    w.WriteHeader(http.StatusBadRequest)
    return
  }

  // 发送消息
  message := r.FormValue("message")
  client.send <- []byte(message)
}
  1. 实际发送消息的操作

writePump 方法中,我们会将从 /send 接收到的数据发送给对应的用户:

// 发送消息的协程
func (c *Client) writePump() {
  defer func() {
    _ = c.conn.Close()
  }()
  for {
    select {
    case message, ok := <-c.send:
      // 设置写超时时间
      c.conn.SetWriteDeadline(time.Now().Add(writeWait))
      // 连接已经被关闭了
      if !ok {
        c.conn.WriteMessage(websocket.CloseMessage, []byte{})
        return
      }

      // 获取一个发送消息的 Writer
      w, err := c.conn.NextWriter(websocket.TextMessage)
      if err != nil {
        return
      }
      // 写入消息到 Writer
      w.Write(message)

            // 关闭 Writer
      if err := w.Close(); err != nil {
        return
      }
    }
  }
}

在这个方法中,我们会从 c.send 这个 chan 中获取需要发送给客户端的消息,然后进行发送操作。

测试

  1. 启动 main 程序
go run main.go
  1. 打开一个浏览器的控制台,执行以下代码
ws = new WebSocket('ws://127.0.0.1:8181/ws')
ws.send('{"uid": "123"}')

这两行代码的作用是与 WebSocket 服务器建立连接,然后发送一个登录信息。

然后我们打开控制台的 Network -> WS -> Message 就可以看到浏览器发给服务端的消息:

  1. 使用 HTTP 客户端发送消息给 uid 为 123 的用户

假设我们的 WebSocket 服务器绑定的端口为 8181

打开终端,执行以下命令:

curl "http://localhost:8181/send?uid=123&message=Hello%20World"

然后我们可以在 Network -> WS -> Message 看到接收到了消息 Hello World

结束了

到此为止,我们已经实现了一个初步可工作的 WebSocket 应用,当然还有很多可以优化的地方,

比如:

  1. 错误处理
  2. Hub 状态目前对外部来说是一个黑盒子,我们可以加个接口返回一下 Hub 的当前状态,比如当前连接数
  3. 日志:出错的时候,日志可以帮助我们快速定位问题

这些功能会在后续继续完善,今天就到此为止了。


目录
相关文章
|
8天前
|
人工智能 开发框架 数据可视化
Eino:字节跳动开源基于Golang的AI应用开发框架,组件化设计助力构建AI应用
Eino 是字节跳动开源的大模型应用开发框架,帮助开发者高效构建基于大模型的 AI 应用。支持组件化设计、流式处理和可视化开发工具。
138 27
|
1月前
|
数据挖掘 UED
WebSocket在实时体育比分网站中的应用
WebSocket 在实时体育比分网站中用于实时比分更新、动态赛事信息推送、交互式功能(如即时聊天和投票)、赛程提醒与推送通知、比分预测与数据分析,以及多平台支持。通过持久连接,服务器可即时推送比分变化、球员动态、比赛状态等信息,减少延迟并提升用户体验。同时,WebSocket 支持双向通信,使用户能实时互动,确保跨平台的实时数据同步。
|
2月前
|
运维 监控 Cloud Native
一行代码都不改,Golang 应用链路指标日志全知道
本文将通过阿里云开源的 Golang Agent,帮助用户实现“一行代码都不改”就能获取到应用产生的各种观测数据,同时提升运维团队和研发团队的幸福感。
208 12
|
3月前
|
缓存 监控 前端开发
在 Go 语言中实现 WebSocket 实时通信的应用,包括 WebSocket 的简介、Go 语言的优势、基本实现步骤、应用案例、注意事项及性能优化策略,旨在帮助开发者构建高效稳定的实时通信系统
本文深入探讨了在 Go 语言中实现 WebSocket 实时通信的应用,包括 WebSocket 的简介、Go 语言的优势、基本实现步骤、应用案例、注意事项及性能优化策略,旨在帮助开发者构建高效稳定的实时通信系统。
200 1
|
3月前
|
Kubernetes Cloud Native JavaScript
为使用WebSocket构建的双向通信应用带来基于服务网格的全链路灰度
介绍如何使用为基于WebSocket的云原生应用构建全链路灰度方案。
|
4月前
|
JavaScript 前端开发 测试技术
前端全栈之路Deno篇(五):如何快速创建 WebSocket 服务端应用 + 客户端应用 - 可能是2025最佳的Websocket全栈实时应用框架
本文介绍了如何使用Deno 2.0快速构建WebSocket全栈应用,包括服务端和客户端的创建。通过一个简单的代码示例,展示了Deno在WebSocket实现中的便捷与强大,无需额外依赖,即可轻松搭建具备基本功能的WebSocket应用。Deno 2.0被认为是最佳的WebSocket全栈应用JS运行时,适合全栈开发者学习和使用。
213 7
|
4月前
|
中间件 Go 数据处理
应用golang的管道-过滤器架构风格
【10月更文挑战第1天】本文介绍了一种面向数据流的软件架构设计模式——管道-过滤器(Pipe and Filter),并通过Go语言的Gin框架实现了一个Web应用示例。该模式通过将数据处理流程分解为一系列独立的组件(过滤器),并利用管道连接这些组件,实现了模块化、可扩展性和高效的分布式处理。文中详细讲解了Gin框架的基本使用、中间件的应用以及性能优化方法,展示了如何构建高性能的Web服务。
113 0
|
4月前
|
消息中间件 网络协议 安全
C# 一分钟浅谈:WebSocket 协议应用
【10月更文挑战第6天】在过去的一年中,我参与了一个基于 WebSocket 的实时通信系统项目,该项目不仅提升了工作效率,还改善了用户体验。本文将分享在 C# 中应用 WebSocket 协议的经验和心得,包括基础概念、C# 实现示例、常见问题及解决方案等内容,希望能为广大开发者提供参考。
268 0
|
4月前
|
负载均衡 网络协议 C#
C#实现WebSocket实时消息推送技术详解
C#实现WebSocket实时消息推送技术详解
242 1
|
4月前
|
前端开发 JavaScript UED
探索Python Django中的WebSocket集成:为前后端分离应用添加实时通信功能
通过在Django项目中集成Channels和WebSocket,我们能够为前后端分离的应用添加实时通信功能,实现诸如在线聊天、实时数据更新等交互式场景。这不仅增强了应用的功能性,也提升了用户体验。随着实时Web应用的日益普及,掌握Django Channels和WebSocket的集成将为开发者开启新的可能性,推动Web应用的发展迈向更高层次的实时性和交互性。
128 1