百万并发
服务器百万并发是指能够承载的客户端的数量,也就是说可以承接100w个客户端的连接。通常来说服务器的并发量还与业务、与后台数据库的承载量有关,本文中只考虑做到百万连接建立,不做业务处理.。
测试准备
相应的服务器请参考前文:《用反应器模式和epoll构建百万并发服务器》。
所需的客户端请参考文章末尾。
需要的环境,1台8G8核的ubuntu(配置服务器),3台4G4核的ubuntu(配置客户端,每个客户端负责的连接数尽量平均,也就是33.33w)。如果电脑资源足够,性能够好,建议把配置做大些,问题会少不少。
解决连接上限问题
先用一台客户端尝试向服务器发起大量连接,发现报错。
再看看服务器的情况,同样是报错。
我们用ulimit -a查看服务器情况发现是一个进程最多能打开1024个文件,也就是建立1024个文件描述符。于是用ulimit -n解决,客户端和服务器都需要解决这些问题。
解决五元不组问题
服务器每建立一个连接,都会在内核中建立一个tcp控制块,简称tcb。tcb包含了以下元素:源ip,目的ip,源port,目的port,协议类型(tcp,udp等)。
通常协议类型和目的ip(服务器ip)是固定下来的。百万并发确保每个连接都对应剩下3个元素的不同的排列组合,若不能保证,产生的问题叫“五元不组”。
源port一般不被客户端指定,一台机子可用的port大概是(65535-1024)个,1024是因为部分非用户主动空控制的程序也在使用端口,在高并发下情况下1024-65535可能会无法用尽。
因此解决“五元不组”通常由以下几种解法:
- 源ip多样化:多准备几个不同ip的客户端。
- 目的port多样化:让服务器多开几个listen port并且负载均衡。
- 源port多样化,修改/etc/sysctl.conf文件,明确指定客户端在本地的端口的可用范围。
本文中两种方法都用,基于解法1配置了3台不同的ubuntu,每台上都有1个客户端。基于解法2服务器开了20个监听端口,并且客户端循环地进行connect。
基于解法3,则需要修改客户端所在的ubuntu的配置文件。
如图,在相关配置文件的末尾添加一条语句:
解决内存泄漏的问题
服务器每建立一个clientfd,都需要相应的数据结构储存clinetfd。普通的数组往往容易发生越栈从而发生段错误,不了解的人往往会以为这是因为代码的逻辑错误。我们在服务器中建立了百万长度的数组进行存储,不够优雅但是有效。
解决效率受限的问题
在并发连接的时候,经常连着连着就发现,每一千个连接的耗时突然暴增,这是受到了系统级别的文件描述符数量的限制。也需要调整。往客户端的配置文件里添加这句话。
该命令与ulimit -n的差别在于作用的级别不同,前者作用于系统层面,需要root权限,后者作用于当前用户的会话级别。
解决进程制终止问题
进程被强制终止可能发生在客户端,也有可能发生在服务器。
有些机子可能会发生服务器进程CPU和内存消耗过大,被强制kill掉的情况。这是由两种解法:
- 把配置做高,内存更大。
- 修改内核tcp协议栈内存、发送缓冲区、接收缓冲区的大小。
CPU和内核消耗的监控命令:htop
在conf文件中可以继续添加以下语句,分别用于修改内核tcp协议栈内存、单个连接发送缓冲区、单个连接接收缓冲区的最小值、默认值、最大值,可以逐步逐步地进行测试什么数据合适。
注意,也有可能是客户端配置不够高,发生了崩溃,导致服务器大量断连从而也崩溃。也有可能是因为进行被强制终止,内存被强制回收导致的。重点是用htop观察谁的内存爆了。
成功完成百万并发测试
测试用客户端代码
#include <stdio.h> #include <string.h> #include <stdlib.h> #include <sys/types.h> #include <sys/socket.h> #include <sys/epoll.h> #include <errno.h> #include <netinet/tcp.h> #include <arpa/inet.h> #include <netdb.h> #include <fcntl.h> #include <sys/time.h> #include <unistd.h> #define MAX_BUFFER 128 #define MAX_EPOLLSIZE (384*1024) #define MAX_PORT 20 #define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000) int isContinue = 0; static int ntySetNonblock(int fd) { int flags; flags = fcntl(fd, F_GETFL, 0); if (flags < 0) return flags; flags |= O_NONBLOCK; if (fcntl(fd, F_SETFL, flags) < 0) return -1; return 0; } static int ntySetReUseAddr(int fd) { int reuse = 1; return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse)); } int main(int argc, char **argv) { if (argc <= 2) { printf("Usage: %s ip port\n", argv[0]); exit(0); } const char *ip = argv[1]; int port = atoi(argv[2]); int connections = 0; char buffer[128] = {0}; int i = 0, index = 0; struct epoll_event events[MAX_EPOLLSIZE]; int epoll_fd = epoll_create(MAX_EPOLLSIZE); strcpy(buffer, " Data From MulClient\n"); struct sockaddr_in addr; memset(&addr, 0, sizeof(struct sockaddr_in)); addr.sin_family = AF_INET; addr.sin_addr.s_addr = inet_addr(ip); struct timeval tv_begin; gettimeofday(&tv_begin, NULL); while (1) { if (++index >= MAX_PORT) index = 0; struct epoll_event ev; int sockfd = 0; if (!isContinue) { sockfd = socket(AF_INET, SOCK_STREAM, 0); if (sockfd == -1) { perror("socket"); goto err; } //ntySetReUseAddr(sockfd); addr.sin_port = htons(port+index); if (connect(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) { perror("connect"); goto err; } //ntySetNonblock(sockfd); ntySetReUseAddr(sockfd); //sprintf(buffer, "Hello Server: client --> %d\n", connections); //send(sockfd, buffer, strlen(buffer), 0); ev.data.fd = sockfd; ev.events = EPOLLIN; epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev); connections ++; } //connections ++; if (connections % 1000 == 999) { struct timeval tv_cur; memcpy(&tv_cur, &tv_begin, sizeof(struct timeval)); gettimeofday(&tv_begin, NULL); int time_used = TIME_SUB_MS(tv_begin, tv_cur); printf("connections: %d, sockfd:%d, time_used:%d\n", connections, sockfd, time_used); int nfds = epoll_wait(epoll_fd, events, connections, 100); for (i = 0;i < nfds;i ++) { int clientfd = events[i].data.fd; if (events[i].events & EPOLLOUT) { sprintf(buffer, "data from %d\n", clientfd); send(sockfd, buffer, strlen(buffer), 0); } else if (events[i].events & EPOLLIN) { char rBuffer[MAX_BUFFER] = {0}; ssize_t length = recv(sockfd, rBuffer, MAX_BUFFER, 0); if (length > 0) { printf(" RecvBuffer:%s\n", rBuffer); if (!strcmp(rBuffer, "quit")) { isContinue = 0; } } else if (length == 0) { printf(" Disconnect clientfd:%d\n", clientfd); connections --; close(clientfd); } else { if (errno == EINTR) continue; printf(" Error clientfd:%d, errno:%d\n", clientfd, errno); close(clientfd); } } else { printf(" clientfd:%d, errno:%d\n", clientfd, errno); close(clientfd); } } } usleep(100); } return 0; err: printf("error : %s\n", strerror(errno)); return 0; }