EventLoopThreadPool 是 Reactor 模式下,实现“一个主线程 + 多个工作线程”的关键组件,用于高效管理多个 EventLoop 并在多核CPU上分担高并发IO压力。
底层线程封装Thread,Thread 类封装了线程创建、启动、join、detach、命名、tid 获取等功能,是一个更高级、更安全的线程封装类。
成员变量
bool started_;//表示线程是否已经启动(start() 之后为 true)。 bool joined_;//表示是否已经调用了 join() 等待该线程结束 std::shared_ptr<std::thread>thread_;//封装的 C++11 标准库线程对象,使用智能指针管理其生命周期 pid_t tid_;//存储线程的操作系统级线程 ID(Thread ID) ThreadFunc func_;//ThreadFunc 是一个类型别名(见下方),表示线程执行的函数 std::string name_;//当前线程的名字,比如 "IOThread"、"Worker1" 等。 static std::atomic_int numCreated_;//线程数量 using ThreadFunc=std::function<void()>;//定义了一个函数类型别名 ThreadFunc,代表接收 0 个参数、无返回值的可调用对象
成员方法
bool started()const{return started_;} pid_t tid()const{return tid_;} const std::string&name(){return name_;} static int numCreated(){return numCreated_.load();} //静态函数,返回当前创建的线程数量 std::atomic_int Thread::numCreated_{0}; //线程数量 Thread::Thread(ThreadFunc func,const std::string &name) :started_(false) ,joined_(false) ,tid_(0)//线程 ID 默认初始化为 0。 ,func_(std::move(func))//将传入的函数对象移动赋值给成员变量,避免复制开销 ,name_(name)//:将传入的线程名字拷贝给成员变量 name_。 //explicit 关键字用于 构造函数或转换函数前,其作用是:禁止编译器执行隐式类型转换,只能显式调用。 { setDefaultName();//如果 name_ 是空字符串,setDefaultName() 会自动设置默认线程名 } Thread::~Thread()//处理线程资源,防止程序崩溃或内存泄漏。 { if(started_&&!joined_)//如果线程启动了但没被 join(),程序结束时会崩溃 { thread_->detach();//thread类提供的设置分离线程的方法 //将当前线程与 std::thread 对象分离,让线程自己在后台运行完并自动释放资源。 } } void Thread::start()//启动一个新线程并获取该线程的 tid。 { started_=true;//标记当前线程已启动。 sem_t sem;//创建一个局部信号量 sem。 sem_init(&sem,false,0);//第二个参数为 false(或 0),表示它是线程间共享的信号量(不是多进程)。第三个参数 0 是初始值,也就是说一开始信号量为 0,sem_wait 会阻塞等待 //用 shared_ptr 管理 std::thread 对象的生命周期。 thread_=std::shared_ptr<std::thread>(new std::thread([&](){ tid_= CurrentThread::tid();//获取线程tid值 sem_post(&sem);//通知主线程,线程 ID 设置完毕,sem_post() 是对信号量加 1 //主线程在后面 sem_wait() 中等待这个信号。 //意思是:“我新线程 tid 设置好了,主线程你可以继续执行了”。 func_();//正式执行用户传进来的线程任务函数。 })); //这里必须等待上面新创建的线程的tid sem_wait(&sem);//主线程必须等待子线程把 tid_ 设置好,再继续执行。 } void Thread::join()//让 主线程等待子线程执行完成,会 阻塞当前线程(主线程),直到目标线程执行完。 { joined_=true; thread_->join(); } void Thread::setDefaultName()//自动为线程生成形如 "Thread1", "Thread2" 的名字 { int num=++numCreated_;//把增加后的数字赋给 num,作为线程编号 if(name_.empty())//如果当前线程对象的名字为空(说明用户没有指定名字),就进入下面的逻辑 { char buf[32]={0}; snprintf(buf,sizeof buf,"Thread%d",num); name_=buf; } }
EventLoopThread封装
成员变量
EventLoop *loop_;//指向该线程中创建的 EventLoop 对象 bool exiting_;//是否退出循环 Thread thread_;//自己封装的 Thread 类对象。 std::mutex mutex_;//用于线程同步保护 loop_ 等共享资源,避免竞态条件。 std::condition_variable cond_;//条件变量,配合 mutex_ 使用,用来等待某些事件(如 loop_ 初始化完成)再继续执行 using ThreadInitCallback=std::function<void(EventLoop*)>;//声明一个函数类型别名,用于指定线程启动时对 EventLoop 做一些自定义初始化操作。 ThreadInitCallback callback_;//保存用户传入的线程初始化回调函数
成员方法
//:一个回调函数,用于在线程中初始化 EventLoop //线程的名称(用于调试,日志等) EventLoopThread::EventLoopThread(const ThreadInitCallback &cb, const std::string&name): loop_(nullptr), //初始化成员变量 loop_ 为 nullptr exiting_(false), //标志线程是否正在退出,初始化为 false。 thread_(std::bind(&EventLoopThread::threadFunc,this),name), mutex_(), cond_(), callback_(cb)//将用户传入的 ThreadInitCallback 赋值给初始化回调callback_ { } //这里传入EventLoopThread类的threadFunc()函数作为底层新线程启动的线程函数 //优雅地关闭子线程中运行的 EventLoop,释放资源,防止程序崩溃或资源泄漏。 EventLoopThread::~EventLoopThread() { exiting_=true;//设置退出标志 if(loop_!=nullptr)//如果 loop_ 不为空,说明子线程中的 EventLoop 已经初始化并运行 { loop_->quit();//调用 EventLoop::quit() 方法,告诉事件循环线程:“你可以安全退出了”。 thread_.join();//等待子线程执行完毕并安全退出 } } //在主线程中启动一个子线程,并等待子线程中 EventLoop 创建完成后,返回其指针地址,保证线程之间的同步 EventLoop*EventLoopThread::startLoop() { thread_.start();//启动底层的新线程,执行的是EventLoopThread::threadFunc //threadFunc() 中会创建 EventLoop 并赋值给成员变量 loop_,然后通过条件变量通知主线程。 EventLoop *loop=nullptr;//本地临时变量,用于接收 loop_ 成员变量的值并最终返回 { std::unique_lock<std::mutex>lock(mutex_);//上锁,保护 loop_ 的读写(threadFunc 也会写入 loop_,存在竞争风险) while(loop_==nullptr)//子线程还未启动loop { cond_.wait(lock);// // 主线程阻塞,等待子线程设置 loop_ 并通知 } loop=loop_;// 获取创建好的 EventLoop 指针(子线程创建的) } return loop;//子线程中创建的 subloop } //这个方法是在单独得新线程里面运行的(线程函数) void EventLoopThread::threadFunc() { EventLoop loop;//在子线程栈上创建一个独立的Eventloop(subloop),和上面得线程是一一对应了,one loop per thread if(callback_)//如果创建 EventLoopThread 时用户传入了初始化回调 callback_,就调用它 { callback_(&loop); } { std::unique_lock<std::mutex>lock(mutex_); loop_=&loop;//把subloop的地址给mainloop(成员变量) cond_.notify_one();//并通过条件变量 cond_ 通知主线程:子线程的subLoop已经就绪 } loop.loop(); //EventLoop loop=>Poller->poll,subloop 的事件循环。 std::unique_lock<std::mutex>lock(mutex_);//当 loop.loop() 退出(即事件循环结束、调用了 quit())后,清空指针,表示 loop 已经销毁。 loop_=nullptr; }
startloop会发生那些事?
首先会创建一个Eventloopthread对象(主线程),来执行startloop()方法,从而去创建一个新线程,然后执行线程函数,创建一个subloop,这时就有了一个subthread+subloop,然后把subloop的地址给主线程的一个loops数组,这时subloop就开始在自己的线程里干活了(loop.loop),多个Eventloopthread对象就会形成线程池,管理多个subloop+thread
最终效果:
- 每个
EventLoopThread对象:
- 包含一个子线程
- 子线程中运行一个
EventLoop(即 subloop)
- 主线程通过
startLoop()获取每个 subloop 的指针,加入loops_数组 - 这些
subloop + thread组成了线程池,由主线程(base loop)来协调连接分发。
线程池封装
//线程池中只有 subloop(子事件循环),不包括 baseloop(主事件循环)
成员变量
using ThreadInitCallback=std::function<void(EventLoop*)>;//线程池中每个 EventLoopThread 在新线程中创建 EventLoop 时,可以调用一个初始化回调函数 EventLoop *baseLoop_; //主线程中的 EventLoop(主反应堆),也叫 baseloop std::string name_;//线程池的名字,用于标识线程池或日志记录。 bool started_;//标志位,表示线程池是否已经启动(是否已经调用 start())。 int numThreads_;//线程池中子线程(subloop)的数量。 int next_;//这是一个轮询索引变量,用于实现轮询负载均衡分发机制(Round-Robin): std::vector<std::unique_ptr<EventLoopThread>>threads_;//这是线程池中的 EventLoopThread 对象数组 std::vector<EventLoop*>loops_;//这是保存线程池中所有 subloop 的指针的数组:
成员方法
bool started()const{return started_;}//返回当前线程池是否已经启动。 const std::string name()const{return name_;}//获取线程池的名字(通常用于日志、调试)。 void setThreadNum(int numThread){numThreads_=numThread;}//设置线程池中 subLoop 的线程数量 //构造函数 EventLoopThreadPool::EventLoopThreadPool(EventLoop *baseLoop,const std::string &nameArg) :baseLoop_(baseLoop),//初始化主线程的 EventLoop,通常这个 loop 会监听新连接并分发给 subloop。即 EventLoopThreadPool 不负责创建 baseLoop,它是由调用者传入的 name_(nameArg),//设置线程池名称 started_(false),//标记线程池是否启动 numThreads_(0),//表示线程池中包含的线程数量(不包括 baseLoop),默认为 0。 next_(0)//这是用于 轮询分发连接的下标,当有新连接到达时,使用 getNextLoop() 方法从 loops_ 中轮转选出一个 subloop。 { } //为什么析构函数中什么也没写? //因为 EventLoopThreadPool 中的线程对象(EventLoopThread)都是通过 std::unique_ptr 管理的 //subloop 是线程局部的栈对象,线程结束自动销毁 EventLoopThreadPool::~EventLoopThreadPool() { //不需要关注 loop_是栈上 } //启动线程池中的所有 EventLoopThread对象来创建 子线程+subloop,并获取每个子线程中的 (subloop)对象 void EventLoopThreadPool::start(const ThreadInitCallback&cb) { started_=true;//标记线程池已启动,防止重复调用。 for(int i=0;i<numThreads_;++i) { char buf[name_.size()+32]; snprintf(buf,sizeof buf,"%s%d",name_.c_str(),i);//为每个线程起一个名字,例如 "IOThread0"、"IOThread1"……方便日志和调试。 EventLoopThread *t=new EventLoopThread(cb,buf);//创建一个新的 EventLoopThread 对象。每个对象会负责创建新线程+ ( subloop) threads_.push_back(std::unique_ptr<EventLoopThread>(t));//将 EventLoopThread* 包装进 unique_ptr,并加入线程池容器中管理。自动管理生命周期,不需要手动释放 loops_.push_back(t->startLoop());//保存所有创建的subloop } //整个服务端只有一个线程,运行着baseLoop ,:单线程模式 if(numThreads_==0&&cb) { cb(baseLoop_); } } //按顺序轮询返回下一个 subloop,用于负载均衡地分发新连接或任务 EventLoop*EventLoopThreadPool::getNextLoop()//这个函数用于在线程池中按顺序选择下一个 EventLoop 来处理新的任务或连接(典型用法:多线程服务器中新连接分发到不同线程)。 { EventLoop*loop=baseLoop_;//默认使用主线程中的 baseLoop if(!loops_.empty())//如果线程池中存在多个子线程(即有多个 subloop) { loop =loops_[next_];//从 loops_ 数组中取出第 next_ 个 subloop,这是这一次返回的对象。 ++next_;//自增索引,实现「轮询」。 if(next_>=loops_.size())//如果到了最后一个 loop,就从头开始循环。 { next_=0; } } return loop;//返回选择的subloop } //这个函数经常在 新连接到来时 被调用: //// 在 Acceptor 接收新连接后 EventLoop* subLoop = threadPool->getNextLoop(); subLoop->runInLoop(std::bind(...)); // 把连接交给对应 subloop 来管理 std::vector<EventLoop*>EventLoopThreadPool::getAllLoops()//是用于获取 线程池中所有的subloop { if(loops_.empty())//线程池为空 { return std::vector<EventLoop*>(1,baseLoop_); //返回一个只包含一个元素 baseLoop_ 的 vector。 } else { return loops_;//返回subloop数组 } }
程池就是先有一个主线程,运行着baseloop,然后for循环,使用智能指针创建numthread个Eventloopthread对象,每个Eventloopthread对象执行startloop()方法,从而去创建一个新线程,然后执行线程函数,创建一个subloop,这时就有了一个subthread+subloop,可以说每个Eventloopthread对象管理一个subthread+subloop,然后把所有的subloop收集起来放到loops数组里,主线程baseloop需要时,通过负载均衡算法在loops数组里选择一个subloop派发任务
如何派发?就有了subloop->runinloop()方法