一、IO操作方式
多路I/O转接服务器
多路IO转接服务器也叫做多任务IO服务器。该类服务器实现的主旨思想是,不再由应用程序自己监视客户端连接,取而代之由内核替应用程序监视文件。
阻塞等待
- 好处:不占用CPU宝贵时间
- 缺点:同一时间只能处理一个操作,效率低
非阻塞, 忙轮询
- 优点: 提高了程序的执行效率
- 缺点: 需要占用更多的cpu和系统资源
一个任务
多个任务
解决方案:
使用IO多路转接技术 select/poll/epoll
第一种: select/poll
select 代收员比较懒, 她只会告诉你有几个快递到了,但是哪个快递,你需要挨个遍历一遍。
select函数
头文件 #include <sys/select.h> /* According to earlier standards */ #include <sys/time.h> #include <sys/types.h> #include <unistd.h> int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout); nfds: 监控的文件描述符集里最大文件描述符加1,因为此参数会告诉内核检测前多少个文件描述符的状态 readfds: 监控有读数据到达文件描述符集合,传入传出参数 writefds: 监控写数据到达文件描述符集合,传入传出参数 exceptfds: 监控异常发生达文件描述符集合,如带外数据到达异常,传入传出参数 timeout: 定时阻塞监控时间,3种情况 1.NULL,永远等下去 2.设置timeval,等待固定时间 3.设置timeval里时间均为0,检查描述字后立即返回,轮询 struct timeval { long tv_sec; /* seconds */ long tv_usec; /* microseconds */ }; void FD_CLR(int fd, fd_set *set); //把文件描述符集合里fd清0 int FD_ISSET(int fd, fd_set *set); //测试文件描述符集合里fd是否置1 void FD_SET(int fd, fd_set *set); //把文件描述符集合里fd位置1 void FD_ZERO(fd_set *set); //把文件描述符集合里所有位清0
poll函数
#include <poll.h> int poll(struct pollfd *fds, nfds_t nfds, int timeout); struct pollfd { int fd; /* 文件描述符 */ short events; /* 监控的事件 */ short revents; /* 监控事件中满足条件返回的事件 */ }; POLLIN 普通或带外优先数据可读,即POLLRDNORM | POLLRDBAND POLLRDNORM 数据可读 POLLRDBAND 优先级带数据可读 POLLPRI 高优先级可读数据 POLLOUT 普通或带外数据可写 POLLWRNORM 数据可写 POLLWRBAND 优先级带数据可写 POLLERR 发生错误 POLLHUP 发生挂起 POLLNVAL 描述字不是一个打开的文件 nfds 监控数组中有多少文件描述符需要被监控 timeout 毫秒级等待 -1:阻塞等,#define INFTIM -1 Linux中没有定义此宏 0:立即返回,不阻塞进程 >0:等待指定毫秒数,如当前系统时间精度不够毫秒,向上取值
如果不再监控某个文件描述符时,可以把pollfd中,fd设置为-1,poll不再监控此pollfd,下次返回时,把revents设置为0。
select服务器代码
/************************************************************************* > File Name: select_server.c > Author: 杨永利 > Mail: 1795018360@qq.com > Created Time: 2020年10月26日 星期一 10时36分26秒 ************************************************************************/ #include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <sys/types.h> #include <string.h> #include <sys/socket.h> #include <arpa/inet.h> #include <ctype.h> int main(int argc, char* argv[]) { if(argc<2) { printf("eg: ./a.out port\n"); exit(1); } struct sockaddr_in serv_addr; socklen_t serv_len =sizeof(serv_addr); int port = atoi(argv[1]); // 创建套接字 int lfd=socket(AF_INET,SOCK_STREAM,0); // 初始化服务器 sockaddr_in memset(&serv_addr,0,serv_len); serv_addr.sin_family=AF_INET; serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); serv_addr.sin_port=htons(port); // 绑定IP和端口 bind(lfd,(struct sockaddr*)&serv_addr,serv_len); // 设置同时监听的最大个数 listen(lfd,36); printf("Start accept ......\n"); struct sockaddr_in client_addr; socklen_t cli_len=sizeof(client_addr); // 最大的文件描述符 int maxfd= lfd; // 文件描述符读集合 fd_set reads, temp; // init 初始化 FD_ZERO(&reads); FD_SET(lfd, &reads); while(1) { // 委托内核做IO检测 temp =reads; int ret=select(maxfd+1,&temp,NULL,NULL,NULL); if(ret == -1) { perror("select error\n"); exit(1); } // 客户端发起了新的连接 if(FD_ISSET(lfd, &temp)) { // 接受连接请求 - accept不阻塞 int cfd = accept(lfd, (struct sockaddr*)&client_addr, &cli_len); if(cfd == -1) { perror("accept error"); exit(1); } char ip[64]; printf("new client IP: %s, Port: %d\n", inet_ntop(AF_INET, &client_addr.sin_addr.s_addr, ip, sizeof(ip)), ntohs(client_addr.sin_port)); // 将cfd加入到待检测的读集合中 - 下一次就可以检测到了 FD_SET(cfd, &reads); // 更新最大的文件描述符 maxfd = maxfd < cfd ? cfd : maxfd; } // 已经连接的客户端有数据到达 for(int i=lfd+1; i<=maxfd; ++i) { if(FD_ISSET(i, &temp)) { char buf[1024] = {0}; int len = recv(i, buf, sizeof(buf), 0); if(len == -1) { perror("recv error"); exit(1); } else if(len == 0) { printf("客户端已经断开了连接\n"); close(i); // 从读集合中删除 FD_CLR(i, &reads); } else { printf("recv buf: %s\n", buf); send(i, buf, strlen(buf)+1, 0); } } } } return 0; }
poll服务器代码
/************************************************************************* > File Name: poll.c > Author: 杨永利 > Mail: 1795018360@qq.com > Created Time: 2020年10月28日 星期三 11时51分23秒 ************************************************************************/ #include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <sys/types.h> #include <string.h> #include <sys/socket.h> #include <arpa/inet.h> #include <ctype.h> #include <poll.h> #define SERV_PORT 8989 int main(int argc, const char* argv[]) { int lfd, cfd; struct sockaddr_in serv_addr, clien_addr; int serv_len, clien_len; // 创建套接字 lfd = socket(AF_INET, SOCK_STREAM, 0); // 初始化服务器 sockaddr_in memset(&serv_addr, 0, sizeof(serv_addr)); serv_addr.sin_family = AF_INET; // 地址族 serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 监听本机所有的IP serv_addr.sin_port = htons(SERV_PORT); // 设置端口 serv_len = sizeof(serv_addr); // 绑定IP和端口 bind(lfd, (struct sockaddr*)&serv_addr, serv_len); // 设置同时监听的最大个数 listen(lfd, 36); printf("Start accept ......\n"); // poll结构体 struct pollfd allfd[1024]; int max_index = 0; // init for(int i=0; i<1024; ++i) { allfd[i].fd = -1; } allfd[0].fd = lfd; allfd[0].events = POLLIN; while(1) { int i = 0; int ret = poll(allfd, max_index+1, -1); if(ret == -1) { perror("poll error"); exit(1); } // 判断是否有连接请求 if(allfd[0].revents & POLLIN) { clien_len = sizeof(clien_addr); // 接受连接请求 int cfd = accept(lfd, (struct sockaddr*)&clien_addr, &clien_len); printf("============\n"); // cfd添加到poll数组 for(i=0; i<1024; ++i) { if(allfd[i].fd == -1) { allfd[i].fd = cfd; break; } } // 更新最后一个元素的下标 max_index = max_index < i ? i : max_index; } // 遍历数组 for(i=1; i<=max_index; ++i) { int fd = allfd[i].fd; if(fd == -1) { continue; } if(allfd[i].revents & POLLIN) { // 接受数据 char buf[1024] = {0}; int len = recv(fd, buf, sizeof(buf), 0); if(len == -1) { perror("recv error"); exit(1); } else if(len == 0) { allfd[i].fd = -1; close(fd); printf("客户端已经主动断开连接。。。\n"); } else { printf("recv buf = %s\n", buf); for(int k=0; k<len; ++k) { buf[k] = toupper(buf[k]); } printf("buf toupper: %s\n", buf); send(fd, buf, strlen(buf)+1, 0); } } } } close(lfd); return 0; }
客户端代码
/************************************************************************* > File Name: client.c > Author: 杨永利 > Mail: 1795018360@qq.com > Created Time: 2020年10月27日 星期二 23时20分19秒 ************************************************************************/ #include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <sys/types.h> #include <sys/stat.h> #include <string.h> #include <arpa/inet.h> int main(int argc, const char* argv[]) { if(argc < 2) { printf("eg: ./a.out port\n"); exit(1); } int port = atoi(argv[1]); // 创建套接字 int fd = socket(AF_INET, SOCK_STREAM, 0); // 连接服务器 struct sockaddr_in serv; memset(&serv, 0, sizeof(serv)); serv.sin_family = AF_INET; serv.sin_port = htons(port); // oserv.sin_addr.s_addr = htonl(); inet_pton(AF_INET, "127.0.0.1", &serv.sin_addr.s_addr); connect(fd, (struct sockaddr*)&serv, sizeof(serv) ); // 通信 while(1) { // 发送数据 char buf[1024]; printf("请输入要发送的字符串: \n"); fgets(buf, sizeof(buf), stdin); write(fd, buf, strlen(buf)); // 等待接收数据 int len = read(fd, buf, sizeof(buf)); if(len == -1) { perror("read error"); exit(1); } else if(len == 0) { printf("服务器端关闭了连接\n"); break; } else { printf("recv buf: %s\n", buf); } } close(fd); return 0; }
第二种: epoll
epoll代收快递员很勤快, 她不仅会告诉你有几个快递到了, 还会告诉你是哪个快递公司的快递
epoll是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率,因为它会复用文件描述符集合来传递结果而不用迫使开发者每次等待事件之前都必须重新准备要被侦听的文件描述符集合,另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。
目前epell是linux大规模并发网络程序中的热门首选模型。
epoll除了提供select/poll那种IO事件的电平触发(Level Triggered)外,还提供了边沿触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。
可以使用cat命令查看一个进程可以打开的socket描述符上限。
cat /proc/sys/fs/file-max
如有需要,可以通过修改配置文件的方式修改该上限值。
sudo vi /etc/security/limits.conf
在文件尾部写入以下配置,soft软限制,hard硬限制。如下图所示。
* soft nofile 65536 * hard nofile 100000
epoll函数
1.创建一个epoll句柄,参数size用来告诉内核监听的文件描述符的个数,跟内存大小有关。
#include <sys/epoll.h> int epoll_create(int size) size:监听数目
2.控制某个epoll监控的文件描述符上的事件:注册、修改、删除。
#include <sys/epoll.h> int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) epfd: 为epoll_creat的句柄 op: 表示动作,用3个宏来表示: EPOLL_CTL_ADD (注册新的fd到epfd), EPOLL_CTL_MOD (修改已经注册的fd的监听事件), EPOLL_CTL_DEL (从epfd删除一个fd); event: 告诉内核需要监听的事件 struct epoll_event { __uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ }; typedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t; EPOLLIN : 表示对应的文件描述符可以读(包括对端SOCKET正常关闭) EPOLLOUT: 表示对应的文件描述符可以写 EPOLLPRI: 表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来) EPOLLERR: 表示对应的文件描述符发生错误 EPOLLHUP: 表示对应的文件描述符被挂断; EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)而言的 EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
3.等待所监控文件描述符上有事件的产生,类似于select()调用。
#include <sys/epoll.h> int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) events: 用来存内核得到事件的集合, maxevents: 告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size, timeout: 是超时时间 -1: 阻塞 0: 立即返回,非阻塞 >0: 指定毫秒 返回值: 成功返回有多少文件描述符就绪,时间到时返回0,出错返回-1
epoll服务器代码
/************************************************************************* > File Name: epoll.c > Author: 杨永利 > Mail: 1795018360@qq.com > Created Time: 2020年10月28日 星期三 15时38分00秒 ************************************************************************/ #include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <sys/types.h> #include <string.h> #include <sys/socket.h> #include <arpa/inet.h> #include <ctype.h> #include <sys/epoll.h> int main(int argc, const char* argv[]) { if(argc<2) { printf("eg: ./a.out port\n"); exit(1); } struct sockaddr_in serv_addr; socklen_t serv_len = sizeof(serv_addr); int port = atoi(argv[1]); // 创建套接字 int lfd = socket(AF_INET, SOCK_STREAM, 0); // 初始化服务器 sockaddr_in memset(&serv_addr, 0, serv_len); serv_addr.sin_family = AF_INET; // 地址族 serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 监听本机所有的IP serv_addr.sin_port = htons(port); // 设置端口 // 绑定IP和端口 bind(lfd, (struct sockaddr*)&serv_addr, serv_len); // 设置同时监听的最大个数 listen(lfd, 36); printf("Start accept ......\n"); struct sockaddr_in client_addr; socklen_t cli_len = sizeof(client_addr); // 创建epoll树根节点 int epfd = epoll_create(2000); // 初始化epoll树 struct epoll_event ev; ev.events=EPOLLIN; ev.data.fd=lfd; epoll_ctl(epfd,EPOLL_CTL_ADD,lfd,&ev); struct epoll_event all[2000]; while(1) { // 使用epoll通知内核fd 文件IO检测 int ret = epoll_wait(epfd,all,sizeof(all)/sizeof(all[0]), -1 ); // 遍历all数组中的前ret个元素 for (int i = 0; i < ret; ++i) { int fd = all[i].data.fd; // 判断是否有新连接 if(fd==lfd) { // 有就接守新连接 int cfd =accept(lfd,(struct sockaddr*)&client_addr,&cli_len ); if (cfd == -1) { perror("accept error\n"); exit(1); /* code */ } // 将新得到的cfd挂到树上 struct epoll_event temp; temp.events =EPOLLIN; temp.data.fd = cfd; epoll_ctl(epfd,EPOLL_CTL_ADD,cfd,&temp); // 打印客户端信息 char ip[64]={0}; printf("New client IP :%s ,Port: %d\n", inet_ntop(AF_INET, &client_addr.sin_addr.s_addr, ip, sizeof(ip)), ntohs(client_addr.sin_port)); } else // 没有新连接 { // c处理已经连接进来的客户端发来的信息 if (!all[i].events & EPOLLIN) { continue; } // 读数据 char buf[1024]={0}; int len = recv(fd,buf,sizeof(buf),0); if (len==-1) { perror("recv error"); exit(1); } else if(len == 0) { printf("client disconnected ....\n"); close(fd); // fd从epoll树上删除 epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); } else { printf(" recv buf: %s\n", buf); write(fd, buf, len); } } } } close(lfd); return 0; }
二. 什么是I/O多路转接技术:
- 先构造一张有关文件描述符的列表, 将要监听的文件描述符添加到该表中
- 然后调用一个函数,监听该表中的文件描述符,直到这些描述符表中的一个进行I/O操作时,该函数才返回。
该函数为阻塞函数
函数对文件描述符的检测操作是由内核完成的
- 在返回时,它告诉进程有多少(哪些)描述符要进行I/O操作。