【Redis源码】发布与订阅(六)

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: 【Redis源码】发布与订阅(六)

(一)发布与订阅功能介绍:

官方文档:https://redis.io/topics/pubsub

Redis是一个快速稳定的发布/订阅消息传递系统。

发布与订阅命令:

命令 备注 命令原形
PSUBSCRIBE 订阅一个或多个给定匹配模式的频道 PSUBSCRIBE pattern [pattern …]
PUBLISH 往频道发布消息 PUBLISH channel message
PUBSUB 查看订阅与发布系统状态 PUBSUB subcommand [argument [argument …]]
PUNSUBSCRIBE 退订给定匹配模式的频道 PUNSUBSCRIBE pattern [pattern …]
SUBSCRIBE 用于创建订阅频道, 可创建一个或多个渠道。 SUBSCRIBE  channel [channel …]
UNSUBSCRIBE 退订给定的频道 UNSUBSCRIBE channel [channel …]

发布订阅与keys 空间无关。使其在任何级别(包括数据库编号)都不会收到干扰。

比如说在db10 上发布,然后由db 1上订阅者接收,是可以收到的。

如果你需要某种范围,请在通道名称上加上一个环境名称(如 test、online、dev等)去区分。

看如下图:

图中有有个接收频道订阅脚本:

窗口1)打开数据库12接收channel1和channel2频道

窗口2)打开数据库10接收channel1和channel2频道

窗口3)打开数据库9接收channel2频道

窗口4) 执行两个发布命令:

(1)发布channel1 频道一个1111消息,有2个client 接收到

(2)发布chennel2 频道一个1111消息,有3个client 接收到

(二)源码解析:

2.1 subscribe订阅相关

pubsub.c 中subscribe命令函数

voidsubscribeCommand(client *c) {
   int j;
   //j是从1开始的,其实就是挨个处理subscribe命令的参数,subscribe命令后面跟的是频道信息名称,
   //如 subscribe channel1 channel2 命令,subscribe对应的元素下标0,从1开始的就是频道名称
   for (j = 1; j < c->argc; j++)
       pubsubSubscribeChannel(c,c->argv[j]); //订阅每个频道
   c->flags |= CLIENT_PUBSUB;  //设置客户端状态
}

pubsub.c 中订阅订阅频道函数

intpubsubSubscribeChannel(client *c, robj *channel) {
   dictEntry *de;
   list *clients = NULL;
   int retval = 0;

   /* 把指定channel加入到client的pubsub_channels哈希表中,pubsub_channels时一个字典结构。
    不成功说明已经订阅了该频道 */

   if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
       retval = 1;
       incrRefCount(channel);  //该channel引用加1
       /*在server的pubsub_channels找对应的channel */
       de = dictFind(server.pubsub_channels,channel);
       if (de == NULL) {
           //创建一个list链表
           clients = listCreate();
         
           //server的pubsub_channels中添加channel,value则是该channel的订阅者
           dictAdd(server.pubsub_channels,channel,clients);
           
           incrRefCount(channel);   //该channel引用加1
       } else {
           //de非空时获取clients
           clients = dictGetVal(de);
       }
       //把client加入到该channel的订阅链表中
       listAddNodeTail(clients,c);
   }
   /* 客户端的通知操作 */
   addReply(c,shared.mbulkhdr[3]);
   addReply(c,shared.subscribebulk);
   addReplyBulk(c,channel);
   addReplyLongLong(c,clientSubscriptionsCount(c));
   return retval;
}

订阅其实是把制定channel加入到client和server的pubsub_channels哈希表中。

订阅的引用计数含义:

在subscribe,unsubscribe,psubscribe和punsubscribe 消息类型,最后一个参数是订阅还是活跃的计数。该数字实际上是客户端仍订阅的频道和模式的总数。因此,仅当由于取消订阅所有通道和模式而导致计数降至零时,客户端才会退出发布/订阅状态。

server.c 中 processCommand函数

intprocessCommand(client *c) {
   ...
   /* 当client端处于Pub/Sub上下文时,只接收ping命令 */
    if (c->flags & CLIENT_PUBSUB &&
       c->cmd->proc != pingCommand &&
       c->cmd->proc != subscribeCommand &&
       c->cmd->proc != unsubscribeCommand &&
       c->cmd->proc != psubscribeCommand &&
       c->cmd->proc != punsubscribeCommand) {
       addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
       return C_OK;
   }
   ...
}

该部分代码充分体现了c->flags状态在pub/sub时,只接收ping命令。这个也说明了subscribeCommand函数中为什么要设置客户端状态。

2.2 publish发布相关

pubsub.c 中publish发布命令

voidpublishCommand(client *c) {
   //发布逻辑
   int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
   
   //关于集群和aof逻辑
   if (server.cluster_enabled)
       clusterPropagatePublish(c->argv[1],c->argv[2]);
   else
       forceCommandPropagation(c,PROPAGATE_REPL);
   addReplyLongLong(c,receivers); //返回给client通知的订阅数
}

pubsub.c 中publish发布函数

intpubsubPublishMessage(robj *channel, robj *message) {
   int receivers = 0;
   dictEntry *de;
   listNode *ln;
   listIter li;

   /*取出server的pubsub_channels中订阅channel的clients */
   de = dictFind(server.pubsub_channels,channel);
   if (de) {
       list *list = dictGetVal(de); //获取client的链表
       listNode *ln;
       listIter li;

       listRewind(list,&li); //client链表迭代器
       
       //遍历所有的client并发送消息
       while ((ln = listNext(&li)) != NULL) {
           client *c = ln->value;

           addReply(c,shared.mbulkhdr[3]);
           addReply(c,shared.messagebulk);
           addReplyBulk(c,channel);
           addReplyBulk(c,message);
           receivers++;
       }
   }
   /* 开始模糊匹配逻辑,模糊模式使用的是链表而不是哈希表*/
   if (listLength(server.pubsub_patterns)) {
       listRewind(server.pubsub_patterns,&li); //创建模糊规则的迭代器
       channel = getDecodedObject(channel);
       //遍历所有模糊规则,如果匹配成功则发送消息
       while ((ln = listNext(&li)) != NULL) {
           pubsubPattern *pat = ln->value;
           //判断当前channel是否匹配模糊规则
           if (stringmatchlen((char*)pat->pattern->ptr,
                               sdslen(pat->pattern->ptr),
                               (char*)channel->ptr,
                               sdslen(channel->ptr),0)) {
               addReply(pat->client,shared.mbulkhdr[4]);
               addReply(pat->client,shared.pmessagebulk);
               addReplyBulk(pat->client,pat->pattern);
               addReplyBulk(pat->client,channel);
               addReplyBulk(pat->client,message);
               receivers++;
           }
       }
       decrRefCount(channel);
   }
   return receivers;
}

根据publish的函数我们可以得知,当我们发布消息时都会将普通规则和模糊规则发送相应消息,而模糊规则和普通规则的数据结构则不相同。

模糊规则使用的是链表,普通规则使用的是哈希表。

2.3 client端缓冲区数据显示

netorking.c 中handleClientsWithPendingWrites

/* 这个函数是在事件loop之前调用这个函数, 我们希望只需要将响应写入客户端输出缓冲区.
* 而不需要任何使用系统调用才添加可写事件处理程序。
* 其实这个函数的作用就是把数据回写到client端的缓冲区.
*/

inthandleClientsWithPendingWrites(void) {
   listIter li;
   listNode *ln;
   int processed = listLength(server.clients_pending_write); //获得当前链接的clients数

   listRewind(server.clients_pending_write,&li);  //创建clients的迭代器
   //迭代写入client
   while((ln = listNext(&li))) {
       client *c = listNodeValue(ln);
       c->flags &= ~CLIENT_PENDING_WRITE;
       listDelNode(server.clients_pending_write,ln);

       /* 尝试写入数据到client socket连接中. */
       if (writeToClient(c->fd,c,0) == C_ERR) continue;

       /*如果什么都没有了,就什么也不要做。否则添加写入事件。*/
       if (clientHasPendingReplies(c) &&
           aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
               sendReplyToClient, c) == AE_ERR)
       {
           freeClientAsync(c);
       }
   }
   return processed;
}

(三)总结:

  1. 发布/订阅与keys 空间无关,不受到空间影响,例如数据库ID。
    因为普通订阅存储在server.pubsub_channels中,模糊订阅存储在server.pubsub_patterns中。
    2.普通订阅的数据结构为哈希字典,模糊订阅的数据结构为链表。
    3.PUB/SUB状态下,只能接收ping命令。
    4.区分发布/订阅的环境需要从通道名称名称上去做,如:dev_channel、test_channel、online_channel。
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
1月前
|
NoSQL 安全 Unix
Redis源码、面试指南(4)单机数据库、持久化、通知与订阅(中)
Redis源码、面试指南(4)单机数据库、持久化、通知与订阅
25 0
|
1月前
|
NoSQL API Redis
Redis源码(1)基本数据结构(上)
Redis源码(1)基本数据结构
35 2
|
1月前
|
存储 NoSQL 算法
【Redis技术进阶之路】「底层源码解析」揭秘高效存储模型与数据结构底层实现(字典)(二)
【Redis技术进阶之路】「底层源码解析」揭秘高效存储模型与数据结构底层实现(字典)
68 0
|
20天前
|
存储 缓存 NoSQL
Redis与数据库同步指南:订阅Binlog实现数据一致性
本文由开发者小米分享,探讨分布式系统中的一致性问题,尤其是数据库和Redis一致性。文章介绍了全量缓存策略的优势,如高效读取和稳定性,但也指出其一致性挑战。为解决此问题,提出了通过订阅数据库的Binlog实现数据同步的方法,详细解释了工作原理和步骤,并分析了优缺点。此外,还提到了异步校准方案作为补充,以进一步保证数据一致性。最后,提醒在实际线上环境中需注意日志记录、逐步优化和监控报警。
59 3
|
1月前
|
NoSQL 算法 Java
【redis源码学习】持久化机制,java程序员面试算法宝典pdf
【redis源码学习】持久化机制,java程序员面试算法宝典pdf
|
1月前
|
存储 NoSQL Redis
Redis源码、面试指南(5)多机数据库、复制、哨兵、集群(下)
Redis源码、面试指南(5)多机数据库、复制、哨兵、集群
236 1
|
1月前
|
监控 NoSQL Redis
Redis源码、面试指南(5)多机数据库、复制、哨兵、集群(上)
Redis源码、面试指南(5)多机数据库、复制、哨兵、集群
286 0
|
1月前
|
存储 NoSQL 调度
Redis源码、面试指南(4)单机数据库、持久化、通知与订阅(下)
Redis源码、面试指南(4)单机数据库、持久化、通知与订阅
20 0
|
1月前
|
存储 NoSQL API
Redis源码、面试指南(4)单机数据库、持久化、通知与订阅(上)
Redis源码、面试指南(4)单机数据库、持久化、通知与订阅
28 1
|
1月前
|
NoSQL API Redis
Redis源码、面试指南(3)数据对象类型编码(下)
Redis源码、面试指南(3)数据对象类型编码
21 1