IO多路转接(一)

简介: IO多路转接

一、select

1.1 select初识

select是系统提供的一个多路转接接口


select系统调用可以让程序同时监视多个文件描述符的上的事件是否就绪

select核心工作就是等,当监视的文件描述符中有一个或多个事件就绪时,select才会成功返回并将对应文件描述符的就绪事件告知调用者

1.2 select函数

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

参数说明:


nfds:需要监视的文件描述符中,最大的文件描述符值 + 1

readfds:输入输出型参数,调用时用户告知内核需要监视哪些文件描述符的读事件是否就绪,返回时内核告知用户哪些文件描述符的读事件已就绪

writefds:输入输出型参数,调用时用户告知内核需要监视哪些文件描述符的写事件是否就绪,返回时内核告知用户哪些文件描述符的写事件已就绪

exceptfds:输入输出型参数,调用时用户告知内核需要监视哪些文件描述符的异常事件是否就绪,返回时内核告知用户哪些文件描述符的异常事件已就绪

timeout:输入输出型参数,调用时由用户设置select的等待时间,返回时表示timeout的剩余时间

参数timeout的取值:


NULL/nullptr:select调用后进行阻塞等待,直到被监视的某个文件描述符上的某个事件就绪

0:selec调用后进行非阻塞等待,无论被监视的文件描述符上的事件是否就绪,select检测后都会立即返回

特定的时间值:select调用后在指定的时间内进行阻塞等待,若被监视的文件描述符上一直没有事件就绪,则在该时间后select进行超时返回

返回值说明:


若函数调用成功,则返回事件就绪的文件描述符个数

若timeout时间耗尽,则返回0

若函数调用失败,则返回-1,同时错误码被设置

select调用失败时,错误码可能被设置为:


EBADF:文件描述符为无效的或该文件已关闭

EINTR:此调用被信号所中断

EINVAL:参数nfds为负值

ENOMEM:核心内存不足

fd_set结构


fd_set结构与sigset_t结构类似,fd_set本质也是一个位图,用位图中对应的位来表示要监视的文件描述符


dc861dfc9a1e4b619766a67f0d9b8f88.png


调用select函数之前就需用fd_set结构定义出对应的文件描述符集,然后将需要监视的文件描述符添加到文件描述符集中,这个添加的过程本质就是在进行位操作,但是这个位操作不需要用户自己进行,系统提供了一组专门的接口,用于对fd_set类型的位图进行各种操作

void FD_CLR(int fd, fd_set *set);      //用来清除描述词组set中相关fd的位
int  FD_ISSET(int fd, fd_set *set);    //用来测试描述词组set中相关fd的位是否为真
void FD_SET(int fd, fd_set *set);      //用来设置描述词组set中相关fd的位
void FD_ZERO(fd_set *set);             //用来清除描述词组set的全部位

timeval结构


传入select函数的最后一个参数timeout,是一个指向timeval结构的指针。timeval结构用于描述一段时间长度,该结构中包含两个成员,其中tv_sec表示的是秒,tv_usec表示的是微秒


1d4423d32bf241f59069e6681c0b4fe5.png


1.3 scoket就绪条件

读就绪


socket内核中,接收缓冲区中的字节数,大于等于低水位标记SO_RCVLOWAT,此时可以无阻塞的读取该文件描述符,并且返回值大于0

socket TCP通信中,对端关闭连接,此时对该socket读,则返回0

监听socket上有新的连接请求

socket上有未处理的错误

写就绪


socket内核中,发送缓冲区中的可用字节数,大于等于低水位标记SO_SNDLOWAT,此时可以无阻塞的写,并且返回值大于0

socket的写操作被关闭(close或者shutdown),此时进行写操作,会触发SIGPIPE信号

socket使用非阻塞connect连接成功或失败之后

socket上有未读取的错误

异常就绪


socket上收到带外数据

注意:带外数据和TCP的紧急模式相关,TCP报头中的URG标志位和16位紧急指针搭配使用,就能够发送/接收带外数据


1.4 select基本工作流程

若要实现一个简单的select服务器,该服务器要做的就是读取客户端发来的数据并进行打印,那么该select服务器的工作流程如下:


先初始化服务器,完成套接字的创建、绑定和监听

定义一个_fd_array数组用于保存监听套接字和已经与客户端建立连接的套接字,初始化时就可将监听套接字添加到_fd_array数组中

然后服务器开始循环调用select函数,检测读事件是否就绪,若就绪则执行对应操作

每次调用select函数之前,都需要定义一个读文件描述符集readfds,并将_fd_array中的文件描述符依次设置进readfds中,表示让select监视这些文件描述符的读事件是否就绪

当select检测到数据就绪时会将读事件就绪的文件描述符设置进readfds中,此时就能够得知哪些文件描述符的读事件就绪,并对这些文件描述符进行对应操作

若读事件就绪的是监听套接字,则调用accept函数从底层全连接队列获取已建立的连接,并将该连接对应的套接字添加到_fd_array数组中

若读事件就绪的是与客户端建立连接的套接字,则调用read函数读取客户端发来的数据并进行打印输出

服务器与客户端建立连接的套接字读事件就绪,也可能是客户端将连接关闭了,此时服务器应该调用close关闭该套接字,并将该套接字从_fd_array数组中清除,不需要再监视该文件描述符的读事件了

注意:


传入select函数的readfds、writefds和exceptfds都是输入输出型参数,当select函数返回时这些参数中的值已经被修改了,因此每次调用select函数时都需对其进行重新设置,timeout也是如此

因为每次调用select函数之前都需要对readfds进行重新设置,所以需要定义一个_fd_array数组保存与客户端已经建立的若干连接和监听套接字,实际_fd_array数组中的文件描述符就是需要让select监视读事件的文件描述符

select服务器只是读取客户端发来的数据,因此只需要让select监视特定文件描述符的读事件,若要同时让select监视特定文件描述符的读事件和写事件,则需要分别定义readfds和writefds,并定义两个数组分别保存需要被监视读事件和写事件的文件描述符,便于每次调用select函数前对readfds和writefds进行重新设置

由于调用select时还需要传入被监视的文件描述符中最大文件描述符值+1,因此每次在遍历_fd_array对readfds进行重新设置时,还需要记录最大文件描述符值

1.5 select服务器

Socket类


编写一个Socket类,对套接字相关的接口进行一定封装,为了让外部能直接调用Socket类中封装的函数,于是将部分函数定义成静态成员函数

//网络套接字封装
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <cerrno>
#include <cassert>
#include <unistd.h>
#include <memory>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include "Log.hpp"
class Socket
{
    const static int gbacklog = 15;
public://服务端客户端通用
    static int SocketCreate() {
        int SocketFd = socket(AF_INET, SOCK_STREAM, 0);
        if(SocketFd < 0) {
            LogMessage(FATAL, "socket create fail, %d:%s", errno, strerror(errno));
            exit(1);
        }
        LogMessage(NORMAL, "socket create success, SocketFd:%d", SocketFd);
        return SocketFd;
    }
public://服务端专用
    static void Bind(int listenSocketFd, uint16_t serverPort, std::string serverIp = "0.0.0.0") {
        struct sockaddr_in local;
        memset(&local, '\0', sizeof local);
        local.sin_family = AF_INET;
        local.sin_port = htons(serverPort);
        inet_pton(AF_INET, serverIp.c_str(), &local.sin_addr);
        if(bind(listenSocketFd, (struct sockaddr*)&local, sizeof local) < 0) {
            LogMessage(FATAL, "bind fail, %d:%s", errno, strerror(errno));
            exit(2);
        }
        LogMessage(NORMAL, "bind success, serverPort:%d", serverPort);
    }
    static void Listen(int listenSocketFd) {
        if(listen(listenSocketFd, gbacklog) < 0) {
            LogMessage(FATAL, "listen fail, %d:%s", errno, strerror(errno));
            exit(3);
        }
        LogMessage(NORMAL, "listen success");
    }
    static int Accept(int listenSocketFd, std::string* clientIp, uint16_t* clientPort) {
        struct sockaddr_in client;
        socklen_t length = sizeof client;
        int serviceSocketFd = accept(listenSocketFd, (struct sockaddr*)&client, &length);
        if(serviceSocketFd < 0) {
            LogMessage(ERROR, "accept fail, %d:%s", errno, strerror(errno));
            exit(4);
        }
        if(clientIp != nullptr) *clientIp = inet_ntoa(client.sin_addr);
        if(clientPort != nullptr) *clientPort = ntohs(client.sin_port);
        return serviceSocketFd;
    }
public://客户端专用
    bool Connect(int clientSocketFd, std::string& serverIp, uint16_t& serverPort) {
        struct sockaddr_in server;
        server.sin_family = AF_INET;
        server.sin_addr.s_addr = inet_addr(serverIp.c_str());
        server.sin_port = htons(serverPort);
        if(connect(clientSocketFd, (struct sockaddr*)&server, sizeof server) == 0) return true;
        else return false;
    }
public:
    Socket() {}
    ~Socket() {}
};


SelectServer类

#ifndef __SELECT_SVR_H__
#define __SELECT_SVR_H__
#include <iostream>
#include <string>
#include <sys/select.h>
#include "Socket.hpp"
#include "Log.hpp"
#include <unistd.h>
#include <cstring>
#include <cerrno>
#include <sys/time.h>
using namespace std;
#define BITS 8
#define NUM (sizeof(fd_set) * BITS)
#define FD_NONE -1
// 只完成读取,写入和异常不做处理
class SelectServer
{
public:
    SelectServer(const uint16_t &port = 9090) : _port(port)
    {
        _listenSocketFd = Socket::SocketCreate();
        Socket::Bind(_listenSocketFd, _port);
        Socket::Listen(_listenSocketFd);
        LogMessage(DEBUG, "create base socket success");
        _fd_array[0] = _listenSocketFd;
        for (int i = 1; i < NUM; ++i)
            _fd_array[i] = FD_NONE;
    }
    ~SelectServer()
    {
        if (_listenSocketFd > 0)
            close(_listenSocketFd);
    }
public:
    void Start()
    {
        while (true)
        {
            DebugPrint();
            fd_set readfds;
            FD_ZERO(&readfds);
            int maxFd = _listenSocketFd;
            for (int i = 0; i < NUM; ++i)
            {
                if (_fd_array[i] == FD_NONE)
                    continue;
                else
                    FD_SET(_fd_array[i], &readfds);
                if (maxFd < _fd_array[i]) maxFd = _fd_array[i];
            }
            int number = select(maxFd + 1, &readfds, nullptr, nullptr, nullptr);
            switch (number)
            {
            case 0:
                LogMessage(DEBUG, "%s", "Time Out ...");
                break;
            case -1:
                LogMessage(WARNING, "Select Fail: %d : %s", errno, strerror(errno));
                break;
            default:
                LogMessage(DEBUG, "Get a event");
                HandlerEvent(readfds);
                break;
            }
        }
    }
private:
    void Accepter()
    {
        string clientIp;
        uint16_t clientPort = 0;
        int socketfd = Socket::Accept(_listenSocketFd, &clientIp, &clientPort);
        if (socketfd < 0)
        {
            LogMessage(ERROR, "accept error");
            return;
        }
        LogMessage(DEBUG, "Get a link success : [%s : %d] , socketFd : %d", clientIp.c_str(), clientPort, socketfd);
        int pos = 1;
        for (; pos < NUM; ++pos)
            if (_fd_array[pos] == FD_NONE) break;
        if (pos == NUM) { // 满了
            LogMessage(ERROR, "%s:%d", "SelectServer already full, close:", socketfd);
            close(socketfd);
        }
        else { // 找到空位置
            _fd_array[pos] = socketfd;
        }
    }
    void Recver(int i) 
    {
        LogMessage(DEBUG, "message in , get IO event:%d", _fd_array[i]);
        char buffer[1024];
        int num = recv(_fd_array[i], buffer, sizeof(buffer) - 1, 0);
        if(num > 0) {
            buffer[num] = 0;
            LogMessage(DEBUG, "client[%d]#%s", _fd_array[i], buffer);
        }
        else if(num == 0) {
            LogMessage(DEBUG, "client[%d] link close, me too...", _fd_array[i]);
            close(_fd_array[i]);
            _fd_array[i] = FD_NONE;
        }
        else {
            LogMessage(WARNING, "%d recv error, %d : %s", _fd_array[i], errno, strerror(errno));
            close(_fd_array[i]);
            _fd_array[i] = FD_NONE;
        }
    }
    void HandlerEvent(const fd_set &readfds)
    {
        for (int i = 0; i < NUM; ++i)
        {
            // 去掉不合法的fd
            if (_fd_array[i] == FD_NONE) continue;
            // 判断是否就绪
            if (FD_ISSET(_fd_array[i], &readfds))
            {
                if (i == 0 && _fd_array[i] == _listenSocketFd) Accepter(); //链接事件
                else  Recver(i);// 读事件
            }
        }
    }
    void DebugPrint()
    {
        cout << "_fd_array[]:";
        for (int i = 0; i < NUM; ++i) {
            if (_fd_array[i] != FD_NONE) cout << _fd_array[i] << " ";
        }
        cout << endl;
    }
private:
    uint16_t _port;
    int _listenSocketFd;
    int _fd_array[NUM];
};
#endif


当调用accept函数从底层获取上来连接后,不能立即调用read函数读取该连接中的数据,因为此时新连接中的数据可能并没就绪,若直接调用read函数可能阻塞,应该将这个等待过程交给select函数来完成,因此在获取完连接后直接将该连接对应的文件描述符添加到_fd_array数组中即可,当该连接的读事件就绪时再进行数据读取

添加文件描述符到fd_array数组中,本质就是遍历fd_array数组,找到一个没有被使用的位置将该文件描述符添加进去。但有可能_fd_array数组中全部的位置都已被占用,那么文件描述符就会添加失败,此时就只能将刚获取上来的连接对应的套接字进行关闭,因为此时服务器已经没有能力处理这个连接了

select服务器测试


使用telnet工具连接服务器,此时通过telnet向服务器发送的数据就能够被服务器读到并且打印输出了


c5b977d7047242738b02c638cb11c556.png


虽然SelectServer仅是一个单进程、单线程服务器,但却可以同时为多个客户端提供服务,因为select函数调用后会告知select服务器是哪个客户端对应的连接事件就绪,此时select服务器就可以读取对应客户端发来的数据,读取完后又会调用select函数等待某个客户端连接的读事件就绪

ec3922fdd90f4896ad38facb279454de.png



当服务器检测到客户端退出后,也会关闭对应的连接,并将对应的套接字从_fd_array数组中清除


c0710e025bab419b9615e60f8f44dbf8.png


存在的问题


select服务器若要向客户端发送数据,不能直接调用write函数,因为调用write函数时实际也分为"等"和"拷贝"两步,也应将"等"的这个过程交给select函数,因此在每次调用select函数之前,除了需要重新设置readfds还需要重新设置writefds,并且还需要一个数组来保存需被监视写事件是否就绪的文件描述符,当某一文件描述符的写事件就绪时才能够调用write函数向客户端发送数据

没有定制协议。代码中读取数据时并没有按照某种规则进行读取,可能造成粘包问题,根本原因就是没有定制协议。如HTTP协议规定在读取底层数据时读取到空行就表明读完了一个HTTP报头,此时再根据HTTP报头中的Content-Length属性得知正文的长度,最终就能够读取到一个完整的HTTP报文,HTTP协议通过这种方式避免了粘包问题

没有对应的输入输出缓冲区。代码中直接将读取的数据存储到了字符数组buffer中,这是不严谨的,因为本次数据读取可能并没有读取到一个完整的报文,此时服务器就不能进行数据的分析处理,应该将读取到的数据存储到一个输入缓冲区中,当读取到一个完整的报文后再让服务器进行处理。此外,若服务器要能够对客户端进行响应,那么服务器的响应数据也不应该直接调用write函数发送给客户端,应该先存储到一个输出缓冲区中,因为响应数据可能很庞大,无法一次发送完毕,可能需要进行分批发送

综上所述,本博客中的SelectServer仅仅是一个Demo,用于理解select函数的使用


1.6 select的优点

可以同时等待多个文件描述符,且只负责等待,实际的IO操作由accept、read、write等接口完成,保证接口在进行IO操作时不会被阻塞

select同时等待多个文件描述符,因此可以将"等"的时间重叠,提高IO效率

上述优点也是所有多路转接接口的优点


1.7 select的缺点

每次调用select,都需手动设置fd集合,从接口使用角度来说也非常不便

每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大

同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大

select可监控的文件描述符数量太少

select可监控的文件描述符个数


调用select函数时传入的readfds、writefds以及exceptfds都是fd_set结构,fd_set结构本质是一个位图,用一个bit位来标记一个文件描述符,因此select可监控的文件描述符个数取决于fd_set类型的bit位个数

#include <iostream>
#include <sys/types.h>
using namespace std;
int main()
{
  cout << sizeof(fd_set)* 8 << endl;//1字节 8bit位
  return 0;
}

运行代码后可以发现,select可监控的文件描述符个数为1024


7e62d0b32c6f48928028a167e9a5b93d.png


一个进程能打开的文件描述符个数


进程控制块task_struct中有一个files指针,该指针指向一个struct files_struct结构,进程的文件描述符表fd_array就存储在该结构中,其中文件描述符表fd_array的大小定义为NR_OPEN_DEFAULT,NR_OPEN_DEFAULT的值实际就是32


但不意味着一个进程最多只能打开32个文件描述符,进程能打开的文件描述符个数是可以扩展的,通过ulimit -a命令可以看到进程能打开的文件描述符上限


2a2f5a84de124f078747f327a522d643.png


select可监控的文件描述符个数是1024,除去监听套接字,那么最多只能连接1023个客户端


1.8 select的适用场景

多路转接接口select、poll和epoll,需在一定的场景下使用,若场景不适宜,可能会适得其反


多路转接接口一般适用于多连接,且多连接中只有少部分连接比较活跃。因为少量连接比较活跃,也意味着几乎所有的连接在进行IO操作时,都需要花费大量时间来等待事件就绪,此时使用多路转接接口就可以将这些等的事件进行重叠,提高IO效率。

对于多连接中大部分连接都很活跃的场景,其实并不适合使用多路转接。因为每个连接都很活跃,也意味着任何时刻每个连接上的事件基本都是就绪的,此时根本不需要动用多路转接接口来进行等待,毕竟使用多路转接接口也是需要花费系统的时间和空间资源的

多连接中只有少量连接是比较活跃的,如聊天工具,登录QQ后大部分时间其实是没有聊天的,此时服务器端不可能调用一个read函数阻塞等待读事件就绪


多连接中大部分连接都很活跃,如企业当中进行数据备份时,两台服务器之间不断在交互数据,这时连接是特别活跃的,几乎不需要等的过程,也就没必要使用多路转接接口了


目录
相关文章
|
数据处理 C语言
网络IO 多路IO复用 之 epoll
网络IO 多路IO复用 之 epoll
网络IO 多路IO复用 之 select
网络IO 多路IO复用 之 select
|
存储 NoSQL Linux
计算机网络 | IO多路转接技术 | select详解
计算机网络 | IO多路转接技术 | select详解
79 0
|
安全 应用服务中间件 Linux
IO多路转接(三)
IO多路转接
57 0
|
监控
IO多路转接(二)
IO多路转接
92 0
|
3月前
|
存储 Java
【IO面试题 四】、介绍一下Java的序列化与反序列化
Java的序列化与反序列化允许对象通过实现Serializable接口转换成字节序列并存储或传输,之后可以通过ObjectInputStream和ObjectOutputStream的方法将这些字节序列恢复成对象。
|
4月前
|
Java 大数据
解析Java中的NIO与传统IO的区别与应用
解析Java中的NIO与传统IO的区别与应用
|
2月前
|
Java 大数据 API
Java 流(Stream)、文件(File)和IO的区别
Java中的流(Stream)、文件(File)和输入/输出(I/O)是处理数据的关键概念。`File`类用于基本文件操作,如创建、删除和检查文件;流则提供了数据读写的抽象机制,适用于文件、内存和网络等多种数据源;I/O涵盖更广泛的输入输出操作,包括文件I/O、网络通信等,并支持异常处理和缓冲等功能。实际开发中,这三者常结合使用,以实现高效的数据处理。例如,`File`用于管理文件路径,`Stream`用于读写数据,I/O则处理复杂的输入输出需求。