IO多路转接 ——— select、poll、epoll(下)

简介: IO多路转接 ——— select、poll、epoll

sock.hpp

#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <cerrno>
#include <cassert>
#include <unistd.h>
#include <memory>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <ctype.h>
class Sock
{
private:
    // listen的第二个参数,意义:底层全连接队列的长度 = listen的第二个参数+1
    const static int gbacklog = 10;
public:
    Sock() {}
    static int Socket()
    {
        int listensock = socket(AF_INET, SOCK_STREAM, 0);
        if (listensock < 0)
        {
            exit(2);
        }
        int opt = 1;
        setsockopt(listensock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
        return listensock;
    }
    static void Bind(int sock, uint16_t port, std::string ip = "0.0.0.0")
    {
        struct sockaddr_in local;
        memset(&local, 0, sizeof local);
        local.sin_family = AF_INET;
        local.sin_port = htons(port);
        inet_pton(AF_INET, ip.c_str(), &local.sin_addr);
        if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0)
        {
            exit(3);
        }
    }
    static void Listen(int sock)
    {
        if (listen(sock, gbacklog) < 0)
        {exit(4);}
    }
    // 一般经验
    // const std::string &: 输入型参数
    // std::string *: 输出型参数
    // std::string &: 输入输出型参数
    static int Accept(int listensock, std::string *ip, uint16_t *port)
    {
        struct sockaddr_in src;
        socklen_t len = sizeof(src);
        int servicesock = accept(listensock, (struct sockaddr *)&src, &len);
        if (servicesock < 0)
        {
            return -1;
        }
        if(port) *port = ntohs(src.sin_port);
        if(ip) *ip = inet_ntoa(src.sin_addr);
        return servicesock;
    }
    static bool Connect(int sock, const std::string &server_ip, const uint16_t &server_port)
    {
        struct sockaddr_in server;
        memset(&server, 0, sizeof(server));
        server.sin_family = AF_INET;
        server.sin_port = htons(server_port);
        server.sin_addr.s_addr = inet_addr(server_ip.c_str());
        if(connect(sock, (struct sockaddr*)&server, sizeof(server)) == 0) return true;
        else return false;
    }
    ~Sock() {}
};

main.cc

#include "pollserver.hpp"
using namespace std;
int main()
{
   PollServer pollserver;
   cout<<"runring......."<<endl;
   pollserver.start();
}

/O多路转接之epoll
epoll初识
epoll也是系统提供的一个多路转接接口。

epoll系统调用也可以让我们的程序同时监视多个文件描述符上的事件是否就绪,与select和poll的定位是一样的,适用场景也相同。

epoll在命名上比poll多了一个e,这个e可以理解成是extend,epoll就是为了同时处理大量文件描述符而改进的poll。

epoll在2.5.44内核中被引进,它几乎具备了select和poll的所有优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。

epoll的相关系统调用

epoll有三个相关的系统调用,分别是epoll_create、epoll_ctl和epoll_wait。

epoll工作原理

 

epoll.hpp

#pragma once
#include <iostream>
#include <sys/epoll.h>
#include <unistd.h>
using namespace std;
class Epoll
{
public:
   static const int gsize = 256;
public:
   static int CreateEpoll()
   {
       int epfd = epoll_create(gsize);
       if(epfd > 0) return epfd;
       exit(5);
   }
   static bool CtrlEpoll(int epfd,int oper,int sock,uint32_t events)
   {
      struct epoll_event ev;
      ev.events = events;
      ev.data.fd = sock;
      int n = epoll_ctl(epfd,oper,sock,&ev);
      return n == 0;
   }
   static int WaitEpoll(int epfd,struct epoll_event* revs,int num,int timeout)
   {
            // 细节1:如果底层就绪的sock非常多,revs承装不下,怎么办??不影响!一次拿不完,就下一次再拿
        // 细节2:关于epoll_wait的返回值问题:有几个fd上的事件就绪,就返回几,epoll返回的时候,会将所有
        //       就绪的event按照顺序放入到revs数组中!一共有返回值个!
      return epoll_wait(epfd,revs,num,timeout);
   }
};

log.hpp

#pragma once
#include <iostream>
#include <cstdio>
#include <cstdarg>
#include <ctime>
#include <string>
// 日志是有日志级别的
#define DEBUG   0
#define NORMAL  1
#define WARNING 2
#define ERROR   3
#define FATAL   4
const char *gLevelMap[] = {
    "DEBUG",
    "NORMAL",
    "WARNING",
    "ERROR",
    "FATAL"
};
#define LOGFILE "./selectServer.log"
// 完整的日志功能,至少: 日志等级 时间 支持用户自定义(日志内容, 文件行,文件名)
void logMessage(int level, const char *format, ...)
{
    // va_list ap;
    // va_start(ap, format);
    // while()
    // int x = va_arg(ap, int);
    // va_end(ap); //ap=nullptr
    char stdBuffer[1024]; //标准部分
    time_t timestamp = time(nullptr);
    // struct tm *localtime = localtime(&timestamp);
    snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%ld] ", gLevelMap[level], timestamp);
    char logBuffer[1024]; //自定义部分
    va_list args;
    va_start(args, format);
    // vprintf(format, args);
    vsnprintf(logBuffer, sizeof logBuffer, format, args);
    va_end(args);
    // FILE *fp = fopen(LOGFILE, "a");
    printf("%s%s\n", stdBuffer, logBuffer);
    // fprintf(fp, "%s%s\n", stdBuffer, logBuffer);
    // fclose(fp);
}

sock.hpp

#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <cerrno>
#include <cassert>
#include <unistd.h>
#include <memory>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <ctype.h>
//全加静态成员让他变成一个方法
class Sock
{
private:
    // listen的第二个参数,意义:底层全连接队列的长度 = listen的第二个参数+1
    const static int gbacklog = 10;
public:
    Sock() {}
    static int Socket()
    {
        int listensock = socket(AF_INET, SOCK_STREAM, 0);
        if (listensock < 0)
        {
            exit(2);
        }
        int opt = 1;
        setsockopt(listensock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
        return listensock;
    }
    static void Bind(int sock, uint16_t port, std::string ip = "0.0.0.0")
    {
        struct sockaddr_in local;
        memset(&local, 0, sizeof local);
        local.sin_family = AF_INET;
        local.sin_port = htons(port);
        inet_pton(AF_INET, ip.c_str(), &local.sin_addr);
        if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0)
        {
            exit(3);
        }
    }
    static void Listen(int sock)
    {
        if (listen(sock, gbacklog) < 0)
        {
            exit(4);
        }
    }
    // 一般经验
    // const std::string &: 输入型参数
    // std::string *: 输出型参数
    // std::string &: 输入输出型参数
    static int Accept(int listensock, std::string *ip, uint16_t *port)
    {
        struct sockaddr_in src;
        socklen_t len = sizeof(src);
        int servicesock = accept(listensock, (struct sockaddr *)&src, &len);
        if (servicesock < 0)
        {
            return -1;
        }
        if(port) *port = ntohs(src.sin_port);
        if(ip) *ip = inet_ntoa(src.sin_addr);
        return servicesock;
    }
    static bool Connect(int sock, const std::string &server_ip, const uint16_t &server_port)
    {
        struct sockaddr_in server;
        memset(&server, 0, sizeof(server));
        server.sin_family = AF_INET;
        server.sin_port = htons(server_port);
        server.sin_addr.s_addr = inet_addr(server_ip.c_str());
        if(connect(sock, (struct sockaddr*)&server, sizeof(server)) == 0) return true;
        else return false;
    }
    ~Sock() {}
};

epollserver.hpp

#include "sock.hpp"
#include "log.hpp"
#include <memory>
#include <sys/types.h>
#include <cstring>
#include <functional>
#include <string>
#include "epoll.hpp"
using namespace std;
const static int default_port = 8080;
const static int gnum = 64;
using func_t = function<void(string)>;
class EpollServer
{
public:
     EpollServer(func_t HandlerRequese,const int &port = default_port)
     :_port(port),_HandlerRequest(HandlerRequese),_revs_num(gnum)
     {
        // 0. 申请对应的空间
       _revs = new struct epoll_event[_revs_num];
         // 1. 创建listensock
        _listensock = Sock::Socket();
        Sock::Bind(_listensock,_port);
        Sock::Listen(_listensock);
       //2.创建epoll模型
       _epfd = Epoll::CreateEpoll();
       logMessage(DEBUG,"init success,listensock: %d,epfd: %d",_listensock,_epfd);
       //3.将listensock加入到epoll模型中,以关系读的事务加入
       if(!Epoll::CtrlEpoll(_epfd,EPOLL_CTL_ADD,_listensock,EPOLLIN)) exit(6);//
       //EPOLLIN是读事务
      logMessage(DEBUG, "add listensock to epoll success."); // 3, 4
     }
     void Accepter(int listensock)
     {
       string clientip;
       uint16_t clientport;
       int sock = Sock::Accept(listensock,&clientip,&clientport);
       if(sock < 0) 
       {
          logMessage(WARNING, "accept error!");
                return;
       }
         // 能不能直接读取?不能,因为你并不清楚,底层是否有数据!
            // 将新的sock,添加给epoll
        if(!Epoll::CtrlEpoll(_epfd,EPOLL_CTL_ADD,sock,EPOLLIN)) exit(6);
       logMessage(DEBUG, "add new sock : %d to epoll success", sock);   
     }
     void Recver(int sock)
     {
       char buffer[10240];
       ssize_t n = recv(sock,buffer,sizeof(buffer) - 1,0);
       if(n > 0)
       {
        //假设这里就是读到了一个完整的报文 // 如何保证??
        buffer[n] = 0;
        _HandlerRequest(buffer); // 2. 处理数据
       }
       else if(n == 0)
       {
          // 1. 先在epoll中去掉对sock的关
         bool res = Epoll::CtrlEpoll(_epfd,EPOLL_CTL_DEL,sock,0);
         assert(res);
         (void)res;
         //2.在close文件
         close(sock);
         logMessage(NORMAL,"client %d quit,me too...",sock);
       }
       else
       {
        //1.先在epoll中去掉对sock的关心
        bool res = Epoll::CtrlEpoll(_epfd,EPOLL_CTL_DEL,sock,0);
        assert(res);
         (void)res;
         close(sock);
         logMessage(NORMAL, "client recv %d error, close error sock", sock);
       }
     }
     void HandlerEvents(int n)
     {
      assert(n > 0);
      for(int i = 0;i < n;i++)
      {
        uint32_t revents = _revs[i].events;
        int sock = _revs[i].data.fd;//看看是哪个fd就绪了
        if(revents & EPOLLIN)
        {
          if(sock == _listensock) Accepter(_listensock);
          else Recver(sock);
        }
        if(revents & EPOLLOUT)
        {
            //TODO
        }
      }
     }
     void looponce(int timeout)
     {
       int n = Epoll::WaitEpoll(_epfd,_revs,_revs_num,timeout);
        //if(n == _revs_num) //扩容
       switch (n)//返回值n,代表有一个关心的事务就绪
       {
         case 0:
              logMessage(DEBUG, "timeout..."); // 3, 4
              break;
         case -1:
              logMessage(WARNING, "epoll wait error: %s", strerror(errno));
              break;
         default:
         //等待成功,有至少一个关系的事务已经就绪
         //去执行
         logMessage(DEBUG, "get a event");
         HandlerEvents(n);
         break;
       }
     }
     void start()
     {
        int timeout = 1000;
        while(1)
        {
          looponce(timeout);
        }
     }
     ~EpollServer()
     {
        if (_listensock >= 0)
                close(_listensock);
            if (_epfd >= 0)
                close(_epfd);
            if (_revs)
                delete[] _revs;
     }
private:
     int _listensock;
     uint16_t _port;
     int _epfd;
     struct epoll_event* _revs;
     int _revs_num;
     func_t _HandlerRequest;
};

main.cc

#include "epollserver.hpp"
#include <memory>
#include <iostream>
#include <string>
using namespace std;
void change(string str)
{
    cout<<str<<endl;
}
int main()
{
   EpollServer epollserver(change);
   cout<<"runring......."<<endl;
   epollserver.start();
}
相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
1月前
|
存储 监控 Linux
【Linux IO多路复用 】 Linux下select函数全解析:驾驭I-O复用的高效之道
【Linux IO多路复用 】 Linux下select函数全解析:驾驭I-O复用的高效之道
54 0
|
2月前
|
网络协议 安全 测试技术
手撕测试tcp服务器效率工具——以epoll和io_uring对比为例
手撕测试tcp服务器效率工具——以epoll和io_uring对比为例
44 2
|
3月前
|
存储 Linux 调度
io复用之epoll核心源码剖析
epoll底层实现中有两个关键的数据结构,一个是eventpoll另一个是epitem,其中eventpoll中有两个成员变量分别是rbr和rdlist,前者指向一颗红黑树的根,后者指向双向链表的头。而epitem则是红黑树节点和双向链表节点的综合体,也就是说epitem即可作为树的节点,又可以作为链表的节点,并且epitem中包含着用户注册的事件。当用户调用epoll_create()时,会创建eventpoll对象(包含一个红黑树和一个双链表);
72 0
io复用之epoll核心源码剖析
|
3月前
|
存储 网络协议
TCP服务器 IO多路复用的实现:select、poll、epoll
TCP服务器 IO多路复用的实现:select、poll、epoll
36 0
|
1月前
|
NoSQL Java Linux
【Linux IO多路复用 】 Linux 网络编程 认知负荷与Epoll:高性能I-O多路复用的实现与优化
【Linux IO多路复用 】 Linux 网络编程 认知负荷与Epoll:高性能I-O多路复用的实现与优化
64 0
|
3月前
|
消息中间件 架构师 Java
性能媲美epoll的io_uring
性能媲美epoll的io_uring
42 0
|
3月前
|
网络协议 架构师 Linux
一文说透IO多路复用select/poll/epoll
一文说透IO多路复用select/poll/epoll
160 0
|
3月前
|
网络协议 Linux
2.1.1网络io与io多路复用select/poll/epoll
2.1.1网络io与io多路复用select/poll/epoll
|
1月前
|
存储 Java 数据处理
|
1月前
|
Java API
java中IO与NIO有什么不同
java中IO与NIO有什么不同