主从Reactor服务器

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 主从Reactor服务器

目标:

我们要设计搭建主从Reactor服务器的组件,以便使用者利用这些组件更方便地搭建主从Reactor服务器

那么什么是主从Reactor服务器呢?

其实就是以主线程负责接收新连接,从属线程负责管理连接、接受数据、处理业务这种方式运行的服务器

图示:

本文讲解思路:

先列出服务器各模块的功能,再按照服务器实际运行场景,按照执行流梳理各模块发挥的作用。

注:为节省篇幅,我将源码链接放于下方,方便读者在梳理模块时对照阅读。

各模块的功能以及代码:

1.服务器相关模块:服务器模块的功能是对所有的连接以及线程进⾏管理

代码链接:主从Reactor模型/server.hpp · tt_02922/Linux_new - 码云 - 开源中国 (gitee.com)

  1. Buffer模块:用于接收socket缓冲区的数据,以便上层读取并处理
  2. Socket模块:封装了套接字编程的常用接口,方便后续复用
  3. Channel模块:设置事件对应的回调函数,事件就绪时调用对应的回调
  4. Poller模块:是对epoll系统调用接口的封装,用于对文件描述符添加/修改监控事件
  5. Connection模块:对Buffer模块,Socket模块,Channel模块的整体封装,实现了对⼀个通信套接字的整体的管理
  1. Acceptor模块:对Socket模块,Channel模块的整体封装,实现了对⼀个监听套接字的整体的管理
  2. TimerQueue模块:解决超时连接不释放、占用资源的问题,时间一到自动释放并执行任务
  3. EventLoop模块:对Poller模块,TimerQueue模块, Socket模块的⼀个整体封装,进⾏所有描述符的事件监控
  4. TcpServer模块:Tcp服务器模块,内部封装了Acceptor模块,EventLoopThreadPool模块

2.协议相关模块:协议模块是对当前的Reactor模型服务器提供应⽤层协议⽀持(我采用的时Http协议)

代码链接:主从Reactor模型/http.hpp · tt_02922/Linux_new - 码云 - 开源中国 (gitee.com)

  1. Util模块:协议模块所⽤到的⼀些⼯具函数
  2. HttpRequest模块:⽤于保存请求数据被解析后的各项请求元素信息
  3. HttpResponse模块:业务处理后设置并保存响应数据的的各项元素信息以待发送给 客户端
  4. HttpContext模块:用于保证请求接受的数据的完整性
  5. HttpServer模块:⽤于以简单的接⼝实现Http协议服务器的搭建

运行流程梳理:

实际的运行场景,当一个新连接到达服务器的时候,服务器双方要建立通信,随后服务器要对发来的数据做对应的处理,并将处理结果返回给服务器

1.Start from Main:

int main()
{
    HttpServer server(8085);
    server.SetThreadCount(3);
    server.SetBaseDir(WWWROOT);//设置静态资源根目录,告诉服务器有静态资源请求到来,需要到哪里去找资源文件
    server.Get("/hello", Hello);
    server.Post("/login", Login);
    server.Put("/1234.txt", PutFile);
    server.Delete("/1234.txt", DelFile);
    server.Listen();
    return 0;
}

(1)HttpServer server(8085);

DO:启动非活跃连接释放、设置连接就绪回调、设置业务处理回调

调用其构造函数

_server的类型是TcpServer,接下来分别调用_server的成员函数

作用分别是:启动非活跃连接释放、设置连接就绪回调、设置业务处理回调

    HttpServer(int port, int timeout = DEFALT_TIMEOUT) : _server(port)
    {
        _server.EnableInactiveRelease(timeout);
        _server.SetConnectedCallback(std::bind(&HttpServer::OnConnected, this, std::placeholders::_1));
        _server.SetMessageCallback(std::bind(&HttpServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2));
    }

函数实现

void EnableInactiveRelease(int timeout) { _timeout = timeout; _enable_inactive_release = true; }
void SetConnectedCallback(const ConnectedCallback&cb) { _connected_callback = cb; }
void SetMessageCallback(const MessageCallback&cb) { _message_callback = cb; }

ConnectedCallBack与MessageCallBack的类型

PtrConnection的类型

(2)关于bind与function

bind:函数模板,传入函数以及对应的参数,返回已经绑定好对应参数或预留好对应参数的函数对象

eg:绑定好对应参数

#include<functional>
#include<iostream>
int add(int a,int b)
{
    return a+b;
}
int main()
{
auto cb=std::bind(add,1,2);//绑定add的参数为1,2并返回函数对象
std::cout<<cb()<<std::endl;
}

eg:预留好对应参数

#include<functional>
#include<iostream>
int add(int a,int b)
{
    return a+b;
}
int main()
{
auto cb=std::bind(add,std::placeholders::_1,std::placeholders::_2);
//为add预留了2个参数,并返回函数对象,调用函数对象时需要显示的传参
std::cout<<cb(5,6)<<std::endl;
}

function:模板类,用于接收可以调用的目标,例如函数、函数对象、lambda等

eg:

#include<functional>
int test(int a){}
 
std::function<int(int)> func=test;//用于接收返回值为int类型,参数为一个int的可调用对象

other:

using test1=std::function<int(int)>;//设置类型别名,即test1等价于function<int(int)>类型

OnConnected 连接就绪时要调用的回调

   // 设置上下文
    void OnConnected(const PtrConnection &conn)
    {
        conn->SetContext(HttpContext());
        DBG_LOG("NEW CONNECTION %p", conn.get());
    }

PtrConnection即智能指针管理的Connection对象

为什么要用智能指针管理Connection对象,后面会提

void SetContext(const Any &context) { _context = context; }

OnMessage

 // 缓冲区数据解析+处理
    void OnMessage(const PtrConnection &conn, Buffer *buffer)
    {
        while (buffer->ReadAbleSize() > 0)
        {
            // 1. 获取上下文
            HttpContext *context = conn->GetContext()->get<HttpContext>();
            // 2. 通过上下文对缓冲区数据进行解析,得到HttpRequest对象
            //   1. 如果缓冲区的数据解析出错,就直接回复出错响应
            //   2. 如果解析正常,且请求已经获取完毕,才开始去进行处理
            context->RecvHttpRequest(buffer);//处理获取到的数据
            HttpRequest &req = context->Request();//获取处理完毕的请求报文
            HttpResponse rsp(context->RespStatu());//设置响应报文的状态码
            if (context->RespStatu() >= 400)//接收数据时发生了错误
            {
                // 进行错误响应,关闭连接
                ErrorHandler(req, &rsp);      // 填充一个错误显示页面数据到rsp中
                WriteReponse(conn, req, rsp); // 组织响应发送给客户端
                context->ReSet();
                buffer->MoveReadOffset(buffer->ReadAbleSize()); // 出错了就把缓冲区数据清空
                conn->Shutdown();                               // 关闭连接
                return;
            }
            if (context->RecvStatu() != RECV_HTTP_OVER)
            {
                // 当前请求还没有接收完整,则退出,等新数据到来再重新继续处理 如何重新进行处理?完全重来
                return;
            }
            // 3. 请求路由 + 业务处理
            Route(req, &rsp);
            // 4. 对HttpResponse进行组织发送
            WriteReponse(conn, req, rsp);
            // 5. 重置上下文
            context->ReSet();
            // 6. 根据长短连接判断是否关闭连接或者继续处理
            if (rsp.Close() == true)
                conn->Shutdown(); // 短链接则直接关闭
        }
        return;
    }

(3)server.SetThreadCount(3);

DO:设置线程数量

调用接口:

_server内部调用的是_pool的接口,_pool的类型是eventloop

最终设置线程数量的接口

class LoopThreadPool 
{...
int _thread_count;//要创建多少个线程,即要创建多少个eventloop
...
void SetThreadCount(int count) { _thread_count = count; }
...
};

(4)server.SetBaseDir(WWWROOT)

调用接口:

    //stat 是一个函数调用
    //函数原型 
    //int stat(const char *pathname, struct stat *statbuf)
    static bool IsDirectory(const std::string &filename)
    {
        struct stat st;
        int ret = stat(filename.c_str(), &st);
        if (ret < 0)
        {
            return false;
        }
        return S_ISDIR(st.st_mode);
    }
 
    void SetBaseDir(const std::string &path)
    {
        assert(Util::IsDirectory(path) == true);//判断路径是否为文件夹
        _basedir = path;
    }

(5)设置各种请求的处理方法

    //设置各种请求的处理方法
    server.Get("/hello", Hello);
    server.Post("/login", Login);
    server.Put("/1234.txt", PutFile);
    server.Delete("/1234.txt", DelFile);
(1)以GET为例
    //using Handler = std::function<void(const HttpRequest &, HttpResponse *)>;
    //using Handlers = std::vector<std::pair<std::regex, Handler>>;
    //Handlers _get_route;   
 
    void Get(const std::string &pattern, const Handler &handler)
    {
        _get_route.push_back(std::make_pair(std::regex(pattern), handler));
    }

(2)关于正则表达式与regex

正则表达式(regular expression)描述了⼀种字符串匹配的模式(pattern),可以⽤来检查⼀个串是否 含有某种⼦串、将匹配的⼦串替换或者从某个串中取出符合某个条件的⼦串等。

(6)server.Listen();

DO:启动服务器

void Start() { _pool.Create();  _baseloop.Start(); }
(1)Create

Create()

创建对应数量的线程,并在各线程内创建EventLoop,并启动线程内的EventLoop

这保证了One Thread One Loop,即一个线程一个事件处理循环 ,EventLoop生命周期随线程

        //_threads类型:vector<LoopThread*>
        //_loops类型:vector<EventLoop*> 
        void Create() {
            if (_thread_count > 0) {
                _threads.resize(_thread_count);
                _loops.resize(_thread_count);
                for (int i = 0; i < _thread_count; i++) {
                     //one thread one loop
                    _threads[i] = new LoopThread();
                    _loops[i] = _threads[i]->GetLoop();
                }
            }
            return ;
        }

LoopThread()

//EventLoop *_loop;      
//std::thread _thread;  
LoopThread():_loop(NULL), _thread(std::thread(&LoopThread::ThreadEntry, this)) {}

ThreadEntry()

        void ThreadEntry() {
            EventLoop loop;
            {
                std::unique_lock<std::mutex> lock(_mutex);//加锁
                _loop = &loop;
                _cond.notify_all();
            }
            loop.Start();
        }

GetLoop()

        EventLoop *GetLoop() {
            EventLoop *loop = NULL;
            {
                std::unique_lock<std::mutex> lock(_mutex);//加锁 
                _cond.wait(lock, [&](){ return _loop != NULL; });//loop为NULL就一直阻塞
                loop = _loop;
            }
            return loop;
        }

图示

(2)Start

Start()

获取新连接

事件处理

执行任务

注意:_baseloop与业务loop的不同

普通的loop现在正在阻塞在epoll_wait,因为此时还没新连接分配给loop,所以应该先看_baseloop.Start();

        //三步走--事件监控->就绪事件处理->执行任务
        void Start() {
            while(1) {
                //1. 事件监控
                std::vector<Channel *> actives;
                _poller.Poll(&actives);//获取活跃连接
                //2. 事件处理
                for (auto &channel : actives) {
                    channel->HandleEvent();
                }
                //3. 执行任务
                RunAllTask();
            }
        }
//int _epfd;
//struct epoll_event _evs[MAX_EPOLLEVENTS];
//std::unordered_map<int, Channel *> _channels;fd:channel
void Poll(std::vector<Channel*> *active) {
            // int epoll_wait(int epfd, struct epoll_event *evs, int maxevents, int timeout)
            int nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1);
            if (nfds < 0) {
                if (errno == EINTR) {
                    return ;
                }
                ERR_LOG("EPOLL WAIT ERROR:%s\n", strerror(errno));
                abort();//退出程序
            }
            for (int i = 0; i < nfds; i++) {
                auto it = _channels.find(_evs[i].data.fd);
                assert(it != _channels.end());
                it->second->SetREvents(_evs[i].events);//设置实际就绪的事件
                active->push_back(it->second);
            }
            return;
        }

channel->HandleEvent();

 void HandleEvent() {
            if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)) {
                /*不管任何事件,都调用的回调函数*/
                if (_read_callback) _read_callback();
            }
            /*有可能会释放连接的操作事件,一次只处理一个*/
            if (_revents & EPOLLOUT) {
                if (_write_callback) _write_callback();
            }else if (_revents & EPOLLERR) {
                if (_error_callback) _error_callback();//一旦出错,就会释放连接,因此要放到前边调用任意回调
            }else if (_revents & EPOLLHUP) {
                if (_close_callback) _close_callback();
            }
            if (_event_callback) _event_callback();
        }

(7)服务器的正式启动——_baseloop.Start();

(1)_baseloop在初始化时被做了哪些处理?

创建监听套接字

创立baseloop与监听套接字的监听关系

设置监听套接字就绪时要调用的回调

TcpServer(int port):
           ...
            _acceptor(&_baseloop, port)
            _acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));
            _acceptor.Listen();
 _acceptor(&_baseloop, port)

传给_accept_callback

_acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));

       _acceptor.Listen();

现在我们已知的是什么?

baseloop会监控监听套接字,当新连接到来时,也就是读事件就绪了,它会调用NewConnection这一回调,为新连接创建一个Connection对象,在这其中,分配一个EventLoop,以监控该连接,为新连接设置各种回调函数,以待事件就绪时调用

//上述步骤其实发生在  HttpServer server(8085);调用了_server的构造函数

(2)_baseloop.Start();

baseeloop上的监听套接字此时正在等待读事件就绪,也就是在等待新连接的到来

以上便是服务器的初始准备工作

(3)迎来连接时服务器如何工作

场景:epoll_wait队列上监听套接字读事件就绪了

std::unordered_map<int, Channel *> _channels;


epoll_wait不再阻塞,异常则报错退出,非异常则查找监听套接字对应的channel,并设置监听套接字实际的就绪事件,再将channel返回,baseloop得到channel后,就可以去执行监听套接字对应的读事件就绪处理回调,这个回调 即NewConnection

One fd one connection one eventloop(one poller fds) one channel one httpcontext

现在,随机的一个eventloop也有了baseloop为其分配的新连接了,接下来就该讨论,

(4)eventloop上的连接就绪后干什么

连接的就绪事件调用的回调是由用户指定的,以读事件就绪为例,当eventloop上有连接就绪时,且是读事件就绪,我们调用的就是HandleRead回调函数

(8)OnMessage是如何处理数据的?

要先知悉:http格式的请求报文与响应报文是什么样子的

处理流程:

(1)请求行处理

(2)请求头部处理

(3)请求正文处理

请求报文的数据解析工作可能成功(请求报文完整发送且数据正确,解析工作正确完成),也可能不成功(请求报文未发送完整或数据有错,解析工作无法完成),即解析工作能否完成与数据完整性与正确性有关
(4)请求报文完整且正确解析

进行如下工作

何为请求路由+业务处理

即通过对请求行的资源路径进行解析,判断其为静态资源请求(访问服务器上的某种资源,例如图片、音频)还是功能性请求(函数处理),根据不同的请求组织不同的响应

静态资源请求

判断是否为静态资源静态资源即在根目录下寻找相关的文件

1. 必须设置了静态资源根目录

2. 请求方法,必须是GET / HEAD请求方法

3. 请求的资源路径必须是一个合法路径

4. 请求的资源必须存在,且是一个普通文件

5.特殊处理,资源路径以/结尾,要在结尾加index.html

将根目录下对应资源写入响应报文的正文中

功能性请求

利用key:val模型,正则表达,处理函数存储起来。查找处理函数时利用正则表达式进行查询

不同的正则表达式对应着不同的处理函数,而处理函数是由用户来设置的 ,每个请求方法都有一张路由表,上面记录着正则表达式与处理函数的映射关系。

以GET方法为例,

调用对应的处理函数后,再将函数处理结果接入响应报文的正文之中,就可以按照响应报文的格式组织发送了,发送完毕后,重置该连接的上下文,并根据其长短连接属性决定是否断开连接

注意:对于connection的所有操作,都要放到对应的eventloop里执行,以保证线程安全,后面会提为什么

(5)请求报文不完整或解析数据不正确

如果是数据不完整,那么只可能是请求行不完整、请求头部不完整、请求正文不完整,任一部分的缺少都无法进入到请求路由+业务处理阶段,因此程序只能返回,等待新数据到来,再在原有进度继续执行。

如果是数据不正确,那么就会给响应报文一个错误的状态码,并组织一个错误的页面返回

(9)断开连接

我们的服务器默认使用非活跃连接销毁的断开方式

为何非活跃连接销毁呢?如果该连接在指定时间内没有进行通信,就认为他是不活跃的,那么服务器就应该主动断开该连接,以避免资源浪费。例如,若连接A在10s内没有发送通信,那么在10s后服务器就主动断开与A的连接。反之如果他在10s内有通信,那么对该连接的断开操作就放在20s后,依次类推
(1)非活跃连接销毁的原理

时间轮+智能指针

所谓时间轮,其实就是一个二维数组,每访问一个下标元素,就执行该元素上所有任务

图示

a.如何实现,每访问一个时间轮的下标,就执行该下标上的任务呢?

利用类的析构函数,类对象销毁时会自动调用类的析构函数,所以我们只要将任务封装成类,当时间轮访问对应下标时,就销毁该下标上的所有任务对象,就可以自动执行任务了。

图示


b.那么这个时间轮要如何动起来呢?

什么叫做让时间轮动起来?就是让时间轮依次释放每一下标对应对象的操作

使用timerfd,Linux提供的定时器,其功能是在指定时间后向timerfd内写入8字节数据,以告诉我们超时了多少次。我们将timerfd投入到epoll队列中,并设置好时间就绪的回调函数,每当定时器超时(也就是事件就绪),就执行我们让时间轮动起来的回调函数

例如,我们设置了一个首次超时时间为1s,之后超时时间为1s的定时器,意思就是每一秒向timerfd内写入一个8字节数据,如果我们2s读取timerfd的数据,就会读到一个2,表示超时了2s,如果4s后读取timerfd数据,就会读取到4,表示超时了4次

图示

c.在时间轮的基础之上,再加上智能指针,就可以达到非活跃连接销毁的目的。

现在我将叙述具体情景,时间轮上存储的是智能指针,其管理是Task对象的生命周期。假设一个Task对象它要执行的任务是断开连接,即它的析构函数存储的是对连接的断开操作。此时该连接有新的通信,(通过调用回调的方式,任意事件回调)那么我们就针对该连接的智能指针再创建一个智能指针,使得智能指针的计数器+1,并将对断开连接的操作放到时间轮的新位置,这样,当 时间轮走到旧的位置时,虽然仍然会释放连接,智能指针的计数-1,但由于计数未到0,不执行类的析构函数,达到刷新任务的目的,也就是我们说的非活跃连接销毁。

图示

具体实现

注意connection的智能指针 保存管理所有连接对应的shared_ptr对象 其实也是智能指针计数器为0 调用connection的析构 真正的对connection的释放 其实是在timerwheel完成的

2.为什么要有inloop系列函数

由于连接(也就是文件描述符)在线程间是共享的,也就是说任意一个线程都可以对eventloop所持有的连接做出操作,这显然是线程不安全的,例如eventloop上某一连接正在数据通信,而外面的某个线程却要关闭连接。

如何解决上述问题?

因为一个线程对应一个eventloop 所以只要将对连接的各种操作放在 eventloop内执行,就不会有线程安全问题,因为这保证了,任意时刻内,只要一个线程拥有该连接。

如何将对连接的各种操作放在 eventloop内执行?

在eventloop内设置一个任务池,将在线程外对连接执行的操作放入任务池里。

值得注意的是:

1.如何知道它是线程外的

eventloop本身就是和线程绑定的,其内部保存了它的线程id;相同,在外面对连接执行操作的线程也有自己的线程id,只有对二者相比较,就能知道它们是否是同一线程了

2.具体如何做到的,inloop系列函数登场

对连接的所有操作,都是由inloop系列函数完成的,例如发送数据sendinloop、释放连接releaseinloop,它们都以回调函数的形式,作为参数传递给eventloop的RunInLoop函数,并做第一步的工作,线程id相同,则执行任务,线程id不同,则压入任务队列,待切换到eventloop线程时再做处理

图示,以send为例

3.如果外面设置了业务处理线程池,就意味着对任务池的业务处理是在其他线程进行的,这时就要对任务池加锁了

3.eventloop中的eventfd

一种事件通知机制

当任务队列有了任务,然而epoll_wait却还在阻塞(epoll_wait上没有描述符就绪),这会导致任务池里的任务迟迟无法得到执行

如何解决呢?

每当向任务队列里放入了新任务,就往eventfd里写数据,这时eventfd读事件就绪了,epoll_wait也就不阻塞了(返回),然后就可以执行任务池的任务了

图示

目录
打赏
0
0
0
0
1
分享
相关文章
websocket协议介绍与基于reactor模型的websocket服务器实现
websocket协议介绍与基于reactor模型的websocket服务器实现
140 0
Linux中搭建主从DNS服务器
搭建主从DNS架构以提升DNS服务的高可用性、负载均衡和数据冗余。主服务器配置涉及编辑`/etc/named.conf`,设置监听IP和允许查询的范围,并定义主区域及允许的数据传输。从服务器配置需指定为奴隶类型,并指明主服务器的IP。测试表明正反向查询解析均正常。注意配置文件的语法正确性和权限设置。
270 0
服务器Linux系统配置mysql数据库主从自动备份
这是一个基本的配置主从复制和设置自动备份的指南。具体的配置细节和命令可能因您的环境和需求而有所不同,因此建议在操作前详细阅读MySQL文档和相关资源,并谨慎操作以避免数据丢失或不一致。
245 3
8月前
|
基于reactor模型的http服务器
基于reactor模型的http服务器
80 0
Linux C/C++ reactor模式下实现简易的web服务器
Linux C/C++ reactor模式下实现简易的web服务器
90 0
Mysql服务器线上配置主从同步
Mysql服务器线上配置主从同步
770 0
2.3 基于reactor的HTTP服务器实现
2.3 基于reactor的HTTP服务器实现
120 0
阿里云轻量应用服务器68元与云服务器99元和199元区别及选择参考
目前阿里云有三款特惠云服务器,第一款轻量云服务器2核2G68元一年,第二款经济型云服务器2核2G3M带宽99元1年,第三款通用算力型2核4G5M带宽199元一年。有的新手用户并不是很清楚他们之间的区别,因此不知道如何选择。本文来介绍一下它们之间的区别以及选择参考。
314 87
阿里云轻量应用服务器出新品通用型实例了,全球26个地域可选
近日,阿里云再度发力,推出了首款全新升级的轻量应用服务器——通用型实例。这款服务器实例不仅标配了200Mbps峰值公网带宽,更在计算、存储、网络等基础资源上进行了全面优化,旨在为中小企业和开发者提供更加轻量、易用、普惠的云计算服务,满足其对于通用计算小算力的迫切需求。目前,这款新品已在全球26个地域正式上线,为全球用户提供了更加便捷、高效的上云选择。
108 27

热门文章

最新文章

AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等