Redis的实现四:事件循环和计时器

简介: 这篇文章详细解释了在Redis实现中如何通过引入事件循环和计时器来处理超时情况,包括使用链表数据结构管理计时器、更新事件循环以及处理计时器触发的事件。
   我们的服务器缺少了一个**内容**:**超时**。每个网络应用程序都需要处理超时,因为网络的另一边可能会消失。不要只进行持续的IO操作,如读/写需要超时,但启动空闲的TCP连接也是一个好主意。要实现超时,必须修改事件循环,因为轮询是唯一被阻塞的东西。

我们的代码如下:

int rv=poll(poll_args.data(),(nfds_t)poll_args.size(),1000);
     poll系统调用接受一个timeout参数,该参数规定了用于poll系统调用的时间上限。超时值目前是1000毫秒的任意值。如果我们根据计时器设置超时值,poll应该在过期时唤醒,或在此之前唤醒;然后我们就有机会在适当的时候启动计时器。

    问题是我们可能有多个计时器,poll的超时值应该是最近的计时器的超时值。需要一些数据结构来查找最近的计时器。堆数据结构是查找最小/最大值的常用选择,通常用于此目的。此外,还可以使用任何用于排序的数据结构。例如,我们可以使用AVL树来排序计时器,并可能扩展树来跟踪最小值。

    让我们从添加计时器来踢出空闲的TCP连接开始。对于每个连接都有一个计时器,设置为一个固定的超时,每次在连接上有IO活动时,计时器都会更新为一个固定的超时。注意,当我们更新计时器时,它变成了最遥远的一个;因此,我们可以利用这一事实来简化数据结构;一个简单的链表足以保持计时器的顺序:新的或更新的计时器只是到列表的末尾,列表保持有序的顺序。同样,链表上的操作是O(1),哪个比排序数据结构更好

定义链表是一个微不足道的任务:

struct DList {
   DList *prev = NULL;
   DList *next = NULL;
};
inline void dlist_init(DList *node) {
   node->prev = node->next = node;
}
inline bool dlist_empty(DList *node) {
   return node->next == node;
}
inline void dlist_detach(DList *node) {
   DList *prev = node->prev;
   DList *next = node->next;
   prev->next = next;
   next->prev = prev;
}
inline void dlist_insert_before(DList *target, DList *rookie) {
   DList *prev = target->prev;
   prev->next = rookie;
   rookie->prev = prev;
   rookie->next = target;
   target->prev = rookie;
}

get_monotonic_usec是获取时间的函数。注意时间戳必须是单调的。时间戳向后跳转会给计算机系统带来各种各样的麻烦。

static uint64_t get_monotonic_usec() {
   timespec tv = {0, 0};
   clock_gettime(CLOCK_MONOTONIC, &tv);
   return uint64_t(tv.tv_sec) * 1000000 + tv.tv_nsec / 1000;
}

下一步是将列表添加到服务器和连接结构中。

static struct {
   HMap db;
   // a map of all client connections, keyed by fd
   std::vector<Conn *> fd2conn;
   // timers for idle connections
   DList idle_list;
} g_data;
struct Conn {
   int fd = -1;
   uint32_t state = 0; // either STATE_REQ or STATE_RES
   // buffer for reading
   size_t rbuf_size = 0;
   uint8_t rbuf[4 + k_max_msg];
   // buffer for writing
   size_t wbuf_size = 0;
   size_t wbuf_sent = 0;
   uint8_t wbuf[4 + k_max_msg];
   uint64_t idle_start = 0;
   // timer
   DList idle_list;
};

修改后的事件循环概述:

int main() {
   // some initializations
   dlist_init(&g_data.idle_list);
   int fd = socket(AF_INET, SOCK_STREAM, 0);
   // bind, listen & other miscs
   // code omitted...
   // the event loop
   std::vector<struct pollfd> poll_args;
   while (true) {
       // prepare the arguments of the poll()
       // code omitted...
       // poll for active fds
      int timeout_ms = (int)next_timer_ms();
      int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), timeout_ms);
        if (rv < 0) {
         die("poll");
         }
   // process active connections
for (size_t i = 1; i < poll_args.size(); ++i) {
    if (poll_args[i].revents) {
       Conn *conn = g_data.fd2conn[poll_args[i].fd];
        connection_io(conn);
        if (conn->state == STATE_END) {
        // client closed normally, or something bad happened.
        // destroy this connection
        conn_done(conn);
        }
      }
}
    // handle timers
    process_timers();
    // try to accept a new connection if the listening fd is active
if (poll_args[0].revents) {
(void)accept_new_conn(fd);
     }
 }
  return 0;
}

修改了几件事:
1. poll的超时参数由next_timer_ms函数计算。
2. 销毁连接的代码被移到了conn_done函数中。
3. 添加了用于触发计时器的process_timers函数。
4. 计时器在connection_io中更新,在accept_new_conn中初始化。

目录
相关文章
|
5月前
|
缓存 NoSQL Redis
【后端面经】【缓存】36|Redis 单线程:为什么 Redis 用单线程而 Memcached 用多线程?--epoll调用和中断
【5月更文挑战第18天】`epoll`包含红黑树和就绪列表,用于高效管理文件描述符。关键系统调用有3个:`epoll_create()`创建epoll结构,`epoll_ctl()`添加/删除/修改文件描述符,`epoll_wait()`获取就绪文件描述符。`epoll_wait()`可设置超时时间(-1阻塞,0立即返回,正数等待指定时间)。当文件描述符满足条件(如数据到达)时,通过中断机制(如网卡或时钟中断)更新就绪列表,唤醒等待的进程。
68 6
|
5月前
|
缓存 NoSQL 中间件
【后端面经】【缓存】36|Redis 单线程:为什么 Redis 用单线程而 Memcached 用多线程?epoll、poll和select + Reactor模式
【5月更文挑战第19天】`epoll`、`poll`和`select`是Linux下多路复用IO的三种方式。`select`需要主动调用检查文件描述符,而`epoll`能实现回调,即使不调用`epoll_wait`也能处理就绪事件。`poll`与`select`类似,但支持更多文件描述符。面试时,重点讲解`epoll`的高效性和`Reactor`模式,该模式包括一个分发器和多个处理器,用于处理连接和读写事件。Redis采用单线程模型结合`epoll`的Reactor模式,确保高性能。在Redis 6.0后引入多线程,但基本原理保持不变。
61 2
面试官:Redis分布式锁超时了,任务还没执行完怎么办?
今天主要分享的是面试中常见的redis的一些面试内容。如果你正好需要刚好可以帮你回顾一下,如果不需要可以收藏起来后面用到的时候翻出来回顾。
|
存储 NoSQL Go
基于redis实现延迟队列
基于redis实现延迟队列
|
NoSQL Go Redis
Redis与异步队列
使用Redis可以很方便地实现异步队列。
93 0
|
存储 缓存 NoSQL
由Redis的hGetAll函数所引发的一次服务宕机事件
问题描述:固定并发数压测10分钟,压测开始后半小时,Redis连接数激增,连接耗尽,服务重启;
由Redis的hGetAll函数所引发的一次服务宕机事件
|
NoSQL Redis 数据库
单线程的Redis有哪些 "慢" 动作?
单线程的Redis有哪些 "慢" 动作?
|
消息中间件 设计模式 NoSQL
异步结果通知实现——基于Redis实现,我这操作很可以
前段时间,我在内存中实现了一个简单异步通知框架。但由于没有持久化功能,应用重启就会导致数据丢失,且不支持分布式和集群。今天这篇笔记,引入了 Redis 来解决这些问题,以下是几点理由: 数据结构丰富,支持 List、Sorted Set 等 具有持久化功能,消息的可靠性能得到保证 高可用性,支持单机、主从、集群部署 项目中已使用,接入成本更低 基于 Redis 实现延时队列也有几种方法,展开详细讲讲。
|
存储 缓存 NoSQL
Redis 事件循环函数serverCron
Redis 事件循环函数serverCron
156 0
Redis 事件循环函数serverCron
|
NoSQL 调度 Redis
Redis事件处理机制详解(下)
Redis事件处理机制详解(下)
162 0
Redis事件处理机制详解(下)