我们学习epoll分为四部分
- 快速理解部分概念 快速的看一下部分接口
- 讲解epoll的工作原理
- 手写epoll服务器
- 工作模式
并且在这四个部分的内容学习完毕之后我们学习一下Reactor模式
初识epoll
按照man手册的说法
epoll是为了处理大量句柄而做出改进的poll
它在2.5.44内核中被引入到Linux
也是目前来说最常用的一种多路转接IO方式
epoll相关系统调用
epoll函数有三个相关的系统调用 分别是
- epoll_create
- epoll_ctl
- epoll_wait
epoll_create函数
epoll_create函数的作用是创建一个epoll模型 函数原型如下
int epoll_create(int size);
参数说明:
- 目前来说 epoll_create的参数是被废弃的 我们设置为256或者512就行 这样设计的原因是为了向前兼容
返回值说明:
- 返回一个epoll模型 (实际上就是一个文件描述符)
epoll_ctl函数
epoll_ctl函数的作用是对创建出来的epoll模型进行操控 函数原型如下
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
参数说明:
- int epfd 标识一个我们的IO模型
- int op operator 表示我们想要做出什么样的操作
- int fd 表示我们需要添加的文件描述符
- epoll_event *event 表示我们需要关心哪些事件
返回值说明:
- 函数成功调用返回0 失败返回-1 同时错误码将被设置
epoll_wait函数的作用是监视我们关心的关键描述符 函数原型如下
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
参数说明:
int epfd 标识我们的epoll模型
struct epoll_event *events 输出型参数 内核会拷贝已经就绪的事件到这里面
int maxevents events数组的元素个数
int timeout 和poll函数中的timeout一样 等待事件 单位是毫秒
epoll的工作原理
我们之前的学习的多路转接函数 无论是select还有poll 它们都需要我们做下面的操作
- 让我们维护一个第三方的数组
- 都需要遍历整个数组
- 都需要经历用户到内核 内核到用户的事件通知
而我们的epoll工作模式则不同
操作系统硬件上的工作模式如下
这是一个缩略版的操作系统图
那么现在问题来了 操作系统是如何知道硬件里面有数据了呢?
(这个硬件可以是网卡 可以是键盘等等)
具体解释如下图
而epoll的工作原理如下
还是该图
当我们创建一个epoll模型之后操作系统底层会帮助我们维护一颗红黑树
红黑树的节点里面维护着很多元素 其中最重要的是两个
- 文件描述符
- 事件
所以说这颗红黑树解决的是用户通知内核的问题
用户通知内核自己要关心哪些文件描述符的哪些事件之后 操作系统就会生成一个节点然后插入到这颗红黑树当中
而这颗红黑树就是对应我们select和poll当中的数组
只不过此时它就由操作系统进行维护了
而我们内核通知用户的则是通过消息队列通知
我们可以这么理解 在内核维护的红黑树旁边有一个消息队列
每当有fd的事件就绪的时候就会在该队列上添加一个元素
于是我们用户读取的时候时间复杂度变为了O(1)
操作系统什么时候构建就绪队列节点呢?
操作系统在调用驱动的时候构建就绪队列节点
在生成红黑树节点的时候 在驱动中 每个节点都会生成一个自己的回调函数
于是在经历了硬件中断到读取数据的过程后 操作系统会调用驱动中的回调函数来获取该节点的数据 并且根据这些数据(fd和events)构建就绪节点 最后将构建好的节点插入到队列中
我们将上面的一整套机制称为epoll模型
那么我们现在再来回顾下epoll的三个函数
- epoll_create
- epoll_ctl
- epoll_wait
它们的作用分别是
- epoll_create : 创建epoll模型 包括红黑树 就绪队列 回调函数等
- epoll_ctl : 对于红黑树的节点进行注册
- epoll_wait : 获取就绪队列中的内容
为什么epoll_create返回一个文件描述符 而epoll_ctl和epoll_wait需要用到这个文件描述符呢?
这个问题最本质的原因是因为文件描述符表是随进程的 具体理解我们可以看下图
我们都知道每个进程都对应一个PCB结构 而每个PCB结构中都会有一个file struct结构体 这个结构体中有一个文件数组 每个下标对应一个文件描述符
而epoll_create的本质就是打开了一个文件 所以被分配了一个文件描述符
在这个文件中有个void* p指针 可以找到我们上面说的那些红黑树 就绪队列等等
这里还有一些关于epoll服务器的一些小细节
epoll底层维护的红黑树key值是什么呢?
是fd文件描述符 它是一个绝佳的天然key值 既不会重复 又容易排序
用户需要关系os对于fd和event的管理吗
不需要 os会在底层完成这些事
epoll为什么高效呢
- 因为epoll底层维护的是红黑树结构 对比数组来说增删改查有着天然的优势
- 我们不需要主动去询问哪些文件是否就绪 os会自动将其添加到就绪队列中
- 在寻找就绪文件的时候 由于我们使用的是就绪队列 时间复杂度是O(1) 而遍历数组的时间复杂度则是O(N)
epoll有线程安全问题嘛?
没有
实际上就绪队列是一个经典的生产者消费者模型 os生成数据 而用户消费数据 所以说这个队列实际上是一个临界资源 所以说操作系统在底层对其做了一些加锁处理 让他变为线程安全的
如果底层没有就绪事件 我们上层应该怎么办呢?
根据timeout参数来决定
- 如果timeout为0 则是非阻塞
- 如果timeout为-1 则是阻塞
- 如果timeout大于0 则表示我们要等待多少毫秒之后去读取
epoll服务器编写
接下来我们开始设计一个epoll服务器
成员变量
首先作为一个基于TCP协议的服务器 我们必须要有listen套接字和端口号
int _listensock; uint16_t _port;
其次作为一个epoll服务器 我们还必须要有一个epfd作为句柄来标识一个epoll模型
int _epfd;
此外我们还需要设置一个数组来接收epoll_wait的数据
struct epoll_event* _revs; int _revs_num;
构造函数
ep_server(const int& port = default_port) :_port(port) { // 1. create listensock _listensock = Sock::Socket(); Sock::Bind(_listensock , _port); Sock::Listen(_listensock); // 2. create epoll _epfd = epoll::createepoll(); logMessage(DEBUG , "create epoll_server success, epfd: %d , listensock: %d " ,_epfd , _listensock); // 3. append listen socket to epoll if(epoll::epollctl(_epfd , EPOLL_CTL_ADD , _listensock , EPOLLIN)) { logMessage(DEBUG , "epollctl add success %d"); } else { exit(6); } }
我们这里不直接使用epoll的原生函数来进行操作 而是进行一下封装
封装后的epoll类如下
class epoll { public: static const int gsize = 256; public: static int createepoll() { int epfd = epoll_create(gsize); if (epfd > 0) { return epfd; } else { // err exit(5); } } static bool epollctl(int epfd , int oper , int sock , uint32_t events) { struct epoll_event ev; ev.data.fd = sock; ev.events = events; int ret = epoll_ctl(epfd , oper , sock , &ev); return ret == 0; } static int epollwait(int epfd , struct epoll_event res[] , int num , int timeout) { return epoll_wait(epfd , res , num , timeout); } };
循环函数
我们服务器肯定不是只accept一次就完事了 所以说我们需要设计一个循环函数来重复执行accept的动作
我们分析下 首先我们每次循环肯定是要检测一次epoll就绪队列中有没有数据的 如果有的话 我们就直接从这个里面拿数据 并且把这个数据拿出来
特别注意 如果是listen套接字中的数据 我们还需要往 struct_events
中添加数据
每次循环的大概代码如下
int n = epoll_wait(_epfd, _revs, _num, timeout); switch (n) { case 0: logMessage(NORMAL, "timeout ..."); break; case -1: logMessage(WARNING, "epoll_wait failed, code: %d, errstring: %s", errno, strerror(errno)); break; default: logMessage(NORMAL, "have event ready"); //HandlerEvent(n); break;
我们将处理函数重新封装
HandlerEvent函数
在每次循环的时候我们成功使用epoll_wait拿到了就绪队列里的数据之后会走到这里
这里我们要进行判断 到底是listensock就绪了还是普通sock套接字就绪了
如果是listensock套接字就绪就代表我们要接收一个新的请求 如果是普通sock就绪就代表我们可以读取请求了
void HandlerEvent(int readyNum) { logMessage(DEBUG, "HandlerEvent in"); for (int i = 0; i < readyNum; i++) { uint32_t events = _revs[i].events; int sock = _revs[i].data.fd; if (sock == _listensock && (events & EPOLLIN)) { //_listensock读事件就绪, 获取新连接 std::string clientip; uint16_t clientport; int fd = Sock::Accept(sock, &clientip, &clientport); if (fd < 0) { logMessage(WARNING, "accept error"); continue; } // 获取fd成功,可以直接读取吗??不可以,放入epoll struct epoll_event ev; ev.events = EPOLLIN; ev.data.fd = fd; epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &ev); } else if (events & EPOLLIN) { // 普通的读事件就绪 // 依旧有问题 char buffer[1024]; // 把本轮数据读完,就一定能够读到一个完整的请求吗?? int n = recv(sock, buffer, sizeof(buffer), 0); if (n > 0) { buffer[n] = 0; logMessage(DEBUG, "client# %s", buffer); // TODO std::string response = func_(buffer); send(sock, response.c_str(), response.size(), 0); } else if (n == 0) { // 建议先从epoll移除,才close fd epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr); close(sock); logMessage(NORMAL, "client quit"); } else { // 建议先从epoll移除,才close fd epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr); close(sock); logMessage(ERROR, "recv error, code: %d, errstring: %s", errno, strerror(errno)); } } else { } } logMessage(DEBUG, "HandlerEvent out"); }
其实到这里 我们简单的epoll服务器就做完了
我们接下来还要学习下epoll服务器的工作模式