Redis的事件驱动模型
微信公众号【黄小斜】大厂程序员,互联网行业新知,终身学习践行者。关注后回复「Java」、「Python」、「C++」、「大数据」、「机器学习」、「算法」、「AI」、「Android」、「前端」、「iOS」、「考研」、「BAT」、「校招」、「笔试」、「面试」、「面经」、「计算机基础」、「LeetCode」 等关键字可以获取对应的免费学习资料。
下面就会介绍这两种事件的实现原理。
文件事件
Redis 服务器通过 socket 实现与客户端(或其他redis服务器)的交互,文件事件就是服务器对 socket 操作的抽象。 Redis 服务器,通过监听这些 socket 产生的文件事件并处理这些事件,实现对客户端调用的响应。
Reactor
Redis 基于 Reactor 模式开发了自己的事件处理器。
这里就先展开讲一讲 Reactor 模式。看下图:
“I/O 多路复用模块”会监听多个 FD ,当这些FD产生,accept,read,write 或 close 的文件事件。会向“文件事件分发器(dispatcher)”传送事件。
文件事件分发器(dispatcher)在收到事件之后,会根据事件的类型将事件分发给对应的 handler。
我们顺着图,从上到下的逐一讲解 Redis 是怎么实现这个 Reactor 模型的。
I/O 多路复用模块
Redis 的 I/O 多路复用模块,其实是封装了操作系统提供的 select,epoll,avport 和 kqueue 这些基础函数。向上层提供了一个统一的接口,屏蔽了底层实现的细节。
一般而言 Redis 都是部署到 Linux 系统上,所以我们就看看使用 Redis 是怎么利用 linux 提供的 epoll 实现I/O 多路复用。
首先看看 epoll 提供的三个方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
/* * 创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大 */ int epoll_create(int size); /* * 可以理解为,增删改 fd 需要监听的事件 * epfd 是 epoll_create() 创建的句柄。 * op 表示 增删改 * epoll_event 表示需要监听的事件,Redis 只用到了可读,可写,错误,挂断 四个状态 */ int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); /* * 可以理解为查询符合条件的事件 * epfd 是 epoll_create() 创建的句柄。 * epoll_event 用来存放从内核得到事件的集合 * maxevents 获取的最大事件数 * timeout 等待超时时间 */ int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout); |
再看 Redis 对文件事件,封装epoll向上提供的接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
/* * 事件状态 */ typedef struct aeApiState { // epoll_event 实例描述符 int epfd; // 事件槽 struct epoll_event *events; } aeApiState; /* * 创建一个新的 epoll */ static int aeApiCreate(aeEventLoop *eventLoop) /* * 调整事件槽的大小 */ static int aeApiResize(aeEventLoop *eventLoop, int setsize) /* * 释放 epoll 实例和事件槽 */ static void aeApiFree(aeEventLoop *eventLoop) /* * 关联给定事件到 fd */ static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) /* * 从 fd 中删除给定事件 */ static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) /* * 获取可执行事件 */ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) |
所以看看这个ae_peoll.c 如何对 epoll 进行封装的:
aeApiCreate()
是对epoll.epoll_create()
的封装。aeApiAddEvent()
和aeApiDelEvent()
是对epoll.epoll_ctl()
的封装。aeApiPoll()
是对epoll_wait()
的封装。
这样 Redis 的利用 epoll 实现的 I/O 复用器就比较清晰了。
再往上一层次我们需要看看 ea.c 是怎么封装的?
首先需要关注的是事件处理器的数据结构:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
typedef struct aeFileEvent { // 监听事件类型掩码, // 值可以是 AE_READABLE 或 AE_WRITABLE , // 或者 AE_READABLE | AE_WRITABLE int mask; /* one of AE_(READABLE|WRITABLE) */ // 读事件处理器 aeFileProc *rfileProc; // 写事件处理器 aeFileProc *wfileProc; // 多路复用库的私有数据 void *clientData; } aeFileEvent; |
mask
就是可以理解为事件的类型。
除了使用 ae_peoll.c 提供的方法外,ae.c 还增加 “增删查” 的几个 API。
- 增:
aeCreateFileEvent
- 删:
aeDeleteFileEvent
- 查: 查包括两个维度
aeGetFileEvents
获取某个 fd 的监听类型和aeWait
等待某个fd 直到超时或者达到某个状态。
事件分发器(dispatcher)
Redis 的事件分发器 ae.c/aeProcessEvents
不但处理文件事件还处理时间事件,所以这里只贴与文件分发相关的出部分代码,dispather 根据 mask 调用不同的事件处理器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
//从 epoll 中获关注的事件 numevents = aeApiPoll(eventLoop, tvp); for (j = 0; j < numevents; j++) { // 从已就绪数组中获取事件 aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int rfired = 0; // 读事件 if (fe->mask & mask & AE_READABLE) { // rfired 确保读/写事件只能执行其中一个 rfired = 1; fe->rfileProc(eventLoop,fd,fe->clientData,mask); } // 写事件 if (fe->mask & mask & AE_WRITABLE) { if (!rfired || fe->wfileProc != fe->rfileProc) fe->wfileProc(eventLoop,fd,fe->clientData,mask); } processed++; } |
可以看到这个分发器,根据 mask 的不同将事件分别分发给了读事件和写事件。
文件事件处理器的类型
Redis 有大量的事件处理器类型,我们就讲解处理一个简单命令涉及到的三个处理器:
- acceptTcpHandler 连接应答处理器,负责处理连接相关的事件,当有client 连接到Redis的时候们就会产生 AE_READABLE 事件。引发它执行。
- readQueryFromClinet 命令请求处理器,负责读取通过 sokect 发送来的命令。
- sendReplyToClient 命令回复处理器,当Redis处理完命令,就会产生 AE_WRITEABLE 事件,将数据回复给 client。
文件事件实现总结
我们按照开始给出的 Reactor 模型,从上到下讲解了文件事件处理器的实现,下面将会介绍时间时间的实现。
时间事件
Reids 有很多操作需要在给定的时间点进行处理,时间事件就是对这类定时任务的抽象。
先看时间事件的数据结构:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
/* Time event structure * * 时间事件结构 */ typedef struct aeTimeEvent { // 时间事件的唯一标识符 long long id; /* time event identifier. */ // 事件的到达时间 long when_sec; /* seconds */ long when_ms; /* milliseconds */ // 事件处理函数 aeTimeProc *timeProc; // 事件释放函数 aeEventFinalizerProc *finalizerProc; // 多路复用库的私有数据 void *clientData; // 指向下个时间事件结构,形成链表 struct aeTimeEvent *next; } aeTimeEvent; |
看见 next
我们就知道这个 aeTimeEvent 是一个链表结构。看图:
注意这是一个按照id倒序排列的链表,并没有按照事件顺序排序。
processTimeEvent
Redis 使用这个函数处理所有的时间事件,我们整理一下执行思路:
- 记录最新一次执行这个函数的时间,用于处理系统时间被修改产生的问题。
- 遍历链表找出所有 when_sec 和 when_ms 小于现在时间的事件。
- 执行事件对应的处理函数。
- 检查事件类型,如果是周期事件则刷新该事件下一次的执行事件。
- 否则从列表中删除事件。
综合调度器(aeProcessEvents)
综合调度器是 Redis 统一处理所有事件的地方。我们梳理一下这个函数的简单逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
// 1. 获取离当前时间最近的时间事件 shortest = aeSearchNearestTimer(eventLoop); // 2. 获取间隔时间 timeval = shortest - nowTime; // 如果timeval 小于 0,说明已经有需要执行的时间事件了。 if(timeval < 0){ timeval = 0 } // 3. 在 timeval 时间内,取出文件事件。 numevents = aeApiPoll(eventLoop, timeval); // 4.根据文件事件的类型指定不同的文件处理器 if (AE_READABLE) { // 读事件 rfileProc(eventLoop,fd,fe->clientData,mask); } // 写事件 if (AE_WRITABLE) { wfileProc(eventLoop,fd,fe->clientData,mask); } |
以上的伪代码就是整个 Redis 事件处理器的逻辑。
我们可以再看看谁执行了这个 aeProcessEvents
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { // 如果有需要在事件处理前执行的函数,那么运行它 if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); // 开始处理事件 aeProcessEvents(eventLoop, AE_ALL_EVENTS); } } |
然后我们再看看是谁调用了 eaMain
:
1 2 3 4 5 6 7 8 |
int main(int argc, char **argv) { //一些配置和准备 ... aeMain(server.el); //结束后的回收工作 ... } |
我们在 Redis 的 main 方法中找个了它。
这个时候我们整理出的思路就是:
Redis 的 main() 方法执行了一些配置和准备以后就调用
eaMain()
方法。eaMain()
while(true) 的调用aeProcessEvents()
。
所以我们说 Redis 是一个事件驱动的程序,期间我们发现,Redis 没有 fork 过任何线程。所以也可以说 Redis 是一个基于事件驱动的单线程应用。
总结
在后端的面试中 Redis 总是一个或多或少会问到的问题。
读完这篇文章你也许就能回答这几个问题:
- 为什么 Redis 是一个单线程应用?
- 为什么 Redis 是一个单线程应用,却有如此高的性能?
如果你用本文提供的知识点回答这两个问题,一定会在面试官心中留下一个高大的形象。
Redis的命令执行过程
原文地址:https://www.xilidou.com/2018/03/30/redis-recommend/
之前写了一系列文章,已经很深入的探讨了 Redis 的数据结构,数据库的实现,key的过期策略以及 Redis 是怎么处理事件的。所以距离 Redis 的单机实现只差最后一步了,就是 Redis 是怎么处理 client 发来的命令并返回结果的,所以我们就仔细讨论一下 Redis 是怎么执行命令的。
阅读这篇文章你将会了解到:
- Redis 是怎么执行远程客户端发来的命令的
Redis client(客户端)
Redis 是单线程应用,它是如何与多个客户端简历网络链接并处理命令的?
由于 Redis 是基于 I/O 多路复用技术,为了能够处理多个客户端的请求,Redis 在本地为每一个链接到 Redis 服务器的客户端创建了一个 redisClient 的数据结构,这个数据结构包含了每个客户端各自的状态和执行的命令。 Redis 服务器使用一个链表来维护多个 redisClient 数据结构。
在服务器端用一个链表来管理所有的 redisClient。
1 2 3 4 5 6 7 |
struct redisServer { //... list *clients; /* List of active clients */ //... } |
所以我就看看 redisClient 包含的数据结构和重要参数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
typedef struct redisClient { // 客户端状态标志 int flags; /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */ // 套接字描述符 int fd; // 当前正在使用的数据库 redisDb *db; // 当前正在使用的数据库的 id (号码) int dictid; // 客户端的名字 robj *name; /* As set by CLIENT SETNAME */ // 查询缓冲区 sds querybuf; // 查询缓冲区长度峰值 size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size */ // 参数数量 int argc; // 参数对象数组 robj **argv; // 记录被客户端执行的命令 struct redisCommand *cmd, *lastcmd; // 请求的类型:内联命令还是多条命令 int reqtype; // 剩余未读取的命令内容数量 int multibulklen; /* number of multi bulk arguments left to read */ // 命令内容的长度 long bulklen; /* length of bulk argument in multi bulk request */ // 回复链表 list *reply; // 回复链表中对象的总大小 unsigned long reply_bytes; /* Tot bytes of objects in reply list */ // 已发送字节,处理 short write 用 int sentlen; /* Amount of bytes already sent in the current buffer or object being sent. */ // 回复偏移量 int bufpos; // 回复缓冲区 char buf[REDIS_REPLY_CHUNK_BYTES]; // ... } |
这里需要特别的注意,redisClient 并非指远程的客户端,而是一个 Redis 服务本地的数据结构,我们可以理解这个 redisClient 是远程客户端的一个映射或者代理。
flags
flags 表示了目前客户端的角色,以及目前所处的状态。他比较特殊可以单独表示一个状态或者多个状态。
querybuf
querybuf 是一个 sds 动态字符串类型,所谓 buf 说明是它只是一个缓冲区,用于存储没有被解析的命令。
argc & argv
上文的 querybuf 是一个没有处理过的命令,当 Redis 将 querybuf 命令解析以后,会将得出的参数个数和以及参数分别保存在 argc 和 argv 中。argv 是一个 redisObject 的数组。
cmd
Redis 使用一个字典保存了所有的 redisCommand。key 是 redisCommand 的名字,值就是一个 redisCommand 结构,这个结构保存了命令的实现函数,命令的标志,命令应该给定的参数个数,命令的执行次数和总消耗时长等统计信息,cmd 是一个 redisCommand。
当 Redis 解析出 argv 和 argc 后,会根据数组 argv[0],到字典中查询出对应的 redisCommand。上文的例子中 Redis 就会去字典去查找 SET
这个命令对应的 redisCommand。redis 会执行 redisCommand 中命令的实现函数。
buf & bufpos & reply
buf 是一个长度为 REDIS_REPLY_CHUNK_BYTES 的数组。Redis 执行相应的操作以后,就会将需要返回的返回的数据存储到 buf 中,bufpos 用于记录 buf 中已用的字节数数量,当需要恢复的数据大于 REDIS_REPLY_CHUNK_BYTES 时,redis 就会是用 reply 这个链表来保存数据。
其他参数
其他参数大家看注释就能明白,就是字面的意思。省略的参数基本上涉及 Redis 集群管理的参数,在之后的文章中会继续讲解。
客户端的链接和断开
上文说过 redisServer 是用一个链表来维护所有的 redisClient 状态,每当有一个客户端发起链接以后,就会在 Redis 中生成一个对应的 redisClient 数据结构,增加到clients
这个链表之后。
一个客户端很可能被多种原因断开。
总体分为几种类型:
- 客户端主动退出或者被 kill。
- timeout 超时。
- Redis 为了自我保护,会断开发的数据超过限制大小的客户端。
- Redis 为了自我保护,会断需要返回的数据超过限制大小的客户端。
调用总结
当客户端和服务器端的嵌套字变得可读的时候,服务器将会调用命令请求处理器来执行以下操作:
- 读取嵌套字中的数据,写入 querybuf。
- 解析 querybuf 中的命令,记录到 argc 和 argv 中。
- 根据 argv[0] 查找对应的 recommand。
- 执行 recommand 对应的实现函数。
- 执行以后将结果存入 buf & bufpos & reply 中,返回给调用方。
Redis Server (服务端)
上文是从 redisClient 的角度来观察命令的执行,文章接下来的部分将会从 Redis 的代码层面,微观的观察 Redis 是怎么实现命令的执行的。
redisServer 的启动
在了解redisServer 的工作机制的工作机制之前,需要了解 redisServer 的启动做了什么:
可以继续观察 Redis 的 main() 函数。
1 2 3 4 5 6 7 8 9 |
int main(int argc, char **argv) { //... // 创建并初始化服务器数据结构 initServer(); //... } |
我们只关注 initServer()
这个函数,他负责初始化服务器的数据结构。继续跟踪代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
void initServer() { //... //创建eventLoop server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR); /* Create an event handler for accepting new connections in TCP and Unix * domain sockets. */ // 为 TCP 连接关联连接应答(accept)处理器 // 用于接受并应答客户端的 connect() 调用 for (j = 0; j < server.ipfd_count; j++) { if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) { redisPanic( "Unrecoverable error creating server.ipfd file event."); } } // 为本地套接字关联应答处理器 if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event."); //... } |
篇幅限制,我们省略了很多与本编文章无关的代码,保留了核心逻辑代码。
在上一篇文章中 《Redis 中的事件驱动模型》 我们讲解过,redis 使用不同的事件处理器,处理不同的事件。
在这段代码里面:
- 初始化了事件处理器的 eventLoop
- 向 eventLoop 中注册了两个事件处理器
acceptTcpHandler
和acceptUnixHandler
,分别处理远程的链接和本地链接。
redisClient 的创建
当有一个远程客户端连接到 Redis 的服务器,会触发 acceptTcpHandler
事件处理器.
acceptTcpHandler
事件处理器,会创建一个链接。然后继续调用 acceptCommonHandler
。
acceptCommonHandler
事件处理器的作用是:
- 调用
createClient()
方法创建 redisClient - 检查已经创建的 redisClient 是否超过 server 允许的数量的上限
- 如果超过上限就拒绝远程连接
- 否则创建 redisClient 创建成功
- 并更新连接的统计次数,更新 redisClinet 的 flags 字段
这个时候 Redis 在服务端创建了 redisClient 数据结构,这个时候远程的客户端就在 redisServer 中创建了一个代理。远程的客户端就与 Redis 服务器建立了联系,就可以向服务器发送命令了。
处理命令
在 createClient()
行数中:
1 2 |
// 绑定读事件到事件 loop (开始接收命令请求) if (aeCreateFileEvent(server.el,fd,AE_READABLE,readQueryFromClient, c) == AE_ERR) |
向 eventLoop 中注册了 readQueryFromClient
。readQueryFromClient
的作用就是从client中读取客户端的查询缓冲区内容。
然后调用函数 processInputBuffer
来处理客户端的请求。在 processInputBuffer
中有几个核心函数:
processInlineBuffer
和processMultibulkBuffer
解析 querybuf 中的命令,记录到 argc 和 argv 中。processCommand
根据 argv[0] 查找对应的 recommen,执行 recommend 对应的执行函数。在执行之前还会验证命令的正确性。将结果存入 buf & bufpos & reply 中
返回数据
万事具备了,执行完了命令就需要把数据返回给远程的调用方。调用链如下
processCommand -> addReply -> prepareClientToWrite
在 prepareClientToWrite
中我们有见到了熟悉的代码:
1
|
aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
|
向 eventloop 绑定了 sendReplyToClient
事件处理器。
在 sendReplyToClient
中观察代码发现,如果 bufpos 大于 0,将会把 buf 发送给远程的客户端,如果链表 reply 的长度大于0,就会将遍历链表 reply,发送给远程的客户端,这里需要注意的是,为了避免 reply 数据量过大,就会过度的占用资源引起 Redis 相应慢。为了解决这个问题,当写入的总数量大于 REDIS_MAX_WRITE_PER_EVENT 时,Redis 将会临时中断写入,记录操作的进度,将处理时间让给其他操作,剩余的内容等下次继续。这样的套路我们一路走来看过太多了。
总结
- 远程客户端连接到 redis 后,redis服务端会为远程客户端创建一个 redisClient 作为代理。
- redis 会读取嵌套字中的数据,写入 querybuf 中。
- 解析 querybuf 中的命令,记录到 argc 和 argv 中。
- 根据 argv[0] 查找对应的 recommand。
- 执行 recommend 对应的执行函数。
- 执行以后将结果存入 buf & bufpos & reply 中。
- 返回给调用方。返回数据的时候,会控制写入数据量的大小,如果过大会分成若干次。保证 redis 的相应时间。
Redis 作为单线程应用,一直贯彻的思想就是,每个步骤的执行都有一个上限(包括执行时间的上限或者文件尺寸的上限)一旦达到上限,就会记录下当前的执行进度,下次再执行。保证了 Redis 能够及时响应不发生阻塞。