一、多线程并发和IO多路复用(select、poll、epoll)
一请求一线程是通过多线程实现的,而selet,poll,epoll是通过io多路复用
- 一请求一线程
简单,但线程个数有限,C10K (1W个线程) - select 复杂度O(1)
1.select,1024fd,多做几次select,可以突破C10K(1W个线程),但是往上走没法突破C1000K (100W个线程)
2.首先要把cpu中的rset全部拷贝出来,然后在select里面选,虽然只有几个nready,但是要把所有的rset拷贝出来,所以select具有O(n)的无差别轮询复杂度 - poll 复杂度O(n)
和select基本上没有差别,但是它比select方便,只是不需要定义读、写、检查差错,三样东西了,全部包含在一起。它没有最大连接数的限制,原因是它是基于链表来存储的. - epoll 复杂度O(n)
不同于忙轮询和无差别轮询,epoll会把哪个流发生了怎样的I/O事件通知我们。所以我们说epoll实际上是事件驱动(每个事件关联上fd)的,此时我们对这些流的操作都是有意义的。(复杂度降低到了O(1))
二、代码部分
#include <errno.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/socket.h> #include <sys/types.h> #include <unistd.h> #include <sys/poll.h> #include <sys/epoll.h> #include <pthread.h> #define MAXLNE 4096 #define POLL_SIZE 1024 //8m * 4G = 128 , 512 //C10k void *client_routine(void *arg) { // int connfd = *(int *)arg; char buff[MAXLNE]; while (1) { int n = recv(connfd, buff, MAXLNE, 0); if (n > 0) { buff[n] = '\0'; printf("recv msg from client: %s\n", buff); send(connfd, buff, n, 0);//服务器向客户端的应答 } else if (n == 0) { close(connfd); break; } } return NULL; } int main(int argc, char **argv) { int listenfd, connfd, n; struct sockaddr_in servaddr; char buff[MAXLNE]; if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { printf("create socket error: %s(errno: %d)\n", strerror(errno), errno); return 0; } memset(&servaddr, 0, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(9999); if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) { printf("bind socket error: %s(errno: %d)\n", strerror(errno), errno); return 0; } if (listen(listenfd, 10) == -1) { printf("listen socket error: %s(errno: %d)\n", strerror(errno), errno); return 0; } #if 0 struct sockaddr_in client; socklen_t len = sizeof(client); if ((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1) { printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno); return 0; } printf("========waiting for client's request========\n"); while (1) { n = recv(connfd, buff, MAXLNE, 0); if (n > 0) { buff[n] = '\0'; printf("recv msg from client: %s\n", buff); send(connfd, buff, n, 0); } else if (n == 0) { close(connfd); } //close(connfd); } #elif 0 //这种情况:每个客户端连接后只能发送一次消息 //accept如果没有连接会阻塞(TCP三次握手发生在listen后,accept前),recv没有接受也会阻塞 //因此想要发送第二条消息的时候,就会被阻塞到accept了 printf("========waiting for client's request========\n"); while (1) { struct sockaddr_in client; socklen_t len = sizeof(client); if ((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1) { printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno); return 0; } n = recv(connfd, buff, MAXLNE, 0); if (n > 0) { buff[n] = '\0'; printf("recv msg from client: %s\n", buff); send(connfd, buff, n, 0); } else if (n == 0) { close(connfd); } //close(connfd); } #elif 0 // 一请求一线程 //通过多线程的方式,在client_routine里面放一个while(1)循环,使得可以反复recv,从而解决上面的问题(客服端只能发送出一条数据) //这种方式的 优点: 简单 缺点:线程个数有限,导致可以接入的客户端个数有限 while (1) { struct sockaddr_in client; socklen_t len = sizeof(client); if ((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1) { printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno); return 0; } pthread_t threadid; pthread_create(&threadid, NULL, client_routine, (void*)&connfd); } #elif 0 // select 选有事件可以处理的io,利用select解决了 上述用一请求一线程,用多个线程去处理的问题 fd_set rfds, rset, wfds, wset;//fd_set通过bitset,将其比特位置为1表明有数据,0表示没数据 FD_ZERO(&rfds); FD_SET(listenfd, &rfds);//将listenfd加入set集合 FD_ZERO(&wfds); int max_fd = listenfd;//在select的时候是通过遍历bitmap(集合)去找到的,因此把listenfd作为最大的索引,因此最大个数要为listenfd+1(刚开始的时候只有0、1、2(标准输入、输出、错误)和listenfd=3),因此初始化最大值 为listenfd+1) while (1) { rset = rfds; wset = wfds; int nready = select(max_fd+1, &rset, &wset, NULL, NULL);//select的做法就是将fd接受到的数据,传入集合中(rset,wset),后续只需要处理,rset,wset就行。最后一个参数timeout:如果设置为NULL(0),并且没有可读的数据,会一直阻塞;如果设置>0,那么经过该时间,阻塞就会停止,向下继续运行 if (FD_ISSET(listenfd, &rset)) { //如果有新的客户端连接,rset中有存在listenfd接受到数据的标志(连接) struct sockaddr_in client; socklen_t len = sizeof(client); if ((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1) { printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno); return 0; } FD_SET(connfd, &rfds); if (connfd > max_fd) max_fd = connfd;//动态扩容 if (--nready == 0) continue;//如果只有listenfd(只有新客户端连接,但没有接收到其他客户端数据),就不执行下面内容了 } int i = 0; for (i = listenfd+1;i <= max_fd;i ++) {//fd是依次增加的,fd=0,1,2是确定好的(标注输入,输出,错误值),listenfd=3,后续的connfd都是在4、5、6...依次增加的。如果connfd为8了,但connfd=4回收了,那么下一个就从4开始 if (FD_ISSET(i, &rset)) { // 读操作 n = recv(i, buff, MAXLNE, 0); if (n > 0) { buff[n] = '\0'; printf("recv msg from client: %s\n", buff); FD_SET(i, &wfds); // 接收到数据后,就赋予客户端 读操作,也就是服务端的 写操作wfds。 但并不会在此轮进行FD_ISSET(i, &wset),而是在下一轮select后,进入FD_ISSET(i, &wset) //reactor //send(i, buff, n, 0); } else if (n == 0) { //如果断开连接(rset有客户端断开的信息的标志) FD_CLR(i, &rfds); // 如果不关闭会导致一直进入(n==0这个情况,一致打印disconnect。因为下面有close(i),所以这行即使不加也不会循环打印,但最好要加) // printf("disconnect\n"); close(i); } if (--nready == 0) break; } else if (FD_ISSET(i, &wset)) { send(i, buff, n, 0);//为什么send要发送一堆16进制内容?? FD_SET(i, &rfds);//设置读操作 } } } #elif 0 struct pollfd fds[POLL_SIZE] = {0}; fds[0].fd = listenfd;//当然也可以写成fds[listenfd]=listenfd; fds[0].events = POLLIN; int max_fd = listenfd; int i = 0; for (i = 1;i < POLL_SIZE;i ++) { fds[i].fd = -1; } while (1) { int nready = poll(fds, max_fd+1, -1); if (fds[0].revents & POLLIN) { struct sockaddr_in client; socklen_t len = sizeof(client); if ((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1) { printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno); return 0; } printf("accept \n"); fds[connfd].fd = connfd; fds[connfd].events = POLLIN; if (connfd > max_fd) max_fd = connfd; if (--nready == 0) continue; } //int i = 0; for (i = listenfd+1;i <= max_fd;i ++) { if (fds[i].revents & POLLIN) { n = recv(i, buff, MAXLNE, 0); if (n > 0) { buff[n] = '\0'; printf("recv msg from client: %s\n", buff); send(i, buff, n, 0); } else if (n == 0) { // fds[i].fd = -1; close(i); } if (--nready == 0) break; } } } #else //poll/select --> // epoll_create // epoll_ctl(ADD, DEL, MOD) // epoll_wait int epfd = epoll_create(1); //int size struct epoll_event events[POLL_SIZE] = {0};//epfd可以理解为一个快递员,而events可以理解为快递员的盒子,POLL_SIZE可以理解为盒子大小,不用担心POLL_SIZE设置的小,无非是快递员多跑几趟 struct epoll_event ev; ev.events = EPOLLIN; ev.data.fd = listenfd; epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev); while (1) { int nready = epoll_wait(epfd, events, POLL_SIZE, 5); if (nready == -1) {//设置一个大于0的timeout,如果没有接收到任何信息,就会返回-1 continue; } int i = 0; for (i = 0;i < nready;i ++) { int clientfd = events[i].data.fd; if (clientfd == listenfd) { struct sockaddr_in client; socklen_t len = sizeof(client); if ((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1) { printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno); return 0; } printf("accept\n"); ev.events = EPOLLIN; ev.data.fd = connfd; epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev); } else if (events[i].events & EPOLLIN) { n = recv(clientfd, buff, MAXLNE, 0); if (n > 0) { buff[n] = '\0'; printf("recv msg from client: %s\n", buff); send(clientfd, buff, n, 0);//非标准写法 } else if (n == 0) { // ev.events = EPOLLIN; ev.data.fd = clientfd; epoll_ctl(epfd, EPOLL_CTL_DEL, clientfd, &ev); close(clientfd); } } } } #endif close(listenfd); return 0; }
三、补充
1.tcp三次握手的时刻
可以把listenfd理解成 迎宾的人
把clientfd理解成 服务员
tcp三次握手发生在listen后
accpet前,accpet只是在三次握手后取出一个clientfd
2.epoll_create()
int epfd = epoll_create(1); 只要传一个大于0的参数就行了。早期是一个固定大小,但是现在是以链表实现的,没有上限,这么做的目的是为了代码兼容
3.EPOLLOUT触发条件
把EPOLLIN理解为可读(当管道内为空是,不可读)
把EPOLLOUT理解为可写(当管道满时候,不可写)