面试被问到:“epoll在Linux内核的实现原理”,从这四个方面回答
1.epoll管理fd的数据结构如何选择?
2.io就绪如何判断(以tcp为例)?
3.epoll的线程安全如何实现?
4.epoll的水平触发和边缘触发如何实现?
先给出答案,随后进行源码讲解
一、答案
1.管理fd的数据结构如何选择?
需求分析:
要求查找性能高:epoll管理大量的fd,epoll_ctl和epoll_wait需要对就绪的fd进行查找,并且查找的频率很高
选项:数组和链表O(N)、红黑树O(lgN)、哈希表O(1)、b树
选择:对象数量较少,时哈希表查找效率最高,但数量大时对内存消耗大(内存指数增长);b\b+树查找性能低于红黑树,主要用于磁盘存储,最终选择红黑树(内存线性增长)
管理就绪的fd使用队列(链表实现)进行存储,就绪的fd优先级为先进先出,确保所有的io都能公平及时处理
2.网络io就绪如何判断(以tcp为例)?
a.tcp三次握手完成,listenfd可读,需要进行accept处理连接请求,调用epoll回调函数,添加到就绪队列中
b.recvbuffer收到数据,注册了EPOLLIN的clientfd可读,调用epoll回调函数,添加到就绪队列中
c.sendbuffer有空间,注册了EPOLLOUT的clienfd可写,调用epoll回调函数,添加到就绪队列中
d.接收到fin包,clientfd可读,需要进行ack回复,调用epoll回调函数,添加到就绪队列中
3.epoll的线程安全如何实现?
epoll的线程安全体现在
a.防止多线程同时对fd在红黑树、就绪队列中的添加、删除、修改
对红黑树进行操作时锁整棵树,因为锁子树太复杂(代码量不确定,可能很大,所以用互斥锁)
b.防止多线程同时对fd在就绪队列中的添加和删除
对就绪队列操作时代码量较小,使用自旋锁
4.水平触发和边沿触发如何实现?
水平触发和边缘触发生在内核协议栈调用epoll的回调函数时
a.水平触发:
“EPOLLIN”: 协议栈判断接收缓冲区有数据,也就是看 event 如果为 EPOLLIN,就一直循环调用回调函数,将对应fd添加到就绪队列中
“EPOLLOUT”: 协议栈判断发送缓冲区有空间,也就是看 event 如果为 EPOLLOUT,就一直循环调用回调函数,将对应fd添加到就绪队列中
b.边缘触发:
“EPOLLIN”: 协议栈判断接收缓冲区的状态变化时才调用回调函数,例如event 如果从 EPOLLOUT 变化为 EPOLLIN 的时候,才会调用回调函数。
“EPOLLOUT”: 协议栈判断event 如果从 EPOLLIN 变化为 EPOLLOUT 的时候,才会调用回调函数
二、epoll源码讲解
1.epoll底层实现最重要的两个数据结构:epitem和eventpoll
epitem的组成:
typedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t; struct epoll_event { uint32_t events; epoll_data_t data; }; struct epitem { RB_ENTRY(epitem) rbn; LIST_ENTRY(epitem) rdlink; int rdy; //exist in list int sockfd; struct epoll_event event; };
可以看到这三个结构体是自上而下层级嵌套的,其中的epoll_event是我们常用的结构体
而epitem就是底层用到的结构体:
epitem 是每一个 IO 所对应的的事件。比如 epoll_ctl (EPOLL_CTL_ADD) 操作的时候,就需要根据传入的epoll_event创建一个 epitem(同时作为红黑树、链表的节点)
eventpoll结构体
struct eventpoll { int fd; ep_rb_tree rbr; int rbcnt; LIST_HEAD( ,epitem) rdlist; int rdnum; int waiting; pthread_mutex_t mtx; //rbtree update pthread_spinlock_t lock; //rdlist update pthread_cond_t cond; //block for event pthread_mutex_t cdmtx; //mutex for cond };
epoll_create会创建一个类型为struct eventpoll的对象,并返回一个与之对应文件描述符,之后应用程序在用户态使用epoll的时候都将依靠这个文件描述符,而在epoll内部也是通过该文件描述符进一步获取到eventpoll类型对象,再进行对应的操作,完成了用户态和内核态的贯穿。
2.epoll三板斧+回调函数
a.epoll_create()
创建一个struct eventpoll实例
int epoll_create(int size) { if (size <= 0) return -1; // epfd --> struct eventpoll int epfd = get_fd_frombitmap(); //tcp, udp struct eventpoll *ep = (struct eventpoll*)rte_malloc("eventpoll", sizeof(struct eventpoll), 0); if (!ep) { set_fd_frombitmap(epfd); return -1; } struct ng_tcp_table *table = tcpInstance(); table->ep = ep; ep->fd = epfd; ep->rbcnt = 0; RB_INIT(&ep->rbr); LIST_INIT(&ep->rdlist); if (pthread_mutex_init(&ep->mtx, NULL)) { free(ep); set_fd_frombitmap(epfd); return -2; } if (pthread_mutex_init(&ep->cdmtx, NULL)) { pthread_mutex_destroy(&ep->mtx); free(ep); set_fd_frombitmap(epfd); return -2; } if (pthread_cond_init(&ep->cond, NULL)) { pthread_mutex_destroy(&ep->cdmtx); pthread_mutex_destroy(&ep->mtx); free(ep); set_fd_frombitmap(epfd); return -2; } if (pthread_spin_init(&ep->lock, PTHREAD_PROCESS_SHARED)) { pthread_cond_destroy(&ep->cond); pthread_mutex_destroy(&ep->cdmtx); pthread_mutex_destroy(&ep->mtx); free(ep); set_fd_frombitmap(epfd); return -2; } return epfd; }
b.epoll_ctl()
EPOLL_CTL_ADD:创建一个节点(epitem),为其注册回调函数,并向添加到红黑树中,
EPOLL_CTL_DEL:删除在红黑树中的一个节点
EPOLL_CTL_MOD:查找红黑树中的一个节点,并修改它的事件类型
int epoll_ctl(int epfd, int op, int sockid, struct epoll_event *event) { struct eventpoll *ep = (struct eventpoll*)get_hostinfo_fromfd(epfd); if (!ep || (!event && op != EPOLL_CTL_DEL)) { errno = -EINVAL; return -1; } if (op == EPOLL_CTL_ADD) { pthread_mutex_lock(&ep->mtx); struct epitem tmp; tmp.sockfd = sockid; struct epitem *epi = RB_FIND(_epoll_rb_socket, &ep->rbr, &tmp); if (epi) { pthread_mutex_unlock(&ep->mtx); return -1; } epi = (struct epitem*)rte_malloc("epitem", sizeof(struct epitem), 0); if (!epi) { pthread_mutex_unlock(&ep->mtx); rte_errno = -ENOMEM; return -1; } epi->sockfd = sockid; memcpy(&epi->event, event, sizeof(struct epoll_event)); epi = RB_INSERT(_epoll_rb_socket, &ep->rbr, epi); ep->rbcnt ++; pthread_mutex_unlock(&ep->mtx); } else if (op == EPOLL_CTL_DEL) { pthread_mutex_lock(&ep->mtx); struct epitem tmp; tmp.sockfd = sockid; struct epitem *epi = RB_FIND(_epoll_rb_socket, &ep->rbr, &tmp); if (!epi) { pthread_mutex_unlock(&ep->mtx); return -1; } epi = RB_REMOVE(_epoll_rb_socket, &ep->rbr, epi); if (!epi) { pthread_mutex_unlock(&ep->mtx); return -1; } ep->rbcnt --; rte_free(epi); pthread_mutex_unlock(&ep->mtx); } else if (op == EPOLL_CTL_MOD) { struct epitem tmp; tmp.sockfd = sockid; struct epitem *epi = RB_FIND(_epoll_rb_socket, &ep->rbr, &tmp); if (epi) { epi->event.events = event->events; epi->event.events |= EPOLLERR | EPOLLHUP; } else { rte_errno = -ENOENT; return -1; } } return 0; }
c.epoll_wait()
从内核的就绪队列中copy出规定数量的节点中的epoll_event结构体copy到用户空间的epoll_event数组中
源码太长了,这里列举copy的部分:
pthread_spin_lock(&ep->lock); int cnt = 0; int num = (ep->rdnum > maxevents ? maxevents : ep->rdnum); int i = 0; while (num != 0 && !LIST_EMPTY(&ep->rdlist)) { //EPOLLET struct epitem *epi = LIST_FIRST(&ep->rdlist); LIST_REMOVE(epi, rdlink); epi->rdy = 0; memcpy(&events[i++], &epi->event, sizeof(struct epoll_event)); num --; cnt ++; ep->rdnum --; } pthread_spin_unlock(&ep->lock); return cnt;
d.epoll_callback()
向就绪队列中添加一个节点
int epoll_callback(struct eventpoll *ep, int sockid, uint32_t event) { struct epitem tmp; tmp.sockfd = sockid; struct epitem *epi = RB_FIND(_epoll_rb_socket, &ep->rbr, &tmp); if (!epi) { printf("rbtree not exist\n"); return -1; } if (epi->rdy) { epi->event.events |= event; return 1; } printf("epoll_event_callback --> %d\n", epi->sockfd); pthread_spin_lock(&ep->lock); epi->rdy = 1; LIST_INSERT_HEAD(&ep->rdlist, epi, rdlink); // 向就绪队列中添加该节点 ep->rdnum ++; pthread_spin_unlock(&ep->lock); pthread_mutex_lock(&ep->cdmtx); pthread_cond_signal(&ep->cond); pthread_mutex_unlock(&ep->cdmtx); return 0; }
至此,epoll的底层原理已经实现!