select:把所有待监听的fd放进一个按位表示的集合(fd_set)中,每个fd对应一个二进制位;select返回后,通过检查哪些位被置位来判断哪些fd可读、可写。
epoll的思路与此类似。epoll有三个接口(epoll_create、epoll_ctl、epoll_wait):第一个是epoll_create,用来创建epoll对象并返回它的fd。
将创建的fd交由epoll进行管理。首先创建epoll,再使用epoll_ctl把这些fd加入到epoll中进行管理。然后每次accept返回的时候,创建一个新的连接,再次把它加到epoll进行管理。将fd加完之后,每次用epoll_wait进行判断这些fd当中有哪些可以进行读写操作。如果连接1可读,那么可读的fd会被epoll_wait中的events参数带回来,并且epoll_wait返回1,表示1个fd有事件。
由此可知,epoll可以将fd与事件对应起来。然后一个事件对应一个回调函数,不同事件对应不同的回调函数。一堆的事件就变成了一个"核反应堆",这就是反应堆(reactor)模式。
reactor:对IO进行集中式的管理,每一个IO对应不同的事件,走不同的回调函数。单独的reactor与多线程、多进程没有必然关系。但是在实际做法上,为了提高reactor的性能、让处理逻辑划分得更清楚,往往会把多线程或者多进程引入进来。
对于同一个fd,尽管可能同时出现可读和可写的情况,但是在这里的做法中,一次循环只处理它的一个事件,要么处理可读、要么处理可写。可以使用readcallback可读回调、writecallback可写回调两个函数分别处理,也可以只用一个callback回调进行处理。当处理完可读事件后,程序通过epoll_ctl把这个fd关注的事件改成可写,然后等待下一轮epoll_wait循环处理这个fd的可写事件;当处理完可写事件后,再把这个fd关注的事件改回可读,然后等待下一轮epoll_wait循环处理这个fd的可读事件。
需要把监听用的listenfd以及每个客户端连接的fd都加入到epoll中。
水平触发:只要buffer缓存中有数据就会一直触发,直到用户取完数据才结束。(高电平期间一直触发,低电平不触发)
边沿触发:当buffer从没有数据到有数据的这个时间点触发,除非buffer再次经历从无数据到有数据的过程,否则不再触发(只有从低电平到高电平的瞬间才触发)。
关键函数
int epoll_create(int size);
epoll_create()创建一个epoll实例,返回值为该epoll实例的fd,相当于epoll的一个对象。参数size用来提示要监听的fd数目上限;从Linux 2.6.8开始,size参数将被忽略,但必须大于零。每创建一个epoll句柄,会占用一个fd,因此当不再需要时,应使用close关闭epoll_create()返回的文件描述符,否则可能导致fd被耗尽。当引用该epoll实例的所有文件描述符都已关闭时,内核将销毁该实例并释放关联的资源以供重用。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
新增事件、删除事件。
函数说明:
fd:要操作的文件描述符
op:指定操作类型
操作类型:
EPOLL_CTL_ADD:往事件表中注册fd上的事件
EPOLL_CTL_MOD:修改fd上的注册事件
EPOLL_CTL_DEL:删除fd上的注册事件
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
等待事件触发;返回后由程序对就绪的事件执行相应的处理函数。
成功时返回就绪的文件描述符的个数,失败时返回-1并设置errno
原始的epoll代码
#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#include <pthread.h>
#include <sys/poll.h>
#include <sys/epoll.h>

#define MAXLNE 4096
#define POLL_SIZE 1024

/*
 * Minimal level-triggered epoll echo server.
 *
 * Listens on TCP port 9999 on all interfaces, registers the listening socket
 * and every accepted connection with one epoll instance for EPOLLIN, and
 * echoes back whatever each client sends.
 *
 * Returns EXIT_FAILURE when the socket, bind, listen or epoll setup fails.
 * Once the event loop is entered it runs until the process is killed.
 */
int main(int argc, char **argv)
{
    (void)argc;
    (void)argv;

    char buff[MAXLNE];

    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd == -1) {
        printf("create socket error: %s(errno: %d)\n", strerror(errno), errno);
        return EXIT_FAILURE;
    }

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(9999);

    if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) {
        printf("bind socket error: %s(errno: %d)\n", strerror(errno), errno);
        close(listenfd);
        return EXIT_FAILURE;
    }

    if (listen(listenfd, 10) == -1) {
        printf("listen socket error: %s(errno: %d)\n", strerror(errno), errno);
        close(listenfd);
        return EXIT_FAILURE;
    }

    /* The size argument is only a historical hint but must be positive. */
    int epfd = epoll_create(1);
    if (epfd == -1) {
        printf("epoll_create error: %s(errno: %d)\n", strerror(errno), errno);
        close(listenfd);
        return EXIT_FAILURE;
    }

    struct epoll_event events[POLL_SIZE] = {0};
    struct epoll_event ev;
    memset(&ev, 0, sizeof(ev));

    ev.events = EPOLLIN;
    ev.data.fd = listenfd;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev) == -1) {
        printf("epoll_ctl error: %s(errno: %d)\n", strerror(errno), errno);
        close(epfd);
        close(listenfd);
        return EXIT_FAILURE;
    }

    while (1) {
        int nready = epoll_wait(epfd, events, POLL_SIZE, 5);
        if (nready == -1) {
            continue;
        }

        for (int i = 0; i < nready; i++) {
            int clientfd = events[i].data.fd;

            if (clientfd == listenfd) {
                struct sockaddr_in client;
                socklen_t len = sizeof(client);

                int connfd = accept(listenfd, (struct sockaddr *)&client, &len);
                if (connfd == -1) {
                    /* One failed accept must not take the whole server down. */
                    printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
                    continue;
                }

                printf("accept\n");

                ev.events = EPOLLIN;
                ev.data.fd = connfd;
                if (epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev) == -1) {
                    printf("epoll_ctl error: %s(errno: %d)\n", strerror(errno), errno);
                    close(connfd);
                }
            } else if (events[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP)) {
                /* Keep one byte free so the terminator below stays in bounds. */
                ssize_t n = recv(clientfd, buff, sizeof(buff) - 1, 0);

                if (n > 0) {
                    buff[n] = '\0';
                    printf("recv msg from client: %s\n", buff);

                    /* MSG_NOSIGNAL: a vanished peer must not raise SIGPIPE. */
                    if (send(clientfd, buff, (size_t)n, MSG_NOSIGNAL) == -1 &&
                        errno != EAGAIN && errno != EINTR) {
                        ev.events = EPOLLIN;
                        ev.data.fd = clientfd;
                        epoll_ctl(epfd, EPOLL_CTL_DEL, clientfd, &ev);
                        close(clientfd);
                    }
                } else if (n == 0 || (errno != EAGAIN && errno != EINTR)) {
                    /* Peer closed (n == 0) or hard error: drop the connection. */
                    ev.events = EPOLLIN;
                    ev.data.fd = clientfd;
                    epoll_ctl(epfd, EPOLL_CTL_DEL, clientfd, &ev);
                    close(clientfd);
                }
            }
        }
    }

    /* Not reached: the loop above never exits. */
    close(epfd);
    close(listenfd);
    return 0;
}
reactor结构
Reactor方式一:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>

#include <fcntl.h>
#include <unistd.h>
#include <errno.h>

#define BUFFER_LENGTH           4096
#define MAX_EPOLL_EVENTS        1024
#define SERVER_PORT             8888
#define IDLE_TIMEOUT_SECONDS    60
#define IDLE_SWEEP_BATCH        100

typedef int NCALLBACK(int, int, void *);

/*
 * Per-descriptor state. The reactor keeps one slot per fd, indexed by the fd
 * value itself, so a slot is only valid for fds below MAX_EPOLL_EVENTS.
 */
struct ntyevent {
    int fd;
    int events;                 /* epoll interest currently registered */
    void *arg;                  /* passed back to the callback (the reactor) */
    int (*callback)(int fd, int events, void *arg);

    int status;                 /* 1 while the fd is registered with epoll */
    char buffer[BUFFER_LENGTH];
    int length;                 /* number of pending bytes in buffer */
    long last_active;           /* time() of the last nty_event_set() */
    int is_listener;            /* non-zero: exempt from the idle timeout */
};

struct ntyreactor {
    int epfd;
    struct ntyevent *events;    /* MAX_EPOLL_EVENTS slots, indexed by fd */
};

int recv_cb(int fd, int events, void *arg);
int send_cb(int fd, int events, void *arg);

/* Bind fd, callback and argument to a slot and refresh its activity stamp. */
void nty_event_set(struct ntyevent *ev, int fd, NCALLBACK callback, void *arg)
{
    ev->fd = fd;
    ev->callback = callback;
    ev->events = 0;
    ev->arg = arg;
    ev->last_active = (long)time(NULL);
}

/*
 * Register (or re-arm) ev->fd with epoll for the given event mask.
 * Uses EPOLL_CTL_MOD when the slot is already registered, EPOLL_CTL_ADD
 * otherwise. Returns 0 on success, -1 when epoll_ctl fails.
 */
int nty_event_add(int epfd, int events, struct ntyevent *ev)
{
    struct epoll_event ep_ev = {0, {0}};
    ep_ev.data.ptr = ev;
    ep_ev.events = ev->events = events;

    int op = (ev->status == 1) ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;

    if (epoll_ctl(epfd, op, ev->fd, &ep_ev) < 0) {
        printf("event add failed [fd=%d], events[%d]\n", ev->fd, events);
        return -1;
    }

    /* Only mark the slot registered once the kernel has accepted it. */
    ev->status = 1;
    return 0;
}

/*
 * Remove ev->fd from epoll. Returns -1 when the slot was not registered,
 * 0 otherwise. Call this before close(): EPOLL_CTL_DEL on a closed fd fails.
 */
int nty_event_del(int epfd, struct ntyevent *ev)
{
    struct epoll_event ep_ev = {0, {0}};

    if (ev->status != 1) {
        return -1;
    }

    ep_ev.data.ptr = ev;
    ev->status = 0;
    epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ep_ev);

    return 0;
}

/*
 * EPOLLIN handler for a client connection: read one chunk, then switch the
 * fd to EPOLLOUT so send_cb echoes it back. Closes the connection on EOF or
 * on a hard error. Returns the recv() result.
 */
int recv_cb(int fd, int events, void *arg)
{
    struct ntyreactor *reactor = (struct ntyreactor *)arg;
    struct ntyevent *ev = reactor->events + fd;
    (void)events;

    /* Leave room for the terminator written below. */
    int len = (int)recv(fd, ev->buffer, BUFFER_LENGTH - 1, 0);
    if (len < 0 && (errno == EAGAIN || errno == EINTR)) {
        /* Nothing to read yet: stay registered for EPOLLIN. */
        return len;
    }

    int saved_errno = errno;    /* close() below may overwrite errno */
    nty_event_del(reactor->epfd, ev);

    if (len > 0) {
        ev->length = len;
        ev->buffer[len] = '\0';

        printf("C[%d]:%s\n", fd, ev->buffer);

        nty_event_set(ev, fd, send_cb, reactor);
        nty_event_add(reactor->epfd, EPOLLOUT, ev);
    } else if (len == 0) {
        close(ev->fd);
        printf("[fd=%d] pos[%ld], closed\n", fd, (long)(ev - reactor->events));
    } else {
        close(ev->fd);
        printf("recv[fd=%d] error[%d]:%s\n", fd, saved_errno, strerror(saved_errno));
    }

    return len;
}

/*
 * EPOLLOUT handler: send the buffered data. After a complete send the fd is
 * switched back to EPOLLIN; after a partial send the unsent tail is kept and
 * the fd stays on EPOLLOUT. Closes the connection on a hard error.
 * Returns the send() result.
 */
int send_cb(int fd, int events, void *arg)
{
    struct ntyreactor *reactor = (struct ntyreactor *)arg;
    struct ntyevent *ev = reactor->events + fd;
    (void)events;

    /* MSG_NOSIGNAL: a vanished peer must not raise SIGPIPE. */
    int len = (int)send(fd, ev->buffer, (size_t)ev->length, MSG_NOSIGNAL);

    if (len > 0) {
        printf("send[fd=%d], [%d]%s\n", fd, len, ev->buffer);

        if (len < ev->length) {
            /* Partial write: keep the tail and wait for the next EPOLLOUT. */
            ev->length -= len;
            memmove(ev->buffer, ev->buffer + len, (size_t)ev->length);
            ev->buffer[ev->length] = '\0';
            ev->last_active = (long)time(NULL);
            return len;
        }

        nty_event_del(reactor->epfd, ev);
        nty_event_set(ev, fd, recv_cb, reactor);
        nty_event_add(reactor->epfd, EPOLLIN, ev);
    } else if (len < 0 && (errno == EAGAIN || errno == EINTR)) {
        /* Socket not writable right now: stay registered for EPOLLOUT. */
        return len;
    } else {
        int saved_errno = errno;

        /* Deregister first; EPOLL_CTL_DEL on a closed fd would fail. */
        nty_event_del(reactor->epfd, ev);
        close(ev->fd);

        printf("send[fd=%d] error %s\n", fd, strerror(saved_errno));
    }

    return len;
}

/*
 * EPOLLIN handler for the listening socket: accept one connection, make it
 * non-blocking and register it for EPOLLIN with recv_cb.
 * Returns 0 on success, -1 on any failure (the new fd is closed in that case).
 */
int accept_cb(int fd, int events, void *arg)
{
    struct ntyreactor *reactor = (struct ntyreactor *)arg;
    if (reactor == NULL) return -1;
    (void)events;

    struct sockaddr_in client_addr;
    socklen_t len = sizeof(client_addr);

    int clientfd = accept(fd, (struct sockaddr *)&client_addr, &len);
    if (clientfd == -1) {
        if (errno != EAGAIN && errno != EINTR) {
            printf("accept: %s\n", strerror(errno));
        }
        return -1;
    }

    /* Slots are indexed by fd, so an fd beyond the table cannot be served. */
    if (clientfd >= MAX_EPOLL_EVENTS) {
        printf("%s: max connect limit[%d]\n", __func__, MAX_EPOLL_EVENTS);
        close(clientfd);
        return -1;
    }

    /* Add O_NONBLOCK to the existing flags instead of overwriting them. */
    int flags = fcntl(clientfd, F_GETFL, 0);
    if (flags < 0 || fcntl(clientfd, F_SETFL, flags | O_NONBLOCK) < 0) {
        printf("%s: fcntl nonblocking failed, %d\n", __func__, errno);
        close(clientfd);
        return -1;
    }

    struct ntyevent *ev = &reactor->events[clientfd];
    ev->status = 0;         /* a freshly accepted fd is never registered yet */
    ev->is_listener = 0;

    nty_event_set(ev, clientfd, recv_cb, reactor);
    if (nty_event_add(reactor->epfd, EPOLLIN, ev) < 0) {
        close(clientfd);
        return -1;
    }

    printf("new connect [%s:%d][time:%ld], pos[%d]\n",
           inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port),
           ev->last_active, clientfd);

    return 0;
}

/*
 * Create a non-blocking TCP socket listening on all interfaces at port.
 * Returns the listening fd, or -1 on failure.
 */
int init_sock(short port)
{
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) {
        printf("socket failed : %s\n", strerror(errno));
        return -1;
    }

    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
        printf("fcntl failed : %s\n", strerror(errno));
        close(fd);
        return -1;
    }

    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons((unsigned short)port);

    if (bind(fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
        printf("bind failed : %s\n", strerror(errno));
        close(fd);
        return -1;
    }

    if (listen(fd, 20) < 0) {
        printf("listen failed : %s\n", strerror(errno));
        close(fd);
        return -1;
    }

    return fd;
}

/*
 * Initialise a reactor: create the epoll instance and a zeroed slot table.
 * Returns 0 on success, -1 for a NULL reactor, -2 when epoll_create fails,
 * -3 when the slot table cannot be allocated.
 */
int ntyreactor_init(struct ntyreactor *reactor)
{
    if (reactor == NULL) return -1;
    memset(reactor, 0, sizeof(struct ntyreactor));

    reactor->epfd = epoll_create(1);
    if (reactor->epfd < 0) {
        printf("create epfd in %s err %s\n", __func__, strerror(errno));
        return -2;
    }

    /* calloc: every slot must start with status == 0 (not registered). */
    reactor->events = calloc(MAX_EPOLL_EVENTS, sizeof(struct ntyevent));
    if (reactor->events == NULL) {
        printf("create epfd in %s err %s\n", __func__, strerror(errno));
        close(reactor->epfd);
        return -3;
    }

    return 0;
}

/* Release the epoll fd and the slot table. Returns 0, or -1 for NULL. */
int ntyreactor_destory(struct ntyreactor *reactor)
{
    if (reactor == NULL) return -1;

    close(reactor->epfd);
    free(reactor->events);
    reactor->events = NULL;

    return 0;
}

/*
 * Register a listening socket with the reactor; acceptor is invoked on
 * EPOLLIN. Returns 0 on success, -1 on invalid arguments or epoll failure.
 */
int ntyreactor_addlistener(struct ntyreactor *reactor, int sockfd, NCALLBACK *acceptor)
{
    if (reactor == NULL) return -1;
    if (reactor->events == NULL) return -1;
    if (sockfd < 0 || sockfd >= MAX_EPOLL_EVENTS) return -1;

    struct ntyevent *ev = &reactor->events[sockfd];

    nty_event_set(ev, sockfd, acceptor, reactor);
    /* The listener's timestamp is never refreshed, so it must not expire. */
    ev->is_listener = 1;

    return nty_event_add(reactor->epfd, EPOLLIN, ev);
}

/*
 * Event loop. Each iteration first sweeps up to IDLE_SWEEP_BATCH slots and
 * closes client connections idle for IDLE_TIMEOUT_SECONDS or more, then waits
 * up to one second for events and dispatches them to the slot callbacks.
 * Returns -1 on invalid arguments; otherwise it does not return.
 */
int ntyreactor_run(struct ntyreactor *reactor)
{
    if (reactor == NULL) return -1;
    if (reactor->epfd < 0) return -1;
    if (reactor->events == NULL) return -1;

    struct epoll_event events[MAX_EPOLL_EVENTS + 1];
    int checkpos = 0;

    while (1) {
        long now = (long)time(NULL);

        for (int i = 0; i < IDLE_SWEEP_BATCH; i++, checkpos++) {
            if (checkpos == MAX_EPOLL_EVENTS) {
                checkpos = 0;
            }

            struct ntyevent *slot = &reactor->events[checkpos];
            if (slot->status != 1 || slot->is_listener) {
                continue;
            }

            long duration = now - slot->last_active;
            if (duration >= IDLE_TIMEOUT_SECONDS) {
                printf("[fd=%d] timeout\n", slot->fd);
                /* Deregister before closing so EPOLL_CTL_DEL sees a live fd. */
                nty_event_del(reactor->epfd, slot);
                close(slot->fd);
            }
        }

        int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000);
        if (nready < 0) {
            printf("epoll_wait error, exit\n");
            continue;
        }

        for (int i = 0; i < nready; i++) {
            struct ntyevent *ev = (struct ntyevent *)events[i].data.ptr;

            if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) {
                ev->callback(ev->fd, events[i].events, ev->arg);
            }
            if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) {
                ev->callback(ev->fd, events[i].events, ev->arg);
            }
        }
    }
}

/*
 * Usage: prog [port]. Starts the echo reactor on the given port
 * (SERVER_PORT when omitted).
 */
int main(int argc, char *argv[])
{
    unsigned short port = SERVER_PORT;

    if (argc == 2) {
        char *end = NULL;
        errno = 0;
        long value = strtol(argv[1], &end, 10);
        if (errno != 0 || end == argv[1] || *end != '\0' || value <= 0 || value > 65535) {
            printf("invalid port: %s\n", argv[1]);
            return EXIT_FAILURE;
        }
        port = (unsigned short)value;
    }

    int sockfd = init_sock((short)port);
    if (sockfd < 0) {
        return EXIT_FAILURE;
    }

    struct ntyreactor *reactor = malloc(sizeof(*reactor));
    if (reactor == NULL) {
        close(sockfd);
        return EXIT_FAILURE;
    }

    if (ntyreactor_init(reactor) != 0) {
        free(reactor);
        close(sockfd);
        return EXIT_FAILURE;
    }

    if (ntyreactor_addlistener(reactor, sockfd, accept_cb) == 0) {
        ntyreactor_run(reactor);
    }

    ntyreactor_destory(reactor);
    free(reactor);
    close(sockfd);

    return 0;
}
Reactor实现方式二:
#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#include <sys/poll.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAXLNE 4096
#define POLL_SIZE 1024

#define BUFFER_LENGTH 1024
#define MAX_EPOLL_EVENT 1024

#define NOSET_CB    0
#define READ_CB     1
#define WRITE_CB    2
#define ACCEPT_CB   3

typedef int NCALLBACK(int fd, int event, void *arg);

/*
 * Per-descriptor state. Items live in a table indexed by the fd value, so
 * only fds below MAX_EPOLL_EVENT can be tracked.
 */
struct nitem {
    int fd;

    int status;
    int events;             /* NOSET_CB when unregistered, else the *_CB kind */
    void *arg;

    NCALLBACK *readcb;      /* invoked on EPOLLIN for a connection */
    NCALLBACK *writecb;     /* invoked on EPOLLOUT */
    NCALLBACK *acceptcb;    /* invoked on EPOLLIN for the listening socket */

    unsigned char sbuffer[BUFFER_LENGTH];   /* pending outgoing bytes */
    int slength;

    unsigned char rbuffer[BUFFER_LENGTH];   /* last received bytes */
    int rlength;
};

struct itemblock {
    struct itemblock *next;
    struct nitem *items;
};

struct reactor {
    int epfd;
    struct itemblock *head;
};

int init_reactor(struct reactor *r);
int read_callback(int fd, int event, void *arg);
int write_callback(int fd, int event, void *arg);
int accept_callback(int fd, int event, void *arg);

struct reactor *instance = NULL;

/*
 * Return the process-wide reactor, creating it on first use.
 * Returns NULL when allocation or initialisation fails; the global is left
 * NULL in that case so a later call can retry.
 */
struct reactor *getInstance(void)
{
    if (instance == NULL) {
        struct reactor *r = malloc(sizeof(*r));
        if (r == NULL) return NULL;

        memset(r, 0, sizeof(*r));

        if (init_reactor(r) < 0) {
            free(r);
            return NULL;
        }

        instance = r;
    }

    return instance;
}

/*
 * Install cb as the handler of the given kind (READ_CB, WRITE_CB or
 * ACCEPT_CB) for fd and register the matching epoll interest: EPOLL_CTL_ADD
 * when the fd is not yet registered, EPOLL_CTL_MOD when the kind changes.
 * Returns 0 on success, -1 on a bad fd/kind, a missing reactor or an
 * epoll_ctl failure.
 */
int nreactor_set_event(int fd, NCALLBACK cb, int event, void *arg)
{
    struct reactor *r = getInstance();
    if (r == NULL) return -1;
    if (fd < 0 || fd >= MAX_EPOLL_EVENT) return -1;

    struct nitem *item = &r->head->items[fd];

    struct epoll_event ev;
    memset(&ev, 0, sizeof(ev));

    switch (event) {
    case READ_CB:
        item->readcb = cb;
        ev.events = EPOLLIN;
        break;
    case WRITE_CB:
        item->writecb = cb;
        ev.events = EPOLLOUT;
        break;
    case ACCEPT_CB:
        item->acceptcb = cb;
        ev.events = EPOLLIN;
        break;
    default:
        return -1;
    }

    item->fd = fd;
    item->arg = arg;
    ev.data.ptr = item;

    if (item->events == NOSET_CB) {
        if (epoll_ctl(r->epfd, EPOLL_CTL_ADD, fd, &ev) < 0) {
            printf("epoll_ctl EPOLL_CTL_ADD failed, %d\n", errno);
            return -1;
        }
        item->events = event;
    } else if (item->events != event) {
        if (epoll_ctl(r->epfd, EPOLL_CTL_MOD, fd, &ev) < 0) {
            printf("epoll_ctl EPOLL_CTL_MOD failed\n");
            return -1;
        }
        item->events = event;
    }

    return 0;
}

/*
 * Remove fd from epoll and mark its item unregistered. cb and event are
 * unused. Call before close(fd). Returns 0, or -1 on a bad fd or missing
 * reactor.
 */
int nreactor_del_event(int fd, NCALLBACK cb, int event, void *arg)
{
    (void)cb;
    (void)event;

    struct reactor *r = getInstance();
    if (r == NULL) return -1;
    if (fd < 0 || fd >= MAX_EPOLL_EVENT) return -1;

    struct epoll_event ev;
    memset(&ev, 0, sizeof(ev));
    ev.data.ptr = arg;

    epoll_ctl(r->epfd, EPOLL_CTL_DEL, fd, &ev);
    r->head->items[fd].events = NOSET_CB;

    return 0;
}

/*
 * EPOLLOUT handler: flush the item's send buffer. After a complete send the
 * fd goes back to READ_CB; after a partial send the unsent tail is kept and
 * the fd stays on WRITE_CB. A hard error closes the connection.
 * Returns 0, or -1 when the connection was dropped.
 */
int write_callback(int fd, int event, void *arg)
{
    (void)event;
    (void)arg;

    struct reactor *R = getInstance();
    if (R == NULL || fd < 0 || fd >= MAX_EPOLL_EVENT) return -1;

    struct nitem *item = &R->head->items[fd];

    /* MSG_NOSIGNAL: a vanished peer must not raise SIGPIPE. */
    ssize_t ret = send(fd, item->sbuffer, (size_t)item->slength, MSG_NOSIGNAL);
    if (ret < 0) {
        if (errno == EAGAIN || errno == EINTR) {
            return 0;   /* still registered for EPOLLOUT; try again later */
        }
        nreactor_del_event(fd, NULL, 0, NULL);
        close(fd);
        return -1;
    }

    if (ret < item->slength) {
        /* Drop what was sent so the next round does not resend it. */
        item->slength -= (int)ret;
        memmove(item->sbuffer, item->sbuffer + ret, (size_t)item->slength);
        nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
    } else {
        item->slength = 0;
        nreactor_set_event(fd, read_callback, READ_CB, NULL);
    }

    return 0;
}

/*
 * EPOLLIN handler (level-triggered): read one chunk, copy it into the send
 * buffer and switch the fd to WRITE_CB so it is echoed back. EOF or a hard
 * error closes the connection.
 *
 * Level-triggered mode reports the fd again while data remains, so one
 * recv() per wake-up is enough. Edge-triggered mode would instead have to
 * loop on recv() until it fails with EAGAIN.
 *
 * Always returns 0 (or -1 on a bad fd / missing reactor).
 */
int read_callback(int fd, int event, void *arg)
{
    (void)event;
    (void)arg;

    struct reactor *R = getInstance();
    if (R == NULL || fd < 0 || fd >= MAX_EPOLL_EVENT) return -1;

    struct nitem *item = &R->head->items[fd];

    int ret = (int)recv(fd, item->rbuffer, BUFFER_LENGTH, 0);
    if (ret > 0) {
        item->rlength = ret;

        memcpy(item->sbuffer, item->rbuffer, (size_t)ret);
        item->slength = ret;

        /* The buffer is not NUL-terminated: print exactly ret bytes. */
        printf("readcb: %.*s\n", ret, (const char *)item->sbuffer);

        nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
    } else if (ret == 0 || (errno != EAGAIN && errno != EINTR)) {
        /* Peer sent FIN (ret == 0) or the socket failed: release the fd. */
        nreactor_del_event(fd, NULL, 0, NULL);
        close(fd);
    }

    return 0;
}

/*
 * EPOLLIN handler for the listening socket: accept one connection and
 * register it for READ_CB. The new fd is closed when it cannot be
 * registered. Always returns 0.
 */
int accept_callback(int fd, int event, void *arg)
{
    (void)event;
    (void)arg;

    struct sockaddr_in client;
    socklen_t len = sizeof(client);

    int connfd = accept(fd, (struct sockaddr *)&client, &len);
    if (connfd == -1) {
        printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }

    if (nreactor_set_event(connfd, read_callback, READ_CB, NULL) < 0) {
        close(connfd);
    }

    return 0;
}

/*
 * Create a TCP socket listening on all interfaces at port.
 * Returns the listening fd, or -1 on failure.
 */
int init_server(int port)
{
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd == -1) {
        printf("create socket error: %s(errno: %d)\n", strerror(errno), errno);
        return -1;
    }

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons((unsigned short)port);

    if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) {
        printf("bind socket error: %s(errno: %d)\n", strerror(errno), errno);
        close(listenfd);
        return -1;
    }

    if (listen(listenfd, 10) == -1) {
        printf("listen socket error: %s(errno: %d)\n", strerror(errno), errno);
        close(listenfd);
        return -1;
    }

    return listenfd;
}

/*
 * Set up a reactor: create the epoll instance and one zeroed item block of
 * MAX_EPOLL_EVENT entries (the fd -> item table).
 * Returns 0 on success, -1 for a NULL reactor, -2 on any resource failure.
 */
int init_reactor(struct reactor *r)
{
    if (r == NULL) return -1;

    int epfd = epoll_create(1);     /* size is only a hint but must be > 0 */
    if (epfd < 0) {
        return -2;
    }
    r->epfd = epfd;

    r->head = calloc(1, sizeof(struct itemblock));
    if (r->head == NULL) {
        close(epfd);
        return -2;
    }

    r->head->items = calloc(MAX_EPOLL_EVENT, sizeof(struct nitem));
    if (r->head->items == NULL) {
        free(r->head);
        r->head = NULL;
        close(epfd);
        return -2;
    }

    r->head->next = NULL;

    return 0;
}

/*
 * Event loop: wait for events and dispatch them. Events on listenfd go to
 * the accept callback; on any other fd, EPOLLIN goes to the read callback
 * and EPOLLOUT to the write callback (both may be handled in one round).
 * Returns -1 when no reactor is available; otherwise it does not return.
 */
int reactor_loop(int listenfd)
{
    struct reactor *R = getInstance();
    if (R == NULL) return -1;

    struct epoll_event events[POLL_SIZE] = {0};

    while (1) {
        int nready = epoll_wait(R->epfd, events, POLL_SIZE, 5);
        if (nready == -1) {
            continue;
        }

        for (int i = 0; i < nready; i++) {
            struct nitem *item = (struct nitem *)events[i].data.ptr;
            int connfd = item->fd;

            if (connfd == listenfd) {
                if (item->acceptcb != NULL) {
                    item->acceptcb(listenfd, 0, NULL);
                }
                continue;
            }

            if ((events[i].events & EPOLLIN) && item->readcb != NULL) {
                item->readcb(connfd, 0, NULL);
            }
            if ((events[i].events & EPOLLOUT) && item->writecb != NULL) {
                item->writecb(connfd, 0, NULL);
            }
        }
    }

    return 0;
}

/* Start the echo reactor on TCP port 9999. */
int main(int argc, char **argv)
{
    (void)argc;
    (void)argv;

    int listenfd = init_server(9999);
    if (listenfd < 0) {
        return EXIT_FAILURE;
    }

    if (nreactor_set_event(listenfd, accept_callback, ACCEPT_CB, NULL) < 0) {
        close(listenfd);
        return EXIT_FAILURE;
    }

    reactor_loop(listenfd);

    return 0;
}
Reactor模型:
把每一个fd对应的item上的事件单独管理起来。这样的好处是把对IO的管理换成了对事件的管理:重点不在IO本身,而在对事件的处理。对事件的设置决定了接下来对IO的操作。