Go WebSocket + Redis 实现轻量级的订阅和实时消息推送

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: Go WebSocket + Redis 实现轻量级的订阅和实时消息推送

上一篇介绍了Golang中封装WebSocket功能,让WebSocket更简单好用和稳定。


这里借助Redis自身的订阅和发布机制和WebSocket结合,实现轻量级的订阅发布和消息推送。本来消息订阅和推送打算用mqtt实现,但是这样还得有一个MqttBroker代理服务器,或采用网上开源的实现,或使用go语言自己实现个mqtt的broker。这都不够轻量级,这里介绍下借助redis的轻量级的实现。


大致框图如下:



涉及实时性和性能相关的服务可以直接在OnMessage里让客户端同后台业务直接交互实现。


关于提高性能的思考,首先看如果是高并发的情况下,瓶颈可能会在哪。


内部的两个redis客户端,一个负责发布,订阅,一个负责接收。当消息量大的情况下未必受用。那么首先负责发布的客户端,可考虑用redis的连接池实现。


消息的发布和订阅,固定为两个事件,一个是OnPublish,一个是OnSubcribe。并定义相关的报文结构如下:





收到的Publish事件,发布消息到Redis:


// 接收到发布消息事件
  c.On("Publish", func(msg string) {
    // 将消息打印到控制台
    fmt.Printf("%s received publish: %s\n", c.Context().ClientIP(), msg)
    pubMsg := websocket.PushMsg{ID: c.ID()}
    err := json.Unmarshal([]byte(msg), &pubMsg)
    if err != nil {
      log.Printf("解析json串错误,err=", err)
      return
    }
    if pubMsg.Type != "pub" {
      log.Println("pub msg type error")
      return
    }
    //发布消息到Redis
    websocket.Publish(pubMsg.Topic, pubMsg.Payload)
  })


收到的订阅事件,发布消息到Redis:


// 接收到订阅的事件
  c.On("Subscribe", func(msg string) {
    // 将消息打印到控制台,c .Context()是iris的http上下文。
    fmt.Printf("%s received subscribe: %s\n", c.Context().ClientIP(), msg)
    subMsg := websocket.SubMsg{ID: c.ID()}
    err := json.Unmarshal([]byte(msg), &subMsg)
    if err != nil {
      log.Printf("解析json串错误,err=", err)
      return
    }
    if pubMsg.Type != "pub" {
      log.Println("pub msg type error")
      return
    }
    //订阅到Redis
    sub.Subscribe(subMsg.Topic, subMsg.ID)
  })


开启一个Redis客户端,负责收到的消息:


func (c *Subscriber) Init(ws *Server) {
  conn := RedisClient.Get()
  c.client = redis.PubSubConn{conn}
  c.Ws = ws
  go func() {
    for {
      log.Println("redis wait...")
      switch res := c.client.Receive().(type) {
      case redis.Message:
        fmt.Printf("receive:%#v\n", res)
        topic := res.Channel
        message := string(res.Data)
        fnSubReceived(c.cbMap, topic, message)
      case redis.Subscription:
        fmt.Printf("%s: %s %d\n", res.Channel, res.Kind, res.Count)
      case error:
        log.Println("error handle", res)
        if IsConnError(res) {
          conn, err := RedisClient.Dial()
          if err != nil {
            log.Printf("err=%s\n", err)
          }
          c.client = redis.PubSubConn{conn}
        }
        continue
      }
    }
  }()
}


附完整实现:


package websocket
import (
  "log"
)
//订阅的消息格式定义
type SubMsg struct {
  ID    string `json:"id"`   //请求ID
  Type  string `json:"type"` //订阅时固定为sub,取消订阅时固定为unsub
  Topic string `json:"topic"`
  Param string `json:"param"`
}
//平台推送消息定义
type PushMsg struct {
  ID      string `json:"id"`
  Type    string `json:"type"` //发布,类型为pub
  Topic   string `json:"topic"`
  Payload string `json:"payload"`
  Result  string `json:"result"`
}
func Publish(topic string, msg string) (interface{}, error) {
  resp, err := Redo("Publish", topic, msg)
  if err != nil {
    log.Println(err)
  }
  return resp, err
}


package websocket
import (
  "errors"
  "fmt"
  "io"
  "log"
  "strings"
  "sync"
  "time"
  //"unsafe"
  "github.com/gomodule/redigo/redis"
)
var (
  // RD redis全局client
  RedisClient *redis.Pool
)
// InitRedis 初始设置
func InitRedis(host string, auth string, db int) error {
  // 连接Redis
  RedisClient = &redis.Pool{
    MaxIdle:     3,
    MaxActive:   4000,
    IdleTimeout: 180 * time.Second,
    Dial: func() (redis.Conn, error) {
      c, err := redis.Dial("tcp", host, redis.DialPassword(auth), redis.DialDatabase(db))
      if nil != err {
        return nil, err
      }
      return c, nil
    },
    TestOnBorrow: func(c redis.Conn, t time.Time) error {
      if time.Since(t) < time.Minute {
        return nil
      }
      _, err := c.Do("PING")
      return err
    },
  }
  rd := RedisClient.Get()
  defer rd.Close()
  c, err := redis.Dial("tcp", host, redis.DialPassword(auth), redis.DialDatabase(db))
  defer c.Close()
  if err != nil {
    fmt.Println("Connect to redis error", err)
    return err
  }
  fmt.Println("Connect to redis ok")
  return nil
}
func IsConnError(err error) bool {
  var needNewConn bool
  if err == nil {
    return false
  }
  if err == io.EOF {
    needNewConn = true
  }
  if strings.Contains(err.Error(), "use of closed network connection") {
    needNewConn = true
  }
  if strings.Contains(err.Error(), "connect: connection refused") {
    needNewConn = true
  }
  if strings.Contains(err.Error(), "connection closed") {
    needNewConn = true
  }
  return needNewConn
}
// 在pool加入TestOnBorrow方法来去除扫描坏连接
func Redo(command string, opt ...interface{}) (interface{}, error) {
  if RedisClient == nil {
    return "", errors.New("error,redis client is null")
  }
  rd := RedisClient.Get()
  defer rd.Close()
  var conn redis.Conn
  var err error
  var maxretry = 3
  var needNewConn bool
  resp, err := rd.Do(command, opt...)
  needNewConn = IsConnError(err)
  if needNewConn == false {
    return resp, err
  } else {
    conn, err = RedisClient.Dial()
  }
  for index := 0; index < maxretry; index++ {
    if conn == nil && index+1 > maxretry {
      return resp, err
    }
    if conn == nil {
      conn, err = RedisClient.Dial()
    }
    if err != nil {
      continue
    }
    resp, err := conn.Do(command, opt...)
    needNewConn = IsConnError(err)
    if needNewConn == false {
      return resp, err
    } else {
      conn, err = RedisClient.Dial()
    }
  }
  conn.Close()
  return "", errors.New("redis error")
}
type SubscribeCallback func(topicMap sync.Map, topic, msg string)
type Subscriber struct {
  client   redis.PubSubConn
  Ws       *Server //websocket
  cbMap    sync.Map
  CallBack interface {
    OnReceive(SubscribeCallback)
  }
}
var fnSubReceived SubscribeCallback
func (c *Subscriber) OnReceive(cb SubscribeCallback) {
  fnSubReceived = cb
}
func (c *Subscriber) Init(ws *Server) {
  conn := RedisClient.Get()
  c.client = redis.PubSubConn{conn}
  c.Ws = ws
  go func() {
    for {
      log.Println("redis wait...")
      switch res := c.client.Receive().(type) {
      case redis.Message:
        fmt.Printf("receive:%#v\n", res)
        topic := res.Channel
        message := string(res.Data)
        fnSubReceived(c.cbMap, topic, message)
      case redis.Subscription:
        fmt.Printf("%s: %s %d\n", res.Channel, res.Kind, res.Count)
      case error:
        log.Println("error handle", res)
        if IsConnError(res) {
          conn, err := RedisClient.Dial()
          if err != nil {
            log.Printf("err=%s\n", err)
          }
          c.client = redis.PubSubConn{conn}
        }
        continue
      }
    }
  }()
}
func (c *Subscriber) Close() {
  err := c.client.Close()
  if err != nil {
    log.Println("redis close error.")
  }
}
func (c *Subscriber) Subscribe(channel interface{}, clientid string) {
  err := c.client.Subscribe(channel)
  if err != nil {
    log.Println("redis Subscribe error.", err)
  }
  c.cbMap.Store(clientid, channel.(string))
}
func (c *Subscriber) PSubscribe(channel interface{}, clientid string) {
  err := c.client.PSubscribe(channel)
  if err != nil {
    log.Println("redis PSubscribe error.", err)
  }
  c.cbMap.Store(clientid, channel.(string))
}


package main
import (
  "fmt"
  "log"
  "sync"
  //"github.com/gin-contrib/cors"
  "encoding/json"
  "github.com/gin-gonic/gin"
  "net/http"
  "strings"
  "websockTest/websocket"
)
func main() {
  ws := websocket.New(websocket.Config{
    ReadBufferSize:  1024,
    WriteBufferSize: 1024,
  })
  ws.OnConnection(handleConnection)
  // 初始化连接redis
  var sub websocket.Subscriber
  err := websocket.InitRedis("127.0.0.1:6379", "", 0)
  if err != nil {
    fmt.Printf("InitRedis error: %s\n", err)
  } else {
    sub.Init(ws)
    //redis client收到的消息分发到websocket
    sub.OnReceive(func(topicMap sync.Map, topic, msg string) {
      fmt.Printf("sub msg received,topic=%s,msg=%s\n", topic, msg)
      topicMap.Range(func(k, v interface{}) bool {
        fmt.Println("topicMap:", k, v)
        if v.(string) == topic {
          conn := sub.Ws.GetConnection(k.(string))
          if conn != nil {
            conn.Write(1, []byte(msg))
          }
        }
        return true
      })
    })
  }
  r := gin.Default()
  //允许跨域
  //config := cors.DefaultConfig()
  //config.AllowOrigins = []string{"http://127.0.0.1:9090"}
  //r.Use(Cors())
  //静态资源
  r.Static("/static", "./static")
  r.LoadHTMLGlob("views/*")
  r.GET("/ws", ws.Handler())
  r.GET("/api/v3/device", ws.Handler())
  r.GET("/test", func(c *gin.Context) {
    c.HTML(http.StatusOK, "test.html", gin.H{
      "title": "this is a test",
    })
  })
  r.Run(":9090")
}
func handleConnection(c websocket.Connection) {
  fmt.Println("client connected,id=", c.ID())
  c.Write(1, []byte("welcome client"))
  // 从浏览器中读取事件
  c.On("chat", func(msg string) {
    // 将消息打印到控制台,c .Context()是iris的http上下文。
    fmt.Printf("%s chat sent: %s\n", c.Context().ClientIP(), msg)
    // 将消息写回客户端消息所有者:
    // c.Emit("chat", msg)
    c.To(websocket.All).Emit("chat", msg)
  })
  c.OnMessage(func(msg []byte) {
    fmt.Println("received msg:", string(msg))
    c.Write(1, []byte("hello aa"))
    c.To(websocket.All).Emit("chat", msg)
  })
  c.OnDisconnect(func() {
    fmt.Println("client Disconnect,id=", c.ID())
  })
  // 接收到发布消息事件
  c.On("Publish", func(msg string) {
    // 将消息打印到控制台,c .Context()是iris的http上下文。
    fmt.Printf("%s received publish: %s\n", c.Context().ClientIP(), msg)
    pubMsg := websocket.PushMsg{ID: c.ID()}
    err := json.Unmarshal([]byte(msg), &pubMsg)
    if err != nil {
      log.Printf("解析json串错误,err=", err)
      return
    }
    if pubMsg.Type != "pub" {
      log.Println("pub msg type error")
      return
    }
    //发布消息到Redis
    websocket.Publish(pubMsg.Topic, pubMsg.Payload)
  })
  // 接收到订阅的事件
  c.On("Subscribe", func(msg string) {
    // 将消息打印到控制台,c .Context()是iris的http上下文。
    fmt.Printf("%s received subscribe: %s\n", c.Context().ClientIP(), msg)
    subMsg := websocket.SubMsg{ID: c.ID()}
    err := json.Unmarshal([]byte(msg), &subMsg)
    if err != nil {
      log.Printf("解析json串错误,err=", err)
      return
    }
    if pubMsg.Type != "pub" {
      log.Println("pub msg type error")
      return
    }
    //订阅到Redis
    sub.Subscribe(subMsg.Topic, subMsg.ID)
  })
}


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
8天前
|
NoSQL 安全 Unix
Redis源码、面试指南(4)单机数据库、持久化、通知与订阅(中)
Redis源码、面试指南(4)单机数据库、持久化、通知与订阅
18 0
|
8天前
|
存储 缓存 NoSQL
【Go语言专栏】Go语言中的Redis操作与缓存应用
【4月更文挑战第30天】本文探讨了在Go语言中使用Redis进行操作和缓存应用的方法。文章介绍了Redis作为高性能键值存储系统,用于提升应用性能。推荐使用`go-redis/redis`库,示例代码展示了连接、设置、获取和删除键值对的基本操作。文章还详细阐述了缓存应用的步骤及常见缓存策略,包括缓存穿透、缓存击穿和缓存雪崩的解决方案。利用Redis和合适策略可有效优化应用性能。
|
8天前
|
网络协议 Java Go
【Go语言专栏】Go语言中的WebSocket实时通信应用
【4月更文挑战第30天】Go语言(Golang)是Google开发的编程语言,适用于云计算、微服务等领域。本文介绍了WebSocket,一种实现浏览器与服务器全双工通信的协议,其特点是实时性、全双工和轻量级。在Go中实现WebSocket,可以使用gorilla/websocket库。示例展示了如何创建服务器端和客户端,实现消息的收发。WebSocket广泛应用于聊天、游戏、通知推送和实时数据同步等场景。学习Go语言中的WebSocket对于开发实时通信应用至关重要。
|
8天前
|
存储 NoSQL 调度
Redis源码、面试指南(4)单机数据库、持久化、通知与订阅(下)
Redis源码、面试指南(4)单机数据库、持久化、通知与订阅
13 0
|
8天前
|
存储 NoSQL API
Redis源码、面试指南(4)单机数据库、持久化、通知与订阅(上)
Redis源码、面试指南(4)单机数据库、持久化、通知与订阅
20 1
|
8天前
|
NoSQL Shell Go
在go中简单使用go-redis库
在go中简单使用go-redis库
|
8天前
|
缓存 监控 前端开发
【Go 语言专栏】Go 语言中的 WebSocket 与 Socket.IO 集成
【4月更文挑战第30天】本文介绍了在 Go 语言中集成 WebSocket 与 Socket.IO 的相关技术,WebSocket 是一种高效的双向通信协议,Socket.IO 是一个实时通信库,提供丰富的事件处理。集成两者能实现更强大的实时通信功能。文章讨论了 Go 中 WebSocket 的实现,Socket.IO 与 WebSocket 的关系,集成的意义及步骤,并提醒注意协议兼容性、消息格式等问题。此外,还提到了性能优化策略和应用案例,如实时聊天、数据监控和在线协作工具。通过集成,开发者可以构建出满足多样化需求的实时通信应用。
|
8天前
|
缓存 监控 前端开发
【Go 语言专栏】Go 语言中的 WebSocket 实时通信应用
【4月更文挑战第30天】本文探讨了Go语言在WebSocket实时通信中的应用。WebSocket作为全双工通信协议,允许持续的双向通信。Go语言凭借其高效和并发特性,适合构建实时应用。文中概述了在Go中实现WebSocket的基本步骤,包括服务器和客户端的建立与通信,并列举了实时聊天、数据监控和在线协作等应用案例。同时,强调了消息格式、并发处理、错误处理和安全性的注意事项。通过数据压缩、缓存管理和连接管理等策略可优化性能。Go语言还能与数据库和前端框架结合,提升用户体验。总之,Go语言为WebSocket实时通信提供了强大支持,有望在更多领域发挥作用。
|
8天前
|
网络协议 安全 Go
【Go语言专栏】Go语言中的WebSocket编程
【4月更文挑战第30天】本文介绍了在Go语言中使用WebSocket进行实时Web应用开发的方法。通过第三方包`gorilla/websocket`,开发者可建立WebSocket服务器和客户端。文中展示了如何创建服务器,升级HTTP连接,以及处理读写消息的示例代码。同时,客户端的创建和通信过程也得以阐述。文章还提及WebSocket的生命周期管理、性能与安全性考虑,以及实践中的最佳做法。通过学习,读者将能运用Go语言构建高效、实时的Web应用。
|
8天前
|
NoSQL Redis
redis消息订阅
redis消息订阅