Redis6.0的多IO线程(一)

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: Redis6.0的多IO线程

通过上篇文章的学习,我们知道 Redis server 启动后的进程会以单线程的方式,执行客户端请求解析和处理工作。但是,Redis server 也会通过 bioInit 函数启动三个后台线程,来处理后台任务。也就是说,Redis 不再让主线程执行一些耗时操作,比如同步写、删除等,而是交给后台线程异步完成,从而避免了对主线程的阻塞。

实际上,在 2020 年 5 月推出的 Redis 6.0 版本中,Redis 在执行模型中还进一步使用了多线程来处理 IO 任务,这样设计的目的,就是为了充分利用当前服务器的多核特性,使用多核运行多线程,让多线程帮助加速数据读取、命令解析以及数据写回的速度,提升 Redis 整体性能

那么,这些多线程具体是在什么时候启动,又是通过什么方式来处理 IO 请求的呢?

今天这篇文章,我就来给你介绍下 Redis 6.0 实现的多 IO 线程机制。通过这部分内容的学习,你可以充分了解到 Redis 6.0 是如何通过多线程来提升 IO 请求处理效率的。这样你也就可以结合实际业务来评估,自己是否需要使用 Redis 6.0 了。

好,接下来,我们先来看下多 IO 线程的初始化。在开始学习今天的学习之前,你还需要下载Redis 6.2.6(https://redis.io/)的源码,以便能查看到和多 IO 线程机制相关的代码。

多 IO 线程的初始化

我在上篇文章给你介绍过,Redis 中的三个后台线程,是 server 在初始化过程的最后,调用 InitSeverLast 函数,而 InitServerLast 函数再进一步调用 bioInit 函数来完成的。如果我们在 Redis 6.0 中查看 InitServerLast 函数,会发现和 Redis 5.0 相比,该函数在调完 bioInit 函数后,又调用了 initThreadedIO 函数。而 initThreadedIO 函数正是用来初始化多 IO 线程的,这部分的代码调用如下所示:

server.c文件中查看

/* Some steps in server initialization need to be done last (after modules
 * are loaded).
 * Specifically, creation of threads due to a race bug in ld.so, in which
 * Thread Local Storage initialization collides with dlopen call.
 * see: https://sourceware.org/bugzilla/show_bug.cgi?id=19329 */
void InitServerLast() {
    bioInit();
    //调用initThreadedIO函数初始化IO线程
    initThreadedIO();
    set_jemalloc_bg_thread(server.jemalloc_bg_thread);
    server.initial_memory_usage = zmalloc_used_memory();
}

所以下面,我们就来看下 initThreadedIO 函数的主要执行流程,这个函数是在networking.c文件中实现的。

首先,initThreadedIO 函数会设置 IO 线程的激活标志。这个激活标志保存在 redisServer 结构体类型的全局变量 server 当中,对应 redisServer 结构体的成员变量 io_threads_active。initThreadedIO 函数会把 io_threads_active 初始化为 0,表示 IO 线程还没有被激活。这部分代码如下所示:

/* Initialize the data structures needed for threaded I/O. */
void initThreadedIO(void) {
    server.io_threads_active = 0; /* We start with threads not active. */
    /* Don't spawn any thread if the user selected a single thread:
     * we'll handle I/O directly from the main thread. */
    // 如果用户选择了单个线程,则不要产生任何线程:我们将直接从主线程处理 IO
    if (server.io_threads_num == 1) return;
    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }
    /* Spawn and initialize the I/O threads. */
    for (int i = 0; i < server.io_threads_num; i++) {
        /* Things we do for all the threads including the main thread. */
        io_threads_list[i] = listCreate();
        if (i == 0) continue; /* Thread 0 is the main thread. */
        /* Things we do only for the additional threads. */
        pthread_t tid;
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        setIOPendingCount(i, 0);
        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}

这里,你要注意一下,刚才提到的全局变量 server 是 Redis server 运行时,用来保存各种全局信息的结构体变量。我在之前给你介绍 Redis server 初始化过程的时候,提到过 Redis server 的各种参数初始化配置,都是保存在这个全局变量 server 中的。所以,当你在阅读 Redis 源码时,如果在某个函数中看到变量 server,要知道其实就是这个全局变量。

紧接着,initThreadedIO函数会对设置的 IO 线程数量进行判断。这个数量就是保存在全局变量 server 的成员变量 io_threads_num 中的。那么在这里,IO 线程的数量判断会有三种结果。

  • 第一种,如果 IO 线程数量为 1,就表示只有 1 个主 IO 线程,initThreadedIO 函数就直接返回了。此时,Redis server 的 IO 线程和 Redis 6.0 之前的版本是相同的。
  • 第二种,如果 IO 线程数量大于宏定义 IO_THREADS_MAX_NUM(默认值为 128),那么 initThreadedIO 函数会报错,并退出整个程序。
if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        …  //报错日志记录
        exit(1);  //退出程序
 }
  • 第三种,如果 IO 线程数量大于 1,并且小于宏定义 IO_THREADS_MAX_NUM,那么,initThreadedIO 函数会执行一个循环流程,该流程的循环次数就是设置的 IO 线程数量。

如此一来,在该循环流程中,initThreadedIO 函数就会给以下四个数组进行初始化操作。

  • io_threads_list 数组:保存了每个 IO 线程要处理的客户端,将数组每个元素初始化为一个 List 类型的列表;
  • io_threads_pending 数组:保存等待每个 IO 线程处理的客户端个数;
  • io_threads_mutex 数组:保存线程互斥锁;
  • io_threads 数组:保存每个 IO 线程的描述符。

这四个数组的定义都在 networking.c 文件中,如下所示:

// 记录线程描述符的数组
pthread_t io_threads[IO_THREADS_MAX_NUM];
// 记录线程互斥锁的数组
pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];
// 记录线程待处理的客户端个数
redisAtomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];
// IO操作是读还是写
int io_threads_op;      /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ. */
/* This is the list of clients each thread will serve when threaded I/O is
 * used. We spawn io_threads_num-1 threads, since one is the main thread
 * itself. */
// 记录线程对应处理的客户端
list *io_threads_list[IO_THREADS_MAX_NUM];

然后,在对这些数组进行初始化的同时,initThreadedIO 函数还会根据 IO 线程数量,调用 pthread_create 函数创建相应数量的线程。我在上节课给你介绍过,pthread_create 函数的参数包括创建线程要运行的函数和函数参数(*tidp、*attr、*start_routine、*arg)。

所以,对于 initThreadedIO 函数来说,它创建的线程要运行的函数是 IOThreadMain,参数是当前创建线程的编号。不过要注意的是,这个编号是从 1 开始的,编号为 0 的线程其实是运行 Redis server 主流程的主 IO 线程。

以下代码就展示了 initThreadedIO 函数对数组的初始化,以及创建 IO 线程的过程,你可以看下。

/* Spawn and initialize the I/O threads. */
// 生产并初始化IO线程
for (int i = 0; i < server.io_threads_num; i++) {
    /* Things we do for all the threads including the main thread. */
    io_threads_list[i] = listCreate();
    // 编号为0的线程是主IO线程
    if (i == 0) continue; /* Thread 0 is the main thread. */
    /* Things we do only for the additional threads. */
    pthread_t tid;
    // 初始化io_threads_mutex数组
    pthread_mutex_init(&io_threads_mutex[i],NULL);
    // 初始化io_threads_pending数组
    setIOPendingCount(i, 0);
    // 线程将会停止
    pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
    // 调用pthread_create函数创建IO线程,线程运行函数为IOThreadMain
    if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
        serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
        exit(1);
    }
    // 初始化io_threads数组,设置值为线程标识
    io_threads[i] = tid;
}

好了,现在我们再来看下,刚才介绍的 IO 线程启动后要运行的函数 IOThreadMain。了解这个函数,可以帮助我们掌握 IO 线程实际做的工作。

IO 线程的运行函数 IOThreadMain

IOThreadMain 函数也是在 networking.c 文件中定义的,它的主要执行逻辑是一个 while(1) 循环。在这个循环中,IOThreadMain 函数会把 io_threads_list 数组中,每个 IO 线程对应的列表读取出来。

就像我在前面给你介绍的一样,io_threads_list 数组中会针对每个 IO 线程,使用一个列表记录该线程要处理的客户端。所以,IOThreadMain 函数就会从每个 IO 线程对应的列表中,进一步取出要处理的客户端,然后判断线程要执行的操作标记。这个操作标记是用变量 io_threads_op 表示的,它有两种取值。

  • io_threads_op 的值为宏定义 IO_THREADS_OP_WRITE:这表明该 IO 线程要做的是写操作,线程会调用 writeToClient 函数将数据写回客户端。
  • io_threads_op 的值为宏定义 IO_THREADS_OP_READ:这表明该 IO 线程要做的是读操作,线程会调用 readQueryFromClient 函数从客户端读取数据。

这部分的代码逻辑你可以看看下面的代码。

networking.c文件中查看

void *IOThreadMain(void *myid) {
    /* The ID is the thread number (from 0 to server.iothreads_num-1), and is
     * used by the thread to just manipulate a single sub-array of clients. */
    long id = (unsigned long)myid;
    char thdname[16];
    snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
    redis_set_thread_title(thdname);
    redisSetCpuAffinity(server.server_cpulist);
    makeThreadKillable();
    while(1) {
        /* Wait for start */
        for (int j = 0; j < 1000000; j++) {
            if (getIOPendingCount(id) != 0) break;
        }
        /* Give the main thread a chance to stop this thread. */
        if (getIOPendingCount(id) == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }
        serverAssert(getIOPendingCount(id) != 0);
        /* Process: note that the main thread will never touch our list
         * before we drop the pending count to 0. */
        listIter li;
        listNode *ln;
        // 获取IO线程要处理的客户端列表
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            // 从客户端列表中获取一个客户端
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                // 如果线程操作是写操作,则调用writeToClient将数据写回客户端
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                // 如果线程操作是读操作,则调用readQueryFromClient从客户端读取数据
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        // 处理完所有客户端后,清空该线程的客户端列表
        listEmpty(io_threads_list[id]);
        // 将该线程的待处理任务数量设置为0
        setIOPendingCount(id, 0);
    }
}

我也画了下面这张图,展示了 IOThreadMain 函数的基本流程,你可以看下。

好了,到这里你应该就了解了,每一个 IO 线程运行时,都会不断检查是否有等待它处理的客户端。如果有,就根据操作类型,从客户端读取数据或是将数据写回客户端。你可以看到,这些操作都是 Redis 要和客户端完成的 IO 操作,所以,这也是为什么我们把这些线程称为 IO 线程的原因。那么,你看到这里,可能也会产生一些疑问,IO 线程要处理的客户端是如何添加到 io_threads_list 数组中的呢?

这就要说到 Redis server 对应的全局变量 server 了。server 变量中有两个 List 类型的成员变量:clients_pending_writeclients_pending_read,它们分别记录了待写回数据的客户端和待读取数据的客户端,如下所示:

server.h文件中查看

struct redisServer {
...
list *clients_pending_write;  //待写回数据的客户端
list *clients_pending_read;  //待读取数据的客户端
...
}

你要知道,Redis server 在接收到客户端请求和给客户端返回数据的过程中,会根据一定条件,推迟客户端的读写操作,并分别把待读写的客户端保存到这两个列表中。然后,Redis server 在每次进入事件循环前,会再把列表中的客户端添加到 io_threads_list 数组中,交给 IO 线程进行处理。所以接下来,我们就先来看下,Redis 是如何推迟客户端的读写操作,并把这些客户端添加到 clients_pending_writeclients_pending_read 这两个列表中的。

如何推迟客户端读操作?

Redis server 在和一个客户端建立连接后,就会开始监听这个客户端上的可读事件,而处理可读事件的回调函数是 readQueryFromClient。我在之前文章中给你介绍了这个过程,你可以再去回顾下。

那么这里,我们再来看下 Redis 6.0 版本中的 readQueryFromClient 函数。这个函数一开始会先从传入参数 conn 中获取客户端 c,紧接着就调用 postponeClientRead 函数,来判断是否推迟从客户端读取数据。这部分的执行逻辑如下所示:

networking.c文件中查看

void readQueryFromClient(connection *conn) {
    //从连接数据结构中获取客户端
    client *c = connGetPrivateData(conn);
    int nread, readlen;
    size_t qblen;
    /* Check if we want to read from the client later when exiting from
     * the event loop. This is the case if threaded I/O is enabled. */
    // 判断是否推迟从客户端读取数据
    if (postponeClientRead(c)) return;
    /* Update total number of reads on server */
    atomicIncr(server.stat_total_reads_processed, 1);
    readlen = PROTO_IOBUF_LEN;
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
        /* Note that the 'remaining' variable may be zero in some edge case,
         * for example once we resume a blocked client after CLIENT PAUSE. */
        if (remaining > 0 && remaining < readlen) readlen = remaining;
    }
    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = connRead(c->conn, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (connGetState(conn) == CONN_STATE_CONNECTED) {
            return;
        } else {
            serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
            freeClientAsync(c);
            return;
        }
    } else if (nread == 0) {
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClientAsync(c);
        return;
    } else if (c->flags & CLIENT_MASTER) {
        /* Append the query buffer to the pending (not applied) buffer
         * of the master. We'll use this buffer later in order to have a
         * copy of the string applied by the last command executed. */
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }
    sdsIncrLen(c->querybuf,nread);
    c->lastinteraction = server.unixtime;
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
    atomicIncr(server.stat_net_input_bytes, nread);
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClientAsync(c);
        return;
    }
    /* There is more data in the client input buffer, continue parsing it
     * in case to check if there is a full command to execute. */
     processInputBuffer(c);
}

现在,我们就来看下 postponeClientRead 函数的执行逻辑。这个函数会根据四个条件判断能否推迟从客户端读取数据。

networking.c文件中查看

/* Return 1 if we want to handle the client read later using threaded I/O.
 * This is called by the readable handler of the event loop.
 * As a side effect of calling this function the client is put in the
 * pending read clients and flagged as such. */
int postponeClientRead(client *c) {
    if (server.io_threads_active &&
        server.io_threads_do_reads &&
        !ProcessingEventsWhileBlocked &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ|CLIENT_BLOCKED))) 
    {
        c->flags |= CLIENT_PENDING_READ;
        listAddNodeHead(server.clients_pending_read,c);
        return 1;
    } else {
        return 0;
    }
}
  • 条件一:全局变量 server 的 io_threads_active 值为 1

这表示多 IO 线程已经激活。我刚才说过,这个变量值在 initThreadedIO 函数中是会被初始化为 0 的,也就是说,多 IO 线程初始化后,默认还没有激活(我一会儿还会给你介绍这个变量值何时被设置为 1)。

  • 条件二:全局变量 server 的 io_threads_do_read 值为 1

这表示多 IO 线程可以用于处理延后执行的客户端读操作。这个变量值是在 Redis 配置文件 redis.conf 中,通过配置项 io-threads-do-reads 设置的,默认值为 no,也就是说,多 IO 线程机制默认并不会用于客户端读操作。所以,如果你想用多 IO 线程处理客户端读操作,就需要把 io-threads-do-reads 配置项设为 yes。

  • 条件三:ProcessingEventsWhileBlocked 变量值为 0

这表示 processEventsWhileBlokced 函数没有在执行。ProcessingEventsWhileBlocked 是一个全局变量,它会在 processEventsWhileBlokced 函数执行时被设置为 1,在 processEventsWhileBlokced 函数执行完成时被设置为 0。而 processEventsWhileBlokced 函数是在networking.c文件中实现的。当 Redis 在读取 RDB 文件或是 AOF 文件时,这个函数会被调用,用来处理事件驱动框架捕获到的事件。这样就避免了因读取 RDB 或 AOF 文件造成 Redis 阻塞,而无法及时处理事件的情况。所以,当 processEventsWhileBlokced 函数执行处理客户端可读事件时,这些客户端读操作是不会被推迟执行的。

  • 条件四:客户端现有标识不能有 CLIENT_MASTER、CLIENT_SLAVE 和 CLIENT_PENDING_READ

其中,CLIENT_MASTER 和 CLIENT_SLAVE 标识分别表示客户端是用于主从复制的客户端,也就是说,这些客户端不会推迟读操作。CLIENT_PENDING_READ 本身就表示一个客户端已经被设置为推迟读操作了,所以,对于已带有 CLIENT_PENDING_READ 标识的客户端,postponeClientRead 函数就不会再推迟它的读操作了。总之,只有前面这四个条件都满足了,postponeClientRead 函数才会推迟当前客户端的读操作。具体来说,postponeClientRead 函数会给该客户端设置 CLIENT_PENDING_REA 标识,并调用 listAddNodeHead 函数,把这个客户端添加到全局变量 server 的 clients_pending_read 列表中。

好,现在你已经知道,Redis 是在客户端读事件回调函数 readQueryFromClient 中,通过调用 postponeClientRead 函数来判断和推迟客户端读操作。下面,我再带你来看下 Redis 是如何推迟客户端写操作的。


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
2月前
|
监控 NoSQL 安全
如何在 Redis 中正确使用多线程?
【10月更文挑战第16天】正确使用 Redis 多线程需要综合考虑多个因素,并且需要在实践中不断摸索和总结经验。通过合理的配置和运用,多线程可以为 Redis 带来性能上的提升,同时也要注意避免可能出现的问题,以保障系统的稳定和可靠运行。
56 2
|
2月前
|
存储 NoSQL Redis
Redis 新版本引入多线程的利弊分析
【10月更文挑战第16天】Redis 新版本引入多线程是一个具有挑战性和机遇的改变。虽然多线程带来了一些潜在的问题和挑战,但也为 Redis 提供了进一步提升性能和扩展能力的可能性。在实际应用中,我们需要根据具体的需求和场景,综合评估多线程的利弊,谨慎地选择和使用 Redis 的新版本。同时,Redis 开发者也需要不断努力,优化和完善多线程机制,以提供更加稳定、高效和可靠的 Redis 服务。
56 1
|
3月前
|
消息中间件 存储 NoSQL
剖析 Redis List 消息队列的三种消费线程模型
Redis 列表(List)是一种简单的字符串列表,它的底层实现是一个双向链表。 生产环境,很多公司都将 Redis 列表应用于轻量级消息队列 。这篇文章,我们聊聊如何使用 List 命令实现消息队列的功能以及剖析消费者线程模型 。
101 20
剖析 Redis List 消息队列的三种消费线程模型
|
2月前
|
存储 运维 NoSQL
Redis为什么最开始被设计成单线程而不是多线程
总之,Redis采用单线程设计是基于对系统特性的深刻洞察和权衡的结果。这种设计不仅保持了Redis的高性能,还确保了其代码的简洁性、可维护性以及部署的便捷性,使之成为众多应用场景下的首选数据存储解决方案。
43 1
|
2月前
|
Java Linux
【网络】高并发场景处理:线程池和IO多路复用
【网络】高并发场景处理:线程池和IO多路复用
58 2
|
2月前
|
NoSQL Redis 数据库
Redis单线程模型 redis 为什么是单线程?为什么 redis 单线程效率还能那么高,速度还能特别快
本文解释了Redis为什么采用单线程模型,以及为什么Redis单线程模型的效率和速度依然可以非常高,主要原因包括Redis操作主要访问内存、核心操作简单、单线程避免了线程竞争开销,以及使用了IO多路复用机制epoll。
54 0
Redis单线程模型 redis 为什么是单线程?为什么 redis 单线程效率还能那么高,速度还能特别快
|
3月前
|
NoSQL 网络协议 Unix
1)Redis 属于单线程还是多线程?不同版本之间有什么区别?
1)Redis 属于单线程还是多线程?不同版本之间有什么区别?
86 1
|
3月前
|
存储 消息中间件 NoSQL
Redis的单线程设计之谜:高性能与简洁并存
Redis的单线程设计之谜:高性能与简洁并存
47 1
|
4月前
|
NoSQL Redis 数据库
Redis AOF重写问题之同一数据产生两次磁盘IO如何解决
Redis AOF重写问题之同一数据产生两次磁盘IO如何解决
Redis AOF重写问题之同一数据产生两次磁盘IO如何解决
|
4月前
|
缓存 开发框架 NoSQL
【Azure Redis 缓存】Azure Redis 异常 - 因线程池Busy而产生的Timeout异常问题
【Azure Redis 缓存】Azure Redis 异常 - 因线程池Busy而产生的Timeout异常问题