epoll高度封装reactor,几乎所有可见服务器的底层框架

简介: epoll高度封装reactor,几乎所有可见服务器的底层框架

前言

    亲爱的各位友友们, 小杰从今天开始就自己网络服务器开发方向所学的东西,边学边写随笔,这个系列从epoll 封装  reactor 作为开始, 从0 到 1,小杰也是一样的从0 到 1,小杰之前学习网络高级IO的时候,学会了select poll 和 epoll 等支持IO多路复用的系统调用,但是都是处在很浅显的部分.   做过一部分练习,也是根据接口来封装出最简单的服务器,但是这些都没有借鉴过源码的精华


    所写的东西几乎都是根据自己的理解来写,但是小杰发现封装性不强,而且感觉写的东西很散,没有框架性,然后小杰为了想要走服务器开发方向,于是在网上找了一家机构进行系统的学习。 之后小杰会将所学尽数写成博文随笔,跟各位博友们相互分享讨论技术。   如果您看完小杰的博文觉得有所问题,请在评论区中给出您宝贵的意见,小杰会万分的感谢,  如果您觉得系列对自己有所帮助,麻烦关注下小杰,让我们共同学习进步

    reactor是什么,如何理解?

      reactor是一种设计模式, 是服务器的重要模型, 框架:   是一种事件驱动的反应堆模式, 高效的事件处理模型


      reactor 反应堆:  事件来了,执行,事件类型可能不尽相同,所以我们需要提前注册好不同的事件处理函数.           事件到来就由  epoll_wait  获取同时到来的多个事件,并且根据数据的不同类型将事件分发给事件处理机制 (事件处理器), 也就是我们提前注册的哪些接口函数


      思考reactor模型的设计思想和思维方式:    它需要的是事件驱动,相应的事件发生,我们需要根据事件自动的调用相应的函数,所以我们需要提前注册好处理函数的接口到reactor中, 函数是由reactor去调用的,而不是再主函数中直接进行调用的, 所以我需要使用回调函数.     --------   本质:函数指针


      reactor中的  IO  使用的是select poll  epoll 这种多路复用IO,    以便提高 IO 事件的处理能力,提高IO事件处理效率,支持更高的并发    


      reactor所需组件流程分析

      Reactor 模式是处理并发 I/O 比较常见的一种模式,用于同步 I/O,中心思想是将所有要处理的 I/O 事件注册到一个 I/O 多路复用器上,同时主线程/进程阻塞在多路复用器上; 一旦有 I/O 事件到来或是准备就绪(文件描述符或 socket 可读、写),多路复用器返回并将事先注册的相应 I/O 事件分发到对应的处理器中。

      组件

      • 多路复用器 :由操作系统提供,在 linux 上一般是 select, poll, epoll 等系统调用

      • 事件分发器 :将多路复用器中返回的就绪事件分到对应的处理函数中,分发给事件处理器

      • 事件处理器 :处理对应的IO事件

      流程

      注册事件 和 对应的事件处理器


      多路复用器等待事件到来


      事件到来,激发事件分发器分发事件到对应的处理器


      事件处理器处理事件,然后注册新的事件,   (比如处理读事件,处理完成之后需要将其设置为写事件再注册,因为读取之后我们需要针对业务需求进行数据处理,之后将其send 回去响应客户端结果,所以自然需要改成写事件,也就需要从新注册)

      如何将epoll的IO驱动封装成reactor事件反应堆驱动

      其实现在流程还有运作方式已经清楚了,然后关键在于这个封装上了,IO事件fd应该如何封装,reactor又应该如何封装


      首先事件我们需要接口API, 为了后序可以使用reactor进行调用api函数, 然后 fd 肯定也是需要的,然后为了便于数据的临时存储我们需要用户态的recvbuffer 和 sendbuffer, 然后用户态的两个缓冲区中数据所占的大小我们也需要封装进去,  why? 因为 我们 send 和 recv的时候都需要传入这两个参数.  于是这样一分析大体框架出来了

      这个回调函数我们应该如何设置? 才能符合我们后序的需求?


      首先我们肯定需要传入的是  fd 作为参数,   然后我们需要传入事件类型events   还有我们需要传入sockitem 结构体指针,   因为如果是读IO事件我们需要将从客户端读取的数据写入到sockitem的处在用户空间的recvbuffer中去, 以及如果是写IO事件我们需要将sockitem的处在用户空间的sendbuffer中的数据写回客户端


      然后针对返回值我们设置为int类型即可, 所以接口设计为了如下结果,

      然后就是针对reactor的封装了

      首先我们肯定需要一个epoll句柄,所以epfd肯定需要封装进去,其次我们需要一个容器存储触发的IO事件,至此我们应该设置一个 sruct epoll_event events[512];在其中存储触发的IO事件,也就是将所有需要的全局数据封装成reactor

      reactor分块分析实现

      • 注册事件处理器部分流程

      • 事件分发器分发事件给对应的处理器

      • 各种具体的事件处理器的分析

      accept_cb :  新连接到来事件处理器

      recv_cb : 处理读事件的处理器

      send_cb  写事件处理器

      reactor整体代码以及测试结果


      #include <stdio.h>
      #include <stdlib.h>
      #include <unistd.h>
      #include <sys/types.h>
      #include <sys/socket.h>
      #include <string.h>
      #include <arpa/inet.h>
      #include <sys/epoll.h>
      #include <errno.h>
      #include <fcntl.h>
      typedef struct sockaddr SA;
      #define BUFFSIZE 1024
      struct sockitem {
        int sockfd;
        //事件处理器,处理函数回调接口
        int (*callback)(int fd, int events, void* arg);
        //读写函数
        char recvbuffer[BUFFSIZE];
        char sendbuffer[BUFFSIZE];
        //读写字节数
        int rlen;
        int slen;
      };
      struct reactor {
        int epfd;
        struct epoll_event events[512];
      };
      //定义全局的eventloop --> 事件循环
      struct reactor* eventloop = NULL;
      //申明这些事件处理器函数
      int recv_cb(int fd, int events, void *arg);
      int accept_cb(int fd, int events, void* arg);
      int send_cb(int fd, int evnts, void* arg);
      int recv_cb(int fd, int events, void *arg) {
        struct sockitem* si = (struct sockitem*)arg;
        struct epoll_event ev;//后面需要 
        //处理IO读事件
        int ret = recv(fd, si->recvbuffer, BUFFSIZE, 0);
        if (ret < 0) {
          if (errno == EAGAIN || errno == EWOULDBLOCK) { //
            return -1;
          } else {
          }
          //出错了,从监视IO事件红黑树中移除结点,避免僵尸结点
          ev.events = EPOLLIN;
          epoll_ctl(eventloop->epfd, EPOLL_CTL_DEL, fd, &ev);
          close(fd);
          free(si);   
        } else if (ret == 0) {
          //对端断开连接
          printf("fd %d disconnect\n", fd);
          ev.events = EPOLLIN;
          epoll_ctl(eventloop->epfd, EPOLL_CTL_DEL, fd, &ev);
          //close同一断开连接,避免客户端大量的close_wait状态
          close(fd);
          free(si); 
        } else {
          //打印接收到的数据
          printf("recv: %s, %d Bytes\n", si->recvbuffer, ret);
          //设置sendbuffer
          si->rlen = ret;
          memcpy(si->sendbuffer, si->recvbuffer, si->rlen);
          si->slen = si->rlen;
          //注册写事件处理器
          struct epoll_event ev;
          ev.events = EPOLLOUT | EPOLLET;
          si->sockfd = fd;
          si->callback = send_cb;
          ev.data.ptr = si;
          epoll_ctl(eventloop->epfd, EPOLL_CTL_MOD, fd, &ev);
        }
      }
      int accept_cb(int fd, int events, void* arg) {
        //处理新的连接。 连接IO事件处理流程
        struct sockaddr_in cli_addr;
        memset(&cli_addr, 0, sizeof(cli_addr));
        socklen_t cli_len = sizeof(cli_addr);
        int cli_fd = accept(fd, (SA*)&cli_addr, &cli_len);
        if (cli_fd <= 0) return -1;
        char cli_ip[INET_ADDRSTRLEN] = {0}; //存储cli_ip
        printf("Recv from ip %s at port %d\n", inet_ntop(AF_INET, &cli_addr.sin_addr, cli_ip, sizeof(cli_ip)),
          ntohs(cli_addr.sin_port));
        //注册接下来的读事件处理器
        struct epoll_event ev;
        ev.events = EPOLLIN | EPOLLET;
        struct sockitem* si = (struct sockitem*)malloc(sizeof(struct sockitem));
        si->sockfd = cli_fd;
        si->callback = recv_cb;//设置事件处理器
        ev.data.ptr = si;
        epoll_ctl(eventloop->epfd, EPOLL_CTL_ADD, cli_fd, &ev);
        return cli_fd;
      }
      int send_cb(int fd, int events, void* arg) {
        //处理send IO事件
        struct sockitem *si = (struct sockitem*)arg;
        send(fd, si->sendbuffer, si->slen, 0); 
        //再次注册IO读事件处理器
        struct epoll_event ev;
        ev.events = EPOLLIN | EPOLLET;
        si->sockfd = fd;
        si->callback = recv_cb;//设置事件处理器
        ev.data.ptr = si;
        epoll_ctl(eventloop->epfd, EPOLL_CTL_MOD, fd, &ev);
      }
      int main(int argc, char* argv[]) {
        if (argc != 2) {
          fprintf(stderr, "uasge: %s <port>", argv[0]);
          return 1;
        }
        int sockfd = socket(AF_INET, SOCK_STREAM, 0);
        struct sockaddr_in serv_addr;
        int port = atoi(argv[1]);
        //确定服务端协议地址簇
        memset(&serv_addr, 0, sizeof(serv_addr));
        serv_addr.sin_family = AF_INET;
        serv_addr.sin_addr.s_addr = INADDR_ANY;
        serv_addr.sin_port = htons(port);
        //进行绑定
        if (-1 == bind(sockfd, (SA*)&serv_addr, sizeof(serv_addr))) {
          fprintf(stderr, "bind error");
          return 2;
        }
        if (-1 == listen(sockfd, 5)) {
          fprintf(stderr, "listen error");
          return 3;
        }
        //init eventloop
        eventloop = (struct reactor*)malloc(sizeof(struct reactor));
        //创建epoll句柄.
        eventloop->epfd = epoll_create(1);
        //注册建立连接IO事件处理函数
        struct epoll_event ev;
        ev.events = EPOLLIN;
        struct sockitem* si = (struct sockitem*)malloc(sizeof(struct sockitem));
        si->sockfd = sockfd;
        si->callback = accept_cb;//设置事件处理器
        ev.data.ptr = si;
        //将监视事件加入到reactor的epfd中
        epoll_ctl(eventloop->epfd, EPOLL_CTL_ADD, sockfd, &ev);
        while (1) {
          //多路复用器监视多个IO事件
          int nready = epoll_wait(eventloop->epfd, eventloop->events, 512, -1);
          if (nready < -1) {
            break;
          }
          int i = 0;
          //循环分发所有的IO事件给处理器
          for (i = 0; i < nready; ++i) {
            if (eventloop->events[i].events & EPOLLIN) {
              struct sockitem* si = (struct sockitem*)eventloop->events[i].data.ptr;
              si->callback(si->sockfd, eventloop->events[i].events, si);
            } 
            if (eventloop->events[i].events & EPOLLOUT) {
              struct sockitem* si = (struct sockitem*)eventloop->events[i].data.ptr;
              si->callback(si->sockfd, eventloop->events[i].events, si);
            }
          }
        }
        return 0;
      }

      总结本章

      本章的核心是实现了一个网络经典模型,设计模式reactor 事件循环,事件驱动的反应堆模式.


      组件: 事件处理器 :回调函数callback    事件分发器  (将事件分发给对应的事件处理器), 多路复用器 (select poll epoll 等操作系统提供的多路复用技术)


      流程:

      注册事件处理器,和书写事件处理函数

      多路复用监视多路IO事件的来临

      将触发的IO事件分发到对应的事件处理器中进行处理



      相关文章
      |
      7月前
      |
      存储 监控 Java
      【深度挖掘Java性能调优】「底层技术原理体系」深入探索Java服务器性能监控Metrics框架的实现原理分析(Counter篇)
      【深度挖掘Java性能调优】「底层技术原理体系」深入探索Java服务器性能监控Metrics框架的实现原理分析(Counter篇)
      164 0
      |
      7月前
      |
      监控 算法 Java
      【深度挖掘Java性能调优】「底层技术原理体系」深入探索Java服务器性能监控Metrics框架的实现原理分析(Gauge和Histogram篇)
      【深度挖掘Java性能调优】「底层技术原理体系」深入探索Java服务器性能监控Metrics框架的实现原理分析(Gauge和Histogram篇)
      97 0
      |
      2月前
      |
      Python
      Flask学习笔记(二):基于Flask框架上传图片到服务器端并原名保存
      关于如何使用Flask框架上传图片到服务器端并以其原名保存的教程。
      83 1
      |
      2月前
      |
      Python
      Flask学习笔记(三):基于Flask框架上传特征值(相关数据)到服务器端并保存为txt文件
      这篇博客文章是关于如何使用Flask框架上传特征值数据到服务器端,并将其保存为txt文件的教程。
      32 0
      Flask学习笔记(三):基于Flask框架上传特征值(相关数据)到服务器端并保存为txt文件
      |
      2月前
      |
      分布式计算 Hadoop
      Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
      Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
      50 1
      |
      2月前
      |
      Web App开发 JavaScript 前端开发
      使用Node.js和Express框架构建Web服务器
      使用Node.js和Express框架构建Web服务器
      |
      2月前
      |
      存储 SQL 消息中间件
      Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
      Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
      51 0
      |
      4月前
      |
      JSON API 数据格式
      基于服务器响应的实时天气数据进行JSON解析的详细代码及其框架
      【8月更文挑战第25天】这段资料介绍了一个使用Python从服务器获取实时天气数据并解析JSON格式数据的基本框架。主要分为三个部分:一是安装必要的`requests`库以发起HTTP请求获取数据,同时利用Python内置的`json`库处理JSON数据;二是提供了具体的代码实现,包括获取天气数据的`get_weather_data`函数和解析数据的`parse_weather_data`函数;三是对代码逻辑进行了详细说明,包括如何通过API获取数据以及如何解析这些数据来获取温度和天气描述等信息。用户需要根据实际使用的天气API调整代码中的API地址、参数和字段名称。
      |
      4月前
      |
      缓存 监控 中间件
      构建高效的Go语言Web服务器:基于Fiber框架的性能优化实践
      在追求极致性能的Web开发领域,Go语言(Golang)凭借其高效的并发处理能力、垃圾回收机制及简洁的语法赢得了广泛的青睐。本文不同于传统的性能优化教程,将深入剖析如何在Go语言环境下,利用Fiber这一高性能Web框架,通过精细化配置、并发策略调整及代码层面的微优化,构建出既快速又稳定的Web服务器。通过实际案例与性能测试数据对比,揭示一系列非直觉但极为有效的优化技巧,助力开发者在快节奏的互联网环境中抢占先机。
      |
      7月前
      |
      资源调度
      在 Next.js 中使用自定义服务器框架进行服务器端渲染
      在 Next.js 中使用自定义服务器框架进行服务器端渲染