下面这个部分来介绍一下定时器及封装互斥锁信号量的相关知识。
一、定时器相关
关于定时器部分,核心就在于信号处理机制(信号处理函数、信号响应函数等)。(关于信号处理这个部分的流程我还有些不明白,还需要再看一次)
本项目中,服务器主循环为每一个连接创建一个定时器,并对每个连接进行定时。另外,利用升序时间链表容器将所有定时器串联起来,若主循环接收到定时通知,则在链表中依次执行定时任务。
Linux下提供了三种定时的方法:
·socket选项SO_RECVTIMEO和SO_SNDTIMEO ·SIGALRM信号 ·I/O复用系统调用的超时参数
具体的,利用alarm函数周期性地触发SIGALRM信号,信号处理函数利用管道通知主循环,主循环接收到该信号后对升序链表上所有定时器进行处理,若该段时间内没有交换数据,则将该连接关闭,释放所占用的资源。
进一步介绍Sigalarm信号:https://mp.weixin.qq.com/s/mmXLqh_NywhBXJvI45hchA
//利用链表将每个定时器时间串联起来 class util_timer { public: util_timer() : prev(NULL), next(NULL) {} public: time_t expire; //超时时间 void (*cb_func)(client_data *); //回调函数 client_data *user_data; util_timer *prev; util_timer *next; };
该双向链表节点中的expire是截止时间,即到该时间则释放连接,整个链表动态维护从小到大的顺序(即先到截止时间的节点在最前面);
整个定时器主体实现在于以下这个函数:
void tick() { if (!head) { return; } //printf( "timer tick\n" ); LOG_INFO("%s", "timer tick"); Log::get_instance()->flush(); time_t cur = time(NULL); util_timer *tmp = head; while (tmp) { if (cur < tmp->expire) { break; } tmp->cb_func(tmp->user_data); head = tmp->next; if (head) { head->prev = NULL; } delete tmp; tmp = head; } }
·获取当前的时间,同时遍历定时器链表,如果有应该结束的定时事件,就调用回调函数将其从内核注册表中删除;
·使用统一事件源,SIGALRM信号每次被触发,主循环中调用一次定时任务处理函数,处理链表容器中到期的定时器。
·基本处理流程即是遍历定时器升序链表,如果找到大于定时任务的定时器对象就将其删除。
统一事件源,是指将信号事件与其他事件一样被处理。
·具体的,信号处理函数使用管道将信号传递给主循环,信号处理函数往管道的写端写入信号值,主循环则从管道的读端读出信号值,使用I/O复用系统调用来监听管道读端的可读事件,这样信号事件与其他文件描述符都可以通过epoll来监测,从而实现统一处理。
那么在实际服务器项目中具体如何使用定时器呢?
·服务器首先创建定时器容器链表,然后用统一事件源将异常事件,读写事件和信号事件统一处理,根据不同事件的对应逻辑使用定时器。
二、log日志系统
该web服务器项目的日志系统利用单例模式来实现日志系统。(这个部分也没看完,日志系统涉及消费者生产者的实际用法)
单例模式利用双检测懒汉模式:
single* single::p = NULL; single* single::getinstance(){ if (NULL == p){ pthread_mutex_lock(&lock); if (NULL == p){ p = new single; } pthread_mutex_unlock(&lock); } return p; }
为什么要检测两次?
如果只检测一次,在每次调用获取实例的方法时,都需要加锁,这将严重影响程序性能。双层检测可以有效避免这种情况,仅在第一次创建单例的时候加锁,其他时候都不再符合NULL == p的情况,直接返回已创建好的实例。
PS:C++11之后局部静态变量初始化线程安全(某一个线程对静态变量进行初始化时,其他线程企图操作会阻塞,这就保证了只有一个对象),故单例模式可以这么实现:
(参考:https://www.cnblogs.com/wangshaowei/p/13498412.html)
class single{ private: single(){} ~single(){} public: static single* getinstance(); }; single* single::getinstance(){ static single obj; return &obj; }
PS:为何判断信号量是否满足时使用while而不是if?除了之前所说的系统未知原因造成信号量改变的原因之外,还有一个原因在于notify之后只有一个线程获取了,其他线程必须继续等待;
三、封装信号量和互斥锁
在该项目中,可以通过封装linux原本的信号量和pthread的mutex来实现自己所需要的信号量操作及互斥锁操作。将重复使用的代码封装为函数,减少代码的重复,使其更简洁。将锁(信号量、互斥锁、条件变量)的创建于销毁函数封装在类的构造与析构函数中,实现RAII机制。
7.1 信号量sem
class sem { public: sem() { if (sem_init(&m_sem, 0, 0) != 0) { throw std::exception(); } } sem(int num) { if (sem_init(&m_sem, 0, num) != 0) { throw std::exception(); } } ~sem() { sem_destroy(&m_sem); } bool wait() { return sem_wait(&m_sem) == 0; } bool post() { return sem_post(&m_sem) == 0; } private: sem_t m_sem; };
PS:以这种方式实现的信号量可以保证资源的正确获取和销毁。
7.2 互斥锁
class locker { public: locker() { if (pthread_mutex_init(&m_mutex, NULL) != 0) { throw std::exception(); } } ~locker() { pthread_mutex_destroy(&m_mutex); } bool lock() { return pthread_mutex_lock(&m_mutex) == 0; } bool unlock() { return pthread_mutex_unlock(&m_mutex) == 0; } pthread_mutex_t *get() { return &m_mutex; } private: pthread_mutex_t m_mutex; };
7.3 条件变量
class cond { public: cond() { if (pthread_cond_init(&m_cond, NULL) != 0) { //pthread_mutex_destroy(&m_mutex); throw std::exception(); } } ~cond() { pthread_cond_destroy(&m_cond); } bool wait(pthread_mutex_t *m_mutex) { int ret = 0; //pthread_mutex_lock(&m_mutex); ret = pthread_cond_wait(&m_cond, m_mutex); //pthread_mutex_unlock(&m_mutex); return ret == 0; } bool timewait(pthread_mutex_t *m_mutex, struct timespec t) { int ret = 0; //pthread_mutex_lock(&m_mutex); ret = pthread_cond_timedwait(&m_cond, m_mutex, &t); //pthread_mutex_unlock(&m_mutex); return ret == 0; } bool signal() { return pthread_cond_signal(&m_cond) == 0; } bool broadcast() { return pthread_cond_broadcast(&m_cond) == 0; } private: //static pthread_mutex_t m_mutex; pthread_cond_t m_cond; };
PS:可以了解一下pthread.h中有关互斥和信号量的再底层是如何实现的,pthread的源码见博客:https://blog.csdn.net/jhsword/article/details/91825038。源码的简单分析见下。
7.4 锁机制的底层简析
互斥锁是对临界区的保护,能否进入临界区(获取到锁),基本思路都是判断并置位一个标志位,关键就在于对这个变量的操作是原子的。
那么如何实现原子操作呢?
单核平台可以利用关中断防止调度即可,多核则可以通过spinlock(自旋锁即是获取不到lock就原地等待)保护,更多的是利用原子汇编指令,比如x86系列的tsl(test and set)指令,该汇编指令是无法被分割的,具体是该指令在执行时锁定总线,其他核都无法访问了。
现在许多的CPU会提供一些用来构建锁的atomic指令,例如譬如x86的CMPXCHG(加上LOCK prefix),LOCK前缀的作用是锁定系统总线(或者锁定某一块cache line)来实现atomicity(即在同一个时间段内,只有一个CPU能够执行某个操作,如访问某个内存位置)。
如对于x86而言,有一个xchg指令,它的作用就是将一个CPU寄存器的内容和一个指定位置的物理内存的内容互换。当CPU执行次操作的时候,内存控制器会将该内存位置锁定,其他CPU无法在这个时间段内访问该内存,即使是在这个时候其他CPU发出访问该内存的指令,内存控制器也会先将该访问请求在硬件上缓存到请求队列中,等到前面的xchg指令完成后,再执行该命令。