它负责:监听事件(如 I/O 可读写、定时器)、分发事件、执行回调、管理事件源 Channel 等。
1. 负责 I/O 多路复用(epoll 等)事件的监听与分发
- 持有一个
Poller
(如EpollPoller
)对象。 - 循环调用
poll()
等待所有注册的Channel
的事件(如:可读、可写、关闭等)。 - 事件触发后,调用
Channel::handleEvent()
,由Channel
触发具体的回调。
2. 管理定时器、信号、用户回调等任务的执行
- 事件循环内除了处理 I/O,也可以执行外部线程通过
runInLoop()
或queueInLoop()
提交的回调。 - 所有回调任务都在
EventLoop
所属线程中执行,避免竞态。
3. 线程内任务调度器
- 保证所有回调函数(包括异步提交的任务)都在同一个线程中顺序执行。
- 提供
runInLoop(cb)
和queueInLoop(cb)
,支持其他线程将任务安全地提交到该EventLoop
所在线程。
4. 支持跨线程唤醒机制
- 通过
eventfd
+wakeup()
+handleRead()
,其他线程可以唤醒poll()
阻塞的线程。 - 实现线程间通信,典型用于:主线程唤醒
subLoop
来处理新连接等任务。
5. 保证线程安全
- 一个
EventLoop
只能被一个线程拥有,不可跨线程调用其 loop()。 - 提供
isInLoopThread()
判断当前线程是否为所属线程,做线程隔离。
6. 生命周期管理
- 通过
loop()
启动事件循环。 quit()
安全退出事件循环(支持跨线程退出)。- 析构时自动清理
wakeupfd
和Channel
,释放资源。
私有数据成员
using Functor=std::function<void()>;//所以 std::function<void()> 表示 可以封装任意 “无参、无返回” 的函数或可调用对象。 using ChannelList=std::vector<Channel*>; /* 为什么用原子? 在多线程场景下,多个线程可能访问或修改同一个布尔变量,会产生“竞态条件(race condition)”。 使用 std::atomic_bool 可以确保读写操作是线程安全的。*/ std::atomic_bool looping_;// 是否正在 loop 循环中 std::atomic_bool quit_;//是否收到退出指令 std::atomic_bool callingPendingFunctors_;//标识当前loop是否有需要执行的回调操作 const pid_t threadID_;//记录当前loop所在线程id Timestamp pollReturnTime_;//poller返回发生事件的channel得时间点 std::unique_ptr<Poller>poller_;//evenyloop管理的poller int wakeupFD_;//(eventfd系统调用)//主要作用,当mainloop获取一个新用户的channel,通过轮询算法选择一个subloop,通过该成员唤醒subloop处理channel std::unique_ptr<Channel>wakeupChannel_;//封装 wakeupFD_,并注册到 Poller 上监听可读事件。 ChannelList activeChannels_;//存放 poll() 返回的当前发生事件的Channel列表; std::vector<Functor>pendingFunctors_;//存储loop需要执行的所有回调操作 std::mutex mutex_;//互斥锁, // 保护 pendingFunctors_ 的线程安全 };
函数实现
//防止一个线程创建多个Eventoop //__thread 每个线程有一个独立的 t_loopInThisThread 变量副本,互不干扰。 //初始值是 nullptr,表示当前线程还没有创建 EventLoop。 __thread EventLoop*t_loopInThisThread=nullptr; //定义默认的poller IO复用接口的超时时间 const int KPollTimeMs=10000; //创建一个 eventfd 文件描述符,用于线程间事件通知。 EFD_NONBLOCK:设置为非阻塞。 EFD_CLOEXEC:fork 时关闭 fd。 int createEventfd() { int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); if (evtfd < 0) { LOG_FATAL("eventfd error:%d\n", errno); } return evtfd; } 构造函数 EventLoop::EventLoop() : looping_(false), quit_(false), callingPendingFunctors_(false), threadID_(CurrentThread::tid()), poller_(Poller::newDefaultPoller(this)),//创建 epoll 封装器 wakeupFD_(createEventfd()),//创建 eventfd wakeupChannel_(new Channel(this, wakeupFD_))//封装 wakeupFD_ 为 Channel; { //若当前线程已有 EventLoop,则终止程序。 //否则将 this 赋值给线程局部变量。 LOG_DEBUG("Eventloop created %p in thread %d\n", this, threadID_); if (t_loopInThisThread) { LOG_FATAL("Another EventLoop %p exists in this thread %d\n", t_loopInThisThread, threadID_); } else { t_loopInThisThread = this; } //为 wakeupChannel_ 设置一个读事件回调函数,当该 Channel 上的文件描述符 //(wakeupFd_,通常是 eventfd)变为可读时,就会自动调用 EventLoop::handleRead() 函数。 wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead,this)); wakeupChannel_->enableReading(); // //每一个Eventloop都将监听wakeupchannel的读事件,添加到epoll } 析构函数 EventLoop::~EventLoop() { wakeupChannel_->disableAll();//禁用 wakeupChannel_ 上所有的监听事件(如 EPOLLIN) wakeupChannel_->remove();//将 wakeupChannel_ 从 Poller 中彻底注销 ::close(wakeupFD_);//关闭 eventfd 文件描述符,释放内核资源 t_loopInThisThread=nullptr;//将线程局部变量恢复为空,表示当前线程中已无 EventLoop 实例,为将来再次创建留出空间。 } //事件循环中用于“被唤醒”后的处理函数, //handleRead() 是绑定给 wakeupChannel_(封装了 eventfd)的可读事件回调函数 //当其他线程(或自身)调用 wakeup() 向 wakeupFD_ 写入数据时,epoll_wait 检测到可读事件,EventLoop 被唤醒后就会调用这个函数。 void EventLoop::handleRead()//唤醒后执行的,,将内部计数器归零,从而解除“可读”状态。 { uint64_t one =1;//8字节 ssize_t n=read(wakeupFD_,&one,sizeof one);//从 wakeupFD_ 中读取 8 字节数据(uint64_t 类型),清除 eventfd 的“可读事件”状态。 if(n!=sizeof one)//如果不等于 8,说明读取失败或异常,打印错误日志以便排查 { LOG_ERROR("EventLoop::headleRead() reads %zd bytes instead of 8",n); } } //开启事件循环 void EventLoop::loop() { looping_=true;//表示当前线程已进入事件循环 quit_=false;//不退出 LOG_INFO("Eventloop %p start looping\n",this); while(!quit_)//只要 quit_ 为 false,就会一直运行 { activeChannels_.clear();//清空上一次 poll() 返回的活跃通道(Channel)列表,准备本次事件处理 pollReturnTime_=poller_->poll(KPollTimeMs,&activeChannels_);//,activeChannels_ 是一个输出参数,会收集发生事件的 Channel* for(Channel *channel:activeChannels_)//实际发生事件的channel { //Poller监听哪些channel发生事件了,上报给Eventloop,通知channel处理相应地事件 channel->handleEvent(pollReturnTime_); } //在 EventLoop 所在线程中,执行其他线程提交的任务回调,确保线程安全和异步性。 doPendingFunctors(); } LOG_INFO("Eventloop %p stop looping\n",this); looping_=false; } // doPendingFunctors();函数解释 如果是主线程或者其他线程给当前线程提交任务,不可以直接调用当前线程的方法,会产生线程不安全的情况,这时候其他线程 将任务封装成回调函数放到当前loop所在线程,wakeup唤醒他后,会从poll返回,因为这个函数是当前loop的成员函数,会由当前线程执行回调任务 void EventLoop::quit() { quit_=true;//// 设置退出标志,通知事件循环条件满足可以退出循环了 if(!isInLoopThread())//:比如在subloop(worker)中,调用了mainloop的quit { wakeup(); } } //为什么要判断线程? 如果调用 quit() 的线程就是 EventLoop 所在线程(isInLoopThread() 返回 true),那么事件循环马上会检测到 quit_ 并退出。 如果调用 quit() 的线程是另一个线程(比如主线程或者工作线程),此时 EventLoop 线程可能正在 poll() 阻塞等待事件,这时它感知不到 quit_ 变化。 所以要用 wakeup() 写一个事件,触发 poll() 返回,让 EventLoop 线程被唤醒,从而立刻检测到 quit_ 为 true,然后退出循环。 void EventLoop::runInLoop(Functor cb)//在当前loop中执行cb { if(isInLoopThread())//在当前的loop线程中,执行cb { cb(); } else//在非当前的loop线程中,执行cb,需要唤醒loop所在线程,执行cb,“唤醒”只是打断 poll() 的阻塞,让 EventLoop 所在线程继续往下执行,最终自己执行任务 { queueInLoop(cb); } } void EventLoop::queueInLoop(Functor cb) //pendingFunctors_ 是一个回调函数的队列,用来存放所有等待 EventLoop 线程执行的任务。 { { std::unique_lock<std::mutex>lock(mutex_);//构造时加锁,作用域结束时自动释放 pendingFunctors_.emplace_back(cb);//直接构造 } //如果调用者不是事件循环线程,就需要调用 wakeup(),唤醒事件循环线程,尽快执行新加入的回调。 //callingPendingFunctors_关键在于“唤醒”的目的不是立刻中断当前正在执行的回调,而是确保事件循环线程不会在当前批回调执行完后阻塞于 poll(),而是立刻继续执行下一批回调任务。 if(!isInLoopThread()||callingPendingFunctors_)//callingPendingFunctors_为true表示正在执行回调, { wakeup(); } } void EventLoop::wakeup()//用来唤醒loop所在的线程(向wakeupfd写一个数据, //wakeupchannel就会发生读时间,当前loop线程就会被唤醒 { uint64_t one=1; ssize_t n=write(wakeupFD_,&one,sizeof one); if(n!=sizeof one) { LOG_ERROR("Evenyloop::wakeup()writes%lu bytes instead of 8\n",n); } } //Eventloop方法-》poller方法 void EventLoop::updateChannel(Channel*channel)//本质是channel通过eventloop调用poller { poller_->updateChannel(channel);//将一个 Channel 的监听事件注册或更新到 Poller 中。 } void EventLoop::removeChannel(Channel*channel) { poller_->removeChannel(channel);//从 Poller 中移除某个 Channel,不再监听其事件。 } bool EventLoop::hasChannel(Channel*channel) { return poller_->hasChannel(channel);//判断 Poller 当前是否管理着这个 Channel。 } void EventLoop::doPendingFunctors()//执行回调 { std::vector<Functor>functors;//局部变量,使用一个局部变量 functors,避免直接在临界区(加锁区)内执行任务,提升性能、减少锁竞争。避免死锁 callingPendingFunctors_=true;//这是一个共享资源,多个线程都可能调用 queueInLoop() 往这个容器里添加任务。所以对它的访问需要加锁(mutex_)保护。 { std::unique_lock<std::mutex>lock(mutex_); functors.swap(pendingFunctors_); } for(const Functor&functor:functors) { functor();//执行当前loop需要执行的回调函数 } callingPendingFunctors_=false; }
wakeup作用: 1.每个loop在执行loop()事件循环时,会阻塞在poll()上,如果其他线程提交任务时,需要唤醒,从poll()返回