Redis6.0的多IO线程(二)

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: Redis6.0的多IO线程

如何推迟客户端写操作?

Redis 在执行了客户端命令,要给客户端返回结果时,会调用 addReply 函数将待返回结果写入客户端输出缓冲区。

而在 addReply 函数的一开始,该函数会调用 prepareClientToWrite 函数,来判断是否推迟执行客户端写操作。下面代码展示了 addReply 函数对 prepareClientToWrite 函数的调用,你可以看下。

/* Add the object 'obj' string representation to the client output buffer. */
void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;
    if (sdsEncodedObject(obj)) {
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyProtoToList(c,obj->ptr,sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        /* For integer encoded strings we just convert it into a string
         * using our optimized function, and attach the resulting string
         * to the output buffer. */
        char buf[32];
        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
        if (_addReplyToBuffer(c,buf,len) != C_OK)
            _addReplyProtoToList(c,buf,len);
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}

所以这里,我们继续来看下 prepareClientToWrite 函数。这个函数会根据客户端设置的标识进行一系列的判断。其中,该函数会调用 clientHasPendingReplies 函数,判断当前客户端是否还有留存在输出缓冲区中的数据等待写回。

如果没有的话,那么,prepareClientToWrite 就会调用 clientInstallWriteHandler 函数,再进一步判断能否推迟该客户端写操作。下面的代码展示了这一调用过程,你可以看下。

int prepareClientToWrite(client *c) {
    /* If it's the Lua client we always return ok without installing any
     * handler since there is no socket at all. */
    if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;
    /* If CLIENT_CLOSE_ASAP flag is set, we need not write anything. */
    if (c->flags & CLIENT_CLOSE_ASAP) return C_ERR;
    /* CLIENT REPLY OFF / SKIP handling: don't send replies. */
    if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;
    /* Masters don't receive replies, unless CLIENT_MASTER_FORCE_REPLY flag
     * is set. */
    if ((c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR;
    if (!c->conn) return C_ERR; /* Fake client for AOF loading. */
    /* Schedule the client to write the output buffers to the socket, unless
     * it should already be setup to do so (it has already pending data).
     *
     * If CLIENT_PENDING_READ is set, we're in an IO thread and should
     * not install a write handler. Instead, it will be done by
     * handleClientsWithPendingReadsUsingThreads() upon return.
     */
    //如果当前客户端没有待写回数据,调用clientInstallWriteHandler函数
    if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ))
            clientInstallWriteHandler(c);
    /* Authorize the caller to queue in the output buffer of this client. */
    return C_OK;
}

那么这样一来,我们其实就知道了,能否推迟客户端写操作,最终是由 clientInstallWriteHandler 函数来决定的,这个函数会判断两个条件。

networking.c文件中查看

void clientInstallWriteHandler(client *c) {
    /* Schedule the client to write the output buffers to the socket only
     * if not already done and, for slaves, if the slave can actually receive
     * writes at this stage. */
     // 如果客户端没有设置过CLIENT_PENDING_WRITE标识,并且客户端没有在进行主从复制,或者客户端是主从复制中的从节点,已经能接收请求
    if (!(c->flags & CLIENT_PENDING_WRITE) &&
        (c->replstate == REPL_STATE_NONE ||
         (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
    {
        /* Here instead of installing the write handler, we just flag the
         * client and put it into a list of clients that have something
         * to write to the socket. This way before re-entering the event
         * loop, we can try to directly write to the client sockets avoiding
         * a system call. We'll only really install the write handler if
         * we'll not be able to write the whole reply at once. */
        // 将客户端的标识设置为待写回,即CLIENT_PENDING_WRITE
        c->flags |= CLIENT_PENDING_WRITE;
        // 将可获得加入clients_pending_write列表
        listAddNodeHead(server.clients_pending_write,c);
    }
}
  • 条件一:客户端没有设置过 CLIENT_PENDING_WRITE 标识,即没有被推迟过执行写操作。
  • 条件二:客户端所在实例没有进行主从复制,或者客户端所在实例是主从复制中的从节点,但全量复制的 RDB 文件已经传输完成,客户端可以接收请求。

一旦这两个条件都满足了,clientInstallWriteHandler 函数就会把客户端标识设置为 CLIENT_PENDING_WRITE,表示推迟该客户端的写操作。同时,clientInstallWriteHandler 函数会把这个客户端添加到全局变量 server 的待写回客户端列表中,也就是 clients_pending_write 列表中。

为了便于你更好地理解,我画了一张图,展示了 Redis 推迟客户端写操作的函数调用关系,你可以再回顾下。

不过,当 Redis 使用 clients_pending_read 和 clients_pending_write 两个列表,保存了推迟执行的客户端后,这些客户端又是如何分配给多 IO 线程执行的呢?这就和下面两个函数相关了。

  • handleClientsWithPendingReadsUsingThreads 函数:该函数主要负责将 clients_pending_read 列表中的客户端分配给 IO 线程进行处理。
  • handleClientsWithPendingWritesUsingThreads 函数:该函数主要负责将 clients_pending_write 列表中的客户端分配给 IO 线程进行处理。

所以接下来,我们就来看下这两个函数的具体操作。

如何把待读客户端分配给 IO 线程执行?

首先,我们来了解 handleClientsWithPendingReadsUsingThreads 函数。这个函数是在 beforeSleep 函数中调用的。

在 Redis 6.0 版本的代码中,事件驱动框架同样是调用 aeMain 函数来执行事件循环流程,该循环流程会调用 aeProcessEvents 函数处理各种事件。而在 aeProcessEvents 函数实际调用 aeApiPoll 函数捕获 IO 事件之前,beforeSleep 函数会被调用。

这个过程如下图所示,你可以看下。

handleClientsWithPendingReadsUsingThreads 函数的主要执行逻辑可以分成四步。

  • 第一步:该函数会先根据全局变量 server 的 io_threads_active 成员变量,判定 IO 线程是否激活,并且根据 server 的 io_threads_do_reads 成员变量,判定用户是否设置了 Redis 可以用 IO 线程处理待读客户端。只有在 IO 线程激活,并且 IO 线程可以用于处理待读客户端时,handleClientsWithPendingReadsUsingThreads 函数才会继续执行,否则该函数就直接结束返回了。这一步的判断逻辑如以下代码所示:
if (!server.io_threads_active || !server.io_threads_do_reads) 
   return 0;
  • 第二步handleClientsWithPendingReadsUsingThreads 函数会获取 clients_pending_read 列表的长度,这代表了要处理的待读客户端个数。然后,该函数会从 clients_pending_read 列表中逐一取出待处理的客户端,并用客户端在列表中的序号,对 IO 线程数量进行取模运算。

为了便于你理解,我来给你举个例子。

假设 IO 线程数量设置为 3,clients_pending_read 列表中一共有 5 个待读客户端,它们在列表中的序号分别是 0,1,2,3 和 4。在这一步中,0 号到 4 号客户端对线程数量 3 取模的结果分别是 0,1,2,0,1,这也对应了即将处理这些客户端的 IO 线程编号。这也就是说,0 号客户端由 0 号线程处理,1 号客户端有 1 号线程处理,以此类推。你可以看到,这个分配方式其实就是把待处理客户端,以轮询方式逐一分配给各个 IO 线程。

我画了下面这张图,展示了这个分配结果,你可以再看下。

以下代码展示的就是以轮询方式将客户端分配给 IO 线程的执行逻辑:

int processed = listLength(server.clients_pending_read);
listRewind(server.clients_pending_read,&li);
int item_id = 0;
while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
 }

这样,当 handleClientsWithPendingReadsUsingThreads 函数完成客户端的 IO 线程分配之后,它会将 IO 线程的操作标识设置为读操作,也就是 IO_THREADS_OP_READ。然后,它会遍历 io_threads_list 数组中的每个元素列表长度,等待每个线程处理的客户端数量,赋值给 io_threads_pending 数组。这一过程如下所示:

/* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
// 通过设置启动条件 atomic var 为等待线程提供启动条件。
io_threads_op = IO_THREADS_OP_READ;
for (int j = 1; j < server.io_threads_num; j++) {
    int count = listLength(io_threads_list[j]);
    setIOPendingCount(j, count);
}

setIOPendingCount函数原型如下:

static inline void setIOPendingCount(int i, unsigned long count) {
    atomicSetWithSync(io_threads_pending[i], count);
}
  • 第三步handleClientsWithPendingReadsUsingThreads 函数会将 io_threads_list 数组 0 号列表(也就是 io_threads_list[0]元素)中的待读客户端逐一取出来,并调用 readQueryFromClient 函数进行处理。其实,handleClientsWithPendingReadsUsingThreads 函数本身就是由 IO 主线程执行的,而 io_threads_list 数组对应的 0 号线程正是 IO 主线程,所以,这里就是让主 IO 线程来处理它的待读客户端。
/* Also use the main thread to process a slice of clients. */
// 使用主线程来处理一部分客户端。
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
    client *c = listNodeValue(ln);
    readQueryFromClient(c->conn);
}
// 处理完后,清空0号列表,也就是清空主线程里的客户端
listEmpty(io_threads_list[0]);

紧接着,handleClientsWithPendingReadsUsingThreads 函数会执行一个 while(1) 循环,等待所有 IO 线程完成待读客户端的处理,如下所示:

/* Wait for all the other threads to end their work. */
while(1) {
    unsigned long pending = 0;
    for (int j = 1; j < server.io_threads_num; j++)
        pending += getIOPendingCount(j);
    if (pending == 0) break;
}
  • 第四步handleClientsWithPendingReadsUsingThreads 函数会再次遍历一遍 clients_pending_read 列表,依次取出其中的客户端。紧接着,它会判断客户端的标识中是否有 CLIENT_PENDING_COMMAND。如果有 CLIENT_PENDING_COMMAND 标识,表明该客户端中的命令已经被某一个 IO 线程解析过,已经可以被执行了。

此时,handleClientsWithPendingReadsUsingThreads 函数会调用 processCommandAndResetClient 函数执行命令。最后,它会直接调用 processInputBuffer 函数解析客户端中所有命令并执行。

这部分的代码逻辑如下所示,你可以看下。

/* Run the list of clients again to process the new buffers. */
// 再次运行客户端列表以处理新缓冲区
while(listLength(server.clients_pending_read)) {
    ln = listFirst(server.clients_pending_read);
    client *c = listNodeValue(ln);
    c->flags &= ~CLIENT_PENDING_READ;
    listDelNode(server.clients_pending_read,ln);
    // 断言客户端是否阻塞,没被阻塞才可以继续
    serverAssert(!(c->flags & CLIENT_BLOCKED));
    // 如果客户端不再有效,我们避免稍后处理客户端。所以我们只是去下一个。
    if (processPendingCommandsAndResetClient(c) == C_ERR) {
        /* If the client is no longer valid, we avoid
         * processing the client later. So we just go
         * to the next. */
        continue;
    }
    // 解析并执行命令
    processInputBuffer(c);
    /* We may have pending replies if a thread readQueryFromClient() produced
     * replies and did not install a write handler (it can't).
     */
    // 如果线程 readQueryFromClient() 产生了回复并且没有去进行写处理(它不能),我们可能会有待处理的回复。
    if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
        clientInstallWriteHandler(c);
}

好了,到这里,你就了解了 clients_pending_read 列表中的待读客户端,是如何经过以上四个步骤来分配给 IO 线程进行处理的。下图展示了这个主要过程,你可以再回顾下:

那么,接下来,我们再来看下待写客户端的分配和处理。

如何把待写客户端分配给 IO 线程执行?

和待读客户端的分配处理类似,待写客户端分配处理是由 handleClientsWithPendingWritesUsingThreads 函数来完成的。该函数也是在 beforeSleep 函数中被调用的。

handleClientsWithPendingWritesUsingThreads 函数的主要流程同样也可以分成 4 步,其中,第 2、3 和 4 步的执行逻辑,和 handleClientsWithPendingReadsUsingThreads 函数类似。

简单来说,在第 2 步,handleClientsWithPendingWritesUsingThreads 函数会把待写客户端,按照轮询方式分配给 IO 线程,添加到 io_threads_list 数组各元素中。

然后,在第 3 步,handleClientsWithPendingWritesUsingThreads 函数会让主 IO 线程处理其待写客户端,并执行 while(1) 循环等待所有 IO 线程完成处理。

在第 4 步,handleClientsWithPendingWritesUsingThreads 函数会再次检查 clients_pending_write 列表中,是否还有待写的客户端。如果有的话,并且这些客户端还有留存在缓冲区中的数据,那么,handleClientsWithPendingWritesUsingThreads 函数就会调用 connSetWriteHandler 函数注册可写事件,而这个可写事件对应的回调函数是 sendReplyToClient 函数。

等到事件循环流程再次执行时,刚才 handleClientsWithPendingWritesUsingThreads 函数注册的可写事件就会被处理,紧接着 sendReplyToClient 函数会执行,它会直接调用 writeToClient 函数,把客户端缓冲区中的数据写回。

这里,你需要注意的是,connSetWriteHandler 函数最终会映射为 connSocketSetWriteHandler 函数,而 connSocketSetWriteHandler 函数是在connection.c文件中实现的。connSocketSetWriteHandler 函数会调用 aeCreateFileEvent 函数创建 AE_WRITABLE 事件,这就是刚才介绍的可写事件的注册(关于 aeCreateFileEvent 函数的使用,你也可以再回顾之前的)。

不过,和 handleClientsWithPendingReadsUsingThreads 函数不同的是在第 1 步,handleClientsWithPendingWritesUsingThreads 函数,会判断 IO 线程数量是否为 1,或者待写客户端数量是否小于 IO 线程数量的 2 倍。

如果这两个条件中有一个条件成立,那么 handleClientsWithPendingWritesUsingThreads 函数就不会用多线程来处理客户端了,而是会调用 handleClientsWithPendingWrites 函数由主 IO 线程直接处理待写客户端。这样做的目的,主要是为了在待写客户端数量不多时,避免采用多线程,从而节省 CPU 开销

这一步的条件判断逻辑如下所示。其中,stopThreadedIOIfNeeded 函数主要是用来判断待写客户端数量,是否不足为 IO 线程数量的 2 倍。

/* If I/O threads are disabled or we have few clients to serve, don't
 * use I/O threads, but the boring synchronous code. */
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
    return handleClientsWithPendingWrites();
}

stopThreadedIOIfNeeded函数原型如下:

int stopThreadedIOIfNeeded(void) {
    int pending = listLength(server.clients_pending_write);
    /* Return ASAP if IO threads are disabled (single threaded mode). */
    if (server.io_threads_num == 1) return 1;
    // 满足代写的客户端数量是小于io_threads_num的2倍返回1,否则返回0
    if (pending < (server.io_threads_num*2)) {
        if (server.io_threads_active) stopThreadedIO();
        return 1;
    } else {
        return 0;
    }
}

另外,handleClientsWithPendingWritesUsingThreads 函数在第 1 步中,还会判断 IO 线程是否已激活。如果没有激活,它就会调用 startThreadedIO 函数,把全局变量 server 的 io_threads_active 成员变量值设置为 1,表示 IO 线程已激活。这步判断操作如下所示:

if (!server.io_threads_active) startThreadedIO();

整个handleClientsWithPendingWritesUsingThreads函数原型如下:

int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */
    /* If I/O threads are disabled or we have few clients to serve, don't
     * use I/O threads, but the boring synchronous code. */
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }
    /* Start threads if needed. */
    if (!server.io_threads_active) startThreadedIO();
    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        /* Remove clients from the list of pending writes since
         * they are going to be closed ASAP. */
        if (c->flags & CLIENT_CLOSE_ASAP) {
            listDelNode(server.clients_pending_write, ln);
            continue;
        }
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }
    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        setIOPendingCount(j, count);
    }
    /* Also use the main thread to process a slice of clients. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);
    /* Wait for all the other threads to end their work. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    }
    /* Run the list of clients again to install the write handler where
     * needed. */
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        /* Install the write handler if there are pending writes in some
         * of the clients. */
        if (clientHasPendingReplies(c) &&
                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);
    /* Update processed count on server */
    server.stat_io_writes_processed += processed;
    return processed;
}

总之你要知道的就是,Redis 是通过 handleClientsWithPendingWritesUsingThreads 函数,把待写客户端按轮询方式分配给各个 IO 线程,并由它们来负责写回数据的。

总结

今天这篇文章,我给你介绍了 Redis 6.0 中新设计实现的多 IO 线程机制。这个机制的设计主要是为了使用多个 IO 线程,来并发处理客户端读取数据、解析命令和写回数据。使用了多线程后,Redis 就可以充分利用服务器的多核特性,从而提高 IO 效率。

总结来说,Redis 6.0 先是在初始化过程中,根据用户设置的 IO 线程数量,创建对应数量的 IO 线程。当 Redis server 初始化完成后正常运行时,它会在 readQueryFromClient 函数中通过调用 postponeClientRead 函数来决定是否推迟客户端读操作。同时,Redis server 会在 addReply 函数中通过调用 prepareClientToWrite 函数,来决定是否推迟客户端写操作。而待读写的客户端会被分别加入到 clients_pending_read 和 clients_pending_write 两个列表中。这样,每当 Redis server 要进入事件循环流程前,都会在 beforeSleep 函数中分别调用 handleClientsWithPendingReadsUsingThreads 函数和 handleClientsWithPendingWritesUsingThreads 函数,将待读写客户端以轮询方式分配给 IO 线程,加入到 IO 线程的待处理客户端列表 io_threads_list 中。

而 IO 线程一旦运行后,本身会一直检测 io_threads_list 中的客户端,如果有待读写客户端,IO 线程就会调用 readQueryFromClient 或 writeToClient 函数来进行处理。最后,我也想再提醒你一下,多 IO 线程本身并不会执行命令,它们只是利用多核并行地读取数据和解析命令,或是将 server 数据写回(下节课我还会结合分布式锁的原子性保证,来给你介绍这一部分的源码实现。)。所以,Redis 执行命令的线程还是主 IO 线程。这一点对于你理解多 IO 线程机制很重要,可以避免你误解 Redis 有多线程同时执行命令。这样一来,我们原来针对 Redis 单个主 IO 线程做的优化仍然有效,比如避免 bigkey、避免阻塞操作等。

  • 1、Redis 6.0 之前,处理客户端请求是单线程,这种模型的缺点是,只能用到「单核」CPU。如果并发量很高,那么在读写客户端数据时,容易引发性能瓶颈,所以 Redis 6.0 引入了多 IO 线程解决这个问题
  • 2、配置文件开启 io-threads N 后,Redis Server 启动时,会启动 N - 1 个 IO 线程(主线程也算一个 IO 线程),这些 IO 线程执行的逻辑是 networking.c 的 IOThreadMain 函数。但默认只开启多线程「写」client socket,如果要开启多线程「读」,还需配置 io-threads-do-reads = yes
  • 3、Redis 在读取客户端请求时,判断如果开启了 IO 多线程,则把这个 client 放到 clients_pending_read 链表中(postponeClientRead 函数),之后主线程在处理每次事件循环之前,把链表数据轮询放到 IO 线程的链表(io_threads_list)中
  • 4、同样地,在写回响应时,是把 client 放到 clients_pending_write 中(prepareClientToWrite 函数),执行事件循环之前把数据轮询放到 IO 线程的链表(io_threads_list)中
  • 5、主线程把 client 分发到 IO 线程时,自己也会读写客户端 socket(主线程也要分担一部分读写操作),之后「等待」所有 IO 线程完成读写,再由主线程「串行」执行后续逻辑
  • 6、每个 IO 线程,不停地从 io_threads_list 链表中取出 client,并根据指定类型读、写 client socket
  • 7、IO 线程在处理读、写 client 时有些许差异,如果 write_client_pedding < io_threads * 2,则直接由「主线程」负责写,不再交给 IO 线程处理,从而节省 CPU 消耗
  • 8、Redis 官方建议,服务器最少 4 核 CPU 才建议开启 IO 多线程,4 核 CPU 建议开 2-3 个 IO 线程,8 核 CPU 开 6 个 IO 线程,超过 8 个线程性能提升不大
  • 9、Redis 官方表示,开启多 IO 线程后,性能可提升 1 倍。当然,如果 Redis 性能足够用,没必要开 IO 线程

课后题:为什么 startThreadedIO / stopThreadedIO 要执行加解锁?

是为了方便主线程动态,灵活调整IO线程而设计的当clients数量较少的时候可以方便直接停止IO线程。停止IO线程的阈值是,当等待写的client客户端数量小于IO线程数量的两倍,就会停止IO线程避免多线程带来不必要的开销

回归代码:

1、stopThreadedIO,startThreadedIO 和 stopThreadedIOIfNeeded这三个函数中有体现,其中在stopThreadedIOIfNeeded中会判断当前待写出客户端数量是否大于2倍IO线程数量,如果不是则会调用stopThreadedIO函数通过io_threads_mutex的方式停止所有IO线程(主线程除外,因为index是从1开始的)并且将io_threads_active设置为0,并且后续调用stopThreadedIOIfNeeded函数会返回0,在handleClientsWithPendingWritesUsingThreads函数中会直接调用handleClientsWithPendingWrites来使用单线程进行写出。

流程如下:

第一次:handleClientsWithPendingWritesUsingThreads -> stopThreadedIOIfNeeded -> stopThreadedIO -> 设置io_threads_active为0并lock住IO线程

第二次: handleClientsWithPendingWritesUsingThreads -> stopThreadedIOIfNeeded -> 直接返回1 -> handleClientsWithPendingWrites进行单线程处理

2、当待写出client的数量上来的时候,stopThreadedIOIfNeeded函数中判断,待写出client数量大于2倍IO线程数量,返回0,然后调用startThreadedIO激活IO线程

流程如下:

handleClientsWithPendingWritesUsingThreads -> stopThreadedIOIfNeeded(发现不满足需要IO线程,返回0) -> startThreadedIO(激活IO线程) -> 设置io_threads_active为1

此外注意:IO线程一定是处理完了所有client之后,才会倍lock,在IOThreadMain有一个条件 if (getIOPendingCount(id) == 0)

既然涉及到加锁操作,必然是为了「互斥」从而控制某些逻辑。可以在代码中检索这个锁变量,看存在哪些逻辑对 io_threads_mutex 操作了加解锁。

跟踪代码可以看到,在 networking.c 的 IOThreadMain 函数,也对这个变量进行了加解锁操作,那就说明 startThreadedIO / stopThreadedIO 函数,可以控制 IOThreadMain 里逻辑的执行,IOThreadMain 代码如下。

void *IOThreadMain(void *myid) {
  ...
  while(1) {
    ...
    /* Give the main thread a chance to stop this thread. */
    if (io_threads_pending[id] == 0) {
      pthread_mutex_lock(&io_threads_mutex[id]);
      pthread_mutex_unlock(&io_threads_mutex[id]);
      continue;
    }
    // 读写 client socket
    // ...
  }
}

这个函数正是 IO 多线程的主逻辑。

从注释可以看到,这是为了给主线程停止 IO 线程的的机会。也就是说,这里的目的是为了让主线程可以控制 IO 线程的开启 / 暂停。

因为每次 IO 线程在执行时必须先拿到锁,才能执行后面的逻辑,如果主线程执行了 stopThreadedIO,就会先拿到锁,那么 IOThreadMain 函数在执行时就会因为拿不到锁阻塞「等待」,这就达到了 stop IO 线程的目的。

同样地,调用 startThreadedIO 函数后,会释放锁,IO 线程就可以拿到锁,继续「恢复」执行。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
4天前
|
算法 数据处理 Python
Python并发编程:解密异步IO与多线程
本文将深入探讨Python中的并发编程技术,重点介绍异步IO和多线程两种常见的并发模型。通过对比它们的特点、适用场景和实现方式,帮助读者更好地理解并发编程的核心概念,并掌握在不同场景下选择合适的并发模型的方法。
|
4天前
|
存储 缓存 NoSQL
Redis单线程已经很快了6.0引入多线程
Redis单线程已经很快了6.0引入多线程
35 3
|
4天前
|
NoSQL 数据处理 调度
【Redis深度专题】「踩坑技术提升」探索Redis 6.0为何必须启用多线程以提升性能与效率
【Redis深度专题】「踩坑技术提升」探索Redis 6.0为何必须启用多线程以提升性能与效率
293 0
|
1天前
|
NoSQL Redis 缓存
【后端面经】【缓存】36|Redis 单线程:为什么 Redis 用单线程而 Memcached 用多线程?
【5月更文挑战第17天】Redis常被称为单线程,但实际上其在处理命令时采用单线程,但在6.0后IO变为多线程。持久化和数据同步等任务由额外线程处理,因此严格来说Redis是多线程的。面试时需理解Redis的IO模型,如epoll和Reactor模式,以及其内存操作带来的高性能。Redis使用epoll进行高效文件描述符管理,实现高性能的网络IO。在讨论Redis与Memcached的线程模型差异时,应强调Redis的单线程模型如何通过内存操作和高效IO实现高性能。
23 7
【后端面经】【缓存】36|Redis 单线程:为什么 Redis 用单线程而 Memcached 用多线程?
|
4天前
|
存储 NoSQL Redis
深入浅出Redis(二):Redis单线程模型与通信流程
深入浅出Redis(二):Redis单线程模型与通信流程
|
4天前
|
NoSQL Redis
Redis 线程模型
Redis 线程模型
|
4天前
|
并行计算 数据处理 开发者
Python并发编程:解析异步IO与多线程
本文探讨了Python中的并发编程技术,着重比较了异步IO和多线程两种常见的并发模型。通过详细分析它们的特点、优劣势以及适用场景,帮助读者更好地理解并选择适合自己项目需求的并发编程方式。
|
4天前
|
存储 消息中间件 缓存
jeecgboot运行磁盘不足问题( java.io.IOException)和redis闪退问题
jeecgboot运行磁盘不足问题( java.io.IOException)和redis闪退问题
22 0
|
4天前
|
存储 缓存 NoSQL
为什么Redis使用单线程 性能会优于多线程?
在计算机领域,性能一直都是一个关键的话题。无论是应用开发还是系统优化,我们都需要关注如何在有限的资源下,实现最大程度的性能提升。Redis,作为一款高性能的开源内存数据库,因其出色的单线程性能而备受瞩目。那么,为什么Redis使用单线程性能会优于多线程呢?
25 1
|
4天前
|
NoSQL Java Redis
【问题篇】解决Unable to connect to Redis; nested exception is io.lettuce.core.RedisConnectionException
【问题篇】解决Unable to connect to Redis; nested exception is io.lettuce.core.RedisConnectionException
474 0

热门文章

最新文章