Redis中的事件驱动框架(一)

本文涉及的产品
云原生内存数据库 Tair,内存型 2GB
云数据库 Redis 版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: Redis中的事件驱动框架

在上篇文章,学习了 Redis 事件驱动框架的基本工作机制,其中介绍了事件驱动框架基于的 Reactor 模型,并以 IO 事件中的客户端连接事件为例,给你介绍了框架运行的基本流程:从 server 初始化时调用 aeCreateFileEvent 函数注册监听事件,到 server 初始化完成后调用 aeMain 函数,而 aeMain 函数循环执行 aeProceeEvent 函数,来捕获和处理客户端请求触发的事件。

但是在上篇文章当中,我们主要关注的是框架基本流程,所以到这里,你或许仍然存有一些疑问,比如说:

  • Redis 事件驱动框架监听的 IO 事件,除了上节课介绍的客户端连接以外,还有没有其他事件?
  • 而除了 IO 事件以外,框架还会监听其他事件么?
  • 这些事件的创建和处理又分别对应了 Redis 源码中的哪些具体操作?

今天这节课,我就来给你介绍下 Redis 事件驱动框架中的两大类事件类型:IO 事件和时间事件以及它们相应的处理机制。

事实上,了解和学习这部分内容,一方面可以帮助我们更加全面地掌握,Redis 事件驱动框架是如何以事件形式,处理 server 运行过程中面临的请求操作和多种任务的。比如,正常的客户端读写请求是以什么事件、由哪个函数进行处理,以及后台快照任务又是如何及时启动的。

因为事件驱动框架是 Redis server 运行后的核心循环流程,了解它何时用什么函数处理哪种事件,对我们排查 server 运行过程中遇到的问题,是很有帮助的。

另一方面,我们还可以学习到如何在一个框架中,同时处理 IO 事件和时间事件。我们平时开发服务器端程序,经常需要处理周期性任务,而 Redis 关于两类事件的处理实现,就给了我们一个不错的参考。

好,为了对这两类事件有个相对全面的了解,接下来,我们先从事件驱动框架循环流程的数据结构及其初始化开始学起,因为这里面就包含了针对这两类事件的数据结构定义和初始化操作。

aeEventLoop 结构体与初始化

首先,我们来看下 Redis 事件驱动框架循环流程对应的数据结构 aeEventLoop。这个结构体是在事件驱动框架代码ae.h中定义的,记录了框架循环运行过程中的信息,其中,就包含了记录两类事件的变量,分别是:

  • aeFileEvent 类型的*指针 events,表示 IO 事件。之所以类型名称为 aeFileEvent,是因为所有的 IO 事件都会用文件描述符进行标识;
  • aeTimeEvent 类型的*指针 timeEventHead,表示时间事件,即按一定时间周期触发的事件。

此外,aeEventLoop 结构体中还有一个 aeFiredEvent 类型的指针 fired,这个并不是一类专门的事件类型,它只是用来记录已触发事件对应的文件描述符信息*。

下面的代码显示了 Redis 中事件循环的结构体定义,你可以看下。

ae.h文件中查看

/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    // IO事件数组
    aeFileEvent *events; /* Registered events */ 
    // 已经触发事件数组
    aeFiredEvent *fired; /* Fired events */
    // 记录时间事件链表头
    aeTimeEvent *timeEventHead;
    int stop;
    // 和API调用接口相关的数据
    void *apidata; /* This is used for polling API specific data */
    // 进入事件循环流程前执行的函数
    aeBeforeSleepProc *beforesleep;
    // 退出事件循环流程后执行的函数
    aeBeforeSleepProc *aftersleep;
    int flags;
} aeEventLoop;

了解了 aeEventLoop 结构体后,我们再来看下,这个结构体是如何初始化的,这其中就包括了 IO 事件数组和时间事件链表的初始化。

aeCreateEventLoop 函数的初始化操作

因为 Redis server 在完成初始化后,就要开始运行事件驱动框架的循环流程,所以,aeEventLoop 结构体在server.c的 initServer 函数中,就通过调用 aeCreateEventLoop 函数进行初始化了。这个函数的参数只有一个,是 setsize。

下面的代码展示了 initServer 函数中对 aeCreateEventLoop 函数的调用。

server.c文件中查看

initServer() {
//调用aeCreateEventLoop函数创建aeEventLoop结构体,并赋值给server结构的el变量
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
}

从这里我们可以看到参数 setsize 的大小,其实是由 server 结构的 maxclients 变量和宏定义 CONFIG_FDSET_INCR 共同决定的。其中,maxclients 变量的值大小,可以在 Redis 的配置文件 redis.conf 中进行定义,默认值是 1000。而宏定义 CONFIG_FDSET_INCR 的大小,等于宏定义 CONFIG_MIN_RESERVED_FDS 的值再加上 96,如下所示,这里的两个宏定义都是在server.h文件中定义的。

#define CONFIG_MIN_RESERVED_FDS 32
#define CONFIG_FDSET_INCR (CONFIG_MIN_RESERVED_FDS+96)

好了,到这里,你可能有疑问了:aeCreateEventLoop 函数的参数 setsize,设置为最大客户端数量加上一个宏定义值,可是这个参数有什么用呢?这就和 aeCreateEventLoop 函数具体执行的初始化操作有关了。接下来,我们就来看下 aeCreateEventLoop 函数执行的操作,大致可以分成以下三个步骤。

  • 第一步,aeCreateEventLoop 函数会创建一个 aeEventLoop 结构体类型的变量 eventLoop。然后,该函数会给 eventLoop 的成员变量分配内存空间,比如,按照传入的参数 setsize,给 IO 事件数组和已触发事件数组分配相应的内存空间。此外,该函数还会给 eventLoop 的成员变量赋初始值。
  • 第二步,aeCreateEventLoop 函数会调用 aeApiCreate 函数。aeApiCreate 函数封装了操作系统提供的 IO 多路复用函数,假设 Redis 运行在 Linux 操作系统上,并且 IO 多路复用机制是 epoll,那么此时,aeApiCreate 函数就会调用 epoll_create 创建 epoll 实例,同时会创建 epoll_event 结构的数组,数组大小等于参数 setsize。

这里你需要注意,aeApiCreate 函数是把创建的 epoll 实例描述符和 epoll_event 数组,保存在了 aeApiState 结构体类型的变量 state,如下所示:

在ae_epoll.c文件中查看

// aeApiState结构体定义
typedef struct aeApiState {
    // epoll实例的描述符
    int epfd;
    // epoll_event结构体数组,记录监听事件
    struct epoll_event *events;
} aeApiState;
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));
    if (!state) return -1;
    // 将epoll_event数组保存在aeApiState结构体变量state中
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    // 将epoll实例描述符保存在aeApiState结构体变量state中
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    anetCloexec(state->epfd);
    eventLoop->apidata = state;
    return 0;
}

紧接着,aeApiCreate 函数把 state 变量赋值给 eventLoop 中的 apidata。这样一来,eventLoop 结构体中就有了 epoll 实例和 epoll_event 数组的信息,这样就可以用来基于 epoll 创建和处理事件了。我一会儿还会给你具体介绍。

eventLoop->apidata = state;
  • 第三步,aeCreateEventLoop 函数会把所有网络 IO 事件对应文件描述符的掩码,初始化为 AE_NONE,表示暂时不对任何事件进行监听。

我把 aeCreateEventLoop 函数的主要部分代码放在这里,你可以看下。

ae.c文件中查看

aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;
    monotonicInit();    /* just in case the calling app didn't initialize */
    // 给eventLoop变量分配内存空间
    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    // 给IO事件、已触发事件分配内存空间
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    eventLoop->setsize = setsize;
    // 设置时间事件的链表头为NULL
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    eventLoop->aftersleep = NULL;
    eventLoop->flags = 0;
    // 调用aeApiCreate函数,去实际调用操作系统提供的IO多路复用函数
    if (aeApiCreate(eventLoop) == -1) goto err;
    /* Events with mask == AE_NONE are not set. So let's initialize the
     * vector with it. */
    // 将所有网络IO事件对应文件描述符的掩码设置为AE_NONE
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;
//初始化失败后的处理逻辑
err:
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
}

好,那么从 aeCreateEventLoop 函数的执行流程中,我们其实可以看到以下两个关键点:

  • 事件驱动框架监听的 IO 事件数组大小就等于参数 setsize,这样决定了和 Redis server 连接的客户端数量。所以,当你遇到客户端连接 Redis 时报错“max number of clients reached”,你就可以去 redis.conf 文件修改 maxclients 配置项,以扩充框架能监听的客户端数量。
  • 当使用 Linux 系统的 epoll 机制时,框架循环流程初始化操作,会通过 aeApiCreate 函数创建 epoll_event 结构数组,并调用 epoll_create 函数创建 epoll 实例,这都是使用 epoll 机制的准备工作要求。

IO 事件处理

事实上,Redis 的 IO 事件主要包括三类,分别是可读事件、可写事件和屏障事件。

其中,可读事件和可写事件其实比较好理解,也就是对应于 Redis 实例,我们可以从客户端读取数据或是向客户端写入数据。而屏障事件的主要作用是用来反转事件的处理顺序。比如在默认情况下,Redis 会先给客户端返回结果,但是如果面临需要把数据尽快写入磁盘的情况,Redis 就会用到屏障事件,把写数据和回复客户端的顺序做下调整,先把数据落盘,再给客户端回复。

在 Redis 源码中,IO 事件的数据结构是 aeFileEvent 结构体,IO 事件的创建是通过 aeCreateFileEvent 函数来完成的。下面的代码展示了 aeFileEvent 结构体的定义,你可以再回顾下:

typedef struct aeFileEvent {
    int mask; //掩码标记,包括可读事件、可写事件和屏障事件
    aeFileProc *rfileProc;   //处理可读事件的回调函数
    aeFileProc *wfileProc;   //处理可写事件的回调函数
    void *clientData;  //私有数据
} aeFileEvent;

而对于 aeCreateFileEvent 函数来说,在上节课我们已经了解了它是通过 aeApiAddEvent 函数来完成事件注册的。那么接下来,我们再从代码级别看下它是如何执行的,这可以帮助我们更加透彻地理解,事件驱动框架对 IO 事件监听是如何基于 epoll 机制对应封装的。

IO 事件创建

首先,我们来看 aeCreateFileEvent 函数的原型定义,如下所示:

ae.c文件中查看

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData)

这个函数的参数有 5 个,分别是

  • 循环流程结构体 *eventLoop
  • IO 事件对应的文件描述符 fd
  • 事件类型掩码 mask
  • 事件处理回调函数*proc
  • 事件私有数据*clientData。

因为循环流程结构体*eventLoop中有 IO 事件数组,这个数组的元素是 aeFileEvent 类型,所以,每个数组元素都对应记录了一个文件描述符(比如一个套接字)相关联的监听事件类型和回调函数。

aeCreateFileEvent 函数会先根据传入的文件描述符 fd,在 eventLoop 的 IO 事件数组中,获取该描述符关联的 IO 事件指针变量*fe,如下所示:

aeFileEvent *fe = &eventLoop->events[fd];

紧接着,aeCreateFileEvent 函数会调用 aeApiAddEvent 函数,添加要监听的事件:

if (aeApiAddEvent(eventLoop, fd, mask) == -1)
   return AE_ERR;

aeApiAddEvent 函数实际上会调用操作系统提供的 IO 多路复用函数,来完成事件的添加。我们还是假设 Redis 实例运行在使用 epoll 机制的 Linux 上,那么 aeApiAddEvent 函数就会调用 epoll_ctl 函数,添加要监听的事件。我在第 9 讲中其实已经给你介绍过 epoll_ctl 函数,这个函数会接收 4 个参数,分别是:

  • epoll 实例;
  • 要执行的操作类型(是添加还是修改);
  • 要监听的文件描述符;
  • epoll_event 类型变量。

那么,这个调用过程是如何准备 epoll_ctl 函数需要的参数,从而完成执行的呢?

  • 首先,epoll 实例是我刚才给你介绍的 aeCreateEventLoop 函数,它是通过调用 aeApiCreate 函数来创建的,保存在了 eventLoop 结构体的 apidata 变量中,类型是 aeApiState。所以,aeApiAddEvent 函数会先获取该变量,如下所示:
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    //从eventLoop结构体中获取aeApiState变量,里面保存了epoll实例
  aeApiState *state = eventLoop->apidata;
    ...
 }
  • 其次,对于要执行的操作类型的设置,aeApiAddEvent 函数会根据传入的文件描述符 fd,在 eventLoop 结构体中 IO 事件数组中查找该 fd。因为 IO 事件数组的每个元素,都对应了一个文件描述符,而该数组初始化时,每个元素的值都设置为了 AE_NONE。

所以,如果要监听的文件描述符 fd 在数组中的类型不是 AE_NONE,则表明该描述符已做过设置,那么操作类型就是修改操作,对应 epoll 机制中的宏定义 EPOLL_CTL_MOD。否则,操作类型就是添加操作,对应 epoll 机制中的宏定义 EPOLL_CTL_ADD。这部分代码如下所示:

//如果文件描述符fd对应的IO事件已存在,则操作类型为修改,否则为添加
 int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;

第三,epoll_ctl 函数需要的监听文件描述符,就是 aeApiAddEvent 函数接收到的参数 fd。

  • 最后,epoll_ctl 函数还需要一个 epoll_event 类型变量,因此 aeApiAddEvent 函数在调用 epoll_ctl 函数前,会新创建 epoll_event 类型变量 ee。然后,aeApiAddEvent 函数会设置变量 ee 中的监听事件类型和监听文件描述符。

aeApiAddEvent 函数的参数 mask,表示的是要监听的事件类型掩码。所以,aeApiAddEvent 函数会根据掩码值是可读(AE_READABLE)或可写(AE_WRITABLE)事件,来设置 ee 监听的事件类型是 EPOLLIN 还是 EPOLLOUT。这样一来,Redis 事件驱动框架中的读写事件就能够和 epoll 机制中的读写事件对应上来。下面的代码展示了这部分逻辑,你可以看下。

struct epoll_event ee = {0}; //创建epoll_event类型变量
//将可读或可写IO事件类型转换为epoll监听的类型EPOLLIN或EPOLLOUT
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;  //将要监听的文件描述符赋值给ee

好了,到这里,aeApiAddEvent 函数就准备好了 epoll 实例、操作类型、监听文件描述符以及 epoll_event 类型变量,然后,它就会调用 epoll_ctl 开始实际创建监听事件了,如下所示:

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
...
//调用epoll_ctl实际创建监听事件
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}

了解了这些代码后,我们可以学习到事件驱动框架是如何基于 epoll,封装实现了 IO 事件的创建。那么,在 Redis server 启动运行后,最开始监听的 IO 事件是可读事件,对应于客户端的连接请求。具体是 initServer 函数调用了 aeCreateFileEvent 函数,创建可读事件,并设置回调函数为 acceptTcpHandler,用来处理客户端连接。

接下来,我们再来看下一旦有了客户端连接请求后,IO 事件具体是如何处理的呢?

读事件处理

当 Redis server 接收到客户端的连接请求时,就会使用注册好的 acceptTcpHandler 函数进行处理。

acceptTcpHandler 函数是在networking.c文件中,它会接受客户端连接,并创建已连接套接字 cfd。然后,acceptCommonHandler 函数(在 networking.c 文件中)会被调用,同时,刚刚创建的已连接套接字 cfd 会作为参数,传递给 acceptCommonHandler 函数。

networking文件中查看

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);
    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        anetCloexec(cfd);
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}

此时,aeCreateFileEvent 函数会针对已连接套接字上,创建监听事件,类型为 AE_READABLE,回调函数是 readQueryFromClient(在 networking.c 文件中)。

好了,到这里,事件驱动框架就增加了对一个客户端已连接套接字的监听。一旦客户端有请求发送到 server,框架就会回调 readQueryFromClient 函数处理请求。这样一来,客户端请求就能通过事件驱动框架进行处理了

下面代码展示了 createClient 函数调用 aeCreateFileEvent 的过程

client *createClient(int fd) {
    if (fd != -1) {
            //调用aeCreateFileEvent,监听读事件,对应客户端读写请求,使用readQueryFromclient回调函数处理
            if (aeCreateFileEvent(server.el,fd,AE_READABLE,readQueryFromClient, c) == AE_ERR)
            {
                close(fd);
                zfree(c);
                return NULL;
            } 
    }
}

为了便于你掌握从监听客户端连接请求到监听客户端常规读写请求的事件创建过程,我画了下面这张图,你可以看下。

了解了事件驱动框架中的读事件处理之后,我们再来看下写事件的处理。


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
4月前
|
NoSQL Java Redis
springboot搭建后台框架 (二)整合Redis
springboot搭建后台框架 (二)整合Redis
65 0
|
4月前
|
缓存 NoSQL Java
微服务框架(十二)Spring Boot Redis 缓存
  此系列文章将会描述Java框架Spring Boot、服务治理框架Dubbo、应用容器引擎Docker,及使用Spring Boot集成Dubbo、Mybatis等开源框架,其中穿插着Spring Boot中日志切面等技术的实现。 本文为Spring Boot集成Redis。 在这篇文章中,我们将配置一个Spring Boot应用程序示例,并将其与Redis Cache 集成。虽然Redis是一个开源是一个开源内存数据结构存储,用作数据库,缓存和消息代理,但本文仅演示缓存集成。
|
11天前
|
消息中间件 NoSQL Go
PHP转Go系列 | ThinkPHP与Gin框架之Redis延时消息队列技术实践
【9月更文挑战第7天】在从 PHP 的 ThinkPHP 框架迁移到 Go 的 Gin 框架时,涉及 Redis 延时消息队列的技术实践主要包括:理解延时消息队列概念,其能在特定时间处理消息,适用于定时任务等场景;在 ThinkPHP 中使用 Redis 实现延时队列;在 Gin 中结合 Go 的 Redis 客户端库实现类似功能;Go 具有更高性能和简洁性,适合处理大量消息。迁移过程中需考虑业务需求及系统稳定性。
京东T9纯手打688页神笔记,SSM框架整合Redis搭建高效互联网应用
Spring框架是Java应用最广的框架。它的成功来源于理念,而不是技术本身,它的理念包括loC (Inversion of Control,控制反转)和AOP (Aspect Oriented Programming,面向切面编程)。
|
缓存 NoSQL Redis
Redis中的事件驱动框架(二)
Redis中的事件驱动框架
94 0
|
NoSQL 网络协议 Go
用go写一个简单的Redis客户端框架
用go写一个简单的Redis客户端框架
127 0
|
存储 NoSQL 安全
「Redis」事件驱动模型
Redis事件驱动模型
|
缓存 NoSQL API
分布式服务器框架之Servers.Common中使用CSRedis测试操作Redis中的string、hash_table、list、set、zset
在Servers.Common类库总创建了一个TestRedis.cs文件,我是在Servers.GameServer中去初始化的。主要是测试了Redis中常用的数据类型和常用的API,有一些不常用的可能没有写。需要详细了解API的话可以安装上CSRedis之后F12追进去查看,上面有详细的API说明。
分布式服务器框架之Servers.Common中使用CSRedis测试操作Redis中的string、hash_table、list、set、zset
|
NoSQL Redis CDN
分布式服务器框架之搭建C#+MongoDB+Redis初步
WebAccount站点主要干的事儿是下发 服务器状态信息,这个服务器会和WorldServer建立连接,等所有的GameServer初始化完成之后会同步给WorldServer,WorldServer同步给账号服务器站点,然后账号站点等待玩家请求。
|
5天前
|
canal 缓存 NoSQL
Redis缓存与数据库如何保证一致性?同步删除+延时双删+异步监听+多重保障方案
根据对一致性的要求程度,提出多种解决方案:同步删除、同步删除+可靠消息、延时双删、异步监听+可靠消息、多重保障方案
Redis缓存与数据库如何保证一致性?同步删除+延时双删+异步监听+多重保障方案