通过上篇文章的学习,我们知道 Redis server 启动后的进程会以单线程的方式,执行客户端请求解析和处理工作。但是,Redis server 也会通过 bioInit 函数启动三个后台线程,来处理后台任务。也就是说,Redis 不再让主线程执行一些耗时操作,比如同步写、删除等,而是交给后台线程异步完成,从而避免了对主线程的阻塞。
实际上,在 2020 年 5 月推出的 Redis 6.0 版本中,Redis 在执行模型中还进一步使用了多线程来处理 IO 任务,这样设计的目的,就是为了充分利用当前服务器的多核特性,使用多核运行多线程,让多线程帮助加速数据读取、命令解析以及数据写回的速度,提升 Redis 整体性能。
那么,这些多线程具体是在什么时候启动,又是通过什么方式来处理 IO 请求的呢?
今天这篇文章,我就来给你介绍下 Redis 6.0 实现的多 IO 线程机制。通过这部分内容的学习,你可以充分了解到 Redis 6.0 是如何通过多线程来提升 IO 请求处理效率的。这样你也就可以结合实际业务来评估,自己是否需要使用 Redis 6.0 了。
好,接下来,我们先来看下多 IO 线程的初始化。在开始学习今天的学习之前,你还需要下载Redis 6.2.6(https://redis.io/)的源码,以便能查看到和多 IO 线程机制相关的代码。
多 IO 线程的初始化
我在上篇文章给你介绍过,Redis 中的三个后台线程,是 server 在初始化过程的最后,调用 InitSeverLast 函数,而 InitServerLast 函数再进一步调用 bioInit 函数来完成的。如果我们在 Redis 6.0 中查看 InitServerLast 函数,会发现和 Redis 5.0 相比,该函数在调完 bioInit 函数后,又调用了 initThreadedIO 函数。而 initThreadedIO 函数正是用来初始化多 IO 线程的,这部分的代码调用如下所示:
server.c文件中查看
/* Some steps in server initialization need to be done last (after modules * are loaded). * Specifically, creation of threads due to a race bug in ld.so, in which * Thread Local Storage initialization collides with dlopen call. * see: https://sourceware.org/bugzilla/show_bug.cgi?id=19329 */ void InitServerLast() { bioInit(); //调用initThreadedIO函数初始化IO线程 initThreadedIO(); set_jemalloc_bg_thread(server.jemalloc_bg_thread); server.initial_memory_usage = zmalloc_used_memory(); }
所以下面,我们就来看下 initThreadedIO 函数的主要执行流程,这个函数是在networking.c文件中实现的。
首先,initThreadedIO 函数会设置 IO 线程的激活标志。这个激活标志保存在 redisServer 结构体类型的全局变量 server 当中,对应 redisServer 结构体的成员变量 io_threads_active。initThreadedIO 函数会把 io_threads_active 初始化为 0,表示 IO 线程还没有被激活。这部分代码如下所示:
/* Initialize the data structures needed for threaded I/O. */ void initThreadedIO(void) { server.io_threads_active = 0; /* We start with threads not active. */ /* Don't spawn any thread if the user selected a single thread: * we'll handle I/O directly from the main thread. */ // 如果用户选择了单个线程,则不要产生任何线程:我们将直接从主线程处理 IO if (server.io_threads_num == 1) return; if (server.io_threads_num > IO_THREADS_MAX_NUM) { serverLog(LL_WARNING,"Fatal: too many I/O threads configured. " "The maximum number is %d.", IO_THREADS_MAX_NUM); exit(1); } /* Spawn and initialize the I/O threads. */ for (int i = 0; i < server.io_threads_num; i++) { /* Things we do for all the threads including the main thread. */ io_threads_list[i] = listCreate(); if (i == 0) continue; /* Thread 0 is the main thread. */ /* Things we do only for the additional threads. */ pthread_t tid; pthread_mutex_init(&io_threads_mutex[i],NULL); setIOPendingCount(i, 0); pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */ if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) { serverLog(LL_WARNING,"Fatal: Can't initialize IO thread."); exit(1); } io_threads[i] = tid; } }
这里,你要注意一下,刚才提到的全局变量 server 是 Redis server 运行时,用来保存各种全局信息的结构体变量。我在之前给你介绍 Redis server 初始化过程的时候,提到过 Redis server 的各种参数初始化配置,都是保存在这个全局变量 server 中的。所以,当你在阅读 Redis 源码时,如果在某个函数中看到变量 server,要知道其实就是这个全局变量。
紧接着,initThreadedIO函数会对设置的 IO 线程数量进行判断。这个数量就是保存在全局变量 server 的成员变量 io_threads_num 中的。那么在这里,IO 线程的数量判断会有三种结果。
- 第一种,如果 IO 线程数量为 1,就表示只有 1 个主 IO 线程,initThreadedIO 函数就直接返回了。此时,Redis server 的 IO 线程和 Redis 6.0 之前的版本是相同的。
- 第二种,如果 IO 线程数量大于宏定义 IO_THREADS_MAX_NUM(默认值为 128),那么 initThreadedIO 函数会报错,并退出整个程序。
if (server.io_threads_num > IO_THREADS_MAX_NUM) { … //报错日志记录 exit(1); //退出程序 }
- 第三种,如果 IO 线程数量大于 1,并且小于宏定义 IO_THREADS_MAX_NUM,那么,initThreadedIO 函数会执行一个循环流程,该流程的循环次数就是设置的 IO 线程数量。
如此一来,在该循环流程中,initThreadedIO 函数就会给以下四个数组进行初始化操作。
- io_threads_list 数组:保存了每个 IO 线程要处理的客户端,将数组每个元素初始化为一个 List 类型的列表;
- io_threads_pending 数组:保存等待每个 IO 线程处理的客户端个数;
- io_threads_mutex 数组:保存线程互斥锁;
- io_threads 数组:保存每个 IO 线程的描述符。
这四个数组的定义都在 networking.c 文件中,如下所示:
// 记录线程描述符的数组 pthread_t io_threads[IO_THREADS_MAX_NUM]; // 记录线程互斥锁的数组 pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM]; // 记录线程待处理的客户端个数 redisAtomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM]; // IO操作是读还是写 int io_threads_op; /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ. */ /* This is the list of clients each thread will serve when threaded I/O is * used. We spawn io_threads_num-1 threads, since one is the main thread * itself. */ // 记录线程对应处理的客户端 list *io_threads_list[IO_THREADS_MAX_NUM];
然后,在对这些数组进行初始化的同时,initThreadedIO 函数还会根据 IO 线程数量,调用 pthread_create 函数创建相应数量的线程。我在上节课给你介绍过,pthread_create 函数的参数包括创建线程要运行的函数和函数参数(*tidp、*attr、*start_routine、*arg)。
所以,对于 initThreadedIO 函数来说,它创建的线程要运行的函数是 IOThreadMain,参数是当前创建线程的编号。不过要注意的是,这个编号是从 1 开始的,编号为 0 的线程其实是运行 Redis server 主流程的主 IO 线程。
以下代码就展示了 initThreadedIO 函数对数组的初始化,以及创建 IO 线程的过程,你可以看下。
/* Spawn and initialize the I/O threads. */ // 生产并初始化IO线程 for (int i = 0; i < server.io_threads_num; i++) { /* Things we do for all the threads including the main thread. */ io_threads_list[i] = listCreate(); // 编号为0的线程是主IO线程 if (i == 0) continue; /* Thread 0 is the main thread. */ /* Things we do only for the additional threads. */ pthread_t tid; // 初始化io_threads_mutex数组 pthread_mutex_init(&io_threads_mutex[i],NULL); // 初始化io_threads_pending数组 setIOPendingCount(i, 0); // 线程将会停止 pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */ // 调用pthread_create函数创建IO线程,线程运行函数为IOThreadMain if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) { serverLog(LL_WARNING,"Fatal: Can't initialize IO thread."); exit(1); } // 初始化io_threads数组,设置值为线程标识 io_threads[i] = tid; }
好了,现在我们再来看下,刚才介绍的 IO 线程启动后要运行的函数 IOThreadMain。了解这个函数,可以帮助我们掌握 IO 线程实际做的工作。
IO 线程的运行函数 IOThreadMain
IOThreadMain 函数也是在 networking.c 文件中定义的,它的主要执行逻辑是一个 while(1) 循环。在这个循环中,IOThreadMain 函数会把 io_threads_list 数组中,每个 IO 线程对应的列表读取出来。
就像我在前面给你介绍的一样,io_threads_list 数组中会针对每个 IO 线程,使用一个列表记录该线程要处理的客户端。所以,IOThreadMain 函数就会从每个 IO 线程对应的列表中,进一步取出要处理的客户端,然后判断线程要执行的操作标记。这个操作标记是用变量 io_threads_op 表示的,它有两种取值。
- io_threads_op 的值为宏定义 IO_THREADS_OP_WRITE:这表明该 IO 线程要做的是写操作,线程会调用 writeToClient 函数将数据写回客户端。
- io_threads_op 的值为宏定义 IO_THREADS_OP_READ:这表明该 IO 线程要做的是读操作,线程会调用 readQueryFromClient 函数从客户端读取数据。
这部分的代码逻辑你可以看看下面的代码。
networking.c文件中查看
void *IOThreadMain(void *myid) { /* The ID is the thread number (from 0 to server.iothreads_num-1), and is * used by the thread to just manipulate a single sub-array of clients. */ long id = (unsigned long)myid; char thdname[16]; snprintf(thdname, sizeof(thdname), "io_thd_%ld", id); redis_set_thread_title(thdname); redisSetCpuAffinity(server.server_cpulist); makeThreadKillable(); while(1) { /* Wait for start */ for (int j = 0; j < 1000000; j++) { if (getIOPendingCount(id) != 0) break; } /* Give the main thread a chance to stop this thread. */ if (getIOPendingCount(id) == 0) { pthread_mutex_lock(&io_threads_mutex[id]); pthread_mutex_unlock(&io_threads_mutex[id]); continue; } serverAssert(getIOPendingCount(id) != 0); /* Process: note that the main thread will never touch our list * before we drop the pending count to 0. */ listIter li; listNode *ln; // 获取IO线程要处理的客户端列表 listRewind(io_threads_list[id],&li); while((ln = listNext(&li))) { // 从客户端列表中获取一个客户端 client *c = listNodeValue(ln); if (io_threads_op == IO_THREADS_OP_WRITE) { // 如果线程操作是写操作,则调用writeToClient将数据写回客户端 writeToClient(c,0); } else if (io_threads_op == IO_THREADS_OP_READ) { // 如果线程操作是读操作,则调用readQueryFromClient从客户端读取数据 readQueryFromClient(c->conn); } else { serverPanic("io_threads_op value is unknown"); } } // 处理完所有客户端后,清空该线程的客户端列表 listEmpty(io_threads_list[id]); // 将该线程的待处理任务数量设置为0 setIOPendingCount(id, 0); } }
我也画了下面这张图,展示了 IOThreadMain 函数的基本流程,你可以看下。
好了,到这里你应该就了解了,每一个 IO 线程运行时,都会不断检查是否有等待它处理的客户端。如果有,就根据操作类型,从客户端读取数据或是将数据写回客户端。你可以看到,这些操作都是 Redis 要和客户端完成的 IO 操作,所以,这也是为什么我们把这些线程称为 IO 线程的原因。那么,你看到这里,可能也会产生一些疑问,IO 线程要处理的客户端是如何添加到 io_threads_list 数组中的呢?
这就要说到 Redis server 对应的全局变量 server 了。server 变量中有两个 List 类型的成员变量:clients_pending_write 和 clients_pending_read,它们分别记录了待写回数据的客户端和待读取数据的客户端,如下所示:
server.h文件中查看
struct redisServer { ... list *clients_pending_write; //待写回数据的客户端 list *clients_pending_read; //待读取数据的客户端 ... }
你要知道,Redis server 在接收到客户端请求和给客户端返回数据的过程中,会根据一定条件,推迟客户端的读写操作,并分别把待读写的客户端保存到这两个列表中。然后,Redis server 在每次进入事件循环前,会再把列表中的客户端添加到 io_threads_list 数组中,交给 IO 线程进行处理。所以接下来,我们就先来看下,Redis 是如何推迟客户端的读写操作,并把这些客户端添加到 clients_pending_write 和 clients_pending_read 这两个列表中的。
如何推迟客户端读操作?
Redis server 在和一个客户端建立连接后,就会开始监听这个客户端上的可读事件,而处理可读事件的回调函数是 readQueryFromClient。我在之前文章中给你介绍了这个过程,你可以再去回顾下。
那么这里,我们再来看下 Redis 6.0 版本中的 readQueryFromClient 函数。这个函数一开始会先从传入参数 conn 中获取客户端 c,紧接着就调用 postponeClientRead 函数,来判断是否推迟从客户端读取数据。这部分的执行逻辑如下所示:
networking.c文件中查看
void readQueryFromClient(connection *conn) { //从连接数据结构中获取客户端 client *c = connGetPrivateData(conn); int nread, readlen; size_t qblen; /* Check if we want to read from the client later when exiting from * the event loop. This is the case if threaded I/O is enabled. */ // 判断是否推迟从客户端读取数据 if (postponeClientRead(c)) return; /* Update total number of reads on server */ atomicIncr(server.stat_total_reads_processed, 1); readlen = PROTO_IOBUF_LEN; /* If this is a multi bulk request, and we are processing a bulk reply * that is large enough, try to maximize the probability that the query * buffer contains exactly the SDS string representing the object, even * at the risk of requiring more read(2) calls. This way the function * processMultiBulkBuffer() can avoid copying buffers to create the * Redis Object representing the argument. */ if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 && c->bulklen >= PROTO_MBULK_BIG_ARG) { ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf); /* Note that the 'remaining' variable may be zero in some edge case, * for example once we resume a blocked client after CLIENT PAUSE. */ if (remaining > 0 && remaining < readlen) readlen = remaining; } qblen = sdslen(c->querybuf); if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); nread = connRead(c->conn, c->querybuf+qblen, readlen); if (nread == -1) { if (connGetState(conn) == CONN_STATE_CONNECTED) { return; } else { serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn)); freeClientAsync(c); return; } } else if (nread == 0) { serverLog(LL_VERBOSE, "Client closed connection"); freeClientAsync(c); return; } else if (c->flags & CLIENT_MASTER) { /* Append the query buffer to the pending (not applied) buffer * of the master. We'll use this buffer later in order to have a * copy of the string applied by the last command executed. */ c->pending_querybuf = sdscatlen(c->pending_querybuf, c->querybuf+qblen,nread); } sdsIncrLen(c->querybuf,nread); c->lastinteraction = server.unixtime; if (c->flags & CLIENT_MASTER) c->read_reploff += nread; atomicIncr(server.stat_net_input_bytes, nread); if (sdslen(c->querybuf) > server.client_max_querybuf_len) { sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty(); bytes = sdscatrepr(bytes,c->querybuf,64); serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); sdsfree(ci); sdsfree(bytes); freeClientAsync(c); return; } /* There is more data in the client input buffer, continue parsing it * in case to check if there is a full command to execute. */ processInputBuffer(c); }
现在,我们就来看下 postponeClientRead 函数的执行逻辑。这个函数会根据四个条件判断能否推迟从客户端读取数据。
networking.c文件中查看
/* Return 1 if we want to handle the client read later using threaded I/O. * This is called by the readable handler of the event loop. * As a side effect of calling this function the client is put in the * pending read clients and flagged as such. */ int postponeClientRead(client *c) { if (server.io_threads_active && server.io_threads_do_reads && !ProcessingEventsWhileBlocked && !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ|CLIENT_BLOCKED))) { c->flags |= CLIENT_PENDING_READ; listAddNodeHead(server.clients_pending_read,c); return 1; } else { return 0; } }
- 条件一:全局变量 server 的 io_threads_active 值为 1
这表示多 IO 线程已经激活。我刚才说过,这个变量值在 initThreadedIO 函数中是会被初始化为 0 的,也就是说,多 IO 线程初始化后,默认还没有激活(我一会儿还会给你介绍这个变量值何时被设置为 1)。
- 条件二:全局变量 server 的 io_threads_do_read 值为 1
这表示多 IO 线程可以用于处理延后执行的客户端读操作。这个变量值是在 Redis 配置文件 redis.conf 中,通过配置项 io-threads-do-reads 设置的,默认值为 no,也就是说,多 IO 线程机制默认并不会用于客户端读操作。所以,如果你想用多 IO 线程处理客户端读操作,就需要把 io-threads-do-reads 配置项设为 yes。
- 条件三:ProcessingEventsWhileBlocked 变量值为 0
这表示 processEventsWhileBlokced 函数没有在执行。ProcessingEventsWhileBlocked 是一个全局变量,它会在 processEventsWhileBlokced 函数执行时被设置为 1,在 processEventsWhileBlokced 函数执行完成时被设置为 0。而 processEventsWhileBlokced 函数是在networking.c文件中实现的。当 Redis 在读取 RDB 文件或是 AOF 文件时,这个函数会被调用,用来处理事件驱动框架捕获到的事件。这样就避免了因读取 RDB 或 AOF 文件造成 Redis 阻塞,而无法及时处理事件的情况。所以,当 processEventsWhileBlokced 函数执行处理客户端可读事件时,这些客户端读操作是不会被推迟执行的。
- 条件四:客户端现有标识不能有 CLIENT_MASTER、CLIENT_SLAVE 和 CLIENT_PENDING_READ
其中,CLIENT_MASTER 和 CLIENT_SLAVE 标识分别表示客户端是用于主从复制的客户端,也就是说,这些客户端不会推迟读操作。CLIENT_PENDING_READ 本身就表示一个客户端已经被设置为推迟读操作了,所以,对于已带有 CLIENT_PENDING_READ 标识的客户端,postponeClientRead 函数就不会再推迟它的读操作了。总之,只有前面这四个条件都满足了,postponeClientRead 函数才会推迟当前客户端的读操作。具体来说,postponeClientRead 函数会给该客户端设置 CLIENT_PENDING_REA 标识,并调用 listAddNodeHead 函数,把这个客户端添加到全局变量 server 的 clients_pending_read 列表中。
好,现在你已经知道,Redis 是在客户端读事件回调函数 readQueryFromClient 中,通过调用 postponeClientRead 函数来判断和推迟客户端读操作。下面,我再带你来看下 Redis 是如何推迟客户端写操作的。