写事件处理
Redis 实例在收到客户端请求后,会在处理客户端命令后,将要返回的数据写入客户端输出缓冲区。下图就展示了这个过程的函数调用逻辑:
而在 Redis 事件驱动框架每次循环进入事件处理函数前,也就是在框架主函数 aeMain 中调用 aeProcessEvents,来处理监听到的已触发事件或是到时的时间事件之前,都会调用 server.c 文件中的 beforeSleep 函数,进行一些任务处理,这其中就包括了调用 handleClientsWithPendingWrites 函数,它会将 Redis sever 客户端缓冲区中的数据写回客户端。
下面给出的代码是事件驱动框架的主函数 aeMain。在该函数每次调用 aeProcessEvents 函数前,就会调用 beforeSleep 函数,你可以看下。
ae.c文件中查看
void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { //如果beforeSleep函数不为空,则调用beforeSleep函数 if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); //调用完beforeSleep函数,再处理事件 aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); } }
这里你要知道,beforeSleep 函数调用的 handleClientsWithPendingWrites 函数,会遍历每一个待写回数据的客户端,然后调用 writeToClient 函数,将客户端输出缓冲区中的数据写回。下面这张图展示了这个流程,你可以看下。
不过,如果输出缓冲区的数据还没有写完,此时,handleClientsWithPendingWrites 函数就会调用 aeCreateFileEvent 函数,创建可写事件,并设置回调函数 sendReplyToClient。sendReplyToClient 函数里面会调用 writeToClient 函数写回数据。
下面的代码展示了 handleClientsWithPendingWrite 函数的基本流程,你可以看下。
networking.c文件中查看
/* This function is called just before entering the event loop, in the hope * we can just write the replies to the client output buffer without any * need to use a syscall in order to install the writable event handler, * get it called, and so forth. */ int handleClientsWithPendingWrites(void) { listIter li; listNode *ln; int processed = listLength(server.clients_pending_write); // 获取待写回的客户端列表 listRewind(server.clients_pending_write,&li); // 遍历每一个待写回的客户端 while((ln = listNext(&li))) { client *c = listNodeValue(ln); c->flags &= ~CLIENT_PENDING_WRITE; listDelNode(server.clients_pending_write,ln); /* If a client is protected, don't do anything, * that may trigger write error or recreate handler. */ if (c->flags & CLIENT_PROTECTED) continue; /* Don't write to clients that are going to be closed anyway. */ if (c->flags & CLIENT_CLOSE_ASAP) continue; // 调用writeToClient将当前客户端的输出缓冲区数据写回 /* Try to write buffers to the client socket. */ if (writeToClient(c,0) == C_ERR) continue; // 如果还有待写回数据 /* If after the synchronous writes above we still have data to * output to the client, we need to install the writable handler. */ if (clientHasPendingReplies(c)) { int ae_barrier = 0; /* For the fsync=always policy, we want that a given FD is never * served for reading and writing in the same event loop iteration, * so that in the middle of receiving the query, and serving it * to the client, we'll call beforeSleep() that will do the * actual fsync of AOF to disk. the write barrier ensures that. */ // 创建可写事件的监听,以及设置回调函数 if (server.aof_state == AOF_ON && server.aof_fsync == AOF_FSYNC_ALWAYS) { ae_barrier = 1; } if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) { freeClientAsync(c); } } } return processed; }
好了,我们刚才了解的是读写事件对应的回调处理函数。实际上,为了能及时处理这些事件,Redis 事件驱动框架的 aeMain 函数还会循环调用 aeProcessEvents 函数,来检测已触发的事件,并调用相应的回调函数进行处理。
从 aeProcessEvents 函数的代码中,我们可以看到该函数会调用 aeApiPoll 函数,查询监听的文件描述符中,有哪些已经就绪。一旦有描述符就绪,aeProcessEvents 函数就会根据事件的可读或可写类型,调用相应的回调函数进行处理。aeProcessEvents 函数调用的基本流程如下所示:
ae.c文件中查看
/* Process every pending time event, then every pending file event * (that may be registered by time event callbacks just processed). * Without special flags the function sleeps until some file event * fires, or when the next time event occurs (if any). * * If flags is 0, the function does nothing and returns. * if flags has AE_ALL_EVENTS set, all the kind of events are processed. * if flags has AE_FILE_EVENTS set, file events are processed. * if flags has AE_TIME_EVENTS set, time events are processed. * if flags has AE_DONT_WAIT set the function returns ASAP until all * the events that's possible to process without to wait are processed. * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called. * if flags has AE_CALL_BEFORE_SLEEP set, the beforesleep callback is called. * * The function returns the number of events processed. */ int aeProcessEvents(aeEventLoop *eventLoop, int flags) { int processed = 0, numevents; /* 若没有事件处理,则立刻返回*/ /* Nothing to do? return ASAP */ if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; /* Note that we want to call select() even if there are no * file events to process as long as we want to process time * events, in order to sleep until the next time event is ready * to fire. */ /*如果有IO事件发生,或者紧急的时间事件发生,则开始处理*/ // 请注意,即使没有要处理的文件事件,只要我们想要处理时间事件,我们也想调用 select(),以便在下一次事件准备好触发之前sleep if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { int j; struct timeval tv, *tvp; int64_t usUntilTimer = -1; if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) usUntilTimer = usUntilEarliestTimer(eventLoop); if (usUntilTimer >= 0) { tv.tv_sec = usUntilTimer / 1000000; tv.tv_usec = usUntilTimer % 1000000; tvp = &tv; } else { /* If we have to check for events but need to return * ASAP because of AE_DONT_WAIT we need to set the timeout * to zero */ if (flags & AE_DONT_WAIT) { tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } else { /* Otherwise we can block */ tvp = NULL; /* wait forever */ } } if (eventLoop->flags & AE_DONT_WAIT) { tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP) eventLoop->beforesleep(eventLoop); /* Call the multiplexing API, will return only on timeout or when * some event fires. */ // 调用aeApiPoll获取就绪的描述符 numevents = aeApiPoll(eventLoop, tvp); /* After sleep callback. */ if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) eventLoop->aftersleep(eventLoop); for (j = 0; j < numevents; j++) { aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int fired = 0; /* Number of events fired for current fd. */ /* Normally we execute the readable event first, and the writable * event later. This is useful as sometimes we may be able * to serve the reply of a query immediately after processing the * query. * * However if AE_BARRIER is set in the mask, our application is * asking us to do the reverse: never fire the writable event * after the readable. In such a case, we invert the calls. * This is useful when, for instance, we want to do things * in the beforeSleep() hook, like fsyncing a file to disk, * before replying to a client. */ int invert = fe->mask & AE_BARRIER; /* Note the "fe->mask & mask & ..." code: maybe an already * processed event removed an element that fired and we still * didn't processed, so we check if the event is still valid. * * Fire the readable event if the call sequence is not * inverted. */ // 如果触发的是可读事件,调用事件注册时设置的读事件回调处理函数 if (!invert && fe->mask & mask & AE_READABLE) { fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; fe = &eventLoop->events[fd]; /* Refresh in case of resize. */ } // 如果触发的是可写事件,调用事件注册时设置的写事件回调处理函数 /* Fire the writable event. */ if (fe->mask & mask & AE_WRITABLE) { if (!fired || fe->wfileProc != fe->rfileProc) { fe->wfileProc(eventLoop,fd,fe->clientData,mask); fired++; } } /* If we have to invert the call, fire the readable event now * after the writable one. */ if (invert) { fe = &eventLoop->events[fd]; /* Refresh in case of resize. */ if ((fe->mask & mask & AE_READABLE) && (!fired || fe->wfileProc != fe->rfileProc)) { fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; } } processed++; } } /* Check time events */ /* 检查是否有时间事件,若有,则调用processTimeEvents函数处理 */ if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); /* 返回已经处理的文件或时间*/ return processed; /* return the number of processed file/time events */ }
到这里,我们就了解了 IO 事件的创建函数 aeCreateFileEvent,以及在处理客户端请求时对应的读写事件和它们的处理函数。那么接下来,我们再来看看事件驱动框架中的时间事件是怎么创建和处理的。
时间事件处理
其实,相比于 IO 事件有可读、可写、屏障类型,以及不同类型 IO 事件有不同回调函数来说,时间事件的处理就比较简单了。下面,我们就来分别学习下它的定义、创建、回调函数和触发处理。
时间事件定义
首先,我们来看下时间事件的结构体定义,代码如下所示:
ae.h文件中查看
/* Time event structure */ typedef struct aeTimeEvent { // 时间事件ID long long id; /* time event identifier. */ // 事件到达的微秒级时间戳 monotime when; // 时间事件触发后的处理函数 aeTimeProc *timeProc; // 事件结束后的处理函数 aeEventFinalizerProc *finalizerProc; // 事件相关的私有数据 void *clientData; // 时间事件链表的前向指针 struct aeTimeEvent *prev; // 时间事件链表的后向指针 struct aeTimeEvent *next; int refcount; /* refcount to prevent timer events from being * freed in recursive time event calls. */ } aeTimeEvent;
时间事件结构体中主要的变量,包括以微秒记录的时间事件触发时的时间戳 when,以及时间事件触发后的处理函数*timeProc。另外,在时间事件的结构体中,还包含了前向和后向指针*prev和*next,这表明时间事件是以链表的形式组织起来的。
在了解了时间事件结构体的定义以后,我们接着来看下,时间事件是如何创建的。
时间事件创建
与 IO 事件创建使用 aeCreateFileEvent 函数类似,时间事件的创建函数是 aeCreateTimeEvent 函数。这个函数的原型定义如下所示:
ae.c文件中查看
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc) { long long id = eventLoop->timeEventNextId++; aeTimeEvent *te; te = zmalloc(sizeof(*te)); if (te == NULL) return AE_ERR; te->id = id; te->when = getMonotonicUs() + milliseconds * 1000; te->timeProc = proc; te->finalizerProc = finalizerProc; te->clientData = clientData; te->prev = NULL; te->next = eventLoop->timeEventHead; te->refcount = 0; if (te->next) te->next->prev = te; eventLoop->timeEventHead = te; return id; }
在它的参数中,有两个需要我们重点了解下,以便于我们理解时间事件的处理。一个是 milliseconds,这是所创建时间事件的触发时间距离当前时间的时长,是用毫秒表示的。另一个是 *proc,这是所创建时间事件触发后的回调函数。
aeCreateTimeEvent 函数的执行逻辑不复杂,主要就是创建一个时间事件的变量 te,对它进行初始化,并把它插入到框架循环流程结构体 eventLoop 中的时间事件链表中。在这个过程中,aeCreateTimeEvent 函数会调用 aeAddMillisecondsToNow 函数,根据传入的 milliseconds 参数,计算所创建时间事件具体的触发时间戳,并赋值给 te。
实际上,Redis server 在初始化时,除了创建监听的 IO 事件外,也会调用 aeCreateTimeEvent 函数创建时间事件。下面代码显示了 initServer 函数对 aeCreateTimeEvent 函数的调用:
initServer() { … //创建时间事件 if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR){ … //报错信息 } }
从代码中,我们可以看到,时间事件触发后的回调函数是 serverCron。所以接下来,我们就来了解下 serverCron 函数。
时间事件回调函数
serverCron 函数是在 server.c 文件中实现的。
- 一方面,它会顺序调用一些函数,来实现时间事件被触发后,执行一些后台任务。比如,serverCron 函数会检查是否有进程结束信号,若有就执行 server 关闭操作。serverCron 会调用 databaseCron 函数,处理过期 key 或进行 rehash 等。你可以参考下面给出的代码:
... //如果收到进程结束信号,则执行server关闭操作 if (server.shutdown_asap) { if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0); ... } ... clientCron(); //执行客户端的异步操作 databaseCron(); //执行数据库的后台操作 ...
- 另一方面,serverCron 函数还会以不同的频率周期性执行一些任务,这是通过执行宏 run_with_period 来实现的。
run_with_period 宏定义如下,该宏定义会根据 Redis 实例配置文件 redis.conf 中定义的 hz 值,来判断参数 ms 表示的时间戳是否到达。一旦到达,serverCron 就可以执行相应的任务了。
server.h文件中查看
/* Using the following macro you can run code inside serverCron() with the * specified period, specified in milliseconds. * The actual resolution depends on server.hz. */ #define run_with_period(_ms_) if ((_ms_ <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz))))
比如,serverCron 函数中会以 1 秒 1 次的频率,检查 AOF 文件是否有写错误。如果有的话,serverCron 就会调用 flushAppendOnlyFile 函数,再次刷回 AOF 文件的缓存数据。下面的代码展示了这一周期性任务:
serverCron() { … //每1秒执行1次,检查AOF是否有写错误 run_with_period(1000) { if (server.aof_last_write_status == C_ERR) flushAppendOnlyFile(0); } … }
如果你想了解更多的周期性任务,可以再详细阅读下 serverCron 函数中,以 run_with_period 宏定义包含的代码块。
好了,了解了时间事件触发后的回调函数 serverCron,我们最后来看下,时间事件是如何触发处理的。
时间事件的触发处理
其实,时间事件的检测触发比较简单,事件驱动框架的 aeMain 函数会循环调用 aeProcessEvents 函数,来处理各种事件。而 aeProcessEvents 函数在执行流程的最后,会调用 processTimeEvents 函数处理相应到时的任务。
aeProcessEvents(){ … //检测时间事件是否触发 if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); … }
那么,具体到 proecessTimeEvent 函数来说,它的基本流程就是从时间事件链表上逐一取出每一个事件,然后根据当前时间判断该事件的触发时间戳是否已满足。如果已满足,那么就调用该事件对应的回调函数进行处理。这样一来,周期性任务就能在不断循环执行的 aeProcessEvents 函数中,得到执行了。
下面的代码显示了 processTimeEvents 函数的基本流程,你可以再看下。
ae.c文件中查看
/* Process time events */ static int processTimeEvents(aeEventLoop *eventLoop) { int processed = 0; aeTimeEvent *te; long long maxId; // 从时间事件链表中取出事件 te = eventLoop->timeEventHead; maxId = eventLoop->timeEventNextId-1; // 获取当前时间 monotime now = getMonotonicUs(); while(te) { long long id; /* Remove events scheduled for deletion. */ // 删除计划删除的事件 if (te->id == AE_DELETED_EVENT_ID) { aeTimeEvent *next = te->next; /* If a reference exists for this timer event, * don't free it. This is currently incremented * for recursive timerProc calls */ // 如果此计时器事件存在引用,请不要释放它。因为它会由于递归 timerProc 调用而增加 if (te->refcount) { te = next; continue; } if (te->prev) te->prev->next = te->next; else eventLoop->timeEventHead = te->next; if (te->next) te->next->prev = te->prev; if (te->finalizerProc) { te->finalizerProc(eventLoop, te->clientData); now = getMonotonicUs(); } zfree(te); te = next; continue; } /* Make sure we don't process time events created by time events in * this iteration. Note that this check is currently useless: we always * add new timers on the head, however if we change the implementation * detail, this check may be useful again: we keep it here for future * defense. */ if (te->id > maxId) { te = te->next; continue; } // 时间事件的时间小于当前时间点 if (te->when <= now) { int retval; id = te->id; te->refcount++; retval = te->timeProc(eventLoop, id, te->clientData); te->refcount--; processed++; now = getMonotonicUs(); if (retval != AE_NOMORE) { te->when = now + retval * 1000; } else { te->id = AE_DELETED_EVENT_ID; } } // 获取下一个时间事件 te = te->next; } return processed; }
这节课,我给你介绍了 Redis 事件驱动框架中的两类事件:IO 事件和时间事件。
对于 IO 事件来说,它可以进一步分成可读、可写和屏障事件。因为可读、可写事件在 Redis 和客户端通信处理请求过程中使用广泛,所以今天我们重点学习了这两种 IO 事件。当 Redis server 创建 Socket 后,就会注册可读事件,并使用 acceptTCPHandler 回调函数处理客户端的连接请求。
当 server 和客户端完成连接建立后,server 会在已连接套接字上监听可读事件,并使用 readQueryFromClient 函数处理客户端读写请求。这里,你需要再注意下,无论客户端发送的请求是读或写操作,对于 server 来说,都是要读取客户端的请求并解析处理。所以,server 在客户端的已连接套接字上注册的是可读事件。而当实例需要向客户端写回数据时,实例会在事件驱动框架中注册可写事件,并使用 sendReplyToClient 作为回调函数,将缓冲区中数据写回客户端。
我总结了一张表格,以便你再回顾下 IO 事件和相应套接字、回调函数的对应关系。
然后,对于时间事件来说,它主要是用于在事件驱动框架中注册一些周期性执行的任务,以便 Redis server 进行后台处理。时间事件的回调函数是 serverCron 函数,你可以做进一步阅读了解其中的具体任务。
Redis 在调用 aeApiCreate、aeApiAddEvent 这些函数时,是根据什么条件来决定,具体调用哪个文件中的 IO 多路复用函数的?
在 ae.c 中,根据不同平台,首先定义好了要导入的封装好的 IO 多路复用函数,每个平台对应的文件中都定义了 aeApiCreate、aeApiAddEvent 这类函数,在执行时就会执行对应平台的函数逻辑。
/* Include the best multiplexing layer supported by this system. * The following should be ordered by performances, descending. */ // 以下应按性能顺序排列,降序排列 #ifdef HAVE_EVPORT #include "ae_evport.c" // Solaris #else #ifdef HAVE_EPOLL #include "ae_epoll.c" // Linux #else #ifdef HAVE_KQUEUE #include "ae_kqueue.c" // MacOS #else #include "ae_select.c" // windows #endif #endif #endif