muduo源码剖析之poller/EpollPoller多路复用类

简介: poller是I/O多路复用接口抽象虚基类,对I/O多路复用API的封装,muduo提供了EPollPoller和PollPoller派生类(epoll和poll),所以不支持select.newDefaultPoller()默认选择epoll。

简介

poller是I/O多路复用接口抽象虚基类,对I/O多路复用API的封装,muduo提供了EPollPoller和PollPoller派生类(epoll和poll),所以不支持select.

newDefaultPoller()默认选择epoll

主要接口

poll

是Poller的核心功能,使用派生类的poll或者epollwait来阻塞等待IO事件发生
通过派生类的实现来填充EventLoop的activeChannelList

static createNewPoller:

工厂函数,创建一个Poller实例
在EpollPoller中,每个实例对应一个epollfd

update

更新I/O多路复用的状态,例如epoll_ctl的ADD,MOD,DEL

主要成员

loop

控制当前Poller的EventLoop指针
其余成员由派生类实现

源码剖析

poller.h

#ifndef MUDUO_NET_POLLER_H
#define MUDUO_NET_POLLER_H

#include <map>
#include <vector>

#include "muduo/base/Timestamp.h"
#include "muduo/net/EventLoop.h"

namespace muduo
{
   
namespace net
{
   

class Channel;

///
/// Base class for IO Multiplexing
///
/// This class doesn't own the Channel objects.
class Poller : noncopyable
{
   
 public:
  typedef std::vector<Channel*> ChannelList;

  Poller(EventLoop* loop);
  virtual ~Poller();

  /// Polls the I/O events.
  /// Must be called in the loop thread.
  virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels) = 0;

  /// Changes the interested I/O events.
  /// Must be called in the loop thread.
  virtual void updateChannel(Channel* channel) = 0;

  /// Remove the channel, when it destructs.
  /// Must be called in the loop thread.
  virtual void removeChannel(Channel* channel) = 0;

  //判断是否存在
  virtual bool hasChannel(Channel* channel) const;

  //创建一个poller,默认是epoll
  static Poller* newDefaultPoller(EventLoop* loop);

  void assertInLoopThread() const
  {
   
    ownerLoop_->assertInLoopThread();
  }

 protected:
  typedef std::map<int, Channel*> ChannelMap;
  ChannelMap channels_;

 private:
  EventLoop* ownerLoop_;
};

}  // namespace net
}  // namespace muduo

#endif  // MUDUO_NET_POLLER_H

poller.cc

#include "muduo/net/Poller.h"
#include "muduo/net/Channel.h"

using namespace muduo;
using namespace muduo::net;

Poller::Poller(EventLoop* loop)
  : ownerLoop_(loop)
{
   
}

Poller::~Poller() = default;

bool Poller::hasChannel(Channel* channel) const
{
   
  assertInLoopThread();
  ChannelMap::const_iterator it = channels_.find(channel->fd());
  return it != channels_.end() && it->second == channel;
}

EPollPoller.h

#ifndef MUDUO_NET_POLLER_EPOLLPOLLER_H
#define MUDUO_NET_POLLER_EPOLLPOLLER_H

#include "muduo/net/Poller.h"

#include <vector>

struct epoll_event;

namespace muduo
{
   
namespace net
{
   

///
/// IO Multiplexing with epoll(4).
///
class EPollPoller : public Poller
{
   
 public:
  EPollPoller(EventLoop* loop);
  ~EPollPoller() override;

  Timestamp poll(int timeoutMs, ChannelList* activeChannels) override;
  void updateChannel(Channel* channel) override;
  void removeChannel(Channel* channel) override;

 private:
  static const int kInitEventListSize = 16;

  static const char* operationToString(int op);

  void fillActiveChannels(int numEvents,
                          ChannelList* activeChannels) const;
  void update(int operation, Channel* channel);

  typedef std::vector<struct epoll_event> EventList;

  int epollfd_;
  EventList events_;
};

}  // namespace net
}  // namespace muduo
#endif  // MUDUO_NET_POLLER_EPOLLPOLLER_H

EPollPoller.cc

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)

#include "muduo/net/poller/EPollPoller.h"

#include "muduo/base/Logging.h"
#include "muduo/net/Channel.h"

#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <sys/epoll.h>
#include <unistd.h>

using namespace muduo;
using namespace muduo::net;


/*struct epoll_event
{
  uint32_t events;   //Epoll events
  epoll_data_t data;    //User data variable
} __attribute__ ((__packed__));

typedef union epoll_data
{
  void *ptr;
  int fd;
  uint32_t u32;
  uint64_t u64;
} epoll_data_t;*/


// On Linux, the constants of poll(2) and epoll(4)
// are expected to be the same.
static_assert(EPOLLIN == POLLIN,        "epoll uses same flag values as poll");
static_assert(EPOLLPRI == POLLPRI,      "epoll uses same flag values as poll");
static_assert(EPOLLOUT == POLLOUT,      "epoll uses same flag values as poll");
static_assert(EPOLLRDHUP == POLLRDHUP,  "epoll uses same flag values as poll");
static_assert(EPOLLERR == POLLERR,      "epoll uses same flag values as poll");
static_assert(EPOLLHUP == POLLHUP,      "epoll uses same flag values as poll");

namespace
{
   
const int kNew = -1; //channel尚未添加到poller中
const int kAdded = 1; //已经添加了
const int kDeleted = 2;   //之前监听过了,后来移除了监听
}

//当flag = EPOLL_CLOEXEC,创建的epfd会设置FD_CLOEXEC
//FD_CLOEXEC表示当程序执行exec函数时本fd将被系统自动关闭,表示不传递给exec创建的新进程
EPollPoller::EPollPoller(EventLoop* loop)
  : Poller(loop),
    //创建epollfd,使用带1的版本
    //如果参数为0,则与epoll_create版本相同,设置为O_CLOEXEC,查看open函数的这个参数解释,
    //子进程fork并调用exec时会关闭这个fd
    epollfd_(::epoll_create1(EPOLL_CLOEXEC)),      
    events_(kInitEventListSize)                    //vector这样用时初始化kInitEventListSize个大小空间,默认16
{
   
  if (epollfd_ < 0)                           //在构造函数中判断,<0就abort()
  {
   
    LOG_SYSFATAL << "EPollPoller::EPollPoller";
  }
}

EPollPoller::~EPollPoller()
{
   
  ::close(epollfd_);
}

Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels)//ChannelList是一个存放channel的vector
{
   
  LOG_TRACE << "fd total count " << channels_.size();
  int numEvents = ::epoll_wait(epollfd_,
                               &*events_.begin(),  //events_已初始化,是存放epoll_event的vector
                               static_cast<int>(events_.size()),    //监控套接字的数目
                               timeoutMs);
  int savedErrno = errno;
  Timestamp now(Timestamp::now());
  if (numEvents > 0)
  {
   
    LOG_TRACE << numEvents << " events happened";
    fillActiveChannels(numEvents, activeChannels);
    if (implicit_cast<size_t>(numEvents) == events_.size()) //如果返回的事件数目等于当前事件数组大小,就分配2倍空间
    {
   
      events_.resize(events_.size()*2);
    }
  }
  else if (numEvents == 0)
  {
   
    LOG_TRACE << "nothing happened";
  }
  else
  {
   
    // error happens, log uncommon ones
    if (savedErrno != EINTR)
    {
   
      errno = savedErrno;
      LOG_SYSERR << "EPollPoller::poll()";
    }
  }
  return now;
}

//把返回到的这么多个事件添加到activeChannels
void EPollPoller::fillActiveChannels(int numEvents,
                                     ChannelList* activeChannels) const   
{
   
  assert(implicit_cast<size_t>(numEvents) <= events_.size());
  for (int i = 0; i < numEvents; ++i)                          //确定它的大小小于events_的大小,因为events_是预留的事件vector
  {
   
    Channel* channel = static_cast<Channel*>(events_[i].data.ptr);
#ifndef NDEBUG
    int fd = channel->fd();                       //debug时做一下检测
    ChannelMap::const_iterator it = channels_.find(fd);
    assert(it != channels_.end());
    assert(it->second == channel);
#endif
    channel->set_revents(events_[i].events);        //把已发生的事件传给channel,写到通道当中
    activeChannels->push_back(channel);             //并且push_back进activeChannels
  }
}

//这个函数被调用是因为channel->enablereading()被调用,再调用channel->update(),再event_loop->updateChannel(),再->epoll或poll的updateChannel被调用
//
void EPollPoller::updateChannel(Channel* channel)
{
   
  Poller::assertInLoopThread();  //在IO线程
  const int index = channel->index();  //初始状态index是-1
  LOG_INFO << "fd = " << channel->fd()
    << " events = " << channel->events() << " index = " << index;

  // 当是新的或是之前监听过,后来移除了监听
  // 两者的区别在于,新的channel 之前没有在epoll 中保存
  // 而 del 的之前在 channels_ 中保存了,但是没有被放入epoll_ctl中监听
  if (index == kNew || index == kDeleted)  //index是在poll中是下标,在epoll中是三种状态,上面有三个常量
  {
   
    // a new one, add with EPOLL_CTL_ADD
    int fd = channel->fd();
    if (index == kNew)
    {
   
      assert(channels_.find(fd) == channels_.end());  //channels_是一个Map
      channels_[fd] = channel;
    }
    else // index == kDeleted
    {
   
      assert(channels_.find(fd) != channels_.end());
      assert(channels_[fd] == channel);
    }

    channel->set_index(kAdded);
    update(EPOLL_CTL_ADD, channel);   //注册事件
  }
  else
  {
   
    // update existing one with EPOLL_CTL_MOD/DEL
    int fd = channel->fd();
    (void)fd;
    assert(channels_.find(fd) != channels_.end());
    assert(channels_[fd] == channel);
    assert(index == kAdded);

    // 既然已经添加了,那么可能的修改就是修改监听的时间,或者不在监听
    // 因此这里先判断是否是没有监听的事件了,如果是那么直接移除、
    if (channel->isNoneEvent())   //判断无事件
    {
   
      update(EPOLL_CTL_DEL, channel);    //删除事件
      channel->set_index(kDeleted);  //删除后被设置为kDeleted
    }
    else
    {
   
      update(EPOLL_CTL_MOD, channel);   //修改已注册的监听事件
    }
  }
}

void EPollPoller::removeChannel(Channel* channel)
{
   
  Poller::assertInLoopThread();  //判断是否在IO线程
  int fd = channel->fd();
  LOG_TRACE << "fd = " << fd;
  assert(channels_.find(fd) != channels_.end());
  assert(channels_[fd] == channel);
  assert(channel->isNoneEvent());
  int index = channel->index();
  assert(index == kAdded || index == kDeleted);
  size_t n = channels_.erase(fd);   //删除
  (void)n; 
  assert(n == 1);

  if (index == kAdded)
  {
   
    update(EPOLL_CTL_DEL, channel);
  }
  channel->set_index(kNew);
}

void EPollPoller::update(int operation, Channel* channel)
{
   
  printf("-------%s,line.%d-------\n",__FUNCTION__,__LINE__);
  struct epoll_event event;    //存放数据的结构体
  memZero(&event, sizeof event);
  event.events = channel->events();  //注册的事件
  event.data.ptr = channel;
  int fd = channel->fd();
  LOG_INFO << "epoll_ctl op = " << operationToString(operation)
    << " fd = " << fd << " event = { " << channel->eventsToString() << " }";
  if (::epoll_ctl(epollfd_, operation, fd, &event) < 0)//epoll_ctl失败返回-1
  {
   
    if (operation == EPOLL_CTL_DEL)
    {
   
      LOG_SYSERR << "epoll_ctl op =" << operationToString(operation) << " fd =" << fd;
    }
    else
    {
   
      LOG_SYSFATAL << "epoll_ctl op =" << operationToString(operation) << " fd =" << fd;
    }
  }
}

const char* EPollPoller::operationToString(int op)
{
   
  switch (op)
  {
   
    case EPOLL_CTL_ADD:
      return "ADD";
    case EPOLL_CTL_DEL:
      return "DEL";
    case EPOLL_CTL_MOD:
      return "MOD";
    default:
      assert(false && "ERROR op");
      return "Unknown Operation";
  }
}
目录
相关文章
|
人工智能 芯片
通义千问上新,可一键免费解析超万页文档、速读百份文档
通义千问上新,可一键免费解析超万页文档、速读百份文档
1817 0
|
Java Spring 容器
spring之HttpInvoker
  一、HttpInvoker是常用的Java同构系统之间方法调用实现方案,是众多Spring项目中的一个子项目。顾名思义,它通过HTTP通信即可实现两个Java系统之间的远程方法调用,使得系统之间的通信如同调用本地方法一般。
2705 0
|
3月前
|
人工智能 自然语言处理 JavaScript
用 LLM 辅助性能测试报告生成
性能测试报告通常包含测试概述、方案说明、结果分析、问题定位、优化建议及上线评估等内容。报告编写面临数据分析复杂、撰写耗时、经验依赖等问题。引入大型语言模型(LLM),可实现报告智能生成,提升效率与专业度。LLM具备自然语言生成、数据归纳、专家知识迁移等能力,可适配多格式、多语言输出。通过构建LLM辅助的报告生成引擎,结合Prompt设计,可高效输出结构化报告。实践表明,LLM在测试结论总结、瓶颈分析与优化建议方面表现优异,为性能测试智能化升级提供有力支撑。
307 0
|
机器学习/深度学习 存储 算法
回声状态网络(Echo State Networks,ESN)详细原理讲解及Python代码实现
本文详细介绍了回声状态网络(Echo State Networks, ESN)的基本概念、优点、缺点、储层计算范式,并提供了ESN的Python代码实现,包括不考虑和考虑超参数的两种ESN实现方式,以及使用ESN进行时间序列预测的示例。
996 4
回声状态网络(Echo State Networks,ESN)详细原理讲解及Python代码实现
|
缓存 自然语言处理 数据挖掘
一篇文章让你学会Elasticsearch中的查询
一篇文章让你学会Elasticsearch中的查询
137621 118
|
开发者 Python
深入解析Python `httpx`源码,探索现代HTTP客户端的秘密!
深入解析Python `httpx`源码,探索现代HTTP客户端的秘密!
315 1
|
文字识别
印刷文字识别产品使用合集之OCR调用为什么会失败
印刷文字识别(Optical Character Recognition, OCR)技术能够将图片、扫描文档或 PDF 中的印刷文字转化为可编辑和可搜索的数据。这项技术广泛应用于多个领域,以提高工作效率、促进信息数字化。以下是一些印刷文字识别产品使用的典型场景合集。
235 1
|
机器学习/深度学习 自然语言处理 安全
LLM 系列 | 17:如何用LangChain做长文档问答?
本文作为LangChain专题的开篇,以长文档问答为例介绍如何使用LangChain。
LLM 系列 | 17:如何用LangChain做长文档问答?
|
存储 人工智能 Go
探索Gin框架:Golang使用Gin完成文件上传
探索Gin框架:Golang使用Gin完成文件上传
|
存储 小程序 JavaScript