开启极速之旅:了解Linux网络设计中的Reactor模型与百万级并发实践

简介: 本文将带您进入Linux网络设计的奇妙世界,着重介绍了Reactor模型和百万级并发的实践经验。在快节奏的现代互联网环境下,高性能的网络应用对于提供卓越的用户体验至关重要。通过深入探索Reactor模型,我们将揭示其在构建高并发应用中的关键作用。从基本概念到实际应用,您将了解到如何通过Reactor模型优化网络通信,实现快速响应和高吞吐量。此外,我们将分享一些实践中的经验和技巧,包括事件驱动编程、多线程与多进程处理、负载均衡等方面,以帮助您更好地应对百万级并发挑战。

一、Reactor网络模型简介

什么是并发:网络并发,通俗的讲就是服务器可以承载的客户端数量,即服务器可以稳定保证客户端同时接入的数量。

Reactor模型开发效率比直接使用IO多路复用要高,它一般是单线程的,设计目标是希望一个线程使用CPU的全部资源;带来的优点是,在每个事件处理中很多时候不需要考虑共享资源的互斥访问。

Reactor模式是处理并发IO比较常见的模式,用于同步IO,核心思想是将所有要处理的IO事件注册到一个中心IO多路复用器上,同时主线程或进程阻塞在IO多路复用器上;一旦有事件到来或准备就绪,多路复用器返回并将事先注册的相应 I/O 事件分发到对应的处理器中。

二、Reactor的优点

1、响应快;不必为单个同步事件阻塞,虽然Reactor本身依然是同步的。
2、编程相对简单;可以最大程度的避免复杂的多线程及同步问题,尽可能的避免多线程、多进程的切换开销。
3、可扩展性;可通过增加Reactor实例个数,充分利用CPU资源。
4、高复用性;Reactor模型本身与事件处理逻辑无关,具有很高的复用性。

三、实现过程

step 1:定义Reactor模型相关结构体

reactor数据结构设计图如下:
image.png

结构说明:以fd作为索引,存放在block中;当一个fd到来时,通过fd/MAX先找到fd对应的block号,再通过fd%MAX找到对应的偏移地址。例如来了个fd=10000,每个块存放的最大item数量MAX=1024,那么fd对应的block序号等于10000/1024=9;偏移量等于10000%1024=784。这样就可以找到fd对应的数据存放地址item。

数据结构的代码实现如下:

struct ntyevnt{
   
   
    int fd;//事件fd
    char buffer[BUFFER_LENGTH];//缓冲区
    int length;//缓存长度
    int status;//状态

    int events;//事件
    void *arg;//callback的参数
    int(*callback)(int fd, int events, void* arg);//回调函数
};
struct eventblock{
   
   
    struct *sock_items;//事件集合
    struct eventblock *next;//指向下一个内存块
};
struct reactor{
   
   
    int epfd;//epoll的文件描述符
    int blkcnt;//事件块的数量
    struct eventblock *evtblk;//事件块的起始地址
};

step 2:实现Reactor容器初始化功能

我们这里使用epoll作为IO多路复用器。
思路:初始化reactor内存块,避免脏数据;创建events和block并初始化,将events添加到block中,将block添加到reactor的链表中管理。

int ntyreactor_init(struct ntyreactor *reactor)
{
   
   
    if (reactor == NULL)
        return -1;
    memset(reactor, 0, sizeof(struct ntyreactor));
    //创建epoll,作为IO多路复用器
    reactor->epfd = epoll_create(1);
    if (reactor->epfd <= 0)
    {
   
   
        printf("create epfd in %s error %s\n", __func__, strerror(errno));
        return -2;
    }

    // 创建事件集
    struct ntyevnt *events = (struct ntyevnt *)malloc(MAX_EPOLL_EVENTS * sizeof(struct ntyevnt));
    if (events == NULL)
    {
   
   
        printf("create ntyevnt in %s error %s\n", __func__, strerror(errno));
        close(reactor->epfd);
        return -3;
    }
    memset(events, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));

    //创建事件内存块
    struct eventblock *block = (struct eventblock*)malloc(sizeof(struct eventblock));
    if (block == NULL)
    {
   
   
        printf("create eventblock in %s error %s\n", __func__, strerror(errno));
        free(events);
        close(reactor->epfd);
        return -4;
    }
    block->events = events;
    block->next = NULL;

    // reactor初始化赋值
    reactor->evblks=block;
    reactor->blkcnt = 1;

    return 0;
}

step 3:实现socket初始化功能

定义成一个函数,方便初始化多个监听端口。

int init_sock(short port)
{
   
   
    int ret = 0;
    int fd = socket(AF_INET, SOCK_STREAM, 0);//创建套字接
    if (fd == -1)
    {
   
   
        printf("create socket in %s error %s\n", __func__, strerror(errno));
        return -1;
    }
    ret=fcntl(fd, F_SETFL, O_NONBLOCK);//设置非阻塞
    if (ret == -1)
    {
   
   
        printf("fcntl O_NONBLOCK in %s error %s\n", __func__, strerror(errno));
        return -1;
    }

    // 设置属性
    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;// IPV4
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(port);

    // 绑定
    ret = bind(fd, (struct sockaddr*)&server_addr, sizeof(server_addr));
    if (ret == -1)
    {
   
   
        printf("bind() in %s error %s\n", __func__, strerror(errno));
        return -1;
    }

    //监听
    ret = listen(fd, 20);
    if (ret < 0)
    {
   
   
        printf("listen failed : %s\n", strerror(errno));
        return -1;
    }

    printf("listen server port : %d\n", port);

    return fd;
}

step 4:实现Reactor动态扩容功能

为了实现高并发,服务器需要监听多个端口。当高并发时需要reactor容器进行扩容管理。
核心思路:找到链表的末端,分别为events和block分配内存并初始化,将events添加到block中,将block添加到reactor的链表中管理。

int ntyreactor_alloc(struct ntyreactor *reactor)
{
   
   
    if (reactor == NULL)
        return -1;
    if (reactor->evblks == NULL)
        return -1;

    //找到链表末端
    struct eventblock *blk = reactor->evblks;
    while (blk->next != NULL)
        blk = blk->next;

    // 创建事件集
    struct ntyevent *evs = (struct ntyevent*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
    if (evs == NULL)
    {
   
   
        printf("ntyreactor_alloc ntyevent failed\n");
        return -2;
    }
    memset(evs, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));

    // 创建事件块
    struct eventblock *block = (struct eventblock*)malloc(sizeof(struct eventblock));
    if (block == NULL)
    {
   
   
        printf("ntyreactor_alloc eventblock failed\n");
        return -3;
    }
    block->events = evs;
    block->next = NULL;

    //实现扩容
    blk->next = block;
    reactor->blkcnt++;

    return 0;
}

step 5:实现Reactor索引功能

思路:通过fd/MAX先找到fd对应的block号,再通过fd%MAX找到对应的偏移地址。
例如来了个fd=10000,每个块存放的最大item数量MAX=1024,那么fd对应的block序号等于10000/1024=9;偏移量等于10000%1024=784。这样就可以找到fd对应的数据存放地址item。

struct ntyevent *ntyreactor_idx(struct ntyreactor *reactor, int sockfd)
{
   
   
    if (reactor == NULL)
        return NULL;
    if (reactor->evblks == NULL)
        return NULL;
    // fd所在block序号
    int blkidx = sockfd / MAX_EPOLL_EVENTS;
    while (blkidx >= reactor->blkcnt)
    {
   
   
        // 扩容
        ntyreactor_alloc(reactor);
    }

    //找到fd对应block的位置
    int i = 0;
    struct eventblock *blk = reactor->evblks;
    while (i++ != blkidx && blk != NULL)
    {
   
   
        blk = blk->next;
    }

    // 返回item 地址
    return &blk->events[sockfd%MAX_EPOLL_EVENTS];
}

step 6:实现设置事件信息功能

将事件的相关信息保存到数据结构中。主要实现填充关键信息到event结构体中。

void nty_event_set(struct ntyevent *ev,int fd,NCALLBACK callback,void *arg)
{
   
   
    ev->fd = fd;
    ev->events = 0;
    ev->callback = callback;
    ev->arg = arg;
}

step 7:实现IO事件监听功能

这里使用epoll作为IO多路复用器,将事件添加到epoll中监听。
思路:主要是epoll_ctl操作,将事件添加到reactor的event结构体中。

int nty_event_add(int epfd, int events, struct ntyevent *ev)
{
   
   
    // 设置epoll事件信息
    struct epoll_event ep_ev = {
   
    0,{
   
   0} };
    ep_ev.data.ptr = ev;
    ep_ev.events = ev->events = events;

    // 判断,设置epfd的操作模式
    int op;
    if (ev->status == 1)
        op = EPOLL_CTL_MOD;
    else
    {
   
   
        op = EPOLL_CTL_ADD;
        ev->status = 1;
    }


    // 设置epoll
    int ret = epoll_ctl(epfd, op, ev->fd, &ep_ev);
    if (ret < 0)
    {
   
   
        printf("event add failed [fd=%d], events[%d],ret:%d\n", ev->fd, events,ret);
        printf("event add failed in %s error %s\n", __func__, strerror(errno));
        return -1;
    }

    return 0;
}

step 8:实现IO事件移除功能

由于设置了非阻塞模式,当事件到来时,需要暂时移除监听,避免干扰。

int nty_event_del(int epfd, struct ntyevent *event)
{
   
   
    if (event->status != 1)
        return -1;

    struct epoll_event ep_ev = {
   
    0,{
   
   0} };
    ep_ev.data.ptr = event;
    event->status = 0;
    // 移除fd的监听
    epoll_ctl(epfd, EPOLL_CTR_DEL, &ep_ev);
    return 0;
}

step 9:实现Reactor事件监听功能

思路:设置fd的事件信息,添加事件到epoll监听。

int ntyreactor_addlistener(struct ntyreactor *reactor,int sockfd,NCALLBACK *acceptor)
{
   
   
    if (reactor == NULL)
        return -1;
    if (reactor->evblks == NULL)
        return -1;

    // 找到fd对应的event地址
    struct ntyevent *event = ntyreactor_idx(reactor, sockfd);
    if (event == NULL)
        return -1;
    // 设置fd的事件信息
    nty_event_set(event, sockfd, acceptor, reactor);
    // 添加事件到epoll监听
    nty_event_add(reactor->epfd, EPOLLIN, event);

    return 0;
}

step 10:实现recv回调函数

思路:找到fd对应的信息内存块;使用recv接收数据;暂时移除该事件的监听;如果接收成功,设置监听事件为是否可写,添加到IO多路复用器(epoll)中;返回收到的数据长度。

int recv_cb(int fd, int events, void *arg)
{
   
   
    struct ntyreactor *reactor = (struct ntyreactor *)arg;
    if (reactor == NULL)
        return -1;
    // 找到fd对应的event地址
    struct ntyevent *event = ntyreactor_idx(reactor, fd);
    if (event == NULL)
        return -1;
    // 接收数据
    int len = recv(fd, event->buffer, BUFFER_LENGTH, 0);
    // 暂时移除监听
    nty_event_del(reactor->epfd, event);
    if (len > 0)
    {
   
   
        event->length = len;
        event->buffer[len] = '\0';
        printf("recv [%d]:%s\n", fd, event->buffer);
        //设置fd的事件信息
        nty_event_set(event, fd, send_cb, reactor);
        // 添加事件到epoll监听
        nty_event_add(reactor->epfd, EPOLLOUT, event);
    }
    else if (len == 0)
    {
   
   
        nty_event_del(reactor->epfd, event);
        printf("recv_cb --> disconnect\n");
        close(event->fd);
    }
    else {
   
   

        if (errno == EAGAIN && errno == EWOULDBLOCK) {
   
    //

        }
        else if (errno == ECONNRESET) {
   
   
            nty_event_del(reactor->epfd, event);
            close(event->fd);
        }
        printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));

    }
    return len;
}

step 11:实现send回调函数

思路:找到fd对应的信息内存块;使用send发送数据;暂时移除该事件的监听;如果发送成功,设置监听事件为是否可读,添加到IO多路复用器(epoll)中;返回发送的数据长度。

int send_cb(int fd, int events, void *arg)
{
   
   
    struct ntyreactor *reactor = (struct ntyreactor*)arg;
    if (reactor == NULL)
    {
   
   
        return -1;
    }

    // 查找fd对应的信息存放内存块
    struct ntyevent *ev = ntyreactor_idx(reactor, fd);
    if (ev == NULL)
        return -1;

    int len = send(fd, ev->buffer, BUFFER_LENGTH, 0);
    // 暂时移除监听
    nty_event_del(reactor->epfd, ev);

    if (len > 0)
    {
   
   

        printf("send[fd=%d], [%d]%s\n", fd, len, ev->buffer);
        nty_event_set(ev, fd, recv_cb, reactor);
        nty_event_add(reactor->epfd, EPOLLIN, ev);
    }
    else
    {
   
   
        close(ev->fd);
        printf("send[fd=%d] error %s\n", fd, strerror(errno));
    }
    return len;
}

step 12:实现accept回调函数

思路:使用accept获得连接的客户端fd;设置客户端fd为非阻塞模式;找到fd对应的信息内存块;设置fd的事件信息;设置监听事件为是否可读,添加到IO多路复用器(epoll)中。

int accept_cb(int fd, int events, void *arg)
{
   
   
    struct ntyreactor *reactor = (struct ntyreactor *)arg;
    if (reactor == NULL)
        return -1;

    struct sockaddr_in client_addr;
    socklen_t len = sizeof(client_addr);

    int client_fd = accept(fd,(struct sockaddr*)&client_addr,&len);
    if (client_fd == -1)
    {
   
   
        printf("accept: %s\n", strerror(errno));
        return -1;
    }

    int flag = fcntl(client_fd, F_SETFL, O_NONBLOCK);
    if ((flag = fcntl(client_fd, F_SETFL, O_NONBLOCK)) < 0) {
   
   
        printf("%s: fcntl nonblocking failed, %d\n", __func__, MAX_EPOLL_EVENTS);
        return -1;
    }

    // 找到fd对应的event地址
    struct ntyevent *event = ntyreactor_idx(reactor, client_fd);
    if (event == NULL)
        return -1;
    // 设置fd的事件信息
    nty_event_set(event, client_fd, recv_cb, reactor);
    // 添加事件到epoll监听
    nty_event_add(reactor->epfd, EPOLLIN, event);

    printf("new connect [%s:%d], pos[%d]\n",
        inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), client_fd);

    return 0;
}

step 13:实现reactor运行函数

主要是epoll的等待功能,将监听到的事件进行回调处理。

int ntyreactor_run(struct ntyreactor *reactor)
{
   
   
    if (reactor == NULL)
        return -1;
    if (reactor->epfd < 0)
        return -1;
    if (reactor->evblks == NULL)
        return -1;

    struct epoll_event events[MAX_EPOLL_EVENTS + 1];
    int i;
    while (1)
    {
   
   
        // epoll监听客户端接入
        int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000);
        if (nready < 0)
        {
   
   
            printf("epoll wait error\n");
            continue;
        }
        for (i = 0; i < nready; i++)
        {
   
   
            struct ntyevent *ev = (struct ntyevent *)events[i].data.ptr;
            if ((events[i].events &EPOLLIN) && (ev->events &EPOLLIN))
            {
   
   
                // 处理可读事件
                ev->callback(ev->fd, events[i].events, ev->arg);
            }
            if ((events[i].events &EPOLLOUT) && (ev->events &EPOLLOUT))
            {
   
   
                //处理可写事件
                ev->callback(ev->fd, events[i].events, ev->arg);
            }
        }
    }

}

step 14:实现reactor销毁功能

int ntyreactor_destory(struct ntyreactor *reactor)
{
   
   
    // 关闭epoll
    close(reactor->epfd);

    struct eventblock *blk= reactor->evblks;
    struct eventblock *next;

    while (blk != NULL)
    {
   
   
        next = blk->next;
        // 释放内存块
        free(blk->events);
        free(blk);
        blk = next;
    }
    return 0;
}

简单使用示例

int main(int argc,char* argv[])
{
   
   
    // 创建reactor对象
    struct ntyreactor *reactor = (struct ntyreactor*)malloc(sizeof(struct ntyreactor));
    // 初始化reactor容器
    ntyreactor_init(reactor);

    // 定义监控的socket开始端口
    unsigned short port = SERVER_PORT;
    if (argc == 2)
    {
   
   
        port = atoi(argv[1]);
    }

    // 初始化套字接和监听端口列表
    int i = 0;
    int sockfds[PORT_COUNT] = {
   
    0 };
    for(i=0;i<PORT_COUNT;i++)
    {
   
   
        // 初始化socket端口
        sockfds[i] = init_sock(port + i);
        // 添加事件监听
        ntyreactor_addlistener(reactor, sockfds[i], accept_cb);
    }

    // reactor运行,主要是epoll的循环监听
    ntyreactor_run(reactor);
    // 销毁 reactor
    ntyreactor_destory(reactor);

    // 关闭socket集
    for (i = 0; i < PORT_COUNT; i++)
    {
   
   
        close(sockfds[i]);
    }

    // 释放reactor
    free(reactor);

    return 0;
}

四、总结

1、创建一个reactor对象,分配内存。
2、Reactor容器初始化:
(1)创建IO多路复用器epoll,将文件描述符epfd保存到reactor容器中;
(2)创建一个事件集合MAX_EPOLL_EVENTS * sizeof(struct events);
(3)创建一个管理模块,用于管理(2)中创建的事件集合。这是一个链表,一个指针指向(2)中创建的事件集合,一个指针指向下一个块(block);
(4)将事件集合添加到管理模块(block)中进管理;
(5)将管理模块(block)保存到reactor容器,同时reactor的管理模块(block)置为1;
(6)注意:如果申请内存使用malloc函数,最好使用memset将内存块初始化,避免脏数据。
3、soket初始化:
(1)创建一个监听socket fd,socket(AF_INET,SOCK_STREAM,0);
(2)设置socket fd为非阻塞模式,fcntl(fd,SETFL,O_NONBLOCK);
(3)配置socket属性,主要是struct sockaddr_in结构体的sin_family、sin_addr.s_addr、sin_port;
(4)bind();
(5)listen();
4、事件监听
(1)以fd为key找到对应的管理块序号和偏移地址;
(2)设置事件信息;
(3)添加监听事件:是否可读。
5、reactor主循环(mainloop),主要是epoll的循环监听,处理事件到相关回调函数。
(1)事件监听,epoll_wait;
(2)回调函数recv,处理可读事件(EPOLLIN);
(3)回调函数send,处理可写事件(EPOLLOUT)。
6、销毁reactor,注意是事件集内存块的释放和链表的内存释放。
7、close监听端口socket fd。
8、释放reactor内存。
image.png

相关实践学习
CentOS 7迁移Anolis OS 7
龙蜥操作系统Anolis OS的体验。Anolis OS 7生态上和依赖管理上保持跟CentOS 7.x兼容,一键式迁移脚本centos2anolis.py。本文为您介绍如何通过AOMS迁移工具实现CentOS 7.x到Anolis OS 7的迁移。
目录
相关文章
|
4天前
|
监控 安全 Linux
在 Linux 系统中,网络管理是重要任务。本文介绍了常用的网络命令及其适用场景
在 Linux 系统中,网络管理是重要任务。本文介绍了常用的网络命令及其适用场景,包括 ping(测试连通性)、traceroute(跟踪路由路径)、netstat(显示网络连接信息)、nmap(网络扫描)、ifconfig 和 ip(网络接口配置)。掌握这些命令有助于高效诊断和解决网络问题,保障网络稳定运行。
17 2
|
16天前
|
域名解析 网络协议 安全
|
22天前
|
运维 监控 网络协议
|
6天前
|
缓存 Linux 开发者
Linux内核中的并发控制机制:深入理解与应用####
【10月更文挑战第21天】 本文旨在为读者提供一个全面的指南,探讨Linux操作系统中用于实现多线程和进程间同步的关键技术——并发控制机制。通过剖析互斥锁、自旋锁、读写锁等核心概念及其在实际场景中的应用,本文将帮助开发者更好地理解和运用这些工具来构建高效且稳定的应用程序。 ####
23 5
|
9天前
|
Linux 数据库
Linux内核中的锁机制:保障并发操作的数据一致性####
【10月更文挑战第29天】 在多线程编程中,确保数据一致性和防止竞争条件是至关重要的。本文将深入探讨Linux操作系统中实现的几种关键锁机制,包括自旋锁、互斥锁和读写锁等。通过分析这些锁的设计原理和使用场景,帮助读者理解如何在实际应用中选择合适的锁机制以优化系统性能和稳定性。 ####
26 6
|
18天前
|
存储 Ubuntu Linux
2024全网最全面及最新且最为详细的网络安全技巧 (三) 之 linux提权各类技巧 上集
在本节实验中,我们学习了 Linux 系统登录认证的过程,文件的意义,并通过做实验的方式对 Linux 系统 passwd 文件提权方法有了深入的理解。祝你在接下来的技巧课程中学习愉快,学有所获~和文件是 Linux 系统登录认证的关键文件,如果系统运维人员对shadow或shadow文件的内容或权限配置有误,则可以被利用来进行系统提权。上一章中,我们已经学习了文件的提权方法, 在本章节中,我们将学习如何利用来完成系统提权。在本节实验中,我们学习了。
|
26天前
|
Ubuntu Linux 虚拟化
Linux虚拟机网络配置
【10月更文挑战第25天】在 Linux 虚拟机中,网络配置是实现虚拟机与外部网络通信的关键步骤。本文介绍了四种常见的网络配置方式:桥接模式、NAT 模式、仅主机模式和自定义网络模式,每种模式都详细说明了其原理和配置步骤。通过这些配置,用户可以根据实际需求选择合适的网络模式,确保虚拟机能够顺利地进行网络通信。
|
4天前
|
存储 SQL 安全
网络安全与信息安全:关于网络安全漏洞、加密技术、安全意识等方面的知识分享
【10月更文挑战第39天】在数字化时代,网络安全和信息安全成为了我们生活中不可或缺的一部分。本文将介绍网络安全漏洞、加密技术和安全意识等方面的内容,帮助读者更好地了解网络安全的重要性,并提供一些实用的技巧和方法来保护自己的信息安全。
15 2
|
5天前
|
安全 网络安全 数据安全/隐私保护
网络安全与信息安全:关于网络安全漏洞、加密技术、安全意识等方面的知识分享
【10月更文挑战第38天】本文将探讨网络安全与信息安全的重要性,包括网络安全漏洞、加密技术和安全意识等方面。我们将通过代码示例和实际操作来展示如何保护网络和信息安全。无论你是个人用户还是企业,都需要了解这些知识以保护自己的网络安全和信息安全。
|
4天前
|
存储 安全 网络安全
云计算与网络安全:探索云服务中的信息安全策略
【10月更文挑战第39天】随着云计算的飞速发展,越来越多的企业和个人将数据和服务迁移到云端。然而,随之而来的网络安全问题也日益突出。本文将从云计算的基本概念出发,深入探讨在云服务中如何实施有效的网络安全和信息安全措施。我们将分析云服务模型(IaaS, PaaS, SaaS)的安全特性,并讨论如何在这些平台上部署安全策略。文章还将涉及最新的网络安全技术和实践,旨在为读者提供一套全面的云计算安全解决方案。

热门文章

最新文章