select
主旨思想:
1.首先要构造一个关于文件描述符的列表,将要监听的文件描述符添加到该列表中
2.调用一个系统函数,监听该列表中的文件描述符,直到这些描述符中的一个或多个进行I/O操作时,该函数才返回。
a.这个函数是阻塞的
b.函数对文件描述符的检测的操作是由内核完成的
3.在返回时,它会告诉进程有多少描述符要进行I/O操作
分析:
1.select能够监听的文件描述符个数受限于 FD_SETSIZE,一般为1024,单纯改变进程打开的文件描述符个数并不能改变select监听文件个数。 【默认进程中能够打开的文件描述符个数为1024,历史遗留问题:重新编译linux内核可解决】
2.解决1024以下客户端时使用select是很合适的,但如果连接的客户端过多,select采用轮询模型(NIO),会大幅度降低服务器响应效率【因为select不会告诉应用进程到底是哪个文件描述符有数据到达,所以每次都需要去循环一遍,比如有1000个客户端连接,每次循环的需要有1000次系统调用,是消耗大量的资源】
3.工作过程中存在大量的拷贝工作
4.select了解以下,不需要花费大量时间学习,不划算
图解原理:
相关API
int select(int nfds,fd_set *readfds,fd_set *writefds,fd_set *exceptfds,struct timerval *timeval *timeout);
nfds:监控的文件描述符集里最大文件描述符加1,告诉内核检测多少个文件描述符
readfds:监控有读数据到达文件描述符集合,传入传出参数 ==> 可读事件
writefds:监控写数据到达文件描述符集合,传入传出参数 ==> 可写事件
exceptfds:监控异常发生达到文件描述符集合,传入传出参数 ==> 异常事件
timeout:定时阻塞监控时间
1.NULL 永远等待,直到检测到了文件描述符有变化
2.设置 timeval 等待固定时间 【- tv_sec > 0 tv_usec > 0, 阻塞对应的时间】
3.设置 timeval 时间均为 0,检测描述符后立即返回,轮询
【 - tv_sec = 0 tv_usec = 0, 不阻塞】
1. struct timeval { 2. long tv_sec; /* seconds */ 3. long tv_usec; /* microseconds */ 4. };
void FD_CLR(int fd,fd_set *set); //把文件描述符集合里fd清0
void FD_ISSET(int fd,fd_set *set); //测试文件描述符集合里fd是否置1
void FD_SET(int fd,fd_set *set); //把文件描述符集合里fd位 置1
void FD_ZERO(fd_set *set); //把文件描述符集合里所有未清0
一些说明:
select返回的是所监听的集合中满足条件的总数,通过上面的四个函数可以判断恐惧特发生的事件和具体哪一个满足条件。fd_set是位图机制
select缺点:
1.每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大
2.同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大
3.select支持的文件描述符数量太小了,默认是1024
4.fds集合不能重用,每次都需要重置
代码案例:
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <netinet/in.h> #include <arpa/inet.h> #include <sys/select.h> #include "wrap.h" #define MAXLINE 80 #define SERV_PORT 8080 int main(void) { int i,n; //创建socket套接字 int listenfd; listenfd = Socket(AF_INET,SOCK_STREAM,0); //绑定 struct sockaddr_in servaddr; bzero(&servaddr,sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_port = htons(SERV_PORT); servaddr.sin_addr.s_addr = htonl(INADDR_ANY); Bind(listenfd,(struct sockaddr *)&servaddr,sizeof(servaddr)); //设置 Listen(listenfd,20); //连接前 ==> 先交给select int connfd,sockfd; fd_set allset,rset; //allset保留原来的位, int nready; //select返回值,有事件发生的总数位图 int maxfd = listenfd; //最大的文件描述符位置,告诉内核监听范围 FD_ZERO(&allset); //全部置为0 FD_SET(listenfd,&allset); //把监听描述符的位置 置1 struct sockaddr_in cliaddr; socklen_t cliaddr_len; char str[INET_ADDRSTRLEN]; //保存客户端的 IP int client[FD_SETSIZE]; //保存监听的位置 client[i] = 4 表示第4个文件描述符有事件发生 int maxi = -1; //client 的最大下标 char buf[MAXLINE]; for(i = 0;i<FD_SETSIZE;++i) { client[i] = -1; } while(1) { //监听读事件,阻塞。select最好不要设置为轮询的方式 rset = allset; nready = select(maxfd+1,&rset,NULL,NULL,NULL); if(nready < 0) //失败,退出 { // void perr_exit(const char *s) // { // perror(s); // exit(1); // } perr_exit("select error"); } if(FD_ISSET(listenfd,&rset)) //如果为真,有新的连接到达 监听事件不放入client { //有新连接到达,我们需要把连接的客户端的信息拿到,后面要给它返回信息 cliaddr_len = sizeof(cliaddr); //连接 accept,此时Accept不会发生阻塞等待,因为listenfd已经有事件发生 connfd = Accept(listenfd,(struct sockaddr *)&cliaddr,&cliaddr_len); //打印连接的客户端信息 printf("连接来自 %s 在 %d 端口\n", inet_ntop(AF_INET,&cliaddr.sin_addr,str,sizeof(str)),ntohs(cliaddr.sin_port)); //把客户端连接的文件描述符加入到监听队列中,监听该客户端是否有数据传来 for(i = 0; i<FD_SETSIZE; ++i) //通过 for循环总是能得到最小的空闲位置 { if(client[i] < 0) { client[i] = connfd; break; } } //select能监听的文件个数达到上限 1024 if(i == FD_SETSIZE) { fputs("select能监听的文件个数达到上限\n",stdout); //向标准设备打印提示信息 exit(1); } if(i > maxi) { maxi = i; } FD_SET(connfd,&allset); //添加到监听信号集 if(connfd > maxfd) { maxfd = connfd; //maxfd做个迭代 } if(--nready == 0) //如果没有更多的就绪文件描述符,继续回到select阻塞监听 { continue; } } for(i = 0;i <= maxi;++i) //检测哪一个 clients有数据就绪 { sockfd = client[i]; if(client[i] < 0) { continue; } if(FD_ISSET(sockfd,&rset)) { if((n = Read(sockfd,buf,MAXLINE)) == 0) //与客户端关闭连接 { Close(sockfd); FD_CLR(sockfd,&allset); //解除select监听此文件描述符 client[i] = -1; } int j; for(j=0;j<n;++j) { buf[j] = toupper(buf[j]); } Write(sockfd,buf,n); if(--nready == 0) { break; } } } } Close(listenfd); return 0; }
客户端代码前面案例相同
poll
poll只针对Linux有效,poll模型是基于select最大文件描述符限制提出的,跟select一样,只是将select使用的三个基于位的文件描述符(readfds/writefds/exceptfds)封装成了一个结构体,然后通过数组的是形式来突破最大文件描述符的限制。
#include <poll.h>
int poll(struct pollfd *fds,nfds_t nfds,int timeout);
参数:
fds: 数组的首地址
nfds_t: 监控数组中有多少文件描述符需要被监控
timeout:
-1 阻塞等待
0 立即返回,不阻塞
>0 等待指定毫秒数,如当前系统时间精度不够毫秒,向上取值
struct pollfd
{
int fd; //文件描述符
short events; //监控的事件
short revents; //监控事件中满足条件返回的事件
};
- 返回值: -1 : 失败 >0(n)
成功,n表示检测到集合中有n个文件描述符发生变化
如果不再监控某一个文件描述符时,可以把pollfd中,fd设置为-1,poll不再监控此pollfd,下次返回时,把revents设置为0
代码案例:
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <netinet/in.h> #include <arpa/inet.h> #include <poll.h> #include <errno.h> #include "wrap.h" #define MAXLINE 80 #define SERV_PORT 8080 #define OPEN_MAX 1024 //监控的最多的数量 int main(void) { int i,n,j; //创建socket int listenfd = Socket(AF_INET,SOCK_STREAM,0); //绑定 struct sockaddr_in servaddr; bzero(&servaddr,sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(SERV_PORT); Bind(listenfd,(struct sockaddr *)&servaddr,sizeof(servaddr)); //设置监听 Listen(listenfd,20); //poll struct pollfd fds[OPEN_MAX]; //记录监听的文件描述符 nfds_t maxi = -1; //最大的那个监听文件描述符 int nready; //将设置为监听描述符 fds[0].fd = listenfd; fds[0].events = POLLRDNORM; //监听为普通事件 maxi = 0; //client[i] = -1 表示文件描述符 i 不处于监听状态 for(i=1;i<OPEN_MAX;++i) { fds[i].fd = -1; } //客户端信息 struct sockaddr_in cliaddr; socklen_t cliaddr_len; int connfd,sockfd; char str[INET_ADDRSTRLEN]; char buf[MAXLINE]; while(1) { nready = poll(fds,maxi+1,-1); //-1表示阻塞等待 if(fds[0].revents & POLLRDNORM) // &位操作 有客户端连接请求 { cliaddr_len = sizeof(cliaddr); connfd = Accept(listenfd,(struct sockaddr *)&cliaddr,&cliaddr_len); //打印客户端信息 printf("连接来自 %s 在端口 %d\n", inet_ntop(AF_INET,&cliaddr.sin_addr,str,sizeof(str)), ntohs(cliaddr.sin_port)); //将 connfd 加入到监听数组 for(i=1;i<OPEN_MAX;++i) { if(fds[i].fd < 0) { fds[i].fd = connfd; break; } } //判断监听事件是否超过最大限制 if(i == OPEN_MAX) { perr_exit("too many clients\n"); } fds[i].events = POLLRDNORM; if(i > maxi) { maxi = i; } if(--nready <= 0) { continue; } } for(i = 1;i<= maxi;++i) { sockfd = fds[i].fd; if(fds[i].fd < 0) { continue;; } if(fds[i].revents & (POLLRDNORM|POLLERR)) { if((n = Read(sockfd,buf,MAXLINE)) < 0) { if(errno == ECONNRESET) //sockfd 不监听了 { printf("fds[%d] aborted connection\n",i); Close(sockfd); fds[i].fd = -1; } else { perr_exit("read error"); } } else if(n == 0) { printf("fds[%d] closed connection\n",i); Close(sockfd); fds[i].fd = -1; } else { for(j=0;j<n;++j) { buf[j] = toupper(buf[j]); } Writen(sockfd,buf,n); } if(--nready <= 0) { break; } } } } return 0; }
最重要的epoll(单独拿出来)
epoll是Linux下多路复用IO接口select/poll的增强版本,它能够显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率,因为它会复用文件描述符集合来传递结果而不用迫使开发者每次等待事件之前都必须重新准备要被监听的文件描述符集合,另外一点就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒儿加入Ready队列的描述符集合就行了。
目前epoll是Linux大规模并发网络程序中的热门首先模型
epoll除了提供select/poll那种IO事件的水平触发(LT)外,还提供了边缘触发(ET)。这就使得用户空间程序有可能缓冲IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率
可以使用cat命令查看一个进程可以打开的socket描述符上限
cat /proc/sys/fs/file-max
也可以修改配置文件的方式修改该上限
sudo vi /etc/security/limits.conf
在文件尾部写入一下配置,soft软限制,hard硬限制
soft nofile 65536
hard nofile 100000
基础API
1.创建一个epoll句柄,参数size用来告诉内核监听的文件描述符的个数,跟内存大小有关
#include <sys/epoll.h>
int epoll_create(int size);
- 参数: size : 目前没有意义了。随便写一个数,必须大于0
- 返回值: -1 : 失败 > 0 : 文件描述符,操作epoll实例的
创建一个新epoll实例。在内核中创建了一个数据,这个数据中有两个比较重要的数据,一个是需要检测的文件描述符的信息(红黑树),还有一个是就绪列表,存放检测到数据发送改变的文件描述符信息(双向链表)
2.控制某个epoll监控的文件描述符上的事件,注册、修改、删除
int epoll_ctl(int epfd,int op,int fd,struct epoll_event *event);
epfd: 为epoll_create的句柄
op:表示动作,用3个宏来表示
EPOLL_CTL_ADD:注册新的fd到epfd
EPOLL_CTL_MOD:修改已经注册的fd的监听事件
EPOLL_CTL_DEL:从epfd删除一个fd
event:告诉内核需要监听的事件
struct epoll_event{
_uint32_t events; // Epoll events
epoll_data data; //user data variable
};
typedef union epoll_data {
void *ptr; //回调函数
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
常见的Epoll检测事件:
- EPOLLIN
- EPOLLOUT
- EPOLLERR
3.等待所监听文件描述符上有事件产生,类似于select()调用
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
- 参数:
- epfd : epoll实例对应的文件描述符
- events : 传出参数,保存了发送了变化的文件描述符的信息
- maxevents : 第二个参数结构体数组的大小
- timeout : 阻塞时间
0 : 不阻塞 -
1 : 阻塞,直到检测到fd数据发生变化,解除阻塞
> 0 : 阻塞的时长(毫秒)
- 返回值:
- 成功,返回发送变化的文件描述符的个数 > 0
- 失败 -1
代码案例:
#include <stdio.h> #include <stdlib.h> #include <netinet/in.h> #include <string.h> #include <sys/epoll.h> #include <arpa/inet.h> #include "wrap.h" #define MAXLINE 80 #define SERV_PORT 8080 #define OPEN_MAX 1024 int main(void) { int i,n,j,ret; //创建套接字 int listenfd = Socket(AF_INET,SOCK_STREAM,0); //绑定 struct sockaddr_in servaddr; bzero(&servaddr,sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_port = htons(SERV_PORT); servaddr.sin_addr.s_addr = htonl(INADDR_ANY); Bind(listenfd,(struct sockaddr *)&servaddr,sizeof(servaddr)); //设置监听 Listen(listenfd,20); //epoll int client[OPEN_MAX]; int maxi = -1; for(i=0;i<OPEN_MAX;++i) { client[i] = -1; } maxi = -1; //创建一个 epoll 句柄 int efd = epoll_create(OPEN_MAX); if(efd == -1) { perr_exit("epoll_create"); } //设置连接 int nready; struct epoll_event tep,ep[OPEN_MAX]; tep.events = EPOLLIN; tep.data.fd = listenfd; ret = epoll_ctl(efd,EPOLL_CTL_ADD,listenfd,&tep); if(ret == -1) { perr_exit("epoll_ctl"); } //客户端信息 struct sockaddr_in cliaddr; socklen_t cliaddr_len; char str[INET_ADDRSTRLEN]; int connfd,sockfd; char buf[MAXLINE]; while(1) { nready = epoll_wait(efd,ep,OPEN_MAX,-1); //-1表示阻塞 if(nready == -1) { perr_exit("epoll_wait"); } for(i=0;i<nready;++i) { if(!ep[i].events & EPOLLIN) { continue; } if(ep[i].data.fd == listenfd) //有新客户端连接 { cliaddr_len = sizeof(cliaddr); connfd = Accept(listenfd,(struct sockaddr *)&cliaddr,&cliaddr_len); printf("连接来自 %s 在端口 %d\n", inet_ntop(AF_INET,&cliaddr.sin_addr,str,sizeof(str)),ntohs(cliaddr.sin_port)); for(j=0;j<OPEN_MAX;++j) { if(client[j] < 0) { client[j] = connfd; break; } } if(j == OPEN_MAX) { perr_exit("too many clients"); } if(j > maxi) { maxi = j; } tep.events = EPOLLIN; tep.data.fd = connfd; ret = epoll_ctl(efd,EPOLL_CTL_ADD,connfd,&tep); if(ret == -1) { perr_exit("epoll_ctl"); } // if(--nready <= 0) // { // continue; // } } else { sockfd = ep[i].data.fd; n = Read(sockfd,buf,MAXLINE); if(n == 0) { for(j =0;j <= maxi;++j) { if(client[j] == sockfd) { client[j] = -1; break; } } ret = epoll_ctl(efd,EPOLL_CTL_DEL,sockfd,NULL); if(ret == -1) { perr_exit("epoll_ctl"); } Close(sockfd); printf("cliend[%d] closed connect\n",j); } else { for(j =0 ;j<n;++j) { buf[j] = toupper(buf[j]); } Writen(sockfd,buf,n); } } } } Close(listenfd); Close(efd); return 0; }
epoll进阶
事件模式:
EPOLL事件有两种模型 ==>
Edge Triggred(ET) 边缘触发:只有数据到来才触发,不管缓存区中是否还有数据
Level Triggered(LT)水平触发:水平触发只要有数据都会触发
案例:
1.假定我们已经把一个用来从管道中读取数据的文件描述符(RFD)添加到epoll描述符
2.管道的另一端写入了 2KB 的数据
3.调用epoll_wait,并且它会返回RFD,说明它已经准备好读取操作
4.读取1KB的数据
5.调用 epoll_wait...
在这个过程中,有两种工作模式
client ----> 1000B
epoll_wait(cfd);
read(500B) 已读 500B
水平触发:触发epoll,直到读完
边缘触发:不告诉了,不触发epoll,除非有新的数据到达
ET模式
ET模式即Edge Triggered 工作模式
如果我们在第1步将RFD添加到epoll描述符的时候使用了EPOLLET标志,那么在第5步调用epoll_wait之后将有可能会挂起,因为剩余的数据还存在于文件的输入缓冲区内,而且数据发出端还在等待一个针对已经发出数据的反馈信息。只有在监视的文件句柄上发生了某个事件的时候ET工作模式才会汇报事件。因此在第5步的时候调用者可能会放弃等待仍在存在于文件输入缓冲区内的剩余数据。epoll工作在ET模式的时候,必须使用非阻塞套接字,以避免由于一个文件句柄的阻塞读、阻塞写操作把处理多个文件描述符的任务饿死。
1).基于非阻塞文件句柄
2).只有当read或者write返回EAGAIN(非阻塞读,暂时无数据)时才挂起、等待。但这并不意味着说每次read时都需要循环读,直到读到产生一个EAGAIN才认为此次事件处理完成,当read返回的读到的数据长度小于请求的数据长度时,就可以确定缓冲区已经没有数据了,也就可以认为此事件已处理完成
LT模式
与LT模式不同的是,以LT方式调用epoll接口的时候,它就相当于一个速度比较快的poll,无论后面的数据是否被使用。
LT:LT是缺省的工作方式,并且同时支持block和no-block socket。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以堆这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表。
ET:ET是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变成就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知。请注意:如果一直不对这个fd作IO操作(从而导致它不再变成未就绪),内核不会发送更多的通知.
代码案例:
基于网络C/S非阻塞模型的epoll ET触发模式
#include <stdio.h> #include <string.h> #include <netinet/in.h> #include <arpa/inet.h> #include <signal.h> #include <sys/wait.h> #include <sys/types.h> #include <sys/epoll.h> #include <unistd.h> #include <fcntl.h> #include <sys/socket.h> #define MAXLINE 10 #define SERV_PORT 8080 int main(void) { int res; //创建socket套接字 int listenfd = socket(AF_INET,SOCK_STREAM,0); //绑定 struct sockaddr_in servaddr; servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(SERV_PORT); bind(listenfd,(struct sockaddr *)&servaddr,sizeof(servaddr)); //设置监听 listen(listenfd,20); //epoll + ET设置 struct epoll_event event; struct epoll_event resevent[10]; int efd = epoll_create(10); event.events = EPOLLIN | EPOLLET; //ET边缘触发 默认是水平触发 //保存客户端信息 int connfd,len; char str[INET_ADDRSTRLEN]; char buf[MAXLINE]; struct sockaddr_in cliaddr; socklen_t cliaddr_len = sizeof(cliaddr); connfd = accept(listenfd,(struct sockaddr *)&cliaddr,&cliaddr_len); printf("连接的来自 %s 在端口 %d\n", inet_ntop(AF_INET,&servaddr.sin_addr,str,sizeof(str)),ntohs(cliaddr.sin_port)); //设置connfd 非阻塞 int flag = fcntl(connfd,F_GETFL); flag |= O_NONBLOCK; fcntl(connfd,F_SETFL,flag); event.data.fd = connfd; epoll_ctl(efd,EPOLL_CTL_ADD,connfd,&event); while(1) { printf("epoll_wait begin\n"); res = epoll_wait(efd,resevent,10,-1); printf("epoll_wait end res %d\n",res); if(resevent[0].data.fd == connfd) { while((len = read(connfd,buf,MAXLINE/2)) > 0) { write(STDOUT_FILENO,buf,len); } } } return 0; }
#include <stdio.h> #include <string.h> #include <unistd.h> #include <netinet/in.h> #include "wrap.h" #define MAXLINE 10 #define SERV_PORT 8080 #define SERV_IP "127.0.0.1" int main(void) { int i; char ch = 'a'; char buf[MAXLINE]; //创建套接字 int sockfd; sockfd = socket(AF_INET,SOCK_STREAM,0); //连接 struct sockaddr_in servaddr; bzero(&servaddr,sizeof(servaddr)); servaddr.sin_family = AF_INET; inet_pton(AF_INET,SERV_IP,&servaddr.sin_addr); servaddr.sin_port = htons(SERV_PORT); connect(sockfd,(struct sockaddr *)&servaddr,sizeof(servaddr)); while(1) { for(i=0;i<MAXLINE/2;++i) { buf[i] = ch; } buf[i-1] = '\n'; ch++; for(;i<MAXLINE;++i) { buf[i] = ch; } buf[i-1] = '\n'; ch++; write(sockfd,buf,sizeof(buf)); sleep(10); } close(sockfd); return 0; }