定时器的实现方案:红黑树和多级时间轮

简介: 定时器的实现方案:红黑树和多级时间轮

1、定时器的使用场景

定时器用于执行定时任务,各种场景都需要用到,比如

  • 心跳检测keep-alive
  • 倒计时
  • 游戏技能CD等
  • 各种延时处理等

2、定时器的触发方式

对于服务端来说,驱动服务端业务逻辑的事件包括网络事件、定时事件、以及信号事件;通常网络事件和定时事件会进行协同处理。

定时器触发形式,根据网络事件和定时事件是否在同一线程中处理,分为:

  • 一个线程中处理,协同处理
  • 不同线程中处理,定时任务在单独的线程中处理

2.1、同一线程处理

协同处理通常于定时任务比较少的场景。对于多线程而言,同一线程处理会引起事件处理的不均衡。当一个线程拥有较多定时任务时,会影响其对网络事件处理的效率。

为什么网络事件和定时事件可以协同处理?

reactor 是基于事件的网络模型,对 io 的处理是同步的,对事件的处理是异步的,定时任务是事件,对定时任务的处理也是异步的。

如何进行协同处理?

以 io 多路复用作为定时器驱动,“阻塞”地收集就绪事件,timeout 参数用于设置定时。数据结构通常选择红黑树、跳表、最小堆等来实现定时器,后文会有详细的解释。

// 网络事件和定时事件在一个线程中处理,协同处理
 while (!quit) {
     // 最近定时任务的触发时间 = 最近定时任务添加时设置的触发时间 - 当前时间
     int timeout = get_nearest_timer() - now();  
     if (timeout < 0) timeout = -1;
      // 最近定时任务的触发时间作为 timeout 参数,timetout 定时任务触发
     // 1、若没有网络事件,先去处理定时任务
     // 2、若收到网络事件,先处理网络事件,再处理定时任务
     int nevent = epoll_wait(epfd, ev, nev, timeout);
     for (int i = 0; i < nevent; i++) {
         // ... 处理网络事件
    }
     // 轮询处理定时事件
     update_timer();
 }

基于协同处理的开源框架

  • 单 reactor:redis
  • 多 reactornginx、memcached

2.2、不同线程中处理

定时任务在单独的线程中检测,通常用于处理大量定时任务。

以 usleep(time)作为定时器驱动,time 参数用于设置定时,要小于最小时间精度。

// 网络事件和定时事件在不同线程中处理
 void * thread_timer(void *thread_param) {
     init_timer();
     while (!quit) {
         update_timer();
         sleep(t);
    }
     clear_timer();
     return NULL; 
 }
 pthread_create(&pid, NULL, thread_timer, &thread_param);

数据结构通常选择时间轮,加锁粒度比较小(对1个格子加锁)。时间轮只负责检测,通过信号或者插入执行队列让其他线程执行。

3、定时器设计

3.1、接口设计

所有定时器都要实现的接口

// 初始化定时器
 void init_timer();
 // 定时器的添加,添加任务结点,基于此可以做多次触发的接口,经过 expire 时间触发cb
 Node* add_timer(int expire, callback cb);
 // 定时器的删除,删除定时任务
 bool del_timer(Node* node);
 // 定时器的更新,到期任务的处理
 void update_timer();

在协同处理的方案中,需要额外添加接口,来查找最近定时任务的触发时间。

// 返回最近定时任务的触发时间,用于协同处理
 Node* find_nearest_timer();

3.2、数据结构设计

本质:按照定时任务的优先级进行组织任务的执行顺序。

组织方式

  • 按照时间顺序组织,要求数据结构有序,能快速查找最近触发的定时任务。在实现中,关键是要考虑相同时间触发的定时任务如何处理。
  • 红黑树(绝对有序): nginx
  • 跳表(绝对有序):redis未来引入
  • 最小堆(相对有序): libevent, libev, go


  • 按照执行顺序组织:时间轮

4、红黑树

红黑树中序有序,查找效率高,按照时间顺序组织定时任务(考虑相同触发时间如何组织),用于协同处理的方式,但加锁粒度大(对整个红黑树加锁)。

4.1、定时器的驱动

首先,选择定时器驱动的方式,这里选择 epoll 来实现,通过参数 timeout 设置定时。

while (true) {
     // 最近任务的触发时间接口:TimeToSlee,作为 timeout 参数
     int n = epoll_wait(epfd, ev, 64, timer->TimeToSleep());
     for (int i = 0; i < n; i++) {
         /* 处理网络事件 */
     }
     // 处理定时事件
     while(timer->CheckTimer());
 }

4.2、数据结构设计

C++ 中提供了map set等容器。为了简单,我们这里选择 set来存储定时器任务

set<TimerNode, std::less<>> timermap;

在设计定时器任务结点时,有一个关键的问题,相同触发时间的定时任务的如何处理?

我们不能将触发时间 expire 作为红黑树的 key 值。举个栗子,A 事件到来时 tick=15,15 后执行,expire=30;B 事件到来时 tick=20, 10 后执行,expire=30。两者的触发时间相同,将在 tick=30 后触发,这样无法对事件排序。为避免上述情况的出现,在触发时间相同时,我们根据插入的先后顺序来决定事件的执行顺序,先插入的先执行,放在红黑树左侧。后插入后执行,放在红黑树右侧。我们使用 id 属性来描述事件到来的先后顺序。

这里,我们选择 expire和 id 来唯一标识一个定时结点:

// 定义定时结点的基类,存储唯一标识的元素
 struct TimerNodeBase {
     time_t expire;  // 任务触发时间(到期时间)
     int64_t id;     // 用来描述插入先后顺序,int64_t,能记录5000多年
 };
 // 定时结点,包含定时任务等
 struct TimerNode : public TimerNodeBase {
     // 定时器任务回调函数
     // 函数对象拷贝代价高,在容器内拷贝构造后不会再去移动
     using Callback = std::function<void(const TimerNode &node)>;
     Callback func;
     // 构造函数,容器内部就地拷贝构造调用一次,此后不会再去调用
     TimerNode(int64_t id, time_t expire, Callback func) : func(func) {
         this->expire = expire;
         this->id = id;
     }
 };

函数对象作为类,占用大量的空间,复制控制和移动代价高,因此,我们拆分成基类和派生类,基类存储标识,用于复制控制和移动;子类存储函数对象等,在容器内就地构造后,不再赋复制控制和移动。

同样在用函数对象实现比较函数,采用基类引用比较,体现多态特性,减少拷贝移动。

// 按触发时间的先后顺序对结点进行排序
 // 基类引用,多态特性
 bool operator < (const TimerNodeBase &lhd, const TimerNodeBase &rhd) {
     // 先比较触发时间
     if (lhd.expire < rhd.expire)
         return true;
     else if (lhd.expire > rhd.expire) 
         return false;
     // 触发时间相同,比较插入的先后顺序
     // 比较id大小,先插入的结点id小,先执行
     return lhd.id < rhd.id;
 }

在 timer 类的接口实现中 find 函数利用了 C++14 的新特性,利用等价 key 比较,不需要构造 key 对象进行比较,也是同样的原理。

4.3、timer 接口实现

初始化定时器

获取当前时间接口,用于计算触发时间

  • steady_clock:系统启动到当前时间,用于计算程序运行时间
  • system_clock:时间戳,可以修改
  • high_resolution_clock:高精度版本的steady_clock
// 获取当前时间
 static time_t GetTick() {
     auto sc = chrono::time_point_cast<chrono::milliseconds>(chrono::steady_clock::now());
     auto temp = chrono::duration_cast<chrono::milliseconds>(sc.time_since_epoch());
     return temp.count();
 }

定时器的添加

// 参数: msec 任务触发时间间隔,func 任务执行的回调函数
 TimerNodeBase AddTimer(time_t msec, TimerNode::Callback func) {
     time_t expire = GetTick() + msec;
     // emplace 容器内就地构造,避免拷贝构造和移动构造,
     auto ele = timermap.emplace(GenID(), expire, func);
     return static_cast<TimerNodeBase>(*ele.first);
 }

定时器的删除

bool DelTimer(TimerNodeBase &node) {
     // C++14的新特性:只需传递等价 key 比较,无需创建 key 对象比较,
     // 代替子类结点,避免函数对象复制控制和移动
     auto iter = timermap.find(node);
     // 若存在,则删除该结点
     if (iter != timermap.end()) {
         timermap.erase(iter);
         return true;
     }
     return false;
 }

定时器的更新

bool CheckTimer() {
     auto iter = timermap.begin();
     if (iter != timermap.end() && iter->expire <= GetTick()) {
         // 定时任务被触发,则执行对应的定时任务
         iter->func(*iter);
         // 删除执行完毕的定时任务
         timermap.erase(iter);
         return true;
     }
     return false;
 }

返回最近定时任务的触发时间,用于一个线程协同处理,返回值作为 timeout 参数

time_t TimeToSleep() {
     auto iter = timermap.begin();
     if (iter == timermap.end()) {
         return -1;
     }
     // 最近任务的触发时间 = 最近任务初始设置的触发时间 - 当前时间
     time_t diss = iter->expire - GetTick();
     // 最近要触发的任务时间 > 0,继续等待;= 0,立即处理任务
     return diss > 0 ? diss : 0;
 }

4.4、代码实现

#include <sys/epoll.h>
 #include <functional>
 #include <chrono>
 #include <set>
 #include <memory>
 #include <iostream>
 using namespace std;
 // 定时结点的基类,存储唯一标识的元素,轻量级,用于比较
 struct TimerNodeBase {
     time_t expire;  // 任务触发时间
     int64_t id;     // 用来描述插入先后顺序,int64_t,能记录5000多年
 };
 // 定时结点,包含定时任务等
 struct TimerNode : public TimerNodeBase {
     // 定时器任务回调函数
     // 函数对象拷贝代价高,在容器内拷贝构造后不会再去移动
     using Callback = std::function<void(const TimerNode &node)>;
     Callback func;
     // 构造函数,容器内部就地拷贝构造调用一次,此后不会再去调用
     TimerNode(int64_t id, time_t expire, Callback func) : func(func) {
         this->expire = expire;
         this->id = id;
     }
 };
 // 根据触发时间对结点进行排序
 // 基类引用,多态特性,基类代替timerNode结点,避免拷贝构造子类
 bool operator < (const TimerNodeBase &lhd, const TimerNodeBase &rhd) {
     // 先比较触发时间
     if (lhd.expire < rhd.expire)
         return true;
     else if (lhd.expire > rhd.expire) 
         return false;
     // 触发时间相同,比较插入的先后顺序
     // 比较id大小,先插入的结点id小,先执行
     return lhd.id < rhd.id;
 }
 // 定时器类的实现
 class Timer {
 public:
     // 获取当前时间
     static time_t GetTick() {
         // 获取系统时间戳,系统启动到当前的时间
         auto sc = chrono::time_point_cast<chrono::milliseconds>(chrono::steady_clock::now());
         // 获取到时间戳的时间段
         auto temp = chrono::duration_cast<chrono::milliseconds>(sc.time_since_epoch());
         return temp.count();
     }
     // 2、添加定时任务
     // 参数: msec 任务触发时间间隔,func 任务执行的回调函数
     TimerNodeBase AddTimer(time_t msec, TimerNode::Callback func) {
         time_t expire = GetTick() + msec;
         // emplace 容器内就地构造,避免拷贝构造和移动构造,
         // 此后不再移动 TimerNode 结点(函数对象内存占用多)
         auto ele = timermap.emplace(GenID(), expire, func);
         return static_cast<TimerNodeBase>(*ele.first);
     }
     // 3、删除/取消定时任务
     bool DelTimer(TimerNodeBase &node) {
         // C++14的新特性:只需传递等价 key 比较,无需创建 key 对象比较,
         // 代替子类结点,避免函数对象复制控制和移动
         auto iter = timermap.find(node);
         // 若存在,则删除该结点
         if (iter != timermap.end()) {
             timermap.erase(iter);
             return true;
         }
         return false;
     }
     // 4、检测定时任务是否被触发,触发则执行定时任务
     bool CheckTimer() {
         auto iter = timermap.begin();
         if (iter != timermap.end() && iter->expire <= GetTick()) {
             // 定时任务被触发,则执行对应的定时任务
             iter->func(*iter);
             // 删除执行完毕的定时任务
             timermap.erase(iter);
             return true;
         }
         return false;
     }
     // 5、返回最近定时任务触发时间,作为timeout的参数
     time_t TimeToSleep() {
         auto iter = timermap.begin();
         if (iter == timermap.end()) {
             return -1;
         }
         // 最近任务的触发时间 = 最近任务初始设置的触发时间 - 当前时间
         time_t diss = iter->expire - GetTick();
         // 最近要触发的任务时间 > 0,继续等待;= 0,立即处理任务
         return diss > 0 ? diss : 0;
     }
 private:
     // 产生 id 的方法
     static int64_t GenID() {
         return gid++;
     }
     static int64_t gid;
     // 利用 set 排序快速查找要到期的任务 
     set<TimerNode, std::less<>> timermap;
 };
 int64_t Timer::gid = 0;
 int main() {
     // 定时器驱动
     int epfd = epoll_create(1);
     // 创建定时器
     unique_ptr<Timer> timer = make_unique<Timer>();
     int i = 0;
     timer->AddTimer(1000, [&](const TimerNode &node) {
         cout << Timer::GetTick() << "node id:" << node.id << " revoked times:" << ++i << endl;
     });
     timer->AddTimer(1000, [&](const TimerNode &node) {
         cout << Timer::GetTick() << "node id:" << node.id << " revoked times:" << ++i << endl;
     });
     timer->AddTimer(3000, [&](const TimerNode &node) {
         cout << Timer::GetTick() << "node id:" << node.id << " revoked times:" << ++i << endl;
     });
     auto node = timer->AddTimer(2100, [&](const TimerNode &node) {
         cout << Timer::GetTick() << "node id:" << node.id << " revoked times:" << ++i << endl;
     });
     timer->DelTimer(node);
     cout << "now time:" << Timer::GetTick() << endl;
     epoll_event ev[64] = {0};
     while (true) {
         // 最近任务的触发时间接口:TimeToSlee,作为 timeout 参数
         int n = epoll_wait(epfd, ev, 64, timer->TimeToSleep());
         for (int i = 0; i < n; i++) {
             /*... 处理网络事件 ...*/
         }
         // 处理定时事件
         while(timer->CheckTimer());
     }
     return 0;
 }

5、多级时间轮

时间轮 timewheel 是一个环形结构,使用 hash + list 实现,类似时钟。时钟上的格子(槽位 slot)代表一段时间,slot大小表示时间精度,slot的数量表示时间范围。每个格子指向一条定时器 list,保存该格子上所有到期的任务。同一格子的任务触发时间相同,用 list 解决了 hash 冲突。表盘上的指针随时间一格一格转动 tick,当指针指向一个格子,执行格子对应 list 中的所有到期的任务。任务通过取模决定应该放入哪个格子。

1704877483377.jpg

简单时间轮

如果任务的时间跨度很大,数量也多,传统的单轮时间轮会造成任务的 round 很大,单个格子的任务 list 很长,并会维持很长一段时间。这时可将时间轮按时间粒度分级

1704877495053.jpg

多级时间轮


多级时间轮,低一级轮转动一圈,高一级轮转动一格,本质就是计数进制。秒针在每个定时周期移动一个格子,当秒针转一圈后,分钟转动一个格子。秒针轮指针主动转动,而分钟轮时针轮无法主动转动,只能等待低级轮进位才能转动,同时自动把即将到期的定时器任务迁移到低一级轮子里。

根据轮子的类型,可以分为主动轮和从动轮。

  • 主动轮:当刻度指针指向当前槽的时候,槽内的任务被顺序执行。
  • 从动轮:当对应轮的刻度指针指向当前槽位的时候,槽内的任务链依次向低级轮(序号较高的轮)转移,从动轮没有执行任务权限,只是对任务进行记录与缓存。

为什么时间轮分成多个层级?

  • 减少空间占用。若采用单级时间轮,则需要 12 * 60 * 60 个 slot ,若采用上述三级结构,则需要空间大小 60 + 60 + 12 个slot,极大地减少了空间占用。
  • 只需要关注最近要触发的定时任务(主动轮),按照任务触发的优先级组织任务,减少对任务的检测

多级时间轮定时器的应用场景很多,比如 linux 内核,skynet,kafka,netty等

以 skynet 为例,skynet 作为单 reactor 模型,适用于 cpu 密集型的场景。timer 由 timer 线程管理,当有定时任务时将任务派发给其他线程执行。下图即为多线程环境下 skynet 时间轮运行图。

1704877503205.jpg

skynet 运行原理图


接下来,对 skynet 源码做了简化改动,介绍多级时间轮的实现方式。

5.1、定时器的驱动

首先,选择定时器驱动的方式,这里选择 usleep 来实现

  • 时间精度:usleep gettime两个接口确定,10ms
  • 时间范围:uint32_t time
// 检测定时器,每过1/4时间精度执行一次
 // 原因是 dispatch 分发任务花费时间,影响精度
 void expire_timer(void) {
     // 获取当前系统运行时间,不受用户的影响
     uint64_t cp = gettime();
     // 当前系统启动时间与定时器记录的系统启动时间不相等 
     if (cp != TI->current_point) {  
         // 获取上述两者的差值
         uint32_t diff = (uint32_t)(cp - TI->current_point);
         // 更新定时器记录的系统运行时间
         TI->current_point = cp;
         // 更新timer的运行时间
         TI->current += diff;
         // 更新定时器的时间(time的值),并执行对应的过期任务
         int i;
         for (i=0; i<diff; i++) {
             // 每执行一次timer_update,其内部的函数
             // timer_shift: time+1,time代表从timer启动后至今一共经历了多少次tick
             // timer_execute: 执行near中的定时器
             timer_update(TI);
         }
     }
 }
 // timer 线程中,每过1/4时间精度,即2.5ms,执行一次定时器的检测
 while (!ctx.quit) {
     expire_timer();
     usleep(2500);    
 }

5.2、数据结构设计

定时器的设计

指针数组

以 skynet、linux 内核定时器的多级时间轮为例,定义了5个链表数组,每个数组里包含多个定时器链表,near 数组大小为28,其余数组大小为26,表示的时间范围 28+6+6+6+6 = 232。

1704877513261.jpg

多级指针数组


time 指针

我们仅关注存储最近要触发事件的 near 数组,其他数组指针可以通过计算获得,所以只需要一个时间指针 time 即可。该指针记录 timer 自启动到现在的 tick 数,指向时间轮上的一个槽 slot。当 tick 在低一级移动一圈,需要将高一级轮子中的定时任务重新映射。

在 skynet 中 32 位无符号整数 time 就是该指针,不同位数分别对应数组near[256] 和t[4][64],每过 10 ms 增加一次。当 time 溢出时,32位无符号循环,再次从0开始计数。

time 指针


定时器的定义如下:

typedef struct timer {
     link_list_t near[TIME_NEAR];    // 最低级的时间轮,主动轮
     link_list_t t[4][TIME_LEVEL];   // 其他层级的时间轮,从动轮
     struct spinlock lock;           // 自旋锁,O(1)
     uint32_t time;                  // tick 指针,当前时间片
     uint64_t current;               // timer运行时间,时间精度10ms
     uint64_t current_point;         // 系统运行时间,时间精度10ms
 }s_timer_t;

任务结点的设计

任务结点使用链表存储,链表中存储同一时间触发的任务结点

struct timer_node {
     struct timer_node *next;    // 相同过期时间的待执行的下一个任务
     uint32_t expire;           // 任务过期时间
     handler_pt callback;        // 任务回调函数
     uint8_t cancel;             // 删除任务的标记,取消任务的执行
     int id;                     // 执行该任务的线程 id
 };

5.3、timer 接口实现

初始化定时器

// 初始化定时器
 void init_timer(void) {
     TI = timer_create_timer();      // 创建定时器
     TI->current_point = gettime();  // 获取系统当前运行时间
 }
 // 创建定时器
 s_timer_t* timer_create_timer() {
     s_timer_t *r = (s_timer_t *)malloc(sizeof(s_timer_t));
     memset(r, 0, sizeof(*r));
     int i, j;
     // 创建主动轮,最低级时间轮
     for (i = 0; i < TIME_NEAR; ++i) {
         link_clear(&r->near[i]);
     }
     // 创建从动轮,高层级时间轮
     for (i = 0; i < 4; ++i) {
         for (j = 0;j < TIME_LEVEL; ++j) {
             link_clear(&r->t[i][j]);
         }
     }
     // 初始化自旋锁
     spinlock_init(&r->lock);
     r->current = 0;
     return r;
 }
 // 获取系统当前运行时间,时间精度10ms
 uint64_t gettime() {
     uint64_t t;
 #if !defined(__APPLE__) || defined(AVAILABLE_MAC_OS_X_VERSION_10_12_AND_LATER)
     struct timespec ti;
     clock_gettime(CLOCK_MONOTONIC, &ti);    // CLOCK_MONOTONIC
     t = (uint64_t)ti.tv_sec * 1000;
     t += ti.tv_nsec / 1000000;
 #else
     struct timeval tv;
     gettimeofday(&tv, NULL);
     t = (uint64_t)tv.tv_sec * 100;
     t += tv.tv_usec / 10000;
 #endif
     return t;
 }

定时器的添加

  • 计算定时器到期时间 expire 和当前定时器启动时间 time 的差值,记为 msec
  • 根据 msec 的值判断结点应该添加到定时器指针数组的哪一层
// 添加任务结点到定时器中
 // 根据 msec 判断结点应该放入时间轮的层级
 void add_node(s_timer_t *T, timer_node_t *node) {
     uint32_t time = node->expire;           // 过期时间
     uint32_t current_time=T->time;          // 当前时间
     uint32_t msec = time - current_time;    //  剩余时间    
     //根据 expire-time 的差值将结点放入相应的层级
     //[0, 2^8)
     if (msec < TIME_NEAR) {
         link(&T->near[time&TIME_NEAR_MASK],node);
     } 
     //[2^8, 2^14)
     else if (msec < (1 << (TIME_NEAR_SHIFT+TIME_LEVEL_SHIFT))) {
         link(&T->t[0][((time>>TIME_NEAR_SHIFT) & TIME_LEVEL_MASK)],node);   
     }
     //[2^14, 2^20) 
     else if (msec < (1 << (TIME_NEAR_SHIFT+2*TIME_LEVEL_SHIFT))) {
         link(&T->t[1][((time>>(TIME_NEAR_SHIFT + TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node);  
     } 
     //[2^20, 2^26)
     else if (msec < (1 << (TIME_NEAR_SHIFT+3*TIME_LEVEL_SHIFT))) {
         link(&T->t[2][((time>>(TIME_NEAR_SHIFT + 2*TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node);    
     } 
     //[2^26, 2^32)
     else {
         link(&T->t[3][((time>>(TIME_NEAR_SHIFT + 3*TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node);    
     }
 }
 // 添加定时任务
 timer_node_t* add_timer(int time, handler_pt func, int threadid) {
     timer_node_t *node = (timer_node_t *)malloc(sizeof(*node));
     spinlock_lock(&TI->lock);
     // 设置定时任务结点的属性
     node->expire = time + TI->time; // 添加触发时间 = 触发时间间隔 + 当前时间
     node->callback = func;  // 添加任务回调函数
     node->id = threadid;    // 添加执行该任务的线程id
     // 判断是否需要立即执行任务
     if (time <= 0) {
         spinlock_unlock(&TI->lock);
         node->callback(node);
         free(node);
         return NULL;
     }
     // 添加任务结点到定时器中
     add_node(TI, node);
     spinlock_unlock(&TI->lock);
     return node;
 }

定时器的删除

由于结点位置可能发生变化(重新映射),不能找到任务结点的位置,无法删除。

在结点中添加一个 cancel 字段,任务触发碰到该标记则不执行任务,之后统一释放空间。

void del_timer(timer_node_t *node) {
     node->cancel = 1;
 }

定时器的更新

主要包括对到期任务的处理和对从动轮任务(time高24位对应的链表)的重新映射。

void timer_update(s_timer_t *T) {
     spinlock_lock(&T->lock);
     // 执行任务
     timer_execute(T);
     // time+1,并判断是否进行重新映射
     timer_shift(T);
     // 若发生重新映射,若time的指向有任务,则需要执行
     timer_execute(T);
     spinlock_unlock(&T->lock);
 }

到期任务的处理

在每次tick事件中,定时器以当前 tick 值的低8位作为索引,取出 near 数组中对应的 list,list 里面包含了所有在该 tick 到期的定时器列表。

// 执行任务
 // 以time的低8位对应的near数组的索引,取出该位置对应的list
 void timer_execute(s_timer_t *T) {
     // 取出time低8位对应的索引值
     int idx = T->time & TIME_NEAR_MASK; 
     // 如果低8位值对应的near数组元素有链表,则取出
     while (T->near[idx].head.next) {
         // 取出对应的定时器list
         timer_node_t *current = link_clear(&T->near[idx]);
         spinlock_unlock(&T->lock);
         // 将链表各结点的任务派发出去
         dispatch_list(current);
         spinlock_lock(&T->lock);
     }
 }
 // 任务派发
 void dispatch_list(timer_node_t *current) {
     do {
         timer_node_t *temp = current;
         current=current->next;
         // cancel 标记为0,执行任务回调函数;否则,不执行任务回调
         if (temp->cancel == 0)
             temp->callback(temp);
         free(temp);
     } while (current);
 }

重新映射

为什么要对结点重新映射

只有主动轮的结点要执行,从动轮只是存储结点,主动轮结点执行完后,需要从动轮补充。

重新映射的流程

  • 确定重新映射的位置
  • 取出该指针指向槽位的所有任务,time = tick0 + time - tick
  • 再次添加结点到时间轮中
如何确定重新映射的位置
  • 检查time是否溢出,如果溢出则将t[3][0]这个链表取出并依次将该链表中的节点添加(即实现该链表的移动操作),如果time未溢出,则进行下一步。
  • 检查 time 的第1-8位是否溢出产生进位,没有则结束,有则说明 time 恰走完 near 数组。继续判断检查第9-14位是否溢出。有则进行下一步,没有则说明是t[0]数组移动。此时将 time 的第9-14位的值作为索引 idx,取出 t[0][idx] 中的 list,并将 list 中的结点依次重新添加到定时器中。而这些结点一定会添加到 near 数组中,实现了 t[0]向 near的迁移。
  • 检查 time 的第9-14位是否溢出产生进位,没有则结束,有则说明 time 恰走完t[0]数组。继续判断检查第15-20位是否溢出,有则进行下一步,没有则说明是t[1]数组移动。此时将 time 的第15-20位作为索引 idx,取出t[1][idx]中的 list,将 list 中的结点依次重新添加到定时器中,而这些结点一定会添加到 t[0] 数组中,实现了 t[1]向 t[0]的迁移。
  • ......

后续所有操作以此类推,时间轮由低向高逐级判断,因为高级轮溢出,低级一定溢出,这样实现了多级定时器逐级迁移的过程

// 重新映射,判断是否需要重新映射
 // 时间片time自加1,将高24位对应的4个6位的数组中的各个元素的链表往低位移
 void timer_shift(s_timer_t *T) {
     int mask = TIME_NEAR;       
     // 时间片+1
     uint32_t ct = ++T->time;    
     // 时间片溢出,无符号整数,循环,time重置0
     if (ct == 0) {
         // 将对应的t[3][0]链表取出,重新移动到定时器中
         move_list(T, 3, 0);
     } 
     else {
         // ct右移8位,进入到从动轮
         uint32_t time = ct >> TIME_NEAR_SHIFT; 
         // 第 i 层时间轮
         int i = 0;
         // 判断是否需要重新映射?
         // 即循环判断当前层级对应的数位是否全0,即溢出产生进位
         while ((ct & (mask-1))==0) {
             // 取当前层级的索引值    
             int idx = time & TIME_LEVEL_MASK;
             // idx=0 说明当前层级溢出,继续循环判断直至当前层级不溢出
             if (idx != 0) {
                 // 将对应的t[i][idx]链表取出,依次移动到定时器中
                 move_list(T, i, idx);
                 break;              
             }
             mask <<= TIME_LEVEL_SHIFT;  // mask 右移
             time >>= TIME_LEVEL_SHIFT;  // time 左移
             ++i;                        // 时间轮层级增加
         }
     }
 }

5.4、代码实现

// timer_wheel.h
 #ifndef _MARK_TIMEWHEEL_
 #define _MARK_TIMEWHEEL_
 #include <stdint.h>
 #define TIME_NEAR_SHIFT 8
 #define TIME_NEAR (1 << TIME_NEAR_SHIFT)
 #define TIME_LEVEL_SHIFT 6
 #define TIME_LEVEL (1 << TIME_LEVEL_SHIFT)
 #define TIME_NEAR_MASK (TIME_NEAR-1)
 #define TIME_LEVEL_MASK (TIME_LEVEL-1)
 typedef struct timer_node timer_node_t;
 typedef void (*handler_pt) (struct timer_node *node);
 // 任务结点
 struct timer_node {
     struct timer_node *next;    // 相同过期时间的待执行的下一个任务
     uint32_t expire;            // 任务过期时间
     handler_pt callback;        // 任务回调函数
     uint8_t cancel;             // 删除任务,遇到该标记则取消任务的执行
     int id;                     // 此时携带参数
 };
 timer_node_t* add_timer(int time, handler_pt func, int threadid);
 void expire_timer(void);
 void del_timer(timer_node_t* node);
 void init_timer(void);
 void clear_timer();
 #endif
 // timer_wheel.c
 #include "spinlock.h"
 #include "timewheel.h"
 #include <string.h>
 #include <stddef.h>
 #include <stdlib.h>
 #if defined(__APPLE__)
 #include <AvailabilityMacros.h>
 #include <sys/time.h>
 #include <mach/task.h>
 #include <mach/mach.h>
 #else
 #include <time.h>
 #endif
 typedef struct link_list {
     timer_node_t head;
     timer_node_t *tail;
 }link_list_t;
 // 定时器的数据结构
 typedef struct timer {
     link_list_t near[TIME_NEAR];    // 最低级的时间轮,主动轮
     link_list_t t[4][TIME_LEVEL];   // 其他层级的时间轮,从动轮
     struct spinlock lock;           // 自旋锁,O(1)
     uint32_t time;                  // tick 指针,当前时间片
     uint64_t current;               // timer运行时间,精度10ms
     uint64_t current_point;         // 系统运行时间,精度10ms
 }s_timer_t;
 static s_timer_t * TI = NULL;
 // 清空链表
 // 并返回指向链表的第一个结点的指针
 timer_node_t* link_clear(link_list_t *list) {
     // 指向头指针的下一个位置
     timer_node_t * ret = list->head.next;
     // 头结点断链
     list->head.next = 0;
     // 尾指针指向头结点
     list->tail = &(list->head);
     return ret;
 }
 // 尾插法
 void link(link_list_t *list, timer_node_t *node) {
     list->tail->next = node;
     list->tail = node;
     node->next=0;
 }
 // 添加任务结点到定时器中
 // 根据 time 判断结点应该放入时间轮的层级
 void add_node(s_timer_t *T, timer_node_t *node) {
     uint32_t time = node->expire;           // 过期时间
     uint32_t current_time=T->time;          // 当前时间
     uint32_t msec = time - current_time;    // 剩余时间 
     //根据 expire-time 的差值将结点放入相应的层级
     //[0, 2^8)
     if (msec < TIME_NEAR) {
         link(&T->near[time&TIME_NEAR_MASK],node);
     } 
     //[2^8, 2^14)
     else if (msec < (1 << (TIME_NEAR_SHIFT+TIME_LEVEL_SHIFT))) {
         link(&T->t[0][((time>>TIME_NEAR_SHIFT) & TIME_LEVEL_MASK)],node);   
     }
     //[2^14, 2^20) 
     else if (msec < (1 << (TIME_NEAR_SHIFT+2*TIME_LEVEL_SHIFT))) {
         link(&T->t[1][((time>>(TIME_NEAR_SHIFT + TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node);  
     } 
     //[2^20, 2^26)
     else if (msec < (1 << (TIME_NEAR_SHIFT+3*TIME_LEVEL_SHIFT))) {
         link(&T->t[2][((time>>(TIME_NEAR_SHIFT + 2*TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node);    
     } 
     //[2^26, 2^32)
     else {
         link(&T->t[3][((time>>(TIME_NEAR_SHIFT + 3*TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node);    
     }
 }
 // 添加定时任务
 timer_node_t* add_timer(int time, handler_pt func, int threadid) {
     timer_node_t *node = (timer_node_t *)malloc(sizeof(*node));
     spinlock_lock(&TI->lock);
      // 设置定时任务结点的属性
     node->expire = time + TI->time; // 添加任务触发时间 expire = time + tick
     node->callback = func;  // 添加任务回调函数
     node->id = threadid;    // 添加执行该任务的线程id
     // 判断是否需要立即执行任务
     if (time <= 0) {
         spinlock_unlock(&TI->lock);
         node->callback(node);
         free(node);
         return NULL;
     }
     // 添加任务结点到定时器中
     add_node(TI, node);
     spinlock_unlock(&TI->lock);
     return node;
 }
 // 移动链表
 // 第level层第idx个位置的链表结点重新添加到定时器T中
 void move_list(s_timer_t *T, int level, int idx) {
     timer_node_t *current = link_clear(&T->t[level][idx]);
     while (current) {
         timer_node_t *temp = current->next;
         add_node(T, current);
         current = temp;
     }
 }
 // 重新映射,判断是否需要重新映射
 // 时间片time自加1,将高24位对应的4个6位的数组中的各个元素的链表往低位移
 void timer_shift(s_timer_t *T) {
     int mask = TIME_NEAR;       
     // 时间片+1
     uint32_t ct = ++T->time;    
     // 时间片溢出,无符号整数,循环,time重置0
     if (ct == 0) {
         // 将对应的t[3][0]链表取出,重新移动到定时器中
         move_list(T, 3, 0);
     } 
     else {
         // ct右移8位,进入到从动轮
         uint32_t time = ct >> TIME_NEAR_SHIFT; 
         // 第 i 层时间轮
         int i = 0;
         // 判断是否需要重新映射?
         // 即循环判断当前层级对应的数位是否全0,即溢出产生进位
         while ((ct & (mask-1))==0) {
             // 取当前层级的索引值    
             int idx = time & TIME_LEVEL_MASK;
             // idx=0 说明当前层级溢出,继续循环判断直至当前层级不溢出
             if (idx != 0) {
                 // 将对应的t[i][idx]链表取出,依次移动到定时器中
                 move_list(T, i, idx);
                 break;              
             }
             mask <<= TIME_LEVEL_SHIFT;  // mask 右移
             time >>= TIME_LEVEL_SHIFT;  // time 左移
             ++i;                        // 时间轮层级增加
         }
     }
 }
 // 任务派发给其他线程执行
 void dispatch_list(timer_node_t *current) {
     do {
         timer_node_t *temp = current;
         current=current->next;
         // cancel 标记为0,执行任务回调函数
         if (temp->cancel == 0)
             temp->callback(temp);
         free(temp);
     } while (current);
 }
 // 执行任务
 // 以time的低8位对应的near数组的索引,取出该位置对应的list
 void timer_execute(s_timer_t *T) {
     // 取出time低8位对应的值
     int idx = T->time & TIME_NEAR_MASK; 
     // 如果低8位值对应的near数组元素有链表,则取出
     while (T->near[idx].head.next) {
         // 取出对应的定时器list
         timer_node_t *current = link_clear(&T->near[idx]);
         spinlock_unlock(&T->lock);
         // 将链表各结点的任务派发出去
         dispatch_list(current);
         spinlock_lock(&T->lock);
     }
 }
 // 定时器更新
 void timer_update(s_timer_t *T) {
     spinlock_lock(&T->lock);
     // 执行任务
     timer_execute(T);
     /// time+1,并判断是否进行重新映射
     timer_shift(T);
     // 若发生重新映射,若time的指向有任务,则需要执行
     timer_execute(T);
     spinlock_unlock(&T->lock);
 }
 // 删除定时器任务
 void del_timer(timer_node_t *node) {
     node->cancel = 1;
 }
 // 创建定时器
 s_timer_t * timer_create_timer() {
     s_timer_t *r = (s_timer_t *)malloc(sizeof(s_timer_t));
     memset(r, 0, sizeof(*r));
     int i, j;
     // 创建主动轮,最低级时间轮
     for (i = 0; i < TIME_NEAR; ++i) {
         link_clear(&r->near[i]);
     }
     // 创建从动轮,高层级时间轮
     for (i = 0; i < 4; ++i) {
         for (j = 0;j < TIME_LEVEL; ++j) {
             link_clear(&r->t[i][j]);
         }
     }
     // 初始化自旋锁
     spinlock_init(&r->lock);
     r->current = 0;
     return r;
 }
 // 获取当前时间,时间精度10ms
 uint64_t gettime() {
     uint64_t t;
 #if !defined(__APPLE__) || defined(AVAILABLE_MAC_OS_X_VERSION_10_12_AND_LATER)
     struct timespec ti;
     clock_gettime(CLOCK_MONOTONIC, &ti);// CLOCK_MONOTONIC,从系统启动这一刻起开始计时,不受系统时间被用户改变的影响
     t = (uint64_t)ti.tv_sec * 1000;
     t += ti.tv_nsec / 1000000;
 #else
     struct timeval tv;
     gettimeofday(&tv, NULL);
     t = (uint64_t)tv.tv_sec * 100;
     t += tv.tv_usec / 10000;
 #endif
     return t;
 }
 // 检测定时器,时间精度10ms,每过1/4时间精度2.5ms执行1次
 // 原因是dispatch分发任务花费时间,影响精度
 void expire_timer(void) {
     // 获取当前系统运行时间,不受系统时间被用户的影响
     uint64_t cp = gettime();
     // 当前系统启动时间与定时器记录的系统启动时间不相等 
     if (cp != TI->current_point) {  
         // 获取上述两者的差值
         uint32_t diff = (uint32_t)(cp - TI->current_point);
         // 更新定时器记录的系统运行时间
         TI->current_point = cp;
         // 更新timer的运行时间
         TI->current += diff;
         // 更新定时器的时间(time的值),并执行对应的过期任务
         int i;
         for (i=0; i<diff; i++) {
             // 每执行一次timer_update,其内部的函数
             // timer_shift: time+1,time代表从timer启动后至今一共经历了多少次tick
             // timer_execute: 执行near中的定时器
             timer_update(TI);
         }
     }
 }
 // 初始化定时器
 void init_timer(void) {
     TI = timer_create_timer();      // 创建定时器
     TI->current_point = gettime();  // 获取当前时间
 }
 void clear_timer() {
     int i,j;
     for (i=0;i<TIME_NEAR;i++) {
         link_list_t * list = &TI->near[i];
         timer_node_t* current = list->head.next;
         while(current) {
             timer_node_t * temp = current;
             current = current->next;
             free(temp);
         }
         link_clear(&TI->near[i]);
     }
     for (i=0;i<4;i++) {
         for (j=0;j<TIME_LEVEL;j++) {
             link_list_t * list = &TI->t[i][j];
             timer_node_t* current = list->head.next;
             while (current) {
                 timer_node_t * temp = current;
                 current = current->next;
                 free(temp);
             }
             link_clear(&TI->t[i][j]);
         }
     }
 }
 // tw-timer.c
 // gcc tw-timer.c timewheel.c -o tw -I./ -lpthread #include <stdio.h>
 #include <unistd.h>
 #include <pthread.h>
 #include <time.h>
 #include <stdlib.h>
 #include "timewheel.h"
 struct context {
     int quit;
     int thread;
 };
 struct thread_param {
     struct context *ctx;
     int id;
 };
 static struct context ctx = {0};
 void do_timer(timer_node_t *node) {
     printf("do_timer expired:%d - thread-id:%d\n", node->expire, node->id);
     add_timer(100, do_timer, node->id);
 }
 void do_clock(timer_node_t *node) {
     static int time;
     time ++;
     printf("---time = %d ---\n", time);
     add_timer(100, do_clock, node->id);
 }
 void* thread_worker(void *p) {
     struct thread_param *tp = p;
     int id = tp->id;
     struct context *ctx = tp->ctx;
     int expire = rand() % 200; 
     add_timer(expire, do_timer, id);
     while (!ctx->quit) {
         usleep(1000);
     }
     printf("thread_worker:%d exit!\n", id);
     return NULL;
 }
 void do_quit(timer_node_t * node) {
     ctx.quit = 1;
 }
 int main() {
     srand(time(NULL));
     ctx.thread = 2;
     pthread_t pid[ctx.thread];
     init_timer();
     add_timer(6000, do_quit, 100);
     add_timer(0, do_clock, 100);
     struct thread_param task_thread_p[ctx.thread];
     int i;
     for (i = 0; i < ctx.thread; i++) {
         task_thread_p[i].id = i;
         task_thread_p[i].ctx = &ctx;
         if (pthread_create(&pid[i], NULL, thread_worker, &task_thread_p[i])) {
             fprintf(stderr, "create thread failed\n");
             exit(1);
         }
     }
     while (!ctx.quit) {
         expire_timer();
         usleep(2500);    // 2.5ms
     }
     clear_timer();
     for (i = 0; i < ctx.thread; i++) {
         pthread_join(pid[i], NULL);
     }
     printf("all thread is closed\n");
     return 0;
 }

6、参考资料

相关文章
|
5天前
|
消息中间件 NoSQL 应用服务中间件
定时器方案,红黑树,时间轮
定时器方案,红黑树,时间轮
49 0
|
5天前
|
消息中间件 应用服务中间件 网络安全
定时器方案:红黑树、最小堆和时间轮的原理
定时器方案:红黑树、最小堆和时间轮的原理
77 0
|
监控 数据可视化 PHP
Laravel Crontab 支持的最小单位是分钟,怎么实现秒级执行的需求呢?
Laravel Crontab 支持的最小单位是分钟,怎么实现秒级执行的需求呢?
400 0
Laravel Crontab 支持的最小单位是分钟,怎么实现秒级执行的需求呢?
|
5天前
|
存储 消息中间件 算法
精华推荐 |【算法数据结构专题】「延时队列算法」史上非常详细分析和介绍如何通过时间轮(TimingWheel)实现延时队列的原理指南
精华推荐 |【算法数据结构专题】「延时队列算法」史上非常详细分析和介绍如何通过时间轮(TimingWheel)实现延时队列的原理指南
56 1
|
6月前
|
存储 算法 NoSQL
定时器方案之红黑树、最小堆、时间轮
定时器方案之红黑树、最小堆、时间轮
141 0
|
9月前
|
算法 Java Sentinel
限流算法(计数器、滑动时间窗口、漏斗、令牌)原理以及代码实现
> 本文会对这4个限流算法进行详细说明,并输出实现限流算法的代码示例。 > 代码是按照自己的理解写的,很简单的实现了功能,还请大佬们多多交流找bug。
239 0
|
5天前
|
Linux 容器
002. 使用最小堆实现高性能定时器实现
002. 使用最小堆实现高性能定时器实现
55 0
|
6月前
|
存储 NoSQL 应用服务中间件
分别基于红黑树、timefd、多级时间轮实现定时器-1
分别基于红黑树、timefd、多级时间轮实现定时器
41 0
|
6月前
|
存储 缓存 索引
分别基于红黑树、timefd、多级时间轮实现定时器-2
分别基于红黑树、timefd、多级时间轮实现定时器
52 0
|
10月前
|
编译器 程序员 C语言
编译时间和运行态时间交换的优缺点
编译时间和运行态时间交换的优缺点
61 0