IO多路转接 ——— select、poll、epoll(上)

简介: IO多路转接 ——— select、poll、epoll

select初识

select是系统提供的一个多路转接接口。

select系统调用可以让我们的程序同时监视多个文件描述符的上的事件是否就绪。

select的核心工作就是等,当监视的多个文件描述符中有一个或多个事件就绪时,select才会成功返回并将对应文件描述符的就绪事件告知调用者。

 

select基本工作流程
如果我们要实现一个简单的select服务器,该服务器要做的就是读取客户端发来的数据并进行打印,那么这个select服务器的工作流程应该是这样的:

先初始化服务器,完成套接字的创建、绑定和监听。

定义一个fd_array数组用于保存监听套接字和已经与客户端建立连接的套接字,刚开始时就将监听套接字添加到fd_array数组当中。

然后服务器开始循环调用select函数,检测读事件是否就绪,如果就绪则执行对应的操作。

每次调用select函数之前,都需要定义一个读文件描述符集readfds,并将fd_array当中的文件描述符依次设置进readfds当中,表示让select帮我们监视这些文件描述符的读事件是否就绪。

当select检测到数据就绪时会将读事件就绪的文件描述符设置进readfds当中,此时我们就能够得知哪些文件描述符的读事件就绪了,并对这些文件描述符进行对应的操作。

如果读事件就绪的是监听套接字,则调用accept函数从底层全连接队列获取已经建立好的连接,并将该连接对应的套接字添加到fd_array数组当中。

如果读事件就绪的是与客户端建立连接的套接字,则调用read函数读取客户端发来的数据并进行打印输出。

当然,服务器与客户端建立连接的套接字读事件就绪,也可能是因为客户端将连接关闭了,此时服务器应该调用close关闭该套接字,并将该套接字从fd_array数组当中清除,因为下一次不需要再监视该文件描述符的读事件了。

 

log.hpp

#pragma once
#include <iostream>
#include <cstdio>
#include <cstdarg>
#include <ctime>
#include <string>
// 日志是有日志级别的
#define DEBUG   0
#define NORMAL  1
#define WARNING 2
#define ERROR   3
#define FATAL   4
const char *gLevelMap[] = {
    "DEBUG",
    "NORMAL",
    "WARNING",
    "ERROR",
    "FATAL"
};
#define LOGFILE "./selectServer.log"
// 完整的日志功能,至少: 日志等级 时间 支持用户自定义(日志内容, 文件行,文件名)
void logMessage(int level, const char *format, ...)
{
    // va_list ap;
    // va_start(ap, format);
    // while()
    // int x = va_arg(ap, int);
    // va_end(ap); //ap=nullptr
    char stdBuffer[1024]; //标准部分
    time_t timestamp = time(nullptr);
    // struct tm *localtime = localtime(&timestamp);
    snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%ld] ", gLevelMap[level], timestamp);
    char logBuffer[1024]; //自定义部分
    va_list args;
    va_start(args, format);
    // vprintf(format, args);
    vsnprintf(logBuffer, sizeof logBuffer, format, args);
    va_end(args);
    // FILE *fp = fopen(LOGFILE, "a");
    printf("%s%s\n", stdBuffer, logBuffer);
    // fprintf(fp, "%s%s\n", stdBuffer, logBuffer);
    // fclose(fp);
}

selectserver.hpp

#include <iostream>
#include <cstring>
#include <sys/select.h>
#include "log.hpp"
#include "sock.hpp"
#include <string>
#include <vector>
#include <sys/types.h>
#include <sys/time.h>
#include <algorithm>
#define NUM 1024
#define FD_NONE -1
using namespace std;
class SelectServer
{
public:
  //端口类型设置为16位是因为TCP报文中端口号为16位
  SelectServer(const uint16_t &port = 8080):_port(port)
  {
    _listensock = Sock::Socket();
    Sock::Bind(_listensock,_port);
    Sock::Listen(_listensock);
    logMessage(DEBUG,"%s","create base socket success");
    for(int i = 0;i < NUM;i++) _fd_array[i] = FD_NONE;
    _fd_array[0] = _listensock;//规定第一个为监听套接字
    std::cout<<"初始化完成...."<<std::endl;
  } 
  void start()
  {
    while(1)
    {
      struct timeval timeout = {0,0};
      fd_set rfds;
      FD_ZERO(&rfds);
      int maxfd = _listensock;
      for(int i = 0;i < NUM;i++)
      {
         if(_fd_array[i] == FD_NONE) continue;
         FD_SET(_fd_array[i],&rfds);
         maxfd = max(maxfd,_fd_array[i]);
      }
     int n = select(maxfd + 1,&rfds,nullptr, nullptr,&timeout);
     DebugPrint();
     switch (n)
     {
     case 0:
        sleep(1);
        logMessage(DEBUG,"%s","time out...");
        break;
     case -1:
        logMessage(DEBUG,"%s","select error");
        break;
     default:
        //成功
        logMessage(DEBUG,"%s","get a new link event........");
        //成功了的话,如果不去读取的话会一直提醒读取,也就是说链接好的
        //链接会被放在就绪队列中,也就是看到链接在排队,操作系统会一直提醒有连接成功
        //当你要取的时候会取队列的头部连接去执行
        //因为我们的select是检查_listensock有没有获取的连接已经到达Tcp层
        //如果到达的话,就说明可以读走这个链接了,所以我们检查的是IO
        //也就是读到连接的操作,而不是建立连接的操作
        HandlerEvent(rfds);
        sleep(1);
        break;
     }
    }
  }
private:
  uint16_t _port;
  int _listensock;
  int _fd_array[NUM];
  void HandlerEvent(const fd_set &rfds)
  {
    for(int i = 0;i < NUM;i++)
    {
      //1.去掉不合法的fd,也就是去掉没有建立连接的fd,也就是去掉数组里为FD_NONE
      if(_fd_array[i] == FD_NONE) continue;
      //2.合法的就一定就绪了?,不一定,所以需要FD_ISSET判断是否已经就绪
      if(FD_ISSET(_fd_array[i],&rfds))
      {
         //1.如果是监听套接字就绪了,那就是accept
         //2.如果不是的话,那就处理该链接,进行读取函数
         if(_fd_array[i] == _listensock) Accepter();
         else Recver(i);
      }
    }
  }
  void Accepter()
  {
    string clientip;
    uint16_t clientport;
    int sock = Sock::Accept(_listensock,&clientip,&clientport);
    if(sock < 0)
    {
      logMessage(WARNING,"%s %s:%d","accept error",strerror(errno),errno);
      return;
    }
    logMessage(DEBUG,"get a new link success");
    int pos = 0;
    for(;pos < NUM;pos++)
    {
     if(_fd_array[pos] == FD_NONE) break;
    }
     if(pos == NUM)
     {
        logMessage(WARNING, "%s:%d", "select server already full,close: %d", sock);
        close(sock);
     }else
     {
        _fd_array[pos] = sock;
     }
  }
  void Recver(int pos)
  {
    logMessage(DEBUG,"message in,get IO event:%d",_fd_array[pos]);
    // 暂时先不做封装, 此时select已经帮我们进行了事件检测,fd上的数据一定是就绪的,即 本次 不会被阻塞
    // 这样读取有bug吗?有的,你怎么保证以读到了一个完整包文呢?
    char buffer[1024];
    int n = recv(_fd_array[pos],buffer,sizeof(buffer) - 1,0);
    //不会堵塞,
    if(n > 0)
    {
      buffer[n] = 0;
      logMessage(DEBUG,"client[%d]# %s",_fd_array[pos],buffer);
    }
    else if(n == 0)
    {
       logMessage(DEBUG, "client[%d] quit, me too...", _fd_array[pos]);
      //对端关闭那么我也关闭
      close(_fd_array[pos]);
      _fd_array[pos] = FD_NONE;
    }
    else
    {
      logMessage(WARNING, "%d sock recv error, %d : %s", _fd_array[pos], errno, strerror(errno));
      close(_fd_array[pos]);
      _fd_array[pos] = FD_NONE;
    }
  }
   void DebugPrint()
    {
        cout << "_fd_array[]: ";
        for(int i = 0; i < NUM; i++)
        {
            if(_fd_array[i] == FD_NONE) continue;
            cout << _fd_array[i] << " ";
        }
        cout << endl;
    }
};

sock.hpp

#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <cerrno>
#include <cassert>
#include <unistd.h>
#include <memory>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <ctype.h>
//全加静态成员让他变成一个方法
class Sock
{
private:
    // listen的第二个参数,意义:底层全连接队列的长度 = listen的第二个参数+1
    const static int gbacklog = 10;
public:
    Sock() {}
    static int Socket()
    {
        int listensock = socket(AF_INET, SOCK_STREAM, 0);
        if (listensock < 0)
        {
            exit(2);
        }
        int opt = 1;
        setsockopt(listensock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
        return listensock;
    }
    static void Bind(int sock, uint16_t port, std::string ip = "0.0.0.0")
    {
        struct sockaddr_in local;
        memset(&local, 0, sizeof local);
        local.sin_family = AF_INET;
        local.sin_port = htons(port);
        inet_pton(AF_INET, ip.c_str(), &local.sin_addr);
        if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0)
        {
            exit(3);
        }
    }
    static void Listen(int sock)
    {
        if (listen(sock, gbacklog) < 0)
        {
            exit(4);
        }
    }
    // 一般经验
    // const std::string &: 输入型参数
    // std::string *: 输出型参数
    // std::string &: 输入输出型参数
    static int Accept(int listensock, std::string *ip, uint16_t *port)
    {
        struct sockaddr_in src;
        socklen_t len = sizeof(src);
        int servicesock = accept(listensock, (struct sockaddr *)&src, &len);
        if (servicesock < 0)
        {
            return -1;
        }
        if(port) *port = ntohs(src.sin_port);
        if(ip) *ip = inet_ntoa(src.sin_addr);
        return servicesock;
    }
    static bool Connect(int sock, const std::string &server_ip, const uint16_t &server_port)
    {
        struct sockaddr_in server;
        memset(&server, 0, sizeof(server));
        server.sin_family = AF_INET;
        server.sin_port = htons(server_port);
        server.sin_addr.s_addr = inet_addr(server_ip.c_str());
        if(connect(sock, (struct sockaddr*)&server, sizeof(server)) == 0) return true;
        else return false;
    }
    ~Sock() {}
};

main.cc

 

#include "selectserver.hpp"
using namespace std;
int main()
{
   SelectServer select;
   cout<<"runring......."<<endl;
   select.start();
}

I/O多路转接之poll

poll初识

poll也是系统提供的一个多路转接接口。

  • poll系统调用也可以让我们的程序同时监视多个文件描述符上的事件是否就绪,和select的定位是一样的,适用场景也是一样的。

poll的工作流程和select是基本类似的,这里我们也实现一个简单poll服务器,该服务器也只是读取客户端发来的数据并进行打印。

log.hpp

#pragma once
#include <iostream>
#include <cstdio>
#include <cstdarg>
#include <ctime>
#include <string>
// 日志是有日志级别的
#define DEBUG   0
#define NORMAL  1
#define WARNING 2
#define ERROR   3
#define FATAL   4
const char *gLevelMap[] = {
    "DEBUG",
    "NORMAL",
    "WARNING",
    "ERROR",
    "FATAL"
};
#define LOGFILE "./selectServer.log"
// 完整的日志功能,至少: 日志等级 时间 支持用户自定义(日志内容, 文件行,文件名)
void logMessage(int level, const char *format, ...)
{
    // va_list ap;
    // va_start(ap, format);
    // while()
    // int x = va_arg(ap, int);
    // va_end(ap); //ap=nullptr
    char stdBuffer[1024]; //标准部分
    time_t timestamp = time(nullptr);
    // struct tm *localtime = localtime(&timestamp);
    snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%ld] ", gLevelMap[level], timestamp);
    char logBuffer[1024]; //自定义部分
    va_list args;
    va_start(args, format);
    // vprintf(format, args);
    vsnprintf(logBuffer, sizeof logBuffer, format, args);
    va_end(args);
    // FILE *fp = fopen(LOGFILE, "a");
    printf("%s%s\n", stdBuffer, logBuffer);
    // fprintf(fp, "%s%s\n", stdBuffer, logBuffer);
    // fclose(fp);
}

pollserver.hpp

#include <iostream>
#include <cstring>
#include <sys/select.h>
#include "log.hpp"
#include "sock.hpp"
#include <string>
#include <vector>
#include <sys/types.h>
#include <sys/time.h>
#include <algorithm>
#include <poll.h>
#define NUM 1024
#define FD_NONE -1
using namespace std;
class PollServer
{
public:
  static const int nfds = 100;
public:
 // struct pollfd {
 //     int   fd;         // 文件描述符
 //     short events;     // 需要关注的事件
 //     short revents;    // 实际发生的事件
 // };
//  fd:待轮询的文件描述符。
// events:关注的事件,可以是以下值的组合:
// POLLIN:可读事件(数据可读取)
// POLLOUT:可写事件(数据可写入)
// POLLERR:错误事件(发生错误)
// POLLHUP:挂起事件(连接断开)
// POLLNVAL:无效事件(文件描述符未打开)
// revents:实际发生的事件,在调用 poll 后由系统设置。
// poll 函数将在等待期间阻塞,并返回发生事件的数量,如果超时则返回 0,如果出错则返回 -1。
// 您可以使用 poll 函数来同时监视多个文件描述符,并根据发生的事件采取相应的操作。
  //端口类型设置为16位是因为TCP报文中端口号为16位
  PollServer(const uint16_t &port = 8080):_port(port),_nfds(nfds)
  {
    _listensock = Sock::Socket();
    Sock::Bind(_listensock,_port);
    Sock::Listen(_listensock);
    logMessage(DEBUG,"%s","create base socket success");
    _fds = new struct pollfd[_nfds];
    for(int i = 0;i < NUM;i++) 
    {
      _fds[i].fd = FD_NONE;
      _fds[i].events = _fds[i].revents = 0;
    } 
    _fds[0].fd= _listensock;//规定第一个为监听套接字,需要关注的套接字是什么,这是对象方面
    _fds[0].events = POLLIN;//需要套接字中关注的事件是读事件,这个才是关系的动作
    _timeout = 1000;
    std::cout<<"初始化完成...."<<std::endl;
  } 
  void start()
  {
    while(1)
    {
     int n = poll(_fds,_nfds,_timeout);
     DebugPrint();
     switch (n)
     {
     case 0:
        sleep(1);
        logMessage(DEBUG,"%s","time out...");
        break;
     case -1:
        logMessage(DEBUG,"%s","select error");
        break;
     default:
        //成功
        logMessage(DEBUG,"%s","get a new link event........");
        //成功了的话,如果不去读取的话会一直提醒读取,也就是说链接好的
        //链接会被放在就绪队列中,也就是看到链接在排队,操作系统会一直提醒有连接成功
        //当你要取的时候会取队列的头部连接去执行
        //因为我们的select是检查_listensock有没有获取的连接已经到达Tcp层
        //如果到达的话,就说明可以读走这个链接了,所以我们检查的是IO
        //也就是读到连接的操作,而不是建立连接的操作
        HandlerEvent();
        sleep(1);
        break;
     }
    }
  }
  ~PollServer()
  {
    if(_listensock >= 0) close(_listensock);
    if(!_fds) delete [] _fds;
  }
private:
  uint16_t _port;
  int _listensock;
  int _timeout;
  struct pollfd* _fds;
  int _nfds;
  void HandlerEvent()
  {
    for(int i = 0;i < _nfds;i++)
    {
      //1.去掉不合法的fd,也就是去掉没有建立连接的fd,也就是去掉数组里为FD_NONE
      if(_fds[i].fd == FD_NONE) continue;
      //2.合法的就一定就绪了?,不一定,所以需要FD_ISSET判断是否已经就绪
      if(_fds[i].revents & POLLIN)//判断读事件是否就绪,就是就是一个数字
      {
         //1.如果是监听套接字就绪了,那就是accept
         //2.如果不是的话,那就处理该链接,进行读取函数
         if(_fds[i].fd == _listensock) Accepter();
         else Recver(i);
      }
    }
  }
  void Accepter()
  {
    string clientip;
    uint16_t clientport;
    int sock = Sock::Accept(_listensock,&clientip,&clientport);
    if(sock < 0)
    {
      logMessage(WARNING,"%s %s:%d","accept error",strerror(errno),errno);
      return;
    }
    logMessage(DEBUG,"get a new link success");
    int pos = 0;
    for(;pos < NUM;pos++)
    {
     if(_fds[pos].fd == FD_NONE) break;
    }
     if(pos == NUM)
     {
        logMessage(WARNING, "%s:%d", "select server already full,close: %d", sock);
        close(sock);
     }else
     {
        _fds[pos].fd = sock;
        _fds[pos].events = POLLIN;
     }
  }
  void Recver(int pos)
  {
    logMessage(DEBUG,"message in,get IO event:%d",_fds[pos].fd);
    // 暂时先不做封装, 此时select已经帮我们进行了事件检测,fd上的数据一定是就绪的,即 本次 不会被阻塞
    // 这样读取有bug吗?有的,你怎么保证以读到了一个完整包文呢?
    char buffer[1024];
    int n = recv(_fds[pos].fd,buffer,sizeof(buffer) - 1,0);
    //不会堵塞,
    if(n > 0)
    {
      buffer[n] = 0;
      logMessage(DEBUG,"client[%d]# %s",_fds[pos].fd,buffer);
    }
    else if(n == 0)
    {
       logMessage(DEBUG, "client[%d] quit, me too...", _fds[pos].fd);
      //对端关闭那么我也关闭
      close(_fds[pos].fd);
      _fds[pos].fd = FD_NONE;
      _fds[pos].events = 0;
    }
    else
    {
      logMessage(WARNING, "%d sock recv error, %d : %s", _fds[pos].fd, errno, strerror(errno));
      close(_fds[pos].fd);
      _fds[pos].fd = FD_NONE;
      _fds[pos].fd = 0;
    }
  }
   void DebugPrint()
    {
        cout << "_fd_array[]: ";
        for(int i = 0; i < NUM; i++)
        {
            if(_fds[i].fd  == FD_NONE) continue;
            cout << _fds[i].fd << " ";
        }
        cout << endl;
    }
};
相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
1月前
|
存储 监控 Linux
【Linux IO多路复用 】 Linux下select函数全解析:驾驭I-O复用的高效之道
【Linux IO多路复用 】 Linux下select函数全解析:驾驭I-O复用的高效之道
54 0
|
2月前
|
网络协议 安全 测试技术
手撕测试tcp服务器效率工具——以epoll和io_uring对比为例
手撕测试tcp服务器效率工具——以epoll和io_uring对比为例
44 2
|
3月前
|
存储 Linux 调度
io复用之epoll核心源码剖析
epoll底层实现中有两个关键的数据结构,一个是eventpoll另一个是epitem,其中eventpoll中有两个成员变量分别是rbr和rdlist,前者指向一颗红黑树的根,后者指向双向链表的头。而epitem则是红黑树节点和双向链表节点的综合体,也就是说epitem即可作为树的节点,又可以作为链表的节点,并且epitem中包含着用户注册的事件。当用户调用epoll_create()时,会创建eventpoll对象(包含一个红黑树和一个双链表);
72 0
io复用之epoll核心源码剖析
|
3月前
|
存储 网络协议
TCP服务器 IO多路复用的实现:select、poll、epoll
TCP服务器 IO多路复用的实现:select、poll、epoll
36 0
|
1月前
|
NoSQL Java Linux
【Linux IO多路复用 】 Linux 网络编程 认知负荷与Epoll:高性能I-O多路复用的实现与优化
【Linux IO多路复用 】 Linux 网络编程 认知负荷与Epoll:高性能I-O多路复用的实现与优化
63 0
|
3月前
|
消息中间件 架构师 Java
性能媲美epoll的io_uring
性能媲美epoll的io_uring
42 0
|
3月前
|
网络协议 架构师 Linux
一文说透IO多路复用select/poll/epoll
一文说透IO多路复用select/poll/epoll
159 0
|
3月前
|
网络协议 Linux
2.1.1网络io与io多路复用select/poll/epoll
2.1.1网络io与io多路复用select/poll/epoll
|
1月前
|
存储 Java 数据处理
|
1月前
|
Java API
java中IO与NIO有什么不同
java中IO与NIO有什么不同