玩转 Go 生态|Hertz WebSocket 扩展简析

本文涉及的产品
文档翻译,文档翻译 1千页
文本翻译,文本翻译 100万字符
语种识别,语种识别 100万字符
简介: WebSocket 是一种可以在单个 TCP 连接上进行全双工通信,位于 OSI 模型的应用层。WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。

WebSocket 是一种可以在单个 TCP 连接上进行全双工通信,位于 OSI 模型的应用层。WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就可以创建持久性的连接,并进行双向数据传输。

Hertz 提供了 WebSocket 的支持,参考 gorilla/websocket 库使用 hijack 的方式在 Hertz 进行了适配,用法和参数基本保持一致。

安装

go get github.com/hertz-contrib/websocket

示例代码

package main
​
import (
    "context"
    "flag"
    "html/template"
    "log"
​
    "github.com/cloudwego/hertz/pkg/app"
    "github.com/cloudwego/hertz/pkg/app/server"
    "github.com/hertz-contrib/websocket"
)
​
var addr = flag.String("addr", "localhost:8080", "http service address")
​
var upgrader = websocket.HertzUpgrader{} // use default options
​
func echo(_ context.Context, c *app.RequestContext) {
    err := upgrader.Upgrade(c, func(conn *websocket.Conn) {
        for {
            mt, message, err := conn.ReadMessage()
            if err != nil {
                log.Println("read:", err)
                break
            }
            log.Printf("recv: %s", message)
            err = conn.WriteMessage(mt, message)
            if err != nil {
                log.Println("write:", err)
                break
            }
        }
    })
    if err != nil {
        log.Print("upgrade:", err)
        return
    }
}
​
func home(_ context.Context, c *app.RequestContext) {
    c.SetContentType("text/html; charset=utf-8")
    homeTemplate.Execute(c, "ws://"+string(c.Host())+"/echo")
}
​
func main() {
    flag.Parse()
    h := server.Default(server.WithHostPorts(*addr))
    // https://github.com/cloudwego/hertz/issues/121
    h.NoHijackConnPool = true
    h.GET("/", home)
    h.GET("/echo", echo)
    h.Spin()
}
​
var homeTemplate = template.Must(template.New("").Parse(`
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<script>  
window.addEventListener("load", function(evt) {
​
    var output = document.getElementById("output");
    var input = document.getElementById("input");
    var ws;
​
    var print = function(message) {
        var d = document.createElement("div");
        d.textContent = message;
        output.appendChild(d);
        output.scroll(0, output.scrollHeight);
    };
​
    document.getElementById("open").onclick = function(evt) {
        if (ws) {
            return false;
        }
        ws = new WebSocket("{{.}}");
        ws.onopen = function(evt) {
            print("OPEN");
        }
        ws.onclose = function(evt) {
            print("CLOSE");
            ws = null;
        }
        ws.onmessage = function(evt) {
            print("RESPONSE: " + evt.data);
        }
        ws.onerror = function(evt) {
            print("ERROR: " + evt.data);
        }
        return false;
    };
​
    document.getElementById("send").onclick = function(evt) {
        if (!ws) {
            return false;
        }
        print("SEND: " + input.value);
        ws.send(input.value);
        return false;
    };
​
    document.getElementById("close").onclick = function(evt) {
        if (!ws) {
            return false;
        }
        ws.close();
        return false;
    };
​
});
</script>
</head>
<body>
<table>
<tr><td valign="top" width="50%">
<p>Click "Open" to create a connection to the server, 
"Send" to send a message to the server and "Close" to close the connection. 
You can change the message and send multiple times.
<p>
<form>
<button id="open">Open</button>
<button id="close">Close</button>
<p><input id="input" type="text" value="Hello world!">
<button id="send">Send</button>
</form>
</td><td valign="top" width="50%">
<div id="output" style="max-height: 70vh;overflow-y: scroll;"></div>
</td></tr></table>
</body>
</html>
`))

运行 server:

go run server.go

上述示例代码中,服务器包括一个简单的网络客户端。要使用该客户端,在浏览器中打开 http://127.0.0.1:8080,并按照页面上的指示操作。

Upgrade

websocket.Conn 类型代表一个 WebSocket 连接。服务器应用程序从 HTTP 请求处理程序中调用 HertzUpgrader.Upgrade 方法,将 HTTP 协议的连接请求升级为 WebSocket 协议的连接请求。

这部分逻辑对应着示例代码的 echo() 函数,此处着重介绍 HertzUpgrader.Upgrade

函数签名:

func (u *HertzUpgrader) Upgrade(ctx *app.RequestContext, handler HertzHandler) error

内部处理逻辑:

func (u *HertzUpgrader) Upgrade(ctx *app.RequestContext, handler HertzHandler) error {
    if !ctx.IsGet() {
        return u.returnError(ctx, consts.StatusMethodNotAllowed, fmt.Sprintf("%s request method is not GET", badHandshake))
    }
    // 校验 requsetHeader 中与 websocket 相关的字段(此处省略部分逻辑代码)
​
    subprotocol := u.selectSubprotocol(ctx)
    compress := u.isCompressionEnable(ctx)
​
    ctx.SetStatusCode(consts.StatusSwitchingProtocols)
    // 构造协议升级后的响应头部信息
    ctx.Response.Header.Set("Upgrade", "websocket")
    ctx.Response.Header.Set("Connection", "Upgrade")
    ctx.Response.Header.Set("Sec-WebSocket-Accept", computeAcceptKeyBytes(challengeKey))
    // “无上下文接管”模式
    if compress {
        ctx.Response.Header.Set("Sec-WebSocket-Extensions", "permessage-deflate; server_no_context_takeover; client_no_context_takeover")
    }
    if subprotocol != nil {
        ctx.Response.Header.SetBytesV("Sec-WebSocket-Protocol", subprotocol)
    }
​
    // 通过 Hijack 的方式,实现 websocket 全双工的通信
    ctx.Hijack(func(netConn network.Conn) {
        writeBuf := poolWriteBuffer.Get().([]byte)
        c := newConn(netConn, true, u.ReadBufferSize, u.WriteBufferSize, u.WriteBufferPool, nil, writeBuf)
        if subprotocol != nil {
            c.subprotocol = b2s(subprotocol)
        }
​
        if compress {
            c.newCompressionWriter = compressNoContextTakeover
            c.newDecompressionReader = decompressNoContextTakeover
        }
​
        netConn.SetDeadline(time.Time{})
​
        handler(c)
​
        writeBuf = writeBuf[0:0]
        poolWriteBuffer.Put(writeBuf)
    })
​
    return nil
}

HertzHandler

HertzHandler 是上述 HertzUpgrader.Upgrade 函数的第二个参数。HertzHandler 在握手完成后接收一个 websocket 连接,通过劫持这个连接,完成全双工的通信。

HertzHandler 必须由用户提供,内部定义了 WebSocket 请求和响应的具体流程。

函数签名:

type HertzHandler func(*Conn)

上述 echo 服务器的 websocket 处理流程:

err := upgrader.Upgrade(c, func(conn *websocket.Conn) {
    for {
        // 读取客户端发送的信息
        mt, message, err := conn.ReadMessage()
        if err != nil {
            log.Println("read:", err)
            break
        }
        log.Printf("recv: %s", message)
        // 向客户端发送信息
        err = conn.WriteMessage(mt, message)
        if err != nil {
            log.Println("write:", err)
            break
        }
    }
})

配置

上述文档已经讲述了Hertz WebSocket 最核心的协议升级连接劫持的逻辑,下面将罗列 Hertz WebSocket 使用过程中可选的配置参数。

这部分将围绕 websocket.HertzUpgrader 结构展开说明。

参数 介绍
ReadBufferSize 用于设置输入缓冲区的大小,单位为字节。如果缓冲区大小为零,那么就使用 HTTP 服务器分配的大小。输入缓冲区大小并不限制可以接收的信息的大小。
WriteBufferSize 用于设置输出缓冲区的大小,单位为字节。如果缓冲区大小为零,那么就使用 HTTP 服务器分配的大小。输出缓冲区大小并不限制可以发送的信息的大小。
WriteBufferPool 用于设置写操作的缓冲池。
Subprotocols 用于按优先顺序设置服务器支持的协议。如果这个字段不是 nil,那么 Upgrade 方法通过选择这个列表中与客户端请求的协议的第一个匹配来协商一个子协议。如果没有匹配,那么就不协商协议(Sec-Websocket-Protocol 头不包括在握手响应中)。
Error 用于设置生成 HTTP 错误响应的函数。
CheckOrigin 用于设置针对请求的 Origin 头的校验函数, 如果请求的 Origin 头是可接受的,CheckOrigin 返回 true。
EnableCompression 用于设置服务器是否应该尝试协商每个消息的压缩(RFC 7692)。将此值设置为 true 并不能保证压缩会被支持。

WriteBufferPool

如果该值没有被设置,则额外初始化写缓冲区,并在当前生命周期内分配给该连接。当应用程序在大量的连接上有适度的写入量时,缓冲池是最有用的。

应用程序应该使用一个单一的缓冲池来为不同的连接分配缓冲区。

接口签名:

// BufferPool represents a pool of buffers. The *sync.Pool type satisfies this
// interface.  The type of the value stored in a pool is not specified.
type BufferPool interface {
    // Get gets a value from the pool or returns nil if the pool is empty.
    Get() interface{}
    // Put adds a value to the pool.
    Put(interface{})
}

示例代码:

type simpleBufferPool struct {
    v interface{}
}
​
func (p *simpleBufferPool) Get() interface{} {
    v := p.v
    p.v = nil
    return v
}
​
func (p *simpleBufferPool) Put(v interface{}) {
    p.v = v
}
​
var upgrader = websocket.HertzUpgrader{
    WriteBufferPool: &simpleBufferPool{},
}

Subprotocols

WebSocket 只是定义了一种交换任意消息的机制。这些消息是什么意思,客户端在任何特定的时间点可以期待什么样的消息,或者他们被允许发送什么样的消息,完全取决于实现应用程序。

所以你需要在服务器和客户端之间就这些事情达成协议。子协议参数只是让客户端和服务端正式地交换这些信息。你可以为你想要的任何协议编造任何名字。服务器可以简单地检查客户在握手过程中是否遵守了该协议。

Error

如果 Error 为 nil,则使用 Hertz 提供的 API 来生成 HTTP 错误响应。

函数签名:

func(ctx *app.RequestContext, status int, reason error)

示例代码:

var upgrader = websocket.HertzUpgrader{
    Error: func(ctx *app.RequestContext, status int, reason error) {
        ctx.Response.Header.Set("Sec-Websocket-Version", "13")
        ctx.AbortWithMsg(reason.Error(), status)
    },
}

CheckOrigin

如果 CheckOrigin 为nil,则使用一个安全的默认值:如果Origin请求头存在,并且源主机不等于请求主机头,则返回false。CheckOrigin 函数应该仔细验证请求的来源,以防止跨站请求伪造。

函数签名:

func(ctx *app.RequestContext) bool

默认实现:

func fastHTTPCheckSameOrigin(ctx *app.RequestContext) bool {
    origin := ctx.Request.Header.Peek("Origin")
    if len(origin) == 0 {
        return true
    }
    u, err := url.Parse(b2s(origin))
    if err != nil {
        return false
    }
    return equalASCIIFold(u.Host, b2s(ctx.Host()))
}

EnableCompression

服务端接受一个或者多个扩展字段,这些扩展字段是包含客户端请求的 Sec-WebSocket-Extensions 头字段扩展中的。当 EnableCompression 为 true 时,服务端根据当前自身支持的扩展与其进行匹配,如果匹配成功则支持压缩。

校验逻辑:

var strPermessageDeflate = []byte("permessage-deflate")
​
func (u *HertzUpgrader) isCompressionEnable(ctx *app.RequestContext) bool {
    extensions := parseDataHeader(ctx.Request.Header.Peek("Sec-WebSocket-Extensions"))
​
    // Negotiate PMCE
    if u.EnableCompression {
        for _, ext := range extensions {
            if bytes.HasPrefix(ext, strPermessageDeflate) {
                return true
            }
        }
    }
​
    return false
}

目前仅支持“无上下文接管”模式,详见上述 HertzUpgrader.Upgrade 代码部分。

Set Deadline

当使用 websocket 进行读写的时候,可以通过类似如下方式设置超时时间(在每次读写过程中都会生效)。

示例代码:

func echo(_ context.Context, c *app.RequestContext) {
    err := upgrader.Upgrade(c, func(conn *websocket.Conn) {
        defer conn.Close()
        // "github.com/cloudwego/hertz/pkg/network"
        conn.NetConn().(network.Conn).SetReadTimeout(1 * time.Second)
        ...
    })
    if err != nil {
        log.Print("upgrade:", err)
        return
    }
}

更多用法示例详见 examples

相关文章
|
6月前
|
搜索推荐 Go 开发者
Go模块与依赖管理:构建稳定、可维护的项目生态
【2月更文挑战第9天】Go模块是Go语言从1.11版本开始引入的一个新的依赖管理工具,它改变了以往通过GOPATH管理项目依赖的方式,为Go开发者带来了更加灵活、高效的依赖管理方式。本文将深入探讨Go模块与依赖管理的概念、使用方法和最佳实践,帮助读者更好地理解和应用Go模块,构建稳定、可维护的项目生态。
|
6月前
|
JSON Go 数据格式
从1开始,扩展Go语言后端业务系统的RPC功能
从1开始,扩展Go语言后端业务系统的RPC功能
90 0
|
6月前
|
存储 缓存 安全
Go 简单设计和实现可扩展、高性能的泛型本地缓存
本文将会探讨如何极简设计并实现一个可扩展、高性能的本地缓存。支持多样化的缓存策略,例如 最近最少使用(LRU)等。
104 0
Go 简单设计和实现可扩展、高性能的泛型本地缓存
|
6月前
|
网络协议 Java Go
【Go语言专栏】Go语言中的WebSocket实时通信应用
【4月更文挑战第30天】Go语言(Golang)是Google开发的编程语言,适用于云计算、微服务等领域。本文介绍了WebSocket,一种实现浏览器与服务器全双工通信的协议,其特点是实时性、全双工和轻量级。在Go中实现WebSocket,可以使用gorilla/websocket库。示例展示了如何创建服务器端和客户端,实现消息的收发。WebSocket广泛应用于聊天、游戏、通知推送和实时数据同步等场景。学习Go语言中的WebSocket对于开发实时通信应用至关重要。
235 0
|
3月前
|
缓存 JavaScript 前端开发
为开源项目 go-gin-api 增加 WebSocket 模块
为开源项目 go-gin-api 增加 WebSocket 模块
46 2
|
4月前
|
XML JSON Go
Swoole与Go系列教程之WebSocket服务的应用
在 WebSocket 协议出现之前,Web 应用为了能过获取到实时的数据都是通过不断轮询服务端的接口。轮询的效率、延时很低,并且很耗费资源。
1057 2
Swoole与Go系列教程之WebSocket服务的应用
|
3月前
|
前端开发 Go 开发者
用 Go + WebSocket 快速实现一个 chat 服务
用 Go + WebSocket 快速实现一个 chat 服务
|
3月前
|
网络协议 前端开发 Go
[go笔记]websocket入门
[go笔记]websocket入门
|
5月前
|
NoSQL 安全 Go
Go 语言 mongox 库:简化操作、安全、高效、可扩展、BSON 构建
go mongox 是一个基于泛型的库,扩展了 MongoDB 的官方库。通过泛型技术,它实现了结构体与 MongoDB 集合的绑定,旨在提供类型安全和简化的数据操作。 go mongox 还引入链式调用,让文档操作更流畅,并且提供了丰富的 BSON 构建器和内置函数,简化了 BSON 数据的构建。 此外,它还支持插件化编程和内置多种钩子函数,为数据库操作前后的自定义逻辑提供灵活性,增强了应用的可扩展性和可维护性。
98 6
|
Go API 数据库
Go接口 - 构建可扩展Go应用1
Go接口 - 构建可扩展Go应用
67 0
下一篇
无影云桌面