Redis IO 线程池

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: Redis IO 线程池

1、Redis 单线程模型

Redis 6.0 以前采用单线程模型


1、Redis 单线程模型

Redis 6.0 以前采用单线程模型

Redis 单线程模型

面试题:Redis 单线程为什么还那么快?

  • Redis 大部分操作都在内存中,并且采用了高效的数据结构。性能瓶颈是网络延迟和内存大小。
  • 避免了多线程之间的竞争,减少了多线程切换带来的系统开销,而且不会有线程安全问题(锁开销)
  • I/O 多路复用机制

2、Redis 多线程模型

2.1、* Redis 为什么引入 I/O 多线程

Redis 的性能瓶颈在网络 IO 的处理上。Redis 是网络 IO 密集型,需要同时处理多条并发请求,读写 IO 的问题(请求大量数据,写日志业务等)。多线程处理网络 IO,单线程执行命令。

Redis 线程池作用读写 IO 阶段,即 read, decode 和 encode, send 阶段。主线程处理业务逻辑,之所以用单线程执行命令,是因为 Redis 采用高效的数据结构,其业务逻辑处理较快。

redis 线程池作用阶段

2.2、I/O 多线程模型

主线程拥有两个全局队列clients_pending_readclients_pending_write,每个 io 线程(主线程同时也是 io 线程)拥有一个专属队列 io_threads_list[id]。主线程既作为生产者,产生任务;又作为消费者,获取任务执行。

首先,主线程将一次循环的所有就绪的读事件收集到自己的全局任务队列clients_pending_read中,再把每个事件负载均衡地分配到每个 io 线程的专属任务队列中。一次事件循环中不会出现同名 fd,不同的 fd 分配到每个 io 线程各自的队列中,避免了多个 io 线程同时从全局队列中取数据,因此,不需要加锁操作。

接下来,io 线程从自己的专属队列中取出任务,(除主线程外)并发执行 read 和 decode 操作。主线程将解析后的任务做 compute 操作。最后,io 线程(包括主线程)并发执行 encode 和 send 操作。

redis 线程池原理图

3、源码解析

3.1、测试设置

redis 线程池默认作用在 encode, send 阶段,这是因为客户端从 redis 获取大量数据需要并发处理。若想作用在 read, decode 阶段,需要手动开启。在 redis.conf 文件中,可以设置:

# 开启io线程的数量
 io-threads 4
 # 优化:read deconde 过程。默认优化,encode send从 redis 获取大量数据
 io-threads-do-reads yes

开启 io 多线程的前提是有多个并发连接。如何在单个连接的情况下,开启 io 多线程调试,需要修改 redis 源码:

// networking.c
 int stopThreadedIOIfNeeded(void) {
     // 单个连接的情况下,开启多线程调试,永远不关闭 io 多线程
     return 0;   
     ...
 }

3.2、连接建立

主线程处理连接建立,listenfd

  • 连接到达,触发读事件回调:acceptTcpHandler
  • 接收连接:acceptTcpHandler
  • 初始化新连接:createClient
// server.c
 void initServer(void) {
    ...
    // 1、连接到来,触发读事件回调
    if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
             acceptTcpHandler,NULL) == AE_ERR)  
    ...
 }
 // networking.c
 void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
     ...
     while(max--) {
         // 2、接收连接:内部封装 accept
         cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
         ...
         // 为 cfd 初始化新连接,内部调用 createClient
         acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
     }
 }
 static void acceptCommonHandler(connection *conn, int flags, char *ip) {
     ...
     /* Create connection and client */
     // 3、创建新的连接
     if ((c = createClient(conn)) == NULL) {
         ...
     }
     ...
 }

3.2、数据传输

clientfd

  • 读事件回调:readQueryFromClient
  • 分割并处理数据包 processInputBuffer
  • 分割数据包:processInlineBuffer 和 processMultibulkBuffer
  • 处理数据包:processCommandAndResetClient
  • 数据写到 buffer:addReply
  • 数据写到 socket:writeToClient
  • 写事件回调:sendReplyToClient

当读事件触发时,执行读事件回调函数。主线程收集读事件就绪的连接放入全局任务队列`clients_pending_read,并设置连接状态为CLIENT_PENDING_READ。子线程从该全局队列中获取任务后,也调用该读事件回调函数,进行 read 和 decode 的业务逻辑处理。

// networking.c
 void readQueryFromClient(connection *conn) {
     ...
     /* Check if we want to read from the client later when exiting from
      * the event loop. This is the case if threaded I/O is enabled. */
     // 开启 io 线程后,延迟处理客户端的读,将任务丢到全局队列,再分配给 io 线程
     // 主线程返回 1,不执行业务逻辑处理;
     // 子线程返回 0,继续往下,执行业务逻辑处理
     if (postponeClientRead(c)) return;  
     // 1、read 阶段,(io 线程)将任务读到缓冲区 
     nread = connRead(c->conn, c->querybuf+qblen, readlen);
     // 2、decode 阶段,(io 线程)解析数据包
      processInputBuffer(c);
 }
 int postponeClientRead(client *c) {
     if (server.io_threads_active &&
         server.io_threads_do_reads &&
         !clientsArePaused() &&
         !ProcessingEventsWhileBlocked &&
         !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
     {
         // 主线程,返回 1
         // 将连接状态设置为 CLIENT_PENDING_READ
         c->flags |= CLIENT_PENDING_READ;
         // 收集任务,把客户端连接放到全局队列中,分配到 io 线程
         listAddNodeHead(server.clients_pending_read,c);
         return 1;
     } else {
         // 子线程,即 io 线程,返回 0
         return 0;
     }
 }

子线程(IO 线程)从专属任务队列 io_threads_pending获取任务,执行 read decode 和 encode write 业务逻辑处理。

// networking.c
 // 线程池入口函数:子线程
 void *IOThreadMain(void *myid) {
     ...
     while(1) {
         /* Wait for start */
         // 等待获取专属任务队列中的任务
         for (int j = 0; j < 1000000; j++) {
             if (io_threads_pending[id] != 0) break;
         }      
         ...
         /* Process: note that the main thread will never touch our list
          * before we drop the pending count to 0. */
         listIter li;
         listNode *ln;
         // 从专属任务队列中取出任务
         listRewind(io_threads_list[id],&li);    
         while((ln = listNext(&li))) {
             client *c = listNodeValue(ln);
             if (io_threads_op == IO_THREADS_OP_WRITE) {
                 // encode 和 write
                 writeToClient(c,0);
             } else if (io_threads_op == IO_THREADS_OP_READ) {
                 // read 和 decode,读事件回调函数
                 readQueryFromClient(c->conn);
             } else {
                 serverPanic("io_threads_op value is unknown");
             }
         }
         listEmpty(io_threads_list[id]);
         io_threads_pending[id] = 0;
         ...
     }
 }

子线程 decode 结束后,设置连接状态 CLIENT_PENDING_COMMAND,交给主线程来 compute,退出读事件回调函数。主线程负责 compute ,解析 redis 命令。

// networking.c
 // readQueryFromClient 函数中 decode 阶段调用
 void processInputBuffer(client *c) {
     /* Keep processing while there is something in the input buffer */
     while(c->qb_pos < sdslen(c->querybuf)) {
     ...
         if (c->reqtype == PROTO_REQ_INLINE) {
             // 分割数据包。并判断是否完整
             if (processInlineBuffer(c) != C_OK) break;
             ...
         } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
             // 分割 pipline 的数据包,并判断是否完整
             if (processMultibulkBuffer(c) != C_OK) break;
         }
         ...
         else {
             /* If we are in the context of an I/O thread, we can't really
              * execute the command here. All we can do is to flag the client
              * as one that needs to process the command. */
             // io 线程设置任务状态,交给主线程compute,退出读事件回调函数
             if (c->flags & CLIENT_PENDING_READ) {
                 c->flags |= CLIENT_PENDING_COMMAND;
                 break;
             }
             /* We are finally ready to execute the command. */
             // 3、compute,主线程解析命令
             if (processCommandAndResetClient(c) == C_ERR) {
                 /* If the client is no longer valid, we avoid exiting this
                  * loop and trimming the client buffer later. So we return
                  * ASAP in that case. */
                 return;
             }
         }
     }
     ...
 }

主线程 compute 结束后,调用 addReply 函数,将处理完的连接放到全局任务队列clients_pending_write,并将待发送的数据写到缓冲区。

// networking.c
 int processCommandAndResetClient(client *c) {
     ...
     // 处理命令
     if (processCommand(c) == C_OK) {
         commandProcessed(c);
     }
     ...
 }
 // server.c
 int processCommand(client *c) {
     ...
     /* Exec the command */
     // 开启 io 多线程,且不是事务命令
     if (c->flags & CLIENT_MULTI &&
         c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
         c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
     {
         // 把数据写到缓冲区
         addReply(c,shared.queued);
     } else {
         // 执行 redis 命令
         call(c,CMD_CALL_FULL);
         ...
     }
     ...
 }
 // networking.c
 // 数据写到发送缓冲区
 void addReply(client *c, robj *obj) {
     if (prepareClientToWrite(c) != C_OK) return;
     ...
 }
 int prepareClientToWrite(client *c) {
     ...
     if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ))
             clientInstallWriteHandler(c);   // 任务写到全局队列中
     ...
 }

接下来,子线程和主线程都可以从自己的专属任务队列中获得该任务,执行 encode 和 send 的业务逻辑处理 writeToClient。若数据未发送完,则注册写事件回调,等待再次发送。

// 子线程:线程池入口函数
 void *IOThreadMain(void *myid) {
     ... 
         if (io_threads_op == IO_THREADS_OP_WRITE) {
             // encode 和 write
             writeToClient(c,0); // 数据写到 socket
         } else if (io_threads_op == IO_THREADS_OP_READ) {
             // read 和 decode
             readQueryFromClient(c->conn); // 读事件回调函数
     ...
 }
 // 主线程
 int handleClientsWithPendingWritesUsingThreads(void) {
     ...
     /* Also use the main thread to process a slice of clients. */
     listRewind(io_threads_list[0],&li);
     while((ln = listNext(&li))) {
         client *c = listNodeValue(ln);
         writeToClient(c,0); 
     }
     listEmpty(io_threads_list[0]);
     ...
     /* Run the list of clients again to install the write handler where
      * needed. */
     listRewind(server.clients_pending_write,&li);
     while((ln = listNext(&li))) {
         client *c = listNodeValue(ln);
         /* Install the write handler if there are pending writes in some
          * of the clients. */
         // 数据没写完,注册写事件回调
         if (clientHasPendingReplies(c) &&
                 connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
         {
             freeClientAsync(c);
         }
     }
     listEmpty(server.clients_pending_write);
     ...
 }
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
26天前
|
算法 数据处理 Python
Python并发编程:解密异步IO与多线程
本文将深入探讨Python中的并发编程技术,重点介绍异步IO和多线程两种常见的并发模型。通过对比它们的特点、适用场景和实现方式,帮助读者更好地理解并发编程的核心概念,并掌握在不同场景下选择合适的并发模型的方法。
|
29天前
|
存储 缓存 NoSQL
Redis单线程已经很快了6.0引入多线程
Redis单线程已经很快了6.0引入多线程
31 3
|
28天前
|
NoSQL 数据处理 调度
【Redis深度专题】「踩坑技术提升」探索Redis 6.0为何必须启用多线程以提升性能与效率
【Redis深度专题】「踩坑技术提升」探索Redis 6.0为何必须启用多线程以提升性能与效率
216 0
|
3天前
|
存储 缓存 NoSQL
为什么Redis使用单线程 性能会优于多线程?
在计算机领域,性能一直都是一个关键的话题。无论是应用开发还是系统优化,我们都需要关注如何在有限的资源下,实现最大程度的性能提升。Redis,作为一款高性能的开源内存数据库,因其出色的单线程性能而备受瞩目。那么,为什么Redis使用单线程性能会优于多线程呢?
15 1
|
1月前
|
NoSQL Java Redis
【问题篇】解决Unable to connect to Redis; nested exception is io.lettuce.core.RedisConnectionException
【问题篇】解决Unable to connect to Redis; nested exception is io.lettuce.core.RedisConnectionException
322 0
|
2月前
|
存储 消息中间件 缓存
Redis是否为单线程?
【2月更文挑战第6天】
40 1
Redis是否为单线程?
|
2月前
|
并行计算 开发者 Python
Python中的并发编程:异步IO与多线程比较
本文将探讨Python中的并发编程方法,着重比较异步IO和多线程两种不同的实现方式。通过对它们的特点、优缺点以及适用场景进行分析,帮助读者更好地理解并发编程在Python中的应用。
26 1
|
3月前
|
缓存 NoSQL 安全
Redis 新特性篇:多线程模型解读
Redis 新特性篇:多线程模型解读
50 5
|
3月前
|
存储 缓存 NoSQL
Redis 数据结构+线程模型+持久化+内存淘汰+分布式
Redis 数据结构+线程模型+持久化+内存淘汰+分布式
311 0
|
3月前
|
存储 缓存 NoSQL
《吊打面试官》系列-Redis双写一致性、并发竞争、线程模型
《吊打面试官》系列-Redis双写一致性、并发竞争、线程模型
40 0

热门文章

最新文章