1. 多进程并发服务器
我们在上一节写的TCP服务器只能处理单连接,在代码实现时,多进程并发服务器与非并发服务器在创建监听套接字、绑定、监听这几个步骤是一样的,但是在接收连接请求的时候,多进程并发服务器是这样实现的:父进程负责接受连接请求,一旦连接成功,将会创建一个子进程与客户端通信。示意图如下:
(1)什么是并发
- 单核CPU → 多进程/线程并发 → 时间片轮转
- 并发 → 某一个时间片/点所能处理的任务数
- 服务器并发:服务器在某个时间点/片所能处理的连接数所能接收的client连接越多,并发量越大
(2)多进程并发服务器需要注意的几个要点
使用多进程的方式来解决服务器处理多连接的问题,需要注意下面几点:
- 共享:读时共享、写时复制。有血缘关系的进程间将会共享
- 文件描述符
- 内存映射区mmap
- 父进程扮演什么角色?
- 等待接受客户端连接accept()
- 有连接的时候通过fork()创建一个子进程。父进程只负责等待客户端连接,即通过accept()阻塞等待连接请求,一旦有连接请求,马上通过fork()创建一个子进程,子进程通过共享父进程的文件描述符来实现和client通信。
- 将用于通信的文件描述符关闭。accept()接受连接请求后会返回一个用于通信的文件描述符,而父进程的职责是等待连接并fork()创建用于通信的子进程,所以对于父进程来说,用于通信的文件描述符是没有用处的,关闭该文件描述符来节省开销。我们知道,文件描述符是有上限的,最多1024个(0-1023),如果不关闭的话,每次fork()一个子进程都要浪费一个文件描述符,如果进程多了,可能文件描述符就不够用了。
- 子进程扮演什么角色?
- 通信。通过共享的父进程accept()返回的文件描述符来与客户端通信。
- 将用于监听的文件描述符关闭。同样是为了节省资源,子进程被fork()出来后也会拥有一个用于监听的文件描述符(因为子进程是对父进程的拷贝),但是子进程的作用是与客户端通信,所以用于监听的文件描述符对子进程而言并无用处,关闭以节省资源。
- 创建的子进程个数有限制吗?
- 受硬件限制
- 文件描述符默认上限1024
- 子进程资源回收
- wait/waitpid
- 使用信号回收
- signal
- sigaction
- 捕捉信号SIGCHLD
(3)读时共享写时复制详解
首先看图
如果父子进程都只是读数据,那么他们都通过虚拟地址去访问1号物理地址的内容,如果此时父进程修改了数据a=8,那么父进程会先复制一份数据到2号内存,然后修改2号内存的数据,父进程再读的时候就去2号内存读,而子进程依然去1号内存读。如果子进程也要修改这个全局变量,那么子进程也会拷贝一份数据到内存3,然后修改内存3的数据,子进程访问数据时会访问内存3的数据。(多个子进程就会拷贝多份)
2. 多进程并发服务器代码实现
#include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <sys/types.h> #include <string.h> #include <sys/socket.h> #include <arpa/inet.h> #include <ctype.h> #include <signal.h> #include <sys/wait.h> #include <errno.h> // 进程回收函数 void recyle(int num) { pid_t pid; while( (pid = waitpid(-1, NULL, WNOHANG)) > 0 ) { printf("child died , pid = %d\n", pid); } } int main(int argc, const char* argv[]) { if(argc < 2) { printf("eg: ./a.out port\n"); exit(1); } struct sockaddr_in serv_addr; socklen_t serv_len = sizeof(serv_addr); int port = atoi(argv[1]); // 创建套接字 int lfd = socket(AF_INET, SOCK_STREAM, 0); // 初始化服务器 sockaddr_in memset(&serv_addr, 0, serv_len); serv_addr.sin_family = AF_INET; // 地址族 serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 监听本机所有的IP serv_addr.sin_port = htons(port); // 设置端口 // 绑定IP和端口 bind(lfd, (struct sockaddr*)&serv_addr, serv_len); // 设置同时监听的最大个数 listen(lfd, 36); printf("Start accept ......\n"); // 使用信号回收子进程pcb //这个子进程回收机制会被子进程复制 struct sigaction act; act.sa_handler = recyle; act.sa_flags = 0; sigemptyset(&act.sa_mask); sigaction(SIGCHLD, &act, NULL); struct sockaddr_in client_addr; socklen_t cli_len = sizeof(client_addr); while(1) { // 父进程接收连接请求 // accept阻塞的时候被信号中断, 处理信号对应的操作之后(比如子进程终止,收到信号后去回收子进程) // 回来之后不阻塞了, 直接返回-1, 这时候 errno==EINTR int cfd = accept(lfd, (struct sockaddr*)&client_addr, &cli_len); //解决方法就是,在一个循环中判断,如果accept阻塞过程中被信号打断 //也就是返回值-1且errno == EINTR,那么再一次调用accept //这样accept会再次回到阻塞状态,并且返回值不是-1,也就不会进入循环 //等到再次被信号打断的时候才会再次进入循环 /*这里的cfd虽然只定义了一个,但是在每个子进程中都会有一个拷贝,并且修改一个子进程的cfd不会影响其它子进程*/ while(cfd == -1 && errno == EINTR) { cfd = accept(lfd, (struct sockaddr*)&client_addr, &cli_len); } printf("connect sucessful\n"); // 创建子进程 pid_t pid = fork(); if(pid == 0) { close(lfd); // child process // 通信 char ip[64]; while(1) { // client ip port printf("client IP: %s, port: %d\n", inet_ntop(AF_INET, &client_addr.sin_addr.s_addr, ip, sizeof(ip)), ntohs(client_addr.sin_port)); char buf[1024]; int len = read(cfd, buf, sizeof(buf)); if(len == -1) { perror("read error"); exit(1); } else if(len == 0) { printf("客户端断开了连接\n"); close(cfd); break; } else { printf("recv buf: %s\n", buf); write(cfd, buf, len); } } // 干掉子进程 return 0; } else if(pid > 0) { // parent process close(cfd); } } close(lfd); return 0; }
3. 多线程并发服务器
多线程并发服务器示意图如下:
在多进程模型中,fork得到的子进程会复制父进程的文件描述符cfd等信息,每个进程的cfd都是自己的,操作互不影响。但是线程不同,现在只有主线程的cfd,多个线程间的信息是共享的,假如说传递给每个子线程的cfd都是同一个,那么线程1修改该文件描述符指向的内容会影响到线程2的通信,因为它们共享这一个文件描述符。所以这里需要建立一个文件描述符数组,每个子线程对应数组中的一个文件描述符。
另外连接主线程的client是哪一个,也就是说哪个client对应和哪个子线程通信,这也需要把和子线程通信的client的ip和port传给和该client通信的子线程,这样子线程才能知道通信的客户端的ip和port。
于是我们需要创建一个结构体数组,每个子线程对应结构体数组中的一个成员,而结构体数组中的每个成员将作为参数传递给子进程的回调函数。
归根到底就是因为,进程是独立的,线程是共享的。
线程共享下面的资源:
- 全局数据区
- 堆区
- 一块有效内存的地址,比如说把线程1的一块内存的地址传给线程2,那么线程2也可以操作这块内存。
4. 多线程并发服务器代码实现
#include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <sys/types.h> #include <string.h> #include <sys/socket.h> #include <arpa/inet.h> #include <ctype.h> #include <pthread.h> // 自定义数据结构 //把线程处理函数所需要的信息封装进来 typedef struct SockInfo { int fd; // 文件描述符 struct sockaddr_in addr; //ip地址结构体 pthread_t id; //线程id }SockInfo; // 子线程处理函数 void* worker(void* arg) { char ip[64]; char buf[1024]; SockInfo* info = (SockInfo*)arg; // 通信 while(1) { printf("Client IP: %s, port: %d\n", inet_ntop(AF_INET, &info->addr.sin_addr.s_addr, ip, sizeof(ip)), ntohs(info->addr.sin_port)); int len = read(info->fd, buf, sizeof(buf)); if(len == -1) { perror("read error"); pthread_exit(NULL); //只退出子线程 //exit(1); //exit会把主线程也一块退出 } else if(len == 0) { printf("客户端已经断开了连接\n"); close(info->fd); break; } else { printf("recv buf: %s\n", buf); write(info->fd, buf, len); } } return NULL; } int main(int argc, const char* argv[]) { if(argc < 2) { printf("eg: ./a.out port\n"); exit(1); } struct sockaddr_in serv_addr; socklen_t serv_len = sizeof(serv_addr); int port = atoi(argv[1]); // 创建套接字 int lfd = socket(AF_INET, SOCK_STREAM, 0); // 初始化服务器 sockaddr_in memset(&serv_addr, 0, serv_len); serv_addr.sin_family = AF_INET; // 地址族 serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 监听本机所有的IP serv_addr.sin_port = htons(port); // 设置端口 // 绑定IP和端口 bind(lfd, (struct sockaddr*)&serv_addr, serv_len); // 设置同时监听的最大个数 listen(lfd, 36); printf("Start accept ......\n"); int i = 0; SockInfo info[256]; //每个线程对应数组的一个元素,最多256个线程 // 规定 fd == -1 说明这是一个无效文件描述符,也就是说这个文件描述符是空闲的,没被占用 for(i=0; i<sizeof(info)/sizeof(info[0]); ++i) { info[i].fd = -1; //所有文件描述符全部初始化为-1 } socklen_t cli_len = sizeof(struct sockaddr_in); while(1) { // 选一个没有被使用的, 最小的数组元素 //因为有可能我们使用的文件描述符对应数组下标i已经累加到了100,但是前面 //99个都已经被释放了(断开连接了),我们最好选用一个当前空闲的数组下标最小 //的文件描述符,以合理利用资源 for(i=0; i<256; ++i) { if(info[i].fd == -1) { break; //这样就能把数组下标最小的fd找出来,并确保i指向它,直接break出去 } } if(i == 256) //整个数组都被用完了,直接break出while循环 { break; } // 主线程 - 等待接受连接请求 info[i].fd = accept(lfd, (struct sockaddr*)&info[i].addr, &cli_len); //第二个参数是传出参数, //传出客户端ip信息(struct sockaddr*)类型 // 创建子线程 - 通信 pthread_create(&info[i].id, NULL, worker, &info[i]); // 设置线程分离 //这样子线程终止的时候会自动释放,就不需要主线程去释放了 pthread_detach(info[i].id); } close(lfd); // 只退出主线程 //对子线程无影响,子线程可以继续通信 pthread_exit(NULL); return 0; }
5. 扩展:Socket API封装
#include <stdlib.h> #include <stdio.h> #include <unistd.h> #include <errno.h> #include <sys/socket.h> void perr_exit(const char *s) { perror(s); exit(-1); } //也可以在vim下按2K跳转到man文档中的accept函数,因为man文档跳转不区分大小写 int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr) { int n; again: if ((n = accept(fd, sa, salenptr)) < 0) { //ECONNABORTED 发生在重传(一定次数)失败后,强制关闭套接字 //EINTR 进程被信号中断 //如果accept函数在阻塞时被信号打断,处理完信号 //返回时就不会在阻塞了,而是直接返回-1 if ((errno == ECONNABORTED) || (errno == EINTR)) { goto again; //如果accept阻塞时被信号打断了,需要在执行一次accept继续阻塞 } else { perr_exit("accept error"); } } return n; } int Bind(int fd, const struct sockaddr *sa, socklen_t salen) { int n; if ((n = bind(fd, sa, salen)) < 0) { perr_exit("bind error"); } return n; } int Connect(int fd, const struct sockaddr *sa, socklen_t salen) { int n; n = connect(fd, sa, salen); if (n < 0) { perr_exit("connect error"); } return n; } int Listen(int fd, int backlog) { int n; if ((n = listen(fd, backlog)) < 0) { perr_exit("listen error"); } return n; } int Socket(int family, int type, int protocol) { int n; if ((n = socket(family, type, protocol)) < 0) { perr_exit("socket error"); } return n; } ssize_t Read(int fd, void *ptr, size_t nbytes) { ssize_t n; again: if ( (n = read(fd, ptr, nbytes)) == -1) { if (errno == EINTR) goto again; //如果read被信号中断了,应该让它继续去read等待读数据 (read阻塞时) else return -1; } return n; } ssize_t Write(int fd, const void *ptr, size_t nbytes) { ssize_t n; again: if ((n = write(fd, ptr, nbytes)) == -1) { if (errno == EINTR) goto again; else return -1; } return n; } int Close(int fd) { int n; if ((n = close(fd)) == -1) perr_exit("close error"); return n; } /*参三: 应该读取的字节数*/ //一直读到n字节数才会返回,否则阻塞等待 //socket 4096 readn(cfd, buf, 4096) nleft = 4096-1500 ssize_t Readn(int fd, void *vptr, size_t n) { size_t nleft; //usigned int 剩余未读取的字节数 ssize_t nread; //int 实际读到的字节数 char *ptr; ptr = vptr; nleft = n; //n 未读取字节数 while (nleft > 0) { if ((nread = read(fd, ptr, nleft)) < 0) { if (errno == EINTR) { nread = 0; } else { return -1; } } else if (nread == 0) { break; } nleft -= nread; //nleft = nleft - nread ptr += nread; } return n - nleft; } ssize_t Writen(int fd, const void *vptr, size_t n) { size_t nleft; ssize_t nwritten; const char *ptr; ptr = vptr; nleft = n; while (nleft > 0) { if ( (nwritten = write(fd, ptr, nleft)) <= 0) { if (nwritten < 0 && errno == EINTR) nwritten = 0; else return -1; } nleft -= nwritten; ptr += nwritten; } return n; } static ssize_t my_read(int fd, char *ptr) //静态函数保证了读完第一个100字节才去读下一个100字节,而不是每次调用都读100字节 { static int read_cnt; //改变量存在静态数据区,下次调用my_read函数的时候,read_cnt会保留上次的值 static char *read_ptr; static char read_buf[100]; //因为这里的变量都是static的,所以并非每次调用my_read都会读100字节,而是读完100字节再去读下一个100字节 if (read_cnt <= 0) { again: if ( (read_cnt = read(fd, read_buf, sizeof(read_buf))) < 0) //"hello\n" { if (errno == EINTR) goto again; return -1; } else if (read_cnt == 0) return 0; read_ptr = read_buf; } read_cnt--; //在上次调用结束的值基础上--,保证了读完100字节再去读下一个100字节 *ptr = *read_ptr++; return 1; } /*readline --- fgets*/ //传出参数 vptr ssize_t Readline(int fd, void *vptr, size_t maxlen) { ssize_t n, rc; char c, *ptr; ptr = vptr; for (n = 1; n < maxlen; n++) { if ((rc = my_read(fd, &c)) == 1) //ptr[] = hello\n { *ptr++ = c; if (c == '\n') //先读100个字节,依次遍历,遇到 '\n' 说明一行读完了 break; } else if (rc == 0) { *ptr = 0; return n-1; } else return -1; } *ptr = 0; return n; }