c10k百万并发思考

简介: c10k百万并发思考

基于ubuntu虚拟机实现服务器百万并发测试

==》基于零声学院学习笔记整理。。。

1:了解概念

服务器并发量:

1:服务器能同时承载的客户端并发量。

2:服务器处理客户端请求的数量

3:单位时间内,能够同时处理的请求数量

服务器能承载百万fd:

1:5w以上的响应请求

2:对数据库的操作

3:对磁盘的操作

4:CPU占用率 不能超过60%

5:内存占用率不能超过80%

服务器能够承载的fd数量达到一定的量只是最基本的条件。

服务端处理客户端的请求包括==》请求,响应,处理,关闭。

2:实现在reactor的基础上,实现能够同时承载100万连接的fd。

(这里只是测试,使连接数可以达到百万,实际中业务处理,内存消耗其实也要考虑)

1:如果使用1个端口进行连接时,客户端改为1个端口,运行时发现fd达到1024就会出问题

日志如下:

new connect [192.168.105.128:39872][time:0], pos[0]
C[1023]:Hello Server: client --> 487
send[fd=1023], [29]Hello Server: client --> 487
accept: Too many open files
accept: Too many open files
accept: Too many open files
accept: Too many open files

看到1024,想到是客户端能打开的最大fd的限制,

进行查看:

ubuntu@ubuntu:~/c10k$ ulimit -a
core file size          (blocks, -c) 0
data seg size           (kbytes, -d) unlimited
scheduling priority             (-e) 0
file size               (blocks, -f) unlimited
pending signals                 (-i) 3792
max locked memory       (kbytes, -l) 64
max memory size         (kbytes, -m) unlimited
open files                      (-n) 1024       ===》每个进程能打开最大的fd的数量。
pipe size            (512 bytes, -p) 8
POSIX message queues     (bytes, -q) 819200
real-time priority              (-r) 0
stack size              (kbytes, -s) 8192
cpu time               (seconds, -t) unlimited
max user processes              (-u) 3792
virtual memory          (kbytes, -v) unlimited
file locks                      (-x) unlimited
ubuntu@ubuntu:~/c10k$ ulimit -Sn   ==》查看/调整进程级最大文件打开数
1024
ubuntu@ubuntu:~/c10k$ ulimit -Hn    ===》查看Hard limit限制,使用ulimit -Sn 修改进程最大打开数时不能超过该数字
4096
ubuntu@ubuntu:~/c10k$ ulimit -Sn 5000  ==》修改当前用户进程可以打开的文件数
-bash: ulimit: open files: cannot modify limit: Invalid argument

修改该限制:

hlp@ubuntu:~/c10k$ sudo vi /etc/security/limits.conf 
在打开文件中加入: *代表的时普通用户
*       hard     nofile         1048576     ===》Hard limit限制,下面的设置不能超过该数
*       soft     nofile         1048576     ===》
重新打开一个xsheel进行ulimit -a查看,发现已经生效

查看当前系统使用的打开文件描述符数,可以使用下面的命令:

ubuntu@ubuntu:~$  cat /proc/sys/fs/file-nr
1440  0 96364
#第一个数表示当前系统已分配使用的打开文件描述符数,第二个数为分配后已释放的(目前已不再使用),第三个数等于file-max。

操作系统对fd的限制:file-max

操作系统所能打开的最多的fd的限制,fd的最大值,所有进程能打开的fd的数量:

修改

ubuntu@ubuntu:~$ sudo vi /etc/sysctl.conf
==>在文件中增加如下配置
#TCP内存设定8G,32G,96G
net.ipv4.tcp_mem = 2097152 8388608 25165824
#tcp发送和接收缓冲区设置 1024 1024 2048 测试够用
net.ipv4.tcp_wmem = 4096 87380 4161536
net.ipv4.tcp_rmem = 4096 87380 4161536
#设置操作系统所能打开的fd的最大数 
fs.file-max = 1048576 
#最大孤儿套接字  
net.ipv4.tcp_max_orphans = 65536
#系统默认值为”65536” 服务器上连接超过这个设定的值时,系统会主动丢掉新连接包
net.nf_conntrack_max = 1048576
#iptables对于已建立的连接,1200秒若没有活动,那么则清除掉,默认的时间是432000秒(5天)。
net.netfilter.nf_conntrack_tcp_timeout_established = 1200
ubuntu@ubuntu:~/c10k$ sudo modprobe ip_conntrack   ==》内核需要加载ip_conntrack模块
[sudo] password for ubuntu: 
ubuntu@ubuntu:~/c10k$ sudo sysctl -p
net.ipv4.tcp_mem = 2097152 8388608 25165824
net.ipv4.tcp_wmem = 4096 87380 4161536
net.ipv4.tcp_rmem = 4096 87380 4161536
fs.file-max = 1048576
net.ipv4.tcp_max_orphans = 65536
net.nf_conntrack_max = 1048576
net.netfilter.nf_conntrack_tcp_timeout_established = 1200
ubuntu@ubuntu:~/c10k$ 
ubuntu@ubuntu:~$ sudo sysctl -p     ===》从配置文件“/etc/sysctl.conf”加载内核参数设置
==》使用如下命令进行生效,使参数生效在/proc/net/tcp中

看到一篇文章,涉及相关的参数设置:tcp参数设置 - 刨根问底_liang - 博客园 (cnblogs.com)

参数 描述 默认值 优化值
net.core.rmem_default 默认的TCP数据接收窗口大小(字节)。 229376 256960
net.core.rmem_max 最大的TCP数据接收窗口(字节)。 131071 513920
net.core.wmem_default 默认的TCP数据发送窗口大小(字节)。 229376 256960
net.core.wmem_max 最大的TCP数据发送窗口(字节)。 131071 513920
net.core.netdev_max_backlog 在每个网络接口接收数据包的速率比内核处理这些包的速率快时,允许送到队列的数据包的最大数目。 1000 2000
net.core.somaxconn 定义了系统中每一个端口最大的监听队列的长度,这是个全局的参数。 128 2048
net.core.optmem_max 表示每个套接字所允许的最大缓冲区的大小。 20480 81920
net.ipv4.tcp_mem 确定TCP栈应该如何反映内存使用,每个值的单位都是内存页(通常是4KB)。第一个值是内存使用的下限;第二个值是内存压力模式开始对缓冲区使用应用压力的上限;第三个值是内存使用的上限。在这个层次上可以将报文丢弃,从而减少对内存的使用。对于较大的BDP可以增大这些值(注意,其单位是内存页而不是字节)。 94011 125351 188022 131072 262144 524288
net.ipv4.tcp_rmem 为自动调优定义socket使用的内存。第一个值是为socket接收缓冲区分配的最少字节数;第二个值是默认值(该值会被rmem_default覆盖),缓冲区在系统负载不重的情况下可以增长到这个值;第三个值是接收缓冲区空间的最大字节数(该值会被rmem_max覆盖)。 4096 87380 4011232 8760 256960 4088000
net.ipv4.tcp_wmem 为自动调优定义socket使用的内存。第一个值是为socket发送缓冲区分配的最少字节数;第二个值是默认值(该值会被wmem_default覆盖),缓冲区在系统负载不重的情况下可以增长到这个值;第三个值是发送缓冲区空间的最大字节数(该值会被wmem_max覆盖)。 4096 16384 4011232 8760 256960 4088000
net.ipv4.tcp_keepalive_time TCP发送keepalive探测消息的间隔时间(秒),用于确认TCP连接是否有效。 7200 1800
net.ipv4.tcp_keepalive_intvl 探测消息未获得响应时,重发该消息的间隔时间(秒)。 75 30
net.ipv4.tcp_keepalive_probes 在认定TCP连接失效之前,最多发送多少个keepalive探测消息。 9 3
net.ipv4.tcp_sack 启用有选择的应答(1表示启用),通过有选择地应答乱序接收到的报文来提高性能,让发送者只发送丢失的报文段,(对于广域网通信来说)这个选项应该启用,但是会增加对CPU的占用。 1 1
net.ipv4.tcp_fack 启用转发应答,可以进行有选择应答(SACK)从而减少拥塞情况的发生,这个选项也应该启用。 1 1
net.ipv4.tcp_timestamps TCP时间戳(会在TCP包头增加12个字节),以一种比重发超时更精确的方法(参考RFC 1323)来启用对RTT 的计算,为实现更好的性能应该启用这个选项。 1 1
net.ipv4.tcp_window_scaling 启用RFC 1323定义的window scaling,要支持超过64KB的TCP窗口,必须启用该值(1表示启用),TCP窗口最大至1GB,TCP连接双方都启用时才生效。 1 1
net.ipv4.tcp_syncookies 表示是否打开TCP同步标签(syncookie),内核必须打开了CONFIG_SYN_COOKIES项进行编译,同步标签可以防止一个套接字在有过多试图连接到达时引起过载。 1 1
net.ipv4.tcp_tw_reuse 表示是否允许将处于TIME-WAIT状态的socket(TIME-WAIT的端口)用于新的TCP连接 。 0 1
net.ipv4.tcp_tw_recycle 能够更快地回收TIME-WAIT套接字。 0 1
net.ipv4.tcp_fin_timeout 对于本端断开的socket连接,TCP保持在FIN-WAIT-2状态的时间(秒)。对方可能会断开连接或一直不结束连接或不可预料的进程死亡。 60 30
net.ipv4.ip_local_port_range 表示TCP/UDP协议允许使用的本地端口号 32768 61000 1024 65000
net.ipv4.tcp_max_syn_backlog 对于还未获得对方确认的连接请求,可保存在队列中的最大数目。如果服务器经常出现过载,可以尝试增加这个数字。 2048 2048
2:如果reactor.c服务器端代码只监听一个端口,会发现,执行到一定程度,会出现问题

分析:根据五元组(源ip,源port,目的ip,目的port,协议)和fd的关系,fd的个数其实是 源ip源port目的ip*目的port 的结果,如果服务端只监听一个端口,因为操作系统port端口的个数是65535个,则通过分析五元组,在ip固定只能是1个的前提下,服务端的端口如果只有一个,最大连接数也就是客户端port的限制,所以这里进行测试时,在只有一个服务端ip的前提下,需要多个端口的监听:

如何实现服务端对多个端口的监听》

1:多线程多进程实现,如nginx accept分配在不同的进程中

2:reactor实现

增加监听多个端口进行测试,出现卡死问题,服务端和客户端都无反应,客户端都限制在65535,是因为net.nf_conntrack_max = 1048576(#系统默认值为”65536” 服务器上连接超过这个设定的值时,系统会主动丢掉新连接包)参数的问题

3:使用代码进行测试时,第一个问题:

每个进程可以打开的文件的个数限制:

查看相关文件限制:

修改配置文件 sudo vi /etc/security/limits.conf,增加如下内容:

* soft nofile 100000
* hard nofile 100000
root soft nofile 100000
root hard nofile 100000

比如使用xshell重新登陆,用ulimit -a进行查看就会发现已经生效

服务器设置更大一点:2100000

4:第二个问题 :

这是因为socket连接没有及时断开吧,Socket默认连接60秒,60秒之内没有进行心跳交互,即读写数据,就会自动关闭连接。

使用ulimit -a进行查看,发现服务端的进程fd限制是1024,没有发生改变,这是因为系统对fd打开数的限制导致:

需要修改配置,sudo vi /etc/sysctl.conf 增加如下配置:

# 系统最大文件打开数
fs.file-max = 20000000
# 单个用户进程最大文件打开数
fs.nr_open = 20000000

使用sudo sysctl -p使配置生效,用limit -a进行查看

5:第三个问题,客户端报错: socket: Too many open files

客户端已经到达进程可以打开的最大限制100 000;

服务器这边recv()也有相关的报错:

这应该是客户端异常,服务端收到对端的回复:

1.当尝试和未开放的服务器端口建立tcp连接时,服务器tcp将会直接向客户端发送reset报文;
2.双方之前已经正常建立了通信通道,也可能进行过了交互,当某一方在交互的过程中发生了异常,如崩溃等,异常的一方会向对端发送reset报文,通知对方将连接关闭;
3.当收到TCP报文,但是发现该报文不是已建立的TCP连接列表可处理的,则其直接向对端发送reset报文;
4.ack报文丢失,并且超出一定的重传次数或时间后,会主动向对端发送reset报文释放该TCP连接;

4:因为这里是客户端的限制到达,所以总会出现类似的问题,其实已经实现了C10K

6. 梳理一下实现C10K的注意事项:

1:单线程对打开fd的限制,使用ulimit -a进行查看, open files对应的参数,在 /etc/security/limits.conf文件中做修改

* soft nofile 150000
* hard nofile 150000
root soft nofile 150000
root hard nofile 150000

2:服务端对进程打开的fd要求比较多,这个参数多设置一些, /etc/security/limits.conf

root soft nofile 2100000
root hard nofile 2100000
* soft nofile 2100000
* hard nofile 2100000

同时,由于系统对fd的限制,服务器设置这么大的打开数量是不成功的,可以查看系统fd的最大值cat /proc/sys/fs/file-max 需要修改/etc/sysctl.conf

# 系统最大文件打开数
fs.file-max = 20000000
# 单个用户进程最大文件打开数
fs.nr_open = 20000000

需要使用sysctl -p生效

3:修改了系统对fd的限制,单个进程对fd的限制,接下来就是内存的限制,tcp相关收发缓冲区,tcp内存设置,以及tcp相关参数设置:

客户端的tcp设置可以如下: /etc/sysctl.conf

# 系统级别最大打开文件
fs.file-max = 100000
# 单用户进程最大文件打开数
fs.nr_open = 100000
# 是否重用, 快速回收time-wait状态的tcp连接
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_tw_recycle = 1
# 单个tcp连接最大缓存byte单位
net.core.optmem_max = 8192
# 可处理最多孤儿socket数量,超过则警告,每个孤儿socket占用64KB空间
net.ipv4.tcp_max_orphans = 10240
# 最多允许time-wait数量
net.ipv4.tcp_max_tw_buckets = 10240
# 从客户端发起的端口范围,默认是32768 61000,则只能发起2w多连接,改为一下值,可一个IP可发起差不多6.4w连接。
net.ipv4.ip_local_port_range = 1024 65535

服务端tcp的设置可以如下:/etc/sysctl.conf

# 系统最大文件打开数
fs.file-max = 20000000
# 单个用户进程最大文件打开数
fs.nr_open = 20000000
# 全连接队列长度,默认128
net.core.somaxconn = 10240
# 半连接队列长度,当使用sysncookies无效,默认128
net.ipv4.tcp_max_syn_backlog = 16384
net.ipv4.tcp_syncookies = 0
# 网卡数据包队列长度  
net.core.netdev_max_backlog = 41960
# time-wait 最大队列长度
net.ipv4.tcp_max_tw_buckets = 300000
# time-wait 是否重新用于新链接以及快速回收
net.ipv4.tcp_tw_reuse = 1  
net.ipv4.tcp_tw_recycle = 1
# tcp报文探测时间间隔, 单位s
net.ipv4.tcp_keepalive_intvl = 30
# tcp连接多少秒后没有数据报文时启动探测报文
net.ipv4.tcp_keepalive_time = 900
# 探测次数
net.ipv4.tcp_keepalive_probes = 3
# 保持fin-wait-2 状态多少秒
net.ipv4.tcp_fin_timeout = 15  
# 最大孤儿socket数量,一个孤儿socket占用64KB,当socket主动close掉,处于fin-wait1, last-ack
net.ipv4.tcp_max_orphans = 131072  
# 每个套接字所允许得最大缓存区大小
net.core.optmem_max = 819200
# 默认tcp数据接受窗口大小
net.core.rmem_default = 262144  
net.core.wmem_default = 262144  
net.core.rmem_max = 16777216  
net.core.wmem_max = 16777216
# tcp栈内存使用第一个值内存下限, 第二个值缓存区应用压力上限, 第三个值内存上限, 单位为page,通常为4kb
net.ipv4.tcp_mem = 786432 4194304 8388608
# 读, 第一个值为socket缓存区分配最小字节, 第二个,第三个分别被rmem_default, rmem_max覆盖
net.ipv4.tcp_rmem = 4096 4096 4206592
# 写, 第一个值为socket缓存区分配最小字节, 第二个,第三个分别被wmem_default, wmem_max覆盖
net.ipv4.tcp_wmem = 4096 4096 4206592

4:因为我们这里使用的同一个环境上的虚拟机进行测试,这里暂时不关注tcp内存,参数相关的设置,则,剩下就是我们的代码实现。

其实就是五元组:(sip,dsp,sport,dport,proto)

3 1 6w 5(监听) 1(tcp)

如果用3个客户端进行测试,则每个进程能打开的fd的数量,其实就是它的限制:

客户端分析:

每个客户端可以创建限制数个socketfd,使用该fd去连接服务器的监听端口,这里的限制其实就是客户端可以打开的fd的数量。

服务端分析:

需要创建n个监听的socketfd,accept可以监听到对端的ip和端口号:

则这里3个ip *每个ip遍历的端口号的个数 *listenfd的个数就是我们最终可以连接的实际最大的fd的数量,当然,小于限制线程可以连接的限制。

注意:这里listen的fd一定要客户端进行连接才有效

5:好像tcp在没有数据的时候,是一起接收的,一下子接收一大堆。

6:疑惑,在自己的电脑上进行测试的时候,当3个客户端达到大约5多的时候,服务器端会直接调用killed???

===》好像是内存超出相关问题,使用一定的手段进行监听测试,可以查看相关内存的使用率

7:问题:

1:epoll监听事件的个数

2:对fd的关闭导致重新连接

8:select并不是受限与1024个,而是受限于linux进程所能打开的fd的个数

7:相关源码

服务端测试源码:reactor.c

// typedef unsigned long int uint64_t;
// static uint64_t GetCurrentTime()
// {
//     struct timeval tv;
//     gettimeofday(&tv, NULL);
//     return tv.tv_sec * 1000 + tv.tv_usec / 1000;
// }
/***************************
typedef union epoll_data
{
  void *ptr;
  int fd;
  uint32_t u32;
  uint64_t u64;
} epoll_data_t;
struct epoll_event
{
  uint32_t events;     Epoll events 
  epoll_data_t data;   User data variable 
} __EPOLL_PACKED;
从epoll_event 结构体可以看出,可以使用data中的ptr,或者fd进行事件触发的判断
****************************/
//一般的epoll用fd进行判断,这里我们用ptr封装实现
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
/*****************************************************
梳理实现思路:
  借用epoll实现事件触发,一个epoll可以管理一个reactor处理器
  epoll事件需要包括:
    fd, 回调函数,参数
*****************************************************/
#define EPOLL_EVENT_SIZE 1024*1024
//注意这里的回调函数定义没有带* typedef int (*callFunc)(void *data);
typedef int CALLBACK(int, int, void*);
typedef struct my_event{
  int fd;
  int events;   //保存监听的事件类型 in out 还是其他
  int (*callback)(int fd, int events, void * arg);
  void* arg;
  int status;
  char buffer[1024]; //这里直接当作接收和处理的buffer,实际上要分开
  int length;
}MY_EPOLL_EVENT;
//管理节点,每个epoll对应管理一个
typedef struct reactor_manager
{
  int epfd;
  MY_EPOLL_EVENT* events;
}REACTOR_MANAGER;
//初始化对应的事件  保存相关的参数
void epoll_event_set(MY_EPOLL_EVENT * ev, int fd, CALLBACK callback, void *arg)
{
  ev->fd = fd; //触发的fd
  ev->events = 0; //需要监听的事件
  ev->callback = callback; //对应的回调函数
  ev->arg = arg; //把管理节点传进来,后续操作
  return ;
}
//使用epoll进行监听
int epoll_event_add(int epfd, int events, MY_EPOLL_EVENT* ev)
{
  struct epoll_event ep_ev = {0, {0}};
  ep_ev.data.ptr = ev; //epoll这里可以存
  ep_ev.events = ev->events = events; // epoll和这里的事件都改变监听类型
//业务处理 epoll_ctl 参数标识 只是为了监听add和mod的状态
  int op;
  if(ev->status == 1)
  {
    op = EPOLL_CTL_MOD;
  }else
  {
    op = EPOLL_CTL_ADD;
    ev->status = 1;
  }
//加入epoll的监听中
  if(epoll_ctl(epfd, op, ev->fd, &ep_ev) < 0)
  {
    printf("event add failed [fd = %d], events[%d] \n",ev->fd, events);
    return -1;
  }
  return 0;
}
//删除监听的事件
int epoll_event_del(int epfd, MY_EPOLL_EVENT *ev)
{
  //证明就没加入过
  if(ev->status != 1)
  {
    return -1;
  }
  struct epoll_event ep_ev = {0, {0}};
  ep_ev.data.ptr = ev;
  ev->status = 0;
  ev->events = 0;
  epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ep_ev);
  return 0;
}
//传入端口,实现监听  端口号用short
int init_sock(unsigned short port);
//直接申请了数组大小的事件
int reactor_init(REACTOR_MANAGER * reator);
//加入监听事件中 自己的fd对应自己的回调函数
int reactor_addlistener(REACTOR_MANAGER* reactor, int fd, CALLBACK *callback);
int reactor_run(REACTOR_MANAGER* reactor);
int reactor_destory(REACTOR_MANAGER* reactor);
//三种回调函数
int accept_callback(int fd, int events, void * arg);
int send_callback(int fd, int events, void * arg);
int recv_callback(int fd, int events, void * arg);
int main(int argc, char* argv[])
{
  unsigned short port = 8888;
  if(argc == 2)
  {
    port = atoi(argv[1]);
  }
  REACTOR_MANAGER * reactor = (REACTOR_MANAGER*)malloc(sizeof(REACTOR_MANAGER));
  reactor_init(reactor);
  //可以监听多个fd,先创建fd,再加入监听事件中
  int listenfd[100] = {0};
  for(int i=0; i<100; i++)
  {
    listenfd[i] = init_sock(port+i);
    reactor_addlistener(reactor, listenfd[i], accept_callback);
  }
  //事件的触发以及循环进行处理
  reactor_run(reactor);
  //对reactor进行销毁
  reactor_destory(reactor);
  //关闭对应的端口
  for(int i=0; i<100; i++)
  {
    close(listenfd[i]);
  }
  free(reactor);
  return 0;
}
//创建socket,设置非阻塞 设置对应的回调
//accept进行监听 
//监听到设置非阻塞,设置对应的回调函数
int init_sock(unsigned short port)
{
  int fd = socket(AF_INET, SOCK_STREAM, 0);
  fcntl(fd, F_SETFL, O_NONBLOCK);
  struct sockaddr_in server_addr;
  memset(&server_addr, 0, sizeof(server_addr));
  server_addr.sin_family = AF_INET;
  server_addr.sin_port = htons(port);
  server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
  bind(fd, (struct sockaddr*)&server_addr, sizeof(server_addr));
  //研究源码了解一下listen的第二个参数
  if(listen(fd, 20) < 0)
  {
    printf("fd [%d] listen failed: %s \n", fd, strerror(errno));
  }
  printf("listen port: [%d] fd: [%d] \n", port, fd);
  return fd;
}
//这里直接申请一定数据的event,是限制
//为管理节点进行初始化
int reactor_init(REACTOR_MANAGER * reactor)
{
  if(reactor == NULL)
  {
    return -1;
  }
  memset(reactor, 0, sizeof(REACTOR_MANAGER));
  reactor->epfd = epoll_create(1);
  if(reactor->epfd < 0)
  {
    printf("create epfd in [%s] error [%s]\n ",__func__, strerror(errno));
    return -2;
  }
  reactor->events = (MY_EPOLL_EVENT*)malloc(EPOLL_EVENT_SIZE* sizeof(MY_EPOLL_EVENT));
  if(reactor->events == NULL)
  {
    printf("create epfd in [%s] error [%s]\n ",__func__, strerror(errno));
    close(reactor->epfd);
    return -3;
  }
  return 0;
}
int reactor_destory(REACTOR_MANAGER* reactor)
{
  if(reactor != NULL)
  {
    close(reactor->epfd);
    free(reactor->events);
  }
  return 0;
}
/**********************************************************************************
void epoll_event_set(MY_EPOLL_EVENT * ev, int fd, CALLBACK callback, void *arg);
int epoll_event_add(int epfd, int events, MY_EPOLL_EVENT* ev);
int epoll_event_del(int epfd, MY_EPOLL_EVENT *ev);
int accept_callback(int fd, int events, void * arg);
int send_callback(int fd, int events, void * arg);
int recv_callback(int fd, int events, void * arg);
**********************************************************************************/
//把对应的fd加入到epoll事件中
//默认规则是fd就是事件下标
int reactor_addlistener(REACTOR_MANAGER* reactor, int fd, CALLBACK *callback)
{
  if(reactor == NULL ||reactor->events == NULL)
  {
    return -1;
  }
  //epoll的监听类型和epoll_ctl是两回事
  epoll_event_set(&reactor->events[fd], fd, callback, reactor);
  epoll_event_add(reactor->epfd, EPOLLIN, &reactor->events[fd]);
  return 0;
}
int reactor_run(REACTOR_MANAGER* reactor)
{
  if(reactor == NULL || reactor->events ==NULL || reactor->epfd <0)
  {
    return -1;
  }
  //定义事件数组,接收epoll_wait参数做处理
  struct epoll_event event_waits[102400];
  int checkpos = 0, i;
  while(1)
  {
    //可以增加一个判断,在event中加个获取最后操作时间,
    //循环遍历,可以每次编译100个,依次遍历事件数组,把超时的进行释放epoll_event_del
    int nready = epoll_wait(reactor->epfd, event_waits, 102400, 1000);//超时时间
    if(nready < 0) //超时返回0 错误返回-1
    {
      printf("epoll_wait error. exit \n");
      continue;
    }
    //触发到对应的事件,执行对应的回调
    for( i = 0;i <nready; i++)
    {
      MY_EPOLL_EVENT* ev = (MY_EPOLL_EVENT*)event_waits[i].data.ptr;
      if((event_waits[i].events & EPOLLIN) && (ev->events & EPOLLIN))
      {
        ev->callback(ev->fd, event_waits[i].events, ev->arg);
      }
      if((event_waits[i].events & EPOLLOUT) && (ev->events & EPOLLOUT))
      {
        ev->callback(ev->fd, event_waits[i].events, ev->arg);
      }
    }
  }
  return 0;
}
//MY_EPOLL_EVENT  REACTOR_MANAGER
//accept监听回调
int accept_callback(int fd, int events, void * arg)
{
  //通过参数获取到管理节点,然后进行事件处理
  REACTOR_MANAGER * reactor = (REACTOR_MANAGER*)arg;
  if(reactor == NULL)
  {
    return -1;
  }
  //开始进行accept监听,监听到的fd设置非阻塞塞入reactor事件中
  struct sockaddr_in client_addr;
  socklen_t len = sizeof(client_addr);
  int clientfd;
  if((clientfd = accept(fd, (struct sockaddr*)&client_addr, &len)) == -1)
  {
    //出错
    if (errno != EAGAIN && errno != EINTR)
    {
      //应该跳过, 重试或者中断逻辑
      printf("errno not EAGAIN or EINTR \n");
    }
    printf("accept: %s\n", strerror(errno));
    return -1;
  }
  int flag = 0;
  //fcntl正确做法应该是先获取状态再设置
  if ((flag = fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0) 
  {
    printf("%s: fcntl nonblocking failed, %d\n", __func__, clientfd);
    return -1;
  }
  epoll_event_set(&reactor->events[clientfd], clientfd, recv_callback, reactor);
  epoll_event_add(reactor->epfd, EPOLLIN, &reactor->events[clientfd]);
  // printf("new connect [%s:%d][time:%ld], pos[%d]\n", 
  //  inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), reactor->events[i].last_active, i);
  printf("new connect [%s:%d]\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));
  return 0;
}
// void epoll_event_set(MY_EPOLL_EVENT * ev, int fd, CALLBACK callback, void *arg);
// int epoll_event_add(int epfd, int events, MY_EPOLL_EVENT* ev);
// int epoll_event_del(int epfd, MY_EPOLL_EVENT *ev);
//接收监听 开始执行接收,并设置发送的事件
int recv_callback(int fd, int events, void * arg)
{
  REACTOR_MANAGER * reactor = (REACTOR_MANAGER*)arg;
  if(reactor == NULL)
  {
    return -1;
  }
  MY_EPOLL_EVENT* ev = reactor->events +fd;
  int len = recv(fd, ev->buffer, 1024, 0);
  epoll_event_del(reactor->epfd, ev);
  if(len>0)
  {
    ev->length = len;
    ev->buffer[len] = '\0';
    printf("client [%d]: [%s] \n", fd, ev->buffer);
    epoll_event_set(&reactor->events[fd], fd, send_callback, reactor);
    epoll_event_add(reactor->epfd, EPOLLOUT, ev);
  }else if(len == 0)
  {
    close(ev->fd);
    printf("[fd=%d]  closed\n", fd);
  }else{
    close(ev->fd);
    printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno)); 
  }
}
//发送监听 完了监听接收
int send_callback(int fd, int events, void * arg)
{
  REACTOR_MANAGER * reactor = (REACTOR_MANAGER*)arg;
  if(reactor == NULL)
  {
    return -1;
  }
  MY_EPOLL_EVENT* ev = reactor->events +fd;
  int len = send(fd, ev->buffer, ev->length, 0);
  if(len > 0)
  {
    printf("send[fd=%d], [%d]%s\n", fd, len, ev->buffer);
    epoll_event_del(reactor->epfd, ev);
    epoll_event_set(ev, fd, recv_callback, reactor);
    epoll_event_add(reactor->epfd, EPOLLIN, ev);
  }else
  {
    close(ev->fd);
    epoll_event_del(reactor->epfd, ev);
    printf("send[fd=%d] error %s\n", fd, strerror(errno));
  }
  return len;
}

客户端测试代码 :multiple_port_client_epoll.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/time.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
/*************************************************
作为测试百万并发的客户端
创建多个socket,一直进行连接。
注意,如果服务器的端口监听多个,这里也要遍历去进行连接
设置端口可重用主要是针对服务端,Address already in use
这里也是大量的客户端:涉及epoll监听的大小
***************************************************/
#define MAX_EPOLLSIZE (384*1024)
#define LISTEN_MAX_PORT 100
#define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
static int SetNonblock(int fd) {
  int flags;
  flags = fcntl(fd, F_GETFL, 0);
  if (flags < 0) return flags;
  flags |= O_NONBLOCK;
  if (fcntl(fd, F_SETFL, flags) < 0) return -1;
  return 0;
}
static int SetReUseAddr(int fd) {
  int reuse = 1;
  return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse));
}
//输入服务端的ip和port开始进行连接
int main(int argc, char* argv[])
{
  if(argc <= 2)
  {
    printf("Usage : %s ip port\n", argv[0]);
    return -1;
  }
  //获取ip port
  const char * ip = argv[1];
  int port = atoi(argv[2]);
  //epoll相关
  struct epoll_event events[MAX_EPOLLSIZE];
  int epfd = epoll_create(1);
  //服务器地址相关
  struct sockaddr_in addr;
  memset(&addr, 0, sizeof(struct sockaddr_in));
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = inet_addr(ip);
  //相关参数
  int connect_nums = 0; //连接数量的判断
  int port_index = 0;   //监听端口的判断
  int sockfd = 0;
  struct epoll_event ev;
  char buffer[128] = {0};
  //为了获取间隔时间做准备 gettimeofday接口
  struct timeval tv_begin;
  gettimeofday(&tv_begin, NULL);
  //循环遍历塞入不同的服务端监听port
  while(1)
  {
    if(++port_index >= LISTEN_MAX_PORT) port_index = 0;
    //socket,连接并进行发送,加入epoll_ctl
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd == -1) 
    {
      perror("socket create error \n");
      goto err;
    }
    //连接服务端
    addr.sin_port = htons(port+port_index);
    if (connect(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) 
    {
      perror("connect error\n");
      goto err;
    }
    //设置非阻塞和可重用,并进行发送探测:
    SetNonblock(sockfd);
    SetReUseAddr(sockfd);
    sprintf(buffer, "Hello Server: client num --> %d\n", connect_nums);
    send(sockfd, buffer, strlen(buffer), 0);
    connect_nums++;
    ev.data.fd = sockfd;
    ev.events = EPOLLIN | EPOLLOUT;
    epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);
    //每间隔999个epoll就去开始监听一次,进行发送和接收
    if(connect_nums%1000 == 999)
    {
      //获取当前时间,替换时间
      struct timeval tv_cur;
      memcpy(&tv_cur, &tv_begin, sizeof(struct timeval));
      gettimeofday(&tv_begin, NULL);
      int time_used = TIME_SUB_MS(tv_begin, tv_cur);
      printf("connect_nums: %d, sockfd:%d, time_used:%d\n", connect_nums, sockfd, time_used);
      //使用epoll进行监听等待
      int nready = epoll_wait(epfd, events, connect_nums, 100);
      for(int i=0; i<nready; i++)
      {
        //获取监听到的fd,进行发送
        int clientfd = events[i].data.fd;
        if (events[i].events & EPOLLOUT) {
          //这里是客户端连接服务端的fd 自己的port和服务端信息的五元组
          sprintf(buffer, "data from fd:%d\n", clientfd);
          send(clientfd, buffer, strlen(buffer), 0);
        }else if(events[i].events & EPOLLIN)
        {
          char recv_buff[128] = {0};
          ssize_t recv_length = recv(clientfd, recv_buff, 128, 0);
          if(recv_length >0)
          {
            printf("recv buff is [%s] \n", recv_buff);
            // if (!strcmp(recv_buff, "quit")) {
            //  isContinue = 0;
            // }
          }else if (recv_length == 0) 
          {
            printf(" Disconnect clientfd:%d\n", clientfd);
            connect_nums --;
            close(clientfd);
          }else
          {
            if (errno == EINTR ||errno == EAGAIN) continue;
            //errno代码为11(EAGAIN)  对非阻塞socket而言,EAGAIN不是一种错误
            printf(" Error clientfd:%d, recv errno:%d\n", clientfd, errno);
            close(clientfd);
          }
        }else 
        {
          //监听到其他异常
          printf(" clientfd:%d, epoll_wait  errno:%d\n", clientfd, errno);
          close(clientfd);
        }
      }
    }
    usleep(1 * 1000);
  }
  return 0;
err:
  printf("return error : %s\n", strerror(errno));
  return 0;
}
目录
相关文章
|
网络协议
reactor(百万并发服务器) - 2
reactor(百万并发服务器) - 2
54 0
|
18小时前
|
架构师 Java 测试技术
一文搞透高并发指标(QPS、TPS、吞吐量等)
详解高并发场景下的QPS、TPS、RT及吞吐量等关键性能指标,帮助理解系统性能评估的核心概念。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
一文搞透高并发指标(QPS、TPS、吞吐量等)
|
19小时前
|
消息中间件 存储 缓存
QPS多少,才算高并发 ?
本文详解高并发概念及 QPS 标准,大厂面试高频点,建议掌握收藏。关注【mikechen的互联网架构】,10年+BAT架构经验分享。
QPS多少,才算高并发 ?
|
6月前
|
存储 缓存 安全
2.2.1服务器百万并发实现
2.2.1服务器百万并发实现
|
6月前
|
存储 网络协议 Linux
百万并发服务器
百万并发服务器
48 0
|
6月前
|
消息中间件 Java 程序员
阿里巴巴高并发架构到底多牛逼?是如何抗住淘宝双11亿级并发量?
众所周知,在Java的知识体系中,并发编程是非常重要的一环,也是面试的必问题,一个好的Java程序员是必须对并发编程这块有所了解的。
|
存储
服务器百万并发的原理与实现
服务器百万并发的原理与实现
178 0
|
编解码 搜索推荐 测试技术
读书笔记第四讲:《百万级并发商品服务架构解密》丁鸣亮
读书笔记第四讲:《百万级并发商品服务架构解密》丁鸣亮
|
设计模式 架构师 算法
这个时代,达不到百万以上并发量都不叫高并发!!收藏学以致用
成为一名年薪百万的顶尖架构师,实现财富自由,是大多数JAVA高级程序员的职业追求。 这不仅是技术发展的趋势,同时也是个人职业价值的体现。 但最终能否成为IT架构中的「灵魂人物」,做出亿级用户量的产品、搭建承载百万级并发的架构,还要取决于你能不能翻过并发量这道坎。
|
安全 网络协议 Shell
高并发服务器的限制有哪些,如何提高并发量
高并发服务器的限制有哪些,如何提高并发量
高并发服务器的限制有哪些,如何提高并发量