Go WebSocket + Redis 实现轻量级的订阅和实时消息推送

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
全局流量管理 GTM,标准版 1个月
简介: Go WebSocket + Redis 实现轻量级的订阅和实时消息推送

上一篇介绍了Golang中封装WebSocket功能,让WebSocket更简单好用和稳定。


这里借助Redis自身的订阅和发布机制和WebSocket结合,实现轻量级的订阅发布和消息推送。本来消息订阅和推送打算用mqtt实现,但是这样还得有一个MqttBroker代理服务器,或采用网上开源的实现,或使用go语言自己实现个mqtt的broker。这都不够轻量级,这里介绍下借助redis的轻量级的实现。


大致框图如下:



涉及实时性和性能相关的服务可以直接在OnMessage里让客户端同后台业务直接交互实现。


关于提高性能的思考,首先看如果是高并发的情况下,瓶颈可能会在哪。


内部的两个redis客户端,一个负责发布,订阅,一个负责接收。当消息量大的情况下未必受用。那么首先负责发布的客户端,可考虑用redis的连接池实现。


消息的发布和订阅,固定为两个事件,一个是OnPublish,一个是OnSubcribe。并定义相关的报文结构如下:





收到的Publish事件,发布消息到Redis:


// 接收到发布消息事件
  c.On("Publish", func(msg string) {
    // 将消息打印到控制台
    fmt.Printf("%s received publish: %s\n", c.Context().ClientIP(), msg)
    pubMsg := websocket.PushMsg{ID: c.ID()}
    err := json.Unmarshal([]byte(msg), &pubMsg)
    if err != nil {
      log.Printf("解析json串错误,err=", err)
      return
    }
    if pubMsg.Type != "pub" {
      log.Println("pub msg type error")
      return
    }
    //发布消息到Redis
    websocket.Publish(pubMsg.Topic, pubMsg.Payload)
  })


收到的订阅事件,发布消息到Redis:


// 接收到订阅的事件
  c.On("Subscribe", func(msg string) {
    // 将消息打印到控制台,c .Context()是iris的http上下文。
    fmt.Printf("%s received subscribe: %s\n", c.Context().ClientIP(), msg)
    subMsg := websocket.SubMsg{ID: c.ID()}
    err := json.Unmarshal([]byte(msg), &subMsg)
    if err != nil {
      log.Printf("解析json串错误,err=", err)
      return
    }
    if pubMsg.Type != "pub" {
      log.Println("pub msg type error")
      return
    }
    //订阅到Redis
    sub.Subscribe(subMsg.Topic, subMsg.ID)
  })


开启一个Redis客户端,负责收到的消息:


func (c *Subscriber) Init(ws *Server) {
  conn := RedisClient.Get()
  c.client = redis.PubSubConn{conn}
  c.Ws = ws
  go func() {
    for {
      log.Println("redis wait...")
      switch res := c.client.Receive().(type) {
      case redis.Message:
        fmt.Printf("receive:%#v\n", res)
        topic := res.Channel
        message := string(res.Data)
        fnSubReceived(c.cbMap, topic, message)
      case redis.Subscription:
        fmt.Printf("%s: %s %d\n", res.Channel, res.Kind, res.Count)
      case error:
        log.Println("error handle", res)
        if IsConnError(res) {
          conn, err := RedisClient.Dial()
          if err != nil {
            log.Printf("err=%s\n", err)
          }
          c.client = redis.PubSubConn{conn}
        }
        continue
      }
    }
  }()
}


附完整实现:


package websocket
import (
  "log"
)
//订阅的消息格式定义
type SubMsg struct {
  ID    string `json:"id"`   //请求ID
  Type  string `json:"type"` //订阅时固定为sub,取消订阅时固定为unsub
  Topic string `json:"topic"`
  Param string `json:"param"`
}
//平台推送消息定义
type PushMsg struct {
  ID      string `json:"id"`
  Type    string `json:"type"` //发布,类型为pub
  Topic   string `json:"topic"`
  Payload string `json:"payload"`
  Result  string `json:"result"`
}
func Publish(topic string, msg string) (interface{}, error) {
  resp, err := Redo("Publish", topic, msg)
  if err != nil {
    log.Println(err)
  }
  return resp, err
}


package websocket
import (
  "errors"
  "fmt"
  "io"
  "log"
  "strings"
  "sync"
  "time"
  //"unsafe"
  "github.com/gomodule/redigo/redis"
)
var (
  // RD redis全局client
  RedisClient *redis.Pool
)
// InitRedis 初始设置
func InitRedis(host string, auth string, db int) error {
  // 连接Redis
  RedisClient = &redis.Pool{
    MaxIdle:     3,
    MaxActive:   4000,
    IdleTimeout: 180 * time.Second,
    Dial: func() (redis.Conn, error) {
      c, err := redis.Dial("tcp", host, redis.DialPassword(auth), redis.DialDatabase(db))
      if nil != err {
        return nil, err
      }
      return c, nil
    },
    TestOnBorrow: func(c redis.Conn, t time.Time) error {
      if time.Since(t) < time.Minute {
        return nil
      }
      _, err := c.Do("PING")
      return err
    },
  }
  rd := RedisClient.Get()
  defer rd.Close()
  c, err := redis.Dial("tcp", host, redis.DialPassword(auth), redis.DialDatabase(db))
  defer c.Close()
  if err != nil {
    fmt.Println("Connect to redis error", err)
    return err
  }
  fmt.Println("Connect to redis ok")
  return nil
}
func IsConnError(err error) bool {
  var needNewConn bool
  if err == nil {
    return false
  }
  if err == io.EOF {
    needNewConn = true
  }
  if strings.Contains(err.Error(), "use of closed network connection") {
    needNewConn = true
  }
  if strings.Contains(err.Error(), "connect: connection refused") {
    needNewConn = true
  }
  if strings.Contains(err.Error(), "connection closed") {
    needNewConn = true
  }
  return needNewConn
}
// 在pool加入TestOnBorrow方法来去除扫描坏连接
func Redo(command string, opt ...interface{}) (interface{}, error) {
  if RedisClient == nil {
    return "", errors.New("error,redis client is null")
  }
  rd := RedisClient.Get()
  defer rd.Close()
  var conn redis.Conn
  var err error
  var maxretry = 3
  var needNewConn bool
  resp, err := rd.Do(command, opt...)
  needNewConn = IsConnError(err)
  if needNewConn == false {
    return resp, err
  } else {
    conn, err = RedisClient.Dial()
  }
  for index := 0; index < maxretry; index++ {
    if conn == nil && index+1 > maxretry {
      return resp, err
    }
    if conn == nil {
      conn, err = RedisClient.Dial()
    }
    if err != nil {
      continue
    }
    resp, err := conn.Do(command, opt...)
    needNewConn = IsConnError(err)
    if needNewConn == false {
      return resp, err
    } else {
      conn, err = RedisClient.Dial()
    }
  }
  conn.Close()
  return "", errors.New("redis error")
}
type SubscribeCallback func(topicMap sync.Map, topic, msg string)
type Subscriber struct {
  client   redis.PubSubConn
  Ws       *Server //websocket
  cbMap    sync.Map
  CallBack interface {
    OnReceive(SubscribeCallback)
  }
}
var fnSubReceived SubscribeCallback
func (c *Subscriber) OnReceive(cb SubscribeCallback) {
  fnSubReceived = cb
}
func (c *Subscriber) Init(ws *Server) {
  conn := RedisClient.Get()
  c.client = redis.PubSubConn{conn}
  c.Ws = ws
  go func() {
    for {
      log.Println("redis wait...")
      switch res := c.client.Receive().(type) {
      case redis.Message:
        fmt.Printf("receive:%#v\n", res)
        topic := res.Channel
        message := string(res.Data)
        fnSubReceived(c.cbMap, topic, message)
      case redis.Subscription:
        fmt.Printf("%s: %s %d\n", res.Channel, res.Kind, res.Count)
      case error:
        log.Println("error handle", res)
        if IsConnError(res) {
          conn, err := RedisClient.Dial()
          if err != nil {
            log.Printf("err=%s\n", err)
          }
          c.client = redis.PubSubConn{conn}
        }
        continue
      }
    }
  }()
}
func (c *Subscriber) Close() {
  err := c.client.Close()
  if err != nil {
    log.Println("redis close error.")
  }
}
func (c *Subscriber) Subscribe(channel interface{}, clientid string) {
  err := c.client.Subscribe(channel)
  if err != nil {
    log.Println("redis Subscribe error.", err)
  }
  c.cbMap.Store(clientid, channel.(string))
}
func (c *Subscriber) PSubscribe(channel interface{}, clientid string) {
  err := c.client.PSubscribe(channel)
  if err != nil {
    log.Println("redis PSubscribe error.", err)
  }
  c.cbMap.Store(clientid, channel.(string))
}


package main
import (
  "fmt"
  "log"
  "sync"
  //"github.com/gin-contrib/cors"
  "encoding/json"
  "github.com/gin-gonic/gin"
  "net/http"
  "strings"
  "websockTest/websocket"
)
func main() {
  ws := websocket.New(websocket.Config{
    ReadBufferSize:  1024,
    WriteBufferSize: 1024,
  })
  ws.OnConnection(handleConnection)
  // 初始化连接redis
  var sub websocket.Subscriber
  err := websocket.InitRedis("127.0.0.1:6379", "", 0)
  if err != nil {
    fmt.Printf("InitRedis error: %s\n", err)
  } else {
    sub.Init(ws)
    //redis client收到的消息分发到websocket
    sub.OnReceive(func(topicMap sync.Map, topic, msg string) {
      fmt.Printf("sub msg received,topic=%s,msg=%s\n", topic, msg)
      topicMap.Range(func(k, v interface{}) bool {
        fmt.Println("topicMap:", k, v)
        if v.(string) == topic {
          conn := sub.Ws.GetConnection(k.(string))
          if conn != nil {
            conn.Write(1, []byte(msg))
          }
        }
        return true
      })
    })
  }
  r := gin.Default()
  //允许跨域
  //config := cors.DefaultConfig()
  //config.AllowOrigins = []string{"http://127.0.0.1:9090"}
  //r.Use(Cors())
  //静态资源
  r.Static("/static", "./static")
  r.LoadHTMLGlob("views/*")
  r.GET("/ws", ws.Handler())
  r.GET("/api/v3/device", ws.Handler())
  r.GET("/test", func(c *gin.Context) {
    c.HTML(http.StatusOK, "test.html", gin.H{
      "title": "this is a test",
    })
  })
  r.Run(":9090")
}
func handleConnection(c websocket.Connection) {
  fmt.Println("client connected,id=", c.ID())
  c.Write(1, []byte("welcome client"))
  // 从浏览器中读取事件
  c.On("chat", func(msg string) {
    // 将消息打印到控制台,c .Context()是iris的http上下文。
    fmt.Printf("%s chat sent: %s\n", c.Context().ClientIP(), msg)
    // 将消息写回客户端消息所有者:
    // c.Emit("chat", msg)
    c.To(websocket.All).Emit("chat", msg)
  })
  c.OnMessage(func(msg []byte) {
    fmt.Println("received msg:", string(msg))
    c.Write(1, []byte("hello aa"))
    c.To(websocket.All).Emit("chat", msg)
  })
  c.OnDisconnect(func() {
    fmt.Println("client Disconnect,id=", c.ID())
  })
  // 接收到发布消息事件
  c.On("Publish", func(msg string) {
    // 将消息打印到控制台,c .Context()是iris的http上下文。
    fmt.Printf("%s received publish: %s\n", c.Context().ClientIP(), msg)
    pubMsg := websocket.PushMsg{ID: c.ID()}
    err := json.Unmarshal([]byte(msg), &pubMsg)
    if err != nil {
      log.Printf("解析json串错误,err=", err)
      return
    }
    if pubMsg.Type != "pub" {
      log.Println("pub msg type error")
      return
    }
    //发布消息到Redis
    websocket.Publish(pubMsg.Topic, pubMsg.Payload)
  })
  // 接收到订阅的事件
  c.On("Subscribe", func(msg string) {
    // 将消息打印到控制台,c .Context()是iris的http上下文。
    fmt.Printf("%s received subscribe: %s\n", c.Context().ClientIP(), msg)
    subMsg := websocket.SubMsg{ID: c.ID()}
    err := json.Unmarshal([]byte(msg), &subMsg)
    if err != nil {
      log.Printf("解析json串错误,err=", err)
      return
    }
    if pubMsg.Type != "pub" {
      log.Println("pub msg type error")
      return
    }
    //订阅到Redis
    sub.Subscribe(subMsg.Topic, subMsg.ID)
  })
}


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
2月前
|
缓存 监控 前端开发
Go 语言中如何集成 WebSocket 与 Socket.IO,实现高效、灵活的实时通信
本文探讨了在 Go 语言中如何集成 WebSocket 与 Socket.IO,实现高效、灵活的实时通信。首先介绍了 WebSocket 和 Socket.IO 的基本概念及其优势,接着详细讲解了 Go 语言中 WebSocket 的实现方法,以及二者集成的重要意义和具体步骤。文章还讨论了集成过程中需要注意的问题,如协议兼容性、消息格式、并发处理等,并提供了实时聊天、数据监控和在线协作工具等应用案例,最后提出了性能优化策略,包括数据压缩、缓存策略和连接管理优化。旨在帮助开发者更好地理解并应用这些技术。
104 3
|
2月前
|
缓存 监控 前端开发
在 Go 语言中实现 WebSocket 实时通信的应用,包括 WebSocket 的简介、Go 语言的优势、基本实现步骤、应用案例、注意事项及性能优化策略,旨在帮助开发者构建高效稳定的实时通信系统
本文深入探讨了在 Go 语言中实现 WebSocket 实时通信的应用,包括 WebSocket 的简介、Go 语言的优势、基本实现步骤、应用案例、注意事项及性能优化策略,旨在帮助开发者构建高效稳定的实时通信系统。
131 1
|
2月前
|
消息中间件 NoSQL Redis
【赵渝强老师】Redis的消息发布与订阅
本文介绍了Redis实现消息队列的两种场景:发布者订阅者模式和生产者消费者模式。其中,发布者订阅者模式通过channel频道进行解耦,订阅者监听特定channel的消息,当发布者向该channel发送消息时,所有订阅者都能接收到消息。文章还提供了相关操作命令及示例代码,展示了如何使用Redis实现消息的发布与订阅。
|
3月前
|
SQL 分布式计算 NoSQL
大数据-42 Redis 功能扩展 发布/订阅模式 事务相关的内容 Redis弱事务
大数据-42 Redis 功能扩展 发布/订阅模式 事务相关的内容 Redis弱事务
35 2
|
3月前
|
负载均衡 网络协议 C#
C#实现WebSocket实时消息推送技术详解
C#实现WebSocket实时消息推送技术详解
175 1
|
4月前
|
NoSQL Go API
go语言操作Redis
go语言操作Redis
|
5月前
|
消息中间件 存储 NoSQL
redis实战——go-redis的使用与redis基础数据类型的使用场景(一)
本文档介绍了如何使用 Go 语言中的 `go-redis` 库操作 Redis 数据库
262 0
redis实战——go-redis的使用与redis基础数据类型的使用场景(一)
|
4月前
|
消息中间件 NoSQL Go
PHP转Go系列 | ThinkPHP与Gin框架之Redis延时消息队列技术实践
【9月更文挑战第7天】在从 PHP 的 ThinkPHP 框架迁移到 Go 的 Gin 框架时,涉及 Redis 延时消息队列的技术实践主要包括:理解延时消息队列概念,其能在特定时间处理消息,适用于定时任务等场景;在 ThinkPHP 中使用 Redis 实现延时队列;在 Gin 中结合 Go 的 Redis 客户端库实现类似功能;Go 具有更高性能和简洁性,适合处理大量消息。迁移过程中需考虑业务需求及系统稳定性。
|
5月前
|
NoSQL Go Redis
用 Go + Redis 实现分布式锁
用 Go + Redis 实现分布式锁
|
5月前
|
前端开发 Go 开发者
用 Go + WebSocket 快速实现一个 chat 服务
用 Go + WebSocket 快速实现一个 chat 服务