异常信息
panic: concurrent write to websocket connection
原由
golang 编写 websocket
go版本:1.19
使用了第三方框架: https://github.com/gorilla/websocket/tree/main
代码
server.go
// Copyright 2015 The Gorilla WebSocket Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. package main import ( "flag" "fmt" "html/template" "log" "net/http" "github.com/gorilla/websocket" ) var addr = flag.String("addr", "localhost:8080", "http service address") var upgrader = websocket.Upgrader{} // use default options func echo(w http.ResponseWriter, r *http.Request) { c, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Print("upgrade:", err) return } defer c.Close() for { mt, message, err := c.ReadMessage() if err != nil { log.Println("read:", err) break } fmt.Println(string(message)) fmt.Println(mt) //log.Printf("recv: %s, type: %s", message, websocket.FormatMessageType(mt)) err = c.WriteMessage(mt, message) if err != nil { log.Println("write:", err) break } } } func home(w http.ResponseWriter, r *http.Request) { homeTemplate.Execute(w, "ws://"+r.Host+"/echo") } func main() { flag.Parse() log.SetFlags(0) http.HandleFunc("/echo", echo) http.HandleFunc("/", home) log.Fatal(http.ListenAndServe(*addr, nil)) } var homeTemplate = template.Must(template.New("").Parse(` <!DOCTYPE html> <html> <head> <meta charset="utf-8"> <script> window.addEventListener("load", function(evt) { var output = document.getElementById("output"); var input = document.getElementById("input"); var ws; var print = function(message) { var d = document.createElement("div"); d.textContent = message; output.appendChild(d); output.scroll(0, output.scrollHeight); }; document.getElementById("open").onclick = function(evt) { if (ws) { return false; } ws = new WebSocket("{{.}}"); ws.onopen = function(evt) { print("OPEN"); } ws.onclose = function(evt) { print("CLOSE"); ws = null; } ws.onmessage = function(evt) { print("RESPONSE: " + evt.data); } ws.onerror = function(evt) { print("ERROR: " + evt.data); } return false; }; document.getElementById("send").onclick = function(evt) { if (!ws) { return false; } 
print("SEND: " + input.value); ws.send(input.value); return false; }; document.getElementById("close").onclick = function(evt) { if (!ws) { return false; } ws.close(); return false; }; }); </script> </head> <body> <table> <tr><td valign="top" width="50%"> <p>Click "Open" to create a connection to the server, "Send" to send a message to the server and "Close" to close the connection. You can change the message and send multiple times. <p> <form> <button id="open">Open</button> <button id="close">Close</button> <p><input id="input" type="text" value="Hello world!"> <button id="send">Send</button> </form> </td><td valign="top" width="50%"> <div id="output" style="max-height: 70vh;overflow-y: scroll;"></div> </td></tr></table> </body> </html> `))
client.go
// Copyright 2015 The Gorilla WebSocket Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. package main import ( "flag" "fmt" "log" "net/url" "os" "os/signal" "time" "github.com/gorilla/websocket" ) var addr = flag.String("addr", "localhost:8080", "http service address") func main() { flag.Parse() log.SetFlags(0) interrupt := make(chan os.Signal, 1) signal.Notify(interrupt, os.Interrupt) u := url.URL{Scheme: "ws", Host: *addr, Path: "/echo"} log.Printf("connecting to %s", u.String()) c, _, err := websocket.DefaultDialer.Dial(u.String(), nil) if err != nil { log.Fatal("dial:", err) } defer c.Close() done := make(chan struct{}) go func() { // 发送Ping帧,检查连接是否活跃 for { if err := c.WriteMessage(websocket.PingMessage, []byte{}); err != nil { log.Println("Failed to send Ping: ", err) return } fmt.Println("Ping success") time.Sleep(10 * time.Second) } }() go func() { defer close(done) for { mt, message, err := c.ReadMessage() if err != nil { log.Println("read:", err) return } fmt.Println(mt) fmt.Println(string(message)) // 时间 //log.Printf("recv: %s, type: %s", message, websocket.FormatMessageType(mt)) } }() ticker := time.NewTicker(time.Second) defer ticker.Stop() for { select { case <-done: return case t := <-ticker.C: err := c.WriteMessage(websocket.TextMessage, []byte(t.String())) if err != nil { log.Println("write:", err) return } case <-interrupt: log.Println("interrupt") // Cleanly close the connection by sending a close message and then // waiting (with timeout) for the server to close the connection. err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) if err != nil { log.Println("write close:", err) return } select { case <-done: case <-time.After(time.Second): } return } } }
错误点
我希望在连接期间,通信双方能够持续检测对方是否在线,因此使用了 PING 帧来检测连接是否活跃。
go func() {
	// Send a Ping frame every 10 seconds to check whether the connection is
	// still alive; stop at the first write error.
	//
	// NOTE: this goroutine writes to c while the main loop of client.go also
	// calls c.WriteMessage on its ticker, so two goroutines write to the same
	// connection at once. That is what triggers the panic.
	for {
		if err := c.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
			log.Println("Failed to send Ping: ", err)
			return
		}
		fmt.Println("Ping success")
		time.Sleep(10 * time.Second)
	}
}()
于是出现了开头的错误信息:panic: concurrent write to websocket connection,意思是:不能并发地向同一个 WebSocket 连接写入消息。
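错误 “concurrent write to websocket connection” 指的是有多个 goroutine 同时向同一个 WebSocket 连接写入数据,这是不被允许的。在上面的 client.go 中,发送 Ping 的 goroutine 和主循环里每秒发送一次文本消息的 ticker 分支都会调用 c.WriteMessage,因此触发了这个 panic。gorilla/websocket 的连接在同一时刻只支持一个写入者,因此你需要确保对每个 WebSocket 连接的写操作在任何时候只由一个 goroutine 执行。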
解决这个问题的方法是使用同步机制,比如互斥锁(sync.Mutex),来同步对WebSocket连接的写操作。下面是一个修改后的示例,展示如何使用互斥锁来避免并发写的问题:
解决办法
在下面的示例中,我们定义了一个 WebSocketConnection 结构体,它包含一个 websocket.Conn 和一个 sync.Mutex。在发送 Ping 消息的 goroutine 中,我们在写操作之前获取互斥锁,并在写操作完成后释放锁。这样可以确保在任何时候只有一个 goroutine 能够执行写操作。
请注意,如果还有其他goroutine需要写入WebSocket连接,它们也需要在执行写操作前获取互斥锁,并在完成后释放锁。这样可以避免并发写入的问题,并确保WebSocket连接的正确使用。
示例代码
package main

import (
	"log"
	"net/http"
	"sync"
	"time"

	"github.com/gorilla/websocket"
)

// pingInterval is how often the server sends a Ping frame to the peer.
const pingInterval = 10 * time.Second

// upgrader turns an HTTP request into a WebSocket connection.
//
// NOTE(review): CheckOrigin accepts every origin, which disables the
// library's cross-origin protection. That is acceptable for a local demo;
// restrict it before exposing the server to browsers you do not control.
var upgrader = websocket.Upgrader{
	CheckOrigin: func(r *http.Request) bool {
		return true
	},
}

// WebSocketConnection pairs a WebSocket connection with the mutex that
// serializes writes to it. It must not be copied after first use because it
// contains a sync.Mutex; always pass *WebSocketConnection.
type WebSocketConnection struct {
	Conn *websocket.Conn
	Lock sync.Mutex
}

// WriteMessage writes one message to the connection while holding Lock, so
// any number of goroutines may call it concurrently. The lock is released
// with defer, which means a failed write can never leave the mutex held.
func (c *WebSocketConnection) WriteMessage(messageType int, data []byte) error {
	c.Lock.Lock()
	defer c.Lock.Unlock()
	return c.Conn.WriteMessage(messageType, data)
}

// handleConnections owns ws for its whole lifetime. It starts a goroutine
// that pings the peer every pingInterval, then blocks reading messages until
// the connection fails or the peer closes it. On return the Ping goroutine is
// told to stop and the connection is closed.
func handleConnections(ws *websocket.Conn) {
	defer ws.Close()
	log.Println("Connection established")

	// The zero value of sync.Mutex is ready to use, so Lock needs no initializer.
	conn := &WebSocketConnection{Conn: ws}

	// done is closed when the read loop exits; it stops the Ping goroutine.
	// Deferred calls run last-in-first-out, so done is closed before ws.
	done := make(chan struct{})
	defer close(done)

	// Ping goroutine: every write goes through conn.WriteMessage, which
	// serializes it with writes made by any other goroutine.
	go func() {
		ticker := time.NewTicker(pingInterval)
		defer ticker.Stop()
		for {
			if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				log.Println("Failed to send Ping: ", err)
				return
			}
			select {
			case <-done:
				return
			case <-ticker.C:
			}
		}
	}()

	// Read loop. The connection has to be read continuously, otherwise the
	// Pong and Close frames sent by the peer are never processed. Only this
	// goroutine reads from ws.
	for {
		if _, _, err := ws.ReadMessage(); err != nil {
			log.Println("read:", err)
			return
		}
		// Handle the received message here. Any reply must be sent with
		// conn.WriteMessage, never with ws.WriteMessage directly.
	}
}

// main serves the /ws endpoint on :8080.
func main() {
	http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
		ws, err := upgrader.Upgrade(w, r, nil)
		if err != nil {
			log.Println("upgrade:", err)
			return
		}
		// net/http already runs each handler in its own goroutine, so the
		// connection can be served right here until it closes.
		handleConnections(ws)
	})

	log.Fatal(http.ListenAndServe(":8080", nil))
}