I/O多路复用
前言
文本相关参考资料及部分内容来源
- 《Linux高性能服务器编程》
- 《TCP/IP网络编程》
- 《Linux/UNIX系统编程手册》
- I/O多路复用核心思想为,使用一个线程,来处理多个客户端的请求。
- 或者说,使用一个特殊的fd,监视多个fd。
- 使得程序能同时监听多个文件描述符,这对提高程序的性能至关重要。
通常,网络程序在下列情况下需要使用I/O多路复用技术。
- 客户端程序需要同时处理多个socket。
- 客户端程序要同时处理用户输入和网络连接。
TCP服务器要同时处理监听socket和连接socket。这是I/O复用使用最多的场合
。- 服务器要同时处理TCP请求和UDP请求。
- 服务器要同时监听多个端口,或处理多种服务。
select
select-函数
- 在一段指定时间内,监听用户感兴趣的文件描述符上的可读、可写和异常等事件。
- select
- 函数原型:
int select(
int nfds,
fd_set *readfds,
fd_set *writefds,
fd_set *exceptfds,
struct timeval *timeout
);
参数:
- nfds: 被监听的文件描述符的总数。
- readfds: 将所有关注"是否存在待读取数据"的文件描述符注册到fd_set型变量中(存入fd_set数组),并传递其地址值。-fd_set文件描述符集合,注意传递记得备份,因为调用
select后会将其重置
。- writefds: 将所有关注"是否存在无阻塞数据(可写入)"的文件描述符注册到fd_set型变量中(存入fd_set数组),并传递其地址值。
- exceptfds: 将所有关注"是否发生异常"的文件描述符注册到fd_set型变量中,并传递其地址值。
timeout: 用来设定select的阻塞时间上限。
- 指定为NULL将会
一直阻塞
,直到某个文件描述符就绪。- 指定为一个timeval结构体,详见timeval结构体。
返回值:
- -1: 表示发生错误。
- 0: 表示超时。
- \>0: 表示有一个或多个文件描述符已达到就绪态,返回值表示处于就绪态的文件描述符个数。[三个集合中就绪的fd数量总和,也就是说,如果一个fd在三个fd_set数组中,三种事件都就绪了,会存在重复累计(对于同一个fd来说)。]
timeval-结构体
- 结构体定义
struct timeval{
long tv_sec; // 秒
long tv_usec; // 微秒
};
- 如果结构体 timeval 的两个域都为 0 的话,此时 select()不会阻塞,它只是简单地轮询指定的文件描述符集合,看看其中是否有就绪的文件描述符并立刻返回。
- 否则,timeout 将为 select指定一个等待时间的上限值。
fd_set-文件描述符集合
- 在fd_set变量中各注册或更改值的操作都由以下四个宏完成。
- 将fdset所指向的文件描述符集合初始化为空。
void FD_ZERO(fd_set *fdset);
- 将文件描述符fd,从fdset所指向的文件描述集合中移除。
void FD_CLR(int fd, fd_set *fdset);
- 将文件描述符fd,添加到fdset所指向的文件描述集合中。
void FD_SET(int fd, fd_set *fd_set);
检查指定的文件描述符fd,是否在fdset所指向的文件描述集合中。
- 存在返回非0,反之返回0。
int FD_ISSET(int fd, fd_set *fdset);
文件描述符就绪条件
以下情况socket可读:
- socket内核接收缓存区中的字节数大于或等于其低水位标记SO_RCVLOWAT。此时我们可以无阻塞地读取该socket,并且读操作将返回的字节数大于0。
- socket通信的对方关闭连接。此时对该socket的读操作将返回0。
- 监听socket上有新的连接请求。
- socket上有未处理的错误。此时我们可以使用getsockop来读取和清除该错误。
以下情况socket可写:
- socket内核发送缓冲区中的可用字节数大于或等于其低水位标记SO_SNDLOWAT。此时我们可以无阻塞地写该socket,并且写操作返回的字节数大于0。
- socket的写操作被关闭。对写操作被关闭的socket执行写操作将触发一个SIGPIPE信号。
- socket使用非阻塞connect连接成功或失败(超时)之后。
- socket上有未处理的错误,此时我们可以使用getsockopt来读取和清除该错误。
异常情况:
- 网络程序中,select能处理的异常情况只有一种: socket上接收到带外数据。
示例
- server.c
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <sys/time.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
int errno;
int main(void){
int server_sockfd,client_sockfd;
int server_len,client_len;
struct sockaddr_in server_address;
struct sockaddr_in client_address;
int result;
//两个文件描述符集合
fd_set readfds,testfds;//readfds用于检测输出是否就绪的文件描述符集合
server_sockfd = socket(AF_INET,SOCK_STREAM,0); // 建立服务端socket
server_address.sin_family = AF_INET;
server_address.sin_addr.s_addr = htonl(INADDR_ANY);
server_address.sin_port = htons(9000);
server_len = sizeof(server_address);
bind(server_sockfd,(struct sockaddr*)&server_address,server_len);
listen(server_sockfd,5);// 监听队列最多容纳5个
FD_ZERO(&readfds);// 清空置0
FD_SET(server_sockfd,&readfds);// 将服务端socket加入到集合中
/*
int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);
*/
while(1){
char ch;
int fd;
int nread;
testfds = readfds;//相当于备份一份,因为调用select后,传进去的文件描述符集合会被修改。
struct timeval my_time;
my_time.tv_sec = 2;
my_time.tv_usec = 0;
printf("server waiting\n");
// 监视server_sockfd与client_sockfd
//result = select(FD_SETSIZE,&testfds,(fd_set*)0,(fd_set*)0,(struct timeval* )0); //无限期阻塞,并测试文件描述符变动
result = select(FD_SETSIZE,&testfds,(fd_set*)0,(fd_set*)0,&my_time); //根据my_time中设置的时间进行等待,超过继续往下执行。
if(result < 0){//有错误发生
perror("select errno");
exit(1);
}else if(result == 0){//超过等待时间,未响应
FD_ZERO(&readfds);// 清空置0
FD_SET(server_sockfd,&readfds);// 将服务端socket重新加入到集合中
printf("no connect request \n");
continue;//没有响应的就别下去遍历了
}
//扫描所有的文件描述符(遍历所有的文件句柄),是件很耗时的事情,严重拉低效率。
for(fd = 0;fd<FD_SETSIZE;fd++){
//找到相关文件描述符,判断是否在testfds这个文件描述符集合中。
if(FD_ISSET(fd,&testfds)){
//判断是否为服务器套接字,是则表示为客户端请求连接
if(fd == server_sockfd){
client_len = sizeof(client_address);
client_sockfd = accept(server_sockfd,(struct sockaddr*)&client_address,&client_len);
FD_SET(client_sockfd,&readfds);//将客户端socket加入到集合中,用来监听是否有数据来。
printf("adding client on fd %d\n",client_sockfd);;
}else{// 客户端来消息了
//获取接收缓存区中的字节数
ioctl(fd,FIONREAD,&nread);//即获取fd来了多少数据
//客户端数据请求完毕,关闭套接字,并从集合中清除相应的套接字描述符
if(nread ==0){
close(fd);
FD_CLR(fd,&readfds);//去掉关闭的fd
printf("removing client on fd %d\n", fd);
}else{//处理客户数请求
read(fd,&ch,1);
sleep(5);
printf("serving client on fd %d\n", fd);
ch++;
write(fd, &ch, 1);
}
}
}
}
}
return 0;
}
- client.c
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/time.h>
int main(){
int client_sockfd;
int len;
struct sockaddr_in address;//服务器端网络地址结构体
int result;
char ch = 'A';
client_sockfd = socket(AF_INET, SOCK_STREAM, 0);//建立客户端socket
address.sin_family = AF_INET;
address.sin_addr.s_addr = inet_addr("127.0.0.1");
address.sin_port = htons(9000);
len = sizeof(address);
result = connect(client_sockfd, (struct sockaddr*)&address, len);
if (result == -1){
perror("oops: client2");
exit(1);
}
//第一次读写
write(client_sockfd, &ch, 1);
read(client_sockfd, &ch, 1);
printf("the first time: char from server = %c\n", ch);
sleep(5);
//第二次读写
write(client_sockfd, &ch, 1);
read(client_sockfd, &ch, 1);
printf("the second time: char from server = %c\n", ch);
close(client_sockfd);
return 0;
}
poll
poll函数
- 与select类似,也是在指定事件内轮询一定的数量的文件描述符,看其中是否有就绪的。
- poll
- 函数原型:
int poll(
struct pollfd *fds,
nfds_t nfds,
int timeout
);
参数:
- fds: pollfd类型的数组,它存储所有我们该兴趣的文件描述符上发生的可读、可写和异常事件。结构体定义详见pollfd结构体。
- nfds: 数组fds中的元素个数,类型为nfds_t无符号整型。
timeout: 超时等待时间。
- -1: 一直阻塞,直到某个事件发生。
- 0: 调用后不等待立即返回。
返回值:
- -1: 表示发生错误。
- 0: 表示超时。
- \>0: 表示fds中有这么多个文件描述符处于就绪态了。即fds中拥有非零revents字段的pollfd结构体数量。
pollfd-结构体
- 结构体定义:
struct pollfd {
int fd;
short events;
short revents;
};
- 参数:
- fd:
文件描述符
。- events:
注册的事件
。如下图所示。- revents: 实际发生的事件,由
内核填充
。
示例
- server.c
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <sys/time.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <stdlib.h>
#include <poll.h>
#define MAX_FD 8192 //最大文件标识符
struct pollfd fds[MAX_FD];
int cur_max_fd = 0; //当前要监听的最大文件描述符+1,减少要遍历的数量。
int main(void){
int server_sockfd,client_sockfd;
int server_len,client_len;
struct sockaddr_in server_address,client_address;
int result;
server_sockfd = socket(AF_INET,SOCK_STREAM,0);//服务端socket
server_address.sin_family = AF_INET;
server_address.sin_addr.s_addr = htonl(INADDR_ANY);
server_address.sin_port = htons(9000);
server_len = sizeof(server_address);
bind(server_sockfd,(struct sockaddr*)&server_address,server_len);
listen(server_sockfd,5);
//添加待监测文件描述符到fds数组中
fds[server_sockfd].fd = server_sockfd;
fds[server_sockfd].events = POLLIN;
fds[server_sockfd].revents = 0;
if(cur_max_fd <= server_sockfd){
cur_max_fd = server_sockfd+1;
}
while(1){
char ch;
int i,fd;
int nread;
printf("server waiting\n");
result = poll(fds,cur_max_fd,1000);
if(result <0){
perror("server5");
exit(1);
}else if(result == 0){
printf("no connect,end waiting\n");
}else{//大于0,返回的是fds中处于就绪态的文件描述符个数。
}
//扫描文件描述符
for(i = 0; i < cur_max_fd;i++){
if(fds[i].revents){//有没有结果,没有结果说明该文件描述符上还未发生事件。
fd= fds[i].fd;
//判断是否为服务器套接字,是则表示为客户端请求连接
if(fd == server_sockfd){
client_len = sizeof(client_address);
client_sockfd = accept(server_sockfd,(struct sockaddr*)&client_address,&client_len);
fds[client_sockfd].fd = client_sockfd;
fds[client_sockfd].events = POLLIN;
fds[client_sockfd].revents = 0;
if(cur_max_fd <= client_sockfd){
cur_max_fd = client_sockfd + 1;
}
printf("adding client on fd %d\n",client_sockfd);
}else{//客户端socket中有数据请求
if(fds[i].revents & POLLIN){//读
nread = read(fd,&ch,1);
if(nread == 0){
close(fd);
memset(&fds[i],0,sizeof(struct pollfd));
printf("removing client on fd %d\n",fd);
}else{//写
sleep(5);
printf("serving client on fd %d,receive: %c\n",fd,ch);
ch++;
fds[i].events = POLLOUT;//添加一个写事件监听
}
}else if(fds[i].revents & POLLOUT){//写
write(fd,&ch,1);
fds[i].events = POLLIN;
}
}
}
}
}
return 0;
}
- client.c——同select中的实例。
epoll
- epoll是
Linux特有的I/O复用函数
。它在实现和使用上与select和poll有很大的差异。- epoll使用
一组函数
来完成任务,而不是单个函数
。- epoll把用户关心的文件描述符上的事件放在内核的一个
事件表
中,不需要像select与poll那样每次都要重复传入文件描述符集合或是事件集。epoll需要使用一个额外的文件描述符,在内核中唯一标识这个事件表
。
epoll_create-创建epoll
- epoll_create
- 功能: 创建一个epoll实例。
- 函数原型:
int epoll_create(int size);
参数:
- size: 现已被抛弃,只是给内核一个提示,告诉它事件表需要多大。
- 返回值:
返回创建的epoll实例文件描述符,在其它epoll相关函数中指定要访问的内核事件表
。
epoll_ctl-操作对应内核事件表
- epoll_ctl
- 功能: 操作对应epoll的内核事件表,进行添加/删除/修改指定fd的事件。
- 函数原型:
int epoll_ctl( int epfd, int op, int fd, struct epoll_event *event );
参数:
- epfd: epoll实例,用来指定要访问的内核事件表。
op: 用来指定需要执行的操作。
- EPOLL_CTR_ADD: 往事件表上注册fd上的事件。
- EPOLL_CTR_MOD: 修改fd上的注册事件。
- EPOLL_CTR_DEL: 删除fd上的注册事件。
- fd: 要进行op操作的文件描述符。
- event: 为一个指向epoll_event结构体的指针。结构体定义如下event_event-结构体所示:
返回值:
- 成功: 返回0。
- 失败: 返回-1并设置errno。
epoll_event-结构体
- 结构体定义:
struct epoll_event{
uint32_t events;
epoll_data_t data;
};
参数:
- events: epoll事件,如下图所示。
- data: 用户数据,详见epoll_data_t-结构体。
epoll_data_t-结构体
- 结构体定义:
typedef union epoll_data{
void *ptr; // 用户自定义使用
int fd; // 指定事件文件描述符
uint32_t u32;
uint64_t u64;
}epoll_data_t;
参数:
- ptr: 可用来指定与fd相关的用户数据。
- fd: 指定事件所从属的目标文件描述符。
注意:
- epoll_data_t是一个联合体,所以我们只能使用fd或ptr其中一个成员。
- 如果要将文件描述符和用户数据关联起来,以实现快速的数据访问,可以放弃使用epoll_data_t中的fd成员,而是在ptr所指向的自定义用户数据中包含fd。
补充:
当我们调用epoll_wait后,evlist数组中的epoll_event每个data参数为我们在一开始(即调用epoll_ctl)所指定的内容,比如像上面所说的我们指定了自定义数据ptr,最终某一fd产生了我们监视的事件,我们可以在其对应的epoll_event的data中取到。
例如下方epoll-简易web服务器中的_ConnectStat结构体。
epoll_wait-事件等待
- epoll_wait
- 功能: 在一段超时时间内等待一组文件描述符上的事件。
- 函数原型:
int epoll_wait(int epfd,
struct epoll_event * evlist,
int maxevents,
int timeout
);
参数:
- epfd: epoll文件描述符,指定内核事件表。
- evlist: 分配好的epoll_event结构体数组,epoll将会把发生的事件复制到evlist数组中。
- maxevents: 最多监听多少个时间,必须大于0。
timeout: 表示在没有检测到事件发生时最多等待的时间(ms)。
- 0: 将会立即返回,不会等待。
- -1: 表示无限期阻塞,直到有事件发生。
- \>0: 阻塞(等待)时间。
返回值:
- 成功: 返回就绪的文件描述符个数。
- 失败: 返回-1并设置errno。
epoll-简易web服务器
- 使用epoll的一个简易web服务器
头文件
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<errno.h>
#include<sys/types.h>
#include<sys/epoll.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<assert.h>
#include<fcntl.h>
#include<unistd.h>
自定义保存数据的结构体
//因为下面的函数指针所以单独拿出来声明typedef
typedef struct _ConnectStat ConnectStat;
typedef void(*response_handler) (ConnectStat * stat);
// 保存自定义数据的结构体,调用epoll时用epoll_data_t中的ptr存储
struct _ConnectStat {
int fd; //文件描述符
char name[64]; //姓名
char age[64]; //年龄
struct epoll_event _ev; //当前文件句柄对应epoll事件
int status; //0-未登录,1-已登录
response_handler handler; //不同页面的处理函数
};
相关函数声明与全局变量
// 初始化一个自定义数据存储结构体
ConnectStat * stat_init(int fd);
// 将新链接进来的客户端fd放入当前epoll所对应的内核事件表中
void connect_handle(int new_fd);
// 请求响应-指定对应的处理函数
void do_http_respone(ConnectStat * stat);
// 处理http请求
void do_http_request(ConnectStat * stat);
// 响应处理函数——请求链接返回的内容
void welcome_response_handler(ConnectStat * stat);
// 响应处理函数——commit后返回的内容
void commit_respone_handler(ConnectStat * stat);
// 将新链接进来的客户端fd放入当前epoll所对应的内核事件表中
void connect_handle(int new_fd);
// 创建一个监听套接字 - 略
int startup(char* _ip, int _port);
// 将fd-设置为非阻塞状态,即给指定fd添加状态
void set_nonblock(int fd);
// 打印信息提示ip:port
void usage(const char* argv);
// 响应头
const char *main_header = "HTTP/1.0 200 OK\r\nServer: Xuanxuan Server\r\nContent-Type: text/html\r\nConnection: Close\r\n";
static int epfd = 0;// epoll文件描述符,对应一张内核事件表
初始化一个自定义数据存储结构体
// 初始化自定义数据存储结构体
ConnectStat * stat_init(int fd) {
ConnectStat * temp = NULL;
temp = (ConnectStat *)malloc(sizeof(ConnectStat));
if (!temp) {
fprintf(stderr, "malloc failed. reason: %m\n");
return NULL;
}
memset(temp, '\0', sizeof(ConnectStat));
temp->fd = fd;
temp->status = 0;
}
处理http请求
// 解析http请求
void do_http_request(ConnectStat * stat) {
//读取和解析http 请求
char buf[4096];
char * pos = NULL;
ssize_t _s = read(stat->fd, buf, sizeof(buf) - 1);
if (_s > 0){// 读取到数据
buf[_s] = '\0';
// printf("receive from client:%s\n", buf);//GET / HTTP/1.1
pos = buf;
//Demo 仅仅演示效果,不做详细的协议解析
if (!strncasecmp(pos, "GET", 3)) {// 是否为Get请求
stat->handler = welcome_response_handler;// 设置执行函数
}else if (!strncasecmp(pos, "Post", 4)) {// 是否为POST请求
//获取 uri
//printf("---Post----\n");
pos += strlen("Post");
while (*pos == ' ' || *pos == '/') ++pos;
// POST /commit HTTP/1.1
if (!strncasecmp(pos, "commit", 6)) {//提交
int len = 0;
//printf("post commit --------\n");
pos = strstr(buf, "\r\n\r\n");//返回第一次出现\r\n\r\n的位置
char *end = NULL;
// 拿到姓名与年龄
if (end = strstr(pos, "name=")) {
pos = end + strlen("name=");
end = pos;
while (('a' <= *end && *end <= 'z') || ('A' <= *end && *end <= 'Z') || ('0' <= *end && *end <= '9')) end++;
len = end - pos;
if (len > 0) {// 将姓名存入自定义结构体中
memcpy(stat->name, pos, end - pos);
stat->name[len] = '\0';
}
}
if (end = strstr(pos, "age=")) {
pos = end + strlen("age=");
end = pos;
while ('0' <= *end && *end <= '9') end++;
len = end - pos;
if (len > 0) {// 将年龄存入自定义结构体中
memcpy(stat->age, pos, end - pos);
stat->age[len] = '\0';
}
}
stat->handler = commit_respone_handler;// 设置响应函数
}
else {
stat->handler = welcome_response_handler;// 设置响应函数
}
}
else {
stat->handler = welcome_response_handler;// 设置响应函数
}
stat->_ev.events = EPOLLOUT; // 修改事件类型
epoll_ctl(epfd, EPOLL_CTL_MOD, stat->fd, &stat->_ev); //修改,交给eoill监视。
}else if (_s == 0){// 没有读取到数据,客户端关闭。
printf("client: %d close\n", stat->fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, stat->fd, NULL);// 将对应fd从对应epoll的内核事件表中删除
close(stat->fd);// 关闭套接字
free(stat); // 释放内存
}else{// read发生错误
perror("read");
}
}
请求响应-根据指定的处理函数
void do_http_respone(ConnectStat * stat) {
stat->handler(stat);// 调用对应设置的函数
}
响应处理函数——请求链接返回的内容
void welcome_response_handler(ConnectStat * stat) {
const char * welcome_content = "\
<html lang=\"zh-CN\">\n\
<head>\n\
<meta content=\"text/html; charset=utf-8\" http-equiv=\"Content-Type\">\n\
<title>This is a test</title>\n\
</head>\n\
<body>\n\
<div align=center height=\"500px\" >\n\
<br/><br/><br/>\n\
<h2>Hello World</h2><br/><br/>\n\
<form action=\"commit\" method=\"post\">\n\
姓名: <input type=\"text\" name=\"name\" />\n\
<br/>年龄: <input type=\"password\" name=\"age\" />\n\
<br/><br/><br/><input type=\"submit\" value=\"提交\" />\n\
<input type=\"reset\" value=\"重置\" />\n\
</form>\n\
</div>\n\
</body>\n\
</html>";
char sendbuffer[4096];
char content_len[64];
strcpy(sendbuffer, main_header);// 拷贝响应头
snprintf(content_len, 64, "Content-Length: %d\r\n\r\n", (int)strlen(welcome_content));
strcat(sendbuffer, content_len);
strcat(sendbuffer, welcome_content);
//printf("send reply to client \n%s", sendbuffer);
// 写给客户端-即发起请求的浏览器
write(stat->fd, sendbuffer, strlen(sendbuffer));
stat->_ev.events = EPOLLIN; // 修改关心的事件
//stat->_ev.data.ptr = stat;
epoll_ctl(epfd, EPOLL_CTL_MOD, stat->fd, &stat->_ev);
}
响应处理函数——commit后返回的内容
void commit_respone_handler(ConnectStat * stat) {
const char * commit_content = "\
<html lang=\"zh-CN\">\n\
<head>\n\
<meta content=\"text/html; charset=utf-8\" http-equiv=\"Content-Type\">\n\
<title>This is a test</title>\n\
</head>\n\
<body>\n\
<div align=center height=\"500px\" >\n\
<br/><br/><br/>\n\
<h2>欢迎 %s ,年龄 %s!</h2><br/><br/>\n\
</div>\n\
</body>\n\
</html>\n";
char sendbuffer[4096];
char content[4096];
char content_len[64];
int len = 0;
len = snprintf(content, 4096, commit_content, stat->name, stat->age);
strcpy(sendbuffer, main_header); //响应头
snprintf(content_len, 64, "Content-Length: %d\r\n\r\n", len);
strcat(sendbuffer, content_len);
strcat(sendbuffer, content);
//printf("send reply to client \n%s", sendbuffer);
write(stat->fd, sendbuffer, strlen(sendbuffer));
stat->_ev.events = EPOLLIN; // 修改关心的事件
epoll_ctl(epfd, EPOLL_CTL_MOD, stat->fd, &stat->_ev);// 交给epoll来监视
}
打印信息提示ip:port
void usage(const char* argv){
printf("%s:[ip][port]\n", argv);
}
将fd-设置为非阻塞状态
void set_nonblock(int fd){
// 这里的文件状态标志flag即open函数的第二个参数
int fl = fcntl(fd, F_GETFL);// 获取设置的flag
fcntl(fd, F_SETFL, fl | O_NONBLOCK);// 设置flag
// fcntl函数 https://blog.csdn.net/zhoulaowu/article/details/14057799
// O_NONBLOCK https://blog.csdn.net/cjfeii/article/details/115484558
}
创建一个监听套接字
int startup(char* _ip, int _port){
int sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0){
perror("sock");
exit(2);
}
int opt = 1;
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
struct sockaddr_in local;
local.sin_port = htons(_port);
local.sin_family = AF_INET;
local.sin_addr.s_addr = inet_addr(_ip);
if (bind(sock, (struct sockaddr*)&local, sizeof(local)) < 0){
perror("bind");
exit(3);
}
if (listen(sock, 5) < 0){
perror("listen");
exit(4);
}
return sock; //返回套接字
}
main
#include "epoll_server.h"
int main(int argc, char *argv[]){
if (argc != 3){//检查输入的参数个数是否正确
usage(argv[0]);
exit(1);
}
//创建一个server socket
int listen_sock = startup(argv[1], atoi(argv[2]));
//创建epoll
epfd = epoll_create(256);
if (epfd < 0){//创建失败
perror("epoll_create");
exit(5);
}
ConnectStat * stat = stat_init(listen_sock);// 自定义数据存储
struct epoll_event _ev; //epoll事件结构体
_ev.events = EPOLLIN; //设置关心事件为读事件
_ev.data.ptr = stat; //接收返回值
//将listen_sock添加到epfd中,关心读事件,有客户端来请求链接
epoll_ctl(epfd, EPOLL_CTL_ADD, listen_sock, &_ev);
struct epoll_event revs[64];//接收返回的产生响应的事件
int timeout = -1;// -1无限期阻塞
int num = 0;// 就绪的请求I/O个数
while (1){
//检测事件
switch ((num = epoll_wait(epfd, revs, 64, timeout))){
case 0: //监听超时
printf("timeout\n");
break;
case -1: //出错
perror("epoll_wait");
break;
default:{ //>0,即返回了需要处理事件的数目
//拿到对应的文件描述符
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
for (int i = 0; i < num; i++){//
//拿到该fd相关的链接信息
ConnectStat * stat = (ConnectStat *)revs[i].data.ptr;
int rsock = stat->fd;//拿到对应的fd,进行如下的判断
if (rsock == listen_sock && (revs[i].events) && EPOLLIN) {// 有客户端链接
int new_fd = accept(listen_sock, (struct sockaddr*)&peer, &len);
if (new_fd > 0){//accept成功
printf("get a new client:%s:%d\n", inet_ntoa(peer.sin_addr), ntohs(peer.sin_port));
connect_handle(new_fd);// 监听新进来的客户端fd
}
}else {//除server socket 之外的其他fd就绪
if (revs[i].events & EPOLLIN){//有数据可读
do_http_request((ConnectStat *)revs[i].data.ptr);
}else if (revs[i].events & EPOLLOUT){//写
do_http_respone((ConnectStat *)revs[i].data.ptr);// 完成响应后会再次关心EPOLLIN事件,等待下一次请求。
}else{
}
}
}
}
break;
}
}
return 0;
}
- 运行-访问-示例: ./main 192.168.0.70 8080
LT与ET
Level Trigger——水平触发:
- 当被监控的文件描述符上有可读写的事件发生时,epoll_wait会通知处理程序去读写。如果这次没有把数据一次性全部读写完(如读写缓冲区太小),那么下次有调用epoll_wait时,它还会通知你在上次没读写完的文件描述符上继续读写,如果你一直不去读写它,它就会一直通知你。
如果系统中有大量你不需要读写的就绪文件描述符,而它们每次都会返回,这样会大大降低处理程序检索自己关心的就绪文件描述符的效率。
- 应用程序可以不立即处理该事件,因为当下一次调用epoll_wait,epoll_wait还会再次向应用程序通告此事件。
- 设置方式: 默认即水平触发。
Edge_triggered(边缘触发):
- 当被监控的文件描述符上有可读写的事件发生时,epoll_wait会通知处理程序去读写。如果这次没有把数据全部读写完(如读写缓冲区太小),它不会通知你,也就是它只会通知你一次,直到该文件描述符上出现第二次可读写事件才会通知你。
这种模式比水平触发效率高,系统不会充斥大量你不关心的就绪文件描述符,很大程度上降低了同一个epoll事件被重复触发的次数。
同时,应用程序应立即处理该事件,因为后续的epoll_wait调用将不再向应用程序通知这一事件(之后的读写事件就会通知,只是这次的不会了)。
设置方式(epoll):
- 对应文件描述符上要监听的事件设置为,events |= EPOLLET
- 同时对该文件描述符设置为非阻塞模式。如上epoll-简易web服务器中所示。
EPOLLONESHOT事件
- 用途:
保证一个socket连接在任一时刻都只被一个线程处理,从而保证连接的完整性,避免了很多可能的竞态条件。
- 可能产生的情景: 一个线程(或进程)在读取完某个socket上的数据并开始处理时,在处理的过程中该socket上又有新的数据可读(EPOLLIN被再次触发),此时唤醒另一个线程来读取这些新的数据。于是就出现了两个线程操作一个socket的局面。
- 使用: 使用epoll_ctrl函数在该socket(文件描述符)上注册EPOLLONESHOT事件。
注意:
注册完EPOLLONESHOT事件的socket一旦被某个线程处理完毕,应立即重置这个socket上的EPOLLONESHOT事件,以确保这个socket下次可读时,其EPOLLIN事件能被触发,同时也给其它线程处理这个socket的机会。
用于监听链接请求的Server_socket是不能注册EPOLLONESHOT事件的,否则应用程序只能处理一个客户链接,因为后续的客户链接请求将不再触发Server_socket上的EPOLLIN事件。
如果某一线程处理完成该socket上的请求之后,又在该socket上收到了新的客户请求,该线程将继续接触这个socket。
代码示例
- 仅部分核心代码示例: 完整的可以去《Linux高性能服务器编程》源代码9-4查看
主线程中循环监听事件
while( 1 ){
int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
if ( ret < 0 )break;
for ( int i = 0; i < ret; i++ ){
int sockfd = events[i].data.fd;
if ( sockfd == listenfd ){// 有链接请求接入
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof( client_address );
int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
addfd( epollfd, connfd, true );
}
else if ( events[i].events & EPOLLIN ){
pthread_t thread;
fds fds_for_new_worker;
fds_for_new_worker.epollfd = epollfd;
fds_for_new_worker.sockfd = sockfd;
// 创建一个线程去处理
pthread_create( &thread, NULL, worker, ( void* )&fds_for_new_worker );
}
else printf( "something else happened \n" );
}
}
将指定fd上的某一事件注册到对应的内核事件表中
void addfd( int epollfd, int fd, bool oneshot ){
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET;
if( oneshot ){
event.events |= EPOLLONESHOT;// 注册EPOLLONESHOT事件
}
epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
setnonblocking( fd );// 设置为非阻塞fd
}
设置fd为非阻塞
int setnonblocking( int fd ){
int old_option = fcntl( fd, F_GETFL );// 拿到之前对该fd的设置属性
int new_option = old_option | O_NONBLOCK;// 追加O_NONBLOCK属性
fcntl( fd, F_SETFL, new_option );// 设置
return old_option;// 当前示例Demo返回无意义,未使用。
}
线程工作函数
void* worker( void* arg ){
int sockfd = ( (fds*)arg )->sockfd;
int epollfd = ( (fds*)arg )->epollfd;
printf( "start new thread to receive data on fd: %d\n", sockfd );
char buf[ BUFFER_SIZE ];
memset( buf, '\0', BUFFER_SIZE );
while( 1 ){// 因为是非阻塞的,所以要一次性读光,即要立即处理,因为epoll_wait只会提醒一次。
int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
if( ret == 0 ){
close( sockfd );
printf( "foreiner closed the connection\n" );
break;
}
else if( ret < 0 ){
if( errno == EAGAIN ){// 读光啦
reset_oneshot( epollfd, sockfd );// 重置注册事件
printf( "read later\n" );
break;
}
}
else{
printf( "get content: %s\n", buf );
// sleep 5秒,模拟数据处理过程
sleep( 5 );
}
}
printf( "end thread receiving data on fd: %d\n", sockfd );
}
重置fd上注册的事件
void reset_oneshot( int epollfd, int fd ){
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
epoll_ctl( epollfd, EPOLL_CTL_MOD, fd, &event );
}
三个I/O复用函数的对比
select:
- select的参数类型fd_set,仅仅是个文件描述符集合,因此select需要3个这种类型的参数来区分可读、可写及异常事件。
- 一方面使得select不能处理更多类型的事件,另一方面内核对fd_set
在线修改
,导致应用程序下次再调用select前不得不重置
这三个fd_set集合。同时我们也需要在使用前进行备份
。poll:
- poll参数类型pollfd要聪明一些,将文件描述符和事件类型定义在一起,调用后修改的是pollfd结构体中的revents成员,为实际检测到的事件,我们设置的events成员保持不变。再次调用后,revents会被重新置空。
select与poll每次调用后,都需要遍历整个用户关心的事件集合,无论其中的事件是否就绪,所以应用程序检索就绪文件描述符的时间复杂度为O(n)。
epoll:
- epoll使用与上面二者完全不同的方式来管理用户注册事件,它在内核中维护一个
事件表
,并提供独立的系统调用epoll_ctl来往其中进行添加、删除、修改事件,而无须
反复地从用户空间读入这些事件。epoll_wait系统调用的events参数负责保存这些就绪的事件,使得应用程序检索就绪文件描述符的时间复杂度达到O(1)。
最大支持文件描述符个数:
- poll与epoll_wait分别用nfds和maxevents参数来指定最多监听多少个文件描述符和事件,这两个数值都能达到系统允许打开的最大文件描述符个数——65535。而select允许监听的最大文件描述符数量通常有限制。虽然用户可以修改这个限制,但是这可能会导致
不可预期
的后果。工作模式:
- select与poll都只能工作在相对低效的LT模式,而epoll可以工作在高效的ET模式。
内核实现:
- select与poll采用的是
轮询
的方式,每次扫描整个注册文件描述符集合
,将就绪的文件描述符返回给用户程序。检测就绪事件的时间复杂度为O(n)。- epoll_wait采用
回调
的方式,内核检测到就绪的文件描述符时,将触发回调函数,回调函数将该文件描述符上对应的事件插入内核就绪事件队列。内核最后在适当的时机将该就绪事件队列中的内容拷贝到用户空间。
- 当
活动连接比较多
的时候,epoll_wait的效率未必
比select和poll高,因为此时回调函数被触发得过于频繁。所以epoll_wait适用于连接数量多,但活动连接较少的情况。