【Linux】I/O多路复用-SELECT/POLL/EPOLL

简介: 【Linux】I/O多路复用-SELECT/POLL/EPOLL

I/O多路复用

前言

  • 文本相关参考资料及部分内容来源

    • 《Linux高性能服务器编程》
    • 《TCP/IP网络编程》
    • 《Linux/UNIX系统编程手册》

  • I/O多路复用核心思想为,使用一个线程,来处理多个客户端的请求。
  • 或者说,使用一个特殊的fd,监视多个fd。
  • 使得程序能同时监听多个文件描述符,这对提高程序的性能至关重要。

通常网络程序在下列情况下需要使用I/O多路复用技术

  • 客户端程序需要同时处理多个socket。
  • 客户端程序要同时处理用户输入和网络连接。
  • TCP服务器要同时处理监听socket和连接socket。这是I/O复用使用最多的场合
  • 服务器要同时处理TCP请求和UDP请求。
  • 服务器要同时监听多个端口,或处理多种服务。

select

select-函数

  • 在一段指定时间内,监听用户感兴趣的文件描述符上的可读、可写和异常等事件。
  • select
  • 函数原型:
int select(
           int nfds, 
           fd_set *readfds, 
           fd_set *writefds, 
           fd_set *exceptfds, 
           struct timeval *timeout
);
  • 参数:

    • nfds: 被监听的文件描述符的总数。
    • readfds: 将所有关注"是否存在待读取数据"的文件描述符注册到fd_set型变量中(存入fd_set数组),并传递其地址值。-fd_set文件描述符集合,注意传递记得备份,因为调用select后会将其重置
    • writefds: 将所有关注"是否存在无阻塞数据(可写入)"的文件描述符注册到fd_set型变量中(存入fd_set数组),并传递其地址值。
    • exceptfds: 将所有关注"是否发生异常"的文件描述符注册到fd_set型变量中,并传递其地址值。
    • timeout: 用来设定select的阻塞时间上限。

      • 指定为NULL将会一直阻塞,直到某个文件描述符就绪。
      • 指定为一个timeval结构体,详见timeval结构体
  • 返回值:

    • -1: 表示发生错误。
    • 0: 表示超时。
    • \>0: 表示有一个或多个文件描述符已达到就绪态,返回值表示处于就绪态的文件描述符个数。[三个集合中就绪的fd数量总和,也就是说,如果一个fd在三个fd_set数组中,三种事件都就绪了,会存在重复累计(对于同一个fd来说)。]

timeval-结构体

  • 结构体定义
struct timeval{
    long tv_sec; // 秒
    long tv_usec; // 微秒
};
  • 如果结构体 timeval 的两个域都为 0 的话,此时 select()不会阻塞,它只是简单地轮询指定的文件描述符集合,看看其中是否有就绪的文件描述符并立刻返回。
  • 否则,timeout 将为 select指定一个等待时间的上限值。

fd_set-文件描述符集合

  • 在fd_set变量中各注册或更改值的操作都由以下四个宏完成。
  • 将fdset所指向的文件描述符集合初始化为空。
void FD_ZERO(fd_set *fdset); 
  • 将文件描述符fd,从fdset所指向的文件描述集合中移除。
void FD_CLR(int fd, fd_set *fdset);
  • 将文件描述符fd,添加到fdset所指向的文件描述集合中。
void FD_SET(int fd, fd_set *fd_set); 
  • 检查指定的文件描述符fd,是否在fdset所指向的文件描述集合中。

    • 存在返回非0,反之返回0。
int FD_ISSET(int fd, fd_set *fdset);

文件描述符就绪条件

  • 以下情况socket可读:

    • socket内核接收缓存区中的字节数大于或等于其低水位标记SO_RCVLOWAT。此时我们可以无阻塞地读取该socket,并且读操作将返回的字节数大于0。
    • socket通信的对方关闭连接。此时对该socket的读操作将返回0。
    • 监听socket上有新的连接请求。
    • socket上有未处理的错误。此时我们可以使用getsockop来读取和清除该错误。
  • 以下情况socket可写:

    • socket内核发送缓冲区中的可用字节数大于或等于其低水位标记SO_SNDLOWAT。此时我们可以无阻塞地写该socket,并且写操作返回的字节数大于0。
    • socket的写操作被关闭。对写操作被关闭的socket执行写操作将触发一个SIGPIPE信号。
    • socket使用非阻塞connect连接成功或失败(超时)之后。
    • socket上有未处理的错误,此时我们可以使用getsockopt来读取和清除该错误。
  • 异常情况:

    • 网络程序中,select能处理的异常情况只有一种: socket上接收到带外数据。

示例

  • server.c
#include <sys/types.h> 
#include <sys/socket.h> 
#include <stdio.h> 
#include <netinet/in.h> 
#include <sys/time.h> 
#include <sys/ioctl.h> 
#include <unistd.h> 
#include <stdlib.h>
#include <errno.h>

int errno;
int main(void){
    int server_sockfd,client_sockfd;
    int server_len,client_len;
    struct sockaddr_in server_address;
    struct sockaddr_in client_address;
    int result;
    //两个文件描述符集合
    fd_set readfds,testfds;//readfds用于检测输出是否就绪的文件描述符集合

    server_sockfd = socket(AF_INET,SOCK_STREAM,0); // 建立服务端socket
    server_address.sin_family = AF_INET;
    server_address.sin_addr.s_addr = htonl(INADDR_ANY);
    server_address.sin_port = htons(9000);
    server_len = sizeof(server_address);
    bind(server_sockfd,(struct sockaddr*)&server_address,server_len);
    listen(server_sockfd,5);// 监听队列最多容纳5个
    
    FD_ZERO(&readfds);// 清空置0
    FD_SET(server_sockfd,&readfds);// 将服务端socket加入到集合中

    /*
    int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);
    */ 

    while(1){
        char ch;
        int fd;
        int nread;
        testfds = readfds;//相当于备份一份,因为调用select后,传进去的文件描述符集合会被修改。
        struct timeval my_time;
        my_time.tv_sec = 2;
        my_time.tv_usec = 0;
        printf("server waiting\n");
        // 监视server_sockfd与client_sockfd
        //result = select(FD_SETSIZE,&testfds,(fd_set*)0,(fd_set*)0,(struct timeval* )0); //无限期阻塞,并测试文件描述符变动
        result = select(FD_SETSIZE,&testfds,(fd_set*)0,(fd_set*)0,&my_time); //根据my_time中设置的时间进行等待,超过继续往下执行。
        if(result < 0){//有错误发生
            perror("select errno"); 
            exit(1);
        }else if(result == 0){//超过等待时间,未响应    
            FD_ZERO(&readfds);// 清空置0
            FD_SET(server_sockfd,&readfds);// 将服务端socket重新加入到集合中
            printf("no connect request \n");
            continue;//没有响应的就别下去遍历了
        }
        //扫描所有的文件描述符(遍历所有的文件句柄),是件很耗时的事情,严重拉低效率。
        for(fd = 0;fd<FD_SETSIZE;fd++){
            //找到相关文件描述符,判断是否在testfds这个文件描述符集合中。 
            if(FD_ISSET(fd,&testfds)){
                //判断是否为服务器套接字,是则表示为客户端请求连接
                if(fd == server_sockfd){
                    client_len = sizeof(client_address);
                    client_sockfd = accept(server_sockfd,(struct sockaddr*)&client_address,&client_len);
                    FD_SET(client_sockfd,&readfds);//将客户端socket加入到集合中,用来监听是否有数据来。
                    printf("adding client on fd %d\n",client_sockfd);;
                }else{// 客户端来消息了
                    //获取接收缓存区中的字节数
                    ioctl(fd,FIONREAD,&nread);//即获取fd来了多少数据

                    //客户端数据请求完毕,关闭套接字,并从集合中清除相应的套接字描述符
                    if(nread ==0){
                        close(fd);
                        FD_CLR(fd,&readfds);//去掉关闭的fd
                        printf("removing client on fd %d\n", fd);
                    }else{//处理客户数请求
                        read(fd,&ch,1);
                        sleep(5);
                        printf("serving client on fd %d\n", fd);
                        ch++;
                        write(fd, &ch, 1);
                    }
                }
            }
        }
    }
    return 0;
}
  • client.c
#include <sys/types.h> 
#include <sys/socket.h> 
#include <stdio.h> 
#include <netinet/in.h> 
#include <arpa/inet.h> 
#include <unistd.h> 
#include <stdlib.h>
#include <sys/time.h>

int main(){
    
    int client_sockfd;
    int len;
    struct sockaddr_in address;//服务器端网络地址结构体 
    int result;
    char ch = 'A';
    client_sockfd = socket(AF_INET, SOCK_STREAM, 0);//建立客户端socket 
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = inet_addr("127.0.0.1");
    address.sin_port = htons(9000);
    len = sizeof(address);
    result = connect(client_sockfd, (struct sockaddr*)&address, len);
    if (result == -1){
        perror("oops: client2");
        exit(1);
    }
    //第一次读写
    write(client_sockfd, &ch, 1);
    read(client_sockfd, &ch, 1);
    printf("the first time: char from server = %c\n", ch);
    sleep(5);

    //第二次读写
    write(client_sockfd, &ch, 1);
    read(client_sockfd, &ch, 1);
    printf("the second time: char from server = %c\n", ch);

    close(client_sockfd);

    return 0;
}


poll

poll函数

  • 与select类似,也是在指定事件内轮询一定的数量的文件描述符,看其中是否有就绪的。
  • poll
  • 函数原型:
int poll(
         struct pollfd *fds, 
         nfds_t nfds, 
         int timeout
);
  • 参数:

    • fds: pollfd类型的数组,它存储所有我们该兴趣的文件描述符上发生的可读、可写和异常事件。结构体定义详见pollfd结构体
    • nfds: 数组fds中的元素个数,类型为nfds_t无符号整型。
    • timeout: 超时等待时间。

      • -1: 一直阻塞,直到某个事件发生。
      • 0: 调用后不等待立即返回。
  • 返回值:

    • -1: 表示发生错误。
    • 0: 表示超时。
    • \>0: 表示fds中有这么多个文件描述符处于就绪态了。即fds中拥有非零revents字段的pollfd结构体数量。

pollfd-结构体

  • 结构体定义:
struct pollfd {
    int fd;          
    short events;     
    short revents;   
};
  • 参数:
  • fd: 文件描述符
  • events: 注册的事件。如下图所示。
  • revents: 实际发生的事件,由内核填充


示例

  • server.c
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <sys/time.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <stdlib.h>
#include <poll.h>

#define MAX_FD  8192 //最大文件标识符
struct pollfd  fds[MAX_FD];
int cur_max_fd = 0;     //当前要监听的最大文件描述符+1,减少要遍历的数量。

int main(void){

    int server_sockfd,client_sockfd;
    int server_len,client_len;
    struct sockaddr_in server_address,client_address;
    int result;
    server_sockfd = socket(AF_INET,SOCK_STREAM,0);//服务端socket
    server_address.sin_family = AF_INET;
    server_address.sin_addr.s_addr = htonl(INADDR_ANY);
    server_address.sin_port = htons(9000);
    server_len = sizeof(server_address);
    bind(server_sockfd,(struct sockaddr*)&server_address,server_len);
    listen(server_sockfd,5);

    //添加待监测文件描述符到fds数组中
    fds[server_sockfd].fd = server_sockfd;
    fds[server_sockfd].events = POLLIN;
    fds[server_sockfd].revents = 0;
    
    if(cur_max_fd <= server_sockfd){
        cur_max_fd = server_sockfd+1;
    }

    while(1){
        char ch;
        int i,fd;
        int nread;
        printf("server waiting\n");

        result = poll(fds,cur_max_fd,1000);
        if(result <0){
            perror("server5");
            exit(1);
        }else if(result == 0){
            printf("no connect,end waiting\n");
        }else{//大于0,返回的是fds中处于就绪态的文件描述符个数。

        }
        //扫描文件描述符
        for(i = 0; i < cur_max_fd;i++){
            if(fds[i].revents){//有没有结果,没有结果说明该文件描述符上还未发生事件。
                fd= fds[i].fd;
                //判断是否为服务器套接字,是则表示为客户端请求连接
                if(fd == server_sockfd){
                    client_len = sizeof(client_address);
                    client_sockfd = accept(server_sockfd,(struct sockaddr*)&client_address,&client_len);
                    fds[client_sockfd].fd = client_sockfd;
                    fds[client_sockfd].events = POLLIN;
                    fds[client_sockfd].revents = 0;
                    if(cur_max_fd <= client_sockfd){
                        cur_max_fd = client_sockfd + 1;
                    }
                    printf("adding client on fd %d\n",client_sockfd);
                }else{//客户端socket中有数据请求
                    if(fds[i].revents & POLLIN){//读
                        nread = read(fd,&ch,1);
                        if(nread == 0){
                            close(fd);
                            memset(&fds[i],0,sizeof(struct pollfd));
                            printf("removing client on fd %d\n",fd);
                        }else{//写
                            sleep(5);
                            printf("serving client on fd %d,receive: %c\n",fd,ch);
                            ch++;
                            fds[i].events = POLLOUT;//添加一个写事件监听
                        }
                    }else if(fds[i].revents & POLLOUT){//写
                        write(fd,&ch,1);
                        fds[i].events = POLLIN;
                    }

                }
            }
        }

    }
    return 0;
}
  • client.c——同select中的实例。


epoll

  • epoll是Linux特有的I/O复用函数。它在实现和使用上与select和poll有很大的差异。
  • epoll使用一组函数来完成任务,而不是单个函数
  • epoll把用户关心的文件描述符上的事件放在内核的一个事件表中,不需要像select与poll那样每次都要重复传入文件描述符集合或是事件集。epoll需要使用一个额外的文件描述符,在内核中唯一标识这个事件表

epoll_create-创建epoll

  • epoll_create
  • 功能: 创建一个epoll实例。
  • 函数原型:
int epoll_create(int size); 
  • 参数:

    • size: 现已被抛弃,只是给内核一个提示,告诉它事件表需要多大。
  • 返回值: 返回创建的epoll实例文件描述符,在其它epoll相关函数中指定要访问的内核事件表

epoll_ctl-操作对应内核事件表

  • epoll_ctl
  • 功能: 操作对应epoll的内核事件表,进行添加/删除/修改指定fd的事件。
  • 函数原型:
int epoll_ctl(
  int epfd, 
  int op, 
  int fd, 
  struct epoll_event *event
); 
  • 参数:

    • epfd: epoll实例,用来指定要访问的内核事件表。
    • op: 用来指定需要执行的操作。

      • EPOLL_CTR_ADD: 往事件表上注册fd上的事件。
      • EPOLL_CTR_MOD: 修改fd上的注册事件。
      • EPOLL_CTR_DEL: 删除fd上的注册事件。
    • fd: 要进行op操作的文件描述符。
    • event: 为一个指向epoll_event结构体的指针。结构体定义如下event_event-结构体所示:
  • 返回值:

    • 成功: 返回0。
    • 失败: 返回-1并设置errno。

epoll_event-结构体

  • 结构体定义:
struct epoll_event{
    uint32_t events;
    epoll_data_t data;
};

image-20220913201232442


epoll_data_t-结构体

  • 结构体定义:
typedef union epoll_data{
    void *ptr;    // 用户自定义使用
    int fd;         // 指定事件文件描述符
    uint32_t u32;
    uint64_t u64;
}epoll_data_t;
  • 参数:

    • ptr: 可用来指定与fd相关的用户数据。
    • fd: 指定事件所从属的目标文件描述符。
  • 注意:

    • epoll_data_t是一个联合体,所以我们只能使用fd或ptr其中一个成员。
    • 如果要将文件描述符和用户数据关联起来,以实现快速的数据访问,可以放弃使用epoll_data_t中的fd成员,而是在ptr所指向的自定义用户数据中包含fd。
  • 补充:

    • 当我们调用epoll_wait后,evlist数组中的epoll_event每个data参数为我们在一开始(即调用epoll_ctl)所指定的内容,比如像上面所说的我们指定了自定义数据ptr,最终某一fd产生了我们监视的事件,我们可以在其对应的epoll_event的data中取到。例如下方epoll-简易web服务器中的_ConnectStat结构体。

epoll_wait-事件等待

  • epoll_wait
  • 功能: 在一段超时时间内等待一组文件描述符上的事件。
  • 函数原型:
int epoll_wait(int epfd, 
               struct epoll_event * evlist, 
               int maxevents, 
               int timeout
); 
  • 参数:

    • epfd: epoll文件描述符,指定内核事件表。
    • evlist: 分配好的epoll_event结构体数组,epoll将会把发生的事件复制到evlist数组中。
    • maxevents: 最多监听多少个时间,必须大于0。
    • timeout: 表示在没有检测到事件发生时最多等待的时间(ms)。

      • 0: 将会立即返回,不会等待。
      • -1: 表示无限期阻塞,直到有事件发生。
      • \>0: 阻塞(等待)时间。
  • 返回值:

    • 成功: 返回就绪的文件描述符个数。
    • 失败: 返回-1并设置errno。

epoll-简易web服务器

  • 使用epoll的一个简易web服务器

头文件

#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<errno.h>
#include<sys/types.h>
#include<sys/epoll.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<assert.h>
#include<fcntl.h>
#include<unistd.h>

自定义保存数据的结构体

//因为下面的函数指针所以单独拿出来声明typedef
typedef struct _ConnectStat  ConnectStat;

typedef void(*response_handler) (ConnectStat * stat);

// 保存自定义数据的结构体,调用epoll时用epoll_data_t中的ptr存储
struct _ConnectStat {
    int fd;                        //文件描述符
    char name[64];                //姓名
    char  age[64];                //年龄
    struct epoll_event _ev;        //当前文件句柄对应epoll事件
    int  status;                //0-未登录,1-已登录
    response_handler handler;    //不同页面的处理函数
};

相关函数声明与全局变量

// 初始化一个自定义数据存储结构体
ConnectStat * stat_init(int fd);

// 将新链接进来的客户端fd放入当前epoll所对应的内核事件表中
void connect_handle(int new_fd);

// 请求响应-指定对应的处理函数
void do_http_respone(ConnectStat * stat);

// 处理http请求
void do_http_request(ConnectStat * stat);

// 响应处理函数——请求链接返回的内容
void welcome_response_handler(ConnectStat * stat);

// 响应处理函数——commit后返回的内容
void commit_respone_handler(ConnectStat * stat);

// 将新链接进来的客户端fd放入当前epoll所对应的内核事件表中
void connect_handle(int new_fd);

// 创建一个监听套接字 - 略
int startup(char* _ip, int _port);

// 将fd-设置为非阻塞状态,即给指定fd添加状态
void set_nonblock(int fd);

// 打印信息提示ip:port
void usage(const char* argv);
    
// 响应头
const char *main_header = "HTTP/1.0 200 OK\r\nServer: Xuanxuan Server\r\nContent-Type: text/html\r\nConnection: Close\r\n";

static int epfd = 0;// epoll文件描述符,对应一张内核事件表

初始化一个自定义数据存储结构体

// 初始化自定义数据存储结构体
ConnectStat * stat_init(int fd) {
    ConnectStat * temp = NULL;
    temp = (ConnectStat *)malloc(sizeof(ConnectStat));

    if (!temp) {
        fprintf(stderr, "malloc failed. reason: %m\n");
        return NULL;
    }

    memset(temp, '\0', sizeof(ConnectStat));
    temp->fd = fd; 
    temp->status = 0; 
}

处理http请求

// 解析http请求
void do_http_request(ConnectStat * stat) {

    //读取和解析http 请求
    char buf[4096];
    char * pos = NULL;

    ssize_t _s = read(stat->fd, buf, sizeof(buf) - 1);
    if (_s > 0){// 读取到数据
        buf[_s] = '\0';
        // printf("receive from client:%s\n", buf);//GET / HTTP/1.1
        pos = buf;

        //Demo 仅仅演示效果,不做详细的协议解析
        if (!strncasecmp(pos, "GET", 3)) {// 是否为Get请求
            stat->handler = welcome_response_handler;// 设置执行函数
        }else if (!strncasecmp(pos, "Post", 4)) {// 是否为POST请求
            //获取 uri
            //printf("---Post----\n");
            pos += strlen("Post");
            while (*pos == ' ' || *pos == '/') ++pos;

            // POST /commit HTTP/1.1
            if (!strncasecmp(pos, "commit", 6)) {//提交
                int len = 0;

                //printf("post commit --------\n");
                pos = strstr(buf, "\r\n\r\n");//返回第一次出现\r\n\r\n的位置
                char *end = NULL;
                // 拿到姓名与年龄
                if (end = strstr(pos, "name=")) {
                    pos = end + strlen("name=");
                    end = pos;
                    while (('a' <= *end && *end <= 'z') || ('A' <= *end && *end <= 'Z') || ('0' <= *end && *end <= '9'))    end++;
                    len = end - pos;
                    if (len > 0) {// 将姓名存入自定义结构体中
                        memcpy(stat->name, pos, end - pos);
                        stat->name[len] = '\0';
                    }
                }
                if (end = strstr(pos, "age=")) {
                    pos = end + strlen("age=");
                    end = pos;
                    while ('0' <= *end && *end <= '9')    end++;
                    len = end - pos;
                    if (len > 0) {// 将年龄存入自定义结构体中
                        memcpy(stat->age, pos, end - pos);
                        stat->age[len] = '\0';
                    }
                }
                stat->handler = commit_respone_handler;// 设置响应函数
            }
            else {
                stat->handler = welcome_response_handler;// 设置响应函数
            }
        }
        else {
            stat->handler = welcome_response_handler;// 设置响应函数
        }

        stat->_ev.events = EPOLLOUT;    // 修改事件类型
        epoll_ctl(epfd, EPOLL_CTL_MOD, stat->fd, &stat->_ev);   //修改,交给eoill监视。
    }else if (_s == 0){// 没有读取到数据,客户端关闭。
        printf("client: %d close\n", stat->fd);
        epoll_ctl(epfd, EPOLL_CTL_DEL, stat->fd, NULL);// 将对应fd从对应epoll的内核事件表中删除
        close(stat->fd);// 关闭套接字
        free(stat);    // 释放内存
    }else{// read发生错误
        perror("read");
    }
}

请求响应-根据指定的处理函数

void do_http_respone(ConnectStat * stat) {
    stat->handler(stat);// 调用对应设置的函数
}

响应处理函数——请求链接返回的内容

void welcome_response_handler(ConnectStat * stat) {
    const char * welcome_content = "\
            <html lang=\"zh-CN\">\n\
            <head>\n\
            <meta content=\"text/html; charset=utf-8\" http-equiv=\"Content-Type\">\n\
            <title>This is a test</title>\n\
            </head>\n\
            <body>\n\
            <div align=center height=\"500px\" >\n\
            <br/><br/><br/>\n\
            <h2>Hello World</h2><br/><br/>\n\
            <form action=\"commit\" method=\"post\">\n\
            姓名: <input type=\"text\" name=\"name\" />\n\
            <br/>年龄: <input type=\"password\" name=\"age\" />\n\
            <br/><br/><br/><input type=\"submit\" value=\"提交\" />\n\
            <input type=\"reset\" value=\"重置\" />\n\
            </form>\n\
            </div>\n\
            </body>\n\
            </html>";

    char sendbuffer[4096];
    char content_len[64];

    strcpy(sendbuffer, main_header);// 拷贝响应头
    snprintf(content_len, 64, "Content-Length: %d\r\n\r\n", (int)strlen(welcome_content));
    strcat(sendbuffer, content_len);
    strcat(sendbuffer, welcome_content);
    //printf("send reply to client \n%s", sendbuffer);

    // 写给客户端-即发起请求的浏览器
    write(stat->fd, sendbuffer, strlen(sendbuffer));

    stat->_ev.events = EPOLLIN;        // 修改关心的事件
    //stat->_ev.data.ptr = stat;
    epoll_ctl(epfd, EPOLL_CTL_MOD, stat->fd, &stat->_ev);
}

响应处理函数——commit后返回的内容

void commit_respone_handler(ConnectStat * stat) {
    const char * commit_content = "\
        <html lang=\"zh-CN\">\n\
        <head>\n\
        <meta content=\"text/html; charset=utf-8\" http-equiv=\"Content-Type\">\n\
        <title>This is a test</title>\n\
        </head>\n\
        <body>\n\
        <div align=center height=\"500px\" >\n\
        <br/><br/><br/>\n\
        <h2>欢迎&nbsp;%s &nbsp;,年龄&nbsp;%s!</h2><br/><br/>\n\
        </div>\n\
        </body>\n\
        </html>\n";

    char sendbuffer[4096];
    char content[4096];
    char content_len[64];
    int len = 0;

    len = snprintf(content, 4096, commit_content, stat->name, stat->age);
    strcpy(sendbuffer, main_header); //响应头
    snprintf(content_len, 64, "Content-Length: %d\r\n\r\n", len);
    strcat(sendbuffer, content_len);
    strcat(sendbuffer, content);
    //printf("send reply to client \n%s", sendbuffer);

    write(stat->fd, sendbuffer, strlen(sendbuffer));

    stat->_ev.events = EPOLLIN; // 修改关心的事件

    epoll_ctl(epfd, EPOLL_CTL_MOD, stat->fd, &stat->_ev);// 交给epoll来监视
}

打印信息提示ip:port

void usage(const char* argv){
    printf("%s:[ip][port]\n", argv);
}

将fd-设置为非阻塞状态

void set_nonblock(int fd){
    // 这里的文件状态标志flag即open函数的第二个参数
    int fl = fcntl(fd, F_GETFL);// 获取设置的flag
    fcntl(fd, F_SETFL, fl | O_NONBLOCK);// 设置flag
    // fcntl函数                 https://blog.csdn.net/zhoulaowu/article/details/14057799
    // O_NONBLOCK https://blog.csdn.net/cjfeii/article/details/115484558
}

创建一个监听套接字

int startup(char* _ip, int _port){
    int sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock < 0){
        perror("sock");
        exit(2);
    }

    int opt = 1;
    setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

    struct sockaddr_in local;
    local.sin_port = htons(_port);
    local.sin_family = AF_INET;
    local.sin_addr.s_addr = inet_addr(_ip);

    if (bind(sock, (struct sockaddr*)&local, sizeof(local)) < 0){
        perror("bind");
        exit(3);
    }

    if (listen(sock, 5) < 0){
        perror("listen");
        exit(4);
    }
    return sock;    //返回套接字
}

main

#include "epoll_server.h"

int main(int argc, char *argv[]){

    if (argc != 3){//检查输入的参数个数是否正确
        usage(argv[0]);
        exit(1);
    }
    
    //创建一个server socket
    int listen_sock = startup(argv[1], atoi(argv[2]));      

    //创建epoll
    epfd = epoll_create(256);
    if (epfd < 0){//创建失败
        perror("epoll_create");
        exit(5);
    }

    ConnectStat * stat = stat_init(listen_sock);// 自定义数据存储

    struct epoll_event _ev;     //epoll事件结构体
    _ev.events = EPOLLIN;        //设置关心事件为读事件     
    _ev.data.ptr = stat;        //接收返回值
    

    //将listen_sock添加到epfd中,关心读事件,有客户端来请求链接
    epoll_ctl(epfd, EPOLL_CTL_ADD, listen_sock, &_ev);

    struct epoll_event revs[64];//接收返回的产生响应的事件

    int timeout = -1;// -1无限期阻塞
    int num = 0;// 就绪的请求I/O个数

    while (1){
        //检测事件
        switch ((num = epoll_wait(epfd, revs, 64, timeout))){
        case 0:   //监听超时               
            printf("timeout\n");
            break;
        case -1: //出错     
            perror("epoll_wait");
            break;
        default:{    //>0,即返回了需要处理事件的数目
            //拿到对应的文件描述符
            struct sockaddr_in peer;
            socklen_t len = sizeof(peer);

            for (int i = 0; i < num; i++){//
                //拿到该fd相关的链接信息
                ConnectStat * stat = (ConnectStat *)revs[i].data.ptr;

                int rsock = stat->fd;//拿到对应的fd,进行如下的判断
                if (rsock == listen_sock && (revs[i].events) && EPOLLIN) {// 有客户端链接
                    int new_fd = accept(listen_sock, (struct sockaddr*)&peer, &len);
                    
                    if (new_fd > 0){//accept成功
                        printf("get a new client:%s:%d\n", inet_ntoa(peer.sin_addr), ntohs(peer.sin_port));    
                        connect_handle(new_fd);// 监听新进来的客户端fd
                    }
                }else {//除server socket 之外的其他fd就绪
                    if (revs[i].events & EPOLLIN){//有数据可读
                        do_http_request((ConnectStat *)revs[i].data.ptr);
                    }else if (revs[i].events & EPOLLOUT){//写
                        do_http_respone((ConnectStat *)revs[i].data.ptr);// 完成响应后会再次关心EPOLLIN事件,等待下一次请求。                    
                    }else{
                    }
                }
            }
        }
        break;
        }
    }
    return 0;
}
  • 运行-访问-示例: ./main 192.168.0.70 8080

image-20220924204711787

image-20220924204731175


LT与ET

  • Level Trigger——水平触发:

    • 当被监控的文件描述符上有可读写的事件发生时,epoll_wait会通知处理程序去读写。如果这次没有把数据一次性全部读写完(如读写缓冲区太小),那么下次有调用epoll_wait时,它还会通知你在上次没读写完的文件描述符上继续读写,如果你一直不去读写它,它就会一直通知你。
    • 如果系统中有大量你不需要读写的就绪文件描述符,而它们每次都会返回,这样会大大降低处理程序检索自己关心的就绪文件描述符的效率。
    • 应用程序可以不立即处理该事件,因为当下一次调用epoll_wait,epoll_wait还会再次向应用程序通告此事件。
    • 设置方式: 默认即水平触发。
  • Edge_triggered(边缘触发):

    • 当被监控的文件描述符上有可读写的事件发生时,epoll_wait会通知处理程序去读写。如果这次没有把数据全部读写完(如读写缓冲区太小),它不会通知你,也就是它只会通知你一次,直到该文件描述符上出现第二次可读写事件才会通知你
    • 这种模式比水平触发效率高,系统不会充斥大量你不关心的就绪文件描述符,很大程度上降低了同一个epoll事件被重复触发的次数。
    • 同时,应用程序应立即处理该事件,因为后续的epoll_wait调用将不再向应用程序通知这一事件(之后的读写事件就会通知,只是这次的不会了)。
  • 设置方式(epoll):

    • 对应文件描述符上要监听的事件设置为,events |= EPOLLET
    • 同时对该文件描述符设置为非阻塞模式。如上epoll-简易web服务器中所示。

EPOLLONESHOT事件

  • 用途: 保证一个socket连接在任一时刻都只被一个线程处理,从而保证连接的完整性,避免了很多可能的竞态条件。
  • 可能产生的情景: 一个线程(或进程)在读取完某个socket上的数据并开始处理时,在处理的过程中该socket上又有新的数据可读(EPOLLIN被再次触发),此时唤醒另一个线程来读取这些新的数据。于是就出现了两个线程操作一个socket的局面。
  • 使用: 使用epoll_ctrl函数在该socket(文件描述符)上注册EPOLLONESHOT事件。
  • 注意:

    • 注册完EPOLLONESHOT事件的socket一旦被某个线程处理完毕,应立即重置这个socket上的EPOLLONESHOT事件,以确保这个socket下次可读时,其EPOLLIN事件能被触发,同时也给其它线程处理这个socket的机会。
    • 用于监听链接请求的Server_socket是不能注册EPOLLONESHOT事件的,否则应用程序只能处理一个客户链接,因为后续的客户链接请求将不再触发Server_socket上的EPOLLIN事件。
    • 如果某一线程处理完成该socket上的请求之后,又在该socket上收到了新的客户请求,该线程将继续接触这个socket。

代码示例

  • 仅部分核心代码示例: 完整的可以去《Linux高性能服务器编程》源代码9-4查看

主线程中循环监听事件

while( 1 ){
        int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
        if ( ret < 0 )break;
    
        for ( int i = 0; i < ret; i++ ){
            int sockfd = events[i].data.fd;
            if ( sockfd == listenfd ){// 有链接请求接入
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof( client_address );
                int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
                addfd( epollfd, connfd, true );
            }
            else if ( events[i].events & EPOLLIN ){
                pthread_t thread;
                fds fds_for_new_worker;
                fds_for_new_worker.epollfd = epollfd;
                fds_for_new_worker.sockfd = sockfd;
                // 创建一个线程去处理
                pthread_create( &thread, NULL, worker, ( void* )&fds_for_new_worker );
            }
            else printf( "something else happened \n" );
        }
    }

将指定fd上的某一事件注册到对应的内核事件表中

void addfd( int epollfd, int fd, bool oneshot ){
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET;
    if( oneshot ){
        event.events |= EPOLLONESHOT;// 注册EPOLLONESHOT事件
    }
    epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
    setnonblocking( fd );// 设置为非阻塞fd
}

设置fd为非阻塞

int setnonblocking( int fd ){
    int old_option = fcntl( fd, F_GETFL );// 拿到之前对该fd的设置属性
    int new_option = old_option | O_NONBLOCK;// 追加O_NONBLOCK属性
    fcntl( fd, F_SETFL, new_option );// 设置
    return old_option;// 当前示例Demo返回无意义,未使用。
}

线程工作函数

void* worker( void* arg ){
    int sockfd = ( (fds*)arg )->sockfd;
    int epollfd = ( (fds*)arg )->epollfd;
    printf( "start new thread to receive data on fd: %d\n", sockfd );
    char buf[ BUFFER_SIZE ];
    memset( buf, '\0', BUFFER_SIZE );
    while( 1 ){// 因为是非阻塞的,所以要一次性读光,即要立即处理,因为epoll_wait只会提醒一次。
        int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
        if( ret == 0 ){
            close( sockfd );
            printf( "foreiner closed the connection\n" );
            break;
        }
        else if( ret < 0 ){
            if( errno == EAGAIN ){// 读光啦
                reset_oneshot( epollfd, sockfd );// 重置注册事件
                printf( "read later\n" );
                break;
            }
        }
        else{
            printf( "get content: %s\n", buf );
            // sleep 5秒,模拟数据处理过程
            sleep( 5 );
        }
    }
    printf( "end thread receiving data on fd: %d\n", sockfd );
}

重置fd上注册的事件

void reset_oneshot( int epollfd, int fd ){
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
    epoll_ctl( epollfd, EPOLL_CTL_MOD, fd, &event );
}

三个I/O复用函数的对比

  • select:

    • select的参数类型fd_set,仅仅是个文件描述符集合,因此select需要3个这种类型的参数来区分可读、可写及异常事件。
    • 一方面使得select不能处理更多类型的事件,另一方面内核对fd_set在线修改,导致应用程序下次再调用select前不得不重置这三个fd_set集合。同时我们也需要在使用前进行备份
  • poll:

    • poll参数类型pollfd要聪明一些,将文件描述符和事件类型定义在一起,调用后修改的是pollfd结构体中的revents成员,为实际检测到的事件,我们设置的events成员保持不变。再次调用后,revents会被重新置空。
  • select与poll每次调用后,都需要遍历整个用户关心的事件集合,无论其中的事件是否就绪,所以应用程序检索就绪文件描述符的时间复杂度为O(n)。
  • epoll:

    • epoll使用与上面二者完全不同的方式来管理用户注册事件,它在内核中维护一个事件表,并提供独立的系统调用epoll_ctl来往其中进行添加、删除、修改事件,而无须反复地从用户空间读入这些事件。
  • epoll_wait系统调用的events参数负责保存这些就绪的事件,使得应用程序检索就绪文件描述符的时间复杂度达到O(1)。

  • 最大支持文件描述符个数:

    • poll与epoll_wait分别用nfds和maxevents参数来指定最多监听多少个文件描述符和事件,这两个数值都能达到系统允许打开的最大文件描述符个数——65535。而select允许监听的最大文件描述符数量通常有限制。虽然用户可以修改这个限制,但是这可能会导致不可预期的后果。
  • 工作模式:

    • select与poll都只能工作在相对低效的LT模式,而epoll可以工作在高效的ET模式。
  • 内核实现:

    • select与poll采用的是轮询的方式,每次扫描整个注册文件描述符集合,将就绪的文件描述符返回给用户程序。检测就绪事件的时间复杂度为O(n)。
    • epoll_wait采用回调的方式,内核检测到就绪的文件描述符时,将触发回调函数,回调函数将该文件描述符上对应的事件插入内核就绪事件队列。内核最后在适当的时机将该就绪事件队列中的内容拷贝到用户空间。
    • 活动连接比较多的时候,epoll_wait的效率未必比select和poll高,因为此时回调函数被触发得过于频繁。所以epoll_wait适用于连接数量多,但活动连接较少的情况。

image-20220925162946375


相关文章
|
29天前
|
网络协议 安全 Linux
Linux C/C++之IO多路复用(select)
这篇文章主要介绍了TCP的三次握手和四次挥手过程,TCP与UDP的区别,以及如何使用select函数实现IO多路复用,包括服务器监听多个客户端连接和简单聊天室场景的应用示例。
86 0
|
29天前
|
存储 Linux C语言
Linux C/C++之IO多路复用(aio)
这篇文章介绍了Linux中IO多路复用技术epoll和异步IO技术aio的区别、执行过程、编程模型以及具体的编程实现方式。
66 1
Linux C/C++之IO多路复用(aio)
|
7天前
|
存储 JSON Java
细谈 Linux 中的多路复用epoll
大家好,我是 V 哥。`epoll` 是 Linux 中的一种高效多路复用机制,用于处理大量文件描述符(FD)事件。相比 `select` 和 `poll`,`epoll` 具有更高的性能和可扩展性,特别适用于高并发服务器。`epoll` 通过红黑树管理和就绪队列分离事件,实现高效的事件处理。本文介绍了 `epoll` 的核心数据结构、操作接口、触发模式以及优缺点,并通过 Java NIO 的 `Selector` 类展示了如何在高并发场景中使用多路复用。希望对大家有所帮助,欢迎关注威哥爱编程,一起学习进步。
|
29天前
|
Linux C++
Linux C/C++之IO多路复用(poll,epoll)
这篇文章详细介绍了Linux下C/C++编程中IO多路复用的两种机制:poll和epoll,包括它们的比较、编程模型、函数原型以及如何使用这些机制实现服务器端和客户端之间的多个连接。
22 0
Linux C/C++之IO多路复用(poll,epoll)
|
3月前
|
监控 Linux
在Linux中,如何监控磁盘I/O性能?
在Linux中,如何监控磁盘I/O性能?
|
3月前
|
Linux
Linux的I/O操作
Linux的I/O操作
|
16天前
|
运维 安全 Linux
Linux中传输文件文件夹的10个scp命令
【10月更文挑战第18天】本文详细介绍了10种利用scp命令在Linux系统中进行文件传输的方法,涵盖基础文件传输、使用密钥认证、复制整个目录、从远程主机复制文件、同时传输多个文件和目录、保持文件权限、跨多台远程主机传输、指定端口及显示传输进度等场景,旨在帮助用户在不同情况下高效安全地完成文件传输任务。
111 5
|
15天前
|
Linux
Linux系统之expr命令的基本使用
【10月更文挑战第18天】Linux系统之expr命令的基本使用
51 4
|
2天前
|
缓存 监控 Linux