定时器方案之红黑树、最小堆、时间轮

简介: 定时器方案之红黑树、最小堆、时间轮

定时器概述

定时器使用场景

  • ⼼跳检测
  • 技能冷却
  • 武器冷却
  • 倒计时
  • CSDN文章的定时发布
  • 其它需要使⽤定时机制的功能

定时器触发方式

网络事件与定时事件在一个线程中处理

定时器通常是与网络组件一起工作,⽹络事件和时间事件在⼀个线程当中配合使⽤;例如nginx、redis,我们将epoll_wait的第四个参数timeout设置为最近要触发的定时器的时间差,这样就可以兼顾对网络事件的处理,又可以兼顾对时间事件的处理。

while (!quit) {
  int now = get_now_time();// 单位:ms
  //找出最近要触发的定时器时间
  int timeout = get_nearest_timer() - now;
  if (timeout < 0) timeout = 0;
  int nevent = epoll_wait(epfd, ev, nev, timeout);
  for (int i=0; i<nevent; i++) {
      //... ⽹络事件处理
  }
  update_timer(); // 时间事件处理
}

但是epoll_wait毕竟涉及到内核态与用户态的切换,以及网络事件处理的时间开销,所以定时事件就会一段时间的延时了。换句话说,受网络事件处理和系统调用的影响,定时器误差有点大。

网络事件和定时事件在不同线程中处理

大量定时任务怎么处理

如果有大量的定时任务,我们首先要想到用哪一个数据结构去组织这些大量的定时任务。定时器的本质是越近要触发的任务,其优先级越高。也就是说,需要根据时间这个key来排序。那么有序的数据结构有哪些呢?红黑树,最小堆,跳表,时间轮。

  • 红黑树(单线程):nginx
  • 跳表(单线程):redis
  • 最小堆(单线程):libevent,go(最小四叉堆),libev(最小四叉堆);大部分都是用最小堆来实现定时器
  • 时间轮(多线程):netty,kafka,skynet,crontab,linux内核

定时器设计

接口设计

// 初始化定时器
void init_timer();
// 添加定时器
Node* add_timer(int expire, callback cb);
// 删除定时器
bool del_timer(Node* node);
// 找到最近要触发的定时任务
Node* find_nearest_timer();
// 更新检测定时器
void update_timer();
// 清除定时器
void clear_timer();

说明:

  • find_nearest_timer接口只有在网络事件和定时事件在同一个线程中处理的时候才需要(因为同一个线程中一般是按照时间顺序组织延时任务的,而时间轮中是按执行顺序组织的,所以不需要这个接口),其他几个接口无论是定时器的哪种触发方式都需要

数据结构设计

从接口上我们可以看出定时器数据结构的基本要求:

  • 能够快速插入删除结点
  • 能够快速找到最小的结点

本文对红黑树与最小堆以及跳表不做详细介绍,主要介绍时间轮

红黑树:插入O(logn),删除O(logn),快速找到最小的结点O(logn)

跳表:插入O(logn),删除O(logn),快速找到最小的结点O(1)

最小堆:插入O(logn),删除O(logn),快速找到最小的结点O(1)

时间轮(哈希表+链表):插入O(1),删除O(1),快速找到最小的结点O(1)(存在踏空问题,后续介绍)

在上面也写过,红黑树和最小堆适用于单线程,其核心原因在于如果在多线程环境下,锁红黑树或者最小堆,那就要锁整个,粒度太大了。而时间轮的时间复杂度可以到O(1),粒度较小,所以时间轮更适合多线程环境。

时间轮

时间轮的概念

从时钟表盘出来,描述此时间轮的时间精度是1秒时间范围是12小时(即tick的取值范围在0-12小时之内)

时间轮参考时钟进行理解,秒针tick走一圈,则分针走一格,分针走一圈则时针走一格。随着时间tick的踏步,任务不断的从上层流到下一层,最终流到秒针轮上,当秒针走到对应的格子上时,执行链表内的所有任务。

如图所示,秒针和分钟对应60个格子,秒针每走一步,则执行其对应格子指向链表内的时间任务。比如现在tick=1,要添加一个3s后的任务,则在第4格链表中添加一个任务即可,如果要在60秒后执行一个任务,由于60大于了秒针的范围,则要把该任务放到分钟上。可以看到秒针的时间精度一格是1秒,而分钟的一格时间精度是60秒,时针的一格时间精度是60*60秒。

正因为如此,当分钟指向第一个格子上时,会把其对应的链表任务重新映射到下一层,即秒针。当时针走到11时,会把对应任务重新映射到分针上面。

由此可见,秒针轮保存着即将要执行的任务,而别的轮的时间跨度则越来越大,随着时间的流逝,任务会慢慢的从上层流到秒针轮中进行执行。

注意到上面写的重新映射了吗,这意味着时间轮无法删除任务,那么这个问题该如何解?我们可以添加一个删除标记,在函数回调中根据这个标记判断是否需要处理。

truct timer_node {
  struct timer_node *next;
  uint32_t expire;
    handler_pt callback;
    uint8_t cancel;//是否删除
};

设计单层级时间轮

**场景:**客户端每 5 秒钟发送⼼跳包;服务端若 10 秒内没收到⼼跳数据,则清除连接;

**普通做法:*我们假设使⽤ map<int, conn> 来存储所有连接数;每秒检测 map 结构,那么每秒需要遍历所有的连接,如果这个map结构包含⼏万条连接,那么我们做了很多⽆效检测;考虑极端情况,刚添加进来的连接,下⼀秒就需要去检测,实际上只需要10秒后检测就⾏了。

**时间轮做法:**只需检测快过期的连接, 采用hash数组+链表形式,相同过期时间的放入一个数组,因此,每次只需检测最近过期的数组即可,不需要遍历所有。

时间轮需要考虑两个因素,1. 时间轮的大小,2. 时间精度。

时间轮的大小

因为10秒一检测,所以时间轮的大小要大于10。我们一般将时间轮的大小,也就是数组长度,设置为2^n

因为我们总是要进行取余操作,m%n在计算机内部等于m−n∗floor(m/n) ,乘法除法运算效率太低,我们可以通过位运算来优化。

m % 2^n = m & (2^n − 1)

m % 16 = m & 15

时间精度

时间精度就要看业务需求了,目前的需求是以秒为单位,那么时间精度就设置为秒即可

空推进问题

对于单层级的时间轮来说,如果大小设置太大了,就会出现踏空的现象,空推进。这里时间轮设置了1024大小,但是定时任务只有两个,从5到1022都是没有任务的,那么这就是空推进的情况。这是分布式定时器必须要解决的问题,分布式定时器一般都是用单层级时间轮。

那么怎么解决这个问题呢?第一种做法就是使用辅助数据结构最小堆+单层级时间轮。用最小堆告诉tick下一次检测是第几个格子,直接跳跃,而不是一格一格的走。时间精度设置不当也会造成空推进。第二种做法就是多层级时间轮。

多层级时间轮

定时任务时间跨度特别大,有几秒的任务,几个小时的任务,几天的定时任务。那么对于单层级时间轮来说,无论它怎么设置都解决不了这个问题,肯定会出现空推进的问题。

那么我们设计把最近要触发的定时任务放到第一层,几分钟的放到第二层,几个小时的放到第三层…这就是多层级的意思。

我们以时钟这个时间轮来举例

时间轮中的数组是指针数组,因为数据下面要挂链表的。

小于1分钟的任务放在秒针层级,大于1分钟小于1小时的任务放在分针层级,大于1小时小于12小时的任务放在时针层级;时间表盘上是有3个指针的,而我们算法实现的时候只需要1个指针就行(让这一个指针在0-12*3600范围内活动即可),因为我们根据这1个指针就可以算出时针在哪里( ( tick / (60 * 60) ) % 12),分针在哪里( (tick / 60) % 60 ),秒针在哪里( tick % 60 )。

对于任务节点来说,如果expire相同,我们采用类似hash一样的拉链法,使用next指针将expire相同的链接起来,先来的先执行,这里采用尾插。

分针和时针里面都是采用的稀疏存储,当秒针移动一圈,说明下一分钟的任务快执行了,此时就需要重新映射(将分针层的任务重新映射到秒针层级),因为我们时间精度为秒,只会在秒针层执行任务,分针和时针的任务都需要往上一层级重新映射。

Skynet定时器实现方案

假设时间精度为 10ms ;在第 1 层级每 10ms 移动⼀格;每移动⼀格执⾏该格⼦当中所有的定时任务;当第 1 层指针从 255 格开始移动,此时层级 2 移动⼀格;层级 2 移动⼀格的⾏为定义为,将该格当中的定时任务重新映射到层级 1 当中;同理,层级 2 当中从 63 格开始移动,层级 3 格⼦中的定时任务重新映射到层级 2 ; 以此类推层级 4 往层级 3 映射,层级 5 往层级 4 映射;

skynet中定时器数据结构

采用时间轮方式,hash表+链表实现。其中time为32位无符号整数, 记录时间片对应数组near[256] ,表示即将到来的定时任务, t[4][64],表示较为遥远的定时任务。

typedef struct timer_node timer_node_t;
struct timer_node {
  struct timer_node *next;
  uint32_t expire;
    handler_pt callback;
    uint8_t cancel;
  int id; // 此时携带参数
};
typedef struct link_list {      // 链表
  timer_node_t head;
  timer_node_t *tail;
}link_list_t;
typedef struct timer {
  link_list_t near[TIME_NEAR];  // 记录1个即将到来的定时器,指针数组
  link_list_t t[4][TIME_LEVEL]; // 记录4个相对较遥远的定时器,每一个都是一个指针数组
  struct spinlock lock;
  uint32_t time;          // 记录当前滴答数
  uint64_t current;
  uint64_t current_point;
}s_timer_t;

接口介绍
定时器初始化
static s_timer_t * TI = NULL;
typedef void (*handler_pt) (struct timer_node *node);
s_timer_t * timer_create_timer() {
  s_timer_t *r=(s_timer_t *)malloc(sizeof(s_timer_t));
  memset(r,0,sizeof(*r));
  int i,j;
  for (i=0;i<TIME_NEAR;i++) {
    link_clear(&r->near[i]);
  }
  for (i=0;i<4;i++) {
    for (j=0;j<TIME_LEVEL;j++) {
      link_clear(&r->t[i][j]);
    }
  }
  spinlock_init(&r->lock);
  r->current = 0;
  return r;
}
uint64_t gettime() {
  uint64_t t;
#if !defined(__APPLE__) || defined(AVAILABLE_MAC_OS_X_VERSION_10_12_AND_LATER)
  struct timespec ti;
  clock_gettime(CLOCK_MONOTONIC, &ti);
  t = (uint64_t)ti.tv_sec * 1000;   // 将秒转化为毫秒
  t += ti.tv_nsec / 1000000;      // 将纳秒转化为毫秒,表示t的时间精度是毫秒
#else
  struct timeval tv;
  gettimeofday(&tv, NULL);
  t = (uint64_t)tv.tv_sec * 100;
  t += tv.tv_usec / 10000;
#endif
  return t;
}
void init_timer(void) {
  TI = timer_create_timer();
  TI->current_point = gettime();
}
添加定时器以及定时任务
// 添加到定时器链表里,如果定时器的到期滴答数跟当前比较近(<2^8),表示即将触发定时器添加到T->near数组里
// 否则根据差值大小添加到对应的T->T[i]中
void add_node(s_timer_t *T, timer_node_t *node) {
  uint32_t time=node->expire;
  uint32_t current_time=T->time;
  uint32_t msec = time - current_time;
  if (msec < TIME_NEAR) { //[0, 0x100)
    link(&T->near[time&TIME_NEAR_MASK],node);
  } else if (msec < (1 << (TIME_NEAR_SHIFT+TIME_LEVEL_SHIFT))) {//[0x100, 0x4000)
    link(&T->t[0][((time>>TIME_NEAR_SHIFT) & TIME_LEVEL_MASK)],node); 
  } else if (msec < (1 << (TIME_NEAR_SHIFT+2*TIME_LEVEL_SHIFT))) {//[0x4000, 0x100000)
    link(&T->t[1][((time>>(TIME_NEAR_SHIFT + TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node);  
  } else if (msec < (1 << (TIME_NEAR_SHIFT+3*TIME_LEVEL_SHIFT))) {//[0x100000, 0x4000000)
    link(&T->t[2][((time>>(TIME_NEAR_SHIFT + 2*TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node);  
  } else {//[0x4000000, 0xffffffff]
    link(&T->t[3][((time>>(TIME_NEAR_SHIFT + 3*TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node);  
  }
}
timer_node_t* add_timer(int time, handler_pt func, int threadid) {
  timer_node_t *node = (timer_node_t *)malloc(sizeof(*node));
  spinlock_lock(&TI->lock);
  node->expire = time+TI->time;
  node->callback = func;
  node->id = threadid;
  if (time <= 0) {
    spinlock_unlock(&TI->lock);
    node->callback(node);
    free(node);
    return NULL;
  }
  add_node(TI, node);
  spinlock_unlock(&TI->lock);
  return node;
}
驱动方式
void dispatch_list(timer_node_t *current) {
  do {
    timer_node_t * temp = current;
    current=current->next;
        if (temp->cancel == 0)
            temp->callback(temp);
    free(temp);
  } while (current);
}
void timer_execute(s_timer_t *T) {
  int idx = T->time & TIME_NEAR_MASK;
  while (T->near[idx].head.next) {
    timer_node_t *current = link_clear(&T->near[idx]);
    spinlock_unlock(&T->lock);
    dispatch_list(current);
    spinlock_lock(&T->lock);
  }
}
void timer_update(s_timer_t *T) {
  spinlock_lock(&T->lock);
  timer_execute(T); // 检查T->near是否位空,有就处理到期定时器
  timer_shift(T);   // 时间片time++,并判断是否需要重新映射,需要则映射
  timer_execute(T);
  spinlock_unlock(&T->lock);
}
void expire_timer(void) {
  uint64_t cp = gettime();
  if (cp != TI->current_point) {
    uint32_t diff = (uint32_t)(cp - TI->current_point);
    TI->current_point = cp;
    int i;
    for (i=0; i<diff; i++) {
      timer_update(TI);
    }
  }
}
while (!ctx.quit) {
        expire_timer();
        //为什么1/4精度的时间去检测一次,因为dispatch是需要时间的,它会影响到时间精度,
        //所以我们搞一个更小的单位去检测,这样去消除精度误差
        usleep(250);  //精度是1毫秒,这里每次睡眠0.25毫秒
}
重新映射
void move_list(s_timer_t *T, int level, int idx) {
  timer_node_t *current = link_clear(&T->t[level][idx]);
  while (current) {
    timer_node_t *temp=current->next;
    add_node(T,current);
    current=temp;
  }
}
// 判断是否需要重新映射
void timer_shift(s_timer_t *T) {
  int mask = TIME_NEAR;
  uint32_t ct = ++T->time;
  if (ct == 0) {
    move_list(T, 3, 0);
  } else {
    // ct / 256
    uint32_t time = ct >> TIME_NEAR_SHIFT;
    int i=0;
    // ct % 256 == 0 说明移动到了不同层级的最后一格
    while ((ct & (mask-1))==0) {
      int idx=time & TIME_LEVEL_MASK;
      if (idx!=0) {
        // 这里发生重新映射,将i+1层级idx格子中的定时任务重新映射到i层级中
        move_list(T, i, idx);
        break;        
      }
      mask <<= TIME_LEVEL_SHIFT;
      time >>= TIME_LEVEL_SHIFT;
      ++i;
    }
  }
}

如何设计时间轮

问题总结

1、怎么减少任务的无效检测?

  • 优化任务添加和删除操作:在添加和删除任务时,避免频繁地进行无效检测。可以使用合适的数据结构(如最小堆、红黑树等)来存储定时器任务,并保持有序性,以便更高效地进行添加和删除。
  • 使用时间轮(Time Wheel)算法:时间轮是一种基于哈希桶的定时器调度算法。它将时间划分为多个粒度相等的槽,每个槽对应一个时间段。定时任务被放入对应的槽中,在每个时间周期结束后,只需检查当前槽是否为空,从而避免对所有任务进行无效检测。
目录
相关文章
|
8月前
|
机器学习/深度学习 算法 Windows
数据结构中的几种时间复杂度分析方式
数据结构中的几种时间复杂度分析方式
114 0
|
8月前
|
消息中间件 NoSQL 应用服务中间件
定时器方案,红黑树,时间轮
定时器方案,红黑树,时间轮
91 0
|
8月前
|
消息中间件 应用服务中间件 网络安全
定时器方案:红黑树、最小堆和时间轮的原理
定时器方案:红黑树、最小堆和时间轮的原理
199 0
|
8月前
|
存储 NoSQL Linux
定时器的实现方案:红黑树和多级时间轮
定时器的实现方案:红黑树和多级时间轮
|
8月前
|
Linux 容器
002. 使用最小堆实现高性能定时器实现
002. 使用最小堆实现高性能定时器实现
132 0
|
存储 NoSQL 应用服务中间件
分别基于红黑树、timefd、多级时间轮实现定时器-1
分别基于红黑树、timefd、多级时间轮实现定时器
102 0
|
存储 缓存 索引
分别基于红黑树、timefd、多级时间轮实现定时器-2
分别基于红黑树、timefd、多级时间轮实现定时器
106 0
|
缓存 算法 安全
环形数组无锁队列的原理与实现
环形数组无锁队列的原理与实现
215 0
|
索引
判断环形链表及寻找入环口问题详解
链表在面试和笔试中都是十分常见的问题。本篇文章会简述到判断环形链表和返回环形链表的入环点。其中有较多的细节,本篇文章会详细解释。
78 0
剑指offer 16. 在O(1)时间删除链表结点
剑指offer 16. 在O(1)时间删除链表结点
33 0