我们来复现第一种情况:为什么说select会受到1024的限制呢?
//-------------------------------------------// #define __FD_SETSIZE 1024 //-------------------------------------------// //-------------------------------------------// #define __NFDBITS (8 * (int) sizeof (__fd_mask)) //-------------------------------------------// //-------------------------------------------// typedef long int __fd_mask; //-------------------------------------------// /* fd_set for select and pselect. */ typedef struct { /* XPG4.2 requires this member name. Otherwise avoid the name from the global namespace. */ #ifdef __USE_XOPEN __fd_mask fds_bits[__FD_SETSIZE / __NFDBITS]; # define __FDS_BITS(set) ((set)->fds_bits) #else __fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS]; # define __FDS_BITS(set) ((set)->__fds_bits) #endif } fd_set;
这里面很重要的一个部分: __fd_mask fds_bits[__FD_SETSIZE / __NFDBITS]
__fd_mask --> long int
__FD_SETSIZE --> 1024
__NFDBITS --> long int
__fd_mask fds_bits[__FD_SETSIZE / __NFDBITS]-->long int fds_bits[128]-->8 * 128个字节 --> 1024个位-->每一个位对应一个IO,分别有1和0两种状态对应事件的发生或者没有发生。
select的性能权限是什么呢?
我们在写上面select的过程中就能体会到select的一些权限。
1.我们需要重复的复制集合,而且在select的内部,需要将集合从用户区复制到内核区,等有事件到来,又需要将数据从内核区复制到用户区。
2.每次的需要遍历整个IO集合。
select的使用上的缺点在哪呢?
1.参数太多,容易搞错。一共5个参数,用户体验不好。
2.每次需要把待检测的IO集合进行拷贝,对性能有影响。
3.对IO的数量有限制。
poll
对于poll而言,本质上是对select的改进。但是只停留在了表面,并没有解决性能问题,只是在参数层面做了优化和解除了通过宏定义来设置的限制。本质上没啥好说的,我们就来简单实现下吧,熟悉熟悉代码。
#include <sys/socket.h> #include <errno.h> #include <netinet/in.h> #include <stdio.h> #include <string.h> #include <unistd.h> #include <pthread.h> #include <sys/poll.h> int main() { int sockfd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in serveraddr; memset(&serveraddr, 0, sizeof(struct sockaddr_in)); serveraddr.sin_family = AF_INET; serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); serveraddr.sin_port = htons(2048); if(-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr))) { perror("bind"); return -1; } listen(sockfd, 10); // 准备一个事件消息的数组,这个长度由用户定义 --> 突破了select通过宏定义写死的缺陷 struct pollfd fds[1024] = {0}; fds[sockfd].fd = sockfd; fds[sockfd].events = POLLIN; int fd_in[1024] = {0}; int maxFd = sockfd; fd_in[sockfd] = 1; while(1) { int nready = poll(fds, maxFd + 1, -1); if(fds[sockfd].revents & POLLIN) { struct sockaddr_in clientaddr; socklen_t len = sizeof(clientaddr); int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len); fds[clientfd].fd = clientfd; fds[clientfd].events = POLLIN; fd_in[clientfd] = clientfd; maxFd = clientfd; } int i = 0; for(i = sockfd + 1; i <= maxFd; ++i) { if(fds[i].revents & POLLIN) { char buffer[128] = {0}; int count = recv(i, buffer, 128, 0); if(count == 0) { fds[i].fd = -1; fds[i].events = 0; close(i); continue; } send(i, buffer, count, 0); } } } return 0; }
epoll
epoll对于Linux来说太重要了,如果没有epoll的存在,Linux只能停留在设备相关了开发了,epoll解决了select的问题。从设计层面来说,摒弃了之前的数组思维。那么epoll怎么设计的呢?为了方便理解,我们举一个例子,现在有一个快递站点,有用户,有一个快递员。对于用户来说寄快递只需要将快递放到快递站点,取快递只需要到快递站点去就行了,不用关系之后会不会有用户搬出和新用户的入住,将这层分割出来了。
我们来用下吧。
#include <sys/socket.h> #include <errno.h> #include <netinet/in.h> #include <stdio.h> #include <string.h> #include <unistd.h> #include <pthread.h> #include <sys/epoll.h> int main() { int sockfd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in serveraddr; memset(&serveraddr, 0, sizeof(struct sockaddr_in)); serveraddr.sin_family = AF_INET; serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); serveraddr.sin_port = htons(2048); bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr)); listen(sockfd, 10); // 这里的参数没有意义,为了兼用2.6以前版本。传参时需要大于0 int epfd = epoll_create(1); // 一个epoll事件 struct epoll_event ev; ev.events = EPOLLIN; ev.data.fd = sockfd; // 把连接事件加入到集合中(本质上是一个红黑树) epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev); // 存储epoll事件,这里我们暂定位1024 struct epoll_event events[1024] = {0}; while(1) { int nready = epoll_wait(epfd, events, 1024, -1); int i = 0; for(int i = 0; i < nready; ++i) { int connfd = events[i].data.fd; if(sockfd == connfd) { struct sockaddr_in clientaddr; socklen_t len = sizeof(clientaddr); int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len); ev.events = EPOLLIN | EPOLLET; ev.data.fd = clientfd; epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev); } else if(events[i].events & EPOLLIN) { char buffer[10] = {0}; int count = recv(connfd, buffer, 10, 0); if(count == 0) { epoll_ctl(epfd, EPOLL_CTL_DEL, connfd, NULL); close(i); continue; } send(connfd, buffer, count, 0); } } } close(sockfd); return 0; }
用法很简答哈,其实这样的代码有点问题,不能让epoll实现百万级别的并发。这里我们先不讨论,我们后面封住的时候会一步步优化代码的。
epoll有存在两种触发机制,水平触发(LT)和边缘触发(ET),所谓的水平触发就是当缓冲区有数据的时候会一直触发,知道缓冲区为空。而边缘触发是指来数据的时候,触发一次。怎么测出来这样的效果,我们把接收的buffer变小,然后分别修改为ET和LT及上打印,很容易看出区别。篇幅问题,这里就演示了,很简答的。
我们先提出两个问题:
1.epoll里面有没有mmap?
2.epoll是不是线程安全的?
3.什么时候用水平触发合适,什么时候用边缘触发合适?
对于第一个和第二个问题,我们需要阅读epoll的源码(路径:eventpoll.c),这里带大家一起看一下。(源码:The Linux Kernel Archives) --> 建议拷过来
epoll里面有没有mmap?
通过查看eventpoll.c,并没有发现关于mmap相关的API出现,可见epoll中是不包含mmap的。epoll的性能好是取决于他的设计方式和数据结构,减少遍历次数,不需要像select和poll一样需要将整个数组遍历,只需要将返回的列表中就绪的进行操作,使用红黑树结构能够将查找效率从O(N)缩减至log(N),当并发量越大时,效果越明显。
epoll是不是线程安全的?
通过查看eventpoll.c,它的过程是在内核太完成的,不涉及到用户空间和多线程竞争关系。但值得注意的时,如果在,当需要多个线程或者多个进程同时操作同一个epoll的文件描述符时,需要注意红黑树和就绪队列所在的空间存在竞争问题。
什么时候用水平触发合适,什么时候用边缘触发合适?
无论什么场景使用水平触发和边缘触发都可以实现,需要考虑的是在什么时候使用哪种方式会更合适。
边缘触发对于类似于代理的那种方式,比如不对数据进行处理,直接转发的情况下使用边缘触发。当业务处理比较慢的情况,读缓冲区数据比较多的情况,需要处理完业务然后再此去读,这时候使用水平触发。
select和poll使用的都是水平触发方式。
我们总算是把socket基础部分讲完了,现在我们要正式的开始进行reactor的封装了。
reactor
通常情况下,在epoll对IO的处理有两种方式,一种是面向IO的处理模式,一只种是面向事件的处理方式。
面向IO的方式不方便封装和维护,面向事件的方式发生什么事件就调用什么回调函数,例如发生读IO事件,那么就调用响应的读回调函数。换一种说法,通过面向连接的方式能够将IO事件与读写存储以及事件回调封装到一起。
对于reactor的封装,我们有要采用面向事件的方式,事实上reactor也是这么做的。
step01:首先我们需要考虑处理的对象是什么?
---> 能够想到我们需要处理的是IO
step02:然后我们需要考虑如何封装?
--> 对于一个IO来说有哪些属性或者行为,IO对应的文件描述符,读数据、写数据、各种事件的处理。看起来就这么多。
stop03:代码怎么体现呢?
--> 使用结构体或者类将他们组装在一起。所以我们可以设计以下的结构:
struct conn_item { int fd; // 一个IO事件对应的文件描述符 char rbuffer[BUFFER_LENGTH]; // 读缓冲区 int rlen; char wbuffer[BUFFER_LENGTH]; // 写缓冲区 int wlen; // 事件处理 - 连接、读事件、写事件 RCALLBACK accept_callback; RCALLBACK read_callback; RCALLBACK write_callback; };
到目前为止,看起一切都是这么的顺其自然。
stop04:现在我们已经设计好了一个IO需要处理的结构了,那么在使用的过程中,无可厚非会有很多个IO,为了方便我们定义一个数组来存储这些。当然你可以选择其他的数据结构进行存储。
struct conn_item connlist[1024] = {0};
stop05:总的设计思路我们确定了下来,接下来我们需要把整个使用的框架先初步搭起来,因为这里我们使用epoll来做,本质上就是epoll的那一套。
#include <sys/socket.h> #include <errno.h> #include <netinet/in.h> #include <stdio.h> #include <string.h> #include <unistd.h> #include <sys/poll.h> #include <sys/epoll.h> #define BUFFER_LENGTH 1024 typedef int (*RCALLBACK)(int fd); struct conn_item { int fd; // 一个IO事件对应的文件描述符 char rbuffer[BUFFER_LENGTH]; // 读缓冲区 int rlen; char wbuffer[BUFFER_LENGTH]; // 写缓冲区 int wlen; // 事件处理 - 连接、读事件、写事件 RCALLBACK accept_callback; RCALLBACK read_callback; RCALLBACK write_callback; }; struct conn_item connlist[1024] = {0}; int accept_cb(int fd) { return 0; } int read_cb(int fd) { return 0; } int write_cb(int fd) { return 0; } int main() { int sockfd = socket(AF_INET, SOCK_STREAM, 0); if(-1 == sockfd) { perror("socket"); return -1; } struct sockaddr_in serveraddr; serveraddr.sin_family = AF_INET; serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); serveraddr.sin_port = htons(2048); if(-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(serveraddr))) { perror("bind"); return -1; } listen(sockfd, 10); // 将sockfd放入到监视集合中 connlist[sockfd].fd = sockfd; connlist[sockfd].accept_callback = accept_cb; // 这里我们使用epoll int epfd = epoll_create(1); struct epoll_event ev; ev.data.fd = sockfd; ev.events = EPOLLIN; epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev); struct epoll_event events[1024] = {0}; // 就绪事件 while(1) { int nready = epoll_wait(epfd, events, 1024, -1); for(int i = 0; i < nready; ++i) { // 发生什么事件,就处理什么事件 if(events[i].events & EPOLLIN) { // 连接事件 if(events[i].data.fd == sockfd) { // struct sockaddr_in clientaddr; // socklen_t len = sizeof(clientaddr); // int connfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len); // ev.data.fd = connfd; // ev.events = EPOLLIN; // epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev); // connlist[connfd].fd = connfd; // connlist[connfd].read_callback = read_cb; // connlist[connfd].write_callback = write_cb; connlist[sockfd].accept_callback(sockfd); } else { // 读事件 int connfd = events[i].data.fd; connlist[connfd].read_callback(connfd); } } else if(events[i].events & EPOLLOUT) { // 写事件 int connfd = events[i].data.fd; connlist[connfd].write_callback(connfd); } } } return 0; }
stop06:现在我们做完了大部分工作了,只需要将回调函数实现就可以了。之后我们就开始优化我们打代码。
#include <sys/socket.h> #include <errno.h> #include <netinet/in.h> #include <stdio.h> #include <string.h> #include <unistd.h> #include <sys/poll.h> #include <sys/epoll.h> #define BUFFER_LENGTH 1024 typedef int (*RCALLBACK)(int fd); int accept_cb(int fd); int read_cb(int fd); int write_cb(int fd); struct conn_item { int fd; // 一个IO事件对应的文件描述符 char rbuffer[BUFFER_LENGTH]; // 读缓冲区 int rlen; char wbuffer[BUFFER_LENGTH]; // 写缓冲区 int wlen; // 事件处理 - 连接、读事件、写事件 RCALLBACK accept_callback; RCALLBACK read_callback; RCALLBACK write_callback; }; struct conn_item connlist[1024] = {0}; int epfd; int accept_cb(int fd) { // 创建连接 struct sockaddr_in clientaddr; socklen_t len = sizeof(clientaddr); int connfd = accept(fd, (struct sockaddr*)&clientaddr, &len); struct epoll_event ev; ev.data.fd = connfd; ev.events = EPOLLIN; epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev); connlist[connfd].fd = connfd; connlist[connfd].read_callback = read_cb; connlist[connfd].write_callback = write_cb; return connfd; } int read_cb(int fd) { char* buffer = connlist[fd].rbuffer; int index = connlist[fd].rlen; int count = recv(fd, buffer + index, BUFFER_LENGTH - index, 0); if(0 == count) { epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); close(fd); return 0; } connlist[fd].rlen += count; // TODO printf("buffer:%s\n", buffer); memcpy(connlist[fd].wbuffer, buffer, count); connlist[fd].wlen = connlist[fd].rlen; struct epoll_event ev; ev.data.fd = fd; ev.events = EPOLLOUT; epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev); return count; } int write_cb(int fd) { char* buffer = connlist[fd].wbuffer; int index = connlist[fd].wlen; printf("%s\n", buffer); int count = send(fd, buffer, index, 0); struct epoll_event ev; ev.data.fd = fd; ev.events = EPOLLIN; epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev); return count; } int main() { int sockfd = socket(AF_INET, SOCK_STREAM, 0); if(-1 == sockfd) { perror("socket"); return -1; } struct sockaddr_in serveraddr; serveraddr.sin_family = AF_INET; serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); serveraddr.sin_port = htons(2480); if(-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(serveraddr))) { perror("bind"); return -1; } listen(sockfd, 10); // 将sockfd放入到监视集合中 connlist[sockfd].fd = sockfd; connlist[sockfd].accept_callback = accept_cb; // 这里我们使用epoll epfd = epoll_create(1); struct epoll_event ev; ev.data.fd = sockfd; ev.events = EPOLLIN; epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev); struct epoll_event events[1024] = {0}; // 就绪事件 while(1) { int nready = epoll_wait(epfd, events, 1024, -1); for(int i = 0; i < nready; ++i) { // 发生什么事件,就处理什么事件 if(events[i].events & EPOLLIN) { // 连接事件 if(events[i].data.fd == sockfd) { connlist[sockfd].accept_callback(sockfd); } else { // 读事件 int connfd = events[i].data.fd; connlist[connfd].read_callback(connfd); } } else if(events[i].events & EPOLLOUT) { // 写事件 int connfd = events[i].data.fd; connlist[connfd].write_callback(connfd); } } } close(sockfd); return 0; }
stop07:接下来我们来优化我们的代码,先在代码层进行。把一些公共的操作提取出来
很明显,我们可以看到大量的epoll操作,这部分可以提到一个函数中:
void setEpoll(int fd, int mode) { switch (mode) { case 1: // 添加 { struct epoll_event ev; ev.data.fd = fd; ev.events = EPOLLIN; epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev); break; } case 2: // EPOLLOUT --> EPOLLIN { struct epoll_event ev; ev.data.fd = fd; ev.events = EPOLLIN; epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev); break; } case 3: // EPOLLIN --> EPOLLOUT { struct epoll_event ev; ev.data.fd = fd; ev.events = EPOLLOUT; epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev); break; } case 4: // 删除 { epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); break; } default: break; } }
观察者一块,能不能融到一起
我们想一想,这两个逻辑是不会同时发生的,而且发的IO事件肯定不是同一个,因为accept_callback是在sockfd中发生的,而read_callback是在connfd中发生的。每一个IO对应一个conn_item。所以这个很适合使用联合体进行。
【联合体:允许在同一内存空间中存储不同数据类型的成员变量,但同一时间只能使用其中一个成员变量。联合体的所有成员变量从同一个地址开始,占用的空间大小取最大成员变量的大小,这使得联合体在底层内存上非常高效】
更有甚者,连接事件在epoll中也是读事件,而且使用是函数指针,发生事件的IO又不同,所以我们可以这样设计:(其他地方代码最相应修改即可)
struct conn_item { int fd; // 一个IO事件对应的文件描述符 char rbuffer[BUFFER_LENGTH]; // 读缓冲区 int rlen; char wbuffer[BUFFER_LENGTH]; // 写缓冲区 int wlen; // 事件处理 - 连接、读事件、写事件 // union // { // RCALLBACK accept_callback; // RCALLBACK read_callback; // }recv_t; // RCALLBACK accept_callback; RCALLBACK read_callback; RCALLBACK write_callback; };
完整代码
#include <sys/socket.h> #include <errno.h> #include <netinet/in.h> #include <stdio.h> #include <string.h> #include <unistd.h> #include <sys/poll.h> #include <sys/epoll.h> #define BUFFER_LENGTH 1024 typedef int (*RCALLBACK)(int fd); int read_cb(int fd); int write_cb(int fd); void setEpoll(int fd, int mode); struct conn_item { int fd; // 一个IO事件对应的文件描述符 char rbuffer[BUFFER_LENGTH]; // 读缓冲区 int rlen; char wbuffer[BUFFER_LENGTH]; // 写缓冲区 int wlen; RCALLBACK read_callback; RCALLBACK write_callback; }; struct conn_item connlist[1024] = {0}; int epfd; int accept_cb(int fd) { // 创建连接 struct sockaddr_in clientaddr; socklen_t len = sizeof(clientaddr); int connfd = accept(fd, (struct sockaddr*)&clientaddr, &len); printf("connfd:%d\n", connfd); connlist[connfd].fd = fd; connlist[connfd].read_callback = read_cb; connlist[connfd].write_callback = write_cb; setEpoll(connfd, 1); return connfd; } int read_cb(int fd) { char* buffer = connlist[fd].rbuffer; int index = connlist[fd].rlen; int count = recv(fd, buffer + index, BUFFER_LENGTH - index, 0); if(0 == count) { setEpoll(fd, 4); close(fd); return 0; } connlist[fd].rlen += count; // TODO printf("buffer:%s\n", buffer); memcpy(connlist[fd].wbuffer, buffer, count); connlist[fd].wlen = connlist[fd].rlen; setEpoll(fd, 3); return count; } int write_cb(int fd) { char* buffer = connlist[fd].wbuffer; int index = connlist[fd].wlen; printf("%s\n", buffer); int count = send(fd, buffer, index, 0); setEpoll(fd, 2); return count; } void setEpoll(int fd, int mode) { switch (mode) { case 1: // 添加 { struct epoll_event ev; ev.data.fd = fd; ev.events = EPOLLIN; epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev); break; } case 2: // EPOLLOUT --> EPOLLIN { struct epoll_event ev; ev.data.fd = fd; ev.events = EPOLLIN; epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev); break; } case 3: // EPOLLIN --> EPOLLOUT { struct epoll_event ev; ev.data.fd = fd; ev.events = EPOLLOUT; epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev); break; } case 4: // 删除 { epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); break; } default: break; } } int main() { int sockfd = socket(AF_INET, SOCK_STREAM, 0); if(-1 == sockfd) { perror("socket"); return -1; } struct sockaddr_in serveraddr; serveraddr.sin_family = AF_INET; serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); serveraddr.sin_port = htons(2480); if(-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(serveraddr))) { perror("bind"); return -1; } listen(sockfd, 10); // 将sockfd放入到监视集合中 connlist[sockfd].fd = sockfd; connlist[sockfd].read_callback = accept_cb; // 这里我们使用epoll epfd = epoll_create(1); setEpoll(sockfd, 1); struct epoll_event events[1024] = {0}; // 就绪事件 while(1) { int nready = epoll_wait(epfd, events, 1024, -1); for(int i = 0; i < nready; ++i) { // 发生什么事件,就处理什么事件 int connfd = events[i].data.fd; if(events[i].events & EPOLLIN) { // 读事件 connlist[connfd].read_callback(connfd); } else if(events[i].events & EPOLLOUT) { // 写事件 connlist[connfd].write_callback(connfd); } } } close(sockfd); return 0; }
后续
由于篇幅问题,已经在逐步实现reactor过程中设计到不同的知识点,我们不方便一次完成,如果感兴趣可以点个关注。
后续文章将进行wsl测试已经对buffer进行优化,设计合理的用户缓冲区,而不是使用定长的buffer进行实现。
然后会对代码进行调整,目前的设计不方便提取出来使用,我们最后会封装成一个库的方式,提供一个.h和.c文件,以此方便移植。