基于Reactor模式的高性能网络库之线程池组件设计篇

简介: EventLoopThreadPool 是 Reactor 模式中实现“一个主线程 + 多个工作线程”的关键组件,用于高效管理多个 EventLoop 并在多核 CPU 上分担高并发 I/O 压力。通过封装 Thread 类和 EventLoopThread,实现线程创建、管理和事件循环的调度,形成线程池结构。每个 EventLoopThread 管理一个子线程与对应的 EventLoop(subloop),主线程(base loop)通过负载均衡算法将任务派发至各 subloop,从而提升系统性能与并发处理能力。

EventLoopThreadPool 是 Reactor 模式下,实现“一个主线程 + 多个工作线程”的关键组件,用于高效管理多个 EventLoop 并在多核CPU上分担高并发IO压力。


底层线程封装Thread,Thread 类封装了线程创建、启动、join、detach、命名、tid 获取等功能,是一个更高级、更安全的线程封装类。

成员变量


    bool  started_;//表示线程是否已经启动(start() 之后为 true)。
    bool  joined_;//表示是否已经调用了 join() 等待该线程结束
    std::shared_ptr<std::thread>thread_;//封装的 C++11 标准库线程对象,使用智能指针管理其生命周期
    pid_t  tid_;//存储线程的操作系统级线程 ID(Thread ID)
    ThreadFunc  func_;//ThreadFunc 是一个类型别名(见下方),表示线程执行的函数
    std::string  name_;//当前线程的名字,比如 "IOThread"、"Worker1" 等。
    static   std::atomic_int numCreated_;//线程数量  
    using  ThreadFunc=std::function<void()>;//定义了一个函数类型别名 ThreadFunc,代表接收 0 个参数、无返回值的可调用对象

成员方法

bool   started()const{return   started_;}

    pid_t   tid()const{return  tid_;}

    const std::string&name(){return  name_;}

    static   int  numCreated(){return  numCreated_.load();}
    //静态函数,返回当前创建的线程数量



    std::atomic_int Thread::numCreated_{0};
 //线程数量


Thread::Thread(ThreadFunc  func,const  std::string &name)
:started_(false)
,joined_(false)
,tid_(0)//线程 ID 默认初始化为 0。
,func_(std::move(func))//将传入的函数对象移动赋值给成员变量,避免复制开销
,name_(name)//:将传入的线程名字拷贝给成员变量 name_。
    //explicit 关键字用于 构造函数或转换函数前,其作用是:禁止编译器执行隐式类型转换,只能显式调用。
    {
        setDefaultName();//如果 name_ 是空字符串,setDefaultName() 会自动设置默认线程名
    }



 Thread::~Thread()//处理线程资源,防止程序崩溃或内存泄漏。


    {
        if(started_&&!joined_)//如果线程启动了但没被 join(),程序结束时会崩溃
        {
            thread_->detach();//thread类提供的设置分离线程的方法
          //将当前线程与 std::thread 对象分离,让线程自己在后台运行完并自动释放资源。
        }
    }




    void Thread::start()//启动一个新线程并获取该线程的 tid。
    {
        started_=true;//标记当前线程已启动。
        sem_t sem;//创建一个局部信号量 sem。
        sem_init(&sem,false,0);//第二个参数为 false(或 0),表示它是线程间共享的信号量(不是多进程)。第三个参数 0 是初始值,也就是说一开始信号量为 0,sem_wait 会阻塞等待
       //用 shared_ptr 管理 std::thread 对象的生命周期。
        thread_=std::shared_ptr<std::thread>(new std::thread([&](){
            tid_= CurrentThread::tid();//获取线程tid值
            sem_post(&sem);//通知主线程,线程 ID 设置完毕,sem_post() 是对信号量加 1
            //主线程在后面 sem_wait() 中等待这个信号。
            //意思是:“我新线程 tid 设置好了,主线程你可以继续执行了”。
            func_();//正式执行用户传进来的线程任务函数。
        }));
        //这里必须等待上面新创建的线程的tid
        sem_wait(&sem);//主线程必须等待子线程把 tid_ 设置好,再继续执行。
    }



      void   Thread::join()//让 主线程等待子线程执行完成,会 阻塞当前线程(主线程),直到目标线程执行完。
    {
        joined_=true;
        thread_->join();
    }



   void   Thread::setDefaultName()//自动为线程生成形如 "Thread1", "Thread2" 的名字
   {
      int num=++numCreated_;//把增加后的数字赋给 num,作为线程编号
      if(name_.empty())//如果当前线程对象的名字为空(说明用户没有指定名字),就进入下面的逻辑
      {
        char buf[32]={0};
        snprintf(buf,sizeof  buf,"Thread%d",num);
        name_=buf;
      }
   }

EventLoopThread封装

成员变量

 EventLoop  *loop_;//指向该线程中创建的 EventLoop 对象
    bool   exiting_;//是否退出循环
    Thread   thread_;//自己封装的 Thread 类对象。
    std::mutex  mutex_;//用于线程同步保护 loop_ 等共享资源,避免竞态条件。
    std::condition_variable  cond_;//条件变量,配合 mutex_ 使用,用来等待某些事件(如 loop_ 初始化完成)再继续执行
    using  ThreadInitCallback=std::function<void(EventLoop*)>;//声明一个函数类型别名,用于指定线程启动时对 EventLoop 做一些自定义初始化操作。
    ThreadInitCallback  callback_;//保存用户传入的线程初始化回调函数

成员方法

//:一个回调函数,用于在线程中初始化 EventLoop
//线程的名称(用于调试,日志等)
EventLoopThread::EventLoopThread(const ThreadInitCallback  &cb,
    const   std::string&name):
    loop_(nullptr),         //初始化成员变量 loop_ 为 nullptr
    exiting_(false),        //标志线程是否正在退出,初始化为 false。
    thread_(std::bind(&EventLoopThread::threadFunc,this),name),
    mutex_(),
    cond_(),
    callback_(cb)//将用户传入的 ThreadInitCallback 赋值给初始化回调callback_
    {

    }
//这里传入EventLoopThread类的threadFunc()函数作为底层新线程启动的线程函数



    //优雅地关闭子线程中运行的 EventLoop,释放资源,防止程序崩溃或资源泄漏。
    EventLoopThread::~EventLoopThread()
    {
            exiting_=true;//设置退出标志
            if(loop_!=nullptr)//如果 loop_ 不为空,说明子线程中的 EventLoop 已经初始化并运行
            {
                loop_->quit();//调用 EventLoop::quit() 方法,告诉事件循环线程:“你可以安全退出了”。
                thread_.join();//等待子线程执行完毕并安全退出
            } 
    }



//在主线程中启动一个子线程,并等待子线程中 EventLoop 创建完成后,返回其指针地址,保证线程之间的同步
EventLoop*EventLoopThread::startLoop()
    {
        thread_.start();//启动底层的新线程,执行的是EventLoopThread::threadFunc
//threadFunc() 中会创建 EventLoop 并赋值给成员变量 loop_,然后通过条件变量通知主线程。



        EventLoop *loop=nullptr;//本地临时变量,用于接收 loop_ 成员变量的值并最终返回
        {
            std::unique_lock<std::mutex>lock(mutex_);//上锁,保护 loop_ 的读写(threadFunc 也会写入 loop_,存在竞争风险)
            while(loop_==nullptr)//子线程还未启动loop
            {
                cond_.wait(lock);// // 主线程阻塞,等待子线程设置 loop_ 并通知
            }
            loop=loop_;// 获取创建好的 EventLoop 指针(子线程创建的)
        }
        return loop;//子线程中创建的 subloop
    }



      //这个方法是在单独得新线程里面运行的(线程函数)
    void   EventLoopThread::threadFunc()
    {
        EventLoop loop;//在子线程栈上创建一个独立的Eventloop(subloop),和上面得线程是一一对应了,one loop  per   thread
        if(callback_)//如果创建 EventLoopThread 时用户传入了初始化回调 callback_,就调用它
        {
            callback_(&loop);
        }
        {
            std::unique_lock<std::mutex>lock(mutex_);
            loop_=&loop;//把subloop的地址给mainloop(成员变量)
            cond_.notify_one();//并通过条件变量 cond_ 通知主线程:子线程的subLoop已经就绪
        }
        loop.loop();     //EventLoop  loop=>Poller->poll,subloop 的事件循环。
        std::unique_lock<std::mutex>lock(mutex_);//当 loop.loop() 退出(即事件循环结束、调用了 quit())后,清空指针,表示 loop 已经销毁。
            loop_=nullptr;

    }

startloop会发生那些事?

首先会创建一个Eventloopthread对象(主线程),来执行startloop()方法,从而去创建一个新线程,然后执行线程函数,创建一个subloop,这时就有了一个subthread+subloop,然后把subloop的地址给主线程的一个loops数组,这时subloop就开始在自己的线程里干活了(loop.loop),多个Eventloopthread对象就会形成线程池,管理多个subloop+thread

最终效果:

  • 每个 EventLoopThread 对象:
  • 包含一个子线程
  • 子线程中运行一个 EventLoop(即 subloop)
  • 主线程通过 startLoop() 获取每个 subloop 的指针,加入 loops_ 数组
  • 这些 subloop + thread 组成了线程池,由主线程(base loop)来协调连接分发。


线程池封装

//线程池中只有 subloop(子事件循环),不包括 baseloop(主事件循环)

成员变量

  using   ThreadInitCallback=std::function<void(EventLoop*)>;//线程池中每个 EventLoopThread 在新线程中创建 EventLoop 时,可以调用一个初始化回调函数
 EventLoop *baseLoop_;  //主线程中的 EventLoop(主反应堆),也叫 baseloop
    std::string name_;//线程池的名字,用于标识线程池或日志记录。
    bool  started_;//标志位,表示线程池是否已经启动(是否已经调用 start())。
    int  numThreads_;//线程池中子线程(subloop)的数量。
    int  next_;//这是一个轮询索引变量,用于实现轮询负载均衡分发机制(Round-Robin):
    std::vector<std::unique_ptr<EventLoopThread>>threads_;//这是线程池中的 EventLoopThread 对象数组
    std::vector<EventLoop*>loops_;//这是保存线程池中所有 subloop 的指针的数组:
  

成员方法


     bool started()const{return started_;}//返回当前线程池是否已经启动。

     const  std::string name()const{return  name_;}//获取线程池的名字(通常用于日志、调试)。

    
     void  setThreadNum(int numThread){numThreads_=numThread;}//设置线程池中 subLoop 的线程数量

//构造函数
     EventLoopThreadPool::EventLoopThreadPool(EventLoop *baseLoop,const std::string &nameArg)
    :baseLoop_(baseLoop),//初始化主线程的 EventLoop,通常这个 loop 会监听新连接并分发给 subloop。即 EventLoopThreadPool 不负责创建 baseLoop,它是由调用者传入的
    name_(nameArg),//设置线程池名称
    started_(false),//标记线程池是否启动
    numThreads_(0),//表示线程池中包含的线程数量(不包括 baseLoop),默认为 0。
    next_(0)//这是用于 轮询分发连接的下标,当有新连接到达时,使用 getNextLoop() 方法从 loops_ 中轮转选出一个 subloop。
    {
    }



//为什么析构函数中什么也没写?
//因为 EventLoopThreadPool 中的线程对象(EventLoopThread)都是通过 std::unique_ptr 管理的
//subloop 是线程局部的栈对象,线程结束自动销毁
    EventLoopThreadPool::~EventLoopThreadPool()
    {
       //不需要关注 loop_是栈上
    }


//启动线程池中的所有 EventLoopThread对象来创建 子线程+subloop,并获取每个子线程中的 (subloop)对象
     void  EventLoopThreadPool::start(const  ThreadInitCallback&cb)
     {
        started_=true;//标记线程池已启动,防止重复调用。
        for(int i=0;i<numThreads_;++i)
        {
             char buf[name_.size()+32];
        snprintf(buf,sizeof buf,"%s%d",name_.c_str(),i);//为每个线程起一个名字,例如 "IOThread0"、"IOThread1"……方便日志和调试。
        EventLoopThread *t=new EventLoopThread(cb,buf);//创建一个新的 EventLoopThread 对象。每个对象会负责创建新线程+ ( subloop)
        threads_.push_back(std::unique_ptr<EventLoopThread>(t));//将 EventLoopThread* 包装进 unique_ptr,并加入线程池容器中管理。自动管理生命周期,不需要手动释放
        loops_.push_back(t->startLoop());//保存所有创建的subloop
        }
        //整个服务端只有一个线程,运行着baseLoop ,:单线程模式

        if(numThreads_==0&&cb)
        {
            cb(baseLoop_);
        }
     }


   //按顺序轮询返回下一个 subloop,用于负载均衡地分发新连接或任务
     EventLoop*EventLoopThreadPool::getNextLoop()//这个函数用于在线程池中按顺序选择下一个 EventLoop 来处理新的任务或连接(典型用法:多线程服务器中新连接分发到不同线程)。
     {
        EventLoop*loop=baseLoop_;//默认使用主线程中的 baseLoop

        if(!loops_.empty())//如果线程池中存在多个子线程(即有多个 subloop)
        {
            loop =loops_[next_];//从 loops_ 数组中取出第 next_ 个 subloop,这是这一次返回的对象。
            ++next_;//自增索引,实现「轮询」。
            if(next_>=loops_.size())//如果到了最后一个 loop,就从头开始循环。
            {
                next_=0;
            }
        }
        return  loop;//返回选择的subloop
     }

     //这个函数经常在 新连接到来时 被调用:
    //// 在 Acceptor 接收新连接后
EventLoop* subLoop = threadPool->getNextLoop();
subLoop->runInLoop(std::bind(...));  // 把连接交给对应 subloop 来管理




     std::vector<EventLoop*>EventLoopThreadPool::getAllLoops()//是用于获取 线程池中所有的subloop
     {
        if(loops_.empty())//线程池为空
        {
            return std::vector<EventLoop*>(1,baseLoop_);    //返回一个只包含一个元素 baseLoop_ 的 vector。  
        }
        else
        {
            return  loops_;//返回subloop数组
        }
     }

     

    

程池就是先有一个主线程,运行着baseloop,然后for循环,使用智能指针创建numthread个Eventloopthread对象,每个Eventloopthread对象执行startloop()方法,从而去创建一个新线程,然后执行线程函数,创建一个subloop,这时就有了一个subthread+subloop,可以说每个Eventloopthread对象管理一个subthread+subloop,然后把所有的subloop收集起来放到loops数组里,主线程baseloop需要时,通过负载均衡算法在loops数组里选择一个subloop派发任务

如何派发?就有了subloop->runinloop()方法

相关文章
|
3月前
|
JavaScript
Vue中Axios网络请求封装-企业最常用封装模式
本教程介绍如何安装并配置 Axios 实例,包含请求与响应拦截器,实现自动携带 Token、错误提示及登录状态管理,适用于 Vue 项目。
150 1
|
5月前
|
网络协议 算法 Java
基于Reactor模型的高性能网络库之Tcpserver组件-上层调度器
TcpServer 是一个用于管理 TCP 连接的类,包含成员变量如事件循环(EventLoop)、连接池(ConnectionMap)和回调函数等。其主要功能包括监听新连接、设置线程池、启动服务器及处理连接事件。通过 Acceptor 接收新连接,并使用轮询算法将连接分配给子事件循环(subloop)进行读写操作。调用链从 start() 开始,经由线程池启动和 Acceptor 监听,最终由 TcpConnection 管理具体连接的事件处理。
231 2
|
5月前
基于Reactor模式的高性能网络库github地址
https://github.com/zyi30/reactor-net.git
152 0
|
3月前
|
安全 测试技术 虚拟化
VMware-三种网络模式原理
本文介绍了虚拟机三种常见网络模式(桥接模式、NAT模式、仅主机模式)的工作原理与适用场景。桥接模式让虚拟机如同独立设备接入局域网;NAT模式共享主机IP,适合大多数WiFi环境;仅主机模式则构建封闭的内部网络,适用于测试环境。内容简明易懂,便于理解不同模式的优缺点与应用场景。
504 0
|
4月前
|
机器学习/深度学习 算法 数据库
基于GoogleNet深度学习网络和GEI步态能量提取的步态识别算法matlab仿真,数据库采用CASIA库
本项目基于GoogleNet深度学习网络与GEI步态能量图提取技术,实现高精度步态识别。采用CASI库训练模型,结合Inception模块多尺度特征提取与GEI图像能量整合,提升识别稳定性与准确率,适用于智能安防、身份验证等领域。
|
2月前
|
Java
如何在Java中进行多线程编程
Java多线程编程常用方式包括:继承Thread类、实现Runnable接口、Callable接口(可返回结果)及使用线程池。推荐线程池以提升性能,避免频繁创建线程。结合同步与通信机制,可有效管理并发任务。
184 6
|
5月前
|
Java API 微服务
为什么虚拟线程将改变Java并发编程?
为什么虚拟线程将改变Java并发编程?
320 83
|
2月前
|
Java 调度 数据库
Python threading模块:多线程编程的实战指南
本文深入讲解Python多线程编程,涵盖threading模块的核心用法:线程创建、生命周期、同步机制(锁、信号量、条件变量)、线程通信(队列)、守护线程与线程池应用。结合实战案例,如多线程下载器,帮助开发者提升程序并发性能,适用于I/O密集型任务处理。
325 0
|
3月前
|
算法 Java
Java多线程编程:实现线程间数据共享机制
以上就是Java中几种主要处理多线程序列化资源以及协调各自独立运行但需相互配合以完成任务threads 的技术手段与策略。正确应用上述技术将大大增强你程序稳定性与效率同时也降低bug出现率因此深刻理解每项技术背后理论至关重要.
291 16
|
7月前
|
机器学习/深度学习 消息中间件 存储
【高薪程序员必看】万字长文拆解Java并发编程!(9-2):并发工具-线程池
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发编程中的强力并发工具-线程池,废话不多说让我们直接开始。
290 0

热门文章

最新文章