基于ubuntu虚拟机实现服务器百万并发测试
==》基于零声学院学习笔记整理。。。
1:了解概念
服务器并发量:
1:服务器能同时承载的客户端并发量。
2:服务器处理客户端请求的数量
3:单位时间内,能够同时处理的请求数量
服务器能承载百万fd:
1:5w以上的响应请求
2:对数据库的操作
3:对磁盘的操作
4:CPU占用率 不能超过60%
5:内存占用率不能超过80%
服务器能够承载的fd数量达到一定的量只是最基本的条件。
服务端处理客户端的请求包括==》请求,响应,处理,关闭。
2:实现在reactor的基础上,实现能够同时承载100万连接的fd。
(这里只是测试,使连接数可以达到百万,实际中业务处理,内存消耗其实也要考虑)
1:如果使用1个端口进行连接时,客户端改为1个端口,运行时发现fd达到1024就会出问题
日志如下:
new connect [192.168.105.128:39872][time:0], pos[0] C[1023]:Hello Server: client --> 487 send[fd=1023], [29]Hello Server: client --> 487 accept: Too many open files accept: Too many open files accept: Too many open files accept: Too many open files
看到1024,想到是客户端能打开的最大fd的限制,
进行查看:
ubuntu@ubuntu:~/c10k$ ulimit -a core file size (blocks, -c) 0 data seg size (kbytes, -d) unlimited scheduling priority (-e) 0 file size (blocks, -f) unlimited pending signals (-i) 3792 max locked memory (kbytes, -l) 64 max memory size (kbytes, -m) unlimited open files (-n) 1024 ===》每个进程能打开最大的fd的数量。 pipe size (512 bytes, -p) 8 POSIX message queues (bytes, -q) 819200 real-time priority (-r) 0 stack size (kbytes, -s) 8192 cpu time (seconds, -t) unlimited max user processes (-u) 3792 virtual memory (kbytes, -v) unlimited file locks (-x) unlimited ubuntu@ubuntu:~/c10k$ ulimit -Sn ==》查看/调整进程级最大文件打开数 1024 ubuntu@ubuntu:~/c10k$ ulimit -Hn ===》查看Hard limit限制,使用ulimit -Sn 修改进程最大打开数时不能超过该数字 4096 ubuntu@ubuntu:~/c10k$ ulimit -Sn 5000 ==》修改当前用户进程可以打开的文件数 -bash: ulimit: open files: cannot modify limit: Invalid argument
修改该限制:
hlp@ubuntu:~/c10k$ sudo vi /etc/security/limits.conf 在打开文件中加入: *代表的时普通用户 * hard nofile 1048576 ===》Hard limit限制,下面的设置不能超过该数 * soft nofile 1048576 ===》 重新打开一个xsheel进行ulimit -a查看,发现已经生效
查看当前系统使用的打开文件描述符数,可以使用下面的命令:
ubuntu@ubuntu:~$ cat /proc/sys/fs/file-nr 1440 0 96364 #第一个数表示当前系统已分配使用的打开文件描述符数,第二个数为分配后已释放的(目前已不再使用),第三个数等于file-max。
操作系统对fd的限制:file-max
操作系统所能打开的最多的fd的限制,fd的最大值,所有进程能打开的fd的数量:
修改
ubuntu@ubuntu:~$ sudo vi /etc/sysctl.conf ==>在文件中增加如下配置 #TCP内存设定8G,32G,96G net.ipv4.tcp_mem = 2097152 8388608 25165824 #tcp发送和接收缓冲区设置 1024 1024 2048 测试够用 net.ipv4.tcp_wmem = 4096 87380 4161536 net.ipv4.tcp_rmem = 4096 87380 4161536 #设置操作系统所能打开的fd的最大数 fs.file-max = 1048576 #最大孤儿套接字 net.ipv4.tcp_max_orphans = 65536 #系统默认值为”65536” 服务器上连接超过这个设定的值时,系统会主动丢掉新连接包 net.nf_conntrack_max = 1048576 #iptables对于已建立的连接,1200秒若没有活动,那么则清除掉,默认的时间是432000秒(5天)。 net.netfilter.nf_conntrack_tcp_timeout_established = 1200 ubuntu@ubuntu:~/c10k$ sudo modprobe ip_conntrack ==》内核需要加载ip_conntrack模块 [sudo] password for ubuntu: ubuntu@ubuntu:~/c10k$ sudo sysctl -p net.ipv4.tcp_mem = 2097152 8388608 25165824 net.ipv4.tcp_wmem = 4096 87380 4161536 net.ipv4.tcp_rmem = 4096 87380 4161536 fs.file-max = 1048576 net.ipv4.tcp_max_orphans = 65536 net.nf_conntrack_max = 1048576 net.netfilter.nf_conntrack_tcp_timeout_established = 1200 ubuntu@ubuntu:~/c10k$ ubuntu@ubuntu:~$ sudo sysctl -p ===》从配置文件“/etc/sysctl.conf”加载内核参数设置 ==》使用如下命令进行生效,使参数生效在/proc/net/tcp中
看到一篇文章,涉及相关的参数设置:tcp参数设置 - 刨根问底_liang - 博客园 (cnblogs.com)
参数 | 描述 | 默认值 | 优化值 |
net.core.rmem_default | 默认的TCP数据接收窗口大小(字节)。 | 229376 | 256960 |
net.core.rmem_max | 最大的TCP数据接收窗口(字节)。 | 131071 | 513920 |
net.core.wmem_default | 默认的TCP数据发送窗口大小(字节)。 | 229376 | 256960 |
net.core.wmem_max | 最大的TCP数据发送窗口(字节)。 | 131071 | 513920 |
net.core.netdev_max_backlog | 在每个网络接口接收数据包的速率比内核处理这些包的速率快时,允许送到队列的数据包的最大数目。 | 1000 | 2000 |
net.core.somaxconn | 定义了系统中每一个端口最大的监听队列的长度,这是个全局的参数。 | 128 | 2048 |
net.core.optmem_max | 表示每个套接字所允许的最大缓冲区的大小。 | 20480 | 81920 |
net.ipv4.tcp_mem | 确定TCP栈应该如何反映内存使用,每个值的单位都是内存页(通常是4KB)。第一个值是内存使用的下限;第二个值是内存压力模式开始对缓冲区使用应用压力的上限;第三个值是内存使用的上限。在这个层次上可以将报文丢弃,从而减少对内存的使用。对于较大的BDP可以增大这些值(注意,其单位是内存页而不是字节)。 | 94011 125351 188022 | 131072 262144 524288 |
net.ipv4.tcp_rmem | 为自动调优定义socket使用的内存。第一个值是为socket接收缓冲区分配的最少字节数;第二个值是默认值(该值会被rmem_default覆盖),缓冲区在系统负载不重的情况下可以增长到这个值;第三个值是接收缓冲区空间的最大字节数(该值会被rmem_max覆盖)。 | 4096 87380 4011232 | 8760 256960 4088000 |
net.ipv4.tcp_wmem | 为自动调优定义socket使用的内存。第一个值是为socket发送缓冲区分配的最少字节数;第二个值是默认值(该值会被wmem_default覆盖),缓冲区在系统负载不重的情况下可以增长到这个值;第三个值是发送缓冲区空间的最大字节数(该值会被wmem_max覆盖)。 | 4096 16384 4011232 | 8760 256960 4088000 |
net.ipv4.tcp_keepalive_time | TCP发送keepalive探测消息的间隔时间(秒),用于确认TCP连接是否有效。 | 7200 | 1800 |
net.ipv4.tcp_keepalive_intvl | 探测消息未获得响应时,重发该消息的间隔时间(秒)。 | 75 | 30 |
net.ipv4.tcp_keepalive_probes | 在认定TCP连接失效之前,最多发送多少个keepalive探测消息。 | 9 | 3 |
net.ipv4.tcp_sack | 启用有选择的应答(1表示启用),通过有选择地应答乱序接收到的报文来提高性能,让发送者只发送丢失的报文段,(对于广域网通信来说)这个选项应该启用,但是会增加对CPU的占用。 | 1 | 1 |
net.ipv4.tcp_fack | 启用转发应答,可以进行有选择应答(SACK)从而减少拥塞情况的发生,这个选项也应该启用。 | 1 | 1 |
net.ipv4.tcp_timestamps | TCP时间戳(会在TCP包头增加12个字节),以一种比重发超时更精确的方法(参考RFC 1323)来启用对RTT 的计算,为实现更好的性能应该启用这个选项。 | 1 | 1 |
net.ipv4.tcp_window_scaling | 启用RFC 1323定义的window scaling,要支持超过64KB的TCP窗口,必须启用该值(1表示启用),TCP窗口最大至1GB,TCP连接双方都启用时才生效。 | 1 | 1 |
net.ipv4.tcp_syncookies | 表示是否打开TCP同步标签(syncookie),内核必须打开了CONFIG_SYN_COOKIES项进行编译,同步标签可以防止一个套接字在有过多试图连接到达时引起过载。 | 1 | 1 |
net.ipv4.tcp_tw_reuse | 表示是否允许将处于TIME-WAIT状态的socket(TIME-WAIT的端口)用于新的TCP连接 。 | 0 | 1 |
net.ipv4.tcp_tw_recycle | 能够更快地回收TIME-WAIT套接字。 | 0 | 1 |
net.ipv4.tcp_fin_timeout | 对于本端断开的socket连接,TCP保持在FIN-WAIT-2状态的时间(秒)。对方可能会断开连接或一直不结束连接或不可预料的进程死亡。 | 60 | 30 |
net.ipv4.ip_local_port_range | 表示TCP/UDP协议允许使用的本地端口号 | 32768 61000 | 1024 65000 |
net.ipv4.tcp_max_syn_backlog | 对于还未获得对方确认的连接请求,可保存在队列中的最大数目。如果服务器经常出现过载,可以尝试增加这个数字。 | 2048 | 2048 |
2:如果reactor.c服务器端代码只监听一个端口,会发现,执行到一定程度,会出现问题
分析:根据五元组(源ip,源port,目的ip,目的port,协议)和fd的关系,fd的个数其实是 源ip源port目的ip*目的port 的结果,如果服务端只监听一个端口,因为操作系统port端口的个数是65535个,则通过分析五元组,在ip固定只能是1个的前提下,服务端的端口如果只有一个,最大连接数也就是客户端port的限制,所以这里进行测试时,在只有一个服务端ip的前提下,需要多个端口的监听:
如何实现服务端对多个端口的监听》
1:多线程多进程实现,如nginx accept分配在不同的进程中
2:reactor实现
增加监听多个端口进行测试,出现卡死问题,服务端和客户端都无反应,客户端都限制在65535,是因为net.nf_conntrack_max = 1048576(#系统默认值为”65536” 服务器上连接超过这个设定的值时,系统会主动丢掉新连接包)参数的问题
3:使用代码进行测试时,第一个问题:
每个进程可以打开的文件的个数限制:
查看相关文件限制:
修改配置文件 sudo vi /etc/security/limits.conf,增加如下内容:
* soft nofile 100000 * hard nofile 100000 root soft nofile 100000 root hard nofile 100000
比如使用xshell重新登陆,用ulimit -a进行查看就会发现已经生效
服务器设置更大一点:2100000
4:第二个问题 :
这是因为socket连接没有及时断开吧,Socket默认连接60秒,60秒之内没有进行心跳交互,即读写数据,就会自动关闭连接。
使用ulimit -a进行查看,发现服务端的进程fd限制是1024,没有发生改变,这是因为系统对fd打开数的限制导致:
需要修改配置,sudo vi /etc/sysctl.conf 增加如下配置:
# 系统最大文件打开数 fs.file-max = 20000000 # 单个用户进程最大文件打开数 fs.nr_open = 20000000
使用sudo sysctl -p使配置生效,用limit -a进行查看
5:第三个问题,客户端报错: socket: Too many open files
客户端已经到达进程可以打开的最大限制100 000;
服务器这边recv()也有相关的报错:
这应该是客户端异常,服务端收到对端的回复:
1.当尝试和未开放的服务器端口建立tcp连接时,服务器tcp将会直接向客户端发送reset报文; 2.双方之前已经正常建立了通信通道,也可能进行过了交互,当某一方在交互的过程中发生了异常,如崩溃等,异常的一方会向对端发送reset报文,通知对方将连接关闭; 3.当收到TCP报文,但是发现该报文不是已建立的TCP连接列表可处理的,则其直接向对端发送reset报文; 4.ack报文丢失,并且超出一定的重传次数或时间后,会主动向对端发送reset报文释放该TCP连接;
4:因为这里是客户端的限制到达,所以总会出现类似的问题,其实已经实现了C10K
6. 梳理一下实现C10K的注意事项:
1:单线程对打开fd的限制,使用ulimit -a进行查看, open files对应的参数,在 /etc/security/limits.conf文件中做修改
* soft nofile 150000 * hard nofile 150000 root soft nofile 150000 root hard nofile 150000
2:服务端对进程打开的fd要求比较多,这个参数多设置一些, /etc/security/limits.conf
root soft nofile 2100000 root hard nofile 2100000 * soft nofile 2100000 * hard nofile 2100000
同时,由于系统对fd的限制,服务器设置这么大的打开数量是不成功的,可以查看系统fd的最大值cat /proc/sys/fs/file-max 需要修改/etc/sysctl.conf
# 系统最大文件打开数 fs.file-max = 20000000 # 单个用户进程最大文件打开数 fs.nr_open = 20000000
需要使用sysctl -p生效
3:修改了系统对fd的限制,单个进程对fd的限制,接下来就是内存的限制,tcp相关收发缓冲区,tcp内存设置,以及tcp相关参数设置:
客户端的tcp设置可以如下: /etc/sysctl.conf
# 系统级别最大打开文件 fs.file-max = 100000 # 单用户进程最大文件打开数 fs.nr_open = 100000 # 是否重用, 快速回收time-wait状态的tcp连接 net.ipv4.tcp_tw_reuse = 1 net.ipv4.tcp_tw_recycle = 1 # 单个tcp连接最大缓存byte单位 net.core.optmem_max = 8192 # 可处理最多孤儿socket数量,超过则警告,每个孤儿socket占用64KB空间 net.ipv4.tcp_max_orphans = 10240 # 最多允许time-wait数量 net.ipv4.tcp_max_tw_buckets = 10240 # 从客户端发起的端口范围,默认是32768 61000,则只能发起2w多连接,改为一下值,可一个IP可发起差不多6.4w连接。 net.ipv4.ip_local_port_range = 1024 65535
服务端tcp的设置可以如下:/etc/sysctl.conf
# 系统最大文件打开数 fs.file-max = 20000000 # 单个用户进程最大文件打开数 fs.nr_open = 20000000 # 全连接队列长度,默认128 net.core.somaxconn = 10240 # 半连接队列长度,当使用sysncookies无效,默认128 net.ipv4.tcp_max_syn_backlog = 16384 net.ipv4.tcp_syncookies = 0 # 网卡数据包队列长度 net.core.netdev_max_backlog = 41960 # time-wait 最大队列长度 net.ipv4.tcp_max_tw_buckets = 300000 # time-wait 是否重新用于新链接以及快速回收 net.ipv4.tcp_tw_reuse = 1 net.ipv4.tcp_tw_recycle = 1 # tcp报文探测时间间隔, 单位s net.ipv4.tcp_keepalive_intvl = 30 # tcp连接多少秒后没有数据报文时启动探测报文 net.ipv4.tcp_keepalive_time = 900 # 探测次数 net.ipv4.tcp_keepalive_probes = 3 # 保持fin-wait-2 状态多少秒 net.ipv4.tcp_fin_timeout = 15 # 最大孤儿socket数量,一个孤儿socket占用64KB,当socket主动close掉,处于fin-wait1, last-ack net.ipv4.tcp_max_orphans = 131072 # 每个套接字所允许得最大缓存区大小 net.core.optmem_max = 819200 # 默认tcp数据接受窗口大小 net.core.rmem_default = 262144 net.core.wmem_default = 262144 net.core.rmem_max = 16777216 net.core.wmem_max = 16777216 # tcp栈内存使用第一个值内存下限, 第二个值缓存区应用压力上限, 第三个值内存上限, 单位为page,通常为4kb net.ipv4.tcp_mem = 786432 4194304 8388608 # 读, 第一个值为socket缓存区分配最小字节, 第二个,第三个分别被rmem_default, rmem_max覆盖 net.ipv4.tcp_rmem = 4096 4096 4206592 # 写, 第一个值为socket缓存区分配最小字节, 第二个,第三个分别被wmem_default, wmem_max覆盖 net.ipv4.tcp_wmem = 4096 4096 4206592
4:因为我们这里使用的同一个环境上的虚拟机进行测试,这里暂时不关注tcp内存,参数相关的设置,则,剩下就是我们的代码实现。
其实就是五元组:(sip,dsp,sport,dport,proto)
3 1 6w 5(监听) 1(tcp)
如果用3个客户端进行测试,则每个进程能打开的fd的数量,其实就是它的限制:
客户端分析:
每个客户端可以创建限制数个socketfd,使用该fd去连接服务器的监听端口,这里的限制其实就是客户端可以打开的fd的数量。
服务端分析:
需要创建n个监听的socketfd,accept可以监听到对端的ip和端口号:
则这里3个ip *每个ip遍历的端口号的个数 *listenfd的个数就是我们最终可以连接的实际最大的fd的数量,当然,小于限制线程可以连接的限制。
注意:这里listen的fd一定要客户端进行连接才有效
5:好像tcp在没有数据的时候,是一起接收的,一下子接收一大堆。
6:疑惑,在自己的电脑上进行测试的时候,当3个客户端达到大约5多的时候,服务器端会直接调用killed???
===》好像是内存超出相关问题,使用一定的手段进行监听测试,可以查看相关内存的使用率
7:问题:
1:epoll监听事件的个数
2:对fd的关闭导致重新连接
8:select并不是受限与1024个,而是受限于linux进程所能打开的fd的个数
7:相关源码
服务端测试源码:reactor.c
// typedef unsigned long int uint64_t; // static uint64_t GetCurrentTime() // { // struct timeval tv; // gettimeofday(&tv, NULL); // return tv.tv_sec * 1000 + tv.tv_usec / 1000; // } /*************************** typedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t; struct epoll_event { uint32_t events; Epoll events epoll_data_t data; User data variable } __EPOLL_PACKED; 从epoll_event 结构体可以看出,可以使用data中的ptr,或者fd进行事件触发的判断 ****************************/ //一般的epoll用fd进行判断,这里我们用ptr封装实现 #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/socket.h> #include <sys/epoll.h> #include <arpa/inet.h> #include <fcntl.h> #include <unistd.h> #include <errno.h> /***************************************************** 梳理实现思路: 借用epoll实现事件触发,一个epoll可以管理一个reactor处理器 epoll事件需要包括: fd, 回调函数,参数 *****************************************************/ #define EPOLL_EVENT_SIZE 1024*1024 //注意这里的回调函数定义没有带* typedef int (*callFunc)(void *data); typedef int CALLBACK(int, int, void*); typedef struct my_event{ int fd; int events; //保存监听的事件类型 in out 还是其他 int (*callback)(int fd, int events, void * arg); void* arg; int status; char buffer[1024]; //这里直接当作接收和处理的buffer,实际上要分开 int length; }MY_EPOLL_EVENT; //管理节点,每个epoll对应管理一个 typedef struct reactor_manager { int epfd; MY_EPOLL_EVENT* events; }REACTOR_MANAGER; //初始化对应的事件 保存相关的参数 void epoll_event_set(MY_EPOLL_EVENT * ev, int fd, CALLBACK callback, void *arg) { ev->fd = fd; //触发的fd ev->events = 0; //需要监听的事件 ev->callback = callback; //对应的回调函数 ev->arg = arg; //把管理节点传进来,后续操作 return ; } //使用epoll进行监听 int epoll_event_add(int epfd, int events, MY_EPOLL_EVENT* ev) { struct epoll_event ep_ev = {0, {0}}; ep_ev.data.ptr = ev; //epoll这里可以存 ep_ev.events = ev->events = events; // epoll和这里的事件都改变监听类型 //业务处理 epoll_ctl 参数标识 只是为了监听add和mod的状态 int op; if(ev->status == 1) { op = EPOLL_CTL_MOD; }else { op = EPOLL_CTL_ADD; ev->status = 1; } //加入epoll的监听中 if(epoll_ctl(epfd, op, ev->fd, &ep_ev) < 0) { printf("event add failed [fd = %d], events[%d] \n",ev->fd, events); return -1; } return 0; } //删除监听的事件 int epoll_event_del(int epfd, MY_EPOLL_EVENT *ev) { //证明就没加入过 if(ev->status != 1) { return -1; } struct epoll_event ep_ev = {0, {0}}; ep_ev.data.ptr = ev; ev->status = 0; ev->events = 0; epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ep_ev); return 0; } //传入端口,实现监听 端口号用short int init_sock(unsigned short port); //直接申请了数组大小的事件 int reactor_init(REACTOR_MANAGER * reator); //加入监听事件中 自己的fd对应自己的回调函数 int reactor_addlistener(REACTOR_MANAGER* reactor, int fd, CALLBACK *callback); int reactor_run(REACTOR_MANAGER* reactor); int reactor_destory(REACTOR_MANAGER* reactor); //三种回调函数 int accept_callback(int fd, int events, void * arg); int send_callback(int fd, int events, void * arg); int recv_callback(int fd, int events, void * arg); int main(int argc, char* argv[]) { unsigned short port = 8888; if(argc == 2) { port = atoi(argv[1]); } REACTOR_MANAGER * reactor = (REACTOR_MANAGER*)malloc(sizeof(REACTOR_MANAGER)); reactor_init(reactor); //可以监听多个fd,先创建fd,再加入监听事件中 int listenfd[100] = {0}; for(int i=0; i<100; i++) { listenfd[i] = init_sock(port+i); reactor_addlistener(reactor, listenfd[i], accept_callback); } //事件的触发以及循环进行处理 reactor_run(reactor); //对reactor进行销毁 reactor_destory(reactor); //关闭对应的端口 for(int i=0; i<100; i++) { close(listenfd[i]); } free(reactor); return 0; } //创建socket,设置非阻塞 设置对应的回调 //accept进行监听 //监听到设置非阻塞,设置对应的回调函数 int init_sock(unsigned short port) { int fd = socket(AF_INET, SOCK_STREAM, 0); fcntl(fd, F_SETFL, O_NONBLOCK); struct sockaddr_in server_addr; memset(&server_addr, 0, sizeof(server_addr)); server_addr.sin_family = AF_INET; server_addr.sin_port = htons(port); server_addr.sin_addr.s_addr = htonl(INADDR_ANY); bind(fd, (struct sockaddr*)&server_addr, sizeof(server_addr)); //研究源码了解一下listen的第二个参数 if(listen(fd, 20) < 0) { printf("fd [%d] listen failed: %s \n", fd, strerror(errno)); } printf("listen port: [%d] fd: [%d] \n", port, fd); return fd; } //这里直接申请一定数据的event,是限制 //为管理节点进行初始化 int reactor_init(REACTOR_MANAGER * reactor) { if(reactor == NULL) { return -1; } memset(reactor, 0, sizeof(REACTOR_MANAGER)); reactor->epfd = epoll_create(1); if(reactor->epfd < 0) { printf("create epfd in [%s] error [%s]\n ",__func__, strerror(errno)); return -2; } reactor->events = (MY_EPOLL_EVENT*)malloc(EPOLL_EVENT_SIZE* sizeof(MY_EPOLL_EVENT)); if(reactor->events == NULL) { printf("create epfd in [%s] error [%s]\n ",__func__, strerror(errno)); close(reactor->epfd); return -3; } return 0; } int reactor_destory(REACTOR_MANAGER* reactor) { if(reactor != NULL) { close(reactor->epfd); free(reactor->events); } return 0; } /********************************************************************************** void epoll_event_set(MY_EPOLL_EVENT * ev, int fd, CALLBACK callback, void *arg); int epoll_event_add(int epfd, int events, MY_EPOLL_EVENT* ev); int epoll_event_del(int epfd, MY_EPOLL_EVENT *ev); int accept_callback(int fd, int events, void * arg); int send_callback(int fd, int events, void * arg); int recv_callback(int fd, int events, void * arg); **********************************************************************************/ //把对应的fd加入到epoll事件中 //默认规则是fd就是事件下标 int reactor_addlistener(REACTOR_MANAGER* reactor, int fd, CALLBACK *callback) { if(reactor == NULL ||reactor->events == NULL) { return -1; } //epoll的监听类型和epoll_ctl是两回事 epoll_event_set(&reactor->events[fd], fd, callback, reactor); epoll_event_add(reactor->epfd, EPOLLIN, &reactor->events[fd]); return 0; } int reactor_run(REACTOR_MANAGER* reactor) { if(reactor == NULL || reactor->events ==NULL || reactor->epfd <0) { return -1; } //定义事件数组,接收epoll_wait参数做处理 struct epoll_event event_waits[102400]; int checkpos = 0, i; while(1) { //可以增加一个判断,在event中加个获取最后操作时间, //循环遍历,可以每次编译100个,依次遍历事件数组,把超时的进行释放epoll_event_del int nready = epoll_wait(reactor->epfd, event_waits, 102400, 1000);//超时时间 if(nready < 0) //超时返回0 错误返回-1 { printf("epoll_wait error. exit \n"); continue; } //触发到对应的事件,执行对应的回调 for( i = 0;i <nready; i++) { MY_EPOLL_EVENT* ev = (MY_EPOLL_EVENT*)event_waits[i].data.ptr; if((event_waits[i].events & EPOLLIN) && (ev->events & EPOLLIN)) { ev->callback(ev->fd, event_waits[i].events, ev->arg); } if((event_waits[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) { ev->callback(ev->fd, event_waits[i].events, ev->arg); } } } return 0; } //MY_EPOLL_EVENT REACTOR_MANAGER //accept监听回调 int accept_callback(int fd, int events, void * arg) { //通过参数获取到管理节点,然后进行事件处理 REACTOR_MANAGER * reactor = (REACTOR_MANAGER*)arg; if(reactor == NULL) { return -1; } //开始进行accept监听,监听到的fd设置非阻塞塞入reactor事件中 struct sockaddr_in client_addr; socklen_t len = sizeof(client_addr); int clientfd; if((clientfd = accept(fd, (struct sockaddr*)&client_addr, &len)) == -1) { //出错 if (errno != EAGAIN && errno != EINTR) { //应该跳过, 重试或者中断逻辑 printf("errno not EAGAIN or EINTR \n"); } printf("accept: %s\n", strerror(errno)); return -1; } int flag = 0; //fcntl正确做法应该是先获取状态再设置 if ((flag = fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0) { printf("%s: fcntl nonblocking failed, %d\n", __func__, clientfd); return -1; } epoll_event_set(&reactor->events[clientfd], clientfd, recv_callback, reactor); epoll_event_add(reactor->epfd, EPOLLIN, &reactor->events[clientfd]); // printf("new connect [%s:%d][time:%ld], pos[%d]\n", // inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), reactor->events[i].last_active, i); printf("new connect [%s:%d]\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port)); return 0; } // void epoll_event_set(MY_EPOLL_EVENT * ev, int fd, CALLBACK callback, void *arg); // int epoll_event_add(int epfd, int events, MY_EPOLL_EVENT* ev); // int epoll_event_del(int epfd, MY_EPOLL_EVENT *ev); //接收监听 开始执行接收,并设置发送的事件 int recv_callback(int fd, int events, void * arg) { REACTOR_MANAGER * reactor = (REACTOR_MANAGER*)arg; if(reactor == NULL) { return -1; } MY_EPOLL_EVENT* ev = reactor->events +fd; int len = recv(fd, ev->buffer, 1024, 0); epoll_event_del(reactor->epfd, ev); if(len>0) { ev->length = len; ev->buffer[len] = '\0'; printf("client [%d]: [%s] \n", fd, ev->buffer); epoll_event_set(&reactor->events[fd], fd, send_callback, reactor); epoll_event_add(reactor->epfd, EPOLLOUT, ev); }else if(len == 0) { close(ev->fd); printf("[fd=%d] closed\n", fd); }else{ close(ev->fd); printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno)); } } //发送监听 完了监听接收 int send_callback(int fd, int events, void * arg) { REACTOR_MANAGER * reactor = (REACTOR_MANAGER*)arg; if(reactor == NULL) { return -1; } MY_EPOLL_EVENT* ev = reactor->events +fd; int len = send(fd, ev->buffer, ev->length, 0); if(len > 0) { printf("send[fd=%d], [%d]%s\n", fd, len, ev->buffer); epoll_event_del(reactor->epfd, ev); epoll_event_set(ev, fd, recv_callback, reactor); epoll_event_add(reactor->epfd, EPOLLIN, ev); }else { close(ev->fd); epoll_event_del(reactor->epfd, ev); printf("send[fd=%d] error %s\n", fd, strerror(errno)); } return len; }
客户端测试代码 :multiple_port_client_epoll.c
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <errno.h> #include <fcntl.h> #include <unistd.h> #include <sys/time.h> #include <sys/epoll.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/tcp.h> #include <arpa/inet.h> /************************************************* 作为测试百万并发的客户端 创建多个socket,一直进行连接。 注意,如果服务器的端口监听多个,这里也要遍历去进行连接 设置端口可重用主要是针对服务端,Address already in use 这里也是大量的客户端:涉及epoll监听的大小 ***************************************************/ #define MAX_EPOLLSIZE (384*1024) #define LISTEN_MAX_PORT 100 #define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000) static int SetNonblock(int fd) { int flags; flags = fcntl(fd, F_GETFL, 0); if (flags < 0) return flags; flags |= O_NONBLOCK; if (fcntl(fd, F_SETFL, flags) < 0) return -1; return 0; } static int SetReUseAddr(int fd) { int reuse = 1; return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse)); } //输入服务端的ip和port开始进行连接 int main(int argc, char* argv[]) { if(argc <= 2) { printf("Usage : %s ip port\n", argv[0]); return -1; } //获取ip port const char * ip = argv[1]; int port = atoi(argv[2]); //epoll相关 struct epoll_event events[MAX_EPOLLSIZE]; int epfd = epoll_create(1); //服务器地址相关 struct sockaddr_in addr; memset(&addr, 0, sizeof(struct sockaddr_in)); addr.sin_family = AF_INET; addr.sin_addr.s_addr = inet_addr(ip); //相关参数 int connect_nums = 0; //连接数量的判断 int port_index = 0; //监听端口的判断 int sockfd = 0; struct epoll_event ev; char buffer[128] = {0}; //为了获取间隔时间做准备 gettimeofday接口 struct timeval tv_begin; gettimeofday(&tv_begin, NULL); //循环遍历塞入不同的服务端监听port while(1) { if(++port_index >= LISTEN_MAX_PORT) port_index = 0; //socket,连接并进行发送,加入epoll_ctl sockfd = socket(AF_INET, SOCK_STREAM, 0); if (sockfd == -1) { perror("socket create error \n"); goto err; } //连接服务端 addr.sin_port = htons(port+port_index); if (connect(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) { perror("connect error\n"); goto err; } //设置非阻塞和可重用,并进行发送探测: SetNonblock(sockfd); SetReUseAddr(sockfd); sprintf(buffer, "Hello Server: client num --> %d\n", connect_nums); send(sockfd, buffer, strlen(buffer), 0); connect_nums++; ev.data.fd = sockfd; ev.events = EPOLLIN | EPOLLOUT; epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev); //每间隔999个epoll就去开始监听一次,进行发送和接收 if(connect_nums%1000 == 999) { //获取当前时间,替换时间 struct timeval tv_cur; memcpy(&tv_cur, &tv_begin, sizeof(struct timeval)); gettimeofday(&tv_begin, NULL); int time_used = TIME_SUB_MS(tv_begin, tv_cur); printf("connect_nums: %d, sockfd:%d, time_used:%d\n", connect_nums, sockfd, time_used); //使用epoll进行监听等待 int nready = epoll_wait(epfd, events, connect_nums, 100); for(int i=0; i<nready; i++) { //获取监听到的fd,进行发送 int clientfd = events[i].data.fd; if (events[i].events & EPOLLOUT) { //这里是客户端连接服务端的fd 自己的port和服务端信息的五元组 sprintf(buffer, "data from fd:%d\n", clientfd); send(clientfd, buffer, strlen(buffer), 0); }else if(events[i].events & EPOLLIN) { char recv_buff[128] = {0}; ssize_t recv_length = recv(clientfd, recv_buff, 128, 0); if(recv_length >0) { printf("recv buff is [%s] \n", recv_buff); // if (!strcmp(recv_buff, "quit")) { // isContinue = 0; // } }else if (recv_length == 0) { printf(" Disconnect clientfd:%d\n", clientfd); connect_nums --; close(clientfd); }else { if (errno == EINTR ||errno == EAGAIN) continue; //errno代码为11(EAGAIN) 对非阻塞socket而言,EAGAIN不是一种错误 printf(" Error clientfd:%d, recv errno:%d\n", clientfd, errno); close(clientfd); } }else { //监听到其他异常 printf(" clientfd:%d, epoll_wait errno:%d\n", clientfd, errno); close(clientfd); } } } usleep(1 * 1000); } return 0; err: printf("return error : %s\n", strerror(errno)); return 0; }