写在前面
在 Redis6.0 版本之前,采用的是单线程模型,即:命令的读取、解析、执行及回复都是在一个线程中执行。但Redis仍可以提供极为优秀的并发能力,核心在于优秀的代码设计:IO多路复用 + 内存操作 + 优秀的数据结构设计。
从Redis 6.0 版本开始,引入了多线程模型,主要用来分担主线的压力,具体负责io时间的读写和解析,注意:命令的执行仍然在主线程中处理。
以下分析默认你已经具备了网络编程、IO模型和Reactor模型的相关知识,如果这些知识不熟悉的朋友,可以先参考我之前的文章:Posix API与网络协议栈实现原理、Linux五种IO模型 和 Reactor实现原理及代码示例,有了这些预备知识,再来看这篇文章会容易点。
今天我们以一条简单的set命令,到redis返回OK,从源码的角度看看整个执行流程是什么样的~
127.0.0.1:6379> set msg "hello world" OK 127.0.0.1:6379>
以下内容是基于Redis 6.2.6 版本整理总结。
一、Redis的Reactor模型
1.1 服务端初始化
服务端Reactor模型的初始化主要在src/server.c/initServer() 函数中执行。
1.1.1 创建epoll IO多路复用
// src/server.c/initServer() server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); if (server.el == NULL) { serverLog(LL_WARNING, "Failed creating the event loop. Error message: '%s'", strerror(errno)); exit(1); }
1.1.2 将服务器套接字ipfd与端口绑定
// src/server.c/initServer() if (server.port != 0 && listenToPort(server.port,&server.ipfd) == C_ERR) { serverLog(LL_WARNING, "Failed listening on port %u (TCP), aborting.", server.port); exit(1); }
1.1.3 将ipfd加入到epoll集合,监听客户端连接
createSocketAcceptHandler() 函数通过调用 aeCreateFileEvent,将该服务段的套接字加入到epoll 集合,并为其注册可读事件的回调函数accept_handler,如果内核检测到有客户端连接上来,会执行accept_handler函数。
// src/server.c/initServer() int createSocketAcceptHandler(socketFds *sfd, aeFileProc *accept_handler) { int j; // sfd->count 监听的地址个数 for (j = 0; j < sfd->count; j++) { // 注册listen fd 读事件 accept_handler 回调 if (aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,NULL) == AE_ERR) { /* Rollback */ for (j = j-1; j >= 0; j--) aeDeleteFileEvent(server.el, sfd->fd[j], AE_READABLE); return C_ERR; } } return C_OK; }
对于内核提供的多路复用结构,redis 通过aeCreateFileEvent() 函数对其了一层封装。redis支持不同的操作系统下的多路复用,这里我们以linux下的IO多路复用epoll进行举例,底层调用的是 epoll_ctl()。
1.1.4 acceptTcpHandler() 接收新连接的处理
底层调用 accept 系统调用,处理新的客户端连接。每次当连接事件触发时,acceptTcpHandler 内部每次最多循环MAX_ACCEPTS_PER_CALL 次accep系统调用从内核中的已连接队列中获取新的连接。MAX_ACCEPTS_PER_CALL 机制,通过一次性调用接收多个新连接,提升IO 多路复用能力;又可以避免一次行accept过多的连接,来不及处理,造成文件描述的浪费。
// src/networking.c #define MAX_ACCEPTS_PER_CALL 1000 void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd, max = MAX_ACCEPTS_PER_CALL; char cip[NET_IP_STR_LEN]; UNUSED(el); UNUSED(mask); UNUSED(privdata); // 每次最多处理1000条连接 while(max--) { // 调用 accept 返回新的客户端套接字 cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); if (cfd == ANET_ERR) { if (errno != EWOULDBLOCK) serverLog(LL_WARNING, "Accepting client connection: %s", server.neterr); return; } // 主要用于frok出来的子进程自动关闭这些从父进程继承来的fd,防止fd泄漏 anetCloexec(cfd); serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip); } }
acceptCommonHandler() 函数实现
static void acceptCommonHandler(connection *conn, int flags, char *ip) { ... // 创建 client 对象 if ((c = createClient(conn)) == NULL) { ... } ... }
createClient() 函数
client *createClient(connection *conn) { ... if (conn) { connNonBlock(conn); // 设置非阻塞 connEnableTcpNoDelay(conn); // setsockopt(TCP_NODELAY) if (server.tcpkeepalive) connKeepAlive(conn,server.tcpkeepalive); // 注册该客户单的读事件回调函数 connSetReadHandler(conn, readQueryFromClient); connSetPrivateData(conn, c); // 将c保存到conn的私有空间 } ... return c; }
readQueryFromClient 方法非常重要,是redis读取数据的入口。后面再详聊。到这里客户端连接的处理及读事件的回调注册,就很明了了。
// src/connection.h static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) { return conn->type->set_read_handler(conn, func); } // src/connection.c static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) { if (func == conn->read_handler) return C_OK; // 指定当前客户单的read_handler为func conn->read_handler = func; if (!conn->read_handler) aeDeleteFileEvent(server.el,conn->fd,AE_READABLE); else // 将客户端fd加入到epoll集合,并注册读事件 if (aeCreateFileEvent(server.el,conn->fd, AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR; return C_OK; }
以上就是Redis服务端读事件注册和监听的全过程,写事件类似,这里就不赘述了,有兴趣的小伙伴可以自己尝试跟一跟。
1.2 命令处理及回复
我们首先要从客户端读取数据并解析,根据命令执行对应的命令函数,将执行结果返回给客户端。
1.2.1 读取数据的入口函数 readQueryFromClient
从Redis6.0版本开始,加入了io多线程的优化,这篇文章我们先不展开,后面会专门出一篇文章聊聊io多线程优化。
// src/networking.c void readQueryFromClient(connection *conn) { client *c = connGetPrivateData(conn); ... // 开启io多线程优化,先不展开 if (postponeClientRead(c)) return; ... // 将客户端数据读到 querybuf 缓冲区 nread = connRead(c->conn, c->querybuf+qblen, readlen); ... // 处理读缓冲区 processInputBuffer(c); }
1.2.2 命令执行的入口函数processInputBuffer 及 具体执行命令的 call 方法
processInputBuffer 函数会根据redis协议解析客户端请求(客户端的请求数据保存在c->querybuf中),将相关参数保存在c->argv数组和c->argc中。如果命令就绪,就会调用 processCommand 处理命令。
// src/networking.c void processInputBuffer(client *c) { while(c->qb_pos < sdslen(c->querybuf)) { ... /* Multibulk processing could see a <= 0 length. */ if (c->argc == 0) { resetClient(c); } else { ... // 数据处理流程:read decode compute encode write // 这里进行真正的 compute 命令处理 if (processCommandAndResetClient(c) == C_ERR) { /* If the client is no longer valid, we avoid exiting this * loop and trimming the client buffer later. So we return * ASAP in that case. */ return; } } } ... } int processCommandAndResetClient(client *c) { ... if (processCommand(c) == C_OK) { commandProcessed(c); } ... } int processCommand(client *c) { // 获取命令行的第一个参数,比如: set msg "hello world" 命令中的 set // 通过 set 查找redisCommand字典,找到对应的cmd entry c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); /* Exec the command */ // 执行命令 if (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && c->cmd->proc != multiCommand && c->cmd->proc != watchCommand && c->cmd->proc != resetCommand) { queueMultiCommand(c); addReply(c,shared.queued); } else { call(c,CMD_CALL_FULL); // 在这里执行具体命令 c->woff = server.master_repl_offset; if (listLength(server.ready_keys)) handleClientsBlockedOnKeys(); } return C_OK; } void call(client *c, int flags) { // 执行命令的核心函数 .. // 这里调用具体的命令处理函数, 比如: setCommand函数 c->cmd->proc(c); .. }
1.2.3 命令执行
Redis中的命令都是独立封装的,每个命令及对应的处理函数注册保存在 src/server.c/ redisCommandTable 中。
// src/server.c struct redisCommand redisCommandTable[] = { ... {"set",setCommand,-3, "write use-memory @string", 0,NULL,1,1,1,0,0,0}, ... }
setCommand 函数实现。 在 call 方法调用 c->cmd->proc©时,redis通过解析,发现是set命令,就会调用 setCommand 函数,
// src/t_string.c /* SET key value [NX] [XX] [KEEPTTL] [GET] [EX <seconds>] [PX <milliseconds>] * [EXAT <seconds-timestamp>][PXAT <milliseconds-timestamp>] */ void setCommand(client *c) { ... setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL); } void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) { ... { if (milliseconds <= 0 || (unit == UNIT_SECONDS && milliseconds > LLONG_MAX / 1000)) { addReplyErrorFormat(c, "invalid expire time in %s", c->cmd->name); } ... if (when <= 0) { addReplyErrorFormat(c, "invalid expire time in %s", c->cmd->name); } } ... if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) || (flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL)) { addReply(c, abort_reply ? abort_reply : shared.null[c->resp]); return; } ... if (!(flags & OBJ_SET_GET)) { addReply(c, ok_reply ? ok_reply : shared.ok); } ... }
1.2.3 执行结果返回给客户端
每个命令在处理完后,都会有addReplyXxx 类似的方法调用,这就是Redis在向客户端返回执行结果。但是,并不是直接发送给客户端,而是将响应结构保存在客户端的response buf中。在看addReply的实现之前,我们先来看看 client的定义,以便我们更好的理解。client结构定义如下:
// src/server.c #define PROTO_REPLY_CHUNK_BYTES (16*1024) /* 16k output buffer */ typedef struct client { ... connection *conn; ... sds querybuf; // 从客户端读取的数据保存在querybuf中 ... int argc; // 命令参数的个数 3 robj **argv; // 命令的具体参数 set "msg" "hello world" ... list *reply; /* List of reply objects to send to the client. */ ... /* Response buffer */ int bufpos; char buf[PROTO_REPLY_CHUNK_BYTES]; } client;
// src/networking.c void addReply(client *c, robj *obj) { if (prepareClientToWrite(c) != C_OK) return; if (sdsEncodedObject(obj)) { // 将响应结果保存到客户端的输出缓冲区 if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK) _addReplyProtoToList(c,obj->ptr,sdslen(obj->ptr)); // 将响应结果保存到客户端的输出链表 } else if (obj->encoding == OBJ_ENCODING_INT) { /* For integer encoded strings we just convert it into a string * using our optimized function, and attach the resulting string * to the output buffer. */ char buf[32]; size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr); if (_addReplyToBuffer(c,buf,len) != C_OK) _addReplyProtoToList(c,buf,len); } else { serverPanic("Wrong obj->encoding in addReply()"); } }
说明:_addReplyToBuffer 是将回复数据保存到client对象的输出缓冲区,_addReplyProtoToList 是保存到输出链表,前者的优先级高。到这里要回复给客户端的数据已经就绪,什么时候发给客户端呢?
在主事件循环 aeMain 中,在每次获取就绪的事件之前,都会先执行 beforesleep 函数,这个函数只要执行一些耗时少的操作,比如过期键的删除、给客户端返回数据等。也就是在这个时候,会将client输出缓冲区中的数据发给客户端。
beforesleep 函数注册:
// src/server.c/initServer void initServer(void) { ... // 注册beforeSleep 和 afterSleep 的回调函数,注意这两个函数在主时间循环中的执行位置 aeSetBeforeSleepProc(server.el,beforeSleep); aeSetAfterSleepProc(server.el,afterSleep); ... }
发送数据给客户端逻辑,多线程读写的优化,会专门出一篇文章详解,这里也不展开,默认不开启多线程优化:
// stc/networking.c int handleClientsWithPendingWritesUsingThreads(void) { ... // 如果没有开启io多线程写,或者待处理的客户端数量较少时,直接由主线程执行写操作 if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) { return handleClientsWithPendingWrites(); } ... } int handleClientsWithPendingWrites(void) { ... while((ln = listNext(&li))) { ... // 给客户端发送数据 if (writeToClient(c,0) == C_ERR) continue; ... } ... }
1.3 事件循环
1.3.1 事件循环入口函数
事件处理的入口函数是:aeMain(server.el),死循环对就绪的io事件进行处理,直至服务退出。
// Loop事件循环 void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { aeProcessEvents(eventLoop, AE_ALL_EVENTS| AE_CALL_BEFORE_SLEEP| AE_CALL_AFTER_SLEEP); } }
核心逻辑在 aeProcessEvents() 函数中实现:
int aeProcessEvents(aeEventLoop *eventLoop, int flags) { ... // 事件循环中处理 beforesleep if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP) eventLoop->beforesleep(eventLoop); // numevents 当前就绪的事件个数 numevents = aeApiPoll(eventLoop, tvp); // 底层就是epoll_wait ... // 处理就绪事件 for (j = 0; j < numevents; j++) { aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; ... // 执行读事件的回调,如果是listenfd就是Accept回调,如果是clientfd就是read回调 if (!invert && fe->mask & mask & AE_READABLE) { fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; fe = &eventLoop->events[fd]; /* Refresh in case of resize. */ } // 执行写事件回调 if (fe->mask & mask & AE_WRITABLE) { if (!fired || fe->wfileProc != fe->rfileProc) { fe->wfileProc(eventLoop,fd,fe->clientData,mask); fired++; } } ... } } ... }
以上逻辑是epoll 的固定写法,找到就绪事件,然后再for循环中依次进行处理。如果读事件就绪,直接调用rfileProc方法;如果写事件就绪,调用wfileProc。这得益于redis对“文件事件”的封装,我们来看aeFileEvent 结构定义:
/* File event structure */ typedef struct aeFileEvent { int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */ aeFileProc *rfileProc; aeFileProc *wfileProc; void *clientData; } aeFileEvent;
rfileProc 和 wfileProc 都是函数指针,我们在创建一个 aeFileEvent 事件的时候,就可以绑定具体的实现。
二、 总结
- Redis 服务在启动时,创建epfd,监听指定端口并创建服务端套接字ipfd,将ipfd加入到epoll集合,并注册其读事件,绑定acceptTcpHandler回调;
- 对于服务端的ipfd来说,读事件就是客户端的连接,有客户端连接时,在acceptTcpHandler中为其创建client对象和conn对象等,将accept系统调用返回的客户端fd,也加入到epoll集合,并为其注册读事件readQueryFromClient
- 客户端发送命令,服务端调用 readQueryFromClient 读取数据,processInputBuffer 解析数据,最终调用call 执行命令
- 最后在主事件循环中,通过 writeToClient 将数据返回给客户端
三、一些梳理和思考
3.1 Redis对于连接事件的处理
Redis是单线程,所以服务端ipfd和客户端clifd都是在一个主事件循环中处理。Redis的处理逻辑是:当新的连接事件触发时,acceptTcpHandler 一次调用就可以从内核的Tcp就绪队列中取1000条连接,并将这些连接对应的文件描述符注册到epoll中进行监听。并不是取一条连接就结束,充分利用了IO多路复用的特性。
3.2 Redis单线程的理解
都是Redis是单线程的,实际上说的是Redis就算在有多个客户端连接的时候,命令处理也是在一个线程中,也就是事件循环的主线程。实际上还有其他的线程,用来执行特定操作,如:
- bio_close_file,异步关闭大文件线程
- bio_aof_fsync,异步aof刷盘线程
- bio_lazy_free,异步清理大块内存线程
- jemalloc_bg_thd, jemalloc 后台线程
- io_thd_*,Redis6.0版本开始的io多线程,是针对网络io做的优化
3.4 Redis单线程的瓶颈会出现在什么地方?
(1)io密集型 或 CPU密集型:
- io 密集型主要分为:磁盘io 和 网络io
a) 对于磁盘io,Redis采取fork子进程的方式,在子进程中持久化和异步刷盘;
b) 对于网络io,可能服务多个客户端或者客户端请求的数据量很大时,造成网络io压力大 - cpu密集型,比如:在子进程中需要将aof持久化的数据进行压缩,此时cpu的压力可能会增大,但是对于主线程来说几乎是没有影响的。
(2)单线程局限性:
因为是单线程,所以同一时刻只能有一个操作在进行。所以,耗时的命令会导致并发的下降,不只是读并发,写并发也会下降。而且单一线程也只能用到一个cpu核心,所以可以在同一个多核的服务器中,可以启动多个实例,组成master-master或者master-slave的形式,耗时的读命令可以完全在slave进行。
(2)为什么Redis不采用多线程
- Redis 提供了丰富的数据结构,如果采用多线程,加锁的粒度不好控制;单线程,避免使用锁
- 多线程频繁的上下文的切换,会抵消多线程的优势
综上,redis的瓶颈存在于网络io层面,因此,如果想要通过多线程来提升系统的并发处理能力,也应该在网络io层面考虑。因此,Redis从6.0版本开始,支持了io多线来处理网络io。关于io多线程优化解析可以关注我后面的文章哦~