目标:
我们要设计搭建主从Reactor服务器的组件,以便使用者利用这些组件更方便地搭建主从Reactor服务器
那么什么是主从Reactor服务器呢?
其实就是以主线程负责接收新连接,从属线程负责管理连接、接受数据、处理业务这种方式运行的服务器
图示:
本文讲解思路:
先列出服务器各模块的功能,再按照服务器实际运行场景,按照执行流梳理各模块发挥的作用。
注:为节省篇幅,我将源码链接放于下方,方便读者在梳理模块时对照阅读。
各模块的功能以及代码:
1.服务器相关模块:服务器模块的功能是对所有的连接以及线程进⾏管理
代码链接:主从Reactor模型/server.hpp · tt_02922/Linux_new - 码云 - 开源中国 (gitee.com)
- Buffer模块:用于接收socket缓冲区的数据,以便上层读取并处理
- Socket模块:封装了套接字编程的常用接口,方便后续复用
- Channel模块:设置事件对应的回调函数,事件就绪时调用对应的回调
- Poller模块:是对epoll系统调用接口的封装,用于对文件描述符添加/修改监控事件
- Connection模块:对Buffer模块,Socket模块,Channel模块的整体封装,实现了对⼀个通信套接字的整体的管理
- Acceptor模块:对Socket模块,Channel模块的整体封装,实现了对⼀个监听套接字的整体的管理
- TimerQueue模块:解决超时连接不释放、占用资源的问题,时间一到自动释放并执行任务
- EventLoop模块:对Poller模块,TimerQueue模块, Socket模块的⼀个整体封装,进⾏所有描述符的事件监控
- TcpServer模块:Tcp服务器模块,内部封装了Acceptor模块,EventLoopThreadPool模块
2.协议相关模块:协议模块是对当前的Reactor模型服务器提供应⽤层协议⽀持(我采用的时Http协议)
代码链接:主从Reactor模型/http.hpp · tt_02922/Linux_new - 码云 - 开源中国 (gitee.com)
- Util模块:协议模块所⽤到的⼀些⼯具函数
- HttpRequest模块:⽤于保存请求数据被解析后的各项请求元素信息
- HttpResponse模块:业务处理后设置并保存响应数据的的各项元素信息以待发送给 客户端
- HttpContext模块:用于保证请求接受的数据的完整性
- HttpServer模块:⽤于以简单的接⼝实现Http协议服务器的搭建
运行流程梳理:
实际的运行场景,当一个新连接到达服务器的时候,服务器双方要建立通信,随后服务器要对发来的数据做对应的处理,并将处理结果返回给服务器
1.Start from Main:
int main() { HttpServer server(8085); server.SetThreadCount(3); server.SetBaseDir(WWWROOT);//设置静态资源根目录,告诉服务器有静态资源请求到来,需要到哪里去找资源文件 server.Get("/hello", Hello); server.Post("/login", Login); server.Put("/1234.txt", PutFile); server.Delete("/1234.txt", DelFile); server.Listen(); return 0; }
(1)HttpServer server(8085);
DO:启动非活跃连接释放、设置连接就绪回调、设置业务处理回调
调用其构造函数
_server的类型是TcpServer,接下来分别调用_server的成员函数
作用分别是:启动非活跃连接释放、设置连接就绪回调、设置业务处理回调
HttpServer(int port, int timeout = DEFALT_TIMEOUT) : _server(port) { _server.EnableInactiveRelease(timeout); _server.SetConnectedCallback(std::bind(&HttpServer::OnConnected, this, std::placeholders::_1)); _server.SetMessageCallback(std::bind(&HttpServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2)); }
函数实现
void EnableInactiveRelease(int timeout) { _timeout = timeout; _enable_inactive_release = true; }
void SetConnectedCallback(const ConnectedCallback&cb) { _connected_callback = cb; }
void SetMessageCallback(const MessageCallback&cb) { _message_callback = cb; }
ConnectedCallBack与MessageCallBack的类型
PtrConnection的类型
(2)关于bind与function
bind:函数模板,传入函数以及对应的参数,返回已经绑定好对应参数或预留好对应参数的函数对象
eg:绑定好对应参数
#include<functional> #include<iostream> int add(int a,int b) { return a+b; } int main() { auto cb=std::bind(add,1,2);//绑定add的参数为1,2并返回函数对象 std::cout<<cb()<<std::endl; }
eg:预留好对应参数
#include<functional> #include<iostream> int add(int a,int b) { return a+b; } int main() { auto cb=std::bind(add,std::placeholders::_1,std::placeholders::_2); //为add预留了2个参数,并返回函数对象,调用函数对象时需要显示的传参 std::cout<<cb(5,6)<<std::endl; }
function:模板类,用于接收可以调用的目标,例如函数、函数对象、lambda等
eg:
#include<functional> int test(int a){} std::function<int(int)> func=test;//用于接收返回值为int类型,参数为一个int的可调用对象
other:
using test1=std::function<int(int)>;//设置类型别名,即test1等价于function<int(int)>类型
OnConnected 连接就绪时要调用的回调
// 设置上下文 void OnConnected(const PtrConnection &conn) { conn->SetContext(HttpContext()); DBG_LOG("NEW CONNECTION %p", conn.get()); }
PtrConnection即智能指针管理的Connection对象
为什么要用智能指针管理Connection对象,后面会提
void SetContext(const Any &context) { _context = context; }
OnMessage
// 缓冲区数据解析+处理 void OnMessage(const PtrConnection &conn, Buffer *buffer) { while (buffer->ReadAbleSize() > 0) { // 1. 获取上下文 HttpContext *context = conn->GetContext()->get<HttpContext>(); // 2. 通过上下文对缓冲区数据进行解析,得到HttpRequest对象 // 1. 如果缓冲区的数据解析出错,就直接回复出错响应 // 2. 如果解析正常,且请求已经获取完毕,才开始去进行处理 context->RecvHttpRequest(buffer);//处理获取到的数据 HttpRequest &req = context->Request();//获取处理完毕的请求报文 HttpResponse rsp(context->RespStatu());//设置响应报文的状态码 if (context->RespStatu() >= 400)//接收数据时发生了错误 { // 进行错误响应,关闭连接 ErrorHandler(req, &rsp); // 填充一个错误显示页面数据到rsp中 WriteReponse(conn, req, rsp); // 组织响应发送给客户端 context->ReSet(); buffer->MoveReadOffset(buffer->ReadAbleSize()); // 出错了就把缓冲区数据清空 conn->Shutdown(); // 关闭连接 return; } if (context->RecvStatu() != RECV_HTTP_OVER) { // 当前请求还没有接收完整,则退出,等新数据到来再重新继续处理 如何重新进行处理?完全重来 return; } // 3. 请求路由 + 业务处理 Route(req, &rsp); // 4. 对HttpResponse进行组织发送 WriteReponse(conn, req, rsp); // 5. 重置上下文 context->ReSet(); // 6. 根据长短连接判断是否关闭连接或者继续处理 if (rsp.Close() == true) conn->Shutdown(); // 短链接则直接关闭 } return; }
(3)server.SetThreadCount(3);
DO:设置线程数量
调用接口:
_server内部调用的是_pool的接口,_pool的类型是eventloop
最终设置线程数量的接口
class LoopThreadPool {... int _thread_count;//要创建多少个线程,即要创建多少个eventloop ... void SetThreadCount(int count) { _thread_count = count; } ... };
(4)server.SetBaseDir(WWWROOT)
调用接口:
//stat 是一个函数调用 //函数原型 //int stat(const char *pathname, struct stat *statbuf) static bool IsDirectory(const std::string &filename) { struct stat st; int ret = stat(filename.c_str(), &st); if (ret < 0) { return false; } return S_ISDIR(st.st_mode); } void SetBaseDir(const std::string &path) { assert(Util::IsDirectory(path) == true);//判断路径是否为文件夹 _basedir = path; }
(5)设置各种请求的处理方法
//设置各种请求的处理方法 server.Get("/hello", Hello); server.Post("/login", Login); server.Put("/1234.txt", PutFile); server.Delete("/1234.txt", DelFile);
(1)以GET为例
//using Handler = std::function<void(const HttpRequest &, HttpResponse *)>; //using Handlers = std::vector<std::pair<std::regex, Handler>>; //Handlers _get_route; void Get(const std::string &pattern, const Handler &handler) { _get_route.push_back(std::make_pair(std::regex(pattern), handler)); }
(2)关于正则表达式与regex
正则表达式(regular expression)描述了⼀种字符串匹配的模式(pattern),可以⽤来检查⼀个串是否 含有某种⼦串、将匹配的⼦串替换或者从某个串中取出符合某个条件的⼦串等。
(6)server.Listen();
DO:启动服务器
void Start() { _pool.Create(); _baseloop.Start(); }
(1)Create
Create()
创建对应数量的线程,并在各线程内创建EventLoop,并启动线程内的EventLoop
这保证了One Thread One Loop,即一个线程一个事件处理循环 ,EventLoop生命周期随线程
//_threads类型:vector<LoopThread*> //_loops类型:vector<EventLoop*> void Create() { if (_thread_count > 0) { _threads.resize(_thread_count); _loops.resize(_thread_count); for (int i = 0; i < _thread_count; i++) { //one thread one loop _threads[i] = new LoopThread(); _loops[i] = _threads[i]->GetLoop(); } } return ; }
LoopThread()
//EventLoop *_loop; //std::thread _thread; LoopThread():_loop(NULL), _thread(std::thread(&LoopThread::ThreadEntry, this)) {}
ThreadEntry()
void ThreadEntry() { EventLoop loop; { std::unique_lock<std::mutex> lock(_mutex);//加锁 _loop = &loop; _cond.notify_all(); } loop.Start(); }
GetLoop()
EventLoop *GetLoop() { EventLoop *loop = NULL; { std::unique_lock<std::mutex> lock(_mutex);//加锁 _cond.wait(lock, [&](){ return _loop != NULL; });//loop为NULL就一直阻塞 loop = _loop; } return loop; }
图示
(2)Start
Start()
获取新连接
事件处理
执行任务
注意:_baseloop与业务loop的不同
普通的loop现在正在阻塞在epoll_wait,因为此时还没新连接分配给loop,所以应该先看_baseloop.Start();
//三步走--事件监控->就绪事件处理->执行任务 void Start() { while(1) { //1. 事件监控 std::vector<Channel *> actives; _poller.Poll(&actives);//获取活跃连接 //2. 事件处理 for (auto &channel : actives) { channel->HandleEvent(); } //3. 执行任务 RunAllTask(); } }
//int _epfd; //struct epoll_event _evs[MAX_EPOLLEVENTS]; //std::unordered_map<int, Channel *> _channels;fd:channel void Poll(std::vector<Channel*> *active) { // int epoll_wait(int epfd, struct epoll_event *evs, int maxevents, int timeout) int nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1); if (nfds < 0) { if (errno == EINTR) { return ; } ERR_LOG("EPOLL WAIT ERROR:%s\n", strerror(errno)); abort();//退出程序 } for (int i = 0; i < nfds; i++) { auto it = _channels.find(_evs[i].data.fd); assert(it != _channels.end()); it->second->SetREvents(_evs[i].events);//设置实际就绪的事件 active->push_back(it->second); } return; }
channel->HandleEvent();
void HandleEvent() { if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)) { /*不管任何事件,都调用的回调函数*/ if (_read_callback) _read_callback(); } /*有可能会释放连接的操作事件,一次只处理一个*/ if (_revents & EPOLLOUT) { if (_write_callback) _write_callback(); }else if (_revents & EPOLLERR) { if (_error_callback) _error_callback();//一旦出错,就会释放连接,因此要放到前边调用任意回调 }else if (_revents & EPOLLHUP) { if (_close_callback) _close_callback(); } if (_event_callback) _event_callback(); }
(7)服务器的正式启动——_baseloop.Start();
(1)_baseloop在初始化时被做了哪些处理?
创建监听套接字
创立baseloop与监听套接字的监听关系
设置监听套接字就绪时要调用的回调
TcpServer(int port): ... _acceptor(&_baseloop, port) _acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1)); _acceptor.Listen();
_acceptor(&_baseloop, port)
传给_accept_callback
_acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));
_acceptor.Listen();
现在我们已知的是什么?
baseloop会监控监听套接字,当新连接到来时,也就是读事件就绪了,它会调用NewConnection这一回调,为新连接创建一个Connection对象,在这其中,分配一个EventLoop,以监控该连接,为新连接设置各种回调函数,以待事件就绪时调用
//上述步骤其实发生在 HttpServer server(8085);调用了_server的构造函数
(2)_baseloop.Start();
baseeloop上的监听套接字此时正在等待读事件就绪,也就是在等待新连接的到来
以上便是服务器的初始准备工作
(3)迎来连接时服务器如何工作
场景:epoll_wait队列上监听套接字读事件就绪了
std::unordered_map<int, Channel *> _channels;
epoll_wait不再阻塞,异常则报错退出,非异常则查找监听套接字对应的channel,并设置监听套接字实际的就绪事件,再将channel返回,baseloop得到channel后,就可以去执行监听套接字对应的读事件就绪处理回调,这个回调 即NewConnection
One fd one connection one eventloop(one poller fds) one channel one httpcontext
现在,随机的一个eventloop也有了baseloop为其分配的新连接了,接下来就该讨论,
(4)eventloop上的连接就绪后干什么
连接的就绪事件调用的回调是由用户指定的,以读事件就绪为例,当eventloop上有连接就绪时,且是读事件就绪,我们调用的就是HandleRead回调函数
(8)OnMessage是如何处理数据的?
要先知悉:http格式的请求报文与响应报文是什么样子的
处理流程:
(1)请求行处理
(2)请求头部处理
(3)请求正文处理
请求报文的数据解析工作可能成功(请求报文完整发送且数据正确,解析工作正确完成),也可能不成功(请求报文未发送完整或数据有错,解析工作无法完成),即解析工作能否完成与数据完整性与正确性有关
(4)请求报文完整且正确解析
进行如下工作
何为请求路由+业务处理
即通过对请求行的资源路径进行解析,判断其为静态资源请求(访问服务器上的某种资源,例如图片、音频)还是功能性请求(函数处理),根据不同的请求组织不同的响应
静态资源请求
判断是否为静态资源静态资源即在根目录下寻找相关的文件
1. 必须设置了静态资源根目录
2. 请求方法,必须是GET / HEAD请求方法
3. 请求的资源路径必须是一个合法路径
4. 请求的资源必须存在,且是一个普通文件
5.特殊处理,资源路径以/结尾,要在结尾加index.html
将根目录下对应资源写入响应报文的正文中
功能性请求
利用key:val模型,正则表达,处理函数存储起来。查找处理函数时利用正则表达式进行查询
不同的正则表达式对应着不同的处理函数,而处理函数是由用户来设置的 ,每个请求方法都有一张路由表,上面记录着正则表达式与处理函数的映射关系。
以GET方法为例,
调用对应的处理函数后,再将函数处理结果接入响应报文的正文之中,就可以按照响应报文的格式组织发送了,发送完毕后,重置该连接的上下文,并根据其长短连接属性决定是否断开连接
注意:对于connection的所有操作,都要放到对应的eventloop里执行,以保证线程安全,后面会提为什么
(5)请求报文不完整或解析数据不正确
如果是数据不完整,那么只可能是请求行不完整、请求头部不完整、请求正文不完整,任一部分的缺少都无法进入到请求路由+业务处理阶段,因此程序只能返回,等待新数据到来,再在原有进度继续执行。
如果是数据不正确,那么就会给响应报文一个错误的状态码,并组织一个错误的页面返回
(9)断开连接
我们的服务器默认使用非活跃连接销毁的断开方式
为何非活跃连接销毁呢?如果该连接在指定时间内没有进行通信,就认为他是不活跃的,那么服务器就应该主动断开该连接,以避免资源浪费。例如,若连接A在10s内没有发送通信,那么在10s后服务器就主动断开与A的连接。反之如果他在10s内有通信,那么对该连接的断开操作就放在20s后,依次类推
(1)非活跃连接销毁的原理
时间轮+智能指针
所谓时间轮,其实就是一个二维数组,每访问一个下标元素,就执行该元素上所有任务
图示
a.如何实现,每访问一个时间轮的下标,就执行该下标上的任务呢?
利用类的析构函数,类对象销毁时会自动调用类的析构函数,所以我们只要将任务封装成类,当时间轮访问对应下标时,就销毁该下标上的所有任务对象,就可以自动执行任务了。
图示
b.那么这个时间轮要如何动起来呢?
什么叫做让时间轮动起来?就是让时间轮依次释放每一下标对应对象的操作
使用timerfd,Linux提供的定时器,其功能是在指定时间后向timerfd内写入8字节数据,以告诉我们超时了多少次。我们将timerfd投入到epoll队列中,并设置好时间就绪的回调函数,每当定时器超时(也就是事件就绪),就执行我们让时间轮动起来的回调函数
例如,我们设置了一个首次超时时间为1s,之后超时时间为1s的定时器,意思就是每一秒向timerfd内写入一个8字节数据,如果我们2s读取timerfd的数据,就会读到一个2,表示超时了2s,如果4s后读取timerfd数据,就会读取到4,表示超时了4次
图示
c.在时间轮的基础之上,再加上智能指针,就可以达到非活跃连接销毁的目的。
现在我将叙述具体情景,时间轮上存储的是智能指针,其管理是Task对象的生命周期。假设一个Task对象它要执行的任务是断开连接,即它的析构函数存储的是对连接的断开操作。此时该连接有新的通信,(通过调用回调的方式,任意事件回调)那么我们就针对该连接的智能指针再创建一个智能指针,使得智能指针的计数器+1,并将对断开连接的操作放到时间轮的新位置,这样,当 时间轮走到旧的位置时,虽然仍然会释放连接,智能指针的计数-1,但由于计数未到0,不执行类的析构函数,达到刷新任务的目的,也就是我们说的非活跃连接销毁。
图示
具体实现
注意connection的智能指针 保存管理所有连接对应的shared_ptr对象 其实也是智能指针计数器为0 调用connection的析构 真正的对connection的释放 其实是在timerwheel完成的
2.为什么要有inloop系列函数
由于连接(也就是文件描述符)在线程间是共享的,也就是说任意一个线程都可以对eventloop所持有的连接做出操作,这显然是线程不安全的,例如eventloop上某一连接正在数据通信,而外面的某个线程却要关闭连接。
如何解决上述问题?
因为一个线程对应一个eventloop 所以只要将对连接的各种操作放在 eventloop内执行,就不会有线程安全问题,因为这保证了,任意时刻内,只要一个线程拥有该连接。
如何将对连接的各种操作放在 eventloop内执行?
在eventloop内设置一个任务池,将在线程外对连接执行的操作放入任务池里。
值得注意的是:
1.如何知道它是线程外的
eventloop本身就是和线程绑定的,其内部保存了它的线程id;相同,在外面对连接执行操作的线程也有自己的线程id,只有对二者相比较,就能知道它们是否是同一线程了
2.具体如何做到的,inloop系列函数登场
对连接的所有操作,都是由inloop系列函数完成的,例如发送数据sendinloop、释放连接releaseinloop,它们都以回调函数的形式,作为参数传递给eventloop的RunInLoop函数,并做第一步的工作,线程id相同,则执行任务,线程id不同,则压入任务队列,待切换到eventloop线程时再做处理
图示,以send为例
3.如果外面设置了业务处理线程池,就意味着对任务池的业务处理是在其他线程进行的,这时就要对任务池加锁了
3.eventloop中的eventfd
一种事件通知机制
当任务队列有了任务,然而epoll_wait却还在阻塞(epoll_wait上没有描述符就绪),这会导致任务池里的任务迟迟无法得到执行
如何解决呢?
每当向任务队列里放入了新任务,就往eventfd里写数据,这时eventfd读事件就绪了,epoll_wait也就不阻塞了(返回),然后就可以执行任务池的任务了
图示