IO多路转接(三)

简介: IO多路转接

三、epoll

3.1 epoll初识

epoll是系统提供的一个多路转接接口


epoll系统调用也可以让程序同时监视多个文件描述符上的事件是否就绪,与select和poll的定位是一样的,适用场景也相同

epoll在命名上比poll多了一个e,可以理解成是extend,epoll就是为了同时处理大量文件描述符而改进的poll

epoll在2.5.44内核中被引进,几乎具备了select和poll所有优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法

3.2 epoll相关系统调用

epoll_create函数


int epoll_create(int size);

参数size:自Linux2.6.8后,size参数是被忽略的,但size的值必须设置为大于0的值

返回值:epoll模型创建成功返回其对应的文件描述符,否则返回-1,同时错误码被设置

注意: 当不再使用时,须调用close函数关闭epoll模型对应的文件描述符,当所有引用epoll实例的文件描述符都已关闭时,内核将销毁该实例并释放相关资源


epoll_ctl函数


int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

参数说明:


epfd:epoll_create函数的返回值(epoll句柄)

op:表示具体的动作,用三个宏来表示

fd:需要监视的文件描述符

event:需要监视该文件描述符上的哪些事件

第二个参数op的取值有以下三种:


EPOLL_CTL_ADD:注册新的文件描述符到指定的epoll模型中

EPOLL_CTL_MOD:修改已经注册的文件描述符的监听事件

EPOLL_CTL_DEL:从epoll模型中删除指定的文件描述符

返回值:函数调用成功返回0,调用失败返回-1,同时错误码会被设置


第四个参数对应的struct epoll_event结构如下:


a6255f7cfe734d6d97315e8b5ab3d988.png


struct epoll_event结构中有两个成员,第一个成员events表示的是需监视的事件,第二个成员data为联合体结构,一般选择使用该结构中的fd,表示需要监听的文件描述符


events的常用取值如下:


EPOLLIN:表示对应的文件描述符可以读(包括对端SOCKET正常关闭)

EPOLLOUT:表示对应的文件描述符可以写

EPOLLPRI:表示对应的文件描述符有紧急的数据可读(表示有带外数据到来)

EPOLLERR:表示对应的文件描述符发送错误

EPOLLHUP:表示对应的文件描述符被挂断,即对端将文件描述符关闭了

EPOLLET:将epoll的工作方式设置为边缘触发(Edge Triggered)模式

EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,若还需继续监听该文件描述符,需重新将该文件描述符添加到epoll模型中

这些取值是以宏的方式定义,二进制序列中有且只有一个bit位是1,且为1的bit位是各不相同的



2dd9224a974c4bf0802c2ee518851b29.png

epoll_wait函数


int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

参数说明:


epfd:epoll_create函数的返回值(epoll句柄),用于指定epoll模型

events:内核会将已就绪的事件拷贝到events数组中(不能是空指针,内核只负责将就绪事件拷贝到该数组,不会在用户态中分配内存)

maxevents:events数组中的元素个数,该值不能大于创建epoll模型时传入的size值

timeout:表示epoll_wait函数的超时时间,单位是毫秒(ms)

参数timeout的取值:


-1:epoll_wait调用后进行阻塞等待,直到被监视的某个文件描述符上的某个事件就绪

0:epoll_wait调用后进行非阻塞等待,无论被监视的文件描述符上的事件是否就绪,epoll_wait检测后都会立即返回

特定的时间值:epoll_wait调用后在特定的时间内阻塞等待,若被监视的文件描述符上没有事件就绪,则在该时间后epoll_wait超时返回

返回值:


若函数调用成功,则返回有事件就绪的文件描述符个数

若timeout时间耗尽,则返回0

若函数调用失败,则返回-1,同时错误码会被设置

epoll_wait调用失败时,错误码可能被设置为:


EBADF:传入的epoll模型对应的文件描述符无效

EFAULT:events指向的数组空间无法通过写入权限访问

EINTR:此调用被信号所中断

EINVAL:epfd不是一个epoll模型对应的文件描述符,或传入的maxevents值小于等于0

3.3 epoll工作原理

红黑树 && 就绪队列


c9ea9a038b8b4c84834562cbf3870fdd.png


当某一进程调用epoll_create函数,Linux内核会创建一个eventpoll结构体,即epoll模型,eventpoll结构体中的成员rbr、rdlist与epoll的使用方式密切相关


struct eventpoll{
  ...
  //红黑树的根节点,这棵树中存储着所有添加到epoll中的需要监视的事件
  struct rb_root rbr;
  //就绪队列中则存放着将要通过epoll_wait返回给用户的满足条件的事件
  struct list_head rdlist;
  ...
}

epoll模型中的红黑树本质就是告诉内核,需监视哪些文件描述符上的哪些事件,调用epll_ctl函数就是在对这颗红黑树进行增删改操作

epoll模型中的就绪队列本质就是告诉内核,哪些文件描述符上的哪些事件已就绪,调用epoll_wait函数就是从就绪队列中获取已就绪的事件

在epoll中,对于每一个事件都有一个对应的epitem结构体,红黑树和就绪队列中的节点分别是基于epitem结构中的rbn成员和rdllink成员的,epitem结构中的成员ffd记录的是指定的文件描述符值,event成员记录的就是该文件描述符对应的事件

struct epitem{
  struct rb_node rbn; //红黑树节点
  struct list_head rdllink; //双向链表节点
  struct epoll_filefd ffd; //事件句柄信息
  struct eventpoll *ep; //指向其所属的eventpoll对象
  struct epoll_event event; //期待发生的事件类型
}

对于epitem结构中rbn成员而言,ffd与event的含义是:需监视ffd上的event事件是否就绪

对于epitem结构中的rdlink成员而言,ffd与event的含义是:ffd上的event事件已就绪

注意:


红黑树是一种二叉搜索树,必须有键值key,文件描述符就可以天然的作为红黑树key值

调用epoll_ctl向红黑树中新增节点时,若设置了EPOLLONESHOT选项,监听完这次事件后,若还需继续监听该文件描述符则需重新将其添加到epoll模型中,本质就是当设置了EPOLLONESHOT选项的事件就绪时,操作系统会自动将其从红黑树中删除

若调用epoll_ctl向红黑树中新增节点时没设置EPOLLONESHOT,那么该节点插入红黑树后就会一直存在,除非用户调用epoll_ctl将该节点从红黑树中删除

回调机制


所有添加到红黑树中的事件,都与设备(网卡)驱动程序建立回调方法,该回调方法在内核中被称为ep_poll_callback


对于select和poll而言,操作系统在监视多个文件描述符上的事件是否就绪时,需让操作系统主动对这多个文件描述符进行轮询检测,这会增加操作系统的负担

对于epoll而言,操作系统不需主动进行事件的检测,当红黑树中监视的事件就绪时,会自动调用对应回调方法,将就绪的事件添加到就绪队列中

当用户调用epoll_wait函数获取就绪事件时,只需关注底层就绪队列是否为空,若不为空则将就绪队列中的就绪事件拷贝给用户

采用回调机制最大的好处:不再需要操作系统主动对就绪事件进行检测,当事件就绪时会自动调用对应的回调函数进行处理

注意:


只有添加到红黑树中的事件才会与底层建立回调方法,因此只有红黑树中对应的事件就绪时,才会执行对应的回调方法将其添加到就绪队列

当不断有监视的事件就绪时,会不断调用回调方法向就绪队列中插入节点,而上层也会不断调用epoll_wait函数从就绪队列中获取节点,即典型的生产者消费者模型

由于就绪队列可能被多个执行流同时访问,因此必须要使用互斥锁进行保护,eventpoll结构中的lock和mtx就是用于保护临界资源的,因此epoll本身是线程安全的

eventpoll结构中的wq(wait queue)即等待队列,当多个执行流想同时访问同一个epoll模型时,就需在该等待队列下进行等待

3.4 epoll服务器

Epoll类


将epoll相关的系统调用进行封装,便于后续使用

#include <iostream>
#include <sys/epoll.h>
#include <cstdlib>
using namespace std;
class Epoll
{
public:
    static const int gsize = 256; 
public:
    static int EpollCreate() 
    {
        int epollFd = epoll_create(gsize);
        if(epollFd > 0) return epollFd;
        exit(5);//创建失败直接终止
    }
    static bool EpollCtl(int epollFd, int op, int socketFd, uint32_t events) 
    {
        struct epoll_event ev;
        ev.events = events;
        ev.data.fd = socketFd;
        int num = epoll_ctl(epollFd, op, socketFd, &ev);
        return num == 0;
    }
    static int EpollWait(int epollFd, struct epoll_event* revs, int num, int timeout) {
        return epoll_wait(epollFd, revs, num, timeout);
    }
};


EpollServer类

#ifndef __EPOLL_SERVER_HPP__
#define __EPOLL_SERVER_HPP__
#include <iostream>
#include <string>
#include <functional>
#include <cassert>
#include <unistd.h>
#include "Epoll.hpp"
#include "Log.hpp"
#include "Socket.hpp"
using namespace std;
namespace ns_epoll
{
    class EpollServer
    {
        using func_t = function<void(string)>;
    public:
        EpollServer(func_t handler, const uint16_t& port = 9090):_port(port), _revsNum(64),_handlerRequest(handler) {
            //申请空间
            _revs = new struct epoll_event[_revsNum];
            //创建监听套接字
            _listenSocketFd = Socket::SocketCreate();
            Socket::Bind(_listenSocketFd, _port);
            Socket::Listen(_listenSocketFd);
            //创建epoll模型
            _epollFd = Epoll::EpollCreate();
            LogMessage(DEBUG, "init success, listenSocketFd : %d, epollFd : %d", _listenSocketFd, _epollFd);
            //将监听套接字添加到epoll中
            if(Epoll::EpollCtl(_epollFd, EPOLL_CTL_ADD, _listenSocketFd, EPOLLIN))
                LogMessage(DEBUG, "Add listenSocketFd to epoll success");
            else exit(6);
        }
        ~EpollServer() {
            if(_listenSocketFd >= 0) close(_listenSocketFd);
            if( _epollFd >= 0) close(_epollFd);
            if(_revs != nullptr) delete[] _revs;
        }
    public:
        void Start()
        {
            int timeout = -1;
            while(true) 
            {
                LoopOnce(timeout);
            }
        }
    public:
        void LoopOnce(int timeout) {
            int num = Epoll::EpollWait(_epollFd, _revs, _revsNum, timeout);
            switch (num)
            {
            case 0:
                LogMessage(DEBUG, "Time Out...");
                break;
            case -1:
                LogMessage(WARNING, "epoll wait error: %s", strerror(errno));
                break;
            default:
                LogMessage(DEBUG, "Get a event");
                HandlerEvents(num);
                break;
            }
        }
        void HandlerEvents(int number)
        {
            assert(number);
            for(int i = 0; i < number; ++i) 
            {
                uint32_t revent = _revs[i].events;
                int socketFd = _revs[i].data.fd;
                if(revent & EPOLLIN) //读事件就绪
                {
                    if(socketFd == _listenSocketFd) Accetper(_listenSocketFd);
                    else Recver(socketFd);
                }
            }
        }
        void Accetper(int listenSocketFd) 
        {
            string clientIp;
            uint16_t clientPort;
            int socketFd = Socket::Accept(listenSocketFd, &clientIp, &clientPort);
            if(socketFd < 0) {
                LogMessage(WARNING, "Accept error");
                return;
            }
            if(!Epoll::EpollCtl(_epollFd, EPOLL_CTL_ADD, socketFd, EPOLLIN)) return;
            LogMessage(DEBUG, "Add new link : %d to epoll success", socketFd);
        }
        void Recver(int socketFd)
        {
            char buffer[10240];
            ssize_t n = recv(socketFd, buffer, sizeof(buffer) - 1, 0);
            if(n > 0) {
                buffer[n] = 0;
                _handlerRequest(buffer);
            }
            else if(n == 0) {
                LogMessage(NORMAL, "client %d close link, me too...", socketFd);
                bool ret = Epoll::EpollCtl(_epollFd, EPOLL_CTL_DEL, socketFd, 0);
                assert(ret);
                close(socketFd);
            }
            else {
                LogMessage(NORMAL, "client %d recv error, close error socketFd", socketFd);
                bool ret = Epoll::EpollCtl(_epollFd, EPOLL_CTL_DEL, socketFd, 0);
                assert(ret);
                close(socketFd);
            }
        }
    private:
        int _listenSocketFd;
        int _epollFd;
        uint16_t _port;
        struct epoll_event* _revs;
        int _revsNum;
        func_t _handlerRequest;
    };
}
#endif


epoll服务器测试


编写epoll服务器在调用epoll_wait函数时,将timeout的值设置成了-1,因此运行服务器后若没有客户端发来连接请求,那么服务器就会调用epoll_wait函数后阻塞等待


d31cfeaef13f4652a394ad057b163db6.png


使用telnet工具连接epoll服务器后,epoll服务器调用的epoll_wait函数在检测到监听套接字的读事件就绪后就会调用accept获取建立好的连接,并打印输出客户端的IP和端口号,此时客户端发来的数据也能成功被epoll服务器收到并进行打印输出


bdd5e1cbed22459c802b61b1e54c7e6d.png


该epoll服务器同样为单进程、单线程服务器,但可以为多个客户端提供服务


23f7f521f73247d6ae7bd6c7f13af01c.png


使用 ls /proc/PID/fd 命令,查看当前epoll服务器的文件描述符的使用情况。文件描述符0、1、2是默认打开的,分别对应的是标准输入、标准输出和标准错误,3号文件描述符对应的是监听套接字,4号文件描述符对应epoll句柄,5号和6号文件描述符分别对应访问服务器的两个客户端

941eda63a83a4f77ae78b4ba41ec4ad9.png



当服务器端检测到客户端退出后,也会关闭对应连接,此时epoll服务器对应的5号和6号文件描述符就关闭了


35cdc67eaf564bafb78239eb3126046d.png


3.5 epoll的优点

接口使用方便:拆分成了三个函数,使用起来更方便高效,不至于冗杂

数据拷贝轻量:只在新增监视事件的时候调用epoll_ctl将数据从用户拷贝到内核,而select和poll每次都需要重新将需要监视的事件从用户拷贝到内核。此外,调用epoll_wait获取就绪事件时,只会拷贝就绪的事件,不进行不必要的拷贝操作

事件回调机制:避免操作系统主动轮询检测事件就绪,而是采用回调函数的方式,将就绪的文件描述符结构加入到就绪队列中。调用epoll_wait时直接访问就绪队列就知道哪些文件描述符已就绪,检测是否有文件描述符就绪的时间复杂度是O(1),因为本质只需要判断就绪队列是否为空即可

没有数量限制:监视的文件描述符数目无上限,只要内存允许,可一直向红黑树中新增节点

注意:


有的博客中说epoll中使用了内存映射机制,内核可以直接将底层就绪队列通过mmap的方式映射到用户态,此时用户就可以直接读取到内核中就绪队列中的数据,避免了内存拷贝的额外性能开销

这种说法是错误的,实际操作系统并没有做任何映射机制,因为操作系统是不相信任何人的,操作系统不会让用户进程直接访问到内核的数据的,用户只能通过系统调用来获取内核的数据

因此用户要获取内核中的数据,势必还是要将内核的数据拷贝到用户空间

与select和poll的不同之处


在使用select和poll时,都需借助第三方数组来维护历史上的文件描述符以及需要监视的事件,第三方数组由用户自行维护,对该数组的增删改操作都需要用户进行

使用epoll时,不需要用户维护第三方数组,epoll底层的红黑树就充当了这个第三方数组的功能,并且该红黑树的增删改操作都是由内核维护的,用户只需要调用epoll_ctl让内核对该红黑树进行对应的操作即可

在使用多路转接接口时,数据流都有两个方向,一个是用户告知内核,一个是内核告知用户。select和poll将这两件事情都交给了同一个函数来完成,而epoll在接口层面上就将这两件事进行了分离,epoll通过调用epoll_ctl完成用户告知内核,通过调用epoll_wait完成内核告知用户

3.6 epoll的工作方式

水平触发(LT,Level Triggered)


只要底层有事件就绪,epoll就会一直通知用户

类似于数字电路中的高电平触发一样,只要一直处于高电平,则会一直触发


fa630f9037dd4912b2dd85dec084d650.png

epoll默认状态下就是LT工作模式


由于在LT工作模式下,只要底层有事件就绪就会一直通知用户,因此当epoll检测到底层读事件就绪时,可以不立即进行处理,或者只处理一部分,因为只要底层数据没有处理完,下一次epoll还会通知用户事件就绪。

select和poll其实就是工作是LT模式下的

支持阻塞读写和非阻塞读写

边缘触发(ET,Edge Triggered)


只有底层就绪事件数量由无到有或由有到多发生变化的时候,epoll才会通知用户

类似于数字电路中的上升沿触发一样,只有当电平由低变高的那一瞬间才会触发

eac968333a6f4d24ae7eff271c9a207c.png


若要将epoll改为ET工作模式,则需在添加事件时设置EPOLLET选项


由于在ET工作模式下,只有底层就绪事件无到有或由有到多发生变化的时候才会通知用户,因此当epoll检测到底层读事件就绪时,必须立即进行处理,且全部处理完毕,因为有可能此后底层再也没有事件就绪,那么epoll就再也不会通知用户进行事件处理,此时没有处理完的数据就丢失了

ET工作模式下epoll通知用户的次数一般比LT少,并且每次都将缓冲区中全部事件处理完成,从而提高网络吞吐量,因此ET的性能一般比LT性能更高,Nginx就是默认采用ET模式使用epoll的

只支持非阻塞的读写

ET工作模式下如何进行读写


因为在ET工作模式下,只有底层就绪事件无到有或由有到多发生变化的时候才会通知用户,这就倒逼用户当读事件就绪时必须一次性将数据全部读取完毕,当写事件就绪时必须一次性将发送缓冲区写满,否则可能再也没有机会进行读写了


因此读数据时必须循环调用recv函数进行读取,写数据时必须循环调用send函数进行写入


当底层读事件就绪时,循环调用recv函数进行读取,直到某次调用recv读取时,实际读取到的字节数小于期望读取的字节数,则说明本次底层数据已读取完毕了

但有可能最后一次调用recv读取时,刚好实际读取的字节数和期望读取的字节数相等,但此时底层数据也恰好读取完毕了,若再调用recv函数进行读取,那么recv就会因为底层没有数据而被阻塞

在这里阻塞是非常严重的,就比如博客写的服务器都是单进程的服务器,若recv被阻塞住,并且此后该数据再也不就绪,那么就相当于服务器挂掉了,因此在ET工作模式下循环调用recv函数进行读取时,必须将对应的文件描述符设置为非阻塞状态

调用send函数写数据时也是同样的道理,需循环调用send函数进行数据的写入,且必须将对应的文件描述符设置为非阻塞状态

强调:ET工作模式下,recv和send操作的文件描述符必须设置为非阻塞状态,这是必须的,不是可选的


对比LT和ET


在ET模式下,一个文件描述符就绪之后,用户不会反复收到通知,看起来比LT更高效,但若在LT模式下能够做到每次都将就绪的文件描述符立即全部处理,不让操作系统反复通知用户的话,其实LT和ET的性能也是一样的

ET的编程难度比LT更高


目录
相关文章
|
数据处理 C语言
网络IO 多路IO复用 之 epoll
网络IO 多路IO复用 之 epoll
网络IO 多路IO复用 之 select
网络IO 多路IO复用 之 select
|
存储 NoSQL Linux
计算机网络 | IO多路转接技术 | select详解
计算机网络 | IO多路转接技术 | select详解
88 0
|
监控
IO多路转接(二)
IO多路转接
96 0
|
存储 监控 网络协议
IO多路转接(一)
IO多路转接
77 0
|
5月前
|
存储 Java
【IO面试题 四】、介绍一下Java的序列化与反序列化
Java的序列化与反序列化允许对象通过实现Serializable接口转换成字节序列并存储或传输,之后可以通过ObjectInputStream和ObjectOutputStream的方法将这些字节序列恢复成对象。
|
6月前
|
Java 大数据
解析Java中的NIO与传统IO的区别与应用
解析Java中的NIO与传统IO的区别与应用
|
4月前
|
Java 大数据 API
Java 流(Stream)、文件(File)和IO的区别
Java中的流(Stream)、文件(File)和输入/输出(I/O)是处理数据的关键概念。`File`类用于基本文件操作,如创建、删除和检查文件;流则提供了数据读写的抽象机制,适用于文件、内存和网络等多种数据源;I/O涵盖更广泛的输入输出操作,包括文件I/O、网络通信等,并支持异常处理和缓冲等功能。实际开发中,这三者常结合使用,以实现高效的数据处理。例如,`File`用于管理文件路径,`Stream`用于读写数据,I/O则处理复杂的输入输出需求。
268 12