redis server事件模型

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: 事件驱动 redis服务器是一个事件驱动的程序,内部需要处理两类事件,一类是文件事件(file event),一类是时间事件(time event),前者对应着处理各种io事件,后者对应着处理各种定时任务。

事件驱动

 redis服务器是一个事件驱动的程序,内部需要处理两类事件,一类是文件事件(file event),一类是时间事件(time event),前者对应着处理各种io事件,后者对应着处理各种定时任务。
 印象中我们都知道redis是单线程服务,其实本质上的确就是这样子的,上面说的file event 和 time event都是由单个线程驱动的,file event 底层其实是通过select/epoll等模型去执行驱动的,time event是通过内部维持一个定时任务列表来实现的。
 redis server的事件模型其实就是经典的NIO模型,底层通过select/epoll等机制实现异步NIO,通过检测到event到来后for循环实现串行处理。


事件注册

server启动流程

 redis server的main函数在启动过程中按照以下三个步骤进行初始化并进入运行状态:

  • 初始化服务器配置
  • 载入配置文件初始化配置(redis的监听端口在这一步完成配置)
  • 创建并初始化服务器的数据结构监听端口等(完成端口的监听,事件注册等)
  • 循环处理事件(循环处理file event 和 time event)
int main(int argc, char **argv) {
    
    // 初始化服务器
    initServerConfig();

    // 载入配置文件, options 是前面分析出的给定选项
    loadServerConfig(configfile,options);

    // 创建并初始化服务器数据结构
    initServer();

    // 运行事件处理器,一直到服务器关闭为止
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeMain(server.el);

    // 服务器关闭,停止事件循环
    aeDeleteEventLoop(server.el);

    return 0;
}

server初始化流程

 整个server初始化过程删除了不相关的代码保留跟这部分文章相关的内容,整个初始化过程核心逻辑如下:

  • 创建监听的socket用于accept连接
  • 注册监听socket到底层时间驱动如select/epoll当中(aeCreateFileEvent)
  • 创建时间事件(aeCreateTimeEvent)
void initServer() {
    // 打开 TCP 监听端口,用于等待客户端的命令请求
    if (server.port != 0 &&
        listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
        exit(1);


    // 为 serverCron() 创建时间事件
    if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        redisPanic("Can't create the serverCron time event.");
        exit(1);
    }

    // 为 TCP 连接关联连接应答(accept)处理器,用于接受并应答客户端的 connect() 调用
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                redisPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }
}

server绑定文件事件监听过程

  aeCreateFileEvent方法内部关键点在于aeApiAddEvent这个方法,将fd绑定到具体的eventLoop当中

/*
 * 根据 mask 参数的值,监听 fd 文件的状态,
 * 当 fd 可用时,执行 proc 函数
 */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }

    if (fd >= eventLoop->setsize) return AE_ERR;

    // 取出文件事件结构
    aeFileEvent *fe = &eventLoop->events[fd];

    // 监听指定 fd 的指定事件
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;

    // 设置文件事件类型,以及事件的处理器
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;

    // 私有数据
    fe->clientData = clientData;

    // 如果有需要,更新事件处理器的最大 fd
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;

    return AE_OK;
}

server处理accpet事件过程

 整个accept的处理过程按照anetTcpAccept -> acceptCommonHandler -> createClient -> aeCreateFileEvent实现socket的accept并注册到eventLoop。

  void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {

    while(max--) {
        // accept 客户端连接
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
      
        acceptCommonHandler(cfd,0);
    }
}

  static void acceptCommonHandler(int fd, int flags) {

    // 创建客户端
    redisClient *c;
    if ((c = createClient(fd)) == NULL) {
        redisLog(REDIS_WARNING,
            "Error registering fd event for the new client: %s (fd=%d)",
            strerror(errno),fd);
        close(fd); /* May be already closed, just ignore errors */
        return;
    }
}

redisClient *createClient(int fd) {

    // 分配空间
    redisClient *c = zmalloc(sizeof(redisClient));

    // 当 fd 不为 -1 时,创建带网络连接的客户端
    // 如果 fd 为 -1 ,那么创建无网络连接的伪客户端
    // 因为 Redis 的命令必须在客户端的上下文中使用,所以在执行 Lua 环境中的命令时
    // 需要用到这种伪终端
    if (fd != -1) {
        // 绑定读事件到事件 loop (开始接收命令请求)
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }
//此处省略很多代码
}


事件处理过程

 redis server启动事件处理的主循环,会循环执行aeProcessEvents方法。

/*
 * 事件处理器的主循环
 */
void aeMain(aeEventLoop *eventLoop) {

    eventLoop->stop = 0;

    while (!eventLoop->stop) {

        // 如果有需要在事件处理前执行的函数,那么运行它
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);

        // 开始处理事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

 aeProcessEvents方法有一个非常巧妙的实现,在内部需要同时处理time event 和 file event,那么如何处理先后问题呢。思路如下:

  • 首先计算time event距离当前的时间最近的时间点得出时间差X
  • 其次在处理file event的时候wait的设置超时时间为X
  • 如果在超时时间X内有file event发生那么就处理file event然后处理time event
  • 如果在超时时间X内没有file event那么刚好可以处理time event事件
  • 最后说明的是在处理event的过程中我们其实通过for循环的串行读写事件
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        // 获取最近的时间事件
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            // 如果时间事件存在的话
            // 那么根据最近可执行时间事件和现在时间的时间差来决定文件事件的阻塞时间
            long now_sec, now_ms;

            /* Calculate the time missing for the nearest
             * timer to fire. */
            // 计算距今最近的时间事件还要多久才能达到
            // 并将该时间距保存在 tv 结构中
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            tvp->tv_sec = shortest->when_sec - now_sec;
            if (shortest->when_ms < now_ms) {
                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                tvp->tv_sec --;
            } else {
                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
            }

            // 时间差小于 0 ,说明事件已经可以执行了,将秒和毫秒设为 0 (不阻塞)
            if (tvp->tv_sec < 0) tvp->tv_sec = 0;
            if (tvp->tv_usec < 0) tvp->tv_usec = 0;
        } else {
            
            // 执行到这一步,说明没有时间事件
            // 那么根据 AE_DONT_WAIT 是否设置来决定是否阻塞,以及阻塞的时间长度

            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                // 设置文件事件不阻塞
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                // 文件事件可以阻塞直到有事件到达为止
                tvp = NULL; /* wait forever */
            }
        }

        // 处理文件事件,阻塞时间由 tvp 决定
        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            // 从已就绪数组中获取事件
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];

            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

           /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            // 读事件
            if (fe->mask & mask & AE_READABLE) {
                // rfired 确保读/写事件只能执行其中一个
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            // 写事件
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }

            processed++;
        }
    }

    /* Check time events */
    // 执行时间事件
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
3月前
|
NoSQL Linux Redis
Redis原理之网络模型笔记
Redis采用单线程模型,这意味着一个Redis服务器在任何时刻都只会处理一个请求。Redis的网络模型涉及到阻塞I/O(Blocking I/O)、非阻塞I/O(Non-blocking I/O)、I/O多路复用(I/O Multiplexing)、信号驱动I/O(Signal-driven I/O)以及异步I/O(Asynchronous I/O)。
|
3月前
|
NoSQL Linux Redis
Redis 的网络框架是实现了 Reactor 模型吗?
Redis 的网络框架是实现了 Reactor 模型吗?
|
7月前
|
NoSQL Unix Linux
Redis核心技术与实践 03 | 高性能IO模型:为什么单线程Redis能那么快?
Redis核心技术与实践 03 | 高性能IO模型:为什么单线程Redis能那么快?
|
5月前
|
存储 NoSQL 调度
redis线程模型
redis线程模型
40 0
|
28天前
|
存储 NoSQL 算法
【Redis技术进阶之路】「底层源码解析」揭秘高效存储模型与数据结构底层实现(字典)(二)
【Redis技术进阶之路】「底层源码解析」揭秘高效存储模型与数据结构底层实现(字典)
44 0
|
3月前
|
存储 NoSQL 网络协议
redis主从同步对象模型学习笔记
redis主从同步对象模型学习笔记
44 0
|
1天前
|
存储 NoSQL Linux
Redis入门到通关之Redis5种网络模型详解
Redis入门到通关之Redis5种网络模型详解
|
28天前
|
存储 NoSQL Redis
作者推荐 |【Redis技术进阶之路】「原理系列开篇」揭秘高效存储模型与数据结构底层实现(SDS)(三)
作者推荐 |【Redis技术进阶之路】「原理系列开篇」揭秘高效存储模型与数据结构底层实现(SDS)
29 0
|
5月前
|
存储 缓存 NoSQL
如何解决Ubuntu server 下 Redis安装报错:“You need tcl 8.5 or newer in order to run the Redis test”.
如何解决Ubuntu server 下 Redis安装报错:“You need tcl 8.5 or newer in order to run the Redis test”.
138 0
|
2月前
|
NoSQL Java Redis
Spring boot 实现监听 Redis key 失效事件
【2月更文挑战第2天】 Spring boot 实现监听 Redis key 失效事件
79 0

热门文章

最新文章