IO多路转接 ——— select、poll、epoll(下)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: IO多路转接 ——— select、poll、epoll

sock.hpp

#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <cerrno>
#include <cassert>
#include <unistd.h>
#include <memory>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <ctype.h>
class Sock
{
private:
    // listen的第二个参数,意义:底层全连接队列的长度 = listen的第二个参数+1
    const static int gbacklog = 10;
public:
    Sock() {}
    static int Socket()
    {
        int listensock = socket(AF_INET, SOCK_STREAM, 0);
        if (listensock < 0)
        {
            exit(2);
        }
        int opt = 1;
        setsockopt(listensock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
        return listensock;
    }
    static void Bind(int sock, uint16_t port, std::string ip = "0.0.0.0")
    {
        struct sockaddr_in local;
        memset(&local, 0, sizeof local);
        local.sin_family = AF_INET;
        local.sin_port = htons(port);
        inet_pton(AF_INET, ip.c_str(), &local.sin_addr);
        if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0)
        {
            exit(3);
        }
    }
    static void Listen(int sock)
    {
        if (listen(sock, gbacklog) < 0)
        {exit(4);}
    }
    // 一般经验
    // const std::string &: 输入型参数
    // std::string *: 输出型参数
    // std::string &: 输入输出型参数
    static int Accept(int listensock, std::string *ip, uint16_t *port)
    {
        struct sockaddr_in src;
        socklen_t len = sizeof(src);
        int servicesock = accept(listensock, (struct sockaddr *)&src, &len);
        if (servicesock < 0)
        {
            return -1;
        }
        if(port) *port = ntohs(src.sin_port);
        if(ip) *ip = inet_ntoa(src.sin_addr);
        return servicesock;
    }
    static bool Connect(int sock, const std::string &server_ip, const uint16_t &server_port)
    {
        struct sockaddr_in server;
        memset(&server, 0, sizeof(server));
        server.sin_family = AF_INET;
        server.sin_port = htons(server_port);
        server.sin_addr.s_addr = inet_addr(server_ip.c_str());
        if(connect(sock, (struct sockaddr*)&server, sizeof(server)) == 0) return true;
        else return false;
    }
    ~Sock() {}
};

main.cc

#include "pollserver.hpp"
using namespace std;
int main()
{
   PollServer pollserver;
   cout<<"runring......."<<endl;
   pollserver.start();
}

/O多路转接之epoll
epoll初识
epoll也是系统提供的一个多路转接接口。

epoll系统调用也可以让我们的程序同时监视多个文件描述符上的事件是否就绪,与select和poll的定位是一样的,适用场景也相同。

epoll在命名上比poll多了一个e,这个e可以理解成是extend,epoll就是为了同时处理大量文件描述符而改进的poll。

epoll在2.5.44内核中被引进,它几乎具备了select和poll的所有优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。

epoll的相关系统调用

epoll有三个相关的系统调用,分别是epoll_create、epoll_ctl和epoll_wait。

epoll工作原理

 

epoll.hpp

#pragma once
#include <iostream>
#include <sys/epoll.h>
#include <unistd.h>
using namespace std;
class Epoll
{
public:
   static const int gsize = 256;
public:
   static int CreateEpoll()
   {
       int epfd = epoll_create(gsize);
       if(epfd > 0) return epfd;
       exit(5);
   }
   static bool CtrlEpoll(int epfd,int oper,int sock,uint32_t events)
   {
      struct epoll_event ev;
      ev.events = events;
      ev.data.fd = sock;
      int n = epoll_ctl(epfd,oper,sock,&ev);
      return n == 0;
   }
   static int WaitEpoll(int epfd,struct epoll_event* revs,int num,int timeout)
   {
            // 细节1:如果底层就绪的sock非常多,revs承装不下,怎么办??不影响!一次拿不完,就下一次再拿
        // 细节2:关于epoll_wait的返回值问题:有几个fd上的事件就绪,就返回几,epoll返回的时候,会将所有
        //       就绪的event按照顺序放入到revs数组中!一共有返回值个!
      return epoll_wait(epfd,revs,num,timeout);
   }
};

log.hpp

#pragma once
#include <iostream>
#include <cstdio>
#include <cstdarg>
#include <ctime>
#include <string>
// 日志是有日志级别的
#define DEBUG   0
#define NORMAL  1
#define WARNING 2
#define ERROR   3
#define FATAL   4
const char *gLevelMap[] = {
    "DEBUG",
    "NORMAL",
    "WARNING",
    "ERROR",
    "FATAL"
};
#define LOGFILE "./selectServer.log"
// 完整的日志功能,至少: 日志等级 时间 支持用户自定义(日志内容, 文件行,文件名)
void logMessage(int level, const char *format, ...)
{
    // va_list ap;
    // va_start(ap, format);
    // while()
    // int x = va_arg(ap, int);
    // va_end(ap); //ap=nullptr
    char stdBuffer[1024]; //标准部分
    time_t timestamp = time(nullptr);
    // struct tm *localtime = localtime(&timestamp);
    snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%ld] ", gLevelMap[level], timestamp);
    char logBuffer[1024]; //自定义部分
    va_list args;
    va_start(args, format);
    // vprintf(format, args);
    vsnprintf(logBuffer, sizeof logBuffer, format, args);
    va_end(args);
    // FILE *fp = fopen(LOGFILE, "a");
    printf("%s%s\n", stdBuffer, logBuffer);
    // fprintf(fp, "%s%s\n", stdBuffer, logBuffer);
    // fclose(fp);
}

sock.hpp

#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <cerrno>
#include <cassert>
#include <unistd.h>
#include <memory>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <ctype.h>
//全加静态成员让他变成一个方法
class Sock
{
private:
    // listen的第二个参数,意义:底层全连接队列的长度 = listen的第二个参数+1
    const static int gbacklog = 10;
public:
    Sock() {}
    static int Socket()
    {
        int listensock = socket(AF_INET, SOCK_STREAM, 0);
        if (listensock < 0)
        {
            exit(2);
        }
        int opt = 1;
        setsockopt(listensock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
        return listensock;
    }
    static void Bind(int sock, uint16_t port, std::string ip = "0.0.0.0")
    {
        struct sockaddr_in local;
        memset(&local, 0, sizeof local);
        local.sin_family = AF_INET;
        local.sin_port = htons(port);
        inet_pton(AF_INET, ip.c_str(), &local.sin_addr);
        if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0)
        {
            exit(3);
        }
    }
    static void Listen(int sock)
    {
        if (listen(sock, gbacklog) < 0)
        {
            exit(4);
        }
    }
    // 一般经验
    // const std::string &: 输入型参数
    // std::string *: 输出型参数
    // std::string &: 输入输出型参数
    static int Accept(int listensock, std::string *ip, uint16_t *port)
    {
        struct sockaddr_in src;
        socklen_t len = sizeof(src);
        int servicesock = accept(listensock, (struct sockaddr *)&src, &len);
        if (servicesock < 0)
        {
            return -1;
        }
        if(port) *port = ntohs(src.sin_port);
        if(ip) *ip = inet_ntoa(src.sin_addr);
        return servicesock;
    }
    static bool Connect(int sock, const std::string &server_ip, const uint16_t &server_port)
    {
        struct sockaddr_in server;
        memset(&server, 0, sizeof(server));
        server.sin_family = AF_INET;
        server.sin_port = htons(server_port);
        server.sin_addr.s_addr = inet_addr(server_ip.c_str());
        if(connect(sock, (struct sockaddr*)&server, sizeof(server)) == 0) return true;
        else return false;
    }
    ~Sock() {}
};

epollserver.hpp

#include "sock.hpp"
#include "log.hpp"
#include <memory>
#include <sys/types.h>
#include <cstring>
#include <functional>
#include <string>
#include "epoll.hpp"
using namespace std;
const static int default_port = 8080;
const static int gnum = 64;
using func_t = function<void(string)>;
class EpollServer
{
public:
     EpollServer(func_t HandlerRequese,const int &port = default_port)
     :_port(port),_HandlerRequest(HandlerRequese),_revs_num(gnum)
     {
        // 0. 申请对应的空间
       _revs = new struct epoll_event[_revs_num];
         // 1. 创建listensock
        _listensock = Sock::Socket();
        Sock::Bind(_listensock,_port);
        Sock::Listen(_listensock);
       //2.创建epoll模型
       _epfd = Epoll::CreateEpoll();
       logMessage(DEBUG,"init success,listensock: %d,epfd: %d",_listensock,_epfd);
       //3.将listensock加入到epoll模型中,以关系读的事务加入
       if(!Epoll::CtrlEpoll(_epfd,EPOLL_CTL_ADD,_listensock,EPOLLIN)) exit(6);//
       //EPOLLIN是读事务
      logMessage(DEBUG, "add listensock to epoll success."); // 3, 4
     }
     void Accepter(int listensock)
     {
       string clientip;
       uint16_t clientport;
       int sock = Sock::Accept(listensock,&clientip,&clientport);
       if(sock < 0) 
       {
          logMessage(WARNING, "accept error!");
                return;
       }
         // 能不能直接读取?不能,因为你并不清楚,底层是否有数据!
            // 将新的sock,添加给epoll
        if(!Epoll::CtrlEpoll(_epfd,EPOLL_CTL_ADD,sock,EPOLLIN)) exit(6);
       logMessage(DEBUG, "add new sock : %d to epoll success", sock);   
     }
     void Recver(int sock)
     {
       char buffer[10240];
       ssize_t n = recv(sock,buffer,sizeof(buffer) - 1,0);
       if(n > 0)
       {
        //假设这里就是读到了一个完整的报文 // 如何保证??
        buffer[n] = 0;
        _HandlerRequest(buffer); // 2. 处理数据
       }
       else if(n == 0)
       {
          // 1. 先在epoll中去掉对sock的关
         bool res = Epoll::CtrlEpoll(_epfd,EPOLL_CTL_DEL,sock,0);
         assert(res);
         (void)res;
         //2.在close文件
         close(sock);
         logMessage(NORMAL,"client %d quit,me too...",sock);
       }
       else
       {
        //1.先在epoll中去掉对sock的关心
        bool res = Epoll::CtrlEpoll(_epfd,EPOLL_CTL_DEL,sock,0);
        assert(res);
         (void)res;
         close(sock);
         logMessage(NORMAL, "client recv %d error, close error sock", sock);
       }
     }
     void HandlerEvents(int n)
     {
      assert(n > 0);
      for(int i = 0;i < n;i++)
      {
        uint32_t revents = _revs[i].events;
        int sock = _revs[i].data.fd;//看看是哪个fd就绪了
        if(revents & EPOLLIN)
        {
          if(sock == _listensock) Accepter(_listensock);
          else Recver(sock);
        }
        if(revents & EPOLLOUT)
        {
            //TODO
        }
      }
     }
     void looponce(int timeout)
     {
       int n = Epoll::WaitEpoll(_epfd,_revs,_revs_num,timeout);
        //if(n == _revs_num) //扩容
       switch (n)//返回值n,代表有一个关心的事务就绪
       {
         case 0:
              logMessage(DEBUG, "timeout..."); // 3, 4
              break;
         case -1:
              logMessage(WARNING, "epoll wait error: %s", strerror(errno));
              break;
         default:
         //等待成功,有至少一个关系的事务已经就绪
         //去执行
         logMessage(DEBUG, "get a event");
         HandlerEvents(n);
         break;
       }
     }
     void start()
     {
        int timeout = 1000;
        while(1)
        {
          looponce(timeout);
        }
     }
     ~EpollServer()
     {
        if (_listensock >= 0)
                close(_listensock);
            if (_epfd >= 0)
                close(_epfd);
            if (_revs)
                delete[] _revs;
     }
private:
     int _listensock;
     uint16_t _port;
     int _epfd;
     struct epoll_event* _revs;
     int _revs_num;
     func_t _HandlerRequest;
};

main.cc

#include "epollserver.hpp"
#include <memory>
#include <iostream>
#include <string>
using namespace std;
void change(string str)
{
    cout<<str<<endl;
}
int main()
{
   EpollServer epollserver(change);
   cout<<"runring......."<<endl;
   epollserver.start();
}
相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
1月前
|
网络协议 安全 Linux
Linux C/C++之IO多路复用(select)
这篇文章主要介绍了TCP的三次握手和四次挥手过程,TCP与UDP的区别,以及如何使用select函数实现IO多路复用,包括服务器监听多个客户端连接和简单聊天室场景的应用示例。
92 0
|
1月前
|
Linux C++
Linux C/C++之IO多路复用(poll,epoll)
这篇文章详细介绍了Linux下C/C++编程中IO多路复用的两种机制:poll和epoll,包括它们的比较、编程模型、函数原型以及如何使用这些机制实现服务器端和客户端之间的多个连接。
25 0
Linux C/C++之IO多路复用(poll,epoll)
|
2月前
|
网络协议 Java Linux
高并发编程必备知识IO多路复用技术select,poll讲解
高并发编程必备知识IO多路复用技术select,poll讲解
|
4月前
|
安全 Java Linux
(七)Java网络编程-IO模型篇之从BIO、NIO、AIO到内核select、epoll剖析!
IO(Input/Output)方面的基本知识,相信大家都不陌生,毕竟这也是在学习编程基础时就已经接触过的内容,但最初的IO教学大多数是停留在最基本的BIO,而并未对于NIO、AIO、多路复用等的高级内容进行详细讲述,但这些却是大部分高性能技术的底层核心,因此本文则准备围绕着IO知识进行展开。
165 1
|
4月前
|
存储 Java Unix
(八)Java网络编程之IO模型篇-内核Select、Poll、Epoll多路复用函数源码深度历险!
select/poll、epoll这些词汇相信诸位都不陌生,因为在Redis/Nginx/Netty等一些高性能技术栈的底层原理中,大家应该都见过它们的身影,接下来重点讲解这块内容。
|
3月前
|
存储 Java
【IO面试题 四】、介绍一下Java的序列化与反序列化
Java的序列化与反序列化允许对象通过实现Serializable接口转换成字节序列并存储或传输,之后可以通过ObjectInputStream和ObjectOutputStream的方法将这些字节序列恢复成对象。
|
4月前
|
Java 大数据
解析Java中的NIO与传统IO的区别与应用
解析Java中的NIO与传统IO的区别与应用
|
2月前
|
Java 大数据 API
Java 流(Stream)、文件(File)和IO的区别
Java中的流(Stream)、文件(File)和输入/输出(I/O)是处理数据的关键概念。`File`类用于基本文件操作,如创建、删除和检查文件;流则提供了数据读写的抽象机制,适用于文件、内存和网络等多种数据源;I/O涵盖更广泛的输入输出操作,包括文件I/O、网络通信等,并支持异常处理和缓冲等功能。实际开发中,这三者常结合使用,以实现高效的数据处理。例如,`File`用于管理文件路径,`Stream`用于读写数据,I/O则处理复杂的输入输出需求。
|
3月前
|
Java 数据处理
Java IO 接口(Input)究竟隐藏着怎样的神秘用法?快来一探究竟,解锁高效编程新境界!
【8月更文挑战第22天】Java的输入输出(IO)操作至关重要,它支持从多种来源读取数据,如文件、网络等。常用输入流包括`FileInputStream`,适用于按字节读取文件;结合`BufferedInputStream`可提升读取效率。此外,通过`Socket`和相关输入流,还能实现网络数据读取。合理选用这些流能有效支持程序的数据处理需求。
48 2
|
3月前
|
XML 存储 JSON
【IO面试题 六】、 除了Java自带的序列化之外,你还了解哪些序列化工具?
除了Java自带的序列化,常见的序列化工具还包括JSON(如jackson、gson、fastjson)、Protobuf、Thrift和Avro,各具特点,适用于不同的应用场景和性能需求。