基于Reactor模型的高性能网络库之核心调度器:EventLoop组件

简介: 它负责:监听事件(如 I/O 可读写、定时器)、分发事件、执行回调、管理事件源 Channel 等。

它负责:监听事件(如 I/O 可读写、定时器)、分发事件、执行回调、管理事件源 Channel 等。  

1. 负责 I/O 多路复用(epoll 等)事件的监听与分发

  • 持有一个 Poller(如 EpollPoller)对象。
  • 循环调用 poll() 等待所有注册的 Channel 的事件(如:可读、可写、关闭等)。
  • 事件触发后,调用 Channel::handleEvent(),由 Channel 触发具体的回调。

2. 管理定时器、信号、用户回调等任务的执行

  • 事件循环内除了处理 I/O,也可以执行外部线程通过 runInLoop()queueInLoop() 提交的回调。
  • 所有回调任务都在 EventLoop 所属线程中执行,避免竞态。

3. 线程内任务调度器

  • 保证所有回调函数(包括异步提交的任务)都在同一个线程中顺序执行。
  • 提供 runInLoop(cb)queueInLoop(cb),支持其他线程将任务安全地提交到该 EventLoop 所在线程。

4. 支持跨线程唤醒机制

  • 通过 eventfd + wakeup() + handleRead(),其他线程可以唤醒 poll() 阻塞的线程。
  • 实现线程间通信,典型用于:主线程唤醒 subLoop 来处理新连接等任务。

5. 保证线程安全

  • 一个 EventLoop 只能被一个线程拥有,不可跨线程调用其 loop()。
  • 提供 isInLoopThread() 判断当前线程是否为所属线程,做线程隔离。

6. 生命周期管理

  • 通过 loop() 启动事件循环。
  • quit() 安全退出事件循环(支持跨线程退出)。
  • 析构时自动清理 wakeupfdChannel,释放资源。

私有数据成员

  using  Functor=std::function<void()>;//所以 std::function<void()> 表示 可以封装任意 “无参、无返回” 的函数或可调用对象。
    using  ChannelList=std::vector<Channel*>;
    /*
    为什么用原子?
在多线程场景下,多个线程可能访问或修改同一个布尔变量,会产生“竞态条件(race condition)”。
使用 std::atomic_bool 可以确保读写操作是线程安全的。*/
    std::atomic_bool looping_;// 是否正在 loop 循环中
    std::atomic_bool quit_;//是否收到退出指令
    std::atomic_bool callingPendingFunctors_;//标识当前loop是否有需要执行的回调操作
    const   pid_t  threadID_;//记录当前loop所在线程id
    Timestamp  pollReturnTime_;//poller返回发生事件的channel得时间点
    std::unique_ptr<Poller>poller_;//evenyloop管理的poller
    int   wakeupFD_;//(eventfd系统调用)//主要作用,当mainloop获取一个新用户的channel,通过轮询算法选择一个subloop,通过该成员唤醒subloop处理channel
    std::unique_ptr<Channel>wakeupChannel_;//封装 wakeupFD_,并注册到 Poller 上监听可读事件。
    ChannelList   activeChannels_;//存放 poll() 返回的当前发生事件的Channel列表;
    std::vector<Functor>pendingFunctors_;//存储loop需要执行的所有回调操作
    std::mutex  mutex_;//互斥锁, // 保护 pendingFunctors_ 的线程安全
};

函数实现

//防止一个线程创建多个Eventoop
//__thread  每个线程有一个独立的 t_loopInThisThread 变量副本,互不干扰。
//初始值是 nullptr,表示当前线程还没有创建 EventLoop。
__thread  EventLoop*t_loopInThisThread=nullptr;

//定义默认的poller  IO复用接口的超时时间
const   int   KPollTimeMs=10000;


//创建一个 eventfd 文件描述符,用于线程间事件通知。
EFD_NONBLOCK:设置为非阻塞。
EFD_CLOEXEC:fork 时关闭 fd。
int createEventfd()
{
    int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (evtfd < 0)
    {
        LOG_FATAL("eventfd error:%d\n", errno);
    }
    return evtfd;
}

构造函数
EventLoop::EventLoop()
  : looping_(false),
    quit_(false),
    callingPendingFunctors_(false),
    threadID_(CurrentThread::tid()),
    poller_(Poller::newDefaultPoller(this)),//创建 epoll 封装器
    wakeupFD_(createEventfd()),//创建 eventfd
    wakeupChannel_(new Channel(this, wakeupFD_))//封装 wakeupFD_ 为 Channel;
{
//若当前线程已有 EventLoop,则终止程序。
//否则将 this 赋值给线程局部变量。
        LOG_DEBUG("Eventloop created %p in thread %d\n", this, threadID_);
    if (t_loopInThisThread) {
        LOG_FATAL("Another EventLoop %p exists in this thread %d\n", t_loopInThisThread, threadID_);
    } else {
        t_loopInThisThread = this;
    }
//为 wakeupChannel_ 设置一个读事件回调函数,当该 Channel 上的文件描述符
//(wakeupFd_,通常是 eventfd)变为可读时,就会自动调用 EventLoop::handleRead() 函数。
          wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead,this));  
          wakeupChannel_->enableReading();
// //每一个Eventloop都将监听wakeupchannel的读事件,添加到epoll
}



析构函数
EventLoop::~EventLoop()
    {
        wakeupChannel_->disableAll();//禁用 wakeupChannel_ 上所有的监听事件(如 EPOLLIN)
        wakeupChannel_->remove();//将 wakeupChannel_ 从 Poller 中彻底注销
        ::close(wakeupFD_);//关闭 eventfd 文件描述符,释放内核资源
        t_loopInThisThread=nullptr;//将线程局部变量恢复为空,表示当前线程中已无 EventLoop 实例,为将来再次创建留出空间。
    }


//事件循环中用于“被唤醒”后的处理函数,
//handleRead() 是绑定给 wakeupChannel_(封装了 eventfd)的可读事件回调函数
//当其他线程(或自身)调用 wakeup() 向 wakeupFD_ 写入数据时,epoll_wait 检测到可读事件,EventLoop 被唤醒后就会调用这个函数。
 void  EventLoop::handleRead()//唤醒后执行的,,将内部计数器归零,从而解除“可读”状态。
    {
        uint64_t  one =1;//8字节
        ssize_t  n=read(wakeupFD_,&one,sizeof  one);//从 wakeupFD_ 中读取 8 字节数据(uint64_t 类型),清除 eventfd 的“可读事件”状态。
        if(n!=sizeof  one)//如果不等于 8,说明读取失败或异常,打印错误日志以便排查
        {
            LOG_ERROR("EventLoop::headleRead() reads  %zd  bytes  instead  of 8",n);
        }
    }



//开启事件循环
    void   EventLoop::loop()
    {
        looping_=true;//表示当前线程已进入事件循环
        quit_=false;//不退出
        LOG_INFO("Eventloop %p start looping\n",this);
        while(!quit_)//只要 quit_ 为 false,就会一直运行
        {
            activeChannels_.clear();//清空上一次 poll() 返回的活跃通道(Channel)列表,准备本次事件处理
            pollReturnTime_=poller_->poll(KPollTimeMs,&activeChannels_);//,activeChannels_ 是一个输出参数,会收集发生事件的 Channel*
            for(Channel *channel:activeChannels_)//实际发生事件的channel
            {
                //Poller监听哪些channel发生事件了,上报给Eventloop,通知channel处理相应地事件
                channel->handleEvent(pollReturnTime_);
            }

            //在 EventLoop 所在线程中,执行其他线程提交的任务回调,确保线程安全和异步性。
            doPendingFunctors();
            
        }
        LOG_INFO("Eventloop %p stop   looping\n",this);
        looping_=false;
    }
     // doPendingFunctors();函数解释
     如果是主线程或者其他线程给当前线程提交任务,不可以直接调用当前线程的方法,会产生线程不安全的情况,这时候其他线程
     将任务封装成回调函数放到当前loop所在线程,wakeup唤醒他后,会从poll返回,因为这个函数是当前loop的成员函数,会由当前线程执行回调任务
            



        void EventLoop::quit()
    {
        quit_=true;//// 设置退出标志,通知事件循环条件满足可以退出循环了
        if(!isInLoopThread())//:比如在subloop(worker)中,调用了mainloop的quit
        {
            wakeup();
        }
    }
//为什么要判断线程?
如果调用 quit() 的线程就是 EventLoop 所在线程(isInLoopThread() 返回 true),那么事件循环马上会检测到 quit_ 并退出。
如果调用 quit() 的线程是另一个线程(比如主线程或者工作线程),此时 EventLoop 线程可能正在 poll() 阻塞等待事件,这时它感知不到 quit_ 变化。
所以要用 wakeup() 写一个事件,触发 poll() 返回,让 EventLoop 线程被唤醒,从而立刻检测到 quit_  true,然后退出循环。




 void  EventLoop::runInLoop(Functor  cb)//在当前loop中执行cb
     {
        if(isInLoopThread())//在当前的loop线程中,执行cb
        {
            cb();
        }
        else//在非当前的loop线程中,执行cb,需要唤醒loop所在线程,执行cb,“唤醒”只是打断 poll() 的阻塞,让 EventLoop 所在线程继续往下执行,最终自己执行任务
        {
                queueInLoop(cb);
        }
     }
    void  EventLoop::queueInLoop(Functor  cb)
  //pendingFunctors_ 是一个回调函数的队列,用来存放所有等待 EventLoop 线程执行的任务。
    {
        {
            std::unique_lock<std::mutex>lock(mutex_);//构造时加锁,作用域结束时自动释放
            pendingFunctors_.emplace_back(cb);//直接构造
        }
//如果调用者不是事件循环线程,就需要调用 wakeup(),唤醒事件循环线程,尽快执行新加入的回调。
//callingPendingFunctors_关键在于“唤醒”的目的不是立刻中断当前正在执行的回调,而是确保事件循环线程不会在当前批回调执行完后阻塞于 poll(),而是立刻继续执行下一批回调任务。
        if(!isInLoopThread()||callingPendingFunctors_)//callingPendingFunctors_为true表示正在执行回调,
        {
            wakeup();
        }
    }



    void  EventLoop::wakeup()//用来唤醒loop所在的线程(向wakeupfd写一个数据,
      //wakeupchannel就会发生读时间,当前loop线程就会被唤醒
    
     {
            uint64_t    one=1;
            ssize_t n=write(wakeupFD_,&one,sizeof one);
            if(n!=sizeof one)
            {
                LOG_ERROR("Evenyloop::wakeup()writes%lu  bytes   instead of 8\n",n);
            }
         } 


          //Eventloop方法-》poller方法
    void    EventLoop::updateChannel(Channel*channel)//本质是channel通过eventloop调用poller
    {
        poller_->updateChannel(channel);//将一个 Channel 的监听事件注册或更新到 Poller 中。
    }
    void    EventLoop::removeChannel(Channel*channel)
    {
        poller_->removeChannel(channel);//从 Poller 中移除某个 Channel,不再监听其事件。
    }
    bool  EventLoop::hasChannel(Channel*channel)
    {
        return  poller_->hasChannel(channel);//判断 Poller 当前是否管理着这个 Channel。
    }



     void  EventLoop::doPendingFunctors()//执行回调
    {
        std::vector<Functor>functors;//局部变量,使用一个局部变量 functors,避免直接在临界区(加锁区)内执行任务,提升性能、减少锁竞争。避免死锁
        callingPendingFunctors_=true;//这是一个共享资源,多个线程都可能调用 queueInLoop() 往这个容器里添加任务。所以对它的访问需要加锁(mutex_)保护。
        {
            std::unique_lock<std::mutex>lock(mutex_);
            functors.swap(pendingFunctors_);
        }

        for(const  Functor&functor:functors)
        {
            functor();//执行当前loop需要执行的回调函数
        }
        callingPendingFunctors_=false;
    }

wakeup作用:  1.每个loop在执行loop()事件循环时,会阻塞在poll()上,如果其他线程提交任务时,需要唤醒,从poll()返回

相关文章
|
2月前
|
网络协议 算法 Java
基于Reactor模型的高性能网络库之Tcpserver组件-上层调度器
TcpServer 是一个用于管理 TCP 连接的类,包含成员变量如事件循环(EventLoop)、连接池(ConnectionMap)和回调函数等。其主要功能包括监听新连接、设置线程池、启动服务器及处理连接事件。通过 Acceptor 接收新连接,并使用轮询算法将连接分配给子事件循环(subloop)进行读写操作。调用链从 start() 开始,经由线程池启动和 Acceptor 监听,最终由 TcpConnection 管理具体连接的事件处理。
60 2
|
2月前
基于Reactor模式的高性能网络库github地址
https://github.com/zyi30/reactor-net.git
48 0
|
9月前
|
SQL 安全 网络安全
网络安全与信息安全:知识分享####
【10月更文挑战第21天】 随着数字化时代的快速发展,网络安全和信息安全已成为个人和企业不可忽视的关键问题。本文将探讨网络安全漏洞、加密技术以及安全意识的重要性,并提供一些实用的建议,帮助读者提高自身的网络安全防护能力。 ####
219 17
|
9月前
|
SQL 安全 网络安全
网络安全与信息安全:关于网络安全漏洞、加密技术、安全意识等方面的知识分享
随着互联网的普及,网络安全问题日益突出。本文将从网络安全漏洞、加密技术和安全意识三个方面进行探讨,旨在提高读者对网络安全的认识和防范能力。通过分析常见的网络安全漏洞,介绍加密技术的基本原理和应用,以及强调安全意识的重要性,帮助读者更好地保护自己的网络信息安全。
171 10
|
9月前
|
存储 SQL 安全
网络安全与信息安全:关于网络安全漏洞、加密技术、安全意识等方面的知识分享
随着互联网的普及,网络安全问题日益突出。本文将介绍网络安全的重要性,分析常见的网络安全漏洞及其危害,探讨加密技术在保障网络安全中的作用,并强调提高安全意识的必要性。通过本文的学习,读者将了解网络安全的基本概念和应对策略,提升个人和组织的网络安全防护能力。
|
9月前
|
SQL 安全 网络安全
网络安全与信息安全:关于网络安全漏洞、加密技术、安全意识等方面的知识分享
在数字化时代,网络安全和信息安全已成为我们生活中不可或缺的一部分。本文将介绍网络安全漏洞、加密技术和安全意识等方面的内容,并提供一些实用的代码示例。通过阅读本文,您将了解到如何保护自己的网络安全,以及如何提高自己的信息安全意识。
175 10
|
9月前
|
监控 安全 网络安全
网络安全与信息安全:漏洞、加密与意识的交织
在数字时代的浪潮中,网络安全与信息安全成为维护数据完整性、保密性和可用性的关键。本文深入探讨了网络安全中的漏洞概念、加密技术的应用以及提升安全意识的重要性。通过实际案例分析,揭示了网络攻击的常见模式和防御策略,强调了教育和技术并重的安全理念。旨在为读者提供一套全面的网络安全知识框架,从而在日益复杂的网络环境中保护个人和组织的资产安全。
|
9月前
|
存储 监控 安全
云计算与网络安全:云服务、网络安全、信息安全等技术领域的融合与挑战
本文将探讨云计算与网络安全之间的关系,以及它们在云服务、网络安全和信息安全等技术领域中的融合与挑战。我们将分析云计算的优势和风险,以及如何通过网络安全措施来保护数据和应用程序。我们还将讨论如何确保云服务的可用性和可靠性,以及如何处理网络攻击和数据泄露等问题。最后,我们将提供一些关于如何在云计算环境中实现网络安全的建议和最佳实践。
|
9月前
|
安全 算法 网络协议
网络安全与信息安全知识分享
本文深入探讨了网络安全漏洞、加密技术以及安全意识三个方面,旨在帮助读者更好地理解和应对网络安全威胁。通过分析常见的网络安全漏洞类型及其防范措施,详细介绍对称加密和非对称加密的原理和应用,并强调提高个人和企业安全意识的重要性,为构建更安全的网络环境提供指导。
168 2
|
9月前
|
安全 网络安全 数据安全/隐私保护
网络安全与信息安全:关于网络安全漏洞、加密技术、安全意识等方面的知识分享
在数字化时代,网络安全和信息安全已成为我们日常生活中不可或缺的一部分。本文将深入探讨网络安全漏洞、加密技术和安全意识等方面的问题,并提供一些实用的建议和解决方案。我们将通过分析网络攻击的常见形式,揭示网络安全的脆弱性,并介绍如何利用加密技术来保护数据。此外,我们还将强调提高个人和企业的安全意识的重要性,以应对日益复杂的网络威胁。无论你是普通用户还是IT专业人士,这篇文章都将为你提供有价值的见解和指导。

热门文章

最新文章