IO多路转接 ——— select、poll、epoll(上)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: IO多路转接 ——— select、poll、epoll

select初识

select是系统提供的一个多路转接接口。

select系统调用可以让我们的程序同时监视多个文件描述符的上的事件是否就绪。

select的核心工作就是等,当监视的多个文件描述符中有一个或多个事件就绪时,select才会成功返回并将对应文件描述符的就绪事件告知调用者。

 

select基本工作流程
如果我们要实现一个简单的select服务器,该服务器要做的就是读取客户端发来的数据并进行打印,那么这个select服务器的工作流程应该是这样的:

先初始化服务器,完成套接字的创建、绑定和监听。

定义一个fd_array数组用于保存监听套接字和已经与客户端建立连接的套接字,刚开始时就将监听套接字添加到fd_array数组当中。

然后服务器开始循环调用select函数,检测读事件是否就绪,如果就绪则执行对应的操作。

每次调用select函数之前,都需要定义一个读文件描述符集readfds,并将fd_array当中的文件描述符依次设置进readfds当中,表示让select帮我们监视这些文件描述符的读事件是否就绪。

当select检测到数据就绪时会将读事件就绪的文件描述符设置进readfds当中,此时我们就能够得知哪些文件描述符的读事件就绪了,并对这些文件描述符进行对应的操作。

如果读事件就绪的是监听套接字,则调用accept函数从底层全连接队列获取已经建立好的连接,并将该连接对应的套接字添加到fd_array数组当中。

如果读事件就绪的是与客户端建立连接的套接字,则调用read函数读取客户端发来的数据并进行打印输出。

当然,服务器与客户端建立连接的套接字读事件就绪,也可能是因为客户端将连接关闭了,此时服务器应该调用close关闭该套接字,并将该套接字从fd_array数组当中清除,因为下一次不需要再监视该文件描述符的读事件了。

 

log.hpp

#pragma once
#include <iostream>
#include <cstdio>
#include <cstdarg>
#include <ctime>
#include <string>
// 日志是有日志级别的
#define DEBUG   0
#define NORMAL  1
#define WARNING 2
#define ERROR   3
#define FATAL   4
const char *gLevelMap[] = {
    "DEBUG",
    "NORMAL",
    "WARNING",
    "ERROR",
    "FATAL"
};
#define LOGFILE "./selectServer.log"
// 完整的日志功能,至少: 日志等级 时间 支持用户自定义(日志内容, 文件行,文件名)
void logMessage(int level, const char *format, ...)
{
    // va_list ap;
    // va_start(ap, format);
    // while()
    // int x = va_arg(ap, int);
    // va_end(ap); //ap=nullptr
    char stdBuffer[1024]; //标准部分
    time_t timestamp = time(nullptr);
    // struct tm *localtime = localtime(&timestamp);
    snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%ld] ", gLevelMap[level], timestamp);
    char logBuffer[1024]; //自定义部分
    va_list args;
    va_start(args, format);
    // vprintf(format, args);
    vsnprintf(logBuffer, sizeof logBuffer, format, args);
    va_end(args);
    // FILE *fp = fopen(LOGFILE, "a");
    printf("%s%s\n", stdBuffer, logBuffer);
    // fprintf(fp, "%s%s\n", stdBuffer, logBuffer);
    // fclose(fp);
}

selectserver.hpp

#include <iostream>
#include <cstring>
#include <sys/select.h>
#include "log.hpp"
#include "sock.hpp"
#include <string>
#include <vector>
#include <sys/types.h>
#include <sys/time.h>
#include <algorithm>
#define NUM 1024
#define FD_NONE -1
using namespace std;
class SelectServer
{
public:
  //端口类型设置为16位是因为TCP报文中端口号为16位
  SelectServer(const uint16_t &port = 8080):_port(port)
  {
    _listensock = Sock::Socket();
    Sock::Bind(_listensock,_port);
    Sock::Listen(_listensock);
    logMessage(DEBUG,"%s","create base socket success");
    for(int i = 0;i < NUM;i++) _fd_array[i] = FD_NONE;
    _fd_array[0] = _listensock;//规定第一个为监听套接字
    std::cout<<"初始化完成...."<<std::endl;
  } 
  void start()
  {
    while(1)
    {
      struct timeval timeout = {0,0};
      fd_set rfds;
      FD_ZERO(&rfds);
      int maxfd = _listensock;
      for(int i = 0;i < NUM;i++)
      {
         if(_fd_array[i] == FD_NONE) continue;
         FD_SET(_fd_array[i],&rfds);
         maxfd = max(maxfd,_fd_array[i]);
      }
     int n = select(maxfd + 1,&rfds,nullptr, nullptr,&timeout);
     DebugPrint();
     switch (n)
     {
     case 0:
        sleep(1);
        logMessage(DEBUG,"%s","time out...");
        break;
     case -1:
        logMessage(DEBUG,"%s","select error");
        break;
     default:
        //成功
        logMessage(DEBUG,"%s","get a new link event........");
        //成功了的话,如果不去读取的话会一直提醒读取,也就是说链接好的
        //链接会被放在就绪队列中,也就是看到链接在排队,操作系统会一直提醒有连接成功
        //当你要取的时候会取队列的头部连接去执行
        //因为我们的select是检查_listensock有没有获取的连接已经到达Tcp层
        //如果到达的话,就说明可以读走这个链接了,所以我们检查的是IO
        //也就是读到连接的操作,而不是建立连接的操作
        HandlerEvent(rfds);
        sleep(1);
        break;
     }
    }
  }
private:
  uint16_t _port;
  int _listensock;
  int _fd_array[NUM];
  void HandlerEvent(const fd_set &rfds)
  {
    for(int i = 0;i < NUM;i++)
    {
      //1.去掉不合法的fd,也就是去掉没有建立连接的fd,也就是去掉数组里为FD_NONE
      if(_fd_array[i] == FD_NONE) continue;
      //2.合法的就一定就绪了?,不一定,所以需要FD_ISSET判断是否已经就绪
      if(FD_ISSET(_fd_array[i],&rfds))
      {
         //1.如果是监听套接字就绪了,那就是accept
         //2.如果不是的话,那就处理该链接,进行读取函数
         if(_fd_array[i] == _listensock) Accepter();
         else Recver(i);
      }
    }
  }
  void Accepter()
  {
    string clientip;
    uint16_t clientport;
    int sock = Sock::Accept(_listensock,&clientip,&clientport);
    if(sock < 0)
    {
      logMessage(WARNING,"%s %s:%d","accept error",strerror(errno),errno);
      return;
    }
    logMessage(DEBUG,"get a new link success");
    int pos = 0;
    for(;pos < NUM;pos++)
    {
     if(_fd_array[pos] == FD_NONE) break;
    }
     if(pos == NUM)
     {
        logMessage(WARNING, "%s:%d", "select server already full,close: %d", sock);
        close(sock);
     }else
     {
        _fd_array[pos] = sock;
     }
  }
  void Recver(int pos)
  {
    logMessage(DEBUG,"message in,get IO event:%d",_fd_array[pos]);
    // 暂时先不做封装, 此时select已经帮我们进行了事件检测,fd上的数据一定是就绪的,即 本次 不会被阻塞
    // 这样读取有bug吗?有的,你怎么保证以读到了一个完整包文呢?
    char buffer[1024];
    int n = recv(_fd_array[pos],buffer,sizeof(buffer) - 1,0);
    //不会堵塞,
    if(n > 0)
    {
      buffer[n] = 0;
      logMessage(DEBUG,"client[%d]# %s",_fd_array[pos],buffer);
    }
    else if(n == 0)
    {
       logMessage(DEBUG, "client[%d] quit, me too...", _fd_array[pos]);
      //对端关闭那么我也关闭
      close(_fd_array[pos]);
      _fd_array[pos] = FD_NONE;
    }
    else
    {
      logMessage(WARNING, "%d sock recv error, %d : %s", _fd_array[pos], errno, strerror(errno));
      close(_fd_array[pos]);
      _fd_array[pos] = FD_NONE;
    }
  }
   void DebugPrint()
    {
        cout << "_fd_array[]: ";
        for(int i = 0; i < NUM; i++)
        {
            if(_fd_array[i] == FD_NONE) continue;
            cout << _fd_array[i] << " ";
        }
        cout << endl;
    }
};

sock.hpp

#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <cerrno>
#include <cassert>
#include <unistd.h>
#include <memory>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <ctype.h>
//全加静态成员让他变成一个方法
class Sock
{
private:
    // listen的第二个参数,意义:底层全连接队列的长度 = listen的第二个参数+1
    const static int gbacklog = 10;
public:
    Sock() {}
    static int Socket()
    {
        int listensock = socket(AF_INET, SOCK_STREAM, 0);
        if (listensock < 0)
        {
            exit(2);
        }
        int opt = 1;
        setsockopt(listensock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
        return listensock;
    }
    static void Bind(int sock, uint16_t port, std::string ip = "0.0.0.0")
    {
        struct sockaddr_in local;
        memset(&local, 0, sizeof local);
        local.sin_family = AF_INET;
        local.sin_port = htons(port);
        inet_pton(AF_INET, ip.c_str(), &local.sin_addr);
        if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0)
        {
            exit(3);
        }
    }
    static void Listen(int sock)
    {
        if (listen(sock, gbacklog) < 0)
        {
            exit(4);
        }
    }
    // 一般经验
    // const std::string &: 输入型参数
    // std::string *: 输出型参数
    // std::string &: 输入输出型参数
    static int Accept(int listensock, std::string *ip, uint16_t *port)
    {
        struct sockaddr_in src;
        socklen_t len = sizeof(src);
        int servicesock = accept(listensock, (struct sockaddr *)&src, &len);
        if (servicesock < 0)
        {
            return -1;
        }
        if(port) *port = ntohs(src.sin_port);
        if(ip) *ip = inet_ntoa(src.sin_addr);
        return servicesock;
    }
    static bool Connect(int sock, const std::string &server_ip, const uint16_t &server_port)
    {
        struct sockaddr_in server;
        memset(&server, 0, sizeof(server));
        server.sin_family = AF_INET;
        server.sin_port = htons(server_port);
        server.sin_addr.s_addr = inet_addr(server_ip.c_str());
        if(connect(sock, (struct sockaddr*)&server, sizeof(server)) == 0) return true;
        else return false;
    }
    ~Sock() {}
};

main.cc

 

#include "selectserver.hpp"
using namespace std;
int main()
{
   SelectServer select;
   cout<<"runring......."<<endl;
   select.start();
}

I/O多路转接之poll

poll初识

poll也是系统提供的一个多路转接接口。

  • poll系统调用也可以让我们的程序同时监视多个文件描述符上的事件是否就绪,和select的定位是一样的,适用场景也是一样的。

poll的工作流程和select是基本类似的,这里我们也实现一个简单poll服务器,该服务器也只是读取客户端发来的数据并进行打印。

log.hpp

#pragma once
#include <iostream>
#include <cstdio>
#include <cstdarg>
#include <ctime>
#include <string>
// 日志是有日志级别的
#define DEBUG   0
#define NORMAL  1
#define WARNING 2
#define ERROR   3
#define FATAL   4
const char *gLevelMap[] = {
    "DEBUG",
    "NORMAL",
    "WARNING",
    "ERROR",
    "FATAL"
};
#define LOGFILE "./selectServer.log"
// 完整的日志功能,至少: 日志等级 时间 支持用户自定义(日志内容, 文件行,文件名)
void logMessage(int level, const char *format, ...)
{
    // va_list ap;
    // va_start(ap, format);
    // while()
    // int x = va_arg(ap, int);
    // va_end(ap); //ap=nullptr
    char stdBuffer[1024]; //标准部分
    time_t timestamp = time(nullptr);
    // struct tm *localtime = localtime(&timestamp);
    snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%ld] ", gLevelMap[level], timestamp);
    char logBuffer[1024]; //自定义部分
    va_list args;
    va_start(args, format);
    // vprintf(format, args);
    vsnprintf(logBuffer, sizeof logBuffer, format, args);
    va_end(args);
    // FILE *fp = fopen(LOGFILE, "a");
    printf("%s%s\n", stdBuffer, logBuffer);
    // fprintf(fp, "%s%s\n", stdBuffer, logBuffer);
    // fclose(fp);
}

pollserver.hpp

#include <iostream>
#include <cstring>
#include <sys/select.h>
#include "log.hpp"
#include "sock.hpp"
#include <string>
#include <vector>
#include <sys/types.h>
#include <sys/time.h>
#include <algorithm>
#include <poll.h>
#define NUM 1024
#define FD_NONE -1
using namespace std;
class PollServer
{
public:
  static const int nfds = 100;
public:
 // struct pollfd {
 //     int   fd;         // 文件描述符
 //     short events;     // 需要关注的事件
 //     short revents;    // 实际发生的事件
 // };
//  fd:待轮询的文件描述符。
// events:关注的事件,可以是以下值的组合:
// POLLIN:可读事件(数据可读取)
// POLLOUT:可写事件(数据可写入)
// POLLERR:错误事件(发生错误)
// POLLHUP:挂起事件(连接断开)
// POLLNVAL:无效事件(文件描述符未打开)
// revents:实际发生的事件,在调用 poll 后由系统设置。
// poll 函数将在等待期间阻塞,并返回发生事件的数量,如果超时则返回 0,如果出错则返回 -1。
// 您可以使用 poll 函数来同时监视多个文件描述符,并根据发生的事件采取相应的操作。
  //端口类型设置为16位是因为TCP报文中端口号为16位
  PollServer(const uint16_t &port = 8080):_port(port),_nfds(nfds)
  {
    _listensock = Sock::Socket();
    Sock::Bind(_listensock,_port);
    Sock::Listen(_listensock);
    logMessage(DEBUG,"%s","create base socket success");
    _fds = new struct pollfd[_nfds];
    for(int i = 0;i < NUM;i++) 
    {
      _fds[i].fd = FD_NONE;
      _fds[i].events = _fds[i].revents = 0;
    } 
    _fds[0].fd= _listensock;//规定第一个为监听套接字,需要关注的套接字是什么,这是对象方面
    _fds[0].events = POLLIN;//需要套接字中关注的事件是读事件,这个才是关系的动作
    _timeout = 1000;
    std::cout<<"初始化完成...."<<std::endl;
  } 
  void start()
  {
    while(1)
    {
     int n = poll(_fds,_nfds,_timeout);
     DebugPrint();
     switch (n)
     {
     case 0:
        sleep(1);
        logMessage(DEBUG,"%s","time out...");
        break;
     case -1:
        logMessage(DEBUG,"%s","select error");
        break;
     default:
        //成功
        logMessage(DEBUG,"%s","get a new link event........");
        //成功了的话,如果不去读取的话会一直提醒读取,也就是说链接好的
        //链接会被放在就绪队列中,也就是看到链接在排队,操作系统会一直提醒有连接成功
        //当你要取的时候会取队列的头部连接去执行
        //因为我们的select是检查_listensock有没有获取的连接已经到达Tcp层
        //如果到达的话,就说明可以读走这个链接了,所以我们检查的是IO
        //也就是读到连接的操作,而不是建立连接的操作
        HandlerEvent();
        sleep(1);
        break;
     }
    }
  }
  ~PollServer()
  {
    if(_listensock >= 0) close(_listensock);
    if(!_fds) delete [] _fds;
  }
private:
  uint16_t _port;
  int _listensock;
  int _timeout;
  struct pollfd* _fds;
  int _nfds;
  void HandlerEvent()
  {
    for(int i = 0;i < _nfds;i++)
    {
      //1.去掉不合法的fd,也就是去掉没有建立连接的fd,也就是去掉数组里为FD_NONE
      if(_fds[i].fd == FD_NONE) continue;
      //2.合法的就一定就绪了?,不一定,所以需要FD_ISSET判断是否已经就绪
      if(_fds[i].revents & POLLIN)//判断读事件是否就绪,就是就是一个数字
      {
         //1.如果是监听套接字就绪了,那就是accept
         //2.如果不是的话,那就处理该链接,进行读取函数
         if(_fds[i].fd == _listensock) Accepter();
         else Recver(i);
      }
    }
  }
  void Accepter()
  {
    string clientip;
    uint16_t clientport;
    int sock = Sock::Accept(_listensock,&clientip,&clientport);
    if(sock < 0)
    {
      logMessage(WARNING,"%s %s:%d","accept error",strerror(errno),errno);
      return;
    }
    logMessage(DEBUG,"get a new link success");
    int pos = 0;
    for(;pos < NUM;pos++)
    {
     if(_fds[pos].fd == FD_NONE) break;
    }
     if(pos == NUM)
     {
        logMessage(WARNING, "%s:%d", "select server already full,close: %d", sock);
        close(sock);
     }else
     {
        _fds[pos].fd = sock;
        _fds[pos].events = POLLIN;
     }
  }
  void Recver(int pos)
  {
    logMessage(DEBUG,"message in,get IO event:%d",_fds[pos].fd);
    // 暂时先不做封装, 此时select已经帮我们进行了事件检测,fd上的数据一定是就绪的,即 本次 不会被阻塞
    // 这样读取有bug吗?有的,你怎么保证以读到了一个完整包文呢?
    char buffer[1024];
    int n = recv(_fds[pos].fd,buffer,sizeof(buffer) - 1,0);
    //不会堵塞,
    if(n > 0)
    {
      buffer[n] = 0;
      logMessage(DEBUG,"client[%d]# %s",_fds[pos].fd,buffer);
    }
    else if(n == 0)
    {
       logMessage(DEBUG, "client[%d] quit, me too...", _fds[pos].fd);
      //对端关闭那么我也关闭
      close(_fds[pos].fd);
      _fds[pos].fd = FD_NONE;
      _fds[pos].events = 0;
    }
    else
    {
      logMessage(WARNING, "%d sock recv error, %d : %s", _fds[pos].fd, errno, strerror(errno));
      close(_fds[pos].fd);
      _fds[pos].fd = FD_NONE;
      _fds[pos].fd = 0;
    }
  }
   void DebugPrint()
    {
        cout << "_fd_array[]: ";
        for(int i = 0; i < NUM; i++)
        {
            if(_fds[i].fd  == FD_NONE) continue;
            cout << _fds[i].fd << " ";
        }
        cout << endl;
    }
};
相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
1月前
|
Linux C++
Linux C/C++之IO多路复用(poll,epoll)
这篇文章详细介绍了Linux下C/C++编程中IO多路复用的两种机制:poll和epoll,包括它们的比较、编程模型、函数原型以及如何使用这些机制实现服务器端和客户端之间的多个连接。
24 0
Linux C/C++之IO多路复用(poll,epoll)
|
2月前
|
网络协议 Java Linux
高并发编程必备知识IO多路复用技术select,poll讲解
高并发编程必备知识IO多路复用技术select,poll讲解
|
4月前
|
安全 Java Linux
(七)Java网络编程-IO模型篇之从BIO、NIO、AIO到内核select、epoll剖析!
IO(Input/Output)方面的基本知识,相信大家都不陌生,毕竟这也是在学习编程基础时就已经接触过的内容,但最初的IO教学大多数是停留在最基本的BIO,而并未对于NIO、AIO、多路复用等的高级内容进行详细讲述,但这些却是大部分高性能技术的底层核心,因此本文则准备围绕着IO知识进行展开。
165 1
|
4月前
|
存储 Java Unix
(八)Java网络编程之IO模型篇-内核Select、Poll、Epoll多路复用函数源码深度历险!
select/poll、epoll这些词汇相信诸位都不陌生,因为在Redis/Nginx/Netty等一些高性能技术栈的底层原理中,大家应该都见过它们的身影,接下来重点讲解这块内容。
|
5月前
|
Linux C++
c++高级篇(三) ——Linux下IO多路复用之poll模型
c++高级篇(三) ——Linux下IO多路复用之poll模型
|
6月前
|
NoSQL Java Linux
【Linux IO多路复用 】 Linux 网络编程 认知负荷与Epoll:高性能I-O多路复用的实现与优化
【Linux IO多路复用 】 Linux 网络编程 认知负荷与Epoll:高性能I-O多路复用的实现与优化
186 0
|
3月前
|
存储 Java
【IO面试题 四】、介绍一下Java的序列化与反序列化
Java的序列化与反序列化允许对象通过实现Serializable接口转换成字节序列并存储或传输,之后可以通过ObjectInputStream和ObjectOutputStream的方法将这些字节序列恢复成对象。
|
4月前
|
Java 大数据
解析Java中的NIO与传统IO的区别与应用
解析Java中的NIO与传统IO的区别与应用
|
2月前
|
Java 大数据 API
Java 流(Stream)、文件(File)和IO的区别
Java中的流(Stream)、文件(File)和输入/输出(I/O)是处理数据的关键概念。`File`类用于基本文件操作,如创建、删除和检查文件;流则提供了数据读写的抽象机制,适用于文件、内存和网络等多种数据源;I/O涵盖更广泛的输入输出操作,包括文件I/O、网络通信等,并支持异常处理和缓冲等功能。实际开发中,这三者常结合使用,以实现高效的数据处理。例如,`File`用于管理文件路径,`Stream`用于读写数据,I/O则处理复杂的输入输出需求。
|
3月前
|
Java 数据处理
Java IO 接口(Input)究竟隐藏着怎样的神秘用法?快来一探究竟,解锁高效编程新境界!
【8月更文挑战第22天】Java的输入输出(IO)操作至关重要,它支持从多种来源读取数据,如文件、网络等。常用输入流包括`FileInputStream`,适用于按字节读取文件;结合`BufferedInputStream`可提升读取效率。此外,通过`Socket`和相关输入流,还能实现网络数据读取。合理选用这些流能有效支持程序的数据处理需求。
46 2