
Redis 开源版,标准版 2GB
云数据库 Tair(兼容Redis),内存型 2GB
简介: Redis中的事件驱动框架


Redis 实例在收到客户端请求后,会在处理客户端命令后,将要返回的数据写入客户端输出缓冲区。下图就展示了这个过程的函数调用逻辑:

而在 Redis 事件驱动框架每次循环进入事件处理函数前,也就是在框架主函数 aeMain 中调用 aeProcessEvents,来处理监听到的已触发事件或是到时的时间事件之前,都会调用 server.c 文件中的 beforeSleep 函数,进行一些任务处理,这其中就包括了调用 handleClientsWithPendingWrites 函数,它会将 Redis sever 客户端缓冲区中的数据写回客户端。

下面给出的代码是事件驱动框架的主函数 aeMain。在该函数每次调用 aeProcessEvents 函数前,就会调用 beforeSleep 函数,你可以看下。


void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
  while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);

这里你要知道,beforeSleep 函数调用的 handleClientsWithPendingWrites 函数,会遍历每一个待写回数据的客户端,然后调用 writeToClient 函数,将客户端输出缓冲区中的数据写回。下面这张图展示了这个流程,你可以看下。

不过,如果输出缓冲区的数据还没有写完,此时,handleClientsWithPendingWrites 函数就会调用 aeCreateFileEvent 函数,创建可写事件,并设置回调函数 sendReplyToClientsendReplyToClient 函数里面会调用 writeToClient 函数写回数据。

下面的代码展示了 handleClientsWithPendingWrite 函数的基本流程,你可以看下。


/* This function is called just before entering the event loop, in the hope
 * we can just write the replies to the client output buffer without any
 * need to use a syscall in order to install the writable event handler,
 * get it called, and so forth. */
int handleClientsWithPendingWrites(void) {
    listIter li;
    listNode *ln;
    int processed = listLength(server.clients_pending_write);
    // 获取待写回的客户端列表
    // 遍历每一个待写回的客户端
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        /* If a client is protected, don't do anything,
         * that may trigger write error or recreate handler. */
        if (c->flags & CLIENT_PROTECTED) continue;
        /* Don't write to clients that are going to be closed anyway. */
        if (c->flags & CLIENT_CLOSE_ASAP) continue;
        // 调用writeToClient将当前客户端的输出缓冲区数据写回
        /* Try to write buffers to the client socket. */
        if (writeToClient(c,0) == C_ERR) continue;
        // 如果还有待写回数据
        /* If after the synchronous writes above we still have data to
         * output to the client, we need to install the writable handler. */
        if (clientHasPendingReplies(c)) {
            int ae_barrier = 0;
            /* For the fsync=always policy, we want that a given FD is never
             * served for reading and writing in the same event loop iteration,
             * so that in the middle of receiving the query, and serving it
             * to the client, we'll call beforeSleep() that will do the
             * actual fsync of AOF to disk. the write barrier ensures that. */
            // 创建可写事件的监听,以及设置回调函数
            if (server.aof_state == AOF_ON &&
                server.aof_fsync == AOF_FSYNC_ALWAYS)
                ae_barrier = 1;
            if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) {
    return processed;

好了,我们刚才了解的是读写事件对应的回调处理函数。实际上,为了能及时处理这些事件,Redis 事件驱动框架的 aeMain 函数还会循环调用 aeProcessEvents 函数,来检测已触发的事件,并调用相应的回调函数进行处理

从 aeProcessEvents 函数的代码中,我们可以看到该函数会调用 aeApiPoll 函数,查询监听的文件描述符中,有哪些已经就绪。一旦有描述符就绪,aeProcessEvents 函数就会根据事件的可读或可写类型,调用相应的回调函数进行处理。aeProcessEvents 函数调用的基本流程如下所示:


/* Process every pending time event, then every pending file event
 * (that may be registered by time event callbacks just processed).
 * Without special flags the function sleeps until some file event
 * fires, or when the next time event occurs (if any).
 * If flags is 0, the function does nothing and returns.
 * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
 * if flags has AE_FILE_EVENTS set, file events are processed.
 * if flags has AE_TIME_EVENTS set, time events are processed.
 * if flags has AE_DONT_WAIT set the function returns ASAP until all
 * the events that's possible to process without to wait are processed.
 * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
 * if flags has AE_CALL_BEFORE_SLEEP set, the beforesleep callback is called.
 * The function returns the number of events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
    int processed = 0, numevents;
    /* 若没有事件处理,则立刻返回*/
    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
    /* Note that we want to call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    // 请注意,即使没有要处理的文件事件,只要我们想要处理时间事件,我们也想调用 select(),以便在下一次事件准备好触发之前sleep
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        struct timeval tv, *tvp;
        int64_t usUntilTimer = -1;
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            usUntilTimer = usUntilEarliestTimer(eventLoop);
        if (usUntilTimer >= 0) {
            tv.tv_sec = usUntilTimer / 1000000;
            tv.tv_usec = usUntilTimer % 1000000;
            tvp = &tv;
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
        if (eventLoop->flags & AE_DONT_WAIT) {
            tv.tv_sec = tv.tv_usec = 0;
            tvp = &tv;
        if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        // 调用aeApiPoll获取就绪的描述符
        numevents = aeApiPoll(eventLoop, tvp);
        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */
            /* Normally we execute the readable event first, and the writable
             * event later. This is useful as sometimes we may be able
             * to serve the reply of a query immediately after processing the
             * query.
             * However if AE_BARRIER is set in the mask, our application is
             * asking us to do the reverse: never fire the writable event
             * after the readable. In such a case, we invert the calls.
             * This is useful when, for instance, we want to do things
             * in the beforeSleep() hook, like fsyncing a file to disk,
             * before replying to a client. */
            int invert = fe->mask & AE_BARRIER;
            /* Note the "fe->mask & mask & ..." code: maybe an already
             * processed event removed an element that fired and we still
             * didn't processed, so we check if the event is still valid.
             * Fire the readable event if the call sequence is not
             * inverted. */
            // 如果触发的是可读事件,调用事件注册时设置的读事件回调处理函数
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
            // 如果触发的是可写事件,调用事件注册时设置的写事件回调处理函数
            /* Fire the writable event. */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert) {
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc))
    /* Check time events */
    /* 检查是否有时间事件,若有,则调用processTimeEvents函数处理 */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
    /* 返回已经处理的文件或时间*/
    return processed; /* return the number of processed file/time events */

到这里,我们就了解了 IO 事件的创建函数 aeCreateFileEvent,以及在处理客户端请求时对应的读写事件和它们的处理函数。那么接下来,我们再来看看事件驱动框架中的时间事件是怎么创建和处理的。


其实,相比于 IO 事件有可读、可写、屏障类型,以及不同类型 IO 事件有不同回调函数来说,时间事件的处理就比较简单了。下面,我们就来分别学习下它的定义、创建、回调函数和触发处理。




/* Time event structure */
typedef struct aeTimeEvent {
    // 时间事件ID
    long long id; /* time event identifier. */
    // 事件到达的微秒级时间戳
    monotime when;
    // 时间事件触发后的处理函数
    aeTimeProc *timeProc;
    // 事件结束后的处理函数
    aeEventFinalizerProc *finalizerProc;
    // 事件相关的私有数据
    void *clientData;
    // 时间事件链表的前向指针
    struct aeTimeEvent *prev;
    // 时间事件链表的后向指针
    struct aeTimeEvent *next;
    int refcount; /* refcount to prevent timer events from being
         * freed in recursive time event calls. */
} aeTimeEvent;

时间事件结构体中主要的变量,包括以微秒记录的时间事件触发时的时间戳 when,以及时间事件触发后的处理函数*timeProc。另外,在时间事件的结构体中,还包含了前向和后向指针*prev和*next,这表明时间事件是以链表的形式组织起来的。



与 IO 事件创建使用 aeCreateFileEvent 函数类似,时间事件的创建函数是 aeCreateTimeEvent 函数。这个函数的原型定义如下所示:


long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;
    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
    te->id = id;
    te->when = getMonotonicUs() + milliseconds * 1000;
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;
    te->prev = NULL;
    te->next = eventLoop->timeEventHead;
    te->refcount = 0;
    if (te->next)
        te->next->prev = te;
    eventLoop->timeEventHead = te;
    return id;

在它的参数中,有两个需要我们重点了解下,以便于我们理解时间事件的处理。一个是 milliseconds,这是所创建时间事件的触发时间距离当前时间的时长,是用毫秒表示的。另一个是 *proc,这是所创建时间事件触发后的回调函数。

aeCreateTimeEvent 函数的执行逻辑不复杂,主要就是创建一个时间事件的变量 te,对它进行初始化,并把它插入到框架循环流程结构体 eventLoop 中的时间事件链表中。在这个过程中,aeCreateTimeEvent 函数会调用 aeAddMillisecondsToNow 函数,根据传入的 milliseconds 参数,计算所创建时间事件具体的触发时间戳,并赋值给 te。

实际上,Redis server 在初始化时,除了创建监听的 IO 事件外,也会调用 aeCreateTimeEvent 函数创建时间事件。下面代码显示了 initServer 函数对 aeCreateTimeEvent 函数的调用:

initServer() {
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR){
    … //报错信息

从代码中,我们可以看到,时间事件触发后的回调函数是 serverCron。所以接下来,我们就来了解下 serverCron 函数。


serverCron 函数是在 server.c 文件中实现的。

  • 一方面,它会顺序调用一些函数,来实现时间事件被触发后,执行一些后台任务。比如,serverCron 函数会检查是否有进程结束信号,若有就执行 server 关闭操作。serverCron 会调用 databaseCron 函数,处理过期 key 或进行 rehash 等。你可以参考下面给出的代码:
 if (server.shutdown_asap) {
        if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
clientCron();  //执行客户端的异步操作
databaseCron(); //执行数据库的后台操作
  • 另一方面,serverCron 函数还会以不同的频率周期性执行一些任务,这是通过执行宏 run_with_period 来实现的。

run_with_period 宏定义如下,该宏定义会根据 Redis 实例配置文件 redis.conf 中定义的 hz 值,来判断参数 ms 表示的时间戳是否到达。一旦到达,serverCron 就可以执行相应的任务了。


/* Using the following macro you can run code inside serverCron() with the
 * specified period, specified in milliseconds.
 * The actual resolution depends on server.hz. */
#define run_with_period(_ms_) if ((_ms_ <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz))))

比如,serverCron 函数中会以 1 秒 1 次的频率,检查 AOF 文件是否有写错误。如果有的话,serverCron 就会调用 flushAppendOnlyFile 函数,再次刷回 AOF 文件的缓存数据。下面的代码展示了这一周期性任务:

serverCron() {
   run_with_period(1000) {
        if (server.aof_last_write_status == C_ERR)

如果你想了解更多的周期性任务,可以再详细阅读下 serverCron 函数中,以 run_with_period 宏定义包含的代码块。

好了,了解了时间事件触发后的回调函数 serverCron,我们最后来看下,时间事件是如何触发处理的。


其实,时间事件的检测触发比较简单,事件驱动框架的 aeMain 函数会循环调用 aeProcessEvents 函数,来处理各种事件。而 aeProcessEvents 函数在执行流程的最后,会调用 processTimeEvents 函数处理相应到时的任务。

    if (flags & AE_TIME_EVENTS)
            processed += processTimeEvents(eventLoop);

那么,具体到 proecessTimeEvent 函数来说,它的基本流程就是从时间事件链表上逐一取出每一个事件,然后根据当前时间判断该事件的触发时间戳是否已满足。如果已满足,那么就调用该事件对应的回调函数进行处理。这样一来,周期性任务就能在不断循环执行的 aeProcessEvents 函数中,得到执行了。

下面的代码显示了 processTimeEvents 函数的基本流程,你可以再看下。


/* Process time events */
static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te;
    long long maxId;
    // 从时间事件链表中取出事件
    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;
    // 获取当前时间
    monotime now = getMonotonicUs();
    while(te) {
        long long id;
        /* Remove events scheduled for deletion. */
        // 删除计划删除的事件
        if (te->id == AE_DELETED_EVENT_ID) {
            aeTimeEvent *next = te->next;
            /* If a reference exists for this timer event,
             * don't free it. This is currently incremented
             * for recursive timerProc calls */
            // 如果此计时器事件存在引用,请不要释放它。因为它会由于递归 timerProc 调用而增加
            if (te->refcount) {
                te = next;
            if (te->prev)
                te->prev->next = te->next;
                eventLoop->timeEventHead = te->next;
            if (te->next)
                te->next->prev = te->prev;
            if (te->finalizerProc) {
                te->finalizerProc(eventLoop, te->clientData);
                now = getMonotonicUs();
            te = next;
        /* Make sure we don't process time events created by time events in
         * this iteration. Note that this check is currently useless: we always
         * add new timers on the head, however if we change the implementation
         * detail, this check may be useful again: we keep it here for future
         * defense. */
        if (te->id > maxId) {
            te = te->next;
        // 时间事件的时间小于当前时间点
        if (te->when <= now) {
            int retval;
            id = te->id;
            retval = te->timeProc(eventLoop, id, te->clientData);
            now = getMonotonicUs();
            if (retval != AE_NOMORE) {
                te->when = now + retval * 1000;
            } else {
                te->id = AE_DELETED_EVENT_ID;
        // 获取下一个时间事件
        te = te->next;
    return processed;

这节课,我给你介绍了 Redis 事件驱动框架中的两类事件:IO 事件和时间事件。

对于 IO 事件来说,它可以进一步分成可读、可写和屏障事件。因为可读、可写事件在 Redis 和客户端通信处理请求过程中使用广泛,所以今天我们重点学习了这两种 IO 事件。当 Redis server 创建 Socket 后,就会注册可读事件,并使用 acceptTCPHandler 回调函数处理客户端的连接请求。

当 server 和客户端完成连接建立后,server 会在已连接套接字上监听可读事件,并使用 readQueryFromClient 函数处理客户端读写请求。这里,你需要再注意下,无论客户端发送的请求是读或写操作,对于 server 来说,都是要读取客户端的请求并解析处理。所以,server 在客户端的已连接套接字上注册的是可读事件。而当实例需要向客户端写回数据时,实例会在事件驱动框架中注册可写事件,并使用 sendReplyToClient 作为回调函数,将缓冲区中数据写回客户端。

我总结了一张表格,以便你再回顾下 IO 事件和相应套接字、回调函数的对应关系。

然后,对于时间事件来说,它主要是用于在事件驱动框架中注册一些周期性执行的任务,以便 Redis server 进行后台处理。时间事件的回调函数是 serverCron 函数,你可以做进一步阅读了解其中的具体任务。

Redis 在调用 aeApiCreate、aeApiAddEvent 这些函数时,是根据什么条件来决定,具体调用哪个文件中的 IO 多路复用函数的?

在 ae.c 中,根据不同平台,首先定义好了要导入的封装好的 IO 多路复用函数,每个平台对应的文件中都定义了 aeApiCreate、aeApiAddEvent 这类函数,在执行时就会执行对应平台的函数逻辑。

/* Include the best multiplexing layer supported by this system.
 * The following should be ordered by performances, descending. */
// 以下应按性能顺序排列,降序排列
#include "ae_evport.c" // Solaris
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c" // Linux
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c" // MacOS
        #include "ae_select.c" // windows
