定时器应用
心跳检测 、技能冷却、 倒计时 、其他需要延时处理的功能
定时器触发方式
对于服务端来说,驱动服务端业务逻辑的事件包括网络事件、 定时事件、以及信号事件;通常网络事件和定时事件会进行协同处理;定时器触发形式通常有两种:
1 网络事件和定时事件在一个线程中处理;例如:nginx、 redis、memcached;
定时任务比较少的时候。
对于多线程情况,引起时间处理不均衡。
任务多了,会影响网络事件处理。
2 网络事件和定时事件在不同线程中处理;例如: skynet,...;
usleep(time) time要小于最小时间精度
通常采用的数据结构是 时间轮
加锁粒度比较小
时间轮只负责检测,通过信号或者插入执行队列让其他线程处理。
// 网络事件和定时事件在一个线程中处理 while (!quit) { int timeout = get_nearest_timer() - now(); if (timeout < 0) timeout = -1; int nevent = epoll_wait(epfd, ev, nev, timeout); for (int i=0; i<nevent; i++) { // ... 处理网络事件 } // 处理定时事件 update_timer(); } // 网络事件和定时事件在不同线程中处理 void * thread_timer(void *thread_param) { init_timer(); while (!quit) { update_timer(); sleep(t); } clear_timer(); return NULL; } pthread_create(&pid, NULL, thread_timer, &thread_param);
定时器设计
接口设计
// 初始化定时器 void init_timer();
// 添加定时器 Node* add_timer(int expire, callback cb);
// 删除定时器 bool del_timer(Node* node);
// 找到最近要触发的定时任务 Node* find_nearest_timer(); // 只有第一种情况才需要
// 更新检测定时器 void update_timer();
// 清除定时器 void clear_timer();
数据结构设计
对定时任务的组织本质是要处理对定时任务优先级的处理;由此产生两类数据结构;
按触发时间进行顺序组织
要求数据结构有序(红黑树、跳表),或者相对有序 (最小堆);
能快速查找最近触发的定时任务;
需要考虑怎么处理相同时间触发的定时任务;
按执行顺序进行组织
时间轮
红黑树
绝对有序 , nginx
跳表
绝对有序,redis未来会引用跳表
最小堆
满二叉树:所有的层节点数都是该层所能容纳节点的最大数量 (满足 )
完全二叉树:若二叉树的深度为 h,去除了 h 层的节点,就是 一个满二叉树;且 h 层都集中在最左侧排列;
最小堆:
是一颗完全二叉树;
某一个节点的值总是小于等于它的子节点的值;
堆中任意一个节点的子树都是最小堆;
libevent,libev
为了满足完全二叉树的定义,往二叉树最高层沿着最左侧添加 一个节点;然后考虑是否能上升操作;如果此时添加值为 4 的 节点,4 节点是 5 节点的左子树;4 比 5 小,4 和 5 需要交换 值;
最小堆删除节点
删除操作需要先查找是否包含这个节点;确定存在后,交换最后一个节点,先考虑能否执行下降操作,否则执行上升操作; 最后删除最后一个节点; 例如:删除 1 号节点,则需要下沉操作;删除 9 号节点,则需要上升操作;
时间轮
为什么要分成多个层级:
1 减少空间占用
2 只需要关注最近要触发的定时任务 最近一分钟
3 按照任务触发的轻重缓急进行组织
4 减少任务的检测
tick
取值范围为时间范围,只需要记录一个指针,当秒针移动一圈,说明下一分钟的任务要进行了。
任务节点
expire , callback , next (解决相同触发时间的任务)
添加节点
根据time判断放在哪一层, expire = tick0 + time
重新映射
当分针的任务移动到秒针层级时
为什么需要重新映射? 时间精度为秒,只执行秒针层级的任务。
怎么重新映射:
1 确定重新映射的位置(算出分针的指针位置) tick | (tick/60)%60
2 取出该指针指向槽位的所有任务
3 time = tick0 + time - tick
4 再添加节点的逻辑。
删除节点
1 节点位置可能发生变化
2 添加一个字段,cancel = true ,任务触发时遇到这个标记不执行具体任务
时间轮场景
内核 ,skynet , kafka , netty
运行图
原理图
#include <sys/epoll.h> #include <functional> #include <chrono> #include <set> #include <memory> #include <iostream> using namespace std; struct TimerNodeBase{ time_t expire; int64_t id; }; struct TimerNode: public TimerNodeBase{ using Callback = std::function<void(const TimerNode& node)>; Callback func; TimerNode(int64_t id , time_t expire , Callback func):func(func){ this->expire = expire; this->id = id; // 如果时间相同,根据id的先后顺序执行 }; }; bool operator < (const TimerNodeBase& lhd , const TimerNodeBase& rhd){// std::less<> 需要 if(lhd.expire < rhd.expire) return true; else if(lhd.expire > rhd.expire) return false; return lhd.id < rhd.id; } class Timer{ public: static time_t GetTick(){ auto sc = chrono::time_point_cast<chrono::milliseconds>(chrono::steady_clock::now()); auto temp = chrono::duration_cast<chrono::milliseconds>(sc.time_since_epoch()); return temp.count(); } TimerNodeBase AddTimer(time_t msec , TimerNode::Callback func){ time_t expire = GetTick() + msec; auto ele = timermap.emplace(GenID() , expire , func); return static_cast<TimerNodeBase>(*ele.first); } bool DelTimer(TimerNodeBase& node){ auto iter = timermap.find(node); // c++14新特性, 传一个相似值 , 基类引用 if(iter != timermap.end()){ timermap.erase(iter); return true; } return false; } bool CheckTimer(){ // 检查是否有定时事件 auto iter = timermap.begin(); if(iter != timermap.end() && iter->expire <= GetTick()){ // 时间到了 iter->func(*iter); // 执行任务 timermap.erase(iter); return true; } return false; } time_t TimeToSleep(){ auto iter = timermap.begin(); if(iter == timermap.end()) return -1; time_t diss = iter->expire - GetTick(); // 是否需要睡眠 return diss > 0 ? diss : 0; // } private: static int64_t GenID(){ return gid++; } static int64_t gid; set<TimerNode , std::less<>> timermap; }; int64_t Timer::gid = 0; int main(){ int epfd = epoll_create(1); unique_ptr<Timer> timer = make_unique<Timer>(); int i = 0; timer->AddTimer(1000 , [&](const TimerNode& node){ cout << Timer::GetTick() << "node id:" << node.id << "revoked times:" << ++i << endl; }); timer->AddTimer(1000, [&](const TimerNode &node) { cout << Timer::GetTick() << "node id:" << node.id << " revoked times:" << ++i << endl; }); timer->AddTimer(3000, [&](const TimerNode &node) { cout << Timer::GetTick() << "node id:" << node.id << " revoked times:" << ++i << endl; }); auto node = timer->AddTimer(2100, [&](const TimerNode &node) { cout << Timer::GetTick() << "node id:" << node.id << " revoked times:" << ++i << endl; }); timer->DelTimer(node); cout << "now time:" << Timer::GetTick() << endl; struct epoll_event ev[64] = {0}; while(true){ /* -1 永久阻塞 0 没有事件立刻返回,有事件就拷贝到 ev 数组当中 t > 0 阻塞等待 t ms, timeout 最近触发的定时任务离当前的时间 */ int n = epoll_wait(epfd, ev, 64, timer->TimeToSleep()); for (int i = 0; i < n; i++) { /*网络事件处理*/ } /* 处理定时事件*/ while(timer->CheckTimer()); } return 0; }