概述
当发数据的时候,协议栈是知道的,但是应用程序是不知道的,epoll的作用就是在协议栈和应用层之间做到对应的连接有没有数据的检测,并且epoll会管理众多的fd。epoll其实就相当于协议栈和应用程序之间的一个组件。应用程序通过调用epoll的三个接口跟epoll交互,而协议栈所做的事情就是当协议栈触发收或者发数据的时候,就将红黑树的节点添加到就绪队列当中,这里面会做一个回调,由协议栈callback到epoll,使得epoll_wait能够拷贝到数据.
实现自定义epoll时,需要实现哪些东西呢?
- epitem/epoll_event结构体
- epoll_create
- epoll_ctl
- epoll_wait
- epollin/epollout 对应的事件
- callback 协议栈有触发收或者发数据的时候(总共有四处:三次握手完成、接收数据、发送数据、接收到fin包),就会触发回调,回调中红黑树的节点添加到就绪队列当中,并条件通知到epoll_wait
epoll数据结构
epoll数据结构的选择
我们发现,当我们添加、删除、修改一个fd对应的事件的时候,都需要去查找,所以它是一个查找频率很高的数据结构,对于查找频率很高的数据结构,一般有如下几个数据结构:
- 红黑树
- 哈希
- b/b+树
我们分析一下,首先epoll工作的fd的数量是不确定的,有可能一个,也有可能n多个,hash从一开始占的内存就比较大,如果从一开始你就知道fd的数量很多的时候,你就可以选择hash,B树/B+树查找的性能没有红黑树高,综上,红黑树是最合适的
epoll数据结构定义
epoll 主要由两个结构体:eventpoll 与 epitem。epitem 是每一个 IO 所对应的的事件。比如 epoll_ctl EPOLL_CTL_ADD 操作的时候,就需要创建一个 epitem。eventpoll 是每一个 epoll 所 对应的的。比如 epoll_create 就是创建一个 eventpoll。
- epitem定义
struct epitem { RB_ENTRY(epitem) rbn; // 红黑树节点 LIST_ENTRY(epitem) rdlink; // 用来做就绪的队列 int rdy; // 是否在就绪队列中,1存在,0不存在 int sockfd; // key struct epoll_event event; // value };
- eventpoll 定义
struct eventpoll { int fd; ep_rb_tree rbr; // 红黑树的根节点 int rbcnt; // 添加到红黑树中节点的数量 LIST_HEAD( ,epitem) rdlist; // 队列的节点 int rdnum; // 就绪数量 int waiting; pthread_mutex_t mtx; // rbtree update pthread_spinlock_t lock; // rdlist update pthread_cond_t cond; // block for event 使用条件变量主要是因为epoll_wait这个函数里面是带阻塞的 pthread_mutex_t cdmtx; // mutex for cond };
具体如下图所示:
List 用来存储准备就绪的 IO。对于数据结构主要讨论两方面:insert 与 remove。同样如此, 对于 list 我们也讨论 insert 与 remove。何时将数据插入到 list 中呢?当内核 IO 准备就绪的时 候,则会执行 epoll_event_callback 的回调函数,将 epitem 添加到 list 中。 那何时删除 list 中的数据呢?当 epoll_wait 激活重新运行的时候,将 list 的 epitem 逐一 copy 到 events 参数中。 Rbtree 用来存储所有 io 的数据,方便快速通 io_fd 查找。也从 insert 与 remove 来讨论。 对于rbtree 何时添加:当 App 执行 epoll_ctl EPOLL_CTL_ADD 操作,将 epitem 添加到 rbtree 中。何时删除呢?当 App 执行 epoll_ctl EPOLL_CTL_DEL 操作,将 epitem 从rbtree中移除。
epoll三大应用层接口实现
nepoll_create
// 创建eventpoll结构体,并对其进行初始化 int nepoll_create(int size) { if (size <= 0) return -1; int epfd = get_fd_frombitmap(); struct eventpoll *ep = (struct eventpoll*)rte_calloc("eventpoll", 1, sizeof(struct eventpoll), 0); if (!ep) { set_fd_frombitmap(epfd); return -1; } ep->rbcnt = 0; RB_INIT(&ep->rbr); LIST_INIT(&ep->rdlist); if (pthread_mutex_init(&ep->mtx, NULL)) { rte_free(ep); set_fd_frombitmap(epfd); return -2; } if (pthread_mutex_init(&ep->cdmtx, NULL)) { pthread_mutex_destroy(&ep->mtx); rte_free(ep); // dpdk中提供了一系列内存操作,不允许私自分配和释放 set_fd_frombitmap(epfd); return -2; } if (pthread_cond_init(&ep->cond, NULL)) { pthread_mutex_destroy(&ep->cdmtx); pthread_mutex_destroy(&ep->mtx); rte_free(ep); set_fd_frombitmap(epfd); return -2; } if (pthread_spin_init(&ep->lock, PTHREAD_PROCESS_SHARED)) { pthread_cond_destroy(&ep->cond); pthread_mutex_destroy(&ep->cdmtx); pthread_mutex_destroy(&ep->mtx); rte_free(ep); set_fd_frombitmap(epfd); return -2; } return epfd; }
nepoll_ctl
// 这个函数主要有三个操作:ADD/DEL/MOD int nepoll_ctl(int epfd, int op, int fd, struct epoll_event *event) { struct eventpoll *ep = (struct eventpoll *)get_hostinfo_fromfd(epfd); if (!ep || (!event && op != EPOLL_CTL_DEL)) { errno = -EINVAL; return -1; } if (op == EPOLL_CTL_ADD) { pthread_mutex_lock(&ep->mtx); struct epitem tmp; tmp.sockfd = fd; struct epitem *epi = RB_FIND(_epoll_rb_socket, &ep->rbr, &tmp); if (epi) { // 走进这里说明红黑树中已经存在这个节点了 pthread_mutex_unlock(&ep->mtx); return -1; } // 不存在就创建一个节点 epi = (struct epitem*)rte_calloc("epitem", 1, sizeof(struct epitem), 0); if (!epi) { pthread_mutex_unlock(&ep->mtx); errno = -ENOMEM; return -1; } epi->sockfd = fd; memcpy(&epi->event, event, sizeof(struct epoll_event)); epi = RB_INSERT(_epoll_rb_socket, &ep->rbr, epi); //assert(epi == NULL); ep->rbcnt ++; pthread_mutex_unlock(&ep->mtx); } else if (op == EPOLL_CTL_DEL) { pthread_mutex_lock(&ep->mtx); struct epitem tmp; tmp.sockfd = fd; struct epitem *epi = RB_FIND(_epoll_rb_socket, &ep->rbr, &tmp); if (!epi) { pthread_mutex_unlock(&ep->mtx); return -1; } epi = RB_REMOVE(_epoll_rb_socket, &ep->rbr, epi); if (!epi) { pthread_mutex_unlock(&ep->mtx); return -1; } ep->rbcnt --; rte_free(epi); pthread_mutex_unlock(&ep->mtx); } else if (op == EPOLL_CTL_MOD) { struct epitem tmp; tmp.sockfd = fd; struct epitem *epi = RB_FIND(_epoll_rb_socket, &ep->rbr, &tmp); if (epi) { epi->event.events = event->events; epi->event.events |= EPOLLERR | EPOLLHUP; } else { errno = -ENOENT; return -1; } } else { //assert(0); } return 0; }
nepoll_wait
// 作用就是从就绪队列里面把节点拷贝出来,跟红黑树没啥关系 // epfd指具体对应哪个epfd // events:就绪队列里面的数据会最终拷贝到events中 // maxevents:指events中最多装多少个 // timeout:指如果就绪队列为空,我们等待时间有多长 // -1,表示如果为空,就一直等 // 0,立马返回,不管就绪队列中有没有数据 // >0的数,比如10,那么就等待10ms返回 // 这个timeout的10ms我们是怎么实现的? // 主要用到pthread_cond_wait和pthread_cond_timedwait两个函数去解决 int nepoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) { struct eventpoll *ep = (struct eventpoll *)get_hostinfo_fromfd(epfd); if (!ep || !events || maxevents <= 0) { errno = -EINVAL; return -1; } if (pthread_mutex_lock(&ep->cdmtx)) { if (errno == EDEADLK) { } //assert(0); } while (ep->rdnum == 0 && timeout != 0) { ep->waiting = 1; if (timeout > 0) { struct timespec deadline; clock_gettime(CLOCK_REALTIME, &deadline); if (timeout >= 1000) { int sec; sec = timeout / 1000; deadline.tv_sec += sec; timeout -= sec * 1000; } deadline.tv_nsec += timeout * 1000000; if (deadline.tv_nsec >= 1000000000) { deadline.tv_sec++; deadline.tv_nsec -= 1000000000; } // timeout>0,就用pthread_cond_timedwait处理 int ret = pthread_cond_timedwait(&ep->cond, &ep->cdmtx, &deadline); if (ret && ret != ETIMEDOUT) { pthread_mutex_unlock(&ep->cdmtx); return -1; } timeout = 0; } else if (timeout < 0) { // timeout < 0就使用pthread_cond_wait函数进行条件等待 int ret = pthread_cond_wait(&ep->cond, &ep->cdmtx); if (ret) { pthread_mutex_unlock(&ep->cdmtx); return -1; } } ep->waiting = 0; } pthread_mutex_unlock(&ep->cdmtx); pthread_spin_lock(&ep->lock); int cnt = 0; int num = (ep->rdnum > maxevents ? maxevents : ep->rdnum); int i = 0; while (num != 0 && !LIST_EMPTY(&ep->rdlist)) { //EPOLLET struct epitem *epi = LIST_FIRST(&ep->rdlist); LIST_REMOVE(epi, rdlink); epi->rdy = 0; memcpy(&events[i++], &epi->event, sizeof(struct epoll_event)); num --; cnt ++; ep->rdnum --; } pthread_spin_unlock(&ep->lock); return cnt; }
epoll与协议栈
epoll 的回调函数何时执行,此部分需要与 tcp 的协议栈一起来阐述。tcp 协议栈的时序图如 下图所示,epoll 从协议栈回调的部分从下图的编号 1,2,3,4。具体 tcp 协议栈的实现,后续 从另外的文章中表述出来。下面分别对四个步骤详细描述 :
- 编号 1:是 tcp 三次握手,对端反馈 ack 后,socket 进入 rcvd 状态。需要将监听 socket 的 event 置为 EPOLLIN,此时标识可以进入到 accept 读取 socket 数据。
- 编号 2:在 established 状态,收到数据以后,需要将 socket 的 event 置为 EPOLLIN 状态。
- 编号 3:在 established 状态,收到 fin 时,此时 socket 进入到 close_wait。需要 socket 的 event 置为 EPOLLIN。读取断开信息。
- 编号 4:检测 socket 的 send 状态,如果对端 cwnd>0 是可以,发送的数据。故需要将 socket 置为 EPOLLOUT。 所以在此四处添加 EPOLL 的回调函数,即可使得 epoll 正常接收到 io 事件。
nepoll_event_callback
// 这个函数的作用就是找到红黑树中的fd对应的节点,并且把它假如到就绪队列中 // 这个函数是从协议栈回调到epoll的模块中间 // nepoll_event_callback和nepoll_wait其实是一个生产者和消费者的过程 int nepoll_event_callback(struct eventpoll *ep, int sockid, uint32_t event) { struct epitem tmp; tmp.sockfd = sockid; struct epitem *epi = RB_FIND(_epoll_rb_socket, &ep->rbr, &tmp); if (!epi) { return -1; } if (epi->rdy) { // 如果就绪队列中已经存在了 epi->event.events |= event; return 1; } pthread_spin_lock(&ep->lock); epi->rdy = 1; LIST_INSERT_HEAD(&ep->rdlist, epi, rdlink); ep->rdnum ++; pthread_spin_unlock(&ep->lock); pthread_mutex_lock(&ep->cdmtx); pthread_cond_signal(&ep->cond); // 条件通知的动作 pthread_mutex_unlock(&ep->cdmtx); return 0; }
注意:
nepoll_event_callback和nepoll_wait其实是一个生产者和消费者的过程。
epoll锁机制
epoll 从以下几个方面是需要加锁保护的。list 的操作,rbtree 的操作,epoll_wait 的等待。 list 使用最小粒度的锁 spinlock,便于在 SMP 下添加操作的时候,能够快速操作 list。
- list添加
346 行:获取 spinlock。
347 行:epitem 的 rdy 置为 1,代表 epitem 已经在就绪队列中,后续再触发相同事件就只需 更改 event。
348 行:添加到 list 中。
349 行:将 eventpoll 的 rdnum 域 加 1。
350 行:释放 spinlock
- list删除
301 行:获取 spinlock
304 行:判读 rdnum 与 maxevents 的大小,避免 event 溢出。
307 行:循环遍历 list,判断添加 list 不能为空
309 行:获取 list 首个结点 310 行:移除 list 首个结点。
311 行:将 epitem 的 rdy 域置为 0,标识 epitem 不再就绪队列中。
313 行:copy epitem 的 event 到用户空间的 events。
316 行:copy 数量加 1
317 行:eventpoll 中 rdnum 减一。
避免 SMP 体系下,多核竞争。此处采用自旋锁,不适合采用睡眠锁。
- rbtree添加
149 行:获取互斥锁。
153 行:查找 sockid 的 epitem 是否存在。存在则不能添加,不存在则可以添加。
160 行:分配 epitem。
167 行:sockid 赋值
168 行:将设置的 event 添加到 epitem 的 event 域。
170 行:将 epitem 添加到 rbrtree 中。 173 行:释放互斥锁。
- rbtree删除
177 行:获取互斥锁。
181 行:删除 sockid 的结点,如果不存在,则 rbtree 返回-1。
188 行:释放 epitem
190 行:释放互斥锁。
- epoll_wait的等待
具体参见nepoll_event_callback(生产者)与nepoll_wait(消费者)
LT与ET
1、et 与lt的区别
- et 边沿触发:当调用recv函数如果没有把协议栈中的recvbuffr中的数据全部读完,就不会继续触发,当客户端再发数据的时候,才会再次被触发。
- lt水平触发:当调用recv函数如果没有把协议栈中的recvbuffr中的数据全部读完,就回一直触发,直到全部读完为止。
2、为什么会有水平触发和边沿触发?
水平触发和边沿触发不是一开始就故意设计出来的,其理念来自于嵌入式的电平的高低变化
3、如何实现水平触发和边沿触发?
et从协议栈中检测到recvbuffer中接收数据就调用回调,水平触发检测recvbuffer有数据就调回调,水平触发和边沿触发代码实现核心是内核通知epoll时执行回调函数的次数的区别。