上一篇介绍了Golang中封装WebSocket功能,让WebSocket更简单好用和稳定。
这里借助Redis自身的订阅和发布机制和WebSocket结合,实现轻量级的订阅发布和消息推送。本来消息订阅和推送打算用mqtt实现,但是这样还得有一个MqttBroker代理服务器,或采用网上开源的实现,或使用go语言自己实现个mqtt的broker。这都不够轻量级,这里介绍下借助redis的轻量级的实现。
大致框图如下:
涉及实时性和性能相关的服务可以直接在OnMessage里让客户端同后台业务直接交互实现。
关于提高性能的思考,首先看如果是高并发的情况下,瓶颈可能会在哪。
内部有两个redis客户端:一个负责发布,另一个负责订阅并接收消息。当消息量很大时,单个连接未必够用,因此负责发布的客户端首先可以考虑用redis的连接池来实现。
消息的发布和订阅,固定为两个事件,一个是OnPublish,一个是OnSubscribe。并定义相关的报文结构如下:
收到Publish事件后,将消息发布到Redis:
// 接收到发布消息事件 c.On("Publish", func(msg string) { // 将消息打印到控制台 fmt.Printf("%s received publish: %s\n", c.Context().ClientIP(), msg) pubMsg := websocket.PushMsg{ID: c.ID()} err := json.Unmarshal([]byte(msg), &pubMsg) if err != nil { log.Printf("解析json串错误,err=", err) return } if pubMsg.Type != "pub" { log.Println("pub msg type error") return } //发布消息到Redis websocket.Publish(pubMsg.Topic, pubMsg.Payload) })
收到订阅事件后,向Redis订阅相应的主题:
// 接收到订阅的事件 c.On("Subscribe", func(msg string) { // 将消息打印到控制台,c .Context()是iris的http上下文。 fmt.Printf("%s received subscribe: %s\n", c.Context().ClientIP(), msg) subMsg := websocket.SubMsg{ID: c.ID()} err := json.Unmarshal([]byte(msg), &subMsg) if err != nil { log.Printf("解析json串错误,err=", err) return } if pubMsg.Type != "pub" { log.Println("pub msg type error") return } //订阅到Redis sub.Subscribe(subMsg.Topic, subMsg.ID) })
开启一个Redis客户端,负责接收订阅到的消息:
func (c *Subscriber) Init(ws *Server) { conn := RedisClient.Get() c.client = redis.PubSubConn{conn} c.Ws = ws go func() { for { log.Println("redis wait...") switch res := c.client.Receive().(type) { case redis.Message: fmt.Printf("receive:%#v\n", res) topic := res.Channel message := string(res.Data) fnSubReceived(c.cbMap, topic, message) case redis.Subscription: fmt.Printf("%s: %s %d\n", res.Channel, res.Kind, res.Count) case error: log.Println("error handle", res) if IsConnError(res) { conn, err := RedisClient.Dial() if err != nil { log.Printf("err=%s\n", err) } c.client = redis.PubSubConn{conn} } continue } } }() }
附完整实现:
package websocket

import (
	"log"
)

type (
	// SubMsg is the message format for subscription requests.
	SubMsg struct {
		ID    string `json:"id"`   // request ID
		Type  string `json:"type"` // "sub" to subscribe, "unsub" to unsubscribe
		Topic string `json:"topic"`
		Param string `json:"param"`
	}

	// PushMsg is the message format for messages pushed through the platform.
	PushMsg struct {
		ID      string `json:"id"`
		Type    string `json:"type"` // "pub" when publishing
		Topic   string `json:"topic"`
		Payload string `json:"payload"`
		Result  string `json:"result"`
	}
)

// Publish sends msg to the Redis channel topic by running the "Publish"
// command through Redo. It returns Redo's reply and error unchanged; a
// non-nil error is also written to the log.
func Publish(topic string, msg string) (interface{}, error) {
	reply, err := Redo("Publish", topic, msg)
	if err == nil {
		return reply, nil
	}
	log.Println(err)
	return reply, err
}
package websocket

import (
	"errors"
	"fmt"
	"io"
	"log"
	"strings"
	"sync"
	"time"

	//"unsafe"
	"github.com/gomodule/redigo/redis"
)

var (
	// RedisClient is the package-wide Redis connection pool. It is nil until
	// InitRedis has been called.
	RedisClient *redis.Pool
)

// InitRedis builds the global connection pool for the Redis server at host
// (authenticating with auth and selecting database db) and verifies that the
// server is reachable by sending PING over a pooled connection.
//
// It returns the connection error when the server cannot be reached; the pool
// stays assigned to RedisClient in that case.
func InitRedis(host string, auth string, db int) error {
	RedisClient = &redis.Pool{
		MaxIdle:     3,
		MaxActive:   4000,
		IdleTimeout: 180 * time.Second,
		Dial: func() (redis.Conn, error) {
			c, err := redis.Dial("tcp", host, redis.DialPassword(auth), redis.DialDatabase(db))
			if err != nil {
				return nil, err
			}
			return c, nil
		},
		// Connections idle for a minute or more are probed before reuse.
		TestOnBorrow: func(c redis.Conn, t time.Time) error {
			if time.Since(t) < time.Minute {
				return nil
			}
			_, err := c.Do("PING")
			return err
		},
	}

	// Probe through the pool. The original dialled a second connection and
	// deferred its Close before checking the dial error, which panics on a
	// nil connection when the server is unreachable.
	rd := RedisClient.Get()
	defer rd.Close()
	if _, err := rd.Do("PING"); err != nil {
		fmt.Println("Connect to redis error", err)
		return err
	}
	fmt.Println("Connect to redis ok")
	return nil
}

// IsConnError reports whether err indicates a broken or refused connection,
// i.e. a failure for which dialling a new connection may help. A nil error
// yields false.
func IsConnError(err error) bool {
	if err == nil {
		return false
	}
	if err == io.EOF {
		return true
	}
	text := err.Error()
	for _, marker := range []string{
		"use of closed network connection",
		"connect: connection refused",
		"connection closed",
	} {
		if strings.Contains(text, marker) {
			return true
		}
	}
	return false
}

// Redo runs command with the arguments opt on a pooled connection. The pool's
// TestOnBorrow hook weeds out most stale connections; if the command still
// fails with a connection error, Redo retries it up to three times, each time
// on a freshly dialled connection.
//
// It returns the reply and error of the first attempt that does not fail with
// a connection error. When every attempt fails that way it returns "" and an
// error that includes the last failure. Calling Redo before InitRedis returns
// an error.
func Redo(command string, opt ...interface{}) (interface{}, error) {
	if RedisClient == nil {
		return "", errors.New("error,redis client is null")
	}
	const maxRetry = 3

	rd := RedisClient.Get()
	defer rd.Close()

	resp, err := rd.Do(command, opt...)
	if !IsConnError(err) {
		return resp, err
	}

	for attempt := 0; attempt < maxRetry; attempt++ {
		conn, dialErr := RedisClient.Dial()
		if dialErr != nil {
			// Nothing to close; remember the failure and try again.
			err = dialErr
			continue
		}
		resp, err = conn.Do(command, opt...)
		// Each retry owns its connection, so close it here instead of
		// leaking it; the close error adds nothing to err.
		_ = conn.Close()
		if !IsConnError(err) {
			return resp, err
		}
	}
	return "", fmt.Errorf("redis error: %v", err)
}

// SubscribeCallback is invoked for every message received from Redis.
// topicMap maps client IDs to the channel each client subscribed to.
//
// NOTE(review): topicMap is passed by value, which copies a sync.Map; a
// pointer would be safer but would change the callback signature.
type SubscribeCallback func(topicMap sync.Map, topic, msg string)

// Subscriber owns the Redis pub/sub connection and the table of client
// subscriptions.
type Subscriber struct {
	client   redis.PubSubConn
	Ws       *Server  // websocket server
	cbMap    sync.Map // client ID -> subscribed channel
	CallBack interface {
		OnReceive(SubscribeCallback)
	}
}

// fnSubReceived is the callback registered through OnReceive. It is shared by
// all Subscriber values.
var fnSubReceived SubscribeCallback

// OnReceive registers cb as the handler for messages received from Redis,
// replacing any handler registered before.
func (c *Subscriber) OnReceive(cb SubscribeCallback) {
	fnSubReceived = cb
}

// Init takes a connection from the Redis pool, wraps it as a pub/sub
// connection, remembers the websocket server, and starts a goroutine that
// receives pub/sub traffic until the process exits.
//
// Received messages are handed to the callback registered through OnReceive.
// On a connection error the goroutine dials a replacement connection.
//
// NOTE(review): subscriptions made on the old connection are not re-issued
// after a reconnect, and c.client is replaced without synchronisation with
// Subscribe/PSubscribe callers.
func (c *Subscriber) Init(ws *Server) {
	// retryDelay throttles the loop while the connection stays unusable.
	const retryDelay = time.Second

	conn := RedisClient.Get()
	c.client = redis.PubSubConn{Conn: conn}
	c.Ws = ws

	go func() {
		for {
			log.Println("redis wait...")
			switch res := c.client.Receive().(type) {
			case redis.Message:
				fmt.Printf("receive:%#v\n", res)
				topic := res.Channel
				message := string(res.Data)
				// A message may arrive before OnReceive has been called.
				if fnSubReceived != nil {
					fnSubReceived(c.cbMap, topic, message)
				}
			case redis.Subscription:
				fmt.Printf("%s: %s %d\n", res.Channel, res.Kind, res.Count)
			case error:
				log.Println("error handle", res)
				if IsConnError(res) {
					fresh, err := RedisClient.Dial()
					if err == nil {
						// Release the broken connection before replacing it;
						// its close error carries no extra information.
						_ = c.client.Close()
						c.client = redis.PubSubConn{Conn: fresh}
						continue
					}
					// Keep the old connection: installing a nil Conn would
					// make the next Receive panic.
					log.Printf("err=%s\n", err)
				}
				// Back off so a persistent error cannot spin the loop.
				time.Sleep(retryDelay)
			}
		}
	}()
}

// Close closes the pub/sub connection and logs a failure to do so. It is a
// no-op when Init has not been called.
func (c *Subscriber) Close() {
	if c.client.Conn == nil {
		return
	}
	if err := c.client.Close(); err != nil {
		log.Println("redis close error.", err)
	}
}

// Subscribe subscribes the pub/sub connection to channel and records that
// clientid is interested in it. channel must be a string. Nothing is recorded
// when the subscriber is not initialised, channel is not a string, or the
// Redis call fails; each case is logged.
func (c *Subscriber) Subscribe(channel interface{}, clientid string) {
	name, ok := c.checkChannel(channel)
	if !ok {
		return
	}
	if err := c.client.Subscribe(channel); err != nil {
		log.Println("redis Subscribe error.", err)
		return
	}
	c.cbMap.Store(clientid, name)
}

// PSubscribe subscribes the pub/sub connection to the pattern channel and
// records it for clientid. channel must be a string. Nothing is recorded when
// the subscriber is not initialised, channel is not a string, or the Redis
// call fails; each case is logged.
func (c *Subscriber) PSubscribe(channel interface{}, clientid string) {
	name, ok := c.checkChannel(channel)
	if !ok {
		return
	}
	if err := c.client.PSubscribe(channel); err != nil {
		log.Println("redis PSubscribe error.", err)
		return
	}
	c.cbMap.Store(clientid, name)
}

// checkChannel validates the preconditions shared by Subscribe and
// PSubscribe: Init must have run and channel must be a string. It returns the
// channel name and true when both hold, and logs the reason otherwise.
func (c *Subscriber) checkChannel(channel interface{}) (string, bool) {
	if c.client.Conn == nil {
		log.Println("redis subscriber is not initialised")
		return "", false
	}
	name, ok := channel.(string)
	if !ok {
		log.Printf("redis channel must be a string, got %T", channel)
		return "", false
	}
	return name, true
}
package main

import (
	"fmt"
	"log"
	"sync"

	//"github.com/gin-contrib/cors"
	"encoding/json"
	"github.com/gin-gonic/gin"
	"net/http"
	"strings" // NOTE(review): not referenced by the code visible here; drop it if nothing else in the package uses it
	"websockTest/websocket"
)

// sub is the Redis subscriber shared by main and the connection handlers. It
// lives at package level because handleConnection needs it; as a local of
// main it was out of scope there and the file did not compile.
var sub websocket.Subscriber

// subReady is set once sub.Init has run. It is written in main before the
// HTTP server starts and only read afterwards.
var subReady bool

// main wires the websocket server to Redis pub/sub and serves the HTTP
// endpoints on :9090.
func main() {
	ws := websocket.New(websocket.Config{
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
	})
	ws.OnConnection(handleConnection)

	// Connect to Redis. When that fails the HTTP server still starts, but
	// subscriptions are refused (see the "Subscribe" handler).
	if err := websocket.InitRedis("127.0.0.1:6379", "", 0); err != nil {
		fmt.Printf("InitRedis error: %s\n", err)
	} else {
		// Register the dispatcher before the receive loop starts so no
		// message can arrive without a handler: every message received from
		// Redis is written to each websocket client subscribed to its topic.
		sub.OnReceive(func(topicMap sync.Map, topic, msg string) {
			fmt.Printf("sub msg received,topic=%s,msg=%s\n", topic, msg)
			topicMap.Range(func(k, v interface{}) bool {
				fmt.Println("topicMap:", k, v)
				if v.(string) == topic {
					conn := sub.Ws.GetConnection(k.(string))
					if conn != nil {
						conn.Write(1, []byte(msg))
					}
				}
				return true
			})
		})
		sub.Init(ws)
		subReady = true
	}

	r := gin.Default()

	// Allow cross-origin requests (currently disabled).
	//config := cors.DefaultConfig()
	//config.AllowOrigins = []string{"http://127.0.0.1:9090"}
	//r.Use(Cors())

	// Static assets and HTML templates.
	r.Static("/static", "./static")
	r.LoadHTMLGlob("views/*")

	r.GET("/ws", ws.Handler())
	r.GET("/api/v3/device", ws.Handler())
	r.GET("/test", func(c *gin.Context) {
		c.HTML(http.StatusOK, "test.html", gin.H{
			"title": "this is a test",
		})
	})

	// Run blocks until the server stops; its error was previously dropped.
	if err := r.Run(":9090"); err != nil {
		log.Fatalf("http server: %v", err)
	}
}

// handleConnection greets a newly connected websocket client and registers
// its event handlers: "chat" broadcast, raw messages, disconnect, and the
// "Publish" / "Subscribe" events that bridge to Redis.
func handleConnection(c websocket.Connection) {
	fmt.Println("client connected,id=", c.ID())
	c.Write(1, []byte("welcome client"))

	// Chat events coming from the browser are broadcast to all clients.
	c.On("chat", func(msg string) {
		// Echo the message to the console.
		fmt.Printf("%s chat sent: %s\n", c.Context().ClientIP(), msg)
		// To reply to the sender only:
		// c.Emit("chat", msg)
		c.To(websocket.All).Emit("chat", msg)
	})

	c.OnMessage(func(msg []byte) {
		fmt.Println("received msg:", string(msg))
		c.Write(1, []byte("hello aa"))
		c.To(websocket.All).Emit("chat", msg)
	})

	c.OnDisconnect(func() {
		fmt.Println("client Disconnect,id=", c.ID())
	})

	// "Publish": decode the JSON payload and forward it to Redis.
	c.On("Publish", func(msg string) {
		fmt.Printf("%s received publish: %s\n", c.Context().ClientIP(), msg)

		// ID defaults to this connection's ID; an "id" field in the JSON
		// overrides it.
		pubMsg := websocket.PushMsg{ID: c.ID()}
		if err := json.Unmarshal([]byte(msg), &pubMsg); err != nil {
			// The format string needs a verb for err; without one the error
			// is rendered as "%!(EXTRA ...)".
			log.Printf("解析json串错误,err=%v", err)
			return
		}
		if pubMsg.Type != "pub" {
			log.Println("pub msg type error")
			return
		}

		// Publish to Redis. Publish's result is not inspected here.
		websocket.Publish(pubMsg.Topic, pubMsg.Payload)
	})

	// "Subscribe": decode the JSON request and subscribe on Redis.
	c.On("Subscribe", func(msg string) {
		fmt.Printf("%s received subscribe: %s\n", c.Context().ClientIP(), msg)

		subMsg := websocket.SubMsg{ID: c.ID()}
		if err := json.Unmarshal([]byte(msg), &subMsg); err != nil {
			log.Printf("解析json串错误,err=%v", err)
			return
		}
		// The original tested pubMsg.Type against "pub", but pubMsg does not
		// exist in this closure; a subscription request carries type "sub".
		if subMsg.Type != "sub" {
			log.Println("sub msg type error")
			return
		}
		if !subReady {
			// sub.Init never ran because Redis was unreachable at start-up.
			log.Println("redis subscriber not initialised, subscribe ignored")
			return
		}

		// Key the subscription by the connection ID rather than subMsg.ID:
		// the client may overwrite subMsg.ID with its own request ID, while
		// the dispatcher in main looks connections up by this key.
		sub.Subscribe(subMsg.Topic, c.ID())
	})
}