(一)发布与订阅功能介绍:
官方文档:https://redis.io/topics/pubsub
Redis是一个快速稳定的发布/订阅消息传递系统。
发布与订阅命令:
命令 | 备注 | 命令原形 |
PSUBSCRIBE | 订阅一个或多个给定匹配模式的频道 | PSUBSCRIBE pattern [pattern …] |
PUBLISH | 往频道发布消息 | PUBLISH channel message |
PUBSUB | 查看订阅与发布系统状态 | PUBSUB subcommand [argument [argument …]] |
PUNSUBSCRIBE | 退订给定匹配模式的频道 | PUNSUBSCRIBE pattern [pattern …] |
SUBSCRIBE | 用于创建订阅频道, 可创建一个或多个渠道。 | SUBSCRIBE channel [channel …] |
UNSUBSCRIBE | 退订给定的频道 | UNSUBSCRIBE channel [channel …] |
发布订阅与keys 空间无关。使其在任何级别(包括数据库编号)都不会收到干扰。
比如说在db10 上发布,然后由db 1上订阅者接收,是可以收到的。
如果你需要某种范围,请在通道名称上加上一个环境名称(如 test、online、dev等)去区分。
看如下图:
图中有有个接收频道订阅脚本:
窗口1)打开数据库12接收channel1和channel2频道
窗口2)打开数据库10接收channel1和channel2频道
窗口3)打开数据库9接收channel2频道
窗口4) 执行两个发布命令:
(1)发布channel1 频道一个1111消息,有2个client 接收到
(2)发布chennel2 频道一个1111消息,有3个client 接收到
(二)源码解析:
2.1 subscribe订阅相关
pubsub.c 中subscribe命令函数
voidsubscribeCommand(client *c) {
int j;
//j是从1开始的,其实就是挨个处理subscribe命令的参数,subscribe命令后面跟的是频道信息名称,
//如 subscribe channel1 channel2 命令,subscribe对应的元素下标0,从1开始的就是频道名称
for (j = 1; j < c->argc; j++)
pubsubSubscribeChannel(c,c->argv[j]); //订阅每个频道
c->flags |= CLIENT_PUBSUB; //设置客户端状态
}
pubsub.c 中订阅订阅频道函数
intpubsubSubscribeChannel(client *c, robj *channel) {
dictEntry *de;
list *clients = NULL;
int retval = 0;
/* 把指定channel加入到client的pubsub_channels哈希表中,pubsub_channels时一个字典结构。
不成功说明已经订阅了该频道 */
if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
retval = 1;
incrRefCount(channel); //该channel引用加1
/*在server的pubsub_channels找对应的channel */
de = dictFind(server.pubsub_channels,channel);
if (de == NULL) {
//创建一个list链表
clients = listCreate();
//server的pubsub_channels中添加channel,value则是该channel的订阅者
dictAdd(server.pubsub_channels,channel,clients);
incrRefCount(channel); //该channel引用加1
} else {
//de非空时获取clients
clients = dictGetVal(de);
}
//把client加入到该channel的订阅链表中
listAddNodeTail(clients,c);
}
/* 客户端的通知操作 */
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.subscribebulk);
addReplyBulk(c,channel);
addReplyLongLong(c,clientSubscriptionsCount(c));
return retval;
}
订阅其实是把制定channel加入到client和server的pubsub_channels哈希表中。
订阅的引用计数含义:
在subscribe,unsubscribe,psubscribe和punsubscribe 消息类型,最后一个参数是订阅还是活跃的计数。该数字实际上是客户端仍订阅的频道和模式的总数。因此,仅当由于取消订阅所有通道和模式而导致计数降至零时,客户端才会退出发布/订阅状态。
server.c 中 processCommand函数
intprocessCommand(client *c) {
...
/* 当client端处于Pub/Sub上下文时,只接收ping命令 */
if (c->flags & CLIENT_PUBSUB &&
c->cmd->proc != pingCommand &&
c->cmd->proc != subscribeCommand &&
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand) {
addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
return C_OK;
}
...
}
该部分代码充分体现了c->flags状态在pub/sub时,只接收ping命令。这个也说明了subscribeCommand函数中为什么要设置客户端状态。
2.2 publish发布相关
pubsub.c 中publish发布命令
voidpublishCommand(client *c) {
//发布逻辑
int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
//关于集群和aof逻辑
if (server.cluster_enabled)
clusterPropagatePublish(c->argv[1],c->argv[2]);
else
forceCommandPropagation(c,PROPAGATE_REPL);
addReplyLongLong(c,receivers); //返回给client通知的订阅数
}
pubsub.c 中publish发布函数
intpubsubPublishMessage(robj *channel, robj *message) {
int receivers = 0;
dictEntry *de;
listNode *ln;
listIter li;
/*取出server的pubsub_channels中订阅channel的clients */
de = dictFind(server.pubsub_channels,channel);
if (de) {
list *list = dictGetVal(de); //获取client的链表
listNode *ln;
listIter li;
listRewind(list,&li); //client链表迭代器
//遍历所有的client并发送消息
while ((ln = listNext(&li)) != NULL) {
client *c = ln->value;
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.messagebulk);
addReplyBulk(c,channel);
addReplyBulk(c,message);
receivers++;
}
}
/* 开始模糊匹配逻辑,模糊模式使用的是链表而不是哈希表*/
if (listLength(server.pubsub_patterns)) {
listRewind(server.pubsub_patterns,&li); //创建模糊规则的迭代器
channel = getDecodedObject(channel);
//遍历所有模糊规则,如果匹配成功则发送消息
while ((ln = listNext(&li)) != NULL) {
pubsubPattern *pat = ln->value;
//判断当前channel是否匹配模糊规则
if (stringmatchlen((char*)pat->pattern->ptr,
sdslen(pat->pattern->ptr),
(char*)channel->ptr,
sdslen(channel->ptr),0)) {
addReply(pat->client,shared.mbulkhdr[4]);
addReply(pat->client,shared.pmessagebulk);
addReplyBulk(pat->client,pat->pattern);
addReplyBulk(pat->client,channel);
addReplyBulk(pat->client,message);
receivers++;
}
}
decrRefCount(channel);
}
return receivers;
}
根据publish的函数我们可以得知,当我们发布消息时都会将普通规则和模糊规则发送相应消息,而模糊规则和普通规则的数据结构则不相同。
模糊规则使用的是链表,普通规则使用的是哈希表。
2.3 client端缓冲区数据显示
netorking.c 中handleClientsWithPendingWrites
/* 这个函数是在事件loop之前调用这个函数, 我们希望只需要将响应写入客户端输出缓冲区.
* 而不需要任何使用系统调用才添加可写事件处理程序。
* 其实这个函数的作用就是把数据回写到client端的缓冲区.
*/
inthandleClientsWithPendingWrites(void) {
listIter li;
listNode *ln;
int processed = listLength(server.clients_pending_write); //获得当前链接的clients数
listRewind(server.clients_pending_write,&li); //创建clients的迭代器
//迭代写入client
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
listDelNode(server.clients_pending_write,ln);
/* 尝试写入数据到client socket连接中. */
if (writeToClient(c->fd,c,0) == C_ERR) continue;
/*如果什么都没有了,就什么也不要做。否则添加写入事件。*/
if (clientHasPendingReplies(c) &&
aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
sendReplyToClient, c) == AE_ERR)
{
freeClientAsync(c);
}
}
return processed;
}
(三)总结:
- 发布/订阅与keys 空间无关,不受到空间影响,例如数据库ID。
因为普通订阅存储在server.pubsub_channels中,模糊订阅存储在server.pubsub_patterns中。
2.普通订阅的数据结构为哈希字典,模糊订阅的数据结构为链表。
3.PUB/SUB状态下,只能接收ping命令。
4.区分发布/订阅的环境需要从通道名称名称上去做,如:dev_channel、test_channel、online_channel。