命令执行阶段:processCommand 函数
首先,我们要知道,processCommand 函数是在server.c文件中实现的。它在实际执行命令前的主要逻辑可以分成三步:
- 第一步,processCommand 函数会调用 moduleCallCommandFilters 函数(在module.c文件),将 Redis 命令替换成 module 中想要替换的命令。
- 第二步,processCommand 函数会判断当前命令是否为 quit 命令,并进行相应处理。
- 第三步,processCommand 函数会调用 lookupCommand 函数,在全局变量 server 的 commands 成员变量中查找相关的命令。
这里,你需要注意下,全局变量 server 的 commands 成员变量是一个哈希表,它的定义是在server.h文件中的 redisServer 结构体里面,如下所示:
struct redisServer { ... dict *commands; ... }
另外,commands 成员变量的初始化是在 initServerConfig 函数中,通过调用 dictCreate 函数完成哈希表创建,再通过调用 populateCommandTable 函数,将 Redis 提供的命令名称和对应的实现函数,插入到哈希表中的。
void initServerConfig(void) { ... server.commands = dictCreate(&commandTableDictType,NULL); ... populateCommandTable(); ... }
而这其中的 populateCommandTable 函数,实际上是使用到了 redisCommand 结构体数组 redisCommandTable。
redisCommandTable 数组是在 server.c 文件中定义的,它的每一个元素是一个 redisCommand 结构体类型的记录,对应了 Redis 实现的一条命令。也就是说,redisCommand 结构体中就记录了当前命令所对应的实现函数是什么。
比如,以下代码展示了 GET 和 SET 这两条命令的信息,它们各自的实现函数分别是 getCommand 和 setCommand。当然,如果你想进一步了解 redisCommand 结构体,也可以去看下它的定义,在 server.h 文件当中。
server.c文件中查看
struct redisCommand redisCommandTable[] = { ... {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0}, {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0}, ... }
server.h文件中查看
struct redisCommand { char *name; redisCommandProc *proc; int arity; char *sflags; /* Flags as string representation, one char per flag. */ uint64_t flags; /* The actual flags, obtained from the 'sflags' field. */ /* Use a function to determine keys arguments in a command line. * Used for Redis Cluster redirect. */ redisGetKeysProc *getkeys_proc; /* What keys should be loaded in background when calling this command? */ int firstkey; /* The first argument that's a key (0 = no keys) */ int lastkey; /* The last argument that's a key */ int keystep; /* The step between first and last key */ long long microseconds, calls, rejected_calls, failed_calls; int id; /* Command ID. This is a progressive ID starting from 0 that is assigned at runtime, and is used in order to check ACLs. A connection is able to execute a given command if the user associated to the connection has this command bit set in the bitmap of allowed commands. */ };
好了,到这里,你就了解了 lookupCommand 函数会根据解析的命令名称,在 commands 对应的哈希表中查找相应的命令。
那么,一旦查到对应命令后,processCommand 函数就会进行多种检查,比如命令的参数是否有效、发送命令的用户是否进行过验证、当前内存的使用情况,等等。这部分的处理逻辑比较多,你可以进一步阅读 processCommand 函数来了解下。
这样,等到 processCommand 函数对命令做完各种检查后,它就开始执行命令了。它会判断当前客户端是否有 CLIENT_MULTI 标记,如果有的话,就表明要处理的是 Redis 事务的相关命令,所以它会按照事务的要求,调用 queueMultiCommand 函数将命令入队保存,等待后续一起处理。
而如果没有,processCommand 函数就会调用 call 函数来实际执行命令了。以下代码展示了这部分的逻辑,你可以看下。
/* Exec the command */ // 如果客户端有CLIENT_MULTI标记,并且当前不是exec、discard、multi和watch命令 if (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && c->cmd->proc != multiCommand && c->cmd->proc != watchCommand && c->cmd->proc != resetCommand) { // 将命令入队保存,等待后续一起处理 queueMultiCommand(c); addReply(c,shared.queued); } else { // 调用call函数执行命令 call(c,CMD_CALL_FULL); c->woff = server.master_repl_offset; if (listLength(server.ready_keys)) handleClientsBlockedOnKeys(); }
这里你要知道,call 函数是在 server.c 文件中实现的,它执行命令是通过调用命令本身,即 redisCommand 结构体中定义的函数指针来完成的。而就像我刚才所说的,每个 redisCommand 结构体中都定义了它对应的实现函数,在 redisCommandTable 数组中能查找到。
因为分布式锁的加锁操作就是使用 SET 命令来实现的,所以这里,我就以 SET 命令为例来介绍下它的实际执行过程。
SET 命令对应的实现函数是 setCommand,这是在t_string.c文件中定义的。setCommand 函数首先会对命令参数进行判断,比如参数是否带有 NX、EX、XX、PX 等这类命令选项,如果有的话,setCommand 函数就会记录下这些标记。
t_string.c文件中查看
/* SET key value [NX] [XX] [KEEPTTL] [GET] [EX <seconds>] [PX <milliseconds>] * [EXAT <seconds-timestamp>][PXAT <milliseconds-timestamp>] */ void setCommand(client *c) { robj *expire = NULL; int unit = UNIT_SECONDS; int flags = OBJ_NO_FLAGS; if (parseExtendedStringArgumentsOrReply(c,&flags,&unit,&expire,COMMAND_SET) != C_OK) { return; } c->argv[2] = tryObjectEncoding(c->argv[2]); setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL); }
然后,setCommand 函数会调用 setGenericCommand 函数,这个函数也是在 t_string.c 文件中实现的。setGenericCommand 函数会根据刚才 setCommand 函数记录的命令参数的标记,来进行相应处理。比如,如果命令参数中有 NX 选项,那么,setGenericCommand 函数会调用 lookupKeyWrite 函数(在db.c文件中),查找要执行 SET 命令的 key 是否已经存在。
如果这个 key 已经存在了,那么 setGenericCommand 函数就会调用 addReply 函数,返回 NULL 空值,而这也正是符合分布式锁的语义的。
t_string.c文件中查看
setGenericCommand源码如下:
void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) { long long milliseconds = 0, when = 0; /* initialized to avoid any harmness warning */ // 如果设置了过期时间 if (expire) { if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK) return; // 如果设置的时间无效,打印信息提示 if (milliseconds <= 0 || (unit == UNIT_SECONDS && milliseconds > LLONG_MAX / 1000)) { /* Negative value provided or multiplication is gonna overflow. */ addReplyErrorFormat(c, "invalid expire time in %s", c->cmd->name); return; } // 如果设置的过期时间单位是秒 if (unit == UNIT_SECONDS) milliseconds *= 1000; when = milliseconds; if ((flags & OBJ_PX) || (flags & OBJ_EX)) when += mstime(); if (when <= 0) { /* Overflow detected. */ addReplyErrorFormat(c, "invalid expire time in %s", c->cmd->name); return; } } // 如果有NX选项,那么查找key是否已经存在 if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) || (flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL)) { addReply(c, abort_reply ? abort_reply : shared.null[c->resp]); return; } if (flags & OBJ_SET_GET) { if (getGenericCommand(c) == C_ERR) return; } genericSetKey(c,c->db,key, val,flags & OBJ_KEEPTTL,1); server.dirty++; notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id); if (expire) { setExpire(c,c->db,key,when); notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id); /* Propagate as SET Key Value PXAT millisecond-timestamp if there is EXAT/PXAT or * propagate as SET Key Value PX millisecond if there is EX/PX flag. * * Additionally when we propagate the SET with PX (relative millisecond) we translate * it again to SET with PXAT for the AOF. * * Additional care is required while modifying the argument order. AOF relies on the * exp argument being at index 3. (see feedAppendOnlyFile) * */ robj *exp = (flags & OBJ_PXAT) || (flags & OBJ_EXAT) ? shared.pxat : shared.px; robj *millisecondObj = createStringObjectFromLongLong(milliseconds); rewriteClientCommandVector(c,5,shared.set,key,val,exp,millisecondObj); decrRefCount(millisecondObj); } if (!(flags & OBJ_SET_GET)) { addReply(c, ok_reply ? ok_reply : shared.ok); } /* Propagate without the GET argument (Isn't needed if we had expire since in that case we completely re-written the command argv) */ if ((flags & OBJ_SET_GET) && !expire) { int argc = 0; int j; robj **argv = zmalloc((c->argc-1)*sizeof(robj*)); for (j=0; j < c->argc; j++) { char *a = c->argv[j]->ptr; /* Skip GET which may be repeated multiple times. */ if (j >= 3 && (a[0] == 'g' || a[0] == 'G') && (a[1] == 'e' || a[1] == 'E') && (a[2] == 't' || a[2] == 'T') && a[3] == '\0') continue; argv[argc++] = c->argv[j]; incrRefCount(c->argv[j]); } replaceClientCommandVector(c, argc, argv); } }
下面的代码就展示了这个执行逻辑,你可以看下。
// 如果有NX选项,那么查找key是否已经存在 if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) || (flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL)) { addReply(c, abort_reply ? abort_reply : shared.null[c->resp]); return; }
好,那么如果 SET 命令可以正常执行的话,也就是说命令带有 NX 选项但是 key 并不存在,或者带有 XX 选项但是 key 已经存在,这样 setGenericCommand 函数就会调用 genericSetKey函数(在 db.c 文件中)来完成键值对的实际插入,如下所示:
genericSetKey(c,c->db,key, val,flags & OBJ_KEEPTTL,1);
然后,如果命令设置了过期时间,setGenericCommand 函数还会调用 setExpire 函数设置过期时间。最后,setGenericCommand 函数会调用 addReply 函数,将结果返回给客户端,如下所示:
if (expire) { setExpire(c,c->db,key,when); notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id); /* Propagate as SET Key Value PXAT millisecond-timestamp if there is EXAT/PXAT or * propagate as SET Key Value PX millisecond if there is EX/PX flag. * * Additionally when we propagate the SET with PX (relative millisecond) we translate * it again to SET with PXAT for the AOF. * * Additional care is required while modifying the argument order. AOF relies on the * exp argument being at index 3. (see feedAppendOnlyFile) * */ robj *exp = (flags & OBJ_PXAT) || (flags & OBJ_EXAT) ? shared.pxat : shared.px; robj *millisecondObj = createStringObjectFromLongLong(milliseconds); rewriteClientCommandVector(c,5,shared.set,key,val,exp,millisecondObj); decrRefCount(millisecondObj); } // 调用 addReply 函数,将结果返回给客户端 if (!(flags & OBJ_SET_GET)) { addReply(c, ok_reply ? ok_reply : shared.ok); }
好了,到这里,SET 命令的执行就结束了,你也可以再看下下面的基本流程图。
而且你也可以看到,无论是在命令执行的过程中,发现不符合命令的执行条件,或是命令能成功执行,addReply 函数都会被调用,用来返回结果。所以,这就进入到我所说的命令处理过程的最后一个阶段:结果返回阶段。
结果返回阶段:addReply 函数
addReply 函数是在 networking.c 文件中定义的。它的执行逻辑比较简单,主要是调用 prepareClientToWrite 函数,并在 prepareClientToWrite 函数中调用 clientInstallWriteHandler 函数,将待写回客户端加入到全局变量 server 的 clients_pending_write 列表中。
然后,addReply 函数会调用 _addReplyToBuffer 等函数(在 networking.c 中),将要返回的结果添加到客户端的输出缓冲区中。
networking.c文件中查看
/* Add the object 'obj' string representation to the client output buffer. */ void addReply(client *c, robj *obj) { if (prepareClientToWrite(c) != C_OK) return; if (sdsEncodedObject(obj)) { if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK) _addReplyProtoToList(c,obj->ptr,sdslen(obj->ptr)); } else if (obj->encoding == OBJ_ENCODING_INT) { /* For integer encoded strings we just convert it into a string * using our optimized function, and attach the resulting string * to the output buffer. */ char buf[32]; size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr); if (_addReplyToBuffer(c,buf,len) != C_OK) _addReplyProtoToList(c,buf,len); } else { serverPanic("Wrong obj->encoding in addReply()"); } }
好,现在你就了解一条命令是如何从读取,经过解析、执行等步骤,最终将结果返回给客户端的了。下图展示了这个过程以及涉及的主要函数,你可以再回顾下。
不过除此之外,你还需要注意一点,就是如果在前面的命令处理过程中,都是由 IO 主线程处理的,那么命令执行的原子性肯定能得到保证,分布式锁的原子性也就相应能得到保证了。
但是,如果这个处理过程配合上了我们前面介绍的 IO 多路复用机制和多 IO 线程机制,那么,这两个机制是在这个过程的什么阶段发挥作用的呢,以及会不会影响命令执行的原子性呢?
所以接下来,我们就来看下它们各自对原子性保证的影响。
IO 多路复用对命令原子性保证的影响
首先你要知道,IO 多路复用机制是在 readQueryFromClient 函数执行前发挥作用的。它实际是在事件驱动框架中调用 aeApiPoll 函数,获取一批已经就绪的 socket 描述符。然后执行一个循环,针对每个就绪描述符上的读事件,触发执行 readQueryFromClient 函数。这样一来,即使 IO 多路复用机制同时获取了多个就绪 socket 描述符,在实际处理时,Redis 的主线程仍然是针对每个事件逐一调用回调函数进行处理的。而且对于写事件来说,IO 多路复用机制也是针对每个事件逐一处理的。下面的代码展示了 IO 多路复用机制通过 aeApiPoll 函数获取一批事件,然后逐一处理的逻辑,你可以再看下。
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { aeApiState *state = eventLoop->apidata; int retval, j, numevents = 0; memcpy(&state->_rfds,&state->rfds,sizeof(fd_set)); memcpy(&state->_wfds,&state->wfds,sizeof(fd_set)); retval = select(eventLoop->maxfd+1, &state->_rfds,&state->_wfds,NULL,tvp); if (retval > 0) { for (j = 0; j <= eventLoop->maxfd; j++) { int mask = 0; aeFileEvent *fe = &eventLoop->events[j]; if (fe->mask == AE_NONE) continue; if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds)) mask |= AE_READABLE; if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds)) mask |= AE_WRITABLE; eventLoop->fired[numevents].fd = j; eventLoop->fired[numevents].mask = mask; numevents++; } } return numevents; }
所以这也就是说,即使使用了 IO 多路复用机制,命令的整个处理过程仍然可以由 IO 主线程来完成,也仍然可以保证命令执行的原子性。下图就展示了 IO 多路复用机制和命令处理过程的关系,你可以看下。
接下来,我们再来看下多 IO 线程对命令原子性保证的影响。
多 IO 线程对命令原子性保证的影响
我们知道,多 IO 线程可以执行读操作或是写操作。那么,对于读操作来说,readQueryFromClient 函数会在执行过程中,调用 postponeClient 将待读客户端加入 clients_pending_read 等待列表。
然后,待读客户端会被分配给多 IO 线程执行,每个 IO 线程执行的函数就是 readQueryFromClient 函数,readQueryFromClient 函数会读取命令,并进一步调用 processInputBuffer 函数解析命令,这个基本过程和 Redis 6.0 前的代码是一样的。
不过,相比于 Redis 6.0 前的代码,在 Redis 6.0 版本中,processInputBuffer 函数中新增加了一个判断条件,也就是当客户端标识中有 CLIENT_PENDING_READ 的话,那么在解析完命令后,processInputBuffer 函数只会把客户端标识改为 CLIENT_PENDING_COMMAND,就退出命令解析的循环流程了。
// 如果我们在一个 IO 线程的上下文中,我们不能真正执行这里的命令。我们所能做的就是将客户端标记为需要处理命令的客户端。 // 如果客户端有CLIENT_PENDING_READ标识,将其改为CLIENT_PENDING_COMMAND,就退出循环,并不调用processCommandAndResetClient函数执行命令 if (c->flags & CLIENT_PENDING_READ) { c->flags |= CLIENT_PENDING_COMMAND; break; }
这样,等到所有的 IO 线程都解析完了第一个命令后,IO 主线程中执行的 handleClientsWithPendingReadsUsingThreads 函数,会再调用 processCommandAndResetClient 函数执行命令,以及调用 processInputBuffer 函数解析剩余命令,这部分的内容你也可以再回顾下第 13 讲。
所以现在,你就可以知道,即使使用了多 IO 线程,其实命令执行这一阶段也是由主 IO 线程来完成的,所有命令执行的原子性仍然可以得到保证,也就是说分布式锁的原子性也仍然可以得到保证。
我们再来看下写回数据的流程。
在这个阶段,addReply 函数是将客户端写回操作推迟执行的,而此时 Redis 命令已经完成执行了,所以,即使有多个 IO 线程在同时将客户端数据写回,也只是把结果返回给客户端,并不影响命令在 Redis server 中的执行结果。也就是说,即使使用了多 IO 线程写回,Redis 同样可以保证命令执行的原子性。
下图展示了使用多 IO 线程机制后,命令处理过程各个阶段是由什么线程执行的,你可以再看下。
小结
今天这篇文章我主要结合分布式锁的原子性保证需求,带你学习了 Redis 处理一条命令的整个过程。其中,你需要重点关注分布式锁实现的方法。
我们知道,加锁和解锁操作分别可以使用 SET 命令和 Lua 脚本与 EVAL 命令来完成。那么,分布式锁的原子性保证,就主要依赖 SET 和 EVAL 命令在 Redis server 中执行时的原子性保证了。
紧接着,我还带你具体剖析了下 Redis 中命令处理的整个过程。我把这个过程分成了四个阶段,分别是命令读取、命令解析、命令执行和结果返回。
所以,你还需要了解这四个阶段中所执行函数的主要流程。这四个阶段在 Redis 6.0 版本前都是由主 IO 线程来执行完成的。虽然 Redis 使用了 IO 多路复用机制,但是该机制只是一次性获取多个就绪的 socket 描述符,对应了多个发送命令请求的客户端。而 Redis 在主 IO 线程中,还是逐一来处理每个客户端上的命令的,所以命令执行的原子性依然可以得到保证。而当使用了 Redis 6.0 版本后,命令处理过程中的读取、解析和结果写回,就由多个 IO 线程来处理了。
不过你也不用担心,多个 IO 线程只是完成解析第一个读到的命令,命令的实际执行还是由主 IO 线程处理。当多个 IO 线程在并发写回结果时,命令就已经执行完了,不存在多 IO 线程冲突的问题。所以,使用了多 IO 线程后,命令执行的原子性仍然可以得到保证。好,最后,我也想再说下我对多 IO 线程的看法。从今天课程介绍的内容中,你可以看到,多 IO 线程实际并不会加快命令的执行,而是只会将读取解析命令并行化执行,以及写回结果并行化执行,并且读取解析命令还是针对收到的第一条命令。实际上,这一设计考虑还是由于网络 IO 需要加速处理。那么,如果命令执行本身成为 Redis 运行时瓶颈了,你其实可以考虑使用 Redis 切片集群来提升处理效率。
如果将命令处理过程中的命令执行也交给多 IO 线程执行,你觉得除了对原子性会有影响,还会有什么好处或是其他不好的影响吗?
好处:
- 每个请求分配给不同的线程处理,一个请求处理慢,并不影响其它请求
- 请求操作的 key 越分散,性能会变高(并行处理比串行处理性能高)
- 可充分利用多核 CPU 资源
坏处:
- 操作同一个 key 需加锁,加锁会影响性能,如果是热点 key,性能下降明显
- 多线程上下文切换存在性能损耗
- 多线程开发和调试不友好