Reactor模式(一)

简介: Reactor模式

一、Reactor模式

1.1 Reactor模式定义

Reactor反应器模式,也被称为分发者模式或通知者模式,是一种将就绪事件派发给对应服务处理程序的事件设计模式


5093feab42874bdeb39ac8af1dd1c38a.png


1.2 Reactor模式的角色构成


fcb6210fbc5b4d51b7ec5acdd34b13c8.png

1.3 Reactor模式的工作流程

当向初始分发器注册具体事件处理器时,会标识出该事件处理器希望初始分发器在某个事件发生时向其通知,该事件与Handle关联

初始分发器会要求每个事件处理器向其传递内部的Handle,该Handle向操作系统标识了事件处理器

当所有的事件处理器注册完毕后,启动初始分发器的事件循环,这时初始分发器会将每个事件处理器的Handle合并起来,并使用同步事件分离器等待这些事件的发生

当某个事件处理器的Handle变为Ready状态时,同步事件分离器会通知初始分发器

初始分发器会将Ready状态的Handle作为key,来寻找其对应的事件处理器

初始分发器会调用其对应事件处理器中对应的回调方法来响应该事件

二、epoll ET服务器(Reactor模式)

2.1 设计思路

epoll ET服务器


读事件:若是监听套接字读事件就绪则调用accept函数获取底层连接,若是其他套接字读事件就绪则调用recv函数读取客户端发来的数据

写事件:写事件就绪则将待发送的数据写入到发送缓冲区中

异常事件:当某个套接字的异常事件就绪时不做过多处理,直接关闭该套接字

当epoll ET服务器监测到某一事件就绪后,就会将该事件交给对应的服务处理程序进行处理


Reactor模式的五个角色


在这个epoll ET服务器中,Reactor模式中的五个角色对应如下:


句柄:文件描述符

同步事件分离器:I/O多路复用epoll

事件处理器:包括读回调、写回调和异常回调

具体事件处理器:读回调、写回调和异常回调的具体实现

初始分发器:TcpServer中的Dispatcher函数

Dispatcher函数的工作即为:调用epoll_wait函数等待事件发生,有事件发生后将就绪的事件派发给对应的服务处理程序即可


Connection类


在Reactor的工作流程中说到,在注册事件处理器时需要将其与Handle关联,本质上就是需要将读回调、写回调和异常回调与某个文件描述符关联起来。这样做的目的就是为了当某个文件描述符上的事件就绪时可以找到其对应的各种回调函数,进而执行对应的回调方法来处理该事件。可以设计一个Connection类,该类中的成员包括了一个文件描述符,以及该文件描述符对应的各种回调函数,以及其他成员


TcpServer类


在Reactor的工作流程中说到,当所有事件处理器注册完毕后,会使用同步事件分离器等待这些事件发生,当某个事件处理器的Handle变为Ready状态时,同步事件分离器会通知初始分发器,然后初始分发器会将Ready状态的Handle作为key来寻找其对应的事件处理器,并调用该事件处理器中对应的回调方法来响应该事件

本质就是当事件注册完毕后,会调用epoll_wait函数来等待这些事件发生,当某个事件就绪时epoll_wait函数会告知调用方,然后调用方就根据就绪的文件描述符来找到其对应的各种回调函数,并调用对应的回调函数进行事件处理

对此可以设计一个Reactor类


该类当中有一个成员函数Dispatcher,即初始分发器,在该函数内部会调用epoll_wait函数等待事件的发生,当事件发生后会告知Dispatcher已经就绪的事件

当事件就绪后需要根据就绪的文件描述符来找到其对应的各种回调函数,由于会将每个文件描述符及其对应的各种回调都封装到一个Connection结构中,所以可以根据文件描述符找到其对应的Connection结构

使用C++ STL中的unordered_map,来建立各个文件描述符与其对应的Connection结构之间的映射,这个unordered_map可以作为TcpServer类的一个成员变量,当需要找某个文件描述符的Connection结构时就可以通过该成员变量找到

TcpServer类中还需要提供成员函数AddConnection,用于向初始分发器中注册事件

epoll ET服务器的工作流程


epoll ET服务器的初始化:需进行套接字的创建、绑定、监听,创建epoll模型

为监听套接字创建对应的Connection结构,并调用TcpServer类中提供的AddConnection函数将监听套接字添加到epoll模型中,并建立监听套接字与其对应的Connection结构之间的映射关系

之后就可以不断调用TcpServer类中的Dispatcher函数进行事件派发

在事件处理过程中,会不断向Dispatcher中新增事件,每个事件就绪时都会自动调用其对应的回调函数处理,不断调用Dispatcher函数进行事件派发即可

2.2 Connection结构

Connection结构中除了包含文件描述符和其对应的读回调、写回调和异常回调外,还包含一个输入缓冲区_inBuffer、一个输出缓冲区_outBuffer以及一个回指指针_svrPtr


当某个文件描述符的读事件就绪时,调用recv函数读取客户端发来的数据,但并不能保证读到了一个完整报文,因此需要将读取到的数据暂时存放到该文件描述符对应的_inBuffer中,当_inBuffer中可以分离出一个完整的报文后再将其分离出来进行数据处理,_inBuffer本质就是用来解决粘包问题的

当处理完一个报文请求后,需将响应数据发送给客户端,但并不能保证底层TCP的发送缓冲区中有足够的空间写入,因此需将要发送的数据暂时存放到该文件描述符对应的_outBuffer中,当底层TCP的发送缓冲区中有空间,即写事件就绪时,再依次发送_outBuffer中的数据

Connection结构中设置回指指针_svrPtr,便于快速找到TcpServer对象,因为后续需要根据Connection结构找到这个TcpServer对象。如上层业务处理函数NetCal函数向_outBuffer输出缓冲区递交数据后,需通过Connection中的回指指针,"提醒"TcpServer进行处理

Connection结构中需提供一个管理回调的成员函数,便于外部对回调进行设置

class Connection
{
public:
    Connection(int sock = -1):_socketFd(sock),_svrPtr(nullptr) {}
    ~Connection() {}
public:
    void SetCallBack(func_t recvCb, func_t sendCb, func_t exceptCb) {
        _recvCb = recvCb;
        _sendCb = sendCb;
        _exceptCb = exceptCb;
    }
public:
    int _socketFd;
    func_t _recvCb;
    func_t _sendCb;
    func_t _exceptCb;
    string _inBuffer;//无法处理二进制流
    string _outBuffer;
    TcpServer* _svrPtr;
};


2.3 TcpServer类

在TcpServer类中有一个unordered_map成员,用于建立文件描述符和与其对应的Connection结构之间的映射,还有一个_epoll成员,该成员是封装的Epoll对象。在初始化TcpServer对象时就可以调用封装的EpollCreate函数创建Epoll对象,并将该epoll模型对应的文件描述符记录在该对象的成员变量_epollFd中,便于后续使用。TcpServer对象析构时,Epoll对象的析构会自动调用close函数将epoll模型关闭


封装Epoll类

#pragma once
#include <iostream>
#include <sys/epoll.h>
class Epoll
{
public:
    Epoll() {}
    ~Epoll() { if(_epollFd > 0) close(_epollFd); }
public:
    void EpollCreate() {
        _epollFd = epoll_create(128);
        if(_epollFd < 0) exit(5);
    }
    bool AddSockToEpoll(int socket, uint32_t event) 
    {
        struct epoll_event ev;
        ev.events = event;
        ev.data.fd = socket;
        int n = epoll_ctl(_epollFd, EPOLL_CTL_ADD, socket, &ev);
        return n == 0;
    }
    bool EpollCtrl(int socket, uint32_t event) 
    {
        event |= EPOLLET;
        struct epoll_event ev;
        ev.events = event;
        ev.data.fd = socket;
        int n = epoll_ctl(_epollFd, EPOLL_CTL_MOD, socket, &ev);
        return n == 0;
    }
    int EpollWait(struct epoll_event* revs, int revsNum) {
        return epoll_wait(_epollFd, revs, revsNum, 5000);
    }
    bool DelFromEpoll(int socket) 
    {
        int n = epoll_ctl(_epollFd, EPOLL_CTL_DEL, socket, 0);
        return n == 0;
    }
private:
    int _epollFd;
};

TcpServer类部分代码

class TcpServer
{
public:
    TcpServer(uint16_t port = 8080, int revsNum = 128):_port(port), _revsNum(revsNum)
    {
        //创建listenSocket
        _listenSocketFd = Socket::SocketCreate();
        Socket::Bind(_listenSocketFd, _port);
        Socket::Listen(_listenSocketFd);
        //创建多路转接对象
        _epoll.EpollCreate();
    }
    ~TcpServer() { 
        if(_listenSocketFd >= 0) close(_listenSocketFd); 
    }
private:
    int _listenSocketFd;
    uint16_t _port;
    unordered_map<int, Connection*> _connections;//管理服务器链接
    Epoll _epoll;
    struct epoll_event* _revs;//获取就绪事件的缓冲区
    int _revsNum;//缓冲区大小
    callback_t _cb;//上层业务处理
};


2.3.1 AddConnection函数

TcpServer类中的AddConnection函数用于进行事件注册


在注册事件时需要传入一个文件描述符和三个回调函数,表示当该文件描述符上的事件(默认只关心读事件)就绪后应该执行的回调方法。

在AddConnection函数内部要做的就是,设置套接字为非阻塞(ET模型要求),将套接字和回调函数等属性封装为一个Connection,在将套接字添加到epoll模型中,对象建立文件描述符和Connection的映射关系并管理

void AddConnection(int socket, func_t reavCb, func_t sendCb, func_t exceptCb) //将套接字封装为链接并添加至服务器的管理中
{
    //设置套接字为非阻塞
    Socket::SetNonBlock(socket);
    //将套接字封装为链接,设置链接的各个属性
    Connection* con = new Connection(socket);
    con->SetCallBack(reavCb, sendCb, exceptCb);//监听套接字只需读取回调函数
    con->_svrPtr = this;
    //添加套接字到epoll中
    _epoll.AddSockToEpoll(socket, EPOLLIN | EPOLLET);//一般多路转接服务器默认监视读事件,其他事件按需设置
    //对应的链接添加到映射表中管理
    _connections.insert(make_pair(socket, con));
}

2.3.2 Dispatcher函数(初始分发器)

TcpServer中的Dispatcher函数即初始分发器,其要做的就是调用epoll_wait函数等待事件发生。当某个文件描述符上的事件发生后,先通过unordered_map找到该文件描述符对应的Connection结构,然后调用Connection结构中对应的回调函数对该事件进行处理即可

class TcpServer
{
public:
    void LoopOnce() {
        int number = _epoll.EpollWait(_revs, _revsNum);
        for(int i = 0; i < number; ++i) {
            int socket = _revs[i].data.fd;
            uint32_t revent = _revs[i].events;
            //将所有异常交给read和write处理
            if(revent & EPOLLERR) revent |= (EPOLLIN | EPOLLOUT);
            if(revent & EPOLLHUP) revent |= (EPOLLIN | EPOLLOUT);
            if(revent & EPOLLIN) {
                if((_connections.find(socket) != _connections.end()) && (_connections[socket]->_recvCb != nullptr)) {//存在且回调不为空
                    _connections[socket]->_recvCb(_connections[socket]);
                }
            }
            if(revent & EPOLLOUT) {
                if((_connections.find(socket) != _connections.end()) && (_connections[socket]->_sendCb != nullptr)) {
                    _connections[socket]->_sendCb(_connections[socket]);
                }
            }
        }
    }
    void Dispatcher(callback_t cb)//根据就绪事件,进行特定事件的派发
    {
        _cb = cb;
        while(true)
        {
            LoopOnce();
        }
    }
private:
    int _listenSocketFd;
    uint16_t _port;
    unordered_map<int, Connection*> _connections;//管理服务器链接
    Epoll _epoll;
    struct epoll_event* _revs;//获取就绪事件的缓冲区
    int _revsNum;//缓冲区大小
    callback_t _cb;//上层业务处理
};


本代码没有用switch或if语句对epoll_wait函数的返回值进行判断,而是借用for循环对其返回值进行了判断

若epoll_wait的返回值为-1则说明epoll_wait函数调用失败,此时不会进入到for循环内部进行事件处理

若epoll_wait的返回值为0则说明epoll_wait函数超时返回,此时也不会进入到for循环内部进行事件处理

若epoll_wait的返回值大于0则说明epoll_wait函数调用成功,此时才会进入到for循环内部调用对应的回调函数对事件进行处理

事件处理时先对异常事件进行处理,将异常事件交给回调函数进行处理

2.3.3 EnableReadWrite函数

TcpServer类中的EnableReadWrite函数,用于使能某个文件描述符的读写事件


调用EnableReadWrite函数时需要传入一个文件描述符,表示需要设置的是哪个文件描述符对应的事件

传入两个bool值,分别表示是否关心读事件以及是否关心写事件

EnableReadWrite函数内部会调用封装EpollCtrl函数修改该文件描述符的监听事件

void EnableReadWrite(Connection* con ,bool readable, bool writable) 
{
    uint32_t event = (readable ? EPOLLIN : 0) | (writable ? EPOLLOUT : 0);
    bool ret = _epoll.EpollCtrl(con->_socketFd, event);
    assert(ret);
}
目录
相关文章
|
8月前
|
消息中间件 Kubernetes NoSQL
Reactor 和 Proactor 区别
Reactor 和 Proactor 区别
|
3月前
|
Java
Reactor模式
通过一个具体的Java代码示例展示了如何在NIO框架下实现Reactor模式,用于处理网络IO事件,包括事件的接收、分发和处理。
46 4
Reactor模式
|
3月前
|
NoSQL Java Redis
Reactor实战,创建一个简单的单线程Reactor(理解了就相当于理解了多线程的Reactor)
本文通过一个简单的单线程Reactor模式的Java代码示例,展示了如何使用NIO创建一个服务端,处理客户端的连接和数据读写,帮助理解Reactor模式的核心原理。
46 0
Reactor实战,创建一个简单的单线程Reactor(理解了就相当于理解了多线程的Reactor)
|
8月前
|
设计模式
深入浅出Reactor和Proactor模式
深入浅出Reactor和Proactor模式
|
8月前
|
监控 安全 Linux
reactor的原理与实现
前情回顾 网络IO,会涉及到两个系统对象:   一个是用户空间调用的进程或线程   一个是内核空间的内核系统 如果发生IO操作read时,会奖励两个阶段:
86 1
|
8月前
|
缓存
2.1.2事件驱动reactor的原理与实现
2.1.2事件驱动reactor的原理与实现
|
8月前
|
监控 Java 应用服务中间件
Reactor反应器模式
在Java的OIO编程中,最初和最原始的网络服务器程序使用一个while循环,不断地监听端口是否有新的连接,如果有就调用一个处理函数来处理。这种方法最大的问题就是如果前一个网络连接的处理没有结束,那么后面的连接请求没法被接收,于是后面的请求统统会被阻塞住,服务器的吞吐量就太低了。 为了解决这个严重的连接阻塞问题,出现了一个即为经典模式:Connection Per Thread。即对于每一个新的网络连接都分配一个线程,每个线程都独自处理自己负责的输入和输出,任何socket连接的输入和输出处理不会阻塞到后面新socket连接的监听和建立。早期版本的Tomcat服务器就是这样实现的。
|
监控 NoSQL Java
Netty高性能架构之Reactor模式
在讨论Netty的架构模式之前,我们先来介绍下Reactor模式,因为Netty的架构模式是在此基础上演变而来的
Netty高性能架构之Reactor模式
|
网络协议 数据处理
Reactor模式(二)
Reactor模式
91 0
|
设计模式 存储 监控

热门文章

最新文章

下一篇
开通oss服务