Reactor学习,从数据结构,内存分配,概念上分析

简介: Reactor学习,从数据结构,内存分配,概念上分析

@TOC

1 数据结构

1.1 节点

节点描述的是每一个tcp connection要处理的信息,粗略的用下面的结构体出抽象

 typedef struct zv_connect_s{
   
   
    int fd;
    ZVCALLBACK cb;
    char rbuffer[BUFFER_LENGTH];  // channel
    int rc;   // read buffer count
    int count;
    char wbuffer[BUFFER_LENGTH]; // channel
    int wc;  // wirte buffer count    
} zv_connect_t;

1.2 链表与链表的内容(数组)

typedef struct zv_connblock_s {
   
     // a list of connects
    zv_connect_t *block; //   这里是指针,是因为要动态分配内存,把它理解为数组
    struct zv_connblocks_s *next;   //  next指向自己===> 链表
} zv_connblock_t;

可以参照下面的图片理解:
长方形相当于是一个block 在这里插入图片描述每一个方框都是一个zv_connect_t 的集合(数组),末尾是链表的尾指针。

1.3 reactor 抽象定义

reactor可以用下面的结构体去抽象,它包含epoll的fd, zv_connect_t *block的数量(需要动态分配内存),然后它指向 zv_connblock_s链表的表头

typedef struct zv_reactor_s {
   
   
    int epfd;
    int blkcnt;

    zv_connblock_t *blockheader; //  这里是指针,指向block的头
} zv_reactor_t;

现在初始化它,假设有1024个block

zv_reactor_t *reactor;
reactor->blockheader = malloc(sizeof(zv_connblock_t) + 1024* (sizeof(zv_connect_t) + sizeof(reactor->blockheader->next)));
// 分配内存详解参见:https://blog.csdn.net/qq43645149/article/details/130300102
    if (reactor->blockheader == NULL) return -1;
reactor->blockheader->block = (zv_connect_t*)(reactor->blockheader + 1);

2 reactor模式

把网络和业务分离,有如下特点:

  1. 高并发:通过非阻塞IO和事件驱动的方式,可以同时处理多个请求,提高并发处理能力。

  2. 可扩展性:由于避免了阻塞,因此可以很容易地增加更多的线程或服务器节点,以支持更大规模的应用程序。

  3. 灵活性:Reactor模式可以适应不同的应用场景,如多线程、单线程、多进程等,可以根据需要进行灵活配置。

  4. 易于维护:Reactor模式将各个组件分离开来,使得代码结构清晰、易于维护。

2.1 这么好的模式,怎么实现它呢?

2.1.1 联系前面定义的数据结构

定位规则

比如第n个fd, 用它除以1024,以得到的商确定它在第几个block, 以余数确定它是第几个block中的第几个元素(这里是 zv_connect_s)

回到前面看zv_connect_s里面有什么呢?---> callback, rbuffer, wbuffer, 以及相应的size。
也就是说,下图中的每个block块里面的节点都可以绑定 callback, rbuffer,rc, wbuffer, wc,一个节点分裂处理好几个元素,
于是这就叫做reactor( 反应堆)模式吧。

图解:

在这里插入图片描述

2.1.2 为各种fd安家落户,安排相应的任务

上面的block可看成是小区,每个小区里面都有1024户人家,当有成千上万个fd的时候,就可以根据fd, 读写事件等注册相应的任务(回调函数)
reactor模式用在服务端,
从TCP三次握手的流程看,listen, accept 都是发生在服务端,对应的都是读事件,

listen之后,注册读事件的业务 accept_cb,
accept之后,注册读事件的业务 read_cb
执行 read_cb 的时候,可以注册写事件的业务send_cb
执行 send_cb 的时候,可以注册读事件的业务read_cb 。。。。。(事件循环)

==注册事件的时候事件(cb)与 fd 绑定==
这样reactor的事件与业务分离的模式就有概念了。
这里reactor是站在服务器的角度去处理问题的,处理(cb)了客户端的消息后,会有一个回馈(又一个cb)发给客户端
cb的执行的地方是业务,注册事件是事件开始的地方。

3 代码部分

3.1 初始化部分代码


//printf("sockfd: %d\n", sockfd);
zv_reactor_t reactor;
zv_init_reactor(&reactor);  // epoll 



int port = atoi(argv[1]);

int i = 0;
for (i = 0;i < 1;i ++) {
   
   
    int sockfd = init_server(port+i);  //
    set_listener(&reactor, sockfd, accept_cb); // 
}

3.2 事件循环部分

struct epoll_event events[EVENT_LENGTH] = {
   
   0};

    while (1) {
   
    //mainloop, event driver

        int nready = epoll_wait(reactor.epfd, events, EVENT_LENGTH, -1);

        int i = 0;
        for (i = 0;i < nready;i ++) {
   
     // 这里只用处理读写事件,业务部分在这里已经看不到了

            int connfd = events[i].data.fd;
            zv_connect_t *conn = zv_connect_idx(&reactor, connfd);

            if (events[i].events & EPOLLIN) {
   
   

                conn->cb(connfd, events[i].events, &reactor);

            }

            if (events[i].events & EPOLLOUT) {
   
   

                conn->cb(connfd, events[i].events, &reactor);

            }
        }

    }

3.2 事件注册

listen 注册读事件

int set_listener(zv_reactor_t *reactor, int fd, ZVCALLBACK cb) {
   
   

    if (!reactor || !reactor->blockheader) return -1;

    reactor->blockheader->block[fd].fd = fd;
    reactor->blockheader->block[fd].cb = cb;

    struct epoll_event ev;
    ev.events = EPOLLIN;                                                   //  注册读事件
    ev.data.fd = fd;

    epoll_ctl(reactor->epfd, EPOLL_CTL_ADD, fd, &ev);

}

accept 注册读事件

int accept_cb(int fd, int events, void *arg) {
   
   

    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);

    int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);
    if (clientfd < 0) {
   
   
        printf("accept errno: %d\n", errno);
        return -1;
    }

    // 

    printf(" clientfd: %d\n", clientfd);

    zv_reactor_t *reactor = (zv_reactor_t*)arg;
    zv_connect_t *conn = zv_connect_idx(reactor, clientfd);

    conn->fd = clientfd;
    conn->cb = recv_cb;                                     //  事件对应的回调
    conn->count = BUFFER_LENGTH;

    struct epoll_event ev;
    ev.events = EPOLLIN;                                         //  注册读事件
    ev.data.fd = clientfd;
    epoll_ctl(reactor->epfd, EPOLL_CTL_ADD, clientfd, &ev);

}

业务处理, 注册读写事件

int send_cb(int fd, int event, void *arg) {
   
   

    //printf("send_cb\n");

    zv_reactor_t *reactor = (zv_reactor_t*)arg;
    zv_connect_t *conn = zv_connect_idx(reactor, fd);


    zv_http_response(conn);


    //echo 
    send(fd, conn->wbuffer, conn->wc, 0);  // send header

#if 1
    if (conn->enable_sendfile) {
   
    // sendbody

        int filefd = open(conn->resource, O_RDONLY);
        if (filefd == -1) {
   
   
            printf("errno: %d\n", errno);
            return -1;
        }

        struct stat stat_buf;
        fstat(filefd, &stat_buf);

        int ret = sendfile(fd, filefd, NULL, stat_buf.st_size);  // sendbody
        if (ret == -1) {
   
   
            printf("errno: %d\n", errno);
        }

        close(filefd);

    }

#endif


    conn->cb = recv_cb;                                                        //  事件对应的回调

    //
    struct epoll_event ev;
    ev.events = EPOLLIN; //                                        //  注册读事件
    ev.data.fd = fd;
    epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, fd, &ev);

}


int recv_cb(int fd, int event, void *arg) {
   
   

    zv_reactor_t *reactor = (zv_reactor_t*)arg;
    zv_connect_t *conn = zv_connect_idx(reactor, fd);


    int ret = recv(fd, conn->rbuffer+conn->rc, conn->count, 0);
    if (ret < 0) {
   
   

    } else if (ret == 0) {
   
   

        //
        conn->fd = -1;
        conn->rc = 0;
        conn->wc = 0;

        //
        epoll_ctl(reactor->epfd, EPOLL_CTL_DEL, fd, NULL);

        close(fd);

        return -1;
    } else {
   
   

        conn->rc += ret;
        printf("rbuffer: %s, ret: %d\n", conn->rbuffer, conn->rc);

        // --> echo
        //memcpy(conn->wbuffer, conn->rbuffer, conn->rc);
        //conn->wc = conn->rc;

        zv_http_request(conn);

        conn->cb = send_cb;                                        //  事件对应的回调

        //
        struct epoll_event ev;
        ev.events = EPOLLOUT;                                       //  注册写事件
        ev.data.fd = fd;
        epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, fd, &ev);


    }

}

文章参考与<零声教育>的C/C++linux服务期高级架构系统教程学习:链接

相关文章
|
4天前
|
存储 定位技术 开发工具
Android 开发前的设计,Android之内存泄漏调试学习与总结
Android 开发前的设计,Android之内存泄漏调试学习与总结
|
6天前
|
存储 Arthas 监控
JVM工作原理与实战(三十):堆内存状况的对比分析
JVM作为Java程序的运行环境,其负责解释和执行字节码,管理内存,确保安全,支持多线程和提供性能监控工具,以及确保程序的跨平台运行。本文主要介绍了堆内存状况的对比分析、产生内存溢出的原因等内容。
15 0
|
6天前
|
存储 索引 Python
python数据结构知识学习
【5月更文挑战第6天】Python提供四种核心数据结构:列表(List)——可变有序集合,支持索引和切片;元组(Tuple)——不可变有序集合;字典(Dictionary)——键值对结构,通过键访问值;集合(Set)——无序不重复元素集合,支持数学运算。此外,Python允许自定义数据结构,如链表、树、图,以适应不同问题需求。
19 0
|
6天前
|
存储 分布式数据库
【数据结构】树和二叉树堆(基本概念介绍)
【数据结构】树和二叉树堆(基本概念介绍)
24 6
|
6天前
|
缓存 Linux
linux性能分析之内存分析(free,vmstat,top,ps,pmap等工具使用介绍)
这些工具可以帮助你监视系统的内存使用情况、识别内存泄漏、找到高内存消耗的进程等。根据具体的问题和需求,你可以选择使用其中一个或多个工具来进行内存性能分析。注意,内存分析通常需要综合考虑多个指标和工具的输出,以便更好地理解系统的行为并采取相应的优化措施。
34 6
|
6天前
|
机器学习/深度学习 分布式计算 数据处理
Spark是一个基于内存的通用数据处理引擎,可以进行大规模数据处理和分析
【5月更文挑战第2天】Spark是一个基于内存的通用数据处理引擎,可以进行大规模数据处理和分析
25 3
|
6天前
|
存储 索引
操作数栈的字节码指令执行分析
操作数栈的字节码指令执行分析
|
6天前
|
监控 算法 测试技术
【Go语言专栏】Go语言的性能优化与内存分析
【4月更文挑战第30天】本文探讨了Go语言的性能优化策略和内存分析方法。性能优化原则包括基准测试、分析瓶颈、避免过早优化和持续监控。优化策略涉及减少内存分配、避免内存逃逸、利用并发、优化算法和数据结构以及减少系统调用。内存分析借助于Go的`pprof`工具、内存分配跟踪和第三方工具,以发现内存泄漏和管理问题。通过这些方法,开发者能提升Go程序效率和资源利用率。
|
6天前
|
存储 算法 Java
22个常用数据结构实现与原理分析
前两天V哥跟一个老学员吃饭,聊起面试大厂的事,说为啥大厂面试第一看基本条件,第二就是考数据结构算法,其他高阶的内容会比较少,最近V哥也在跟大厂对接这一块业务,了解得多一些,这是因为考察基本功能力被放到了重要位置,大厂认为硬性条件,比如学历过关,基本功够扎实,那对于实际工作用的上层技能,内部培养就好,也就是相比你掌握了多少多少牛逼的高阶技术,他们更在乎你的基本功,所以,进大厂,基本功必须要搞稳,否则白扯,今天 V 哥把总结好的22个常用的数据结构实现原理,和示例分析分享给大家,希望对你有帮助,觉得内容有收获,请帮忙转发给更多需求的朋友,共同进步。
|
6天前
|
存储 算法
【数据结构与算法】8.二叉树的基本概念|前序遍历|中序遍历|后序遍历
【数据结构与算法】8.二叉树的基本概念|前序遍历|中序遍历|后序遍历