1 数据结构
1.1 节点
节点描述的是每一个tcp connection要处理的信息,粗略的用下面的结构体出抽象
typedef struct zv_connect_s{
int fd;
ZVCALLBACK cb;
char rbuffer[BUFFER_LENGTH]; // channel
int rc; // read buffer count
int count;
char wbuffer[BUFFER_LENGTH]; // channel
int wc; // wirte buffer count
} zv_connect_t;
1.2 链表与链表的内容(数组)
typedef struct zv_connblock_s { // a list of connects
zv_connect_t *block; // 这里是指针,是因为要动态分配内存,把它理解为数组
struct zv_connblocks_s *next; // next指向自己===> 链表
} zv_connblock_t;
可以参照下面的图片理解:
每一个方框都是一个zv_connect_t 的集合(数组),末尾是链表的尾指针。
1.3 reactor 抽象定义
reactor可以用下面的结构体去抽象,它包含epoll的fd, zv_connect_t *block的数量(需要动态分配内存),然后它指向 zv_connblock_s链表的表头
typedef struct zv_reactor_s {
int epfd;
int blkcnt;
zv_connblock_t *blockheader; // 这里是指针,指向block的头
} zv_reactor_t;
现在初始化它,假设有1024个block
zv_reactor_t *reactor;
reactor->blockheader = malloc(sizeof(zv_connblock_t) + 1024* (sizeof(zv_connect_t) + sizeof(reactor->blockheader->next)));
// 分配内存详解参见:https://blog.csdn.net/qq43645149/article/details/130300102
if (reactor->blockheader == NULL) return -1;
reactor->blockheader->block = (zv_connect_t*)(reactor->blockheader + 1);
2 reactor模式
把网络和业务分离,有如下特点:
- 高并发:通过非阻塞IO和事件驱动的方式,可以同时处理多个请求,提高并发处理能力。
- 可扩展性:由于避免了阻塞,因此可以很容易地增加更多的线程或服务器节点,以支持更大规模的应用程序。
- 灵活性:Reactor模式可以适应不同的应用场景,如多线程、单线程、多进程等,可以根据需要进行灵活配置。
- 易于维护:Reactor模式将各个组件分离开来,使得代码结构清晰、易于维护。
2.1 这么好的模式,怎么实现它呢?
2.1.1 联系前面定义的数据结构
定位规则
比如第n个fd, 用它除以1024,以得到的商确定它在第几个block, 以余数确定它是第几个block中的第几个元素(这里是 zv_connect_s)
回到前面看zv_connect_s里面有什么呢?---> callback, rbuffer, wbuffer, 以及相应的size。
也就是说,下图中的每个block块里面的节点都可以绑定 callback, rbuffer,rc, wbuffer, wc,一个节点分裂处理好几个元素,
于是这就叫做reactor( 反应堆)模式吧。
图解:
2.1.2 为各种fd安家落户,安排相应的任务
上面的block可看成是小区,每个小区里面都有1024户人家,当有成千上万个fd的时候,就可以根据fd, 读写事件等注册相应的任务(回调函数)
reactor模式用在服务端,
从TCP三次握手的流程看,listen, accept 都是发生在服务端,对应的都是读事件,
listen之后,注册读事件的业务 accept_cb,
accept之后,注册读事件的业务 read_cb
执行 read_cb 的时候,可以注册写事件的业务send_cb
执行 send_cb 的时候,可以注册读事件的业务read_cb 。。。。。(事件循环)
==注册事件的时候事件(cb)与 fd 绑定==
这样reactor的事件与业务分离的模式就有概念了。
这里reactor是站在服务器的角度去处理问题的,处理(cb)了客户端的消息后,会有一个回馈(又一个cb)发给客户端
cb的执行的地方是业务,注册事件是事件开始的地方。
3 代码部分
3.1 初始化部分代码
//printf("sockfd: %d\n", sockfd);
zv_reactor_t reactor;
zv_init_reactor(&reactor); // epoll
int port = atoi(argv[1]);
int i = 0;
for (i = 0;i < 1;i ++) {
int sockfd = init_server(port+i); //
set_listener(&reactor, sockfd, accept_cb); //
}
3.2 事件循环部分
struct epoll_event events[EVENT_LENGTH] = {0};
while (1) { //mainloop, event driver
int nready = epoll_wait(reactor.epfd, events, EVENT_LENGTH, -1);
int i = 0;
for (i = 0;i < nready;i ++) { // 这里只用处理读写事件,业务部分在这里已经看不到了
int connfd = events[i].data.fd;
zv_connect_t *conn = zv_connect_idx(&reactor, connfd);
if (events[i].events & EPOLLIN) {
conn->cb(connfd, events[i].events, &reactor);
}
if (events[i].events & EPOLLOUT) {
conn->cb(connfd, events[i].events, &reactor);
}
}
}
3.2 事件注册
listen 注册读事件
int set_listener(zv_reactor_t *reactor, int fd, ZVCALLBACK cb) {
if (!reactor || !reactor->blockheader) return -1;
reactor->blockheader->block[fd].fd = fd;
reactor->blockheader->block[fd].cb = cb;
struct epoll_event ev;
ev.events = EPOLLIN; // 注册读事件
ev.data.fd = fd;
epoll_ctl(reactor->epfd, EPOLL_CTL_ADD, fd, &ev);
}
accept 注册读事件
int accept_cb(int fd, int events, void *arg) {
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);
if (clientfd < 0) {
printf("accept errno: %d\n", errno);
return -1;
}
//
printf(" clientfd: %d\n", clientfd);
zv_reactor_t *reactor = (zv_reactor_t*)arg;
zv_connect_t *conn = zv_connect_idx(reactor, clientfd);
conn->fd = clientfd;
conn->cb = recv_cb; // 事件对应的回调
conn->count = BUFFER_LENGTH;
struct epoll_event ev;
ev.events = EPOLLIN; // 注册读事件
ev.data.fd = clientfd;
epoll_ctl(reactor->epfd, EPOLL_CTL_ADD, clientfd, &ev);
}
业务处理, 注册读写事件
int send_cb(int fd, int event, void *arg) {
//printf("send_cb\n");
zv_reactor_t *reactor = (zv_reactor_t*)arg;
zv_connect_t *conn = zv_connect_idx(reactor, fd);
zv_http_response(conn);
//echo
send(fd, conn->wbuffer, conn->wc, 0); // send header
#if 1
if (conn->enable_sendfile) { // sendbody
int filefd = open(conn->resource, O_RDONLY);
if (filefd == -1) {
printf("errno: %d\n", errno);
return -1;
}
struct stat stat_buf;
fstat(filefd, &stat_buf);
int ret = sendfile(fd, filefd, NULL, stat_buf.st_size); // sendbody
if (ret == -1) {
printf("errno: %d\n", errno);
}
close(filefd);
}
#endif
conn->cb = recv_cb; // 事件对应的回调
//
struct epoll_event ev;
ev.events = EPOLLIN; // // 注册读事件
ev.data.fd = fd;
epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, fd, &ev);
}
int recv_cb(int fd, int event, void *arg) {
zv_reactor_t *reactor = (zv_reactor_t*)arg;
zv_connect_t *conn = zv_connect_idx(reactor, fd);
int ret = recv(fd, conn->rbuffer+conn->rc, conn->count, 0);
if (ret < 0) {
} else if (ret == 0) {
//
conn->fd = -1;
conn->rc = 0;
conn->wc = 0;
//
epoll_ctl(reactor->epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
return -1;
} else {
conn->rc += ret;
printf("rbuffer: %s, ret: %d\n", conn->rbuffer, conn->rc);
// --> echo
//memcpy(conn->wbuffer, conn->rbuffer, conn->rc);
//conn->wc = conn->rc;
zv_http_request(conn);
conn->cb = send_cb; // 事件对应的回调
//
struct epoll_event ev;
ev.events = EPOLLOUT; // 注册写事件
ev.data.fd = fd;
epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, fd, &ev);
}
}
文章参考与<零声教育>的C/C++linux服务期高级架构系统教程学习:链接