百万并发连接的实践测试01

简介: 百万并发连接的实践测试01

主要是记录一下使用前面学的epoll+reactor的多路IO复用的网络编程技巧做一个百万并发连接的测试。这篇文章主要是实现,没用使用复杂的数据结构。第二篇会使用一些数据结构优化。

代码

服务端就用稍微改一下的epoll就好了

#include <sys/socket.h>
#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/poll.h>
#include <sys/epoll.h>
#include <sys/time.h>
#define BUFFER_LENGTH   256
typedef int (*RCALLBACK)(int fd);
// listenfd
// EPOLLIN --> 
int accept_cb(int fd);
// clientfd
// 
int recv_cb(int fd);
int send_cb(int fd);
// conn, fd, buffer, callback
struct conn_item {
  int fd;
  
  char rbuffer[BUFFER_LENGTH];
  int rlen;
  char wbuffer[BUFFER_LENGTH];
  int wlen;
  union {
    RCALLBACK accept_callback;
    RCALLBACK recv_callback;
  } recv_t;
  RCALLBACK send_callback;
};
// libevent --> 
int epfd = 0;
struct conn_item connlist[1048576] = {0}; //直接把数组暴力改到100w
struct timeval zvoice_king;
// 
// 1000000
#define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
int set_event(int fd, int event, int flag) {
  if (flag) { // 1 add, 0 mod
    struct epoll_event ev;
    ev.events = event ;
    ev.data.fd = fd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
  } else {
  
    struct epoll_event ev;
    ev.events = event;
    ev.data.fd = fd;
    epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
  }
  
}
int accept_cb(int fd) {
  struct sockaddr_in clientaddr;
  socklen_t len = sizeof(clientaddr);
  
  int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);
  if (clientfd < 0) {
    return -1;
  }
  set_event(clientfd, EPOLLIN, 1);
  connlist[clientfd].fd = clientfd;
  memset(connlist[clientfd].rbuffer, 0, BUFFER_LENGTH);
  connlist[clientfd].rlen = 0;
  memset(connlist[clientfd].wbuffer, 0, BUFFER_LENGTH);
  connlist[clientfd].wlen = 0;
  
  connlist[clientfd].recv_t.recv_callback = recv_cb;
  connlist[clientfd].send_callback = send_cb;
  if ((clientfd % 1000) == 999) {
    struct timeval tv_cur;
    gettimeofday(&tv_cur, NULL);
    int time_used = TIME_SUB_MS(tv_cur, zvoice_king);//计算一下用时
    memcpy(&zvoice_king, &tv_cur, sizeof(struct timeval));
    
    printf("clientfd : %d, time_used: %d\n", clientfd, time_used);
  }
  return clientfd;
}
int recv_cb(int fd) { // fd --> EPOLLIN
  char *buffer = connlist[fd].rbuffer;
  int idx = connlist[fd].rlen;
  
  int count = recv(fd, buffer+idx, BUFFER_LENGTH-idx, 0);
  if (count == 0) {
    printf("disconnect\n");
    epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);   
    close(fd);
    
    return -1;
  }
  connlist[fd].rlen += count;
  memcpy(connlist[fd].wbuffer, connlist[fd].rbuffer, connlist[fd].rlen);
  connlist[fd].wlen = connlist[fd].rlen;
  connlist[fd].rlen -= connlist[fd].rlen;
  set_event(fd, EPOLLOUT, 0);
  
  return count;
}
int send_cb(int fd) {
  char *buffer = connlist[fd].wbuffer;
  int idx = connlist[fd].wlen;
  int count = send(fd, buffer, idx, 0);
  set_event(fd, EPOLLIN, 0);
  return count;
}
int init_server(unsigned short port) {
  int sockfd = socket(AF_INET, SOCK_STREAM, 0);
  struct sockaddr_in serveraddr;
  memset(&serveraddr, 0, sizeof(struct sockaddr_in));
  serveraddr.sin_family = AF_INET;
  serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
  serveraddr.sin_port = htons(port);
  if (-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr))) {
    perror("bind");
    return -1;
  }
  listen(sockfd, 10);
  return sockfd;
}
// tcp 
int main() {
  int port_count = 100;
  unsigned short port = 2048;
  int i = 0;
  
  epfd = epoll_create(1); // int size
  for (i = 0;i < port_count;i ++) {
    int sockfd = init_server(port + i);  // 2048, 2049, 2050, 2051 ... 2057
    connlist[sockfd].fd = sockfd;
    connlist[sockfd].recv_t.accept_callback = accept_cb;
    set_event(sockfd, EPOLLIN, 1);
  }
  gettimeofday(&zvoice_king, NULL);
  struct epoll_event events[1024] = {0};
  
  while (1) { // mainloop();
    int nready = epoll_wait(epfd, events, 1024, -1); // 
    int i = 0;
    for (i = 0;i < nready;i ++) {
      int connfd = events[i].data.fd;
      if (events[i].events & EPOLLIN) { //
        int count = connlist[connfd].recv_t.recv_callback(connfd);
      } else if (events[i].events & EPOLLOUT) { 
        int count = connlist[connfd].send_callback(connfd);
      }
    }
  }
}

测试连接的代码用这个,测试别的服务端的并发数量都可以使用。

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <errno.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <fcntl.h>
#define MAX_BUFFER    128
#define MAX_EPOLLSIZE (384*1024)
#define MAX_PORT    100
#define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
int isContinue = 0;
static int ntySetNonblock(int fd) {
  int flags;
  flags = fcntl(fd, F_GETFL, 0);
  if (flags < 0) return flags;
  flags |= O_NONBLOCK;
  if (fcntl(fd, F_SETFL, flags) < 0) return -1;
  return 0;
}
static int ntySetReUseAddr(int fd) {
  int reuse = 1;
  return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse));
}
int main(int argc, char **argv) {
  if (argc <= 2) {
    printf("Usage: %s ip port\n", argv[0]);
    exit(0);
  }
  const char *ip = argv[1];
  int port = atoi(argv[2]);
  int connections = 0;
  char buffer[128] = {0};
  int i = 0, index = 0;
  struct epoll_event events[MAX_EPOLLSIZE];
  
  int epoll_fd = epoll_create(MAX_EPOLLSIZE);
  
  strcpy(buffer, " Data From MulClient\n");
    
  struct sockaddr_in addr;
  memset(&addr, 0, sizeof(struct sockaddr_in));
  
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = inet_addr(ip);
  struct timeval tv_begin;
  gettimeofday(&tv_begin, NULL);
  while (1) {
    if (++index >= MAX_PORT) index = 0;
    
    struct epoll_event ev;
    int sockfd = 0;
    if (connections < 380000 && !isContinue) {
      sockfd = socket(AF_INET, SOCK_STREAM, 0);
      if (sockfd == -1) {
        perror("socket");
        goto err;
      }
      //ntySetReUseAddr(sockfd);
      addr.sin_port = htons(port+index);
      if (connect(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) {
        perror("connect");
        goto err;
      }
      ntySetNonblock(sockfd);
      ntySetReUseAddr(sockfd);
      sprintf(buffer, "Hello Server: client --> %d\n", connections);
      send(sockfd, buffer, strlen(buffer), 0);
      ev.data.fd = sockfd;
      ev.events = EPOLLIN | EPOLLOUT;
      epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev);
    
      connections ++;
    }
    //connections ++;
    if (connections % 1000 == 999 || connections >= 380000) {
      struct timeval tv_cur;
      memcpy(&tv_cur, &tv_begin, sizeof(struct timeval));
      
      gettimeofday(&tv_begin, NULL);
      int time_used = TIME_SUB_MS(tv_begin, tv_cur);
      printf("connections: %d, sockfd:%d, time_used:%d\n", connections, sockfd, time_used);
      int nfds = epoll_wait(epoll_fd, events, connections, 100);
      for (i = 0;i < nfds;i ++) {
        int clientfd = events[i].data.fd;
        if (events[i].events & EPOLLOUT) {
          sprintf(buffer, "data from %d\n", clientfd);
          send(sockfd, buffer, strlen(buffer), 0);
        } else if (events[i].events & EPOLLIN) {
          char rBuffer[MAX_BUFFER] = {0};       
          ssize_t length = recv(sockfd, rBuffer, MAX_BUFFER, 0);
          if (length > 0) {
            printf(" RecvBuffer:%s\n", rBuffer);
            if (!strcmp(rBuffer, "quit")) {
              isContinue = 0;
            }
            
          } else if (length == 0) {
            printf(" Disconnect clientfd:%d\n", clientfd);
            connections --;
            close(clientfd);
          } else {
            if (errno == EINTR) continue;
            printf(" Error clientfd:%d, errno:%d\n", clientfd, errno);
            close(clientfd);
          }
        } else {
          printf(" clientfd:%d, errno:%d\n", clientfd, errno);
          close(clientfd);
        }
      }
    }
    usleep(1 * 1000);
  }
  return 0;
err:
  printf("error : %s\n", strerror(errno));
  return 0;
  
}

虚拟机

我们准备三台虚拟机,来做实验。配置如下:

  • server: 8核 8G
  • client: 2核 4G
  • client: 2核 4G
  • client: 2核 4G

我这边是使用的ubuntu,其他的发行版应该也差不多。

参数设置

修改打开文件句柄数量

首先我们需要设置一下可以打开的文件句柄个数:

ulimit -n 1048576

装载IP追踪模块

modprobe ip_conntrack

修改系统设置

cd /etc
ls | grep sysctl.conf

如果没有找到,就创建一个

touch sysctl.conf

如果找到了

sudo vim sysctl.conf

在最后添加以下内容:

server:

net.ipv4.ip_local_port_range = 1024 65535
net.ipv4.tcp_mem = 1772864 1572864 1572864
net.ipv4.tcp_wmem = 512 512 1024
net.ipv4.tcp_rmem = 512 512 1024
fs.file-max = 1048576

client:

net.ipv4.ip_local_port_range = 1024 65535
net.ipv4.tcp_mem = 262144 524288 786432
net.ipv4.tcp_wmem = 512 512 1024
net.ipv4.tcp_rmem = 512 512 1024

然后都要让他们生效

sudo sysctl -p

开始测试

首先,把这两个代码编译一下。然后ifconfig看一下IP地址。

服务端运行第一段代码,客户端运行客户端代码(参数是IP 和 端口号)。

遇到的问题

在使用htop监视系统资源的时候。我们会发现,跑着跑着mem突然下降的问题。就是内存不够的问题。这里需要修改tcp协议栈的分配,就是sysctl.conf的参数。我提供的是我配置的参数。将tcp.mem增大可以增大tcp协议栈的内存分配。将tcp_rmem tcp_wmem改小可以修改每一个tcp连接的内存分配。如果还是跑不到100w。去修改客户端的参数。最后解决的办法就是修改虚拟机虚拟内存大额分配。

相关文章
|
2月前
|
人工智能 自然语言处理 测试技术
从人工到AI驱动:天猫测试全流程自动化变革实践
天猫技术质量团队探索AI在测试全流程的落地应用,覆盖需求解析、用例生成、数据构造、执行验证等核心环节。通过AI+自然语言驱动,实现测试自动化、可溯化与可管理化,在用例生成、数据构造和执行校验中显著提效,推动测试体系从人工迈向AI全流程自动化,提升效率40%以上,用例覆盖超70%,并构建行业级知识资产沉淀平台。
从人工到AI驱动:天猫测试全流程自动化变革实践
|
2月前
|
数据采集 存储 人工智能
从0到1:天猫AI测试用例生成的实践与突破
本文系统阐述了天猫技术团队在AI赋能测试领域的深度实践与探索,讲述了智能测试用例生成的落地路径。
从0到1:天猫AI测试用例生成的实践与突破
|
3月前
|
Java 测试技术 API
自动化测试工具集成及实践
自动化测试用例的覆盖度及关键点最佳实践、自动化测试工具、集成方法、自动化脚本编写等(兼容多语言(Java、Python、Go、C++、C#等)、多框架(Spring、React、Vue等))
158 6
|
3月前
|
人工智能 边缘计算 搜索推荐
AI产品测试学习路径全解析:从业务场景到代码实践
本文深入解析AI测试的核心技能与学习路径,涵盖业务理解、模型指标计算与性能测试三大阶段,助力掌握分类、推荐系统、计算机视觉等多场景测试方法,提升AI产品质量保障能力。
|
3月前
|
人工智能 自然语言处理 测试技术
AI测试平台的用例管理实践:写得清晰,管得高效,执行更智能
在测试过程中,用例分散、步骤模糊、回归测试效率低等问题常困扰团队。霍格沃兹测试开发学社推出的AI测试平台,打通“用例编写—集中管理—智能执行”全流程,提升测试效率与覆盖率。平台支持标准化用例编写、统一管理操作及智能执行,助力测试团队高效协作,释放更多精力优化测试策略。目前平台已开放内测,欢迎试用体验!
|
4月前
|
人工智能 资源调度 jenkins
精准化回归测试:大厂实践与技术落地解析
在高频迭代时代,全量回归测试成本高、效率低,常导致关键 bug 漏测。精准化测试通过代码变更影响分析,智能筛选高价值用例,显著提升测试效率与缺陷捕获率,实现降本增效。已被阿里、京东、腾讯等大厂成功落地,成为质量保障的新趋势。
|
4月前
|
搜索推荐 Devops 测试技术
避免无效回归!基于MCP协议的精准测试影响分析实践
本文揭示传统测试的"孤岛困境",提出MCP(Model Context Protocol)测试新范式,通过模型抽象业务、上下文感知环境和协议规范协作,实现从机械执行到智能测试的转变。剖析MCP如何颠覆测试流程,展示典型应用场景,并提供团队落地实践路径,助力测试工程师把握质量效率革命的新机遇。
|
4月前
|
人工智能 缓存 自然语言处理
大模型性能测试完全指南:从原理到实践
本文介绍了大模型性能测试的核心价值与方法,涵盖流式响应机制、PD分离架构、五大关键指标(如首Token延迟、吐字率等),并通过实战演示如何使用Locust进行压力测试。同时探讨了多模态测试的挑战与优化方向,帮助测试工程师成长为AI系统性能的“诊断专家”。
|
4月前
|
安全 网络安全 数据安全/隐私保护
解决SSH测试连接GitHub时出现“connection closed by remote host”的问题。
然后使用 `ssh -T git@ssh.github.com`来测试连接。
522 0
|
6月前
|
人工智能 Java 测试技术
SpringBoot 测试实践:单元测试与集成测试
在 Spring Boot 测试中,@MockBean 用于创建完全模拟的 Bean,替代真实对象行为;而 @SpyBean 则用于部分模拟,保留未指定方法的真实实现。两者结合 Mockito 可灵活控制依赖行为,提升测试覆盖率。合理使用 @ContextConfiguration 和避免滥用 @SpringBootTest 可优化测试上下文加载速度,提高测试效率。
351 5