一文说透IO多路复用select/poll/epoll

简介: 一文说透IO多路复用select/poll/epoll

概述

如果我们要开发一个高并发的TCP程序。常规的做法是:多进程或者多线程。即:使用其中一个线程或者进程去监听有没有客户端连接上来,一旦有新客户端连接,就新开一个线程(进程),将其扔到线程(或进程)中去处理具体的读写操作等业务逻辑,主线程(进程)继续等待,监听其他的客户端。

这样操作往往存在很大的弊端。首先是浪费资源,要知道,单个进程的最大虚拟内存是4G,单个线程的虚拟内存也有将近8M,那么,如果上万个客户端连接上来,服务器将会承受不住。

其次是浪费时间,因为你必须一直等在accept那个地方,十分被动。

上述的网络模型,其实说白了,就是一个线程一路IO,在单个线程里只能处理一个IO。因此,也可称之为单路IO。而一路IO,就是一个并发。有多少个并发,就必须要开启多少个线程,因此,对资源的浪费是不言而喻的。

那么,有没有一种方式,可以在一个线程里,处理多路IO呢?

我们回顾一下多线程模型 ,它最大的技术难点是acceptrecv函数都是阻塞的。只要没有新连接上来,accept就阻塞住了,无法处理后续的业务逻辑;没有数据过来,recv又阻塞住了 ,无法处理新的accept请求。因此,只要能够搞定在同一个线程里同时acceptrecv的问题,似乎所有问题就迎刃而解了。

有人说,这怎么可能嘛?肯定要两个线程的 。

还真有可能,而这所谓的可能 ,就是IO多路复用技术。

IO多路复用

所谓的IO多路复用,它的核心思想就是,把监听新客户端连接、读写事件等的操作转包出去,让系统内核来做这件事情。即由内核来负责监听有没有连接建立、有没有读写请求,作为服务端,只需要注册相应的事件,当事件触发时,由内核通知服务端程序去处理就行了。

这样做的好处显而易见:只需要在一个主线程里,就可以完成所有的工作,既不会阻塞,也不会浪费太多资源。

说得通俗易懂一些,就是原来需要由主线程干的活,现在都交给内核去干了。我们不用阻塞在acceptrecv那里,而是由内核告诉程序,有新客户端连接上来了 ,或者有数据发送过来了,我们再去调用acceptrecv就行了,其余时间,我们可以处理其他的业务逻辑。

那么有人问了,你不还是要调用acceptrecv吗?为什么现在就不会阻塞了呢 ?

这就要深入说一下listenaccept的关系了。

假如服务器是海底捞火锅店的话,listen就是门口迎宾的小姐,当来了一个客人(客户端),就将其迎进店内。而accept则是店内的大堂经理 ,当没人来的时候,就一直闲在那里没事做,listen将客人 迎进来之后,accept就会分配一个服务员(fd)专门服务于这个客人。

所以说,只要listen正常工作,就能源源不断地将客人迎进饭店(客户端能正常连接上服务器),即使此时并没有accept。那么,有人肯定有疑问,总不能一直往里迎吧,酒店也是有大小的,全部挤在大堂也装不下那么多人啊。还记得listen函数的第二个参数backlog吗?它就表示在没有accept之前,最多可以迎多少个客人进来。

因此,对于多线程模型来说,accept作为大堂经理,在没客人来的时候,就眼巴巴地盯着门口 ,啥也不干,当listen把人迎进来了,才开始干活。只能说,摸鱼,还是accpet会啊。

IO多路复用,则相当于请了一个秘书。accept作为大堂经理,肯定有很多其他事情可以忙,他就不用一直盯着门口,当listen把人迎进来之后,秘书就会把客人(们)带到经理身边,让经理安排服务员(fd)。

只是这个秘书是内核提供的,因此不仅免费,而且勤快。免费的劳动力,何乐而不为呢?

它的流程图大概是下面这样子的:

我们通常所说的IO多路复用技术,在Linux环境下,主要有三种实现,分别为selectpollepoll,当然还有内核新增的io_uring。在darwin平台 ,则有kqueueWindows下则是iocp。从性能上来说,iocp要优于epoll,与io_uring不相上下。但selectpollepoll的演变是一个持续迭代的过程,虽说从效率以及使用普及率上来说,epoll堪称经典,但并不是另外两种实现就毫无用处,也是有其存在的意义的,尤其是select

本文不会花太多笔墨来介绍kqueue,笔者始终认为,拿MacOS作为服务器开发,要么脑子瓦特了,要么就是钱烧的。基本上除了自己写写demo外,极少能在生产环境真正用起来。而iocp自成一派,未来有暇,将专门开辟专题细说。io_uring作为较新的内核才引入的特性,本文也不宜大肆展开。

唯有selectpoll以及epoll,久经时间考验,已被广泛运用于各大知名网络应用,并由此诞生出许多经典的网络模型,实在是值得好好细说。

select

原型

select函数原型:

/* According to POSIX.1-2001, POSIX.1-2008 */
       #include <sys/select.h>
       /* According to earlier standards */
       #include <sys/time.h>
       #include <sys/types.h>
       #include <unistd.h>
       int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

参数说明:

  • nfds: 最大的文件描述符+1,代表监听这一组描述符(为什么要+1?因为除了当前最大描述符之外,还有可能有新的fd连接上来)
  • fd_set: 是一个位图集合, 对于同一个文件描述符,可以监听不同的事件
  • readfds:文件描述符“可读”事件
  • writefds:文件描述符“可写”事件
  • exceptfds:文件描述符“异常”事件,一般内核用的,实际编程很少使用
  • timeout:超时时间:0是立即返回,-1是一直阻塞,如果大于0,则达到设置值的微秒数即返回
  • 返回值: 所监听的所有监听集合中满足条件的总数(满足条件的读、写、异常事件的总数),出错时返回-1,并设置errno。如果超时时间触发,则返回0。

select的函数原型可知,它主要依赖于三个bitmap的集合,分别为可读事件集合,可写事件集合,以及异常事件集合。我们只需要将待监听的fd加入到对应的集合中,当有对应事件触发,我们再从集合中将其拿出来进行处理就行了。

那么,怎么将文件描述符加到监听事件集合中呢?

内核为我们提供了四个操作宏:

void FD_CLR(int fd, fd_set *set);    //将fd从set中清除出去,位图置为0
int  FD_ISSET(int fd, fd_set *set);   //判断fd是否在集合中,返回值为1,说明满足了条件
void FD_SET(int fd, fd_set *set);    //将fd设置到set中去,位图置为1
void FD_ZERO(fd_set *set);    //将set集合清空为0值

有了以上基础,我们 就能大致梳理一下select处理的流程:

  1. 创建fd_set 位图集合(3个集合,一个readfds,一个writefds,一个exceptfds
  2. FD_ZEROset清空
  3. 使用FD_SET将需要监听的fd设置对应的事件
  4. select函数监听事件,只要select函数返回了大于1的值,说明有事件触发,这时候把set拿出来做判断
  5. FD_ISSET判断fd到底触发了什么事件

实现

其代码 实现如下所示:

#include <stdio.h>
#include <stdlib.h>
#include  <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <ctype.h>
int main(int argc, char *argv[]){
    int i, n, maxi;
    int nready,  client[FD_SETSIZE];    // FD_SETSIZE 为内核定义的,大小为1024, client保存已经被监听的文件描述符,避免每次都遍历1024个fd
    int maxfd, listenfd, connfd,  sockfd;
    char  buf[BUFSIZ], str[INET_ADDRSTRLEN];  // INET_ADDRSTRLEN = 16
    struct sockaddr_in clie_addr, serv_addr;
    socklen_t clie_addr_len;
    fd_set rset, allset;   //allset为所有已经被监听的fd集合,rset为select返回的有监听事件的fd
    listenfd = socket(AF_INET, SOCK_STREAM, 0);   //创建服务端fd
    bzero(&serv_addr, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    serv_addr.sin_port = htons(8888);
    if (bind(listenfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
        perror("bind");
    }
    listen(listenfd, 20);
    maxfd = listenfd;
    maxi = -1;
    for(i = 0; i < FD_SETSIZE; i++) {
        client[i] = -1;
    }
    FD_ZERO(&allset);
    FD_SET(listenfd, &allset);
    //----------------------------------------------------------
    //至此,初始化全部完成, 开始监听
    while(1) {
        rset = allset;  //allset不能被select改掉了,所以要复制一份出来放到rset
        nready = select(maxfd+1, &rset, NULL, NULL,  NULL);
        if (nready < 0) {
            perror("select");
        }
        //listenfd有返回,说明有新连接建立了 
        if (FD_ISSET(listenfd, &rset)) {
            clie_addr_len = sizeof(clie_addr);
            connfd = accept(listenfd, (struct sockaddr *)&clie_addr, &clie_addr_len);
            printf("received form %s at port %d\n", inet_ntop(AF_INET, &clie_addr.sin_addr, str, sizeof(str)), ntohs(clie_addr.sin_port));
            //把新连接的client fd放到client数组中
            for (i = 0; i < FD_SETSIZE; i++) {
                if (client[i] == -1) {
                    client[i] = connfd;
                    break;
                }
            }
            //连接数超过了1024, select函数处理不了了,直接报错返回
            if (i == FD_SETSIZE) {
                fputs("too many clients\n", stderr);
                exit(1);
            }
            FD_SET(connfd, &allset);    //把新的客户端fd加入到下次要监听的列表中
            if (connfd > maxfd) {
                maxfd = connfd;      //主要给select第一个参数用的
            }
            if (i > maxi) {
                maxi = i;    //保证maxi存的总是client数组的最后一个下标元素
            }
            //如果nready = 0, 说明新连接都已经处理完了,且没有已建立好的连接触发读事件
            if (--nready == 0) {
                continue;
            }
        }
        for (i = 0; i <= maxi; i++) {
            if ((sockfd = client[i]) < 0) {
                continue;
            }
            //sockfd 是存在client里的fd,该函数触发,说明有数据过来了
            if (FD_ISSET(sockfd, &rset)) {
                if ((n = read(sockfd, buf, sizeof(buf))) == 0) {
                    printf("socket[%d] closed\n", sockfd);
                    close(sockfd);
                    FD_CLR(sockfd, &allset);
                    client[i] = -1;
                } else if (n > 0) {
                    //正常接收到了数据
                    printf("accept data: %s\n", buf);
                }
                if (--nready == 0) {
                    break;
                }        
            }
        }
    }
    close(listenfd);
    return 0;
}

缺点

select作为IO多路复用的初始版本,只能说是能用而已,性能并不能高到哪儿去,使用的局限性也比较大。主要体现在以下几个方面:

  • 文件描述符上限:1024,同时监听的最大文件描述符也为1024
  • select需要遍历所有的文件描述符(1024个),所以通常需要自定义数据结构(数组),单独存文件描述符,减少遍历
  • 监听集合和满足条件的集合是同一个集合,导致判断和下次监听时需要对集合读写,也就是说,下次监听时需要清零,那么当前的集合结果就需要单独保存。

优点

select也并不是一无是处,我个人是十分喜欢select这个函数的,主要得益于以下几个方面:

  • 它至少提供了单线程同时处理多个IO的一种解决方案,在一些简单的场景(比如并发小于 1024)的时候 ,还是很有用处的
  • select的实现比起pollepoll,要简单明了许多,这也是我为什么推荐在一些简单场景优先使用select的原因
  • select是跨平台的,相比于pollepollUnix独有,select明显有更加广阔的施展空间
  • 利用select的跨平台特性,可以实现很多有趣的功能。比如实现一个跨平台的sleep函数。
  • 我们知道,Linux下的原生sleep函数是依赖于sys/time.h的,这也就意味着它无法被Windows平台调用。
  • 因为select函数本身跨平台,而第五个参数恰好是一个超时时间,即:我们可以传入一个超时时间,此时程序就会阻塞在select这里,直到超时时间触发,这也就间接地实现了sleep功能。
  • 代码实现如下
//传入一个微秒时间
void general_sleep(int t){
  struct timeval tv; 
  tv.tv_usec = t % 10e6;
  tv.tv_sec = t / 10e6;
  select(0, NULL, NULL, NULL, &tv);
}
  • select实现的sleep函数至少有两个好处:
  • 可以跨平台调用
  • 精度可以精确到微秒级,比起Linux原生的sleep函数,精度要高得多。

poll

原型

#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
struct pollfd {
      int   fd;         /* file descriptor */
      short events;     /* requested events */
      short revents;    /* returned events */
};

参数说明:

fds: 数组的首地址

nfds: 最大监听的文件描述符个数

timeout: 超时时间

鉴于select函数的一些 缺点和局限性,poll的实现就做了一些升级。首先,它突破了1024文件描述符的限制,其次,它将事件封装了一下 ,构成了pollfd的结构体,并将这个 结构体中注册的事件直接与fd进行了绑定,这样 就无需每次有事件触发,就遍历所有的fd了,我们只需要遍历这个 结构体数组中的fd即可。

那么 ,poll函数可以注册哪些事件类型呢?

POLLIN 读事件
POLLPRI 触发异常条件
POLLOUT 写事件
POLLRDHUP  关闭连接
POLLERR  发生了错误
POLLHUP  挂断
POLLNVAL 无效请求,fd未打开
POLLRDNORM 等同于POLLIN
POLLRDBAND 可以读取优先带数据(在 Linux 上通常不使用)。
POLLWRNORM 等同于POLLOUT
POLLWRBAND 可以写入优先级数据。

事件虽然比较多,但我们主要关心POLLINPOLLOUT就行了。

实现

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <poll.h>
#include <errno.h>
#include <ctype.h>
#define OPEN_MAX 1024
int main(int argc, char *argv[]){
    int i, maxi, listenfd, connfd,  sockfd;
    int nready;  // 接受poll返回值,记录满足监听事件的fd个数
    ssize_t n;  
    char  buf[BUFSIZ], str[INET_ADDRSTRLEN];  // INET_ADDRSTRLEN = 16
    struct pollfd client[OPEN_MAX];     //用来存放监听文件描述符和事件的集合
    struct sockaddr_in cliaddr, servaddr;
    socklen_t clilen;
    listenfd = socket(AF_INET, SOCK_STREAM, 0);   //创建服务端fd
    int opt = 1;
    setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));  //设置端口复用
    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(8888);
    if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
        perror("bind");
    }
    listen(listenfd, 128);
    //设置第一个要监听的文件描述符,即服务端的文件描述符
    client[0].fd = listenfd;
    client[0].events = POLLIN;   //监听读事件
    for(i = 1; i < OPEN_MAX; i++) {   //注意从1开始,因为0已经被listenfd用了
        client[i].fd = -1;
    }
    maxi = 0;   //因为已经加进去一个了,所以从0开始就行
    //----------------------------------------------------------
    //至此,初始化全部完成, 开始监听
    for(;;) {
        nready = poll(client,  maxi+1, -1);   // 阻塞监听是否有客户端读事件请求
        if (client[0].revents & POLLIN) {   // listenfd触发了读事件
            clilen = sizeof(cliaddr);
            connfd = accept(listenfd, (struct sockaddr*)&cliaddr, &clilen);    //接受新客户端的连接请求
            printf("recieved from %s at port %d\n", inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)), ntohs(cliaddr.sin_port));
            for (i = 0; i < OPEN_MAX; i++) {
                if (client[i].fd < 0) {
                    client[i].fd = connfd;  // 将新连接的文件描述符加到client数组中
                    break;
                }
            }
            if (i == OPEN_MAX) {
                perror("too many open connections");
            }
            client[i].events = POLLIN;
            if(i > maxi) {
                maxi = i;
            }
            if (--nready == 0) {
                continue;
            }
        }
        //前面的if没满足,说明有数据发送过来,开始遍历client数组
        for (i = 1; i <= maxi; i++) {
            if ((sockfd = client[i].fd) < 0) {
                continue;
            }
            //读事件满足,用read去接受数据
            if (client[i].revents & POLLIN) {
                if ((n = read(sockfd, buf, sizeof(buf))) < 0) {
                    if (errno = ECONNRESET) {   // 收到RST标志
                        printf("client[%d] aborted conection\n", client[i].fd);
                        close(sockfd);
                        client[i].fd = -1;  //poll中不监控该文件描述符,直接置-1即可,无需像select中那样移除
                    } else {
                        perror("read error");
                    }
                } else if (n == 0) {    //客户端关闭连接
                    printf("client[%d] closed connection\n", client[i].fd);
                    close(sockfd);
                    client[i].fd = -1; 
                } else {
                    printf("recieved data: %s\n", buf);
                }
            }
            if (--nready <= 0) {
                break;
            }
        }
    }
    close(listenfd);
    return 0;
}

优点

poll函数相比于select函数来说,最大的优点就是突破了1024个文件描述符的限制,这使得百万并发变得可能。

而且不同于selectpoll函数的监听和返回是分开的,因此不用在每次操作之前都单独备份一份了,简化了代码实现。因此,可以理解为select的升级增强版。

缺点

虽然poll不需要遍历所有的文件描述符了,只需要遍历加入数组中的描述符,范围缩小了很多,但缺点仍然是需要遍历。假设真有百万并发的场景,当仅有两三个事件触发的时候,仍然要遍历上百万个文件描述符,只为了找到那触发事件的两三个fd,这样看来 ,就有些得不偿失了。而这个缺点,将在epoll中得以彻底解决。

poll作为 一个过度版本的实现 ,说实话地位有些尴尬:它既不具备select函数跨平台的优势,又不具备epoll的高性能。因此使用面以及普及程度相对来说,反而是三者之中最差劲的一个。

若说它的唯一使用场景,大概也就是开发者既想突破1024文件描述符的限制,又不想把代码写得像epoll那样复杂了。

epoll

原型

epoll可谓是当前IO多路复用的最终形态,它是poll的 增强版本。我们说poll函数,虽然突破了select函数1024文件描述符的限制,且把监听事件和返回事件分开了,但是说到底还是要遍历所有文件描述符,才能知道到底是哪个文件描述符触发了事件,或者需要单独定义一个数组。

epoll则可以返回一个触发了事件的所有描述符的数组集合,在这个数组集合里,所有的文件描述符都是需要处理的,就不需要我们再单独定义数组了。

虽然epoll功能强大了,但是使用起来却麻烦得多。不同于selectpoll使用一个函数监听即可,epoll提供了三个函数。

epoll_create

首先,需要使用epoll_create创建一个句柄:

#include <sys/epoll.h>
int epoll_create(int size);

该函数返回一个文件描述符,这个文件描述符并不是 一个常规意义的文件描述符,而是一个平衡二叉树(准确来说是红黑树)的根节点。size则是树的大小,它代表你将监听多少个文件描述符。epoll_create将按照传入的大小,构造出一棵大小为size的红黑树。

注意:这个size只是建议值,实际内核并不一定局限于size的大小,可以监听比size更多的文件描述符。但是由于平衡二叉树增加节点时可能需要自旋,如果size与实际监听的文件描述符差别过大,则会增加内核开销。

epoll_ctl

第二个函数是epoll_ctl, 这个函数主要用来操作epoll句柄,可以使用该函数往红黑树里增加文件描述符,修改文件描述符,和删除文件描述符。

可以看到,selectpoll使用的都是bitmap位图,而epoll使用的是红黑树。

#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll_ctl有四个参数,参数1就是epoll_create创建出来的句柄。

第二个参数op是操作标志位,有三个值,分别如下:

  • EPOLL_CTL_ADD 向树增加文件描述符
  • EPOLL_CTL_MOD 修改树中的文件描述符
  • EPOLL_CTL_DEL 删除树中的文件描述符

第三个参数就是需要操作的文件描述符,这个没啥说的。

重点看第四个参数,它是一个结构体。这个结构体原型如下:

typedef union epoll_data {
               void        *ptr;
               int          fd;
               uint32_t     u32;
               uint64_t     u64;
           } epoll_data_t;
           struct epoll_event {
               uint32_t     events;      /* Epoll events */
               epoll_data_t data;        /* User data variable */
           };

第一个元素为uint32_t类型的events,这个和poll类似,是一个bit mask,主要使用到的标志位有:

  • EPOLLIN 读事件
  • EPOLLOUT 写事件
  • EPOLLERR 异常事件

这个结构体还有第二个元素,是一个epoll_data_t类型的联合体。我们先重点关注里面的fd,它代表一个文件描述符,初始化的时候传入需要监听的文件描述符,当监听返回时,此处会传出一个有事件发生的文件描述符,因此,无需我们遍历得到结果了。

epoll_wait

epoll_wait才是真正的监听函数,它的原型如下:

int epoll_wait(int epfd, struct epoll_event *events,
                      int maxevents, int timeout);

第一个参数不用说了, 注意第二个参数,它虽然也是struct epoll_event *类型,但是和epoll_ctl中的含义不同,epoll_ctl代表传入进去的是一个地址,epoll_wait则代表传出的是一个数组。这个数组就是返回的所有触发了事件的文件描述符集合。

第三个参数maxevents代表这个数组的大小。

timeout不用说了,它代表的是超时时间。不过要注意的是,0代表立即返回,-1代表永久阻塞,如果大于0,则代表毫秒数(注意selecttimeout是微秒)。

这个函数的返回值也是有意义的,它代表有多少个事件触发,也就可以简单理解为传出参数events的大小。

监听流程

大致梳理一下epoll的监听流程:

  • 首先,要有一个服务端的listenfd
  • 然后,使用epoll_create创建一个句柄
  • 使用epoll_ctllistenfd加入到树中,监听EPOLLIN事件
  • 使用epoll_wait监听
  • 如果EPOLLIN事件触发,说明有客户端连接上来,将新客户端加入到events中,重新监听
  • 如果再有EPOLLIN事件触发:
  • 遍历events,如果fdlistenfd,则说明又有新客户端连接上来,重复上面的步骤,将新客户端加入到events
  • 如果fd不为listenfd,这说明客户端有数据发过来,直接调用read函数读取内容即可。

触发

epoll有两种触发方式,分别为水平触发边沿触发

  • 水平触发
    所谓的水平触发,就是只要仍有数据处于就绪状态,那么可读事件就会一直触发。
    举个例子,假设客户端一次性发来了4K数据 ,但是服务器recv函数定义的buffer大小仅为1024字节,那么一次肯定是不能将所有数据都读取完的,这时候就会继续触发可读事件,直到所有数据都处理完成。
    epoll默认的触发方式就是水平触发。
  • 边沿触发
    边沿触发恰好相反,边沿触发是只有数据发送过来的时候会触发一次,即使数据没有读取完,也不会继续触发。必须client再次调用send函数触发了可读事件,才会继续读取。
    假设客户端 一次性发来4K数据,服务器recvbuffer大小为1024字节,那么服务器在第一次收到1024字节之后就不会继续,也不会有新的可读事件触发。只有当客户端再次发送数据的时候,服务器可读事件触发 ,才会继续读取第二个1024字节数据。
    注意:第二次可读事件触发时,它读取的仍然是上次未读完的数据 ,而不是客户端第二次发过来的新数据。也就是说:**数据没读完虽然不会继续触发EPOLLIN,但不会丢失数据。 **
  • 触发方式的设置:
    水平触发和边沿触发在内核里 使用两个bit mask区分,分别为:

EPOLLLT 水平 触发

EPOLLET 边沿触发

  • 我们只需要在注册事件的时候将其与需要注册的事件做一个位或运算即可:
ev.events = EPOLLIN;  //LT
ev.events = EPOLLIN | EPOLLET;   //ET

实现

#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
#include<arpa/inet.h>
#include<sys/epoll.h>
#include<ctype.h>
#define OPEN_MAX 1024
int main(int argc, char **argv){
    int i, listenfd, connfd, sockfd,epfd, res, n;
    ssize_t nready = 0;
    char buf[BUFSIZ] = {0};
    char str[INET_ADDRSTRLEN];
    socklen_t clilen;
    struct sockaddr_in cliaddr, servaddr;
    struct epoll_event event, events[OPEN_MAX];
    //开始创建服务端套接字
    listenfd = socket(AF_INET, SOCK_STREAM, 0);
    int opt = 1;
    setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(8888);
    bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
    listen(listenfd, 128);
    //开始初始化epoll
    epfd = epoll_create(OPEN_MAX);
    event.events = EPOLLIN;
    event.data.fd = listenfd;
    res = epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &event);
    if (res == -1) {
        perror("server epoll_ctl error");
        exit(res);
    }
    for(;;) {
        //开始监听
        nready = epoll_wait(epfd, events, OPEN_MAX, -1);
        if (nready == -1) {
            perror("epoll_wait error");
            exit(nready);
        }
        for (i = 0; i < nready; i++) {
            if (!(events[i].events & EPOLLIN)) {
                continue;
            }
            if (events[i].data.fd == listenfd) {
                //有新客户端连接
                clilen = sizeof(cliaddr);
                connfd = accept(listenfd, (struct sockaddr *)&cliaddr, &clilen);
                printf("received from %s at port %d\n", inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)),
                    ntohs(cliaddr.sin_port));
                event.events = EPOLLIN;
                event.data.fd = connfd;
                if (epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &event) == -1) {
                    perror("client epoll_ctl error");
                    exit(-1);
                }
            } else {
                //有数据可以读取
                sockfd = events[i].data.fd;
                n = read(sockfd, buf, sizeof(buf));
                if (n ==0) {
                    //读到0,说明客户端关闭
                    epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, NULL);
                    close(sockfd);
                    printf("client[%d] closed connection\n", sockfd);
                } else if (n < 0){
                    //出错
                    epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, NULL);
                    close(sockfd);
                    printf("client[%d] read error\n", sockfd);
                } else {
                    //读到了数据
                    printf("received data: %s\n", buf);
                }
            }
        }
    }
    close(listenfd);
    close(epfd);
    return 0;
}

优点

epoll的优点显而易见,它解决了poll需要遍历所有注册的fd的问题,只需要关心触发了事件的极少量fd即可,大大提升了效率。

而更有意思 的是epoll_data_t这个联合体,它里面有四个元素:

typedef union epoll_data {
      void        *ptr;
      int          fd;
      uint32_t     u32;
      uint64_t     u64;
 } epoll_data_t;

简单开发时,我们可以将fd记录在其中,但是我们注意到 这里面还有一个void *类型的元素,那就提供了无限可能。它可以是一个struct,也可以是一个callback,也可以是struct嵌套callback,从而实现无线的扩展可能。大名鼎鼎的reactor反应堆模型就是通过这种方式完成的。

在下篇专题里,笔者将带大家走进reactor模型,领略epoll的神奇魅力。

缺点

什么?epoll也有缺点?当然有,我认为epoll的最大缺点就是代码实现起来变得复杂了,写起来复杂,理解起来更复杂。

而且还有一个不能算缺点的缺点,对于笔者这样一个长期开发跨平台应用程序的开发者来说,epoll虽好,但无法实现一套跨平台的接口封装,却过于鸡肋了。


本专栏知识点是通过<零声教育>的系统学习,进行梳理总结写下文章,对C/C++课程感兴趣的读者,可以点击链接,查看详细的服务:C/C++Linux服务器开发/高级架构师

目录
相关文章
|
1月前
|
存储 监控 Linux
【Linux IO多路复用 】 Linux下select函数全解析:驾驭I-O复用的高效之道
【Linux IO多路复用 】 Linux下select函数全解析:驾驭I-O复用的高效之道
54 0
|
2月前
|
网络协议 安全 测试技术
手撕测试tcp服务器效率工具——以epoll和io_uring对比为例
手撕测试tcp服务器效率工具——以epoll和io_uring对比为例
44 2
|
3月前
|
存储 Linux 调度
io复用之epoll核心源码剖析
epoll底层实现中有两个关键的数据结构,一个是eventpoll另一个是epitem,其中eventpoll中有两个成员变量分别是rbr和rdlist,前者指向一颗红黑树的根,后者指向双向链表的头。而epitem则是红黑树节点和双向链表节点的综合体,也就是说epitem即可作为树的节点,又可以作为链表的节点,并且epitem中包含着用户注册的事件。当用户调用epoll_create()时,会创建eventpoll对象(包含一个红黑树和一个双链表);
72 0
io复用之epoll核心源码剖析
|
1月前
|
NoSQL Java Linux
【Linux IO多路复用 】 Linux 网络编程 认知负荷与Epoll:高性能I-O多路复用的实现与优化
【Linux IO多路复用 】 Linux 网络编程 认知负荷与Epoll:高性能I-O多路复用的实现与优化
62 0
|
1月前
|
JavaScript Unix Linux
IO多路复用:提高网络应用性能的利器
IO多路复用:提高网络应用性能的利器
|
3月前
|
消息中间件 架构师 Java
性能媲美epoll的io_uring
性能媲美epoll的io_uring
42 0
|
1月前
|
存储 Java 数据处理
|
1月前
|
Java API
java中IO与NIO有什么不同
java中IO与NIO有什么不同
|
3月前
|
存储 Java 数据安全/隐私保护
从零开始学习 Java:简单易懂的入门指南之IO字符流(三十一)
从零开始学习 Java:简单易懂的入门指南之IO字符流(三十一)
|
3月前
|
存储 移动开发 Java
从零开始学习 Java:简单易懂的入门指南之IO字节流(三十)
从零开始学习 Java:简单易懂的入门指南之IO字节流(三十)