Golang 搭建 WebSocket 应用(八) - 完整代码

简介: Golang 搭建 WebSocket 应用(八) - 完整代码

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站

本文应该是本系列文章最后一篇了,前面留下的一些坑可能后面会再补充一下,但不在本系列文章中了。

整体架构

再来回顾一下我们的整体架构:

在我们的 demo 中,包含了以下几种角色:

  1. 客户端:一般是浏览器,用于接收消息;
  2. Hub:消息中心,用于管理所有的客户端连接,以及将消息推送给客户端;
  3. 调用 /send 发送消息的应用:用于将消息发送给 Hub,然后由 Hub 将消息推送给客户端。

然后,每一个 WebSocket 连接都有一个关联的读协程和写协程,

用于读取客户端发送的消息,以及将消息推送给客户端。

目录结构

├── LICENSE  // 协议
├── Makefile // 一些常用的命令
├── README.md
├── authenticator.go      // 认证器
├── authenticator_test.go // 认证器测试
├── bytes.go // 字符串和 []byte 之间转换的辅助方法
├── client.go // WebSocket 客户端
├── go.mod    // 项目依赖
├── go.sum    // 项目依赖
├── hub.go    // 消息中心
├── main.go   // 程序入口
├── message   // 消息记录器
│   ├── db_logger.go
│   ├── db_logger_test.go
│   ├── log.go
│   └── stdout_logger.go
├── server.go // HTTP 服务
└── server_test.go // HTTP 接口的测试

运行

注:需要 Go 1.20 或以上版本

  1. 下载依赖:

可以使用七牛云的代理加速下载。

go mod tidy
  1. 启动 WebSocket 服务端:
go run main.go

Hub 代码

最终,我们的 Hub 代码演进成了下面这样:

// bufferSize 通道缓冲区、map 初始化大小
const bufferSize = 128
// Handler 错误处理函数
type Handler func(log message.Log, err error)
// Hub 维护了所有的客户端连接
type Hub struct {
  // 注册请求
  register chan *Client
  // 取消注册请求
  unregister chan *Client
  // 记录 uid 跟 client 的对应关系
  userClients map[string]*Client
  // 互斥锁,保护 userClients 以及 clients 的读写
  sync.RWMutex
  // 消息记录器
  messageLogger message.Logger
  // 错误处理器
  errorHandler Handler
  // 验证器
  authenticator Authenticator
  // 等待发送的消息数量
  pending atomic.Int64
}
// 默认的错误处理器
func defaultErrorHandler(log message.Log, err error) {
  res, _ := json.Marshal(log)
  fmt.Printf("send message: %s, error: %s\n", string(res), err.Error())
}
func newHub() *Hub {
  return &Hub{
    register:      make(chan *Client),
    unregister:    make(chan *Client),
    userClients:   make(map[string]*Client, bufferSize),
    RWMutex:       sync.RWMutex{},
    messageLogger: &message.StdoutMessageLogger{},
    errorHandler:  defaultErrorHandler,
    authenticator: &JWTAuthenticator{},
  }
}
// 注册、取消注册请求处理
func (h *Hub) run() {
  for {
    select {
    case client := <-h.register:
      h.Lock()
      h.userClients[client.uid] = client
      h.Unlock()
    case client := <-h.unregister:
      h.Lock()
      close(client.send)
      delete(h.userClients, client.uid)
      h.Unlock()
    }
  }
}
// 返回 Hub 的当前的关键指标
func metrics(hub *Hub, w http.ResponseWriter) {
  pending := hub.pending.Load()
  connections := len(hub.userClients)
  _, _ = w.Write([]byte(fmt.Sprintf("# HELP connections 连接数\n# TYPE connections gauge\nconnections %d\n", connections)))
  _, _ = w.Write([]byte(fmt.Sprintf("# HELP pending 等待发送的消息数量\n# TYPE pending gauge\npending %d\n", pending)))
}

其中:

  • Hub 中的 registerunregister 通道用于处理客户端的注册和取消注册请求;
  • Hub 中的 userClients 用于记录 uidClient 的对应关系;
  • Hub 中的 messageLogger 用于记录消息;
  • Hub 中的 errorHandler 用于处理错误;
  • Hub 中的 authenticator 用于验证客户端的身份;
  • Hub 中的 pending 用于记录等待发送的消息数量。

目前实现存在的问题:

  • registerunregister 通道被消费的时候需要加锁,这样会导致 registerunregister 变成串行的,性能不好;
  • userClients 也是需要加锁的,这样会导致 userClients 的读写也是串行的,性能不好;

对于这两个问题,前面我们讨论过,一种可行的办法分段 map,然后对每一个 map 都有一个对应的 sync.Mutex 互斥锁来保证其读写的安全。

Client 代码

Client 比较关键的方法是:

  • writePump:负责将消息推送给客户端。
  • serveWs:处理 WebSocket 连接请求。
  • send:处理消息发送请求。

writePump

这个方法会从 send 通道中获取消息,然后推送给客户端。

推送失败会调用 errorHandler 处理错误。

推送成功会将 pending 减一。

// writePump 负责推送消息给 WebSocket 客户端
//
// 该方法在一个独立的协程中运行,我们保证了每个连接只有一个 writer。
// Client 会从 send 请求中获取消息,然后在这个方法中推送给客户端。
func (c *Client) writePump() {
  defer func() {
    _ = c.conn.Close()
  }()
  // 从 send 通道中获取消息,然后推送给客户端
  for {
    messageLog, ok := <-c.send
    // 设置写超时时间
    _ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
    // c.send 这个通道已经被关闭了
    if !ok {
      c.hub.pending.Add(int64(-1 * len(c.send)))
      return
    }
    if err := c.conn.WriteMessage(websocket.TextMessage, StringToBytes(messageLog.Message)); err != nil {
      c.hub.errorHandler(messageLog, err)
      c.hub.pending.Add(int64(-1 * len(c.send)))
      return
    }
    c.hub.pending.Add(int64(-1))
  }
}

serveWs

serveWs 方法会处理 WebSocket 连接请求,然后将其注册到 Hub 中。

在连接的时候会对客户端进行认证,认证失败会断开连接。

最后会启动读写协程。

// serveWs 处理 WebSocket 连接请求
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
  // 升级为 WebSocket 连接
  conn, err := upgrader.Upgrade(w, r, nil)
  if err != nil {
    w.WriteHeader(http.StatusBadRequest)
    _, _ = w.Write([]byte(fmt.Sprintf("upgrade error: %s", err.Error())))
    return
  }
  // 认证失败的时候,返回错误信息,并断开连接
  uid, err := hub.authenticator.Authenticate(r)
  if err != nil {
    _ = conn.SetWriteDeadline(time.Now().Add(time.Second))
    _ = conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf("authenticate error: %s", err.Error())))
    _ = conn.Close()
    return
  }
  // 注册 Client
  client := &Client{hub: hub, conn: conn, send: make(chan message.Log, bufferSize), uid: uid}
  client.conn.SetCloseHandler(closeHandler)
  // register 无缓冲,下面这一行会阻塞,直到 hub.run 中的 <-h.register 语句执行
  // 这样可以保证 register 成功之后才会启动读写协程
  client.hub.register <- client
  // 启动读写协程
  go client.writePump()
  go client.readPump()
}

send

send 是一个 http 接口,用于处理消息发送请求。

它会从 Hub 中获取 uid 对应的 Client,然后将消息发送给客户端。

// send 处理消息发送请求
func send(hub *Hub, w http.ResponseWriter, r *http.Request) {
  uid := r.FormValue("uid")
  if uid == "" {
    w.WriteHeader(http.StatusBadRequest)
    _, _ = w.Write([]byte("uid is required"))
    return
  }
  // 从 hub 中获取 uid 关联的 client
  hub.RLock()
  client, ok := hub.userClients[uid]
  hub.RUnlock()
  if !ok {
    w.WriteHeader(http.StatusBadRequest)
    _, _ = w.Write([]byte(fmt.Sprintf("client not found: %s", uid)))
    return
  }
  // 记录消息
  messageLog := message.Log{Uid: uid, Message: r.FormValue("message")}
  _ = hub.messageLogger.Log(messageLog)
  // 发送消息
  client.send <- messageLog
  // 增加等待发送的消息数量
  hub.pending.Add(int64(1))
}

github

完整代码可以在 github 上进行查看:https://github.com/eleven26/go-pusher


目录
相关文章
|
2月前
|
前端开发 JavaScript UED
探索Python Django中的WebSocket集成:为前后端分离应用添加实时通信功能
通过在Django项目中集成Channels和WebSocket,我们能够为前后端分离的应用添加实时通信功能,实现诸如在线聊天、实时数据更新等交互式场景。这不仅增强了应用的功能性,也提升了用户体验。随着实时Web应用的日益普及,掌握Django Channels和WebSocket的集成将为开发者开启新的可能性,推动Web应用的发展迈向更高层次的实时性和交互性。
90 1
|
14天前
|
JavaScript 前端开发 测试技术
在 golang 中执行 javascript 代码的方案详解
本文介绍了在 Golang 中执行 JavaScript 代码的四种方法:使用 `otto` 和 `goja` 嵌入式 JavaScript 引擎、通过 `os/exec` 调用 Node.js 外部进程以及使用 WebView 嵌入浏览器。每种方法都有其适用场景,如嵌入简单脚本、运行复杂 Node.js 脚本或在桌面应用中显示 Web 内容。
50 15
在 golang 中执行 javascript 代码的方案详解
|
3月前
|
存储 JavaScript 前端开发
webSocket+Node+Js实现在线聊天(包含所有代码)
文章介绍了如何使用WebSocket、Node.js和JavaScript实现在线聊天功能,包括完整的前端和后端代码示例。
214 0
|
15天前
|
运维 监控 Cloud Native
一行代码都不改,Golang 应用链路指标日志全知道
本文将通过阿里云开源的 Golang Agent,帮助用户实现“一行代码都不改”就能获取到应用产生的各种观测数据,同时提升运维团队和研发团队的幸福感。
|
25天前
|
缓存 监控 前端开发
在 Go 语言中实现 WebSocket 实时通信的应用,包括 WebSocket 的简介、Go 语言的优势、基本实现步骤、应用案例、注意事项及性能优化策略,旨在帮助开发者构建高效稳定的实时通信系统
本文深入探讨了在 Go 语言中实现 WebSocket 实时通信的应用,包括 WebSocket 的简介、Go 语言的优势、基本实现步骤、应用案例、注意事项及性能优化策略,旨在帮助开发者构建高效稳定的实时通信系统。
74 1
|
2月前
|
JavaScript 前端开发 测试技术
前端全栈之路Deno篇(五):如何快速创建 WebSocket 服务端应用 + 客户端应用 - 可能是2025最佳的Websocket全栈实时应用框架
本文介绍了如何使用Deno 2.0快速构建WebSocket全栈应用,包括服务端和客户端的创建。通过一个简单的代码示例,展示了Deno在WebSocket实现中的便捷与强大,无需额外依赖,即可轻松搭建具备基本功能的WebSocket应用。Deno 2.0被认为是最佳的WebSocket全栈应用JS运行时,适合全栈开发者学习和使用。
130 7
|
1月前
|
Kubernetes Cloud Native JavaScript
为使用WebSocket构建的双向通信应用带来基于服务网格的全链路灰度
介绍如何使用为基于WebSocket的云原生应用构建全链路灰度方案。
|
3月前
|
算法 安全 测试技术
golang 栈数据结构的实现和应用
本文详细介绍了“栈”这一数据结构的特点,并用Golang实现栈。栈是一种FILO(First In Last Out,即先进后出或后进先出)的数据结构。文章展示了如何用slice和链表来实现栈,并通过golang benchmark测试了二者的性能差异。此外,还提供了几个使用栈结构解决的实际算法问题示例,如有效的括号匹配等。
golang 栈数据结构的实现和应用
|
3月前
|
JavaScript 前端开发 UED
WebSocket在Python Web开发中的革新应用:解锁实时通信的新可能
在快速发展的Web应用领域中,实时通信已成为许多现代应用不可或缺的功能。传统的HTTP请求/响应模式在处理实时数据时显得力不从心,而WebSocket技术的出现,为Python Web开发带来了革命性的变化,它允许服务器与客户端之间建立持久的连接,从而实现了数据的即时传输与交换。本文将通过问题解答的形式,深入探讨WebSocket在Python Web开发中的革新应用及其实现方法。
49 3
|
2月前
|
中间件 Go 数据处理
应用golang的管道-过滤器架构风格
【10月更文挑战第1天】本文介绍了一种面向数据流的软件架构设计模式——管道-过滤器(Pipe and Filter),并通过Go语言的Gin框架实现了一个Web应用示例。该模式通过将数据处理流程分解为一系列独立的组件(过滤器),并利用管道连接这些组件,实现了模块化、可扩展性和高效的分布式处理。文中详细讲解了Gin框架的基本使用、中间件的应用以及性能优化方法,展示了如何构建高性能的Web服务。
80 0