Reactor 模式结合 epoll

简介: Reactor 模式结合 epoll

Reactor 模式结合 epoll

一、核心概念

  1. Reactor 模式

    • 事件驱动架构
    • 核心组件:
      • Event Demultiplexer(epoll)
      • Event Handler(事件处理器)
      • Reactor(事件循环)
  2. epoll 机制

    • Linux 高效 I/O 多路复用
    • 三种系统调用:
      • epoll_create():创建实例
      • epoll_ctl():注册事件
      • epoll_wait():等待事件

二、实现架构

+------------------+
|    Event Loop    |
+--------+---------+
         | 通过 epoll_wait 获取事件
+--------v---------+
|  Event Demux     |
| (epoll instance) |
+--------+---------+
         | 分发到处理器
+--------v---------+
|  Event Handlers  |
| (accept/read/write) 
+------------------+

三、关键实现代码

1. 创建 epoll 实例

int epoll_fd = epoll_create1(0);
if (epoll_fd == -1) {
   
    perror("epoll_create1");
    exit(EXIT_FAILURE);
}

2. 注册事件

struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;  // 边缘触发模式
ev.data.fd = sockfd;

if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev) == -1) {
   
    perror("epoll_ctl: listen_sock");
    exit(EXIT_FAILURE);
}

3. 事件循环

#define MAX_EVENTS 64
struct epoll_event events[MAX_EVENTS];

while (1) {
   
    int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
    if (nfds == -1) {
   
        perror("epoll_wait");
        exit(EXIT_FAILURE);
    }

    for (int i = 0; i < nfds; ++i) {
   
        if (events[i].data.fd == listen_fd) {
   
            handle_accept(listen_fd);
        } else {
   
            if (events[i].events & EPOLLIN) {
   
                handle_read(events[i].data.fd);
            }
            if (events[i].events & EPOLLOUT) {
   
                handle_write(events[i].data.fd);
            }
        }
    }
}

四、关键处理函数示例

1. 接受连接

void handle_accept(int listen_fd) {
   
    struct sockaddr_in client_addr;
    socklen_t addrlen = sizeof(client_addr);

    int conn_fd;
    while ((conn_fd = accept(listen_fd, 
                           (struct sockaddr*)&client_addr,
                           &addrlen)) > 0) {
   
        set_nonblocking(conn_fd);

        struct epoll_event ev;
        ev.events = EPOLLIN | EPOLLET;
        ev.data.fd = conn_fd;
        epoll_ctl(epoll_fd, EPOLL_CTL_ADD, conn_fd, &ev);
    }

    if (errno != EAGAIN && errno != EWOULDBLOCK) {
   
        perror("accept error");
    }
}

2. 读取数据

void handle_read(int fd) {
   
    char buf[1024];
    ssize_t n;

    while ((n = read(fd, buf, sizeof(buf))) > 0) {
   
        // 处理数据逻辑
        buf[n] = '\0';
        printf("Received: %s\n", buf);

        // 修改为监听写事件
        struct epoll_event ev;
        ev.events = EPOLLOUT | EPOLLET;
        ev.data.fd = fd;
        epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ev);
    }

    if (n == 0) {
     // 连接关闭
        close(fd);
    } else if (errno != EAGAIN) {
   
        perror("read error");
        close(fd);
    }
}

五、重要注意事项

  1. 非阻塞 I/O

    void set_nonblocking(int fd) {
         
        int flags = fcntl(fd, F_GETFL, 0);
        fcntl(fd, F_SETFL, flags | O_NONBLOCK);
    }
    
  2. 触发模式选择

    • 水平触发(LT,默认):数据未处理完会持续通知
    • 边缘触发(ET):只在状态变化时通知,必须搭配非阻塞 I/O
  3. 资源管理

    • 及时关闭不需要的文件描述符
    • 合理设置 epoll 事件类型
    • 处理 EAGAIN/EWOULDBLOCK 错误

六、完整示例(TCP Echo Server)

#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define PORT 8080
#define MAX_EVENTS 64

void set_nonblocking(int fd) {
   
    int flags = fcntl(fd, F_GETFL, 0);
    fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

int main() {
   
    int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in addr = {
   
        .sin_family = AF_INET,
        .sin_port = htons(PORT),
        .sin_addr.s_addr = INADDR_ANY
    };

    bind(listen_fd, (struct sockaddr*)&addr, sizeof(addr));
    listen(listen_fd, SOMAXCONN);

    int epoll_fd = epoll_create1(0);
    struct epoll_event ev;
    ev.events = EPOLLIN | EPOLLET;
    ev.data.fd = listen_fd;
    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev);

    struct epoll_event events[MAX_EVENTS];

    while (1) {
   
        int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);

        for (int i = 0; i < nfds; ++i) {
   
            if (events[i].data.fd == listen_fd) {
   
                // 处理新连接
                struct sockaddr_in client_addr;
                socklen_t addrlen = sizeof(client_addr);
                int conn_fd;
                while ((conn_fd = accept(listen_fd, 
                        (struct sockaddr*)&client_addr,
                        &addrlen)) > 0) {
   
                    set_nonblocking(conn_fd);
                    struct epoll_event ev_conn;
                    ev_conn.events = EPOLLIN | EPOLLET;
                    ev_conn.data.fd = conn_fd;
                    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, conn_fd, &ev_conn);
                }
            } else {
   
                // 处理数据
                if (events[i].events & EPOLLIN) {
   
                    char buf[1024];
                    ssize_t n;
                    while ((n = read(events[i].data.fd, buf, sizeof(buf))) > 0) {
   
                        write(events[i].data.fd, buf, n); // Echo 回显
                    }

                    if (n == 0 || (n < 0 && errno != EAGAIN)) {
   
                        close(events[i].data.fd);
                    }
                }
            }
        }
    }

    close(epoll_fd);
    close(listen_fd);
    return 0;
}
目录
相关文章
|
11月前
|
关系型数据库 应用服务中间件 nginx
Docker一键安装中间件(RocketMq、Nginx、MySql、Minio、Jenkins、Redis)
本系列脚本提供RocketMQ、Nginx、MySQL、MinIO、Jenkins和Redis的Docker一键安装与配置方案,适用于快速部署微服务基础环境。
|
8月前
|
Ubuntu 关系型数据库 MySQL
MySQL源码编译安装
本文详细介绍了MySQL 8.0及8.4版本的源码编译安装全过程,涵盖用户创建、依赖安装、cmake配置、编译优化等步骤,并提供支持多Linux发行版的一键安装脚本,适用于定制化数据库部署需求。
1999 4
MySQL源码编译安装
|
8月前
|
Ubuntu 关系型数据库 MySQL
MySQL二进制包安装
本文详细介绍了在多种Linux系统上通过二进制包安装MySQL 8.0和8.4版本的完整过程,涵盖用户创建、glibc版本匹配、程序解压、环境变量配置、初始化数据库及服务启动等步骤,并提供支持多发行版的一键安装脚本,助力高效部署MySQL环境。
1197 4
MySQL二进制包安装
|
8月前
|
安全 关系型数据库 MySQL
MySQL包安装 -- SUSE系列(离线RPM包安装MySQL)
本文详细介绍在openSUSE系统上通过离线RPM包安装MySQL 8.0和8.4版本的完整步骤,包括下载地址、RPM包解压、GPG密钥导入、使用rpm或zypper命令安装及服务启动验证,涵盖初始密码获取与安全修改方法,适用于无网络环境下的MySQL部署。
819 3
MySQL包安装 -- SUSE系列(离线RPM包安装MySQL)
|
8月前
|
关系型数据库 MySQL Linux
MySQL包安装 -- SUSE系列(SUSE资源库安装MySQL)
本文介绍了在openSUSE系统上通过SUSE资源库安装MySQL 8.0和8.4版本的完整步骤,包括配置国内镜像源、安装MySQL服务、启动并验证运行状态,以及修改初始密码等操作,适用于希望在SUSE系列系统中快速部署MySQL的用户。
783 3
MySQL包安装 -- SUSE系列(SUSE资源库安装MySQL)
|
8月前
|
NoSQL 算法 Redis
【Docker】(3)学习Docker中 镜像与容器数据卷、映射关系!手把手带你安装 MySql主从同步 和 Redis三主三从集群!并且进行主从切换与扩容操作,还有分析 哈希分区 等知识点!
Union文件系统(UnionFS)是一种**分层、轻量级并且高性能的文件系统**,它支持对文件系统的修改作为一次提交来一层层的叠加,同时可以将不同目录挂载到同一个虚拟文件系统下(unite several directories into a single virtual filesystem) Union 文件系统是 Docker 镜像的基础。 镜像可以通过分层来进行继承,基于基础镜像(没有父镜像),可以制作各种具体的应用镜像。
852 6
|
9月前
|
安全 关系型数据库 MySQL
CentOS 7 yum 安装 MySQL教程
在CentOS 7上安装MySQL 8,其实流程很清晰。首先通过官方Yum仓库来安装服务,然后启动并设为开机自启。最重要的环节是首次安全设置:需要先从日志里找到临时密码来登录,再修改成你自己的密码,并为远程连接创建用户和授权。最后,也别忘了在服务器防火墙上放行3306端口,这样远程才能连上。
2117 16
|
8月前
|
运维 Ubuntu 关系型数据库
MySQL包安装 -- Debian系列(Apt资源库安装MySQL)
本文介绍了在Debian系列系统(如Ubuntu、Debian 11/12)中通过APT仓库安装MySQL 8.0和8.4版本的完整步骤,涵盖添加官方源、配置国内镜像、安装服务及初始化设置,并验证运行状态,适用于各类Linux运维场景。
2439 0
MySQL包安装 -- Debian系列(Apt资源库安装MySQL)
|
8月前
|
Oracle 关系型数据库 MySQL
MySQL包安装 -- RHEL系列(离线RPM包安装MySQL)
本文详细介绍在Rocky、CentOS、AlmaLinux、openEuler等主流Linux系统上,通过离线RPM包安装MySQL 8.0和8.4版本的完整步骤,涵盖下载、依赖处理、rpm/yum安装、服务启动、密码设置等关键环节,适用于多种企业级环境部署需求。
2504 0
MySQL包安装 -- RHEL系列(离线RPM包安装MySQL)

推荐镜像

更多