Go WebSocket + Redis 实现轻量级的订阅和实时消息推送

简介: Go WebSocket + Redis 实现轻量级的订阅和实时消息推送

上一篇介绍了Golang中封装WebSocket功能,让WebSocket更简单好用和稳定。


这里借助Redis自身的订阅和发布机制和WebSocket结合,实现轻量级的订阅发布和消息推送。本来消息订阅和推送打算用mqtt实现,但是这样还得有一个MqttBroker代理服务器,或采用网上开源的实现,或使用go语言自己实现个mqtt的broker。这都不够轻量级,这里介绍下借助redis的轻量级的实现。


大致框图如下:



涉及实时性和性能相关的服务可以直接在OnMessage里让客户端同后台业务直接交互实现。


关于提高性能的思考,首先看如果是高并发的情况下,瓶颈可能会在哪。


内部的两个redis客户端,一个负责发布,订阅,一个负责接收。当消息量大的情况下未必受用。那么首先负责发布的客户端,可考虑用redis的连接池实现。


消息的发布和订阅,固定为两个事件,一个是OnPublish,一个是OnSubcribe。并定义相关的报文结构如下:





收到的Publish事件,发布消息到Redis:


// 接收到发布消息事件
  c.On("Publish", func(msg string) {
    // 将消息打印到控制台
    fmt.Printf("%s received publish: %s\n", c.Context().ClientIP(), msg)
    pubMsg := websocket.PushMsg{ID: c.ID()}
    err := json.Unmarshal([]byte(msg), &pubMsg)
    if err != nil {
      log.Printf("解析json串错误,err=", err)
      return
    }
    if pubMsg.Type != "pub" {
      log.Println("pub msg type error")
      return
    }
    //发布消息到Redis
    websocket.Publish(pubMsg.Topic, pubMsg.Payload)
  })


收到的订阅事件,发布消息到Redis:


// 接收到订阅的事件
  c.On("Subscribe", func(msg string) {
    // 将消息打印到控制台,c .Context()是iris的http上下文。
    fmt.Printf("%s received subscribe: %s\n", c.Context().ClientIP(), msg)
    subMsg := websocket.SubMsg{ID: c.ID()}
    err := json.Unmarshal([]byte(msg), &subMsg)
    if err != nil {
      log.Printf("解析json串错误,err=", err)
      return
    }
    if pubMsg.Type != "pub" {
      log.Println("pub msg type error")
      return
    }
    //订阅到Redis
    sub.Subscribe(subMsg.Topic, subMsg.ID)
  })


开启一个Redis客户端,负责收到的消息:


func (c *Subscriber) Init(ws *Server) {
  conn := RedisClient.Get()
  c.client = redis.PubSubConn{conn}
  c.Ws = ws
  go func() {
    for {
      log.Println("redis wait...")
      switch res := c.client.Receive().(type) {
      case redis.Message:
        fmt.Printf("receive:%#v\n", res)
        topic := res.Channel
        message := string(res.Data)
        fnSubReceived(c.cbMap, topic, message)
      case redis.Subscription:
        fmt.Printf("%s: %s %d\n", res.Channel, res.Kind, res.Count)
      case error:
        log.Println("error handle", res)
        if IsConnError(res) {
          conn, err := RedisClient.Dial()
          if err != nil {
            log.Printf("err=%s\n", err)
          }
          c.client = redis.PubSubConn{conn}
        }
        continue
      }
    }
  }()
}


附完整实现:


package websocket
import (
  "log"
)
//订阅的消息格式定义
type SubMsg struct {
  ID    string `json:"id"`   //请求ID
  Type  string `json:"type"` //订阅时固定为sub,取消订阅时固定为unsub
  Topic string `json:"topic"`
  Param string `json:"param"`
}
//平台推送消息定义
type PushMsg struct {
  ID      string `json:"id"`
  Type    string `json:"type"` //发布,类型为pub
  Topic   string `json:"topic"`
  Payload string `json:"payload"`
  Result  string `json:"result"`
}
func Publish(topic string, msg string) (interface{}, error) {
  resp, err := Redo("Publish", topic, msg)
  if err != nil {
    log.Println(err)
  }
  return resp, err
}


package websocket
import (
  "errors"
  "fmt"
  "io"
  "log"
  "strings"
  "sync"
  "time"
  //"unsafe"
  "github.com/gomodule/redigo/redis"
)
var (
  // RD redis全局client
  RedisClient *redis.Pool
)
// InitRedis 初始设置
func InitRedis(host string, auth string, db int) error {
  // 连接Redis
  RedisClient = &redis.Pool{
    MaxIdle:     3,
    MaxActive:   4000,
    IdleTimeout: 180 * time.Second,
    Dial: func() (redis.Conn, error) {
      c, err := redis.Dial("tcp", host, redis.DialPassword(auth), redis.DialDatabase(db))
      if nil != err {
        return nil, err
      }
      return c, nil
    },
    TestOnBorrow: func(c redis.Conn, t time.Time) error {
      if time.Since(t) < time.Minute {
        return nil
      }
      _, err := c.Do("PING")
      return err
    },
  }
  rd := RedisClient.Get()
  defer rd.Close()
  c, err := redis.Dial("tcp", host, redis.DialPassword(auth), redis.DialDatabase(db))
  defer c.Close()
  if err != nil {
    fmt.Println("Connect to redis error", err)
    return err
  }
  fmt.Println("Connect to redis ok")
  return nil
}
func IsConnError(err error) bool {
  var needNewConn bool
  if err == nil {
    return false
  }
  if err == io.EOF {
    needNewConn = true
  }
  if strings.Contains(err.Error(), "use of closed network connection") {
    needNewConn = true
  }
  if strings.Contains(err.Error(), "connect: connection refused") {
    needNewConn = true
  }
  if strings.Contains(err.Error(), "connection closed") {
    needNewConn = true
  }
  return needNewConn
}
// 在pool加入TestOnBorrow方法来去除扫描坏连接
func Redo(command string, opt ...interface{}) (interface{}, error) {
  if RedisClient == nil {
    return "", errors.New("error,redis client is null")
  }
  rd := RedisClient.Get()
  defer rd.Close()
  var conn redis.Conn
  var err error
  var maxretry = 3
  var needNewConn bool
  resp, err := rd.Do(command, opt...)
  needNewConn = IsConnError(err)
  if needNewConn == false {
    return resp, err
  } else {
    conn, err = RedisClient.Dial()
  }
  for index := 0; index < maxretry; index++ {
    if conn == nil && index+1 > maxretry {
      return resp, err
    }
    if conn == nil {
      conn, err = RedisClient.Dial()
    }
    if err != nil {
      continue
    }
    resp, err := conn.Do(command, opt...)
    needNewConn = IsConnError(err)
    if needNewConn == false {
      return resp, err
    } else {
      conn, err = RedisClient.Dial()
    }
  }
  conn.Close()
  return "", errors.New("redis error")
}
type SubscribeCallback func(topicMap sync.Map, topic, msg string)
type Subscriber struct {
  client   redis.PubSubConn
  Ws       *Server //websocket
  cbMap    sync.Map
  CallBack interface {
    OnReceive(SubscribeCallback)
  }
}
var fnSubReceived SubscribeCallback
func (c *Subscriber) OnReceive(cb SubscribeCallback) {
  fnSubReceived = cb
}
func (c *Subscriber) Init(ws *Server) {
  conn := RedisClient.Get()
  c.client = redis.PubSubConn{conn}
  c.Ws = ws
  go func() {
    for {
      log.Println("redis wait...")
      switch res := c.client.Receive().(type) {
      case redis.Message:
        fmt.Printf("receive:%#v\n", res)
        topic := res.Channel
        message := string(res.Data)
        fnSubReceived(c.cbMap, topic, message)
      case redis.Subscription:
        fmt.Printf("%s: %s %d\n", res.Channel, res.Kind, res.Count)
      case error:
        log.Println("error handle", res)
        if IsConnError(res) {
          conn, err := RedisClient.Dial()
          if err != nil {
            log.Printf("err=%s\n", err)
          }
          c.client = redis.PubSubConn{conn}
        }
        continue
      }
    }
  }()
}
func (c *Subscriber) Close() {
  err := c.client.Close()
  if err != nil {
    log.Println("redis close error.")
  }
}
func (c *Subscriber) Subscribe(channel interface{}, clientid string) {
  err := c.client.Subscribe(channel)
  if err != nil {
    log.Println("redis Subscribe error.", err)
  }
  c.cbMap.Store(clientid, channel.(string))
}
func (c *Subscriber) PSubscribe(channel interface{}, clientid string) {
  err := c.client.PSubscribe(channel)
  if err != nil {
    log.Println("redis PSubscribe error.", err)
  }
  c.cbMap.Store(clientid, channel.(string))
}


package main
import (
  "fmt"
  "log"
  "sync"
  //"github.com/gin-contrib/cors"
  "encoding/json"
  "github.com/gin-gonic/gin"
  "net/http"
  "strings"
  "websockTest/websocket"
)
func main() {
  ws := websocket.New(websocket.Config{
    ReadBufferSize:  1024,
    WriteBufferSize: 1024,
  })
  ws.OnConnection(handleConnection)
  // 初始化连接redis
  var sub websocket.Subscriber
  err := websocket.InitRedis("127.0.0.1:6379", "", 0)
  if err != nil {
    fmt.Printf("InitRedis error: %s\n", err)
  } else {
    sub.Init(ws)
    //redis client收到的消息分发到websocket
    sub.OnReceive(func(topicMap sync.Map, topic, msg string) {
      fmt.Printf("sub msg received,topic=%s,msg=%s\n", topic, msg)
      topicMap.Range(func(k, v interface{}) bool {
        fmt.Println("topicMap:", k, v)
        if v.(string) == topic {
          conn := sub.Ws.GetConnection(k.(string))
          if conn != nil {
            conn.Write(1, []byte(msg))
          }
        }
        return true
      })
    })
  }
  r := gin.Default()
  //允许跨域
  //config := cors.DefaultConfig()
  //config.AllowOrigins = []string{"http://127.0.0.1:9090"}
  //r.Use(Cors())
  //静态资源
  r.Static("/static", "./static")
  r.LoadHTMLGlob("views/*")
  r.GET("/ws", ws.Handler())
  r.GET("/api/v3/device", ws.Handler())
  r.GET("/test", func(c *gin.Context) {
    c.HTML(http.StatusOK, "test.html", gin.H{
      "title": "this is a test",
    })
  })
  r.Run(":9090")
}
func handleConnection(c websocket.Connection) {
  fmt.Println("client connected,id=", c.ID())
  c.Write(1, []byte("welcome client"))
  // 从浏览器中读取事件
  c.On("chat", func(msg string) {
    // 将消息打印到控制台,c .Context()是iris的http上下文。
    fmt.Printf("%s chat sent: %s\n", c.Context().ClientIP(), msg)
    // 将消息写回客户端消息所有者:
    // c.Emit("chat", msg)
    c.To(websocket.All).Emit("chat", msg)
  })
  c.OnMessage(func(msg []byte) {
    fmt.Println("received msg:", string(msg))
    c.Write(1, []byte("hello aa"))
    c.To(websocket.All).Emit("chat", msg)
  })
  c.OnDisconnect(func() {
    fmt.Println("client Disconnect,id=", c.ID())
  })
  // 接收到发布消息事件
  c.On("Publish", func(msg string) {
    // 将消息打印到控制台,c .Context()是iris的http上下文。
    fmt.Printf("%s received publish: %s\n", c.Context().ClientIP(), msg)
    pubMsg := websocket.PushMsg{ID: c.ID()}
    err := json.Unmarshal([]byte(msg), &pubMsg)
    if err != nil {
      log.Printf("解析json串错误,err=", err)
      return
    }
    if pubMsg.Type != "pub" {
      log.Println("pub msg type error")
      return
    }
    //发布消息到Redis
    websocket.Publish(pubMsg.Topic, pubMsg.Payload)
  })
  // 接收到订阅的事件
  c.On("Subscribe", func(msg string) {
    // 将消息打印到控制台,c .Context()是iris的http上下文。
    fmt.Printf("%s received subscribe: %s\n", c.Context().ClientIP(), msg)
    subMsg := websocket.SubMsg{ID: c.ID()}
    err := json.Unmarshal([]byte(msg), &subMsg)
    if err != nil {
      log.Printf("解析json串错误,err=", err)
      return
    }
    if pubMsg.Type != "pub" {
      log.Println("pub msg type error")
      return
    }
    //订阅到Redis
    sub.Subscribe(subMsg.Topic, subMsg.ID)
  })
}


相关文章
|
5月前
|
消息中间件 缓存 NoSQL
Redis各类数据结构详细介绍及其在Go语言Gin框架下实践应用
这只是利用Go语言和Gin框架与Redis交互最基础部分展示;根据具体业务需求可能需要更复杂查询、事务处理或订阅发布功能实现更多高级特性应用场景。
347 86
|
5月前
|
存储 前端开发 JavaScript
Go语言实战案例-项目实战篇:编写一个轻量级在线聊天室
本文介绍如何用Go语言从零实现一个轻量级在线聊天室,基于WebSocket实现实时通信,支持多人消息广播。涵盖前后端开发、技术选型与功能扩展,助你掌握Go高并发与实时通信核心技术。
|
7月前
|
测试技术 Go 开发者
go-carbon v2.6.10发布,轻量级、语义化、对开发者友好的 golang 时间处理库
Carbon 是一个轻量级、语义化的 Golang 时间处理库,支持时间穿越、差值计算、极值判断、星座、农历、儒略日、波斯历等特性。现由开源组织 dromara 维护,获 Gitee 2024 GVP 与 Gitcode G-Star 认可。优化了时间冻结方法、位运算替代条件判断,并新增 Sleep 方法、韩语文档及多项常量,提升性能与易用性。官网与源码详见:carbon.go-pkg.com,GitHub、Gitee、Gitcode 均可访问。
97 2
|
10月前
|
数据采集 监控 Go
用 Go 实现一个轻量级并发任务调度器(支持限速)
本文介绍了如何用 Go 实现一个轻量级的并发任务调度器,解决日常开发中批量任务处理的需求。调度器支持最大并发数控制、速率限制、失败重试及结果收集等功能。通过示例代码展示了其使用方法,并分析了核心组件设计,包括任务(Task)和调度器(Scheduler)。该工具适用于网络爬虫、批量请求等场景。文章最后总结了 Go 并发模型的优势,并提出了扩展功能的方向,如失败回调、超时控制等,欢迎读者交流改进。
419 25
|
11月前
|
Go 开发者
go-carbon v2.6.0 重大版本更新,轻量级、语义化、对开发者友好的 golang 时间处理库
carbon 是一个轻量级、语义化、对开发者友好的 Golang 时间处理库,提供了对时间穿越、时间差值、时间极值、时间判断、星座、星座、农历、儒略日 / 简化儒略日、波斯历 / 伊朗历的支持
234 3
|
12月前
|
消息中间件 XML 前端开发
springBoot集成websocket实时消息推送
本文介绍了如何在Spring Boot项目中集成WebSocket实现实时消息推送。首先,通过引入`spring-boot-starter-websocket`依赖,配置`WebSocketConfig`类来启用WebSocket支持。接着,创建`WebSocketTest`服务器类,处理连接、消息收发及错误等事件,并使用`ConcurrentHashMap`管理用户连接。最后,前端通过JavaScript建立WebSocket连接,监听消息并进行相应处理。此方案适用于需要实时通信的应用场景,如聊天室、通知系统等。
2076 2
|
JSON Go 开发者
go-carbon v2.5.0 发布,轻量级、语义化、对开发者友好的 golang 时间处理库
carbon 是一个轻量级、语义化、对开发者友好的 Golang 时间处理库,提供了对时间穿越、时间差值、时间极值、时间判断、星座、星座、农历、儒略日 / 简化儒略日、波斯历 / 伊朗历的支持。
298 4
|
负载均衡 网络协议 C#
C#实现WebSocket实时消息推送技术详解
C#实现WebSocket实时消息推送技术详解
1071 1
|
NoSQL Go API
go语言操作Redis
go语言操作Redis
|
消息中间件 NoSQL Go
PHP转Go系列 | ThinkPHP与Gin框架之Redis延时消息队列技术实践
【9月更文挑战第7天】在从 PHP 的 ThinkPHP 框架迁移到 Go 的 Gin 框架时,涉及 Redis 延时消息队列的技术实践主要包括:理解延时消息队列概念,其能在特定时间处理消息,适用于定时任务等场景;在 ThinkPHP 中使用 Redis 实现延时队列;在 Gin 中结合 Go 的 Redis 客户端库实现类似功能;Go 具有更高性能和简洁性,适合处理大量消息。迁移过程中需考虑业务需求及系统稳定性。
392 1