把酒言欢话聊天,基于Vue3.0+Tornado6.1+Redis发布订阅(pubsub)模式打造异步非阻塞(aioredis)实时(websocket)通信聊天系统

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: “表达欲”是人类成长史上的强大“源动力”,恩格斯早就直截了当地指出,处在蒙昧时代即低级阶段的人类,“以果实、坚果、根作为食物;音节清晰的语言的产生是这一时期的主要成就”。而在网络时代人们的表达欲往往更容易被满足,因为有聊天软件的存在。通常意义上,聊天大抵都基于两种形式:群聊和单聊。群聊或者群组聊天我们可以理解为聊天室,可以有人数上限,而单聊则可以认为是上限为2个人的特殊聊天室。

“表达欲”是人类成长史上的强大“源动力”,恩格斯早就直截了当地指出,处在蒙昧时代即低级阶段的人类,“以果实、坚果、根作为食物;音节清晰的语言的产生是这一时期的主要成就”。而在网络时代人们的表达欲往往更容易被满足,因为有聊天软件的存在。通常意义上,聊天大抵都基于两种形式:群聊和单聊。群聊或者群组聊天我们可以理解为聊天室,可以有人数上限,而单聊则可以认为是上限为2个人的特殊聊天室。

为了开发高质量的聊天系统,开发者应该具备客户机和服务器如何通信的基本知识。在聊天系统中,客户端可以是移动应用程序(C端)或web应用程序(B端)。客户端之间不直接通信。相反,每个客户端都连接到一个聊天服务,该服务支撑双方通信的功能。所以该服务在业务上必须支持的最基本功能:

1.能够实时接收来自其他客户端的信息。

2.能够将每条信息实时推送给收件人。

当客户端打算启动聊天时,它会使用一个或多个网络协议连接聊天服务。对于聊天服务,网络协议的选择至关重要,这里,我们选择Tornado框架内置Websocket协议的接口,简单而又方便,安装tornado6.1

pip3 install tornado==6.1

随后编写程序启动文件main.py:

import tornado.httpserver  
import tornado.websocket  
  
import tornado.ioloop  
  
import tornado.web  
  
import redis  
  
import threading  
  
import asyncio  
  
# 用户列表  
users = []  
  
# websocket协议  
class WB(tornado.websocket.WebSocketHandler):  
  
  
    # 跨域支持  
    def check_origin(self,origin):  
  
        return True  
  
    # 开启链接  
    def open(self):  
  
                users.append(self)  
  
  
    # 接收消息  
    def on_message(self,message):  
  
        self.write_message(message['data'])  
  
    # 断开  
    def on_close(self):  
  
        users.remove(self)

# 建立torando实例  
  
app = tornado.web.Application(  
  
    [  
  
    (r'/wb/',WB)  
  
    ],debug=True  
  
)  
  
if __name__ == '__main__':  
  
  
    # 声明服务器  
    http_server_1 = tornado.httpserver.HTTPServer(app)  
  
    # 监听端口  
    http_server_1.listen(8000)  
  
    # 开启事件循环  
    tornado.ioloop.IOLoop.instance().start() 

如此,就在短时间搭建起了一套websocket协议服务,每一次有客户端发起websocket连接请求,我们都会将它添加到用户列表中,等待用户的推送或者接收信息的动作。

下面我们需要通过某种形式将消息的发送方和接收方联系起来,以达到“聊天”的目的,这里选择Redis的发布订阅模式(pubsub),以一个demo来实例说明,server.py

import redis  
  
r = redis.Redis()  
r.publish("test",'hello')

随后编写 client.py:

import redis  
r = redis.Redis()  
ps = r.pubsub()  
ps.subscribe('test')    
for item in ps.listen():   
    if item['type'] == 'message':  
        print(item['data'])

可以这么理解:订阅者(listener)负责订阅频道(channel);发送者(publisher)负责向频道(channel)发送二进制的字符串消息,然后频道收到消息时,推送给订阅者。

频道不仅可以联系发布者和订阅者,同时,也可以利用频道进行“消息隔离”,即不同频道的消息只会给订阅该频道的用户进行推送:

根据发布者订阅者逻辑,改写main.py:

import tornado.httpserver  
import tornado.websocket  
  
import tornado.ioloop  
  
import tornado.web  
  
import redis  
  
import threading  
  
import asyncio  
  
# 用户列表  
users = []  
  
# 频道列表  
channels = ["channel_1","channel_2"]  
  
  
# websocket协议  
class WB(tornado.websocket.WebSocketHandler):  
  
  
    # 跨域支持  
    def check_origin(self,origin):  
  
        return True  
  
    # 开启链接  
    def open(self):  
  
  
        users.append(self)  
  
  
    # 接收消息  
    def on_message(self,message):  
  
        self.write_message(message['data'])  
  
    # 断开  
    def on_close(self):  
  
        users.remove(self)  
  
  
  
  
  
  
# 基于redis监听发布者发布消息  
def redis_listener(loop):  
  
    asyncio.set_event_loop(loop)  
  
    async def listen():   
  
        r = redis.Redis(decode_responses=True)  
  
        # 声明pubsb实例  
        ps = r.pubsub()  
  
        # 订阅聊天室频道  
  
        ps.subscribe(["channel_1","channel_2"])  
  
  
        # 监听消息  
        for message in ps.listen():  
  
            print(message)  
  
            # 遍历链接上的用户  
            for user in users:  
  
                print(user)  
  
                if message["type"] == "message" and message["channel"] == user.get_cookie("channel"):  
  
  
                    user.write_message(message["data"])  
  
    future = asyncio.gather(listen())  
    loop.run_until_complete(future)  
  
  
  
# 接口  发布信息  
class Msg(tornado.web.RequestHandler):  
  
  
    # 重写父类方法  
    def set_default_headers(self):  
  
        # 设置请求头信息  
        print("开始设置")  
        # 域名信息  
        self.set_header("Access-Control-Allow-Origin","*")  
        # 请求信息  
        self.set_header("Access-Control-Allow-Headers","x-requested-with")  
        # 请求方式  
        self.set_header("Access-Control-Allow-Methods","POST,GET,PUT,DELETE")  
  
      
  
    # 发布信息  
    async def post(self):  
  
        data = self.get_argument("data",None)  
  
        channel = self.get_argument("channel","channel_1")  
  
        print(data)  
  
        # 发布  
        r = redis.Redis()  
  
        r.publish(channel,data)  
  
        return self.write("ok")  
  
  
# 建立torando实例  
  
app = tornado.web.Application(  
  
    [  
  
    (r'/send/',Msg),  
    (r'/wb/',WB)  
  
    ],debug=True  
  
)  
  
if __name__ == '__main__':  
  
  
    loop = asyncio.new_event_loop()  
  
    # 单线程启动订阅者服务  
    threading.Thread(target=redis_listener,args=(loop,)).start()  
  
  
    # 声明服务器  
    http_server_1 = tornado.httpserver.HTTPServer(app)  
  
    # 监听端口  
    http_server_1.listen(8000)  
  
    # 开启事件循环  
    tornado.ioloop.IOLoop.instance().start()

这里假设默认有两个频道,逻辑是这样的:由前端控制websocket链接用户选择将消息发布到那个频道上,同时每个用户通过前端cookie的设置具备频道属性,当具备频道属性的用户对该频道发布了一条消息之后,所有其他具备该频道属性的用户通过redis进行订阅后主动推送刚刚发布的消息,而频道的推送只匹配订阅该频道的用户,达到消息隔离的目的。

需要注意的一点是,通过线程启动redis订阅服务时,需要将当前的loop实例传递给协程对象,否则在订阅方法内将会获取不到websocket实例,报这个错误:

IOLoop.current() doesn't work in non-main

这是因为Tornado底层基于事件循环ioloop,而同步框架模式的Django或者Flask则没有这个问题。

下面编写前端代码,这里我们使用时下最流行的vue3.0框架,编写chat.vue:

<template>  
  <div>  
  
  
            <h1>聊天窗口</h1>  
  
  
            <van-tabs v-model:active="active" @click="change_channel">  
  
              <van-tab title="客服1号">  
  
  
                <table>  
                
              <tr v-for="item,index in msglist" :key="index">  
                  
                {{ item }}  
  
              </tr>  
  
            </table>  
                  
  
  
              </van-tab>  
  
  
              <van-tab title="客服2号">  
                  
  
                <table>  
                
              <tr v-for="item,index in msglist" :key="index">  
                  
                {{ item }}  
  
              </tr>  
  
            </table>  
  
  
              </van-tab>  
  
            </van-tabs>  
  
  
              
  
  
            <van-field label="聊天信息" v-model="msg" />  
  
            <van-button color="gray" @click="commit">发送</van-button>  
  
     
  </div>  
</template>  
  
<script>  
  
export default {  
 data() {  
    return {  
      auditlist:[],  
  
      //聊天记录  
      msglist:[],  
      msg:"",  
       websock: null, //建立的连接  
      lockReconnect: false, //是否真正建立连接  
      timeout: 3 * 1000, //30秒一次心跳  
      timeoutObj: null, //外层心跳倒计时  
      serverTimeoutObj: null, //内层心跳检测  
      timeoutnum: null, //断开 重连倒计时  
      active:0,  
      channel:"channel_1"  
       
    }  
  },  
  methods:{  
  
  
    //切换频道  
    change_channel:function(){  
  
  
          if(this.active === 0){  
  
  
                this.channel = "channel_1";  
  
                var name = "channel";  
          var value = "channel_1";  
  
            
  
          }else{  
  
  
              this.channel = "channel_2";  
  
                var name = "channel";  
          var value = "channel_2";  
  
  
          }  
  
  
          //清空聊天记录  
          this.msglist = [];  
  
  
          var d = new Date();  
          d.setTime(d.getTime() + (24 * 60 * 60 * 1000));  
          var expires = "expires=" + d.toGMTString();  
          document.cookie = name + "=" + value + "; " + expires;  
  
  
          this.reconnect();  
  
  
    },  
     initWebSocket() {  
      //初始化weosocket  
      const wsuri = "ws://localhost:8000/wb/";  
      this.websock = new WebSocket(wsuri);  
      this.websock.onopen = this.websocketonopen;  
      this.websock.onmessage = this.websocketonmessage;  
      this.websock.onerror = this.websocketonerror;  
      this.websock.onclose = this.websocketclose;  
    },  
  
    reconnect() {  
      //重新连接  
      var that = this;  
      if (that.lockReconnect) {  
        // 是否真正建立连接  
        return;  
      }  
      that.lockReconnect = true;  
      //没连接上会一直重连,设置延迟避免请求过多  
      that.timeoutnum && clearTimeout(that.timeoutnum);  
      // 如果到了这里断开重连的倒计时还有值的话就清除掉  
      that.timeoutnum = setTimeout(function() {  
        //然后新连接  
        that.initWebSocket();  
        that.lockReconnect = false;  
      }, 5000);  
    },  
  
     reset() {  
      //重置心跳  
      var that = this;  
      //清除时间(清除内外两个心跳计时)  
      clearTimeout(that.timeoutObj);  
      clearTimeout(that.serverTimeoutObj);  
      //重启心跳  
      that.start();  
    },  
  
    start() {  
      //开启心跳  
      var self = this;  
      self.timeoutObj && clearTimeout(self.timeoutObj);  
      // 如果外层心跳倒计时存在的话,清除掉  
      self.serverTimeoutObj && clearTimeout(self.serverTimeoutObj);  
      // 如果内层心跳检测倒计时存在的话,清除掉  
      self.timeoutObj = setTimeout(function() {  
        // 重新赋值重新发送 进行心跳检测  
        //这里发送一个心跳,后端收到后,返回一个心跳消息,  
        if (self.websock.readyState == 1) {  
          //如果连接正常  
          // self.websock.send("heartCheck");  
        } else {  
          //否则重连  
          self.reconnect();  
        }  
        self.serverTimeoutObj = setTimeout(function() {  
          // 在三秒一次的心跳检测中如果某个值3秒没响应就关掉这次连接  
          //超时关闭  
         // self.websock.close();  
        }, self.timeout);  
      }, self.timeout);  
      // 3s一次  
    },  
  
    websocketonopen(e) {  
      //连接建立之后执行send方法发送数据  
      console.log("成功");  
  
     // this.websock.send("123");  
      // this.websocketsend(JSON.stringify(actions));  
    },  
    websocketonerror() {  
      //连接建立失败重连  
      console.log("失败");  
      this.initWebSocket();  
    },  
    websocketonmessage(e) {  
  
      console.log(e);  
      //数据接收  
      //const redata = JSON.parse(e.data);  
      const redata = e.data;  
  
      //累加  
      this.msglist.push(redata);  
  
      console.log(redata);  
  
       
    },  
    websocketsend(Data) {  
      //数据发送  
      this.websock.send(Data);  
    },  
    websocketclose(e) {  
      //关闭  
      this.reconnect()  
      console.log("断开连接", e);  
    },  
  
    //提交表单  
    commit:function(){  
  
  
        //发送请求  
  
        this.myaxios("http://localhost:8000/send/","post",{"data":this.msg,channel:this.channel}).then(data =>{  
  
          console.log(data);  
  
        });  
  
  
  
    },  
    
  
  },  
  
  mounted(){  
  
  
      //连接后端websocket服务  
      this.initWebSocket();  
  
  
  
      var d = new Date();  
          d.setTime(d.getTime() + (24 * 60 * 60 * 1000));  
          var expires = "expires=" + d.toGMTString();  
          document.cookie = "channel" + "=" + "channel_1" + "; " + expires;  
  
      
  
  }  
  
}  
</script>  
  
  
<style scoped>  
  @import url("../assets/style.css");  
  
  .chatbox{  
  
      color:black;  
  
  }  
  
  .mymsg{  
  
      background-color:green;  
  
  }  
  
  
</style>

这里前端在线客户端定期向状态服务器发送心跳事件。如果服务端在特定时间内(例如x秒)从客户端接收到心跳事件,则认为用户处于联机状态。否则,它将处于脱机状态,脱机后在阈值时间内可以进行重新连接的动作。同时利用vant框架的标签页可以同步切换频道,切换后将频道标识写入cookie,便于后端服务识别后匹配推送。

效果是这样的:

诚然,功能业已实现,但是如果我们处在一个高并发场景之下呢?试想一下如果一个频道有10万人同时在线,每秒有100条新消息,那么后台tornado的websocket服务推送频率是100w*10/s = 1000w/s 。

这样的系统架构如果不做负载均衡的话,很难抗住压力,那么瓶颈在哪里呢?没错,就是数据库redis,这里我们需要异步redis库aioredis的帮助:

pip3 install aioredis

aioredis通过协程异步操作redis读写,避免了io阻塞问题,使消息的发布和订阅操作非阻塞。

此时,可以新建一个异步订阅服务文件main\_with\_aioredis.py:

import asyncio  
import aioredis  
from tornado import web, websocket  
from tornado.ioloop import IOLoop  
import tornado.httpserver  
import async_timeout

之后主要的修改逻辑是,通过aioredis异步建立redis链接,并且异步订阅多个频道,随后通过原生协程的asyncio.create\_task方法(也可以使用asyncio.ensure\_future)注册订阅消费的异步任务reader:

async def setup():  
    r = await aioredis.from_url("redis://localhost", decode_responses=True)  
    pubsub = r.pubsub()  
  
    print(pubsub)  
    await pubsub.subscribe("channel_1","channel_2")  
  
    #asyncio.ensure_future(reader(pubsub))  
    asyncio.create_task(reader(pubsub))

在订阅消费方法中,异步监听所订阅频道中的发布信息,同时和之前的同步方法一样,比对用户的频道属性并且进行按频道推送:

async def reader(channel: aioredis.client.PubSub):  
    while True:  
        try:  
            async with async_timeout.timeout(1):  
                message = await channel.get_message(ignore_subscribe_messages=True)  
                if message is not None:  
                    print(f"(Reader) Message Received: {message}")  
  
                    for user in users:  
  
                        if user.get_cookie("channel") == message["channel"]:  
  
                            user.write_message(message["data"])  
          
                await asyncio.sleep(0.01)  
        except asyncio.TimeoutError:  
            pass

最后,利用tornado事件循环IOLoop传递中执行回调方法,将setup方法加入到事件回调中:

if __name__ == '__main__':  
  
    # 监听端口  
    application.listen(8000)  
  
    loop = IOLoop.current()  
    loop.add_callback(setup)  
    loop.start()

完整的异步消息发布、订阅、推送服务改造 main\_aioredis.py:

import asyncio  
import aioredis  
from tornado import web, websocket  
from tornado.ioloop import IOLoop  
import tornado.httpserver  
import async_timeout  
  
users = []  
  
# websocket协议  
class WB(tornado.websocket.WebSocketHandler):  
  
  
    # 跨域支持  
    def check_origin(self,origin):  
  
        return True  
  
    # 开启链接  
    def open(self):  
  
  
        users.append(self)  
  
  
    # 接收消息  
    def on_message(self,message):  
  
        self.write_message(message['data'])  
  
    # 断开  
    def on_close(self):  
  
        users.remove(self)  
  
  
class Msg(web.RequestHandler):  
  
  
    # 重写父类方法  
    def set_default_headers(self):  
  
        # 设置请求头信息  
        print("开始设置")  
        # 域名信息  
        self.set_header("Access-Control-Allow-Origin","*")  
        # 请求信息  
        self.set_header("Access-Control-Allow-Headers","x-requested-with")  
        # 请求方式  
        self.set_header("Access-Control-Allow-Methods","POST,GET,PUT,DELETE")  
  
  
    # 发布信息  
    async def post(self):  
  
        data = self.get_argument("data",None)  
  
        channel = self.get_argument("channel","channel_1")  
  
        print(data)  
  
        # 发布  
        r = await aioredis.from_url("redis://localhost", decode_responses=True)  
  
        await r.publish(channel,data)  
  
        return self.write("ok")  
  
  
async def reader(channel: aioredis.client.PubSub):  
    while True:  
        try:  
            async with async_timeout.timeout(1):  
                message = await channel.get_message(ignore_subscribe_messages=True)  
                if message is not None:  
                    print(f"(Reader) Message Received: {message}")  
  
                    for user in users:  
  
                        if user.get_cookie("channel") == message["channel"]:  
  
                            user.write_message(message["data"])  
          
                await asyncio.sleep(0.01)  
        except asyncio.TimeoutError:  
            pass  
  
  
async def setup():  
    r = await aioredis.from_url("redis://localhost", decode_responses=True)  
    pubsub = r.pubsub()  
  
    print(pubsub)  
    await pubsub.subscribe("channel_1","channel_2")  
  
    #asyncio.ensure_future(reader(pubsub))  
    asyncio.create_task(reader(pubsub))  
  
  
application = web.Application([  
    (r'/send/',Msg),  
    (r'/wb/', WB),  
],debug=True)      
  
  
if __name__ == '__main__':  
  
    # 监听端口  
    application.listen(8000)  
  
    loop = IOLoop.current()  
    loop.add_callback(setup)  
    loop.start()

从程序设计角度上讲,充分利用了协程的异步执行思想,更加地丝滑流畅。

结语:实践操作来看,Redis发布订阅模式,非常契合这种实时(websocket)通信聊天系统的场景,但是发布的消息如果没有对应的频道或者消费者,消息则会被丢弃,假如我们在生产环境在消费的时候,突然断网,导致其中一个订阅者挂掉了一段时间,那么当它重新连接上的时候,中间这一段时间产生的消息也将不会存在,所以如果想要保证系统的健壮性,还需要其他服务来设计高可用的实时存储方案,不过那就是另外一个故事了,最后奉上项目地址,与众乡亲同飨:https://github.com/zcxey2911/tornado\_redis\_vue3\_chatroom

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
1月前
|
NoSQL 网络协议 Linux
Redis的实现一:c、c++的网络通信编程技术,先实现server和client的通信
本文介绍了使用C/C++进行网络通信编程的基础知识,包括创建socket、设置套接字选项、绑定地址、监听连接以及循环接受和处理客户端请求的基本步骤。
50 6
|
1月前
|
NoSQL Redis
Redis 发布订阅
10月更文挑战第18天
29 1
Redis 发布订阅
|
2月前
|
canal 缓存 NoSQL
Redis缓存与数据库如何保证一致性?同步删除+延时双删+异步监听+多重保障方案
根据对一致性的要求程度,提出多种解决方案:同步删除、同步删除+可靠消息、延时双删、异步监听+可靠消息、多重保障方案
Redis缓存与数据库如何保证一致性?同步删除+延时双删+异步监听+多重保障方案
|
17天前
|
设计模式 NoSQL Go
Redis 实现高效任务队列:异步队列与延迟队列详解
本文介绍了如何使用 Redis 实现异步队列和延迟队列。通过 Go 语言的 `github.com/go-redis/redis` 客户端,详细讲解了 Redis 客户端的初始化、异步队列的实现和测试、以及延迟队列的实现和测试。文章从基础连接开始,逐步构建了完整的队列系统,帮助读者更好地理解和应用这些概念,提升系统的响应速度和性能。
38 6
|
15天前
|
消息中间件 NoSQL Redis
【赵渝强老师】Redis消息的生产者消费者模式
消息队列在Redis中可通过List数据结构实现,支持发布者订阅者和生产者消费者两种模式。生产者通过`lpush`向List添加消息,消费者通过`rpop`或`brpop`消费消息,后者支持阻塞等待。示例代码展示了如何使用Redis的生产者消费者模式。
|
1月前
|
存储 缓存 NoSQL
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
65 4
|
1月前
|
SQL 分布式计算 NoSQL
大数据-42 Redis 功能扩展 发布/订阅模式 事务相关的内容 Redis弱事务
大数据-42 Redis 功能扩展 发布/订阅模式 事务相关的内容 Redis弱事务
25 2
|
1月前
|
存储 缓存 NoSQL
大数据-46 Redis 持久化 RDB AOF 配置参数 混合模式 具体原理 触发方式 优点与缺点
大数据-46 Redis 持久化 RDB AOF 配置参数 混合模式 具体原理 触发方式 优点与缺点
60 1
|
2月前
|
消息中间件 存储 NoSQL
18)Redis 的发布订阅模型
18)Redis 的发布订阅模型
33 0
|
3月前
|
存储 缓存 NoSQL
Redis深度解析:部署模式、数据类型、存储模型与实战问题解决
Redis深度解析:部署模式、数据类型、存储模型与实战问题解决
下一篇
无影云桌面