这篇文章记录如何用前面学过的 epoll + reactor 多路 IO 复用网络编程技巧,做一次百万并发连接的测试。本篇以实现为主,没有使用复杂的数据结构;第二篇会使用一些数据结构进行优化。
代码
服务端就用稍微改一下的epoll就好了
/*
 * Single-threaded epoll reactor echo server used for the one-million
 * concurrent connection test. It listens on 100 consecutive ports starting
 * at 2048 and echoes back whatever each client sends.
 */
#include <sys/socket.h>
#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

#include <pthread.h>
#include <sys/poll.h>
#include <sys/epoll.h>
#include <sys/time.h>

#define BUFFER_LENGTH 256
/* Number of slots in connlist; the table is indexed directly by fd. */
#define MAX_CONN 1048576

/* Callback invoked by the main loop with the ready file descriptor. */
typedef int (*RCALLBACK)(int fd);

/* Read-ready handler for listening sockets. */
int accept_cb(int fd);
/* Read-ready / write-ready handlers for client sockets. */
int recv_cb(int fd);
int send_cb(int fd);

/* Per-connection state: fd, read/write buffers and event callbacks. */
struct conn_item {
    int fd;

    char rbuffer[BUFFER_LENGTH];
    int rlen;
    char wbuffer[BUFFER_LENGTH];
    int wlen;

    /* The EPOLLIN handler: accept_cb for listeners, recv_cb for clients.
     * Both members share storage, so main() can always call recv_callback. */
    union {
        RCALLBACK accept_callback;
        RCALLBACK recv_callback;
    } recv_t;
    RCALLBACK send_callback;
};

int epfd = 0;
/* Connection table sized (brute force) for about one million fds. */
struct conn_item connlist[MAX_CONN] = {0};
/* Timestamp of the previous progress report printed by accept_cb. */
struct timeval zvoice_king;

/* Milliseconds elapsed from tv2 to tv1 (both struct timeval). */
#define TIME_SUB_MS(tv1, tv2) \
    (((tv1).tv_sec - (tv2).tv_sec) * 1000 + ((tv1).tv_usec - (tv2).tv_usec) / 1000)

/*
 * Register (flag != 0) or modify (flag == 0) the epoll interest set of fd.
 * Returns the epoll_ctl() result: 0 on success, -1 on error.
 */
int set_event(int fd, int event, int flag)
{
    struct epoll_event ev;

    memset(&ev, 0, sizeof ev);
    ev.events = event;
    ev.data.fd = fd;

    return epoll_ctl(epfd, flag ? EPOLL_CTL_ADD : EPOLL_CTL_MOD, fd, &ev);
}

/* Remove fd from the epoll set and close it. */
static void close_conn(int fd)
{
    epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
    close(fd);
}

/*
 * Accept one connection on listening socket fd, initialise its connlist
 * slot and register it for EPOLLIN.
 * Returns the new client fd, or -1 on failure.
 */
int accept_cb(int fd)
{
    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);

    int clientfd = accept(fd, (struct sockaddr *)&clientaddr, &len);
    if (clientfd < 0) {
        return -1;
    }
    /* connlist is indexed by fd; refuse anything that would not fit. */
    if (clientfd >= MAX_CONN) {
        close(clientfd);
        return -1;
    }

    struct conn_item *conn = &connlist[clientfd];
    conn->fd = clientfd;
    memset(conn->rbuffer, 0, BUFFER_LENGTH);
    conn->rlen = 0;
    memset(conn->wbuffer, 0, BUFFER_LENGTH);
    conn->wlen = 0;
    conn->recv_t.recv_callback = recv_cb;
    conn->send_callback = send_cb;

    if (set_event(clientfd, EPOLLIN, 1) < 0) {
        perror("epoll_ctl");
        close(clientfd);
        return -1;
    }

    /* Progress report: print the elapsed time every 1000 fds. */
    if ((clientfd % 1000) == 999) {
        struct timeval tv_cur;
        gettimeofday(&tv_cur, NULL);
        int time_used = (int)TIME_SUB_MS(tv_cur, zvoice_king);

        memcpy(&zvoice_king, &tv_cur, sizeof(struct timeval));

        printf("clientfd : %d, time_used: %d\n", clientfd, time_used);
    }

    return clientfd;
}

/*
 * EPOLLIN handler for a client socket: read available data, stage it in
 * the write buffer as the echo reply and switch the fd to EPOLLOUT.
 * Returns the number of bytes read, 0 if the read should simply be retried,
 * or -1 if the connection was closed.
 */
int recv_cb(int fd)
{
    struct conn_item *conn = &connlist[fd];
    int idx = conn->rlen;

    int count = (int)recv(fd, conn->rbuffer + idx, BUFFER_LENGTH - idx, 0);
    if (count < 0) {
        /* Transient conditions: keep the connection and wait for the next event. */
        if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
            return 0;
        }
        /* Hard error: without this, rlen would go negative and corrupt memcpy. */
        close_conn(fd);
        return -1;
    }
    if (count == 0) {
        printf("disconnect\n");
        close_conn(fd);
        return -1;
    }
    conn->rlen += count;

    /* Echo: copy what was received into the write buffer. */
    memcpy(conn->wbuffer, conn->rbuffer, conn->rlen);
    conn->wlen = conn->rlen;
    conn->rlen = 0;

    set_event(fd, EPOLLOUT, 0);

    return count;
}

/*
 * EPOLLOUT handler for a client socket: send the staged reply and switch
 * the fd back to EPOLLIN once everything has been written.
 * Returns the number of bytes sent, 0 if the send should be retried,
 * or -1 if the connection was closed.
 */
int send_cb(int fd)
{
    struct conn_item *conn = &connlist[fd];

    /* MSG_DONTWAIT: one slow peer must not block the single-threaded loop.
     * MSG_NOSIGNAL: a reset peer must not kill the server with SIGPIPE. */
    int count = (int)send(fd, conn->wbuffer, conn->wlen, MSG_DONTWAIT | MSG_NOSIGNAL);
    if (count < 0) {
        if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
            return 0; /* still registered for EPOLLOUT, retry later */
        }
        close_conn(fd);
        return -1;
    }

    if (count < conn->wlen) {
        /* Partial write: keep the unsent tail and stay on EPOLLOUT. */
        memmove(conn->wbuffer, conn->wbuffer + count, conn->wlen - count);
        conn->wlen -= count;
        return count;
    }

    conn->wlen = 0;
    set_event(fd, EPOLLIN, 0);

    return count;
}

/*
 * Create a TCP socket bound to INADDR_ANY:port and start listening.
 * Returns the listening fd, or -1 on failure (the socket is closed).
 */
int init_server(unsigned short port)
{
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("socket");
        return -1;
    }

    struct sockaddr_in serveraddr;
    memset(&serveraddr, 0, sizeof(struct sockaddr_in));

    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
    serveraddr.sin_port = htons(port);

    if (-1 == bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr))) {
        perror("bind");
        close(sockfd);
        return -1;
    }

    if (-1 == listen(sockfd, 10)) {
        perror("listen");
        close(sockfd);
        return -1;
    }

    return sockfd;
}

/* Open the listening ports and run the epoll event loop forever. */
int main(void)
{
    int port_count = 100;
    unsigned short port = 2048;
    int listening = 0;
    int i = 0;

    epfd = epoll_create(1); /* the size argument only needs to be > 0 */
    if (epfd < 0) {
        perror("epoll_create");
        return 1;
    }

    for (i = 0; i < port_count; i++) {
        /* ports 2048, 2049, ... 2147 */
        int sockfd = init_server((unsigned short)(port + i));
        if (sockfd < 0 || sockfd >= MAX_CONN) {
            /* Skip ports that could not be opened instead of indexing connlist[-1]. */
            if (sockfd >= 0) {
                close(sockfd);
            }
            continue;
        }

        connlist[sockfd].fd = sockfd;
        connlist[sockfd].recv_t.accept_callback = accept_cb;
        if (set_event(sockfd, EPOLLIN, 1) < 0) {
            perror("epoll_ctl");
            close(sockfd);
            continue;
        }
        listening++;
    }

    if (listening == 0) {
        fprintf(stderr, "no listening socket could be opened\n");
        return 1;
    }

    gettimeofday(&zvoice_king, NULL);

    struct epoll_event events[1024] = {0};

    while (1) {
        int nready = epoll_wait(epfd, events, 1024, -1);
        if (nready < 0) {
            if (errno == EINTR) {
                continue;
            }
            perror("epoll_wait");
            break;
        }

        for (i = 0; i < nready; i++) {
            int connfd = events[i].data.fd;

            if (events[i].events & EPOLLIN) {
                /* accept_cb for listeners, recv_cb for clients (shared union slot). */
                connlist[connfd].recv_t.recv_callback(connfd);
            } else if (events[i].events & EPOLLOUT) {
                connlist[connfd].send_callback(connfd);
            } else if (events[i].events & (EPOLLERR | EPOLLHUP)) {
                /* Error/hang-up with nothing to read or write: drop the fd,
                 * otherwise epoll_wait would keep reporting it forever. */
                close_conn(connfd);
            }
        }
    }

    return 1;
}
测试连接的代码用这个,测试别的服务端的并发数量都可以使用。
/*
 * Connection-count test client: opens up to MAX_CONNECTIONS TCP connections
 * to a server, spreading them round-robin over MAX_PORT consecutive ports
 * starting at the port given on the command line, and reports progress.
 */
#ifndef _DEFAULT_SOURCE
#define _DEFAULT_SOURCE /* exposes usleep() when built with a strict -std=c11 */
#endif

#include <stdio.h>
#include <string.h>
#include <stdlib.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <sys/time.h>
#include <errno.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <fcntl.h>
#include <unistd.h>

#define MAX_BUFFER      128
#define MAX_EPOLLSIZE   (384*1024)
#define MAX_PORT        100
/* Upper bound on connections opened by one client process. */
#define MAX_CONNECTIONS 380000

/* Milliseconds elapsed from tv2 to tv1 (both struct timeval). */
#define TIME_SUB_MS(tv1, tv2) \
    (((tv1).tv_sec - (tv2).tv_sec) * 1000 + ((tv1).tv_usec - (tv2).tv_usec) / 1000)

/* While 0, the main loop keeps opening new connections. */
int isContinue = 0;

/* Put fd into non-blocking mode. Returns 0 on success, negative on error. */
static int ntySetNonblock(int fd)
{
    int flags;

    flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0) return flags;
    flags |= O_NONBLOCK;
    if (fcntl(fd, F_SETFL, flags) < 0) return -1;
    return 0;
}

/* Enable SO_REUSEADDR on fd. Returns the setsockopt() result. */
static int ntySetReUseAddr(int fd)
{
    int reuse = 1;
    return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse));
}

/* Usage: <program> ip port */
int main(int argc, char **argv)
{
    if (argc <= 2) {
        printf("Usage: %s ip port\n", argv[0]);
        exit(0);
    }

    const char *ip = argv[1];
    int port = atoi(argv[2]);
    int connections = 0;
    char buffer[128] = {0};
    int i = 0, index = 0;
    int saved_errno = 0;

    /* static: MAX_EPOLLSIZE events take several MB, too large for the stack. */
    static struct epoll_event events[MAX_EPOLLSIZE];

    int epoll_fd = epoll_create(MAX_EPOLLSIZE);
    if (epoll_fd < 0) {
        saved_errno = errno;
        perror("epoll_create");
        goto err;
    }

    snprintf(buffer, sizeof buffer, "%s", " Data From MulClient\n");

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(struct sockaddr_in));

    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = inet_addr(ip);

    struct timeval tv_begin;
    gettimeofday(&tv_begin, NULL);

    while (1) {
        /* Rotate over the server ports: port+1 ... port+MAX_PORT-1, port+0, ... */
        if (++index >= MAX_PORT) index = 0;

        struct epoll_event ev;
        int sockfd = 0;

        if (connections < MAX_CONNECTIONS && !isContinue) {
            sockfd = socket(AF_INET, SOCK_STREAM, 0);
            if (sockfd == -1) {
                saved_errno = errno;
                perror("socket");
                goto err;
            }

            addr.sin_port = htons(port + index);

            if (connect(sockfd, (struct sockaddr *)&addr, sizeof(struct sockaddr_in)) < 0) {
                saved_errno = errno;
                perror("connect");
                close(sockfd);
                goto err;
            }
            ntySetNonblock(sockfd);
            ntySetReUseAddr(sockfd);

            snprintf(buffer, sizeof buffer, "Hello Server: client --> %d\n", connections);
            /* MSG_NOSIGNAL: a reset peer must not kill the client with SIGPIPE. */
            send(sockfd, buffer, strlen(buffer), MSG_NOSIGNAL);

            memset(&ev, 0, sizeof ev);
            ev.data.fd = sockfd;
            ev.events = EPOLLIN | EPOLLOUT;
            epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev);

            connections++;
        }

        /* Every 1000 connections (and continuously once the cap is reached):
         * print progress and service whatever sockets are ready. */
        if (connections % 1000 == 999 || connections >= MAX_CONNECTIONS) {
            struct timeval tv_prev;
            memcpy(&tv_prev, &tv_begin, sizeof(struct timeval));

            gettimeofday(&tv_begin, NULL);

            int time_used = (int)TIME_SUB_MS(tv_begin, tv_prev);
            printf("connections: %d, sockfd:%d, time_used:%d\n", connections, sockfd, time_used);

            int max_events = connections < MAX_EPOLLSIZE ? connections : MAX_EPOLLSIZE;
            int nfds = epoll_wait(epoll_fd, events, max_events, 100);
            for (i = 0; i < nfds; i++) {
                /* Always operate on the fd that is actually ready, not on the
                 * most recently created socket. */
                int clientfd = events[i].data.fd;

                if (events[i].events & EPOLLOUT) {
                    snprintf(buffer, sizeof buffer, "data from %d\n", clientfd);
                    send(clientfd, buffer, strlen(buffer), MSG_NOSIGNAL);
                } else if (events[i].events & EPOLLIN) {
                    char rBuffer[MAX_BUFFER] = {0};
                    /* Leave one byte so rBuffer is always NUL-terminated. */
                    ssize_t length = recv(clientfd, rBuffer, MAX_BUFFER - 1, 0);
                    if (length > 0) {
                        printf(" RecvBuffer:%s\n", rBuffer);

                        if (!strcmp(rBuffer, "quit")) {
                            /* NOTE(review): isContinue is already 0, so this is a
                             * no-op; possibly meant to be 1 to stop opening new
                             * connections - confirm the intent. */
                            isContinue = 0;
                        }
                    } else if (length == 0) {
                        printf(" Disconnect clientfd:%d\n", clientfd);
                        connections--;
                        close(clientfd);
                    } else {
                        if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) continue;

                        printf(" Error clientfd:%d, errno:%d\n", clientfd, errno);
                        close(clientfd);
                    }
                } else {
                    printf(" clientfd:%d, errno:%d\n", clientfd, errno);
                    close(clientfd);
                }
            }
        }

        usleep(1 * 1000);
    }

    return 0;

err:
    /* saved_errno: perror() may have overwritten errno by now. */
    printf("error : %s\n", strerror(saved_errno));
    return 0;
}
虚拟机
我们准备四台虚拟机(一台服务端、三台客户端)来做实验。配置如下:
- server: 8核 8G
- client: 2核 4G
- client: 2核 4G
- client: 2核 4G
我这边是使用的ubuntu,其他的发行版应该也差不多。
参数设置
修改打开文件句柄数量
首先我们需要设置一下可以打开的文件句柄个数:
ulimit -n 1048576
装载IP追踪模块
modprobe ip_conntrack
修改系统设置
cd /etc
ls | grep sysctl.conf
如果没有找到,就创建一个
touch sysctl.conf
如果找到了
sudo vim sysctl.conf
在最后添加以下内容:
server:
net.ipv4.ip_local_port_range = 1024 65535
net.ipv4.tcp_mem = 1772864 1572864 1572864
net.ipv4.tcp_wmem = 512 512 1024
net.ipv4.tcp_rmem = 512 512 1024
fs.file-max = 1048576
client:
net.ipv4.ip_local_port_range = 1024 65535
net.ipv4.tcp_mem = 262144 524288 786432
net.ipv4.tcp_wmem = 512 512 1024
net.ipv4.tcp_rmem = 512 512 1024
然后都要让他们生效
sudo sysctl -p
开始测试
首先,把这两个代码编译一下。然后ifconfig看一下IP地址。
服务端运行第一段代码;每台客户端运行客户端代码,参数是服务端的 IP 和起始端口号(服务端代码从 2048 开始连续监听 100 个端口,所以端口号填 2048)。
遇到的问题
使用 htop 监视系统资源时,会发现程序跑着跑着 mem 突然下降,这其实是内存不够导致的。这时需要调整 TCP 协议栈的内存分配,也就是 sysctl.conf 里的参数,上面给出的是我自己使用的配置。增大 tcp_mem 可以增大整个 TCP 协议栈可用的内存;调小 tcp_rmem、tcp_wmem 可以减少每个 TCP 连接占用的内存。如果还是跑不到 100w,就去调整客户端的参数。我最后的解决办法是把虚拟机的内存分配调大。