基于Reactor模型的高性能网络库之核心调度器:EventLoop组件

简介: 它负责:监听事件(如 I/O 可读写、定时器)、分发事件、执行回调、管理事件源 Channel 等。

它负责:监听事件(如 I/O 可读写、定时器)、分发事件、执行回调、管理事件源 Channel 等。  

1. 负责 I/O 多路复用(epoll 等)事件的监听与分发

  • 持有一个 Poller(如 EpollPoller)对象。
  • 循环调用 poll() 等待所有注册的 Channel 的事件(如:可读、可写、关闭等)。
  • 事件触发后,调用 Channel::handleEvent(),由 Channel 触发具体的回调。

2. 管理定时器、信号、用户回调等任务的执行

  • 事件循环内除了处理 I/O,也可以执行外部线程通过 runInLoop()queueInLoop() 提交的回调。
  • 所有回调任务都在 EventLoop 所属线程中执行,避免竞态。

3. 线程内任务调度器

  • 保证所有回调函数(包括异步提交的任务)都在同一个线程中顺序执行。
  • 提供 runInLoop(cb)queueInLoop(cb),支持其他线程将任务安全地提交到该 EventLoop 所在线程。

4. 支持跨线程唤醒机制

  • 通过 eventfd + wakeup() + handleRead(),其他线程可以唤醒 poll() 阻塞的线程。
  • 实现线程间通信,典型用于:主线程唤醒 subLoop 来处理新连接等任务。

5. 保证线程安全

  • 一个 EventLoop 只能被一个线程拥有,不可跨线程调用其 loop()。
  • 提供 isInLoopThread() 判断当前线程是否为所属线程,做线程隔离。

6. 生命周期管理

  • 通过 loop() 启动事件循环。
  • quit() 安全退出事件循环(支持跨线程退出)。
  • 析构时自动清理 wakeupfdChannel,释放资源。

私有数据成员

  using  Functor=std::function<void()>;//所以 std::function<void()> 表示 可以封装任意 “无参、无返回” 的函数或可调用对象。
    using  ChannelList=std::vector<Channel*>;
    /*
    为什么用原子?
在多线程场景下,多个线程可能访问或修改同一个布尔变量,会产生“竞态条件(race condition)”。
使用 std::atomic_bool 可以确保读写操作是线程安全的。*/
    std::atomic_bool looping_;// 是否正在 loop 循环中
    std::atomic_bool quit_;//是否收到退出指令
    std::atomic_bool callingPendingFunctors_;//标识当前loop是否有需要执行的回调操作
    const   pid_t  threadID_;//记录当前loop所在线程id
    Timestamp  pollReturnTime_;//poller返回发生事件的channel得时间点
    std::unique_ptr<Poller>poller_;//evenyloop管理的poller
    int   wakeupFD_;//(eventfd系统调用)//主要作用,当mainloop获取一个新用户的channel,通过轮询算法选择一个subloop,通过该成员唤醒subloop处理channel
    std::unique_ptr<Channel>wakeupChannel_;//封装 wakeupFD_,并注册到 Poller 上监听可读事件。
    ChannelList   activeChannels_;//存放 poll() 返回的当前发生事件的Channel列表;
    std::vector<Functor>pendingFunctors_;//存储loop需要执行的所有回调操作
    std::mutex  mutex_;//互斥锁, // 保护 pendingFunctors_ 的线程安全
};

函数实现

//防止一个线程创建多个Eventoop
//__thread  每个线程有一个独立的 t_loopInThisThread 变量副本,互不干扰。
//初始值是 nullptr,表示当前线程还没有创建 EventLoop。
__thread  EventLoop*t_loopInThisThread=nullptr;

//定义默认的poller  IO复用接口的超时时间
const   int   KPollTimeMs=10000;


//创建一个 eventfd 文件描述符,用于线程间事件通知。
EFD_NONBLOCK:设置为非阻塞。
EFD_CLOEXEC:fork 时关闭 fd。
int createEventfd()
{
    int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (evtfd < 0)
    {
        LOG_FATAL("eventfd error:%d\n", errno);
    }
    return evtfd;
}

构造函数
EventLoop::EventLoop()
  : looping_(false),
    quit_(false),
    callingPendingFunctors_(false),
    threadID_(CurrentThread::tid()),
    poller_(Poller::newDefaultPoller(this)),//创建 epoll 封装器
    wakeupFD_(createEventfd()),//创建 eventfd
    wakeupChannel_(new Channel(this, wakeupFD_))//封装 wakeupFD_ 为 Channel;
{
//若当前线程已有 EventLoop,则终止程序。
//否则将 this 赋值给线程局部变量。
        LOG_DEBUG("Eventloop created %p in thread %d\n", this, threadID_);
    if (t_loopInThisThread) {
        LOG_FATAL("Another EventLoop %p exists in this thread %d\n", t_loopInThisThread, threadID_);
    } else {
        t_loopInThisThread = this;
    }
//为 wakeupChannel_ 设置一个读事件回调函数,当该 Channel 上的文件描述符
//(wakeupFd_,通常是 eventfd)变为可读时,就会自动调用 EventLoop::handleRead() 函数。
          wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead,this));  
          wakeupChannel_->enableReading();
// //每一个Eventloop都将监听wakeupchannel的读事件,添加到epoll
}



析构函数
EventLoop::~EventLoop()
    {
        wakeupChannel_->disableAll();//禁用 wakeupChannel_ 上所有的监听事件(如 EPOLLIN)
        wakeupChannel_->remove();//将 wakeupChannel_ 从 Poller 中彻底注销
        ::close(wakeupFD_);//关闭 eventfd 文件描述符,释放内核资源
        t_loopInThisThread=nullptr;//将线程局部变量恢复为空,表示当前线程中已无 EventLoop 实例,为将来再次创建留出空间。
    }


//事件循环中用于“被唤醒”后的处理函数,
//handleRead() 是绑定给 wakeupChannel_(封装了 eventfd)的可读事件回调函数
//当其他线程(或自身)调用 wakeup() 向 wakeupFD_ 写入数据时,epoll_wait 检测到可读事件,EventLoop 被唤醒后就会调用这个函数。
 void  EventLoop::handleRead()//唤醒后执行的,,将内部计数器归零,从而解除“可读”状态。
    {
        uint64_t  one =1;//8字节
        ssize_t  n=read(wakeupFD_,&one,sizeof  one);//从 wakeupFD_ 中读取 8 字节数据(uint64_t 类型),清除 eventfd 的“可读事件”状态。
        if(n!=sizeof  one)//如果不等于 8,说明读取失败或异常,打印错误日志以便排查
        {
            LOG_ERROR("EventLoop::headleRead() reads  %zd  bytes  instead  of 8",n);
        }
    }



//开启事件循环
    void   EventLoop::loop()
    {
        looping_=true;//表示当前线程已进入事件循环
        quit_=false;//不退出
        LOG_INFO("Eventloop %p start looping\n",this);
        while(!quit_)//只要 quit_ 为 false,就会一直运行
        {
            activeChannels_.clear();//清空上一次 poll() 返回的活跃通道(Channel)列表,准备本次事件处理
            pollReturnTime_=poller_->poll(KPollTimeMs,&activeChannels_);//,activeChannels_ 是一个输出参数,会收集发生事件的 Channel*
            for(Channel *channel:activeChannels_)//实际发生事件的channel
            {
                //Poller监听哪些channel发生事件了,上报给Eventloop,通知channel处理相应地事件
                channel->handleEvent(pollReturnTime_);
            }

            //在 EventLoop 所在线程中,执行其他线程提交的任务回调,确保线程安全和异步性。
            doPendingFunctors();
            
        }
        LOG_INFO("Eventloop %p stop   looping\n",this);
        looping_=false;
    }
     // doPendingFunctors();函数解释
     如果是主线程或者其他线程给当前线程提交任务,不可以直接调用当前线程的方法,会产生线程不安全的情况,这时候其他线程
     将任务封装成回调函数放到当前loop所在线程,wakeup唤醒他后,会从poll返回,因为这个函数是当前loop的成员函数,会由当前线程执行回调任务
            



        void EventLoop::quit()
    {
        quit_=true;//// 设置退出标志,通知事件循环条件满足可以退出循环了
        if(!isInLoopThread())//:比如在subloop(worker)中,调用了mainloop的quit
        {
            wakeup();
        }
    }
//为什么要判断线程?
如果调用 quit() 的线程就是 EventLoop 所在线程(isInLoopThread() 返回 true),那么事件循环马上会检测到 quit_ 并退出。
如果调用 quit() 的线程是另一个线程(比如主线程或者工作线程),此时 EventLoop 线程可能正在 poll() 阻塞等待事件,这时它感知不到 quit_ 变化。
所以要用 wakeup() 写一个事件,触发 poll() 返回,让 EventLoop 线程被唤醒,从而立刻检测到 quit_  true,然后退出循环。




 void  EventLoop::runInLoop(Functor  cb)//在当前loop中执行cb
     {
        if(isInLoopThread())//在当前的loop线程中,执行cb
        {
            cb();
        }
        else//在非当前的loop线程中,执行cb,需要唤醒loop所在线程,执行cb,“唤醒”只是打断 poll() 的阻塞,让 EventLoop 所在线程继续往下执行,最终自己执行任务
        {
                queueInLoop(cb);
        }
     }
    void  EventLoop::queueInLoop(Functor  cb)
  //pendingFunctors_ 是一个回调函数的队列,用来存放所有等待 EventLoop 线程执行的任务。
    {
        {
            std::unique_lock<std::mutex>lock(mutex_);//构造时加锁,作用域结束时自动释放
            pendingFunctors_.emplace_back(cb);//直接构造
        }
//如果调用者不是事件循环线程,就需要调用 wakeup(),唤醒事件循环线程,尽快执行新加入的回调。
//callingPendingFunctors_关键在于“唤醒”的目的不是立刻中断当前正在执行的回调,而是确保事件循环线程不会在当前批回调执行完后阻塞于 poll(),而是立刻继续执行下一批回调任务。
        if(!isInLoopThread()||callingPendingFunctors_)//callingPendingFunctors_为true表示正在执行回调,
        {
            wakeup();
        }
    }



    void  EventLoop::wakeup()//用来唤醒loop所在的线程(向wakeupfd写一个数据,
      //wakeupchannel就会发生读时间,当前loop线程就会被唤醒
    
     {
            uint64_t    one=1;
            ssize_t n=write(wakeupFD_,&one,sizeof one);
            if(n!=sizeof one)
            {
                LOG_ERROR("Evenyloop::wakeup()writes%lu  bytes   instead of 8\n",n);
            }
         } 


          //Eventloop方法-》poller方法
    void    EventLoop::updateChannel(Channel*channel)//本质是channel通过eventloop调用poller
    {
        poller_->updateChannel(channel);//将一个 Channel 的监听事件注册或更新到 Poller 中。
    }
    void    EventLoop::removeChannel(Channel*channel)
    {
        poller_->removeChannel(channel);//从 Poller 中移除某个 Channel,不再监听其事件。
    }
    bool  EventLoop::hasChannel(Channel*channel)
    {
        return  poller_->hasChannel(channel);//判断 Poller 当前是否管理着这个 Channel。
    }



     void  EventLoop::doPendingFunctors()//执行回调
    {
        std::vector<Functor>functors;//局部变量,使用一个局部变量 functors,避免直接在临界区(加锁区)内执行任务,提升性能、减少锁竞争。避免死锁
        callingPendingFunctors_=true;//这是一个共享资源,多个线程都可能调用 queueInLoop() 往这个容器里添加任务。所以对它的访问需要加锁(mutex_)保护。
        {
            std::unique_lock<std::mutex>lock(mutex_);
            functors.swap(pendingFunctors_);
        }

        for(const  Functor&functor:functors)
        {
            functor();//执行当前loop需要执行的回调函数
        }
        callingPendingFunctors_=false;
    }

wakeup作用:  1.每个loop在执行loop()事件循环时,会阻塞在poll()上,如果其他线程提交任务时,需要唤醒,从poll()返回

相关文章
|
7月前
|
JSON 监控 网络协议
干货分享“对接的 API 总是不稳定,网络分层模型” 看电商 API 故障的本质
本文从 OSI 七层网络模型出发,深入剖析电商 API 不稳定的根本原因,涵盖物理层到应用层的典型故障与解决方案,结合阿里、京东等大厂架构,详解如何构建高稳定性的电商 API 通信体系。
|
4月前
|
机器学习/深度学习 数据采集 人工智能
深度学习实战指南:从神经网络基础到模型优化的完整攻略
🌟 蒋星熠Jaxonic,AI探索者。深耕深度学习,从神经网络到Transformer,用代码践行智能革命。分享实战经验,助你构建CV、NLP模型,共赴二进制星辰大海。
|
5月前
|
机器学习/深度学习 传感器 算法
【无人车路径跟踪】基于神经网络的数据驱动迭代学习控制(ILC)算法,用于具有未知模型和重复任务的非线性单输入单输出(SISO)离散时间系统的无人车的路径跟踪(Matlab代码实现)
【无人车路径跟踪】基于神经网络的数据驱动迭代学习控制(ILC)算法,用于具有未知模型和重复任务的非线性单输入单输出(SISO)离散时间系统的无人车的路径跟踪(Matlab代码实现)
356 2
|
5月前
|
机器学习/深度学习 并行计算 算法
【CPOBP-NSWOA】基于豪冠猪优化BP神经网络模型的多目标鲸鱼寻优算法研究(Matlab代码实现)
【CPOBP-NSWOA】基于豪冠猪优化BP神经网络模型的多目标鲸鱼寻优算法研究(Matlab代码实现)
130 8
|
5月前
|
监控 前端开发 安全
Netty 高性能网络编程框架技术详解与实践指南
本文档全面介绍 Netty 高性能网络编程框架的核心概念、架构设计和实践应用。作为 Java 领域最优秀的 NIO 框架之一,Netty 提供了异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。本文将深入探讨其 Reactor 模型、ChannelPipeline、编解码器、内存管理等核心机制,帮助开发者构建高性能的网络应用系统。
353 0
|
6月前
|
算法 安全 网络安全
【多智能体系统】遭受DoS攻击的网络物理多智能体系统的弹性模型预测控制MPC研究(Simulink仿真实现)
【多智能体系统】遭受DoS攻击的网络物理多智能体系统的弹性模型预测控制MPC研究(Simulink仿真实现)
270 0
|
6月前
|
机器学习/深度学习 算法 数据库
基于GoogleNet深度学习网络和GEI步态能量提取的步态识别算法matlab仿真,数据库采用CASIA库
本项目基于GoogleNet深度学习网络与GEI步态能量图提取技术,实现高精度步态识别。采用CASI库训练模型,结合Inception模块多尺度特征提取与GEI图像能量整合,提升识别稳定性与准确率,适用于智能安防、身份验证等领域。
|
SQL 安全 网络安全
网络安全与信息安全:知识分享####
【10月更文挑战第21天】 随着数字化时代的快速发展,网络安全和信息安全已成为个人和企业不可忽视的关键问题。本文将探讨网络安全漏洞、加密技术以及安全意识的重要性,并提供一些实用的建议,帮助读者提高自身的网络安全防护能力。 ####
316 17
|
SQL 安全 网络安全
网络安全与信息安全:关于网络安全漏洞、加密技术、安全意识等方面的知识分享
随着互联网的普及,网络安全问题日益突出。本文将从网络安全漏洞、加密技术和安全意识三个方面进行探讨,旨在提高读者对网络安全的认识和防范能力。通过分析常见的网络安全漏洞,介绍加密技术的基本原理和应用,以及强调安全意识的重要性,帮助读者更好地保护自己的网络信息安全。
254 10
|
存储 SQL 安全
网络安全与信息安全:关于网络安全漏洞、加密技术、安全意识等方面的知识分享
随着互联网的普及,网络安全问题日益突出。本文将介绍网络安全的重要性,分析常见的网络安全漏洞及其危害,探讨加密技术在保障网络安全中的作用,并强调提高安全意识的必要性。通过本文的学习,读者将了解网络安全的基本概念和应对策略,提升个人和组织的网络安全防护能力。