Redis6.0的多IO线程(二)

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: Redis6.0的多IO线程

如何推迟客户端写操作?

Redis 在执行了客户端命令,要给客户端返回结果时,会调用 addReply 函数将待返回结果写入客户端输出缓冲区。

而在 addReply 函数的一开始,该函数会调用 prepareClientToWrite 函数,来判断是否推迟执行客户端写操作。下面代码展示了 addReply 函数对 prepareClientToWrite 函数的调用,你可以看下。

/* Add the object 'obj' string representation to the client output buffer. */
void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;
    if (sdsEncodedObject(obj)) {
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyProtoToList(c,obj->ptr,sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        /* For integer encoded strings we just convert it into a string
         * using our optimized function, and attach the resulting string
         * to the output buffer. */
        char buf[32];
        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
        if (_addReplyToBuffer(c,buf,len) != C_OK)
            _addReplyProtoToList(c,buf,len);
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}

所以这里,我们继续来看下 prepareClientToWrite 函数。这个函数会根据客户端设置的标识进行一系列的判断。其中,该函数会调用 clientHasPendingReplies 函数,判断当前客户端是否还有留存在输出缓冲区中的数据等待写回。

如果没有的话,那么,prepareClientToWrite 就会调用 clientInstallWriteHandler 函数,再进一步判断能否推迟该客户端写操作。下面的代码展示了这一调用过程,你可以看下。

int prepareClientToWrite(client *c) {
    /* If it's the Lua client we always return ok without installing any
     * handler since there is no socket at all. */
    if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;
    /* If CLIENT_CLOSE_ASAP flag is set, we need not write anything. */
    if (c->flags & CLIENT_CLOSE_ASAP) return C_ERR;
    /* CLIENT REPLY OFF / SKIP handling: don't send replies. */
    if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;
    /* Masters don't receive replies, unless CLIENT_MASTER_FORCE_REPLY flag
     * is set. */
    if ((c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR;
    if (!c->conn) return C_ERR; /* Fake client for AOF loading. */
    /* Schedule the client to write the output buffers to the socket, unless
     * it should already be setup to do so (it has already pending data).
     *
     * If CLIENT_PENDING_READ is set, we're in an IO thread and should
     * not install a write handler. Instead, it will be done by
     * handleClientsWithPendingReadsUsingThreads() upon return.
     */
    //如果当前客户端没有待写回数据,调用clientInstallWriteHandler函数
    if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ))
            clientInstallWriteHandler(c);
    /* Authorize the caller to queue in the output buffer of this client. */
    return C_OK;
}

那么这样一来,我们其实就知道了,能否推迟客户端写操作,最终是由 clientInstallWriteHandler 函数来决定的,这个函数会判断两个条件。

networking.c文件中查看

void clientInstallWriteHandler(client *c) {
    /* Schedule the client to write the output buffers to the socket only
     * if not already done and, for slaves, if the slave can actually receive
     * writes at this stage. */
     // 如果客户端没有设置过CLIENT_PENDING_WRITE标识,并且客户端没有在进行主从复制,或者客户端是主从复制中的从节点,已经能接收请求
    if (!(c->flags & CLIENT_PENDING_WRITE) &&
        (c->replstate == REPL_STATE_NONE ||
         (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
    {
        /* Here instead of installing the write handler, we just flag the
         * client and put it into a list of clients that have something
         * to write to the socket. This way before re-entering the event
         * loop, we can try to directly write to the client sockets avoiding
         * a system call. We'll only really install the write handler if
         * we'll not be able to write the whole reply at once. */
        // 将客户端的标识设置为待写回,即CLIENT_PENDING_WRITE
        c->flags |= CLIENT_PENDING_WRITE;
        // 将可获得加入clients_pending_write列表
        listAddNodeHead(server.clients_pending_write,c);
    }
}
  • 条件一:客户端没有设置过 CLIENT_PENDING_WRITE 标识,即没有被推迟过执行写操作。
  • 条件二:客户端所在实例没有进行主从复制,或者客户端所在实例是主从复制中的从节点,但全量复制的 RDB 文件已经传输完成,客户端可以接收请求。

一旦这两个条件都满足了,clientInstallWriteHandler 函数就会把客户端标识设置为 CLIENT_PENDING_WRITE,表示推迟该客户端的写操作。同时,clientInstallWriteHandler 函数会把这个客户端添加到全局变量 server 的待写回客户端列表中,也就是 clients_pending_write 列表中。

为了便于你更好地理解,我画了一张图,展示了 Redis 推迟客户端写操作的函数调用关系,你可以再回顾下。

不过,当 Redis 使用 clients_pending_read 和 clients_pending_write 两个列表,保存了推迟执行的客户端后,这些客户端又是如何分配给多 IO 线程执行的呢?这就和下面两个函数相关了。

  • handleClientsWithPendingReadsUsingThreads 函数:该函数主要负责将 clients_pending_read 列表中的客户端分配给 IO 线程进行处理。
  • handleClientsWithPendingWritesUsingThreads 函数:该函数主要负责将 clients_pending_write 列表中的客户端分配给 IO 线程进行处理。

所以接下来,我们就来看下这两个函数的具体操作。

如何把待读客户端分配给 IO 线程执行?

首先,我们来了解 handleClientsWithPendingReadsUsingThreads 函数。这个函数是在 beforeSleep 函数中调用的。

在 Redis 6.0 版本的代码中,事件驱动框架同样是调用 aeMain 函数来执行事件循环流程,该循环流程会调用 aeProcessEvents 函数处理各种事件。而在 aeProcessEvents 函数实际调用 aeApiPoll 函数捕获 IO 事件之前,beforeSleep 函数会被调用。

这个过程如下图所示,你可以看下。

handleClientsWithPendingReadsUsingThreads 函数的主要执行逻辑可以分成四步。

  • 第一步:该函数会先根据全局变量 server 的 io_threads_active 成员变量,判定 IO 线程是否激活,并且根据 server 的 io_threads_do_reads 成员变量,判定用户是否设置了 Redis 可以用 IO 线程处理待读客户端。只有在 IO 线程激活,并且 IO 线程可以用于处理待读客户端时,handleClientsWithPendingReadsUsingThreads 函数才会继续执行,否则该函数就直接结束返回了。这一步的判断逻辑如以下代码所示:
if (!server.io_threads_active || !server.io_threads_do_reads) 
   return 0;
  • 第二步handleClientsWithPendingReadsUsingThreads 函数会获取 clients_pending_read 列表的长度,这代表了要处理的待读客户端个数。然后,该函数会从 clients_pending_read 列表中逐一取出待处理的客户端,并用客户端在列表中的序号,对 IO 线程数量进行取模运算。

为了便于你理解,我来给你举个例子。

假设 IO 线程数量设置为 3,clients_pending_read 列表中一共有 5 个待读客户端,它们在列表中的序号分别是 0,1,2,3 和 4。在这一步中,0 号到 4 号客户端对线程数量 3 取模的结果分别是 0,1,2,0,1,这也对应了即将处理这些客户端的 IO 线程编号。这也就是说,0 号客户端由 0 号线程处理,1 号客户端有 1 号线程处理,以此类推。你可以看到,这个分配方式其实就是把待处理客户端,以轮询方式逐一分配给各个 IO 线程。

我画了下面这张图,展示了这个分配结果,你可以再看下。

以下代码展示的就是以轮询方式将客户端分配给 IO 线程的执行逻辑:

int processed = listLength(server.clients_pending_read);
listRewind(server.clients_pending_read,&li);
int item_id = 0;
while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
 }

这样,当 handleClientsWithPendingReadsUsingThreads 函数完成客户端的 IO 线程分配之后,它会将 IO 线程的操作标识设置为读操作,也就是 IO_THREADS_OP_READ。然后,它会遍历 io_threads_list 数组中的每个元素列表长度,等待每个线程处理的客户端数量,赋值给 io_threads_pending 数组。这一过程如下所示:

/* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
// 通过设置启动条件 atomic var 为等待线程提供启动条件。
io_threads_op = IO_THREADS_OP_READ;
for (int j = 1; j < server.io_threads_num; j++) {
    int count = listLength(io_threads_list[j]);
    setIOPendingCount(j, count);
}

setIOPendingCount函数原型如下:

static inline void setIOPendingCount(int i, unsigned long count) {
    atomicSetWithSync(io_threads_pending[i], count);
}
  • 第三步handleClientsWithPendingReadsUsingThreads 函数会将 io_threads_list 数组 0 号列表(也就是 io_threads_list[0]元素)中的待读客户端逐一取出来,并调用 readQueryFromClient 函数进行处理。其实,handleClientsWithPendingReadsUsingThreads 函数本身就是由 IO 主线程执行的,而 io_threads_list 数组对应的 0 号线程正是 IO 主线程,所以,这里就是让主 IO 线程来处理它的待读客户端。
/* Also use the main thread to process a slice of clients. */
// 使用主线程来处理一部分客户端。
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
    client *c = listNodeValue(ln);
    readQueryFromClient(c->conn);
}
// 处理完后,清空0号列表,也就是清空主线程里的客户端
listEmpty(io_threads_list[0]);

紧接着,handleClientsWithPendingReadsUsingThreads 函数会执行一个 while(1) 循环,等待所有 IO 线程完成待读客户端的处理,如下所示:

/* Wait for all the other threads to end their work. */
while(1) {
    unsigned long pending = 0;
    for (int j = 1; j < server.io_threads_num; j++)
        pending += getIOPendingCount(j);
    if (pending == 0) break;
}
  • 第四步handleClientsWithPendingReadsUsingThreads 函数会再次遍历一遍 clients_pending_read 列表,依次取出其中的客户端。紧接着,它会判断客户端的标识中是否有 CLIENT_PENDING_COMMAND。如果有 CLIENT_PENDING_COMMAND 标识,表明该客户端中的命令已经被某一个 IO 线程解析过,已经可以被执行了。

此时,handleClientsWithPendingReadsUsingThreads 函数会调用 processCommandAndResetClient 函数执行命令。最后,它会直接调用 processInputBuffer 函数解析客户端中所有命令并执行。

这部分的代码逻辑如下所示,你可以看下。

/* Run the list of clients again to process the new buffers. */
// 再次运行客户端列表以处理新缓冲区
while(listLength(server.clients_pending_read)) {
    ln = listFirst(server.clients_pending_read);
    client *c = listNodeValue(ln);
    c->flags &= ~CLIENT_PENDING_READ;
    listDelNode(server.clients_pending_read,ln);
    // 断言客户端是否阻塞,没被阻塞才可以继续
    serverAssert(!(c->flags & CLIENT_BLOCKED));
    // 如果客户端不再有效,我们避免稍后处理客户端。所以我们只是去下一个。
    if (processPendingCommandsAndResetClient(c) == C_ERR) {
        /* If the client is no longer valid, we avoid
         * processing the client later. So we just go
         * to the next. */
        continue;
    }
    // 解析并执行命令
    processInputBuffer(c);
    /* We may have pending replies if a thread readQueryFromClient() produced
     * replies and did not install a write handler (it can't).
     */
    // 如果线程 readQueryFromClient() 产生了回复并且没有去进行写处理(它不能),我们可能会有待处理的回复。
    if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
        clientInstallWriteHandler(c);
}

好了,到这里,你就了解了 clients_pending_read 列表中的待读客户端,是如何经过以上四个步骤来分配给 IO 线程进行处理的。下图展示了这个主要过程,你可以再回顾下:

那么,接下来,我们再来看下待写客户端的分配和处理。

如何把待写客户端分配给 IO 线程执行?

和待读客户端的分配处理类似,待写客户端分配处理是由 handleClientsWithPendingWritesUsingThreads 函数来完成的。该函数也是在 beforeSleep 函数中被调用的。

handleClientsWithPendingWritesUsingThreads 函数的主要流程同样也可以分成 4 步,其中,第 2、3 和 4 步的执行逻辑,和 handleClientsWithPendingReadsUsingThreads 函数类似。

简单来说,在第 2 步,handleClientsWithPendingWritesUsingThreads 函数会把待写客户端,按照轮询方式分配给 IO 线程,添加到 io_threads_list 数组各元素中。

然后,在第 3 步,handleClientsWithPendingWritesUsingThreads 函数会让主 IO 线程处理其待写客户端,并执行 while(1) 循环等待所有 IO 线程完成处理。

在第 4 步,handleClientsWithPendingWritesUsingThreads 函数会再次检查 clients_pending_write 列表中,是否还有待写的客户端。如果有的话,并且这些客户端还有留存在缓冲区中的数据,那么,handleClientsWithPendingWritesUsingThreads 函数就会调用 connSetWriteHandler 函数注册可写事件,而这个可写事件对应的回调函数是 sendReplyToClient 函数。

等到事件循环流程再次执行时,刚才 handleClientsWithPendingWritesUsingThreads 函数注册的可写事件就会被处理,紧接着 sendReplyToClient 函数会执行,它会直接调用 writeToClient 函数,把客户端缓冲区中的数据写回。

这里,你需要注意的是,connSetWriteHandler 函数最终会映射为 connSocketSetWriteHandler 函数,而 connSocketSetWriteHandler 函数是在connection.c文件中实现的。connSocketSetWriteHandler 函数会调用 aeCreateFileEvent 函数创建 AE_WRITABLE 事件,这就是刚才介绍的可写事件的注册(关于 aeCreateFileEvent 函数的使用,你也可以再回顾之前的)。

不过,和 handleClientsWithPendingReadsUsingThreads 函数不同的是在第 1 步,handleClientsWithPendingWritesUsingThreads 函数,会判断 IO 线程数量是否为 1,或者待写客户端数量是否小于 IO 线程数量的 2 倍。

如果这两个条件中有一个条件成立,那么 handleClientsWithPendingWritesUsingThreads 函数就不会用多线程来处理客户端了,而是会调用 handleClientsWithPendingWrites 函数由主 IO 线程直接处理待写客户端。这样做的目的,主要是为了在待写客户端数量不多时,避免采用多线程,从而节省 CPU 开销

这一步的条件判断逻辑如下所示。其中,stopThreadedIOIfNeeded 函数主要是用来判断待写客户端数量,是否不足为 IO 线程数量的 2 倍。

/* If I/O threads are disabled or we have few clients to serve, don't
 * use I/O threads, but the boring synchronous code. */
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
    return handleClientsWithPendingWrites();
}

stopThreadedIOIfNeeded函数原型如下:

int stopThreadedIOIfNeeded(void) {
    int pending = listLength(server.clients_pending_write);
    /* Return ASAP if IO threads are disabled (single threaded mode). */
    if (server.io_threads_num == 1) return 1;
    // 满足代写的客户端数量是小于io_threads_num的2倍返回1,否则返回0
    if (pending < (server.io_threads_num*2)) {
        if (server.io_threads_active) stopThreadedIO();
        return 1;
    } else {
        return 0;
    }
}

另外,handleClientsWithPendingWritesUsingThreads 函数在第 1 步中,还会判断 IO 线程是否已激活。如果没有激活,它就会调用 startThreadedIO 函数,把全局变量 server 的 io_threads_active 成员变量值设置为 1,表示 IO 线程已激活。这步判断操作如下所示:

if (!server.io_threads_active) startThreadedIO();

整个handleClientsWithPendingWritesUsingThreads函数原型如下:

int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */
    /* If I/O threads are disabled or we have few clients to serve, don't
     * use I/O threads, but the boring synchronous code. */
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }
    /* Start threads if needed. */
    if (!server.io_threads_active) startThreadedIO();
    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        /* Remove clients from the list of pending writes since
         * they are going to be closed ASAP. */
        if (c->flags & CLIENT_CLOSE_ASAP) {
            listDelNode(server.clients_pending_write, ln);
            continue;
        }
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }
    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        setIOPendingCount(j, count);
    }
    /* Also use the main thread to process a slice of clients. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);
    /* Wait for all the other threads to end their work. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    }
    /* Run the list of clients again to install the write handler where
     * needed. */
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        /* Install the write handler if there are pending writes in some
         * of the clients. */
        if (clientHasPendingReplies(c) &&
                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);
    /* Update processed count on server */
    server.stat_io_writes_processed += processed;
    return processed;
}

总之你要知道的就是,Redis 是通过 handleClientsWithPendingWritesUsingThreads 函数,把待写客户端按轮询方式分配给各个 IO 线程,并由它们来负责写回数据的。

总结

今天这篇文章,我给你介绍了 Redis 6.0 中新设计实现的多 IO 线程机制。这个机制的设计主要是为了使用多个 IO 线程,来并发处理客户端读取数据、解析命令和写回数据。使用了多线程后,Redis 就可以充分利用服务器的多核特性,从而提高 IO 效率。

总结来说,Redis 6.0 先是在初始化过程中,根据用户设置的 IO 线程数量,创建对应数量的 IO 线程。当 Redis server 初始化完成后正常运行时,它会在 readQueryFromClient 函数中通过调用 postponeClientRead 函数来决定是否推迟客户端读操作。同时,Redis server 会在 addReply 函数中通过调用 prepareClientToWrite 函数,来决定是否推迟客户端写操作。而待读写的客户端会被分别加入到 clients_pending_read 和 clients_pending_write 两个列表中。这样,每当 Redis server 要进入事件循环流程前,都会在 beforeSleep 函数中分别调用 handleClientsWithPendingReadsUsingThreads 函数和 handleClientsWithPendingWritesUsingThreads 函数,将待读写客户端以轮询方式分配给 IO 线程,加入到 IO 线程的待处理客户端列表 io_threads_list 中。

而 IO 线程一旦运行后,本身会一直检测 io_threads_list 中的客户端,如果有待读写客户端,IO 线程就会调用 readQueryFromClient 或 writeToClient 函数来进行处理。最后,我也想再提醒你一下,多 IO 线程本身并不会执行命令,它们只是利用多核并行地读取数据和解析命令,或是将 server 数据写回(下节课我还会结合分布式锁的原子性保证,来给你介绍这一部分的源码实现。)。所以,Redis 执行命令的线程还是主 IO 线程。这一点对于你理解多 IO 线程机制很重要,可以避免你误解 Redis 有多线程同时执行命令。这样一来,我们原来针对 Redis 单个主 IO 线程做的优化仍然有效,比如避免 bigkey、避免阻塞操作等。

  • 1、Redis 6.0 之前,处理客户端请求是单线程,这种模型的缺点是,只能用到「单核」CPU。如果并发量很高,那么在读写客户端数据时,容易引发性能瓶颈,所以 Redis 6.0 引入了多 IO 线程解决这个问题
  • 2、配置文件开启 io-threads N 后,Redis Server 启动时,会启动 N - 1 个 IO 线程(主线程也算一个 IO 线程),这些 IO 线程执行的逻辑是 networking.c 的 IOThreadMain 函数。但默认只开启多线程「写」client socket,如果要开启多线程「读」,还需配置 io-threads-do-reads = yes
  • 3、Redis 在读取客户端请求时,判断如果开启了 IO 多线程,则把这个 client 放到 clients_pending_read 链表中(postponeClientRead 函数),之后主线程在处理每次事件循环之前,把链表数据轮询放到 IO 线程的链表(io_threads_list)中
  • 4、同样地,在写回响应时,是把 client 放到 clients_pending_write 中(prepareClientToWrite 函数),执行事件循环之前把数据轮询放到 IO 线程的链表(io_threads_list)中
  • 5、主线程把 client 分发到 IO 线程时,自己也会读写客户端 socket(主线程也要分担一部分读写操作),之后「等待」所有 IO 线程完成读写,再由主线程「串行」执行后续逻辑
  • 6、每个 IO 线程,不停地从 io_threads_list 链表中取出 client,并根据指定类型读、写 client socket
  • 7、IO 线程在处理读、写 client 时有些许差异,如果 write_client_pedding < io_threads * 2,则直接由「主线程」负责写,不再交给 IO 线程处理,从而节省 CPU 消耗
  • 8、Redis 官方建议,服务器最少 4 核 CPU 才建议开启 IO 多线程,4 核 CPU 建议开 2-3 个 IO 线程,8 核 CPU 开 6 个 IO 线程,超过 8 个线程性能提升不大
  • 9、Redis 官方表示,开启多 IO 线程后,性能可提升 1 倍。当然,如果 Redis 性能足够用,没必要开 IO 线程

课后题:为什么 startThreadedIO / stopThreadedIO 要执行加解锁?

是为了方便主线程动态,灵活调整IO线程而设计的当clients数量较少的时候可以方便直接停止IO线程。停止IO线程的阈值是,当等待写的client客户端数量小于IO线程数量的两倍,就会停止IO线程避免多线程带来不必要的开销

回归代码:

1、stopThreadedIO,startThreadedIO 和 stopThreadedIOIfNeeded这三个函数中有体现,其中在stopThreadedIOIfNeeded中会判断当前待写出客户端数量是否大于2倍IO线程数量,如果不是则会调用stopThreadedIO函数通过io_threads_mutex的方式停止所有IO线程(主线程除外,因为index是从1开始的)并且将io_threads_active设置为0,并且后续调用stopThreadedIOIfNeeded函数会返回0,在handleClientsWithPendingWritesUsingThreads函数中会直接调用handleClientsWithPendingWrites来使用单线程进行写出。

流程如下:

第一次:handleClientsWithPendingWritesUsingThreads -> stopThreadedIOIfNeeded -> stopThreadedIO -> 设置io_threads_active为0并lock住IO线程

第二次: handleClientsWithPendingWritesUsingThreads -> stopThreadedIOIfNeeded -> 直接返回1 -> handleClientsWithPendingWrites进行单线程处理

2、当待写出client的数量上来的时候,stopThreadedIOIfNeeded函数中判断,待写出client数量大于2倍IO线程数量,返回0,然后调用startThreadedIO激活IO线程

流程如下:

handleClientsWithPendingWritesUsingThreads -> stopThreadedIOIfNeeded(发现不满足需要IO线程,返回0) -> startThreadedIO(激活IO线程) -> 设置io_threads_active为1

此外注意:IO线程一定是处理完了所有client之后,才会倍lock,在IOThreadMain有一个条件 if (getIOPendingCount(id) == 0)

既然涉及到加锁操作,必然是为了「互斥」从而控制某些逻辑。可以在代码中检索这个锁变量,看存在哪些逻辑对 io_threads_mutex 操作了加解锁。

跟踪代码可以看到,在 networking.c 的 IOThreadMain 函数,也对这个变量进行了加解锁操作,那就说明 startThreadedIO / stopThreadedIO 函数,可以控制 IOThreadMain 里逻辑的执行,IOThreadMain 代码如下。

void *IOThreadMain(void *myid) {
  ...
  while(1) {
    ...
    /* Give the main thread a chance to stop this thread. */
    if (io_threads_pending[id] == 0) {
      pthread_mutex_lock(&io_threads_mutex[id]);
      pthread_mutex_unlock(&io_threads_mutex[id]);
      continue;
    }
    // 读写 client socket
    // ...
  }
}

这个函数正是 IO 多线程的主逻辑。

从注释可以看到,这是为了给主线程停止 IO 线程的的机会。也就是说,这里的目的是为了让主线程可以控制 IO 线程的开启 / 暂停。

因为每次 IO 线程在执行时必须先拿到锁,才能执行后面的逻辑,如果主线程执行了 stopThreadedIO,就会先拿到锁,那么 IOThreadMain 函数在执行时就会因为拿不到锁阻塞「等待」,这就达到了 stop IO 线程的目的。

同样地,调用 startThreadedIO 函数后,会释放锁,IO 线程就可以拿到锁,继续「恢复」执行。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
24天前
|
监控 NoSQL 安全
如何在 Redis 中正确使用多线程?
【10月更文挑战第16天】正确使用 Redis 多线程需要综合考虑多个因素,并且需要在实践中不断摸索和总结经验。通过合理的配置和运用,多线程可以为 Redis 带来性能上的提升,同时也要注意避免可能出现的问题,以保障系统的稳定和可靠运行。
35 2
|
24天前
|
存储 NoSQL Redis
Redis 新版本引入多线程的利弊分析
【10月更文挑战第16天】Redis 新版本引入多线程是一个具有挑战性和机遇的改变。虽然多线程带来了一些潜在的问题和挑战,但也为 Redis 提供了进一步提升性能和扩展能力的可能性。在实际应用中,我们需要根据具体的需求和场景,综合评估多线程的利弊,谨慎地选择和使用 Redis 的新版本。同时,Redis 开发者也需要不断努力,优化和完善多线程机制,以提供更加稳定、高效和可靠的 Redis 服务。
29 1
|
2月前
|
消息中间件 存储 NoSQL
剖析 Redis List 消息队列的三种消费线程模型
Redis 列表(List)是一种简单的字符串列表,它的底层实现是一个双向链表。 生产环境,很多公司都将 Redis 列表应用于轻量级消息队列 。这篇文章,我们聊聊如何使用 List 命令实现消息队列的功能以及剖析消费者线程模型 。
92 20
剖析 Redis List 消息队列的三种消费线程模型
|
1月前
|
存储 运维 NoSQL
Redis为什么最开始被设计成单线程而不是多线程
总之,Redis采用单线程设计是基于对系统特性的深刻洞察和权衡的结果。这种设计不仅保持了Redis的高性能,还确保了其代码的简洁性、可维护性以及部署的便捷性,使之成为众多应用场景下的首选数据存储解决方案。
38 1
|
1月前
|
Java Linux
【网络】高并发场景处理:线程池和IO多路复用
【网络】高并发场景处理:线程池和IO多路复用
40 2
|
1月前
|
NoSQL Redis 数据库
Redis单线程模型 redis 为什么是单线程?为什么 redis 单线程效率还能那么高,速度还能特别快
本文解释了Redis为什么采用单线程模型,以及为什么Redis单线程模型的效率和速度依然可以非常高,主要原因包括Redis操作主要访问内存、核心操作简单、单线程避免了线程竞争开销,以及使用了IO多路复用机制epoll。
43 0
Redis单线程模型 redis 为什么是单线程?为什么 redis 单线程效率还能那么高,速度还能特别快
|
2月前
|
NoSQL 网络协议 Unix
1)Redis 属于单线程还是多线程?不同版本之间有什么区别?
1)Redis 属于单线程还是多线程?不同版本之间有什么区别?
63 1
|
2月前
|
存储 消息中间件 NoSQL
Redis的单线程设计之谜:高性能与简洁并存
Redis的单线程设计之谜:高性能与简洁并存
43 0
|
3月前
|
NoSQL Redis 数据库
Redis AOF重写问题之同一数据产生两次磁盘IO如何解决
Redis AOF重写问题之同一数据产生两次磁盘IO如何解决
Redis AOF重写问题之同一数据产生两次磁盘IO如何解决
|
3月前
|
缓存 开发框架 NoSQL
【Azure Redis 缓存】Azure Redis 异常 - 因线程池Busy而产生的Timeout异常问题
【Azure Redis 缓存】Azure Redis 异常 - 因线程池Busy而产生的Timeout异常问题