Reactor学习,从数据结构,内存分配,概念上分析

简介: Reactor学习,从数据结构,内存分配,概念上分析

@TOC

1 数据结构

1.1 节点

节点描述的是每一个tcp connection要处理的信息,粗略的用下面的结构体出抽象

 typedef struct zv_connect_s{
   
   
    int fd;
    ZVCALLBACK cb;
    char rbuffer[BUFFER_LENGTH];  // channel
    int rc;   // read buffer count
    int count;
    char wbuffer[BUFFER_LENGTH]; // channel
    int wc;  // wirte buffer count    
} zv_connect_t;

1.2 链表与链表的内容(数组)

typedef struct zv_connblock_s {
   
     // a list of connects
    zv_connect_t *block; //   这里是指针,是因为要动态分配内存,把它理解为数组
    struct zv_connblocks_s *next;   //  next指向自己===> 链表
} zv_connblock_t;

可以参照下面的图片理解:
长方形相当于是一个block 在这里插入图片描述每一个方框都是一个zv_connect_t 的集合(数组),末尾是链表的尾指针。

1.3 reactor 抽象定义

reactor可以用下面的结构体去抽象,它包含epoll的fd, zv_connect_t *block的数量(需要动态分配内存),然后它指向 zv_connblock_s链表的表头

typedef struct zv_reactor_s {
   
   
    int epfd;
    int blkcnt;

    zv_connblock_t *blockheader; //  这里是指针,指向block的头
} zv_reactor_t;

现在初始化它,假设有1024个block

zv_reactor_t *reactor;
reactor->blockheader = malloc(sizeof(zv_connblock_t) + 1024* (sizeof(zv_connect_t) + sizeof(reactor->blockheader->next)));
// 分配内存详解参见:https://blog.csdn.net/qq43645149/article/details/130300102
    if (reactor->blockheader == NULL) return -1;
reactor->blockheader->block = (zv_connect_t*)(reactor->blockheader + 1);

2 reactor模式

把网络和业务分离,有如下特点:

  1. 高并发:通过非阻塞IO和事件驱动的方式,可以同时处理多个请求,提高并发处理能力。

  2. 可扩展性:由于避免了阻塞,因此可以很容易地增加更多的线程或服务器节点,以支持更大规模的应用程序。

  3. 灵活性:Reactor模式可以适应不同的应用场景,如多线程、单线程、多进程等,可以根据需要进行灵活配置。

  4. 易于维护:Reactor模式将各个组件分离开来,使得代码结构清晰、易于维护。

2.1 这么好的模式,怎么实现它呢?

2.1.1 联系前面定义的数据结构

定位规则

比如第n个fd, 用它除以1024,以得到的商确定它在第几个block, 以余数确定它是第几个block中的第几个元素(这里是 zv_connect_s)

回到前面看zv_connect_s里面有什么呢?---> callback, rbuffer, wbuffer, 以及相应的size。
也就是说,下图中的每个block块里面的节点都可以绑定 callback, rbuffer,rc, wbuffer, wc,一个节点分裂处理好几个元素,
于是这就叫做reactor( 反应堆)模式吧。

图解:

在这里插入图片描述

2.1.2 为各种fd安家落户,安排相应的任务

上面的block可看成是小区,每个小区里面都有1024户人家,当有成千上万个fd的时候,就可以根据fd, 读写事件等注册相应的任务(回调函数)
reactor模式用在服务端,
从TCP三次握手的流程看,listen, accept 都是发生在服务端,对应的都是读事件,

listen之后,注册读事件的业务 accept_cb,
accept之后,注册读事件的业务 read_cb
执行 read_cb 的时候,可以注册写事件的业务send_cb
执行 send_cb 的时候,可以注册读事件的业务read_cb 。。。。。(事件循环)

==注册事件的时候事件(cb)与 fd 绑定==
这样reactor的事件与业务分离的模式就有概念了。
这里reactor是站在服务器的角度去处理问题的,处理(cb)了客户端的消息后,会有一个回馈(又一个cb)发给客户端
cb的执行的地方是业务,注册事件是事件开始的地方。

3 代码部分

3.1 初始化部分代码


//printf("sockfd: %d\n", sockfd);
zv_reactor_t reactor;
zv_init_reactor(&reactor);  // epoll 



int port = atoi(argv[1]);

int i = 0;
for (i = 0;i < 1;i ++) {
   
   
    int sockfd = init_server(port+i);  //
    set_listener(&reactor, sockfd, accept_cb); // 
}

3.2 事件循环部分

struct epoll_event events[EVENT_LENGTH] = {
   
   0};

    while (1) {
   
    //mainloop, event driver

        int nready = epoll_wait(reactor.epfd, events, EVENT_LENGTH, -1);

        int i = 0;
        for (i = 0;i < nready;i ++) {
   
     // 这里只用处理读写事件,业务部分在这里已经看不到了

            int connfd = events[i].data.fd;
            zv_connect_t *conn = zv_connect_idx(&reactor, connfd);

            if (events[i].events & EPOLLIN) {
   
   

                conn->cb(connfd, events[i].events, &reactor);

            }

            if (events[i].events & EPOLLOUT) {
   
   

                conn->cb(connfd, events[i].events, &reactor);

            }
        }

    }

3.2 事件注册

listen 注册读事件

int set_listener(zv_reactor_t *reactor, int fd, ZVCALLBACK cb) {
   
   

    if (!reactor || !reactor->blockheader) return -1;

    reactor->blockheader->block[fd].fd = fd;
    reactor->blockheader->block[fd].cb = cb;

    struct epoll_event ev;
    ev.events = EPOLLIN;                                                   //  注册读事件
    ev.data.fd = fd;

    epoll_ctl(reactor->epfd, EPOLL_CTL_ADD, fd, &ev);

}

accept 注册读事件

int accept_cb(int fd, int events, void *arg) {
   
   

    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);

    int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);
    if (clientfd < 0) {
   
   
        printf("accept errno: %d\n", errno);
        return -1;
    }

    // 

    printf(" clientfd: %d\n", clientfd);

    zv_reactor_t *reactor = (zv_reactor_t*)arg;
    zv_connect_t *conn = zv_connect_idx(reactor, clientfd);

    conn->fd = clientfd;
    conn->cb = recv_cb;                                     //  事件对应的回调
    conn->count = BUFFER_LENGTH;

    struct epoll_event ev;
    ev.events = EPOLLIN;                                         //  注册读事件
    ev.data.fd = clientfd;
    epoll_ctl(reactor->epfd, EPOLL_CTL_ADD, clientfd, &ev);

}

业务处理, 注册读写事件

int send_cb(int fd, int event, void *arg) {
   
   

    //printf("send_cb\n");

    zv_reactor_t *reactor = (zv_reactor_t*)arg;
    zv_connect_t *conn = zv_connect_idx(reactor, fd);


    zv_http_response(conn);


    //echo 
    send(fd, conn->wbuffer, conn->wc, 0);  // send header

#if 1
    if (conn->enable_sendfile) {
   
    // sendbody

        int filefd = open(conn->resource, O_RDONLY);
        if (filefd == -1) {
   
   
            printf("errno: %d\n", errno);
            return -1;
        }

        struct stat stat_buf;
        fstat(filefd, &stat_buf);

        int ret = sendfile(fd, filefd, NULL, stat_buf.st_size);  // sendbody
        if (ret == -1) {
   
   
            printf("errno: %d\n", errno);
        }

        close(filefd);

    }

#endif


    conn->cb = recv_cb;                                                        //  事件对应的回调

    //
    struct epoll_event ev;
    ev.events = EPOLLIN; //                                        //  注册读事件
    ev.data.fd = fd;
    epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, fd, &ev);

}


int recv_cb(int fd, int event, void *arg) {
   
   

    zv_reactor_t *reactor = (zv_reactor_t*)arg;
    zv_connect_t *conn = zv_connect_idx(reactor, fd);


    int ret = recv(fd, conn->rbuffer+conn->rc, conn->count, 0);
    if (ret < 0) {
   
   

    } else if (ret == 0) {
   
   

        //
        conn->fd = -1;
        conn->rc = 0;
        conn->wc = 0;

        //
        epoll_ctl(reactor->epfd, EPOLL_CTL_DEL, fd, NULL);

        close(fd);

        return -1;
    } else {
   
   

        conn->rc += ret;
        printf("rbuffer: %s, ret: %d\n", conn->rbuffer, conn->rc);

        // --> echo
        //memcpy(conn->wbuffer, conn->rbuffer, conn->rc);
        //conn->wc = conn->rc;

        zv_http_request(conn);

        conn->cb = send_cb;                                        //  事件对应的回调

        //
        struct epoll_event ev;
        ev.events = EPOLLOUT;                                       //  注册写事件
        ev.data.fd = fd;
        epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, fd, &ev);


    }

}

文章参考与<零声教育>的C/C++linux服务期高级架构系统教程学习:链接

相关文章
|
30天前
|
安全 程序员 编译器
动态内存管理学习分享
动态内存管理学习分享
45 0
|
1月前
|
机器学习/深度学习 算法 Windows
数据结构中的几种时间复杂度分析方式
数据结构中的几种时间复杂度分析方式
31 0
|
11天前
|
Python
python学习-函数模块,数据结构,字符串和列表(下)
python学习-函数模块,数据结构,字符串和列表
55 0
|
30天前
|
存储 算法 搜索推荐
数据结构-概念版(七)
数据结构-概念版
49 0
|
30天前
|
存储 算法 Serverless
数据结构-概念版(六)
数据结构-概念版
38 0
|
30天前
|
存储 机器学习/深度学习 算法
数据结构-概念版(二)
数据结构-概念版
32 0
|
1月前
|
算法 Java C++
【C/C++ 内存知识扩展】内存不足的可能性分析
【C/C++ 内存知识扩展】内存不足的可能性分析
12 0
|
1月前
|
消息中间件 Linux
Linux进程间通信(IPC)教程 Linux共享内存介绍:介绍POSIX共享内存的基本概念、用途和编程实践
Linux进程间通信(IPC)教程 Linux共享内存介绍:介绍POSIX共享内存的基本概念、用途和编程实践
25 2
|
1月前
|
缓存 Java
java使用MAT进行内存分析
java使用MAT进行内存分析
|
存储
【树和二叉树】数据结构二叉树和树的概念认识
【树和二叉树】数据结构二叉树和树的概念认识

热门文章

最新文章