在前文已经基于dpdk实现了用户态协议栈,但是有个缺陷就是不能连接多服务端。这也就引出了本文的目的——如何实现自定的 epoll
。
为什么不用系统自带的epoll?
用户态协议栈是指运行在用户态的协议栈,与传统的内核态协议栈相比,它有许多优点,如灵活性、可扩展性、高性能等。因为可以避免内核态和用户态之间频繁的上下文切换,从而提高网络应用的处理性能。
在用户态协议栈中,一般需要自定义 I/O 多路复用机制,以便实现事件驱动的网络编程模型。传统的 Linux 系统提供了 epoll
这种高效的 I/O 多路复用机制,但是无法直接在用户态协议栈中使用。主要原因是,Linux 中的 epoll
实现涉及到了内核态和用户态之间的交互,如果要在用户态协议栈中使用 Linux 的 epoll
,就必须频繁地进行系统调用和内存拷贝,这样会导致性能严重下降,并且违背了使用用户态协议栈的初衷。
因此在实现 epoll
之前,需要先学习内核 epoll
的原理。可以从四个方面入手:
1) epoll
的数据结构
2) epoll
的线程安全
3) epoll
的内核回调
4) epoll
的LT和ET
一、epoll的数据结构
1、epoll的数据结构选择
epoll需要为以下两类fd选择合适的数据结构
1)总集:所有fd的集合
2)就绪:就绪fd的集合
对于总集,首先需要明确几点:
1)每一个fd底层对应了一个tcb,包含了该连接的所有信息。也就是key—value的存储模式
2)对于io主要操作有:查找、删除、修改、插入。对于epoll总集,强查找的频率、数量会远高于其他操作。
3)io数量不确定,有时候多,有时候少。但不管什么时候,性能都不能太差。
因此,基于上面,总集可选择的数据结构有
1)hash —— O(1)
\quad 优点是查询速度快。缺点是创建时候需要指定hash的大小,但是不能太小,因为很可能会面临百万的fd。但是大多时候仅需要管理一小部分的fd,那就浪费了很大的空间。
2)rbtree —— O(lg N)
\quad 查找速度快,且不需分配初始大小。
3)链表 —— O(N)
\quad 随着链表越长,查找性能越来越差。
4)跳表
\quad 随着fd越来越多,分级实现越来越复杂。
而对于就绪fd,不需要查找,仅需挨个取出处理,因此选择双向链表即可。
2、epoll数据结构图
epoll的总集eventpoll和就绪epitem的数据结构如下,List 用来存储准备就绪的 IO,Rbtree 用来存储所有 io 的数据。
epitem
:存储每个 io 对应的事件,每个注册到 eventpoll
的 fd 对应1个 epitem
struct epitem { RB_ENTRY(epitem) rbn; LIST_ENTRY(epitem) rdlink; int rdy; //是否在就绪队列中 int sockfd; struct epoll_event event; //注册事件的类型 };
eventpoll
:管理epoll 对象
struct eventpoll { int fd; ep_rb_tree rbr; int rbcnt; LIST_HEAD( ,epitem) rdlist; int rdnum; int waiting; pthread_mutex_t mtx; //rbtree update pthread_spinlock_t lock; //rdlist update pthread_cond_t cond; //block for event pthread_mutex_t cdmtx; //mutex for cond };
二、epoll的线程安全
epoll
为用户态提供三个接口
1)epoll_create
:创建fd — 创建红黑树的根节点
2)epoll_ctl
:控制fd — EPOLL_CTL_ADD、EPOLL_CTL_MOD、EPOLL_CTL_DEL分别对应增加、修改、删除结点
3)epoll_wait
:把就绪队列的结点,通过指针赋值的方式,复制到用户态的events
epoll
的线程安全,无非就是保证两个数据结构的能够多线程并行操作
1)多个线程可以同时操作红黑树,即调用epoll_ctl
——> 对红黑树加锁(互斥锁),锁根节点
2)多个线程可以同时复制,即调用epoll_wait
——> 对就绪队列加锁(自旋锁)
三、epoll的内核回调
epoll
回调函数
调用回调函数,在总集红黑树查找epitem
,
1)若存在,将新的要关注的事件掩码添加到已有的事件掩码
2)若不存在,加入到就绪队列,然后唤醒epoll_wai
t。epoll_wait
运行后,将就绪队列里的epitem
cpoy到用户态的events
中,并删除就绪队列里相应的epitem
。
epoll_callback 是生产者,放入结点,唤醒 epoll_wait;epoll_wait 是消费者,消费结点。
int epoll_event_callback(struct eventpoll *ep, int sockid, uint32_t event) { struct epitem tmp; tmp.sockfd = sockid; //在红黑树查找tmp struct epitem *epi = RB_FIND(_epoll_rb_socket, &ep->rbr, &tmp); if (!epi) { printf("rbtree not exist\n"); return -1; } //已在就绪队列中,将新的事件掩码添加到已有的事件掩码 if (epi->rdy) { epi->event.events |= event; return 1; } printf("epoll_event_callback --> %d\n", epi->sockfd); //不在就绪队列,则将结点加入到就绪队列 pthread_spin_lock(&ep->lock); epi->rdy = 1; LIST_INSERT_HEAD(&ep->rdlist, epi, rdlink); ep->rdnum ++; pthread_spin_unlock(&ep->lock); pthread_mutex_lock(&ep->cdmtx); //唤醒epoll_wait pthread_cond_signal(&ep->cond); pthread_mutex_unlock(&ep->cdmtx); return 0; }
epoll
回调时机
一个有四个调用时机
- 三次握手完成之后
tcp 三次握手,对端反馈 ack 后,socket 进入 rcvd 状态。需要将监听 socket 的
event 置为 EPOLLIN,此时标识可以进入到 accept 读取 socket 数据。 - 接收数据回复ACK之后
在 established 状态,收到数据以后,需要将 socket 的 event 置为 EPOLLIN 状态。 - 发送数据收到ACK之后
检测 socket 的 send 状态,如果对端 cwnd>0 ,可以发送的数据。故需要将 socket
置为 EPOLLOUT。 - 接收FIN回复ACK之后
在 established 状态,收到 fin 时,此时 socket 进入到 close_wait。需要 socket 的 event置为 EPOLLIN。读取断开信息。
四、epoll的用户态接口
epoll_create
创建 eventpoll 结构体
int nepoll_create(int size) { if (size <= 0) return -1; //从位图中获取一个可用文件描述符fd int epfd = get_fd_frombitmap(); struct eventpoll *ep = (struct eventpoll*)rte_malloc("eventpoll", sizeof(struct eventpoll), 0); if (!ep) { //若创建失败,从位图删除 set_fd_frombitmap(epfd); return -1; } //获得指向全局结构体 ng_tcp_table 类型的指针 struct ng_tcp_table *table = tcpInstance(); table->ep = ep; //初始化红黑树和就绪队列 ep->fd = epfd; ep->rbcnt = 0; RB_INIT(&ep->rbr); LIST_INIT(&ep->rdlist); if (pthread_mutex_init(&ep->mtx, NULL)) { free(ep); set_fd_frombitmap(epfd); return -2; } if (pthread_mutex_init(&ep->cdmtx, NULL)) { pthread_mutex_destroy(&ep->mtx); free(ep); set_fd_frombitmap(epfd); return -2; } if (pthread_cond_init(&ep->cond, NULL)) { pthread_mutex_destroy(&ep->cdmtx); pthread_mutex_destroy(&ep->mtx); free(ep); set_fd_frombitmap(epfd); return -2; } if (pthread_spin_init(&ep->lock, PTHREAD_PROCESS_SHARED)) { pthread_cond_destroy(&ep->cond); pthread_mutex_destroy(&ep->cdmtx); pthread_mutex_destroy(&ep->mtx); free(ep); set_fd_frombitmap(epfd); return -2; } return epfd; }
epoll_ctl
对红黑树进行增添,修改、删除
int nepoll_ctl(int epfd, int op, int sockid, struct epoll_event *event) { // 通过epfd获取所关联的内核对象,并强转为 eventpoll 结构体类型 struct eventpoll *ep = (struct eventpoll*)get_hostinfo_fromfd(epfd); if (!ep || (!event && op != EPOLL_CTL_DEL)) { errno = -EINVAL; return -1; } // 增加--EPOLL_CTL_ADD if (op == EPOLL_CTL_ADD) { pthread_mutex_lock(&ep->mtx); struct epitem tmp; tmp.sockfd = sockid; //在红黑树查找要增加的结点 struct epitem *epi = RB_FIND(_epoll_rb_socket, &ep->rbr, &tmp); if (epi) { //若存在,则返回 pthread_mutex_unlock(&ep->mtx); return -1; } //若不存在,则创建新的结点epitem,并添加相应的sockfd、event,最后插入到红黑树中 epi = (struct epitem*)rte_malloc("epitem", sizeof(struct epitem), 0); if (!epi) { pthread_mutex_unlock(&ep->mtx); rte_errno = -ENOMEM; return -1; } epi->sockfd = sockid; memcpy(&epi->event, event, sizeof(struct epoll_event)); epi = RB_INSERT(_epoll_rb_socket, &ep->rbr, epi); ep->rbcnt ++; pthread_mutex_unlock(&ep->mtx); } else if (op == EPOLL_CTL_DEL) { //删除--EPOLL_CTL_DEL pthread_mutex_lock(&ep->mtx); struct epitem tmp; tmp.sockfd = sockid; //在红黑树查找要删除的结点 struct epitem *epi = RB_FIND(_epoll_rb_socket, &ep->rbr, &tmp); if (!epi) {//若不存在,则返回 pthread_mutex_unlock(&ep->mtx); return -1; } //若存在,则从红黑树删除 epi = RB_REMOVE(_epoll_rb_socket, &ep->rbr, epi); if (!epi) { pthread_mutex_unlock(&ep->mtx); return -1; } ep->rbcnt --; rte_free(epi); pthread_mutex_unlock(&ep->mtx); } else if (op == EPOLL_CTL_MOD) { //修改--EPOLL_CTL_MOD struct epitem tmp; tmp.sockfd = sockid; struct epitem *epi = RB_FIND(_epoll_rb_socket, &ep->rbr, &tmp); if (epi) { epi->event.events = event->events; epi->event.events |= EPOLLERR | EPOLLHUP; } else { rte_errno = -ENOENT; return -1; } } return 0; }
epoll_wait
监控就绪队列,等待 fd 就绪。若有读写事件发生,从内核拷贝数据到用户空间,加入到事件列表中,并通知等待这些事件的进程或线程;若没有则阻塞等待。
int nepoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) { // 通过epfd获取所关联的内核对象,并强转为 eventpoll 结构体类型 struct eventpoll *ep = (struct eventpoll*)get_hostinfo_fromfd(epfd);; if (!ep || !events || maxevents <= 0) { rte_errno = -EINVAL; return -1; } // if (pthread_mutex_lock(&ep->cdmtx)) { if (rte_errno == EDEADLK) { printf("epoll lock blocked\n"); } } // 1.若就绪队列为空,且timeout != 0 等待 while (ep->rdnum == 0 && timeout != 0) { //设置状态为等待 ep->waiting = 1; if (timeout > 0) { //timeout > 0,等待timeout struct timespec deadline; //计算定时器的超时时间 clock_gettime(CLOCK_REALTIME, &deadline); if (timeout >= 1000) { int sec; sec = timeout / 1000; deadline.tv_sec += sec; timeout -= sec * 1000; } deadline.tv_nsec += timeout * 1000000; if (deadline.tv_nsec >= 1000000000) { deadline.tv_sec++; deadline.tv_nsec -= 1000000000; } //指定的时间deadline内等待条件变量cond的状态改变。 int ret = pthread_cond_timedwait(&ep->cond, &ep->cdmtx, &deadline); if (ret && ret != ETIMEDOUT) { printf("pthread_cond_timewait\n"); pthread_mutex_unlock(&ep->cdmtx); return -1; } timeout = 0; } else if (timeout < 0) { //timeout < 0,一直等待 int ret = pthread_cond_wait(&ep->cond, &ep->cdmtx); if (ret) { printf("pthread_cond_wait\n"); pthread_mutex_unlock(&ep->cdmtx); return -1; } } ep->waiting = 0; //更新状态为不等待 } pthread_mutex_unlock(&ep->cdmtx); //对就绪队列加锁(自旋锁) pthread_spin_lock(&ep->lock); int cnt = 0; int num = (ep->rdnum > maxevents ? maxevents : ep->rdnum); int i = 0; //2.从就绪队列中拷贝事件到用户态数组 while (num != 0 && !LIST_EMPTY(&ep->rdlist)) { //EPOLLET //从就绪队列取出第一个结点 struct epitem *epi = LIST_FIRST(&ep->rdlist); LIST_REMOVE(epi, rdlink); epi->rdy = 0; //将事件拷贝到用户态 memcpy(&events[i++], &epi->event, sizeof(struct epoll_event)); num --; cnt ++; ep->rdnum --; } pthread_spin_unlock(&ep->lock); return cnt; }
五、epoll
的LT和ET
- ET边沿触发,只触发一次
- LT水平触发,如果有事件(比如数据recv_buffer没读完)就一直触发