C语言的TCPServer和select/poll/epoll并发探讨

简介: C语言的TCPServer和select/poll/epoll并发探讨

C语言的TCPServer和select/poll/epoll并发探讨

TCPServer

开启一个服务器

首先看最简单的Linux系统下的TCPServer的实现:

int main()
{
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(struct sockaddr_in));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(9999);
    if(-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr)))
    {
        printf("bind failed: %s", strerror(errno));
        return -1;
    }
    listen(sockfd, 10);
    sleep(10);
}

我们运行以上代码可以发现进程被阻塞,我们使用命令netstat -anop | grep 9999查看端口情况可以看到状态为LISTEN,就是说端口被在监听。我们继续完善代码:

int main()
{
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(struct sockaddr_in));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(9999);
    if(-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr)))
    {
        printf("bind failed: %s", strerror(errno));
        return -1;
    }
    listen(sockfd, 10);
    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);
    while(1)
    {
        int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);
        printf("accept\n");
    }
}

运行以上程序,你会发现进程阻塞在了accept这个地方,在等待客户端连接我们可以来尝试连接一下:

大家可以看到客户端可以轻松连接上,这里不知道linux的IP的话可以ifconfig看一下,这个现象说明accept是一个阻塞函数,一直在等待client的连接,只有客户端连接上才会打印一个accept,那么如何设置成非阻塞呢,我们需要给sockfd设置成非阻塞模式:

int main()
{
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(struct sockaddr_in));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(9999);
    if(-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr)))
    {
        printf("bind failed: %s", strerror(errno));
        return -1;
    }
    listen(sockfd, 10);
    // sleep(10);
    printf("sleep\n");
    int flags = fcntl(sockfd, F_GETFL, 0);
    flags |= O_NONBLOCK;
    fcntl(sockfd, F_SETFL, flags);
    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);
    while(1)
    {
        int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);
        printf("accept\n");
    }
}

这样accept就不会阻塞,没有client连接依然执行下面的代码。

数据收发

在开启一个server之后,我们就要开始进行数据的收发,代码这样实现:

//接受缓冲区大小
#define BUFFER_LENGTH       1024
int main()
{
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(struct sockaddr_in));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(9999);
    if(-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr)))
    {
        printf("bind failed: %s\n", strerror(errno));
        return -1;
    }
    listen(sockfd, 10);
    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);
    int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);
    printf("accept\n");
    while(1)
    {
        char buffer[BUFFER_LENGTH] = {0};
        int ret = recv(clientfd, buffer, BUFFER_LENGTH, 0);
        printf("ret: %d, buffer: %s\n", ret, buffer);
        send(clientfd, buffer, ret, 0);
    }
}

从上面的现象可以看到,数据接受函数recv的返回值是接受到字符串的长度,而且是个阻塞函数,在等待我们发送数据过去。收到数据的同事,使用send发送回来。

总结

以上就是TCPServer的实现,我们今天主要讨论并发的实现,所以TCPServer的只是简单实现一下。

并发

我们主要讨论多线程,select,poll和epoll的区别和其中的运行原理和一些问题。

多线程

如果要一下子连接很多个客户端,肯定第一个想到多线程,我们先来实现一下TCPServer的多线程在讨论他的局限性:

#define BUFFER_LENGTH       1024
//线程函数
void *client_thread(void *arg)
{
    int clientfd = *(int*)arg;
    while(1)
    {
        char buffer[BUFFER_LENGTH] = {0};
        int ret = recv(clientfd, buffer, BUFFER_LENGTH, 0);
        if(ret == 0)
        {
            close(clientfd);
            break;
        }
        printf("ret: %d, buffer: %s\n", ret, buffer);
        send(clientfd, buffer, ret, 0);
    }
}
int main()
{
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(struct sockaddr_in));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(9999);
    if(-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr)))
    {
        printf("bind failed: %s\n", strerror(errno));
        return -1;
    }
    listen(sockfd, 10);
    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);
    while(1)
    {
        int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);
        pthread_t threadid;
        //将clientfd昨晚参数传入线程
        pthread_create(&threadid, NULL, client_thread, &clientfd);
    }
}

大家可以看到,我们通过一个构造一个线程函数,将clientfd放入不同的线程实现了服务端并发。但是如果我们有很大的用户量,我就要有几万,几十万甚至几百万的线程,这样服务器资源肯定是不够的,所以我们引入了select机制来节省服务器资源同时满足并发的需求。

句柄号

关于这个我不知道怎么表述,但是再Windows的API中,fd就是句柄,也是int类型,所以这里就先叫句柄号吧。他是一个什么东西呢?大家可以发现,我们的sockfd和clientfd都是int类型,那我们不妨打印出来看看他们是什么关系,这边我们使用我们已经写好的代码稍作修改:

void *client_thread(void *arg)
{
    int clientfd = *(int*)arg;
    while(1)
    {
        char buffer[BUFFER_LENGTH] = {0};
        int ret = recv(clientfd, buffer, BUFFER_LENGTH, 0);
        if(ret == 0)
        {
            close(clientfd);
            break;
        }
        printf("ret: %d, buffer: %s\n", ret, buffer);
        send(clientfd, buffer, ret, 0);
    }
}
int main()
{
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    printf("sockfd: %d\n", sockfd);
    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(struct sockaddr_in));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(9999);
    if(-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr)))
    {
        printf("bind failed: %s\n", strerror(errno));
        return -1;
    }
    listen(sockfd, 10);
    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);
    while(1)
    {
        int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);
        printf("clientfd: %d\n", clientfd);
        pthread_t threadid;
        pthread_create(&threadid, NULL, client_thread, &clientfd);
    }
}

我们向代码中加两个printf,分别打印出sockfd和clientfd,结果如下:

大家可以看到,我们的clientfd编号其实是再随着sockfd的编号一个一个增长的,这就是关于句柄号的问题,有了这个概念,就可以往下看了。

select机制

首先我们必须明白什么是select机制,简单来说就是将所有线程存在一个容器中,通过遍历这个容器来查看哪些clientfd是可读的,哪些是可写的,再对齐进行相应的读写操作。我们先看代码:

#define BUFFER_LENGTH 1024
int main()
{
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(struct sockaddr_in));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(9999);
    printf("begin bind...\n");
    if(-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr)))
    {
        printf("bind error: %s\n", strerror(errno));
        return -1;
    }
    listen(sockfd, 10);
    struct sockaddr_in clientaddr;
    socklen_t len  = sizeof(clientaddr);
    fd_set rfds, rset;
    FD_ZERO(&rfds);
    FD_SET(sockfd, &rfds);
    int maxfd = sockfd;
    int clientfd = 0;
    while(1)
    {
        rset = rfds;
        int nready = select(maxfd + 1, &rset, NULL, NULL, NULL);
        printf("nready: %d\n", nready);
        if(FD_ISSET(sockfd, &rset))
        {
            clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);
            printf("accept: %d\n", clientfd);
            FD_SET(clientfd, &rfds);
            if(clientfd > maxfd)
                maxfd = clientfd;
            if(--nready == 0)
                continue;
        }
        int i = 0;
        for(i = sockfd + 1; i <= maxfd; i++)
        {
            if(FD_ISSET(i, &rset))
            {
                char buffer[BUFFER_LENGTH] = {0};
                int ret = recv(i, buffer, BUFFER_LENGTH, 0);
                if(ret == 0)
                {
                    close(i);
                    break;
                }
                printf("ret: %d, buffer: %s\n", ret, buffer);
                send(i, buffer, ret, 0);
            }
        }
    }
}

从实验效果可以看到,我们使用select机制实现了一个并发服务器。下面我们逐步分析一下select机制的实现。

//定义rfds(可读的fd集合)和rset(传递给内核的fd集合)
  fd_set rfds, rset;
  //初始化为0
    FD_ZERO(&rfds);
  //将集合和socket的数据比较,设置集合
    FD_SET(sockfd, &rfds);
  //最大的句柄号为sockfd的句柄号()
    int maxfd = sockfd;
    int clientfd = 0;
    while(1)
    {
        //复制一份rfds,准备传给内核
        rset = rfds;
        //更新可读套接字合集
        int nready = select(maxfd + 1, &rset, NULL, NULL, NULL);
        printf("nready: %d\n", nready);
        //判断sockfd是否可读
        if(FD_ISSET(sockfd, &rset))
        {
            clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);
            printf("accept: %d\n", clientfd);
      //将新的fd加入集合
            FD_SET(clientfd, &rfds);
            if(clientfd > maxfd)
                //更新macfd
                maxfd = clientfd;
            if(--nready == 0)
                continue;
        }
        int i = 0;
        //从第一个clientfd开始遍历
        for(i = sockfd + 1; i <= maxfd; i++)
        {
            if(FD_ISSET(i, &rset))
            {
                char buffer[BUFFER_LENGTH] = {0};
                int ret = recv(i, buffer, BUFFER_LENGTH, 0);
                if(ret == 0)
                {
                    close(i);
                    break;
                }
                printf("ret: %d, buffer: %s\n", ret, buffer);
                send(i, buffer, ret, 0);
            }
        }
    }
}

说明:

  • 关于macfd = clientfd,不是说clientfd只会增长,如果有client断开,空出了前面的clientfd位置,那么后面的clientfd会补足前面的。
  • 关于select函数中的参数,其实正常需要传三组集合rfds,wfds,efds分别代表读,写和错误。但是在演示中我们就传了读的集合,在正式使用的时候,需要拷贝3份集合,十分消耗服务器资源。
  • 关于int nready = select(maxfd + 1, &rset, NULL, NULL, NULL);这一行代码:由于底层遍历是 < 号,所以maxfd需要+1(for循环)。
  • 缺点:select最多只能有1024个fd。
  • 优点:select跨平台
poll机制

总体来说,poll和select差不多,都是通过遍历数组来实现并发,我们先看代码:

#define BUFFER_LENGTH 1024
#define POLL_SIZE     1024
int main()
{
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(struct sockaddr_in));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(9999);
    if(-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr)))
    {
        printf("bind failed: %s", strerror(errno));
        return -1;
    }
    listen(sockfd, 10);
    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);
    //poll
    struct pollfd fds[POLL_SIZE] = {0};
    fds[sockfd].fd = sockfd;
    fds[sockfd].events = POLLIN;
    int maxfd = sockfd;
    int clientfd = 0;
    while(1)
    {
        int nready = poll(fds, maxfd + 1, -1);
        if(fds[sockfd].revents & POLLIN)
        {
            clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);
            printf("accept: %d\n", clientfd);
            fds[clientfd].fd = clientfd;
            fds[clientfd].events = POLLIN;
            if(clientfd > maxfd)
                maxfd = clientfd;
            if(--nready == 0)
                continue;
        }
        int i = 0;
        for(int i = 0; i < maxfd + 1; i++)
        {
            if(fds[i].revents & POLLIN)
            {
                char buffer[BUFFER_LENGTH] = {0};
                int ret = recv(i, buffer, BUFFER_LENGTH, 0);
                if(ret == 0)
                {
                    fds[i].fd = -1;
                    fds[i].events = 0;
                    close(i);
                    break;
                }
                printf("ret: %d, buffer: %s\n", ret, buffer);
                send(i, buffer, ret, 0);
            }
        }
    }
}

大家可以看到,从代码上看,poll和select原理差不多,但是还是有几个不同的地方。

  1. poll的接口少,从接口上看,select有FD_SET,FDZERO等接口,比poll更加消耗资源。
  2. poll只有一个集合,不需要像select那样拷贝多个集合。
epoll机制

这是一个非常重要的机制,重要到如果没有这个机制,linux不会作为服务器。在以前,linux主要作用是嵌入式的工业开发,直到出现了epoll,linux才走进了服务器开发的市场。epoll的重要在于:它解决了IO数量的问题,不再局限于clientfd的数量。我们先看代码实现:

#define BUFFER_LENGTH       1024
int main()
{
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(struct sockaddr_in));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(9999);
    if(-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr)))
    {
        printf("bind failed: %s\n", strerror(errno));
        return -1;
    }
    listen(sockfd, 10);
    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);
    int epfd = epoll_create(1);
    struct epoll_event ev;
    ev.events = EPOLLIN;
    ev.data.fd = sockfd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);
    struct epoll_event events[1024] = {0};
    while(1)
    {
        //需要遍历的io数量
        int nready = epoll_wait(epfd, events, 1024, -1);
        printf("nready: %d\n", nready);
        if(nready < 0)
            continue;
        int i = 0;
        //遍历IO
        for(int i = 0; i < nready; i++)
        {
            int connfd = events[i].data.fd;
            if(sockfd == connfd)
            {
                int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);
                if(clientfd < 0)
                    continue;
                printf("clientfd: %d\n", clientfd);
                ev.events = EPOLLIN | EPOLLET;
                ev.data.fd = clientfd;
                epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);
            }
            else if(events[i].events & EPOLLIN)
            {
                char buffer[BUFFER_LENGTH] = {0};
                short len = 0;
                recv(connfd, &len, 2, 0);
                len = ntohs(len);
                int n = recv(connfd, buffer, BUFFER_LENGTH, 0);
                if(n > 0)
                {
                    printf("recv: %s\n", buffer);
                    send(connfd, buffer, n, 0);
                }
                else if(n == 0)
                {
                    printf("close\n");
                    epoll_ctl(epfd, EPOLL_CTL_DEL, connfd, NULL);
                    close(connfd);
                }
            }
        }
    }
}

epoll的机制和前面两个有很大的不同。他机制类似蜂巢。比如一个居民楼有一个蜂巢,每次增加一个clientfd,那么住户+1,但是蜂巢不变。只有当居民需要寄快递,居民把快递放在蜂巢,或者去蜂巢收快递。这样对IO的遍历仅限于蜂巢,而不是每一户居民。等于说,以前的快递员需要去每一户居民家问要不要收发快递,现在只要去蜂巢就可以了,大大提高效率,节省了资源。

注意:

关于这一段代码的书写,其实不需要这么麻烦,这里只是实验了一下水平触发和边缘触发,大家可以自行改写。

相关文章
|
4月前
|
网络协议 C语言
C语言 网络编程(十三)并发的TCP服务端-以进程完成功能
这段代码实现了一个基于TCP协议的多进程并发服务端和客户端程序。服务端通过创建子进程来处理多个客户端连接,解决了粘包问题,并支持不定长数据传输。客户端则循环发送数据并接收服务端回传的信息,同样处理了粘包问题。程序通过自定义的数据长度前缀确保了数据的完整性和准确性。
|
4月前
|
网络协议 C语言
C语言 网络编程(十四)并发的TCP服务端-以线程完成功能
这段代码实现了一个基于TCP协议的多线程服务器和客户端程序,服务器端通过为每个客户端创建独立的线程来处理并发请求,解决了粘包问题并支持不定长数据传输。服务器监听在IP地址`172.17.140.183`的`8080`端口上,接收客户端发来的数据,并将接收到的消息添加“-回传”后返回给客户端。客户端则可以循环输入并发送数据,同时接收服务器回传的信息。当输入“exit”时,客户端会结束与服务器的通信并关闭连接。
|
4月前
|
C语言
C语言 网络编程(八)并发的UDP服务端 以进程完成功能
这段代码展示了如何使用多进程处理 UDP 客户端和服务端通信。客户端通过发送登录请求与服务端建立连接,并与服务端新建的子进程进行数据交换。服务端则负责接收请求,验证登录信息,并创建子进程处理客户端的具体请求。子进程会创建一个新的套接字与客户端通信,实现数据收发功能。此方案有效利用了多进程的优势,提高了系统的并发处理能力。
|
4月前
|
存储 Linux C语言
C语言 多路复用 epoll
本文详细介绍了 Linux 下的非阻塞 IO 多路复用技术——`epoll`,对比 `select` 和 `poll`,`epoll` 使用红黑树结构存储文件描述符,支持动态增删节点,无数量限制,采用回调机制提高效率。文章通过示例代码展示了如何使用 `epoll_create()` 创建 `epoll` 实例,`epoll_ctl()` 管理文件描述符,以及 `epoll_wait()` 等待事件。最后简要分析了 `epoll` 的核心数据结构 `struct eventpoll` 和红黑树节点 `struct epitem`。
|
4月前
|
C语言
C语言 网络编程(九)并发的UDP服务端 以线程完成功能
这是一个基于UDP协议的客户端和服务端程序,其中服务端采用多线程并发处理客户端请求。客户端通过UDP向服务端发送登录请求,并根据登录结果与服务端的新子线程进行后续交互。服务端在主线程中接收客户端请求并创建新线程处理登录验证及后续通信,子线程创建新的套接字并与客户端进行数据交换。该程序展示了如何利用线程和UDP实现简单的并发服务器架构。
|
4月前
|
网络协议 数据处理 C语言
利用C语言基于poll实现TCP回声服务器的多路复用模型
此代码仅为示例,展示了如何基于 `poll`实现多路复用的TCP回声服务器的基本框架。在实际应用中,你可能需要对其进行扩展或修改,以满足具体的需求。
106 0
|
4月前
|
存储 监控 Linux
C语言 多路复用 select源码分析
本文详细介绍了阻塞IO与非阻塞IO的概念及其在Linux系统中的实现方式。首先阐述了常见的IO模型,包括阻塞型、非阻塞型及多路复用IO模型。阻塞IO模型会在IO请求未完成时阻塞进程,而非阻塞IO模型则允许在IO未完成时立即返回。非阻塞IO可通过设置`O_NONBLOCK`标志实现。接着介绍了多路复用IO模型,利用`select`、`poll`和`epoll`等系统调用监控多个文件描述符。`select`函数通过内核检测文件描述符是否就绪,并通知调用者。
|
5月前
|
C语言
【C语言】epoll函数
【C语言】epoll函数
45 0
|
Linux API C语言
C语言的TCPServer和select/poll/epoll并发探讨
C语言的TCPServer和select/poll/epoll并发探讨
|
Linux C语言
linux c语言 select函数用法
<h4 id="subjcns!e72884ac7e5cb457!168" class="TextColor1" style="">linux c语言 select函数用法</h4> <div id="msgcns!e72884ac7e5cb457!168" style=""> <div style="word-wrap:break-word"> <table width="100%
1959 0