1、Redis 单线程模型
Redis 6.0 以前采用单线程模型
1、Redis 单线程模型
Redis 6.0 以前采用单线程模型
Redis 单线程模型
面试题:Redis 单线程为什么还那么快?
- Redis 大部分操作都在内存中,并且采用了高效的数据结构。性能瓶颈是网络延迟和内存大小。
- 避免了多线程之间的竞争,减少了多线程切换带来的系统开销,而且不会有线程安全问题(锁开销)
- I/O 多路复用机制
2、Redis 多线程模型
2.1、* Redis 为什么引入 I/O 多线程
Redis 的性能瓶颈在网络 IO 的处理上。Redis 是网络 IO 密集型,需要同时处理多条并发请求,读写 IO 的问题(请求大量数据,写日志业务等)。多线程处理网络 IO,单线程执行命令。
Redis 线程池作用读写 IO 阶段,即 read, decode 和 encode, send 阶段。主线程处理业务逻辑,之所以用单线程执行命令,是因为 Redis 采用高效的数据结构,其业务逻辑处理较快。
redis 线程池作用阶段
2.2、I/O 多线程模型
,每个 io 线程(主线程同时也是 io 线程)拥有一个专属队列 io_threads_list[id]
中,再把每个事件负载均衡地分配到每个 io 线程的专属任务队列中。一次事件循环中不会出现同名 fd,不同的 fd 分配到每个 io 线程各自的队列中,避免了多个 io 线程同时从全局队列中取数据,因此,不需要加锁操作。
接下来,io 线程从自己的专属队列中取出任务,(除主线程外)并发执行 read 和 decode 操作。主线程将解析后的任务做 compute 操作。最后,io 线程(包括主线程)并发执行 encode 和 send 操作。
redis 线程池原理图
redis 线程池默认作用在 encode, send 阶段,这是因为客户端从 redis 获取大量数据需要并发处理。若想作用在 read, decode 阶段,需要手动开启。在 redis.conf 文件中,可以设置:
# 开启io线程的数量 io-threads 4 # 优化:read deconde 过程。默认优化,encode send从 redis 获取大量数据 io-threads-do-reads yes
开启 io 多线程的前提是有多个并发连接。如何在单个连接的情况下,开启 io 多线程调试,需要修改 redis 源码:
// networking.c int stopThreadedIOIfNeeded(void) { // 单个连接的情况下,开启多线程调试,永远不关闭 io 多线程 return 0; ... }
- 连接到达,触发读事件回调:acceptTcpHandler
- 接收连接:acceptTcpHandler
- 初始化新连接:createClient
// server.c void initServer(void) { ... // 1、连接到来,触发读事件回调 if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) ... } // networking.c void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { ... while(max--) { // 2、接收连接:内部封装 accept cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); ... // 为 cfd 初始化新连接,内部调用 createClient acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip); } } static void acceptCommonHandler(connection *conn, int flags, char *ip) { ... /* Create connection and client */ // 3、创建新的连接 if ((c = createClient(conn)) == NULL) { ... } ... }
- 读事件回调:readQueryFromClient
- 分割并处理数据包 processInputBuffer
- 分割数据包:processInlineBuffer 和 processMultibulkBuffer
- 处理数据包:processCommandAndResetClient
- 数据写到 buffer:addReply
- 数据写到 socket:writeToClient
- 写事件回调:sendReplyToClient
。子线程从该全局队列中获取任务后,也调用该读事件回调函数,进行 read 和 decode 的业务逻辑处理。
// networking.c void readQueryFromClient(connection *conn) { ... /* Check if we want to read from the client later when exiting from * the event loop. This is the case if threaded I/O is enabled. */ // 开启 io 线程后,延迟处理客户端的读,将任务丢到全局队列,再分配给 io 线程 // 主线程返回 1,不执行业务逻辑处理; // 子线程返回 0,继续往下,执行业务逻辑处理 if (postponeClientRead(c)) return; // 1、read 阶段,(io 线程)将任务读到缓冲区 nread = connRead(c->conn, c->querybuf+qblen, readlen); // 2、decode 阶段,(io 线程)解析数据包 processInputBuffer(c); } int postponeClientRead(client *c) { if (server.io_threads_active && server.io_threads_do_reads && !clientsArePaused() && !ProcessingEventsWhileBlocked && !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ))) { // 主线程,返回 1 // 将连接状态设置为 CLIENT_PENDING_READ c->flags |= CLIENT_PENDING_READ; // 收集任务,把客户端连接放到全局队列中,分配到 io 线程 listAddNodeHead(server.clients_pending_read,c); return 1; } else { // 子线程,即 io 线程,返回 0 return 0; } }
子线程(IO 线程)从专属任务队列 io_threads_pending
获取任务,执行 read decode 和 encode write 业务逻辑处理。
// networking.c // 线程池入口函数:子线程 void *IOThreadMain(void *myid) { ... while(1) { /* Wait for start */ // 等待获取专属任务队列中的任务 for (int j = 0; j < 1000000; j++) { if (io_threads_pending[id] != 0) break; } ... /* Process: note that the main thread will never touch our list * before we drop the pending count to 0. */ listIter li; listNode *ln; // 从专属任务队列中取出任务 listRewind(io_threads_list[id],&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); if (io_threads_op == IO_THREADS_OP_WRITE) { // encode 和 write writeToClient(c,0); } else if (io_threads_op == IO_THREADS_OP_READ) { // read 和 decode,读事件回调函数 readQueryFromClient(c->conn); } else { serverPanic("io_threads_op value is unknown"); } } listEmpty(io_threads_list[id]); io_threads_pending[id] = 0; ... } }
子线程 decode 结束后,设置连接状态 CLIENT_PENDING_COMMAND
,交给主线程来 compute,退出读事件回调函数。主线程负责 compute ,解析 redis 命令。
// networking.c // readQueryFromClient 函数中 decode 阶段调用 void processInputBuffer(client *c) { /* Keep processing while there is something in the input buffer */ while(c->qb_pos < sdslen(c->querybuf)) { ... if (c->reqtype == PROTO_REQ_INLINE) { // 分割数据包。并判断是否完整 if (processInlineBuffer(c) != C_OK) break; ... } else if (c->reqtype == PROTO_REQ_MULTIBULK) { // 分割 pipline 的数据包,并判断是否完整 if (processMultibulkBuffer(c) != C_OK) break; } ... else { /* If we are in the context of an I/O thread, we can't really * execute the command here. All we can do is to flag the client * as one that needs to process the command. */ // io 线程设置任务状态,交给主线程compute,退出读事件回调函数 if (c->flags & CLIENT_PENDING_READ) { c->flags |= CLIENT_PENDING_COMMAND; break; } /* We are finally ready to execute the command. */ // 3、compute,主线程解析命令 if (processCommandAndResetClient(c) == C_ERR) { /* If the client is no longer valid, we avoid exiting this * loop and trimming the client buffer later. So we return * ASAP in that case. */ return; } } } ... }
主线程 compute 结束后,调用 addReply 函数,将处理完的连接放到全局任务队列clients_pending_write
// networking.c int processCommandAndResetClient(client *c) { ... // 处理命令 if (processCommand(c) == C_OK) { commandProcessed(c); } ... } // server.c int processCommand(client *c) { ... /* Exec the command */ // 开启 io 多线程,且不是事务命令 if (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && c->cmd->proc != multiCommand && c->cmd->proc != watchCommand) { // 把数据写到缓冲区 addReply(c,shared.queued); } else { // 执行 redis 命令 call(c,CMD_CALL_FULL); ... } ... } // networking.c // 数据写到发送缓冲区 void addReply(client *c, robj *obj) { if (prepareClientToWrite(c) != C_OK) return; ... } int prepareClientToWrite(client *c) { ... if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ)) clientInstallWriteHandler(c); // 任务写到全局队列中 ... }
接下来,子线程和主线程都可以从自己的专属任务队列中获得该任务,执行 encode 和 send 的业务逻辑处理 writeToClient。若数据未发送完,则注册写事件回调,等待再次发送。
// 子线程:线程池入口函数 void *IOThreadMain(void *myid) { ... if (io_threads_op == IO_THREADS_OP_WRITE) { // encode 和 write writeToClient(c,0); // 数据写到 socket } else if (io_threads_op == IO_THREADS_OP_READ) { // read 和 decode readQueryFromClient(c->conn); // 读事件回调函数 ... } // 主线程 int handleClientsWithPendingWritesUsingThreads(void) { ... /* Also use the main thread to process a slice of clients. */ listRewind(io_threads_list[0],&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); writeToClient(c,0); } listEmpty(io_threads_list[0]); ... /* Run the list of clients again to install the write handler where * needed. */ listRewind(server.clients_pending_write,&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); /* Install the write handler if there are pending writes in some * of the clients. */ // 数据没写完,注册写事件回调 if (clientHasPendingReplies(c) && connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR) { freeClientAsync(c); } } listEmpty(server.clients_pending_write); ... }