C语言的TCPServer和select/poll/epoll并发探讨
TCPServer
开启一个服务器
首先看最简单的Linux系统下的TCPServer的实现:
int main() { int sockfd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in servaddr; memset(&servaddr, 0, sizeof(struct sockaddr_in)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(9999); if(-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))) { printf("bind failed: %s", strerror(errno)); return -1; } listen(sockfd, 10); sleep(10); }
我们运行以上代码可以发现进程被阻塞,我们使用命令netstat -anop | grep 9999查看端口情况可以看到状态为LISTEN,就是说端口被在监听。我们继续完善代码:
int main() { int sockfd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in servaddr; memset(&servaddr, 0, sizeof(struct sockaddr_in)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(9999); if(-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))) { printf("bind failed: %s", strerror(errno)); return -1; } listen(sockfd, 10); struct sockaddr_in clientaddr; socklen_t len = sizeof(clientaddr); while(1) { int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len); printf("accept\n"); } }
运行以上程序,你会发现进程阻塞在了accept这个地方,在等待客户端连接我们可以来尝试连接一下:
大家可以看到客户端可以轻松连接上,这里不知道linux的IP的话可以ifconfig看一下,这个现象说明accept是一个阻塞函数,一直在等待client的连接,只有客户端连接上才会打印一个accept,那么如何设置成非阻塞呢,我们需要给sockfd设置成非阻塞模式:
int main() { int sockfd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in servaddr; memset(&servaddr, 0, sizeof(struct sockaddr_in)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(9999); if(-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))) { printf("bind failed: %s", strerror(errno)); return -1; } listen(sockfd, 10); // sleep(10); printf("sleep\n"); int flags = fcntl(sockfd, F_GETFL, 0); flags |= O_NONBLOCK; fcntl(sockfd, F_SETFL, flags); struct sockaddr_in clientaddr; socklen_t len = sizeof(clientaddr); while(1) { int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len); printf("accept\n"); } }
这样accept就不会阻塞,没有client连接依然执行下面的代码。
数据收发
在开启一个server之后,我们就要开始进行数据的收发,代码这样实现:
//接受缓冲区大小 #define BUFFER_LENGTH 1024 int main() { int sockfd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in servaddr; memset(&servaddr, 0, sizeof(struct sockaddr_in)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(9999); if(-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))) { printf("bind failed: %s\n", strerror(errno)); return -1; } listen(sockfd, 10); struct sockaddr_in clientaddr; socklen_t len = sizeof(clientaddr); int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len); printf("accept\n"); while(1) { char buffer[BUFFER_LENGTH] = {0}; int ret = recv(clientfd, buffer, BUFFER_LENGTH, 0); printf("ret: %d, buffer: %s\n", ret, buffer); send(clientfd, buffer, ret, 0); } }
从上面的现象可以看到,数据接受函数recv的返回值是接受到字符串的长度,而且是个阻塞函数,在等待我们发送数据过去。收到数据的同事,使用send发送回来。
总结
以上就是TCPServer的实现,我们今天主要讨论并发的实现,所以TCPServer的只是简单实现一下。
并发
我们主要讨论多线程,select,poll和epoll的区别和其中的运行原理和一些问题。
多线程
如果要一下子连接很多个客户端,肯定第一个想到多线程,我们先来实现一下TCPServer的多线程在讨论他的局限性:
#define BUFFER_LENGTH 1024 //线程函数 void *client_thread(void *arg) { int clientfd = *(int*)arg; while(1) { char buffer[BUFFER_LENGTH] = {0}; int ret = recv(clientfd, buffer, BUFFER_LENGTH, 0); if(ret == 0) { close(clientfd); break; } printf("ret: %d, buffer: %s\n", ret, buffer); send(clientfd, buffer, ret, 0); } } int main() { int sockfd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in servaddr; memset(&servaddr, 0, sizeof(struct sockaddr_in)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(9999); if(-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))) { printf("bind failed: %s\n", strerror(errno)); return -1; } listen(sockfd, 10); struct sockaddr_in clientaddr; socklen_t len = sizeof(clientaddr); while(1) { int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len); pthread_t threadid; //将clientfd昨晚参数传入线程 pthread_create(&threadid, NULL, client_thread, &clientfd); } }
大家可以看到,我们通过一个构造一个线程函数,将clientfd放入不同的线程实现了服务端并发。但是如果我们有很大的用户量,我就要有几万,几十万甚至几百万的线程,这样服务器资源肯定是不够的,所以我们引入了select机制来节省服务器资源同时满足并发的需求。
句柄号
关于这个我不知道怎么表述,但是再Windows的API中,fd就是句柄,也是int类型,所以这里就先叫句柄号吧。他是一个什么东西呢?大家可以发现,我们的sockfd和clientfd都是int类型,那我们不妨打印出来看看他们是什么关系,这边我们使用我们已经写好的代码稍作修改:
void *client_thread(void *arg) { int clientfd = *(int*)arg; while(1) { char buffer[BUFFER_LENGTH] = {0}; int ret = recv(clientfd, buffer, BUFFER_LENGTH, 0); if(ret == 0) { close(clientfd); break; } printf("ret: %d, buffer: %s\n", ret, buffer); send(clientfd, buffer, ret, 0); } } int main() { int sockfd = socket(AF_INET, SOCK_STREAM, 0); printf("sockfd: %d\n", sockfd); struct sockaddr_in servaddr; memset(&servaddr, 0, sizeof(struct sockaddr_in)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(9999); if(-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))) { printf("bind failed: %s\n", strerror(errno)); return -1; } listen(sockfd, 10); struct sockaddr_in clientaddr; socklen_t len = sizeof(clientaddr); while(1) { int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len); printf("clientfd: %d\n", clientfd); pthread_t threadid; pthread_create(&threadid, NULL, client_thread, &clientfd); } }
我们向代码中加两个printf,分别打印出sockfd和clientfd,结果如下:
大家可以看到,我们的clientfd编号其实是再随着sockfd的编号一个一个增长的,这就是关于句柄号的问题,有了这个概念,就可以往下看了。
select机制
首先我们必须明白什么是select机制,简单来说就是将所有线程存在一个容器中,通过遍历这个容器来查看哪些clientfd是可读的,哪些是可写的,再对齐进行相应的读写操作。我们先看代码:
#define BUFFER_LENGTH 1024 int main() { int sockfd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in servaddr; memset(&servaddr, 0, sizeof(struct sockaddr_in)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(9999); printf("begin bind...\n"); if(-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))) { printf("bind error: %s\n", strerror(errno)); return -1; } listen(sockfd, 10); struct sockaddr_in clientaddr; socklen_t len = sizeof(clientaddr); fd_set rfds, rset; FD_ZERO(&rfds); FD_SET(sockfd, &rfds); int maxfd = sockfd; int clientfd = 0; while(1) { rset = rfds; int nready = select(maxfd + 1, &rset, NULL, NULL, NULL); printf("nready: %d\n", nready); if(FD_ISSET(sockfd, &rset)) { clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len); printf("accept: %d\n", clientfd); FD_SET(clientfd, &rfds); if(clientfd > maxfd) maxfd = clientfd; if(--nready == 0) continue; } int i = 0; for(i = sockfd + 1; i <= maxfd; i++) { if(FD_ISSET(i, &rset)) { char buffer[BUFFER_LENGTH] = {0}; int ret = recv(i, buffer, BUFFER_LENGTH, 0); if(ret == 0) { close(i); break; } printf("ret: %d, buffer: %s\n", ret, buffer); send(i, buffer, ret, 0); } } } }
从实验效果可以看到,我们使用select机制实现了一个并发服务器。下面我们逐步分析一下select机制的实现。
//定义rfds(可读的fd集合)和rset(传递给内核的fd集合) fd_set rfds, rset; //初始化为0 FD_ZERO(&rfds); //将集合和socket的数据比较,设置集合 FD_SET(sockfd, &rfds); //最大的句柄号为sockfd的句柄号() int maxfd = sockfd; int clientfd = 0; while(1) { //复制一份rfds,准备传给内核 rset = rfds; //更新可读套接字合集 int nready = select(maxfd + 1, &rset, NULL, NULL, NULL); printf("nready: %d\n", nready); //判断sockfd是否可读 if(FD_ISSET(sockfd, &rset)) { clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len); printf("accept: %d\n", clientfd); //将新的fd加入集合 FD_SET(clientfd, &rfds); if(clientfd > maxfd) //更新macfd maxfd = clientfd; if(--nready == 0) continue; } int i = 0; //从第一个clientfd开始遍历 for(i = sockfd + 1; i <= maxfd; i++) { if(FD_ISSET(i, &rset)) { char buffer[BUFFER_LENGTH] = {0}; int ret = recv(i, buffer, BUFFER_LENGTH, 0); if(ret == 0) { close(i); break; } printf("ret: %d, buffer: %s\n", ret, buffer); send(i, buffer, ret, 0); } } } }
说明:
- 关于macfd = clientfd,不是说clientfd只会增长,如果有client断开,空出了前面的clientfd位置,那么后面的clientfd会补足前面的。
- 关于select函数中的参数,其实正常需要传三组集合rfds,wfds,efds分别代表读,写和错误。但是在演示中我们就传了读的集合,在正式使用的时候,需要拷贝3份集合,十分消耗服务器资源。
- 关于int nready = select(maxfd + 1, &rset, NULL, NULL, NULL);这一行代码:由于底层遍历是 < 号,所以maxfd需要+1(for循环)。
- 缺点:select最多只能有1024个fd。
- 优点:select跨平台
poll机制
总体来说,poll和select差不多,都是通过遍历数组来实现并发,我们先看代码:
#define BUFFER_LENGTH 1024 #define POLL_SIZE 1024 int main() { int sockfd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in servaddr; memset(&servaddr, 0, sizeof(struct sockaddr_in)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(9999); if(-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))) { printf("bind failed: %s", strerror(errno)); return -1; } listen(sockfd, 10); struct sockaddr_in clientaddr; socklen_t len = sizeof(clientaddr); //poll struct pollfd fds[POLL_SIZE] = {0}; fds[sockfd].fd = sockfd; fds[sockfd].events = POLLIN; int maxfd = sockfd; int clientfd = 0; while(1) { int nready = poll(fds, maxfd + 1, -1); if(fds[sockfd].revents & POLLIN) { clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len); printf("accept: %d\n", clientfd); fds[clientfd].fd = clientfd; fds[clientfd].events = POLLIN; if(clientfd > maxfd) maxfd = clientfd; if(--nready == 0) continue; } int i = 0; for(int i = 0; i < maxfd + 1; i++) { if(fds[i].revents & POLLIN) { char buffer[BUFFER_LENGTH] = {0}; int ret = recv(i, buffer, BUFFER_LENGTH, 0); if(ret == 0) { fds[i].fd = -1; fds[i].events = 0; close(i); break; } printf("ret: %d, buffer: %s\n", ret, buffer); send(i, buffer, ret, 0); } } } }
大家可以看到,从代码上看,poll和select原理差不多,但是还是有几个不同的地方。
- poll的接口少,从接口上看,select有FD_SET,FDZERO等接口,比poll更加消耗资源。
- poll只有一个集合,不需要像select那样拷贝多个集合。
epoll机制
这是一个非常重要的机制,重要到如果没有这个机制,linux不会作为服务器。在以前,linux主要作用是嵌入式的工业开发,直到出现了epoll,linux才走进了服务器开发的市场。epoll的重要在于:它解决了IO数量的问题,不再局限于clientfd的数量。我们先看代码实现:
#define BUFFER_LENGTH 1024 int main() { int sockfd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in servaddr; memset(&servaddr, 0, sizeof(struct sockaddr_in)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(9999); if(-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))) { printf("bind failed: %s\n", strerror(errno)); return -1; } listen(sockfd, 10); struct sockaddr_in clientaddr; socklen_t len = sizeof(clientaddr); int epfd = epoll_create(1); struct epoll_event ev; ev.events = EPOLLIN; ev.data.fd = sockfd; epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev); struct epoll_event events[1024] = {0}; while(1) { //需要遍历的io数量 int nready = epoll_wait(epfd, events, 1024, -1); printf("nready: %d\n", nready); if(nready < 0) continue; int i = 0; //遍历IO for(int i = 0; i < nready; i++) { int connfd = events[i].data.fd; if(sockfd == connfd) { int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len); if(clientfd < 0) continue; printf("clientfd: %d\n", clientfd); ev.events = EPOLLIN | EPOLLET; ev.data.fd = clientfd; epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev); } else if(events[i].events & EPOLLIN) { char buffer[BUFFER_LENGTH] = {0}; short len = 0; recv(connfd, &len, 2, 0); len = ntohs(len); int n = recv(connfd, buffer, BUFFER_LENGTH, 0); if(n > 0) { printf("recv: %s\n", buffer); send(connfd, buffer, n, 0); } else if(n == 0) { printf("close\n"); epoll_ctl(epfd, EPOLL_CTL_DEL, connfd, NULL); close(connfd); } } } } }
epoll的机制和前面两个有很大的不同。他机制类似蜂巢。比如一个居民楼有一个蜂巢,每次增加一个clientfd,那么住户+1,但是蜂巢不变。只有当居民需要寄快递,居民把快递放在蜂巢,或者去蜂巢收快递。这样对IO的遍历仅限于蜂巢,而不是每一户居民。等于说,以前的快递员需要去每一户居民家问要不要收发快递,现在只要去蜂巢就可以了,大大提高效率,节省了资源。
注意:
关于这一段代码的书写,其实不需要这么麻烦,这里只是实验了一下水平触发和边缘触发,大家可以自行改写。