I/O多路复用【Linux/网络】(C++实现select、poll和epoll服务器)(上)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: I/O多路复用【Linux/网络】(C++实现select、poll和epoll服务器)

阅读前导:

“I/O 多路复用”处于知识树中网络和操作系统的最后,因此本文默认读者有计算机网络和操作系统的基础。

1. 引入:C10K 问题

c10k 问题是指如何让一个服务器同时处理超过 10000 个客户端的连接,这是一个网络编程中的经典挑战。

切入点是一个进程或线程一次只能维护一个链接,也就是一个进程或线程一次只能对一个文件操作。要解决服务端同时处理多个链接,自然而然地想到用多进程或多线程。并且在处理意见数据接收场景时,我们通常会选择阻塞式等待(它是同步的),这是因为阻塞式等待不会占用 CPU 资源,非阻塞忙轮询占用 CPU 和 OS 资源。

问题出在两方面:

  1. 传统的同步阻塞 I/O 模型(如 read、recv 等 I/O 接口)无法同时处理多个数据请求,也就是一次只能处理一个 I/O 事件。
  2. 如果需要为每个连接创建一个进程或线程,这会消耗大量的系统资源和上下文切换开销。

作为一个服务器,它首先要实现读取客户端发送的数据,才能进行数据处理等后续操作。而实现这个读取的操作,也是要讲究效率的,它方式而不同(以读取为例):

  1. 单进程模式:最简单的服务器实现方式,但是 recv、read 这样的 I/O 系统调用默认是阻塞式的,如果客户端只发送连接请求而不发送数据,会使进程阻塞,占用 CPU 和系统资源。并发性是最低的。
  2. 多线程模式:主进程创建线程来阻塞式地等待读取事件就绪,虽然服务器不会被阻塞,但是它创建的线程依然是阻塞的,线程资源的申请和回收也会占用系统资源。由于这个原因,多线程模式的并发性受限于机器的性能。
  3. 线程池模式:主进程预先创建若干个线程,用队列控制它们执行或等待任务,这虽然解决了多线程占用系统资源的问题,但是线程的数量是有限的,如果大量线程 recv、read 系统调用发生阻塞,那么也会造成同样的问题。解决办法只有把它们的操作修改为非阻塞模式,但问题又来了:线程如何得知什么时候读取事件就绪呢?
  • 轮询:还是那句话,轮询会消耗 CPU 资源,过多的线程会降低效率。
  • 事件驱动:服务器处理数据的本质是 I/O,I/O 的本质是“等待事件就绪”+“数据拷贝”。上面这些做法都是上层用户进程在做这两件事,影响效率的就是这个“等”。事件驱动就是将“等”这件事交给内核去做,用户进程只需要将要等待“事件”的文件描述符交给内核关心,在这期间可以做其他事情。直到事件就绪,内核通知上层应用程序。

事件驱动就是 I/O 多路复用。

2. 什么是 I/O 多路复用

I/O 多路复用(也叫多路转接)是一种解决方案,它可以让一个进程或线程同时监控多个文件描述符(通常是网络套接字),并在其中一个或多个文件描述符准备好进行 I/O 操作时(至少一个),通知应用程序进行相应的读写操作。这样,应用程序可以在等待数据的过程中执行其他任务,而不会被阻塞,从而提高了程序的性能和响应速度。

I/O 多路复用的实现方式有多种,比如 select,poll,epoll 等,它们各有优缺点,具体的选择要根据应用场景和需求来决定。

在稍后的学习过程中,我们会注意到这些 I/O 多路复用接口的参数不再像诸如 read、recv 等传统 I/O(它们默认是阻塞的)一样,它们是一个文件描述符数组,而不是单个文件描述符。

这就好像上学时老师总会定几个组长,这样每次收作业时老师只需要等这几个组长,但实际上等待不同组的同学上交作业的时间是有重叠的,这样便节省了时间。

2.1 socket 就绪条件

socket 就绪条件是指在使用 I/O 多路复用的方式来监控多个文件描述符时,判断哪些文件描述符已经准备好进行 I/O 操作(如读或写)的条件。不同的 I/O 模型和文件描述符类型可能有不同的就绪条件,但一般来说,可以分为以下几种情况:

  • 一个文件描述符准备好,当满足以下条件之一时:
  • 该文件描述符接收缓冲区中的数据字节数大于等于其接收缓冲区低水位标记的当前大小(SO_RCVLOWAT)。这意味着对这样的文件描述符执行读操作不会阻塞,并返回一个大于 0 的值(也就是可读数据的大小)。
  • 该连接的读半部关闭(也就是接收了 FIN 的 TCP 连接)。对这样的文件描述符的读操作将不阻塞并返回 0(也就是 EOF)。
  • 该文件描述符是一个监听套接字且已完成的连接数不为 0。对这样的文件描述符的 accept 操作通常不会阻塞。
  • 该文件描述符上有一个未处理的错误。对这样的文件描述符的读操作将不阻塞并返回 -1(也就是一个错误),同时把 errno 设置成确切的错误条件。
  • 一个文件描述符准备好,当满足以下条件之一时:
  • 该文件描述符发送缓冲区中的可用空间字节数大于等于其发送缓冲区低水位标记的当前大小(SO_SNDLOWAT),并且该文件描述符已经成功连接(TCP)或者不需要连接(UDP)。这意味着对这样的文件描述符执行写操作不会阻塞,并返回一个正值(例如由传输层接收的字节数)。
  • 该连接的写半部关闭(也就是主动发送 FIN 的 TCP 连接)。对这样的文件描述符的写操作将产生 SIGPIPE 信号。
  • 使用非阻塞的 connect 的套接字已建立连接,或者已经以失败告终
  • 该文件描述符上有一个未处理的错误。对这样的文件描述符的写操作将不阻塞并返回 -1(也就是一个错误),同时把 errno 设置成确切的错误条件。
  • 异常就绪:
  • socket 上收到带外数据。

[注] 带外数据和 TCP 的紧急模式相关,TCP 报头中的 URG 标志位和 16 位紧急指针搭配使用,就能够发送/接收带外数据。

3. select

3.1 select 函数

select 函数的名称的含义是:它可以从一组文件描述符中选择出那些已经准备好的文件描述符,然后返回给应用程序。

函数原型:

#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

参数:

  • nfds 是一个整数值,表示集合中所有文件描述符的范围,即所有文件描述符的最大值+1。
  • readfds 是一个指向 fd_set 结构的指针,表示要监视的可读文件描述符的集合。
  • writefds 是一个指向 fd_set 结构的指针,表示要监视的可写文件描述符的集合。
  • exceptfds 是一个指向 fd_set 结构的指针,表示要监视的异常文件描述符的集合。
  • timeout 是一个指向 struct timeval 结构的指针,表示 select 函数的超时时间(即等待时间)。
  • 阻塞式:如果为 NULL 或 nullptr,表示无限等待;
  • 非阻塞式:如果(都)为 0,表示不等待,直接返回;
  • 规定时间内:如果为正值,表示等待的秒数和微秒数。

fd_set 是一个位图结构,它的不同标志位用来记录被监视的文件描述符的属性,如可读、可写或异常状态等,它的大小固定是 128 字节,最多 能够记录 128 * 8 = 1024 个文件描述符。原型:

#include <sys/select.h>
typedef struct {
    long int fds_bits[32]; // 一个长整型数组,每一位对应一个文件描述符
} fd_set;

因此在调用 select 函数之前,需要用 fd_set 定义一个文件描述符集合(也就是数组),以供后续添加要监视的文件描述符。

系统提供了一些接口(它们是宏实现的)来操作 fd_set 结构,如:

void FD_CLR(int fd, fd_set *set);      // 用来清除描述词组 set 中相关 fd 的位
int  FD_ISSET(int fd, fd_set *set);    // 用来测试描述词组 set 中相关 fd 的位是否为真
void FD_SET(int fd, fd_set *set);      // 用来设置描述词组 set 中相关 fd 的位
void FD_ZERO(fd_set *set);             // 用来清除描述词组 set 的全部位

参数 timeout 指向的结构体包含秒和毫属性:

struct timeval {
    time_t tv_sec; // seconds
    long tv_usec; // microseconds
};

值得注意的是,除了第一个 nfds 参数之外,剩下的四个参数都是输入输出型参数:

  • 输入时:用户告知内核应该要关心哪些文件描述符对应的事件(读,写或异常);
  • 输出时:内核告知用户,它关心的文件描述符对应的事件中的某些事件已经就绪。

具体细节将会在代码中体现。

返回值(整数):

  • 成功:返回准备好的文件描述符个数;
  • 失败:
  • 超时:返回 0;
  • 出错:返回-1,设置错误码 errno。

其中,出错后错误码可能会被设置为:

  • EBADF:文件描述符为无效的或该文件已关闭。
  • EINTR:此调用被信号所中断。
  • EINVAL:参数 nfds 为负值。
  • ENOMEM:核心内存不足。

3.2 select 服务器

Sock 类

由于本节是网络部分中靠后的知识点,因此 socket 套接字的编写不是本节的重点,因此将它们封装为一个 Sock 类,以供后续使用。

#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <cerrno>
#include <cassert>
#include <unistd.h>
#include <memory>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <ctype.h>
// 注:为了方便使用,并且将重点放在 select Server 的编写上,
// 所有接口都设置为静态,通过 类名:: 函数名 调用
class Sock
{
private:
    const static int gbacklog = 20;
public:
    Sock() {}
    static int Socket()
    {
        int listensock = socket(AF_INET, SOCK_STREAM, 0);
        if (listensock < 0)
        {
            exit(2);
        }
        int opt = 1;
        setsockopt(listensock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
        return listensock;
    }
    static void Bind(int sock, uint16_t port, std::string ip = "0.0.0.0")
    {
        struct sockaddr_in local;
        memset(&local, 0, sizeof local);
        local.sin_family = AF_INET;
        local.sin_port = htons(port);
        inet_pton(AF_INET, ip.c_str(), &local.sin_addr);
        if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0)
        {
            exit(3);
        }
    }
    static void Listen(int sock)
    {
        if (listen(sock, gbacklog) < 0)
        {
            exit(4);
        }
    }
    static int Accept(int listensock, std::string *ip, uint16_t *port)
    {
        struct sockaddr_in src;
        socklen_t len = sizeof(src);
        int servicesock = accept(listensock, (struct sockaddr *)&src, &len);
        if (servicesock < 0)
        {
            return -1;
        }
        if(port) *port = ntohs(src.sin_port);
        if(ip) *ip = inet_ntoa(src.sin_addr);
        return servicesock;
    }
    static bool Connect(int sock, const std::string &server_ip, const uint16_t &server_port)
    {
        struct sockaddr_in server;
        memset(&server, 0, sizeof(server));
        server.sin_family = AF_INET;
        server.sin_port = htons(server_port);
        server.sin_addr.s_addr = inet_addr(server_ip.c_str());
        if(connect(sock, (struct sockaddr*)&server, sizeof(server)) == 0) return true;
        else return false;
    }
    ~Sock() {}
};

可以把它们直接当做系统调用来看,只不过是省略了参数设置的细节。

日志类

为了方便观察现象,下面实现了一个简单的 Log 日志类(这里是我直接拿了之前写的),下面的代码中可以把它当做普通的打印语句。

#pragma once
#include <iostream>
#include <cstdarg>
#include <ctime>
#include <string>
// 日志级别
#define DEBUG   0
#define NORMAL  1
#define WARNING 2
#define ERROR   3
#define FATAL   4
const char *LevelMap[] = 
{
    "DEBUG",
    "NORMAL",
    "WARNING",
    "ERROR",
    "FATAL"
};
// 打印版本
void logMessage(int level, const char *format, ...)
{
#ifndef DEBUG_SHOW
    if(level== DEBUG) return;
#endif
    // 标准部分
    char stdBuffer[1024];
    time_t timestamp = time(nullptr);
    snprintf(stdBuffer, sizeof stdBuffer, "level[%s], time[%ld] ", LevelMap[level], timestamp);
    // 自定义部分
    char logBuffer[1024];
    va_list args;
    va_start(args, format);
    vsnprintf(logBuffer, sizeof logBuffer, format, args);
    va_end(args);
    // 打印
    printf("%s%s\n", stdBuffer, logBuffer);
}

select 的基本工作流程

注:在这三个(select、poll 和 epoll)接口中,select server 的实现难度最大,但它们都是类似的。本文实现的三个 server 中只实现读操作,读、写和异常三个操作将会在下一篇文章中实现。由于网络并不是本节的重点,因此在阐述时默认已经完成套接字 Socket 的编写。

  1. 初始化服务器,完成套接字的创建、绑定和监听。
  2. 创建一个 fd_set 结构体(它底层是一个数组),用来存放所有的套接字对象,包括服务器套接字和客户端套接字。使用 FD_ZERO() 和 FD_SET() 宏来初始化和添加套接字到集合中。
  3. 进入一个无限循环,不断地检查套接字的状态。使用 select() 函数来实现,它会返回三个集合,分别是可读的套接字,可写的套接字,和发生异常的套接字。将之前创建的 fd_set 集合作为 readfds 参数传入,表示关注哪些套接字的可读状态。
  4. 遍历返回的可读套接字集合,对每个套接字进行相应的处理。
  • 如果套接字是服务器套接字(监听套接字),那么表示有新的连接请求到来,使用 accept() 函数来接受连接,并返回一个客户端套接字和客户端地址。将客户端套接字添加到之前的 fd_set 集合中,以便下次检查它的状态。
  • 如果套接字是客户端套接字,那么表示有新的数据到来,使用 recv() 函数或 read() 函数来接收数据。对接收到的数据进行处理,例如打印到屏幕,或者回复给客户端。如果接收到的字节数(返回值)为零,那么表示客户端已经断开连接,使用 close() 函数来关闭套接字,并从 fd_set 集合中移除它。

上面的“套接字”在网络层面指的是套接字文件,在系统层面指的是套接字对应的文件描述符,这是因为在 Linux 一切皆文件的意义下,文件描述符可以操作套接字文件。套接字编写时用到的 socket() 函数的返回值就是一个文件描述符。

SelectServer 类

构造函数和析构函数

在构造函数中实现套接字的创建、绑定和监听。在析构函数中关闭套接字文件描述符。

// SelectServer.hpp
#ifndef __SELECT_SVR_H__
#define __SELECT_SVR_H__
#include <iostream>
#include <sys/select.h>
#include "Sock.hpp"
#include "Log.hpp"
class SelectServer
{
public:
    SelectServer(const uint16_t &port = 8080)
    : _port(port)
    {
        _listensock = Sock::Socket();
        Sock::Bind(_listensock, _port);
        Sock::Listen(_listensock);
    // 调试信息
        logMessage(DEBUG, "create socket success");
    }
    // 其他接口
    ~SelectServer()
    {
        if (_listensock >= 0) close(_listensock);
    }
private:
    uint16_t _port;
    int _listensock;
};
#endif

值得注意的是,这里使用的是云服务器测试,所以 IP 地址可能是厂商虚拟提供给我们的,在实现 Sock 类时,设置为任何 IP 都可以使用,如果要显式地设置为参数也可以。

作为一个服务器,端口号和监听套接字文件描述符是必不可少的。

Start 函数

当服务器初始化完成以后,就要让它运行起来,运行的逻辑在 Start 函数中实现。

  1. 创建文件描述符集合并初始化
  2. 在一个循环中添加套接字到集合中,并且将集合作为参数传入 select 函数,表示让内核关心这些文件描述符的 I/O 事件是否就绪
void Start()
{
    // 1. 创建文件描述符集合(以读为例:read fds)
    fd_set rfds;
    // 2. 初始化集合
    FD_ZERO(&rfds);
    struct timeval timeout = {3, 0};
    while (true)
    {
        // 3. 添加套接字到集合中
        FD_SET(_listensock, &rfds);
        // 4. 将集合传入,表示让内核关心这些文件描述符(只以读为例)
        int n = select(_listensock + 1, &rfds, nullptr, nullptr, &timeout);
        // 5. 根据返回值采取不同措施(以打印日志代替)
        switch (n)
        {
        case 0:
            // 超时
            logMessage(DEBUG, "timeout...");
            break;
        case -1:
            // 出错
            logMessage(DEBUG, "select error: [%d : %s]", errno, strerror(errno));
            break;
        default:
            // 成功
            logMessage(DEBUG, "get a new link event!");
            break;
        }
    }
}

在 main.cc 中,将服务器运行起来(使用普通指针也可以):

#include "selectServer.hpp"
#include <memory>
int main()
{
    std::unique_ptr<SelectServer> svr(new SelectServer());
    svr->Start();
    return 0;
}

测试:

设置 timeout 参数为 3.0 秒,但是 3 秒过后却不断地打印。这是因为 timeout 是一个输入输出型参数,它的值就像倒计时一样,如果在这个时间范围内成功返回,那么 timeout 最终输出的值就是剩余的秒数;如果超时,它就是 0,那么下次循环时它依然是 0,也就是让 select 函数非阻塞式地等待。

所以要将 timeout 参数的初始化放在循环内。这个例子只是为了说明 timeout 是一个输入输出型参数,为了更好地观察现象,后续测试仍然以阻塞式等待,也就是参数 timeout 的值为 NULL 或 nullptr。

为什么 select 函数的第一个参数是套接字的文件描述符+1?

这是因为 select 函数需要知道要监视的文件描述符的范围,即从 0 到最大的文件描述符。文件描述符是从 0 开始编号的,所以最大的文件描述符+1 就是文件描述符的总数。select 函数会遍历这个范围内的所有文件描述符,检查它们是否在指定的集合中,以及它们是否有可读、可写或异常的事件发生。如果第一个参数传递的是最大的文件描述符,那么 select 函数就会忽略这个文件描述符,因为它不在遍历的范围内。所以,为了让 select 函数能够正确地监视所有的文件描述符,必须传递最大的文件描述符+1 作为第一个参数。

下面用 telnet 工具,在本地模拟客户端进行测试:

但是一旦连接成功,服务端会一直打印“新连接”信息,这是因为建立连接后,我们并没有设置将连接“取走”的逻辑,select 函数就会不断地在循环中通知用户进程。

什么是将连接“取走”呢?就是调用 Accept() 函数。

为什么不在循环中调用 Accept() 函数呢?

这是因为 Accept() 函数是阻塞式的,它会主动地使用户进程阻塞等待,直到一个新连接到来。多路复用 I/O 就是解决这个问题的,select 函数可以代替它等待,直到有新连接请求到来后才会通知用户进程,所以要把它留在有连接请求到来时再调用。

HandlerEvent 函数

它应该在 Start 函数的最后一个分支被调用。

private:
// 处理连接请求
void HandlerEvent(const fd_set &rfds)
{
    uint16_t client_port = 0;
    std::string client_ip;
    if (FD_ISSET(_listensock, &rfds)) // 判断_listensock 是否在 rfds 集合中就绪
    {
        // 获取新连接
        int sock = Sock::Accept(_listensock, &client_ip, &client_port);
        if (sock < 0)
        {
            logMessage(WARNING, "accept error");
            return;
        }
        logMessage(DEBUG, "get a new link success...[%s : %d] : %d", client_ip.c_str(), client_port, sock);
    }
}

这个函数是类内辅助的,并不对外开放,所以用 private 限制权限。

通过 FD_ISSET 宏判断_listensock 是否在 rfds 集合中就绪,如果就绪,那么就用 Accept() 函数处理连接请求,并打印请求的 IP 和端口;否则提示错误。

这次调用 Accept() 还会被阻塞吗?

这个过程是不会阻塞的,因为 select 函数已经替用户进程等待连接了。

select 函数不是监听套接字对应的文件描述符的 I/O 事件是否就绪吗?为什么它还能代替用户进程阻塞式地监听客户端发出的连接请求?是不是站在文件读写的角度看,连接请求也是一种 I/O?

select 函数的作用是监听一组文件描述符的 I/O 事件是否就绪,也就是说,它可以检测这些文件描述符是否可以进行读、写或异常处理。当我们使用 select 函数监听套接字对应的文件描述符时,我们其实是在关注这些套接字的 I/O 状态,而不是它们的连接状态。连接状态是由 TCP 协议来管理的,它是在传输层的一个抽象概念,而不是在应用层的一个 I/O 操作。

那么,为什么 select 函数还能代替用户进程阻塞式地监听客户端发出的连接请求呢?这是因为在 TCP 协议中,当客户端向服务器发送一个 SYN 包,表示发起一个连接请求时,服务器会回复一个 SYN+ACK 包,表示接受请求,并将该请求放入一个队列中,等待用户进程调用 accept 函数来接受连接。这个队列的长度是有限的,由 listen 函数的 backlog 参数指定。当队列满了之后,服务器就不会再回复 SYN+ACK 包,而是直接丢弃后续的连接请求,直到队列有空位为止。

这样,我们就可以把服务器套接字对应的文件描述符的可读事件,理解为队列中有连接请求等待被接受。当 select 函数检测到服务器套接字可读时,就表示有客户端发出的连接请求到达了服务器,并被放入了队列中,等待用户进程调用 accept 函数来接受连接。这样,我们就可以用 select 函数来代替用户进程阻塞式地监听客户端发出的连接请求,而不会错过任何一个连接请求。

所以,站在文件读写的角度看,连接请求也是一种 I/O,因为套接字也是一种文件,但是它是一种特殊的 I/O,它是由 TCP 协议在传输层实现的,而不是由用户进程在应用层实现的。我们只是借用了 select 函数的功能,来实现一个非阻塞的连接监听,而不是真正地对连接请求进行读写操作。

测试:

注意,处理完连接后,我们不应该立即调用 recv、read 这样传统的阻塞式 I/O 接口,为什么呢?因为即使建立了连接,用户进程是无法的值客户端什么时候会发送数据的,极端地说,如果有恶意客户端只连接不发送,会造成服务端阻塞,这样就前功尽弃了。但这个场景依然是我们熟悉的,我们第一次处理阻塞式 Accept() 函数也是类似的,那就再用一次 select 函数,只不过这次连接已经建立了,那么任务变成了:监测客户端是否发送数据,有数据说明读事件应该就绪,通知用户进程读取;反之则否。这样读取时用户进程就可以避免因为不知道客户端什么时候发送数据而导致的阻塞了。

现在的问题是:

  1. Start() 和 HandlerEvent() 是两个独立的函数,很难将后者获取的连接再次交给前者监测。
  2. nfds 的一致性:服务端需要和若干个客户端建立连接,Socket 会不断增加,对应的文件描述符也会不断变化。另外,客户端的服务请求在时间线上并不是连续的,所以 select 函数的第一个参数可能不一定是最大的文件描述符+1。
  3. rfds(本次示例)、writefds、exceptfds 以及 timeout 参数(如果需要)都是输入输出型参数,输入和输出两个状态会影响它们的值。

这三个问题需要我们手动地将合法的文件描述符保存起来,以更新 select 函数的第一个参数(即最大的 fd)和更新文件描述符集合 fd_set。

select 服务器的编写模式

的一般编写模式(以读取为例):

  1. 用一个数组维护所有的合法 fd
  2. 在一个死循环中(服务器需要一直运行):
  1. 遍历第一次数组,记录最大的 fd 值,并添加所有需要关心的文件描述符 fd 到文件描述符集合 fd_set 中
  2. 调用 select 函数进行事件监测
  3. 遍历数组,找到就绪事件,根据就绪事件的类型(读或写),完成对应的动作(下面以读为例)。这是因为文件描述符集合 fd_set 中,既包含套接字文件描述符,也包含普通的文件描述符。
  1. Accepter:当连接事件就绪,我们要对连接(套接字文件描述符)进行 Accept()。
  2. Recver:当写事件就绪,我们要对普通文件描述符进行读取(recv() 或 read())。

可以使用原生数组,也可以为了方便维护,使用 vector 容器,但为了突出 select 服务器的缺点(引入另外两个更好的多路复用 I/O 接口),下面使用更“地道”的原生数组。原生数组在定义时必须指定大小,所以我们将数组的大小设置为 select 函数能够同时处理的最大事件数,即 128*8=1024 个字节,取名为_fd_array,作为 SelectServer 类的成员属性,这样便能减少函数传参的成本。

初始化:

数组在 SelectServer 类的构造函数中被初始化为 FD_NONE(自定义值为-1 的宏),表示数组中这个位置未添加文件描述符,并且约定下标为 0 的位置为监听套接字的文件描述符

这是一种编程习惯或者约定,方便管理和操作文件描述符集合 fd_set。一般来说,我们会将服务器套接字(包括监听套接字、已连接的套接字或者其他,只要是在服务端使用的)作为第一个元素添加到文件描述符集合中,这样可以保证它在 select 函数返回后被优先检查,避免因为队列满了而丢失连接请求。另外,这样也可以简化代码的逻辑,因为我们只需要遍历从 1 开始的文件描述符,就可以处理所有的客户端套接字,而不需要额外判断服务器套接字是否在集合中。

维护

  1. 在定义一个文件描述符集合后,遍历_fd_array[i],如果_fd_array[i]的值为 FD_NONE,则说明这个位置的文件描述符并未被 select 函数监视,跳过;记录有效的文件描述符的同时,记录有效集合中的最大值,以保证 select 函数的第一个参数的正确性。
  2. 在 HandlerEvent() 函数中,处理 select 函数检测到的读取事件。但由于文件描述符集合 fd_set 中既包含了监听套接字文件描述符,也包含了普通的文件描述符,因此我们要根据它们的类型做不同的处理。在上一个 HandlerEvent() 函数的编写中,只实现了前者的处理。为了将读写逻辑模块化,将处理二者的逻辑分别用成员函数 Accepter() 和 Recver() 函数封装。
#define BITS 8
#define NUM (sizeof(fd_set) * BITS)
#define FD_NONE -1
class SelectServer
{
public:
    SelectServer(const uint16_t &port = 8080)
        : _port(port)
    {
        _listensock = Sock::Socket();
        Sock::Bind(_listensock, _port);
        Sock::Listen(_listensock);
        logMessage(DEBUG, "%s", "create socket success");
        // 初始化_fd_array[]
        for (int i = 0; i < NUM; i++)
            _fd_array[i] = FD_NONE;
        // 约定第一个位置是监听套接字
        _fd_array[0] = _listensock;
    }
    void Start()
    {
        while (true)
        {
            PrintForDebug();
            fd_set rfds;
            FD_ZERO(&rfds);
            // 维护_fd_array[]
            int max_fd = _listensock;
            for (int i = 0; i < NUM; i++)
            {
                // a. 添加被监视的文件描述符
                if (_fd_array[i] == FD_NONE)
                    continue;
                FD_SET(_fd_array[i], &rfds);
                // b. 记录最大 fd 值
                if (max_fd < _fd_array[i])
                    max_fd = _fd_array[i];
            }
            // 注意第一个参数是动态更新的 max_fd
            int n = select(max_fd + 1, &rfds, nullptr, nullptr, nullptr);
            switch (n)
            {
            case 0:
                logMessage(DEBUG, "timeout...");
                break;
            case -1:
                logMessage(DEBUG, "select error: [%d : %s]", errno, strerror(errno));
                break;
            default:
                logMessage(DEBUG, "get a new link event!");
                HandlerEvent(rfds);
                break;
            }
        }
    }
    ~SelectServer()
    {
        if (_listensock >= 0)
            close(_listensock);
    }
private:
    void HandlerEvent(const fd_set &rfds)
    {
        // rfds 中包含:a. 监听套接字文件描述符 b. 普通文件描述符
        for (int i = 0; i < NUM; i++)
        {
            // 过滤
            if (_fd_array[i] == FD_NONE)
                continue;
            if (FD_ISSET(_fd_array[i], &rfds))
            {
                if (_fd_array[i] == _listensock)
                {
                    logMessage(DEBUG, "accept a new link, fd[%d]", _fd_array[i]);
                    Accepter();
                }
                else
                {
                    logMessage(DEBUG, "get a new IO event, fd[%d]", _fd_array[i]);
                    Recver(i);
                }
            }
        }
    }
    // 处理新连接
    void Accepter()
    {
        uint16_t client_port = 0;
        std::string client_ip;
        // 获取新连接
        int sock = Sock::Accept(_listensock, &client_ip, &client_port);
        if (sock < 0)
        {
            logMessage(WARNING, "accept error");
            return;
        }
        logMessage(DEBUG, "get a new link success...[%s : %d] : %d", client_ip.c_str(), client_port, sock);
        // 处理连接事件
        int pos = 1;
        for (; pos < NUM; pos++)
        {
            if (_fd_array[pos] == FD_NONE)
                break;
        }
        if (pos == NUM) // 满
        {
            logMessage(WARNING, "SecletServer is full, the fd[%d] will be closed...", sock);
            close(sock);
        }
        else // 未满
        {
            _fd_array[pos] = sock;
        }
    }
    // 处理文件读取 (recv()/read())
    void Recver(int pos)
    {
        char buffer[1024];
        int n = recv(_fd_array[pos], buffer, sizeof (buffer) - 1, 0);
        if (n > 0)
        {
            buffer[n] = '\0';
            std::cout << "client[" << _fd_array[pos] << "]>>> " << buffer << std::endl;
        }
        else if (n == 0) // 对端关闭连接
        {
            logMessage(DEBUG, "client[%d] quit, me too...", _fd_array[pos]);
            // [先] 关闭不需要的 fd
            close(_fd_array[pos]);
            // [再] 将这个 fd 从集合中去除
            // 也就是说:让 select() 不要再监测这个 fd 了
            _fd_array[pos] = FD_NONE;
        }
        else // 错误
        {
            logMessage(WARNING, "sock[%d] recv/read error, code:%d: %s", _fd_array[pos], errno, strerror(errno));
            close(_fd_array[pos]);
            _fd_array[pos] = FD_NONE;           
        }
    }
    // 打印目前所有被监视的文件描述符
    void PrintForDebug()
    {
        std::cout << "_fd_array[]: ";
        for (int i = 0; i < NUM; i++)
        {
            if (_fd_array[i] == FD_NONE)
                continue;
            std::cout << _fd_array[i] << " ";
        }
        std::cout << std::endl;
    }
private:
    uint16_t _port;
    int _listensock;
    int _fd_array[NUM];
};

测试 :注意到文件描述符集合 fd_set 在每次循环中都要重新定义,文件描述符也要重新添加,这是因为文件描述符是动态变化的,在每次循环的一开始,打印当前服务端系统中打开的文件描述符。下面用多个 telnet 客户端连接,进行测试:

缺陷

值得注意的是,这里处理普通文件描述符的 I/O 事件时(只实现了读取,即 Input),只是简单地用一个数组接收客户端发送的数据,因为我们通常用数据量很小的文本来进行测试,所以在测试时不容易出错。事实是通信的数据类型不一定是文本,大小也是不确定的,所以通信双方要通过相同的协议才能完整地交换数据。

细节

  1. 在处理新连接请求时,只有当文件描述符合法,并且数组有剩余空间才会被添加到数组中。
  2. 在差错处理时,必须先关闭文件描述符,然后再将它设置为默认值 FD_NONE。否则就是关闭-1 这个文件描述符,会出错。
  3. 文件描述符分为两种,一种是套接字文件描述符,一种是普通文件描述符。我们要知道,(TCP 协议,telnet 工具也是)客户端连接到服务器,必须建立连接,然后才能进行数据传输。言外之意是,普通文件描述符就是套接字文件描述符通过 Sock 类中的 Accept() 得到的(它调用了 accept() 系统调用,其返回值是一个普通文件描述符),这在“Linux 一切皆文件”的意义下是说得通的。因此普通文件描述符要被 select 监视,一定是在建立连接之后进行的(上面有说到,来自客户端的建立连接请求在文件读写层面上看也是一种 I/O 请求,也是被 select 监听的)。
  4. 数组就是 Accepter() 和 select() 之间的桥梁,因为文件描述符集合 fd_set 每次循环都会通过这个数组重新添加,所以在 Accept() 中获取到成功连接后的 sock 时,就不用再主动调用一次 select 函数以让它检测这个文件描述符的 I/O 事件了,而是将 sock 添加到数组中,下一次 while 循环时就会在第一次 for 循环中被添加到 fd_set 中,这样 select 函数只通过数组来监测文件描述符,而不用担心文件描述符的类型。由于我们约定数组第一个元素为套接字文件描述符,这样我们就能通过元素的值来处理连接(Accepter()),还是处理 I/O 事件(Recver())。

理解第三点和第四点,是理解 I/O 多路复用服务器的要点。

下面就第三点进行测试,代码中为三个函数增加了一个计数器,以观察现象:

通过测试结果可以知道:每个(TCP)客户端在进行数据传输之前,都必须与服务端建立连接,服务端每建立一个新的连接,都要调用一次 Accepter();而同一个客户端每次发送信息都要调用一次 Recver(),而不会调用 Accepter(),因为 Accepter() 只是用来处理连接事件的,也就是处理就绪的监听套接字。

这样便实现了一个简单的读模式的 I/O 多路复用的服务器,虽然它是一个单进程服务器,但它能够同时监测 1024 个(包括 1 个监听套接字)文件描述符。

[注] 在学习 select 时,我们只以读为例,如果要实现写,和读是类似的,用一个数组维护所有合法的文件描述符,需要 select 监测的文件描述符只要添加到这个数组即可。

关于完整服务的 I/O 多路复用的服务器,将会在下一节的 Reactor 模式的服务器中实现。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
29天前
|
网络协议 安全 Linux
Linux C/C++之IO多路复用(select)
这篇文章主要介绍了TCP的三次握手和四次挥手过程,TCP与UDP的区别,以及如何使用select函数实现IO多路复用,包括服务器监听多个客户端连接和简单聊天室场景的应用示例。
86 0
|
29天前
|
存储 Linux C语言
Linux C/C++之IO多路复用(aio)
这篇文章介绍了Linux中IO多路复用技术epoll和异步IO技术aio的区别、执行过程、编程模型以及具体的编程实现方式。
66 1
Linux C/C++之IO多路复用(aio)
|
26天前
|
Ubuntu Linux 编译器
Linux/Ubuntu下使用VS Code配置C/C++项目环境调用OpenCV
通过以上步骤,您已经成功在Ubuntu系统下的VS Code中配置了C/C++项目环境,并能够调用OpenCV库进行开发。请确保每一步都按照您的系统实际情况进行适当调整。
217 3
|
29天前
|
资源调度 Linux 调度
Linux C/C++之线程基础
这篇文章详细介绍了Linux下C/C++线程的基本概念、创建和管理线程的方法,以及线程同步的各种机制,并通过实例代码展示了线程同步技术的应用。
23 0
Linux C/C++之线程基础
|
29天前
|
Linux C++
Linux C/C++之IO多路复用(poll,epoll)
这篇文章详细介绍了Linux下C/C++编程中IO多路复用的两种机制:poll和epoll,包括它们的比较、编程模型、函数原型以及如何使用这些机制实现服务器端和客户端之间的多个连接。
22 0
Linux C/C++之IO多路复用(poll,epoll)
|
29天前
|
网络协议 Linux 网络性能优化
Linux C/C++之TCP / UDP通信
这篇文章详细介绍了Linux下C/C++语言实现TCP和UDP通信的方法,包括网络基础、通信模型、编程示例以及TCP和UDP的优缺点比较。
35 0
Linux C/C++之TCP / UDP通信
|
29天前
|
消息中间件 Linux API
Linux c/c++之IPC进程间通信
这篇文章详细介绍了Linux下C/C++进程间通信(IPC)的三种主要技术:共享内存、消息队列和信号量,包括它们的编程模型、API函数原型、优势与缺点,并通过示例代码展示了它们的创建、使用和管理方法。
26 0
Linux c/c++之IPC进程间通信
|
29天前
|
Linux C++
Linux c/c++进程间通信(1)
这篇文章介绍了Linux下C/C++进程间通信的几种方式,包括普通文件、文件映射虚拟内存、管道通信(FIFO),并提供了示例代码和标准输入输出设备的应用。
20 0
Linux c/c++进程间通信(1)
|
29天前
|
Linux C++
Linux c/c++进程之僵尸进程和守护进程
这篇文章介绍了Linux系统中僵尸进程和守护进程的概念、产生原因、解决方法以及如何创建守护进程。
17 0
|
SQL 消息中间件 弹性计算
从ECS到C++软件工程师
从ECS到C++软件工程师
200 0
从ECS到C++软件工程师