select、poll、epoll、多线程实现并发请求处理

简介: select、poll、epoll、多线程实现并发请求处理

select、poll、epoll、多线程实现并发请求处理

网络编程相关文章:

epoll-reactor模型原理代码解析

epoll的水平触发LT以及边沿触发ET的原理及使用及优缺点

Http解析实现/服务器Get请求的实现


服务器与客户端建立连接需要使用到一些接口,包括但不限于socket、bind、listen、accept

一个简单的服务器网络程序如下:

int listenfd, connfd, n;
    struct sockaddr_in servaddr;
    char buff[MAXLNE];
    if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {//socket创建listen的文件描述符
        printf("create socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(10000);//端口信息可以自己设置
 //bind函数绑定IP,端口信息
    if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) {
        printf("bind socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }
    if (listen(listenfd, 10) == -1) {//listen监听函数,参数10表示未完成连接与已完成连接队列之和,一般设置为5的倍数
        printf("listen socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }
    struct sockaddr_in client;
    socklen_t len = sizeof(client);
    if ((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1) {//accept函数从已建立连接的队列中取出一个连接与socket进行绑定,之后就可以与绑定的文件描述符connfd发送数据
        printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }
    printf("========waiting for client's request========\n");
    while (1) {
        n = recv(connfd, buff, MAXLNE, 0);//接收数据
        if (n > 0) {
            buff[n] = '\0';
            printf("recv msg from client: %s\n", buff);
        send(connfd, buff, n, 0);//发送数据
        } else if (n == 0) {//当客户端调用close时,会返回n=0
            close(connfd);
        }
    }

注意

listen(listenfd, 10)中,这个10在Linux和MAC OS系统中有不一样的解释,Linux中只是指全连接的队列个数上限,MAC OS中指完成连接与已完成连接队列总和个数上限。因此DOS攻击时在Linux改这个值用处不大,而在苹果系统中修改这个值有一定效果。

上述代码只能处理一个连接请求,如果想实现多个客户端的同时通信需要如何处理呢?

是否可以将accept放入while(1)循环中?

NO! accept会阻塞,以及socket描述符覆盖,影响数据发送

解决方法1:多线程

采用多线程来分别处理单个连接,实现多个客户端连接的同时处理

void *client_routine(void *arg) { //client_routine函数是多线程的回调函数,用来处理每一个连接的请求,也就是接收与发送信息功能
  int connfd = *(int *)arg;
  char buff[MAXLNE];
  while (1) {
    int n = recv(connfd, buff, MAXLNE, 0);
        if (n > 0) {
            buff[n] = '\0';
            printf("recv msg from client: %s\n", buff);
        send(connfd, buff, n, 0);
        } else if (n == 0) {
            close(connfd);
      break;
        }
  }
  return NULL;
}
  int listenfd, connfd, n;
    struct sockaddr_in servaddr;
    char buff[MAXLNE];
    if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {//socket创建listen的文件描述符
        printf("create socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(10000);//端口信息可以自己设置
 //bind函数绑定IP,端口信息
    if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) {
        printf("bind socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }
    if (listen(listenfd, 10) == -1) {//listen监听函数,参数10表示未完成连接与已完成连接队列之和,一般设置为5的倍数
        printf("listen socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }
  while (1) {
    struct sockaddr_in client;
      socklen_t len = sizeof(client);
      if ((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1) {
          printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
          return 0;
      }
    pthread_t threadid;
    pthread_create(&threadid, NULL, client_routine, (void*)&connfd);//创建线程,并执行client_routine回调函数功能,将connfd作为回调函数的参数传入,client_routine函数将处理相应文件描述符connfd的发送和接收功能
    }

该操作可行,但是每次连接生成创建一个线程,消耗内存会很大。

另外进行IO操作时,例如发生 IO 操作 read 时,它会经历两个阶段:

  1. 等待数据准备就绪
  2. 将数据从内核拷贝到进程或者线程中

因此当使用默认的阻塞套接字时,由于1 个线程捆绑处理 1 个连接,这两个阶段合而为一,这样操作套接字的代码所在的线程就得睡眠来等待消息准备好,这导致了高并发下线程会频繁的睡眠、唤醒,从而影响了 CPU 的使用效率。

因此高并发编程会有一些服务器模型,例如reactorproactor。这两类都要使用到IO多路复用,下面将叙述IO多路复用的三种方法。

采用IO多路复用实现并发处理客户端请求

IO多路复用是指单个进程/线程就可以同时处理多个IO请求。有三个方式select、poll、epoll

解决方法2:select

select:将文件描述符放入一个集合中,调用select时,将这个集合从用户空间拷贝到内核空间(缺点:每次都要复制,开销大),由内核根据就绪状态修改该集合的内容。

select主要涉及4个相关函数操作

void FD_CLR(int fd, fd_set *set);//清除某一个被监视的文件描述符。
int  FD_ISSET(int fd, fd_set *set);//测试一个文件描述符是否是集合中的一员
void FD_SET(int fd, fd_set *set);//添加一个文件描述符,将set中的某一位设置成1;
void FD_ZERO(fd_set *set);//清空集合中的文件描述符,将每一位都设置为0;

select实现并发处理客户端请求

//socket、bind、listen函数和上面一样,这里不重复写了
  fd_set rfds, wfds, rset, wset;  //声明四个集合用于保存文件描述符,rfds和rset是监控可读事件的文件描述符;wfds和wset是监控可写事件的文件描述符 该监控文件描述符的大小是有限的,但也可以更改,一般默认1024个文件描述符
    FD_ZERO(&rfds); //文件描述符清0
    FD_ZERO(&wfds); //文件描述符清0
    FD_SET(listenfd, &rfds); // 将listenfd在rfds处置1
    int max_fd = listenfd;
    while(1) {
        rset = rfds;  
        wset = wfds;  
        //select 是阻塞的,且函数有5个参数
        //第一个参数是一个整数值, 表示集合中所有文件描述符的范围,即所有文件描述符的最大值+1。系统会从0开始监测到你设置的这个参数的描述符范围(0~ maxfd+1)
        //第二个参数是监视文件描述符的一个集合,我们监视其中的文件描述符是不是可读,或者更准确的说,读取是不是不阻塞了。发生可读事件,该rset集合上对应文件描述符的位置将会置1
        //第三个参数是监控可写事件的文件描述符,和第二个参数在状态变化上类似
        int nready = select(max_fd + 1, &rset, &wset, NULL, NULL); //nready是返回的监控文件描述符中有响应的描述符个数
        if(FD_ISSET(listenfd, &rset)) {//检查listenfd是否在rset这个集合中,存在则listenfd监控到客户端发来的连接请求
            struct sockaddr_in client;
            socklen_t len = sizeof(client);
            if((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1) {
                printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
                return 0;
            }
            printf("accept Success fd=  %d\n", connfd);
            FD_SET(connfd, &rfds);  //将与客户端连接的socket加入select的监控集合中
            if(connfd > max_fd) max_fd = connfd;  //重置监控的fd范围
            if(--nready == 0) continue; //如果只有一个(说明没有其他的可读事件),则继续while(1)循环监控
        }
        int i = 0;
        for(i = listenfd + 1; i <= max_fd; i++) {//处理其余已与客户端建立连接的事件
            if(FD_ISSET(i, &rset)) {
                printf("message arrive  fd=  %d\n", i);
                n = recv(i, buff, MAXLNE, 0);
                if (n > 0) {
                    buff[n] = '\0';
                    printf("recv msg from client: %s\n", buff);
                //send(i, buff, n, 0);//将发送来的数据依次返回发送回去
                    FD_SET(i, &wfds);  //把相应的写入监控描述符置1,下一次监控时,会检查到可写事件
                } else if (n == 0) {//当客户端调用close时,会返回n=0
                    FD_CLR(i, &rfds);//必须得将处理过的描述符相应的标志位清空
                    FD_CLR(i, &wfds);//必须得将处理过的描述符相应的标志位清空
                    close(i);
                }
                if(--nready == 0) break;
            } else if(FD_ISSET(i, &wset)) {//会在下一次监控时,将数据发送出去
                send(i, buff, n, 0);
                FD_CLR(i, &wfds);//必须得将处理过的描述符相应的标志位清空
            }
        }
    }

注意:listenfd是依次增加的,文件描述符0,1,2是系统确定的,我们的listenfd从3开始开始分配,如果回收了,就空在那里,下次新分配fd再将空出的fd分配出去。所以我们采用 for(i = listenfd + 1; i <= max_fd; i++),来依次判断文件描述符

采用select实现并发处理请求的缺点

1: 每次都要将文件描述符集合从用户空间拷贝到内核空间复制,处理完事件之后又需要将监听的fd从用户态拷贝到内核态,开销大

2: 保存fd数组的大小有限,并且监听的fd越多,性能可能越差(当活跃的fd较少时)。

3: 需要轮询遍历所有文件描述符才能知道哪些fd句柄有事件发生,轮询方式效率慢

解决方法3:poll

poll:和select几乎没有区别,区别在于文件描述符的存储方式不同,poll采用链表的方式存储,没有最大存储数量的限制;

poll实现并发处理客户端请求

struct pollfd fds[POLL_SIZE] = {0};
    fds[0].fd = listenfd;//将listenfd存入fd[0]处
    fds[0].events = POLLIN;//监控POLLIN事件,就是读事件,有新连接发起连接和客服端发送数据来都会触发这个事件
    int max_fd = listenfd;
    while(1) {
        int nready = poll(fds, max_fd + 1, -1); //nready是返回的监控文件描述符中有响应的描述符个数
        if(fds[0].revents & POLLIN) {//检查listenfd是否存在POLLIN事件
            //建立新的连接
            struct sockaddr_in client;
            socklen_t len = sizeof(client);
            if((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1) {
                printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
                return 0;
            }
            printf("accept Success fd=  %d\n", connfd);
            fds[connfd].fd = connfd;
            fds[connfd].events = POLLIN;//重新注册读事件
            if(connfd > max_fd) max_fd = connfd;  //重置监控的fd范围
            if(--nready == 0) continue; //如果只有一个(说明没有其他的可读事件),则继续while(1)循环监控
        }
        int i = 0;
        for(i = listenfd + 1; i <= max_fd; i++) {//处理其余已与客户端建立连接的事件
            if(fds[i].revents & POLLIN) {
                //printf("message arrive  fd=  %d\n", i);
                n = recv(i, buff, MAXLNE, 0);
                if (n > 0) {
                    buff[n] = '\0';
                    printf("recv msg from client: %s\n", buff);
                    send(i, buff, n, 0);//将发送来的数据依次返回发送回去
                } else if (n == 0) {//当客户端调用close时,会返回n=0
                    fds[i].fd = -1;
                    close(i);
                }
                if(--nready == 0) break;
            } 
        }
    }

采用poll实现并发请求处理同样和select面临轮询遍历文件描述符,轮询方式效率慢的问题

解决方法4:epoll

epollepoll底层通过红黑树来描述,并维护一个ready list,将事件表中已经就绪的事件添加到这里,在使用epoll_wait调用时,仅观察这个list中有没有数据即可。

通过内核将就绪事件队列复制到用户空间,避免了select中不断复制fd的问题;支持的同时连接数上限很高;文件描述符就绪时,采用回调机制,避免了轮询(回调函数将就绪的描述符添加到一个链表中,执行epoll_wait时,返回这个链表);支持水平触发和边缘触发,采用边缘触发机制时,只有活跃的描述符才会触发回调函数。

select,poll上面提到需要将文件描述符集合从用户态到内核态,再内核态到用户态这样拷贝:开销大,而不会让文件描述符频繁的在用户态与内核态切换,因此在这一方面epoll更快速。

注意:epoll是不涉及共享内存的,开始查阅一些文章有说epoll内核与用户空间是通过共享内存交换数据,我也这么以为,但最近发现epoll源码中并不涉及mmap函数,没有共享内存相关的api。所以epoll是不涉及共享内存的。

epoll涉及的函数

int epoll_create(int size);//创建一个监听红黑树,并且返回红黑树的根节点  失败:-1,设置errno
int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);//对该监听红黑树所做的操作
//总共三个操作可选
//EPOLL_CTL_ADD 添加fd到监听红黑树
//EPOLL_CTL_MOD 修改fd在监听红黑树上的监听事件
//EPOLL_CTL_DEL 将一个fd从监听红黑树上取下(取消监听)
int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout)//监听文件描述符

就绪事件队列里的数据就会拷贝到events数组中。

使用红黑树的优点:

epoll中的fd都是存在红黑树上的,所以主要是对红黑树节点的增删改查操作。采用红黑树也便于在这里增删查改。

epoll实现并发处理客户端请求

int epfd = epoll_create(1);
    struct epoll_event events[EPOLL_SIZE] = {0};
    struct epoll_event ev;
    ev.events = EPOLLIN;
    ev.data.fd = listenfd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);//通过EPOLL_CTL_ADD添加文件描述符
    while(1) {
        int nready = epoll_wait(epfd, events, EPOLL_SIZE, 5);//监听描述符  参数5是阻塞超时时间,可以自己设置
        if(nready == -1) {
            continue;
        }
        for(int i = 0; i < nready; i++) {
            int clientfd = events[i].data.fd;
            if(clientfd == listenfd) {//如果监听的fd是listenfd时,表明是新的连接到来
                struct sockaddr_in client;
                socklen_t len = sizeof(client);
                if((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1) {
                    printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
                    return 0;
                }
                printf("accept Success fd=  %d\n", connfd);
                ev.events = EPOLLIN;
                ev.data.fd = connfd;
                epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);
            }
            else if(events[i].events & EPOLLIN) {//处理其余可读事件,就是客服端发来的数据
                n = recv(clientfd, buff, MAXLNE, 0);
                if (n > 0) {
                    buff[n] = '\0';
                    printf("recv msg from client: %s\n", buff);
                    send(clientfd, buff, n, 0);//将发送来的数据依次返回发送回去                 
                } 
                else if (n == 0) {//当客户端调用close时,会返回n=0
                    ev.events = EPOLLIN;
                    ev.data.fd = clientfd;
                    epoll_ctl(epfd, EPOLL_CTL_DEL, clientfd, &ev);
                    close(clientfd);
                }
            }
        }
    }

注意:int epfd = epoll_create(1);//int size,最开始时1是设置fd的大小,但是如今采用链表保存,该参数没有用处了,只是为了保证兼容之前的代码,所以保留了该参数。

epoll支持水平触发(LT)和边沿触发(ET),而select和poll只支持水平触发

在高并发服务器中边沿触发(ET) 的效率更高

因为边沿触发只在数据到来的一刻才触发,很多时候服务器在接受大量数据时会先接受数据头部(水平触发在此触发第一次,边沿触发第一次)。

接着服务器通过解析头部决定要不要接这个数据。此时,如果不接受数据,水平触发需要手动清除(水平触发当有数据时,会一直触发,直到没有数据可读),而边沿触发可以将清除工作交给一个定时的清除程序去做(只触发一次,不需要的数据可以不读),自己立刻返回。

另外EPOLLIN和EPOLLOUT的触发条件如下:

  • EPOLLIN: 对端有数据写入时才会触发。ET模式下只触发一次,LT模式下有数据每读完就会一直触发
  • EPOLLOUT: (1)一次write操作,写满了发送缓冲区,返回错误码为EAGAIN(11)。(2)对端读取了一些数据,又重新可写了,此时会触发EPOLLOUT。(3)暴力方法:直接调用epoll_ctl()重新设置一下event就可以了, event跟原来的设置一模一样都行(但必须包含EPOLLOUT),关键是重新设置,就会马上触发一次EPOLLOUT事件。

epoll的Reactor服务器模型的代码在,链接: link.

select、poll、epoll的选择

当监测的fd数量较,且各个fd都很活跃的情况下,建议使用select和poll;当监听的fd数量较,且单位时间仅部分fd活跃的情况下,使用epoll会明显提升性能。

上述代码的完整程序可在我的git上下载,包括4种方法的完整程序以及调试结果

链接: link.

目录
相关文章
|
1月前
|
并行计算 Java 数据处理
SpringBoot高级并发实践:自定义线程池与@Async异步调用深度解析
SpringBoot高级并发实践:自定义线程池与@Async异步调用深度解析
180 0
|
28天前
|
安全
List并发线程安全问题
【10月更文挑战第21天】`List` 并发线程安全问题是多线程编程中一个非常重要的问题,需要我们认真对待和处理。只有通过不断地学习和实践,我们才能更好地掌握多线程编程的技巧和方法,提高程序的性能和稳定性。
135 59
|
7天前
|
安全 Java 开发者
Java 多线程并发控制:深入理解与实战应用
《Java多线程并发控制:深入理解与实战应用》一书详细解析了Java多线程编程的核心概念、并发控制技术及其实战技巧,适合Java开发者深入学习和实践参考。
|
19天前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####
|
1月前
|
Java
【编程进阶知识】揭秘Java多线程:并发与顺序编程的奥秘
本文介绍了Java多线程编程的基础,通过对比顺序执行和并发执行的方式,展示了如何使用`run`方法和`start`方法来控制线程的执行模式。文章通过具体示例详细解析了两者的异同及应用场景,帮助读者更好地理解和运用多线程技术。
29 1
|
2月前
|
网络协议 C语言
C语言 网络编程(十四)并发的TCP服务端-以线程完成功能
这段代码实现了一个基于TCP协议的多线程服务器和客户端程序,服务器端通过为每个客户端创建独立的线程来处理并发请求,解决了粘包问题并支持不定长数据传输。服务器监听在IP地址`172.17.140.183`的`8080`端口上,接收客户端发来的数据,并将接收到的消息添加“-回传”后返回给客户端。客户端则可以循环输入并发送数据,同时接收服务器回传的信息。当输入“exit”时,客户端会结束与服务器的通信并关闭连接。
|
2月前
|
数据采集 消息中间件 并行计算
进程、线程与协程:并发执行的三种重要概念与应用
进程、线程与协程:并发执行的三种重要概念与应用
61 0
|
2月前
|
C语言
C语言 网络编程(九)并发的UDP服务端 以线程完成功能
这是一个基于UDP协议的客户端和服务端程序,其中服务端采用多线程并发处理客户端请求。客户端通过UDP向服务端发送登录请求,并根据登录结果与服务端的新子线程进行后续交互。服务端在主线程中接收客户端请求并创建新线程处理登录验证及后续通信,子线程创建新的套接字并与客户端进行数据交换。该程序展示了如何利用线程和UDP实现简单的并发服务器架构。
|
3月前
|
Rust 并行计算 安全
揭秘Rust并发奇技!线程与消息传递背后的秘密,让程序性能飙升的终极奥义!
【8月更文挑战第31天】Rust 以其安全性和高性能著称,其并发模型在现代软件开发中至关重要。通过 `std::thread` 模块,Rust 支持高效的线程管理和数据共享,同时确保内存和线程安全。本文探讨 Rust 的线程与消息传递机制,并通过示例代码展示其应用。例如,使用 `Mutex` 实现线程同步,通过通道(channel)实现线程间安全通信。Rust 的并发模型结合了线程和消息传递的优势,确保了高效且安全的并行执行,适用于高性能和高并发场景。
64 0
|
3月前
|
开发框架 Android开发 iOS开发
跨平台开发的双重奏:Xamarin在不同规模项目中的实战表现与成功故事解析
【8月更文挑战第31天】在移动应用开发领域,选择合适的开发框架至关重要。Xamarin作为一款基于.NET的跨平台解决方案,凭借其独特的代码共享和快速迭代能力,赢得了广泛青睐。本文通过两个案例对比展示Xamarin的优势:一是初创公司利用Xamarin.Forms快速开发出适用于Android和iOS的应用;二是大型企业借助Xamarin实现高性能的原生应用体验及稳定的后端支持。无论是资源有限的小型企业还是需求复杂的大公司,Xamarin均能提供高效灵活的解决方案,彰显其在跨平台开发领域的强大实力。
48 0