简介
Poller是I/O多路复用的抽象基类,对具体的多路复用API做了封装。muduo只提供了EPollPoller(epoll)和PollPoller(poll)两个派生类,因此不支持select。
newDefaultPoller()默认选择epoll
主要接口
poll
是Poller的核心功能:由派生类调用poll或epoll_wait阻塞等待I/O事件发生
通过派生类的实现来填充EventLoop的activeChannelList
static newDefaultPoller:
工厂函数,创建一个Poller实例
在EpollPoller中,每个实例对应一个epollfd
updateChannel / removeChannel
更新或移除Channel在I/O多路复用中的注册状态,例如EPollPoller中对应epoll_ctl的ADD、MOD、DEL
主要成员
ownerLoop_
指向当前Poller所属EventLoop的指针
channels_
fd到Channel*的映射(protected,供派生类使用);其余成员由派生类实现
源码剖析
poller.h
#ifndef MUDUO_NET_POLLER_H
#define MUDUO_NET_POLLER_H
#include <map>
#include <vector>
#include "muduo/base/Timestamp.h"
#include "muduo/net/EventLoop.h"
namespace muduo
{
namespace net
{
class Channel;
///
/// Base class for IO Multiplexing
///
/// This class doesn't own the Channel objects.
class Poller : noncopyable
{
public:
typedef std::vector<Channel*> ChannelList;
Poller(EventLoop* loop);
virtual ~Poller();
/// Polls the I/O events.
/// Must be called in the loop thread.
virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels) = 0;
/// Changes the interested I/O events.
/// Must be called in the loop thread.
virtual void updateChannel(Channel* channel) = 0;
/// Remove the channel, when it destructs.
/// Must be called in the loop thread.
virtual void removeChannel(Channel* channel) = 0;
//判断是否存在
virtual bool hasChannel(Channel* channel) const;
//创建一个poller,默认是epoll
static Poller* newDefaultPoller(EventLoop* loop);
void assertInLoopThread() const
{
ownerLoop_->assertInLoopThread();
}
protected:
typedef std::map<int, Channel*> ChannelMap;
ChannelMap channels_;
private:
EventLoop* ownerLoop_;
};
} // namespace net
} // namespace muduo
#endif // MUDUO_NET_POLLER_H
poller.cc
#include "muduo/net/Poller.h"
#include "muduo/net/Channel.h"
using namespace muduo;
using namespace muduo::net;
Poller::Poller(EventLoop* loop)
: ownerLoop_(loop)
{
}
// Defaulted out-of-line destructor. The base class has nothing to release:
// it does not own the Channel objects referenced from channels_.
Poller::~Poller() = default;
/// Reports whether `channel` is the channel currently registered under its fd.
/// Must be called in the loop thread (checked via assertInLoopThread()).
bool Poller::hasChannel(Channel* channel) const
{
  assertInLoopThread();
  const auto entry = channels_.find(channel->fd());
  if (entry == channels_.end())
  {
    return false;  // nothing registered for this fd
  }
  // The fd is known; make sure it maps to this very Channel object.
  return entry->second == channel;
}
EPollPoller.h
#ifndef MUDUO_NET_POLLER_EPOLLPOLLER_H
#define MUDUO_NET_POLLER_EPOLLPOLLER_H
#include "muduo/net/Poller.h"
#include <vector>
struct epoll_event;
namespace muduo
{
namespace net
{
///
/// IO Multiplexing with epoll(4).
///
/// Each instance holds one epoll file descriptor (epollfd_) and a buffer
/// (events_) that receives the results of epoll_wait().
class EPollPoller : public Poller
{
 public:
  EPollPoller(EventLoop* loop);
  ~EPollPoller() override;

  /// Waits for events and appends the ready channels to activeChannels.
  Timestamp poll(int timeoutMs, ChannelList* activeChannels) override;
  /// Adds, modifies or unregisters the channel depending on its index state.
  void updateChannel(Channel* channel) override;
  /// Drops the channel from channels_ and, if still registered, from epoll.
  void removeChannel(Channel* channel) override;

 private:
  /// Initial number of slots in events_.
  static const int kInitEventListSize = 16;

  /// Maps an EPOLL_CTL_* value to a short name for log output.
  static const char* operationToString(int op);

  /// Copies the first numEvents entries of events_ into activeChannels.
  void fillActiveChannels(int numEvents,
                          ChannelList* activeChannels) const;
  /// Issues a single epoll_ctl() call for the channel.
  void update(int operation, Channel* channel);

  using EventList = std::vector<struct epoll_event>;

  // Declaration order matters: the constructor initialises these in order.
  int epollfd_;
  EventList events_;
};
} // namespace net
} // namespace muduo
#endif // MUDUO_NET_POLLER_EPOLLPOLLER_H
EPollPoller.cc
// Copyright 2010, Shuo Chen. All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
// Author: Shuo Chen (chenshuo at chenshuo dot com)
#include "muduo/net/poller/EPollPoller.h"
#include "muduo/base/Logging.h"
#include "muduo/net/Channel.h"
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <sys/epoll.h>
#include <unistd.h>
using namespace muduo;
using namespace muduo::net;
/*struct epoll_event
{
uint32_t events; //Epoll events
epoll_data_t data; //User data variable
} __attribute__ ((__packed__));
typedef union epoll_data
{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;*/
// On Linux, the constants of poll(2) and epoll(4)
// are expected to be the same.
// These compile-time checks guard that expectation: update() below copies
// channel->events() directly into epoll_event::events, and
// fillActiveChannels() hands epoll_event::events back through
// Channel::set_revents() without translating any bits.
// NOTE(review): presumably Channel defines its event masks with the poll(2)
// constants -- confirm in Channel.cc.
static_assert(EPOLLIN == POLLIN, "epoll uses same flag values as poll");
static_assert(EPOLLPRI == POLLPRI, "epoll uses same flag values as poll");
static_assert(EPOLLOUT == POLLOUT, "epoll uses same flag values as poll");
static_assert(EPOLLRDHUP == POLLRDHUP, "epoll uses same flag values as poll");
static_assert(EPOLLERR == POLLERR, "epoll uses same flag values as poll");
static_assert(EPOLLHUP == POLLHUP, "epoll uses same flag values as poll");
// Values stored in Channel::index() by EPollPoller to track registration.
namespace
{
// The channel has not been added to this poller yet.
constexpr int kNew = -1;
// The channel is currently registered with epoll.
constexpr int kAdded = 1;
// The channel was registered before and has since been taken out of epoll.
constexpr int kDeleted = 2;
}  // namespace
// Creates the epoll instance and the initial result buffer.
//
// EPOLL_CLOEXEC asks epoll_create1() to set FD_CLOEXEC on the new descriptor,
// so the descriptor is closed automatically when the process calls exec and
// is not inherited by the new program image.
EPollPoller::EPollPoller(EventLoop* loop)
  : Poller(loop),
    epollfd_(::epoll_create1(EPOLL_CLOEXEC)),
    // Pre-size the buffer with kInitEventListSize (16) default-constructed slots.
    events_(kInitEventListSize)
{
  const bool creationFailed = (epollfd_ < 0);
  if (creationFailed)
  {
    // Per the original note, LOG_SYSFATAL aborts the process.
    LOG_SYSFATAL << "EPollPoller::EPollPoller";
  }
}
// Closes the epoll descriptor opened by the constructor.
// The return value of close() is not checked.
EPollPoller::~EPollPoller()
{
::close(epollfd_);
}
// Blocks in epoll_wait() for at most timeoutMs milliseconds.
//
// Ready channels are appended to activeChannels (a vector of Channel*).
// Returns the timestamp taken right after epoll_wait() came back.
Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
  LOG_TRACE << "fd total count " << channels_.size();

  // The third argument is the capacity of the result buffer, i.e. the
  // maximum number of events one call may report.
  const int capacity = static_cast<int>(events_.size());
  const int readyCount = ::epoll_wait(epollfd_, events_.data(), capacity, timeoutMs);
  // Capture errno immediately, before any other call can overwrite it.
  const int savedErrno = errno;
  Timestamp receiveTime(Timestamp::now());

  if (readyCount < 0)
  {
    // error happens, log uncommon ones (EINTR is expected and stays silent)
    if (savedErrno != EINTR)
    {
      errno = savedErrno;
      LOG_SYSERR << "EPollPoller::poll()";
    }
    return receiveTime;
  }

  if (readyCount == 0)
  {
    LOG_TRACE << "nothing happened";
    return receiveTime;
  }

  LOG_TRACE << readyCount << " events happened";
  fillActiveChannels(readyCount, activeChannels);
  // A completely filled buffer suggests more events may be pending:
  // double the buffer for the next call.
  if (implicit_cast<size_t>(readyCount) == events_.size())
  {
    events_.resize(events_.size()*2);
  }
  return receiveTime;
}
//把返回到的这么多个事件添加到activeChannels
void EPollPoller::fillActiveChannels(int numEvents,
ChannelList* activeChannels) const
{
assert(implicit_cast<size_t>(numEvents) <= events_.size());
for (int i = 0; i < numEvents; ++i) //确定它的大小小于events_的大小,因为events_是预留的事件vector
{
Channel* channel = static_cast<Channel*>(events_[i].data.ptr);
#ifndef NDEBUG
int fd = channel->fd(); //debug时做一下检测
ChannelMap::const_iterator it = channels_.find(fd);
assert(it != channels_.end());
assert(it->second == channel);
#endif
channel->set_revents(events_[i].events); //把已发生的事件传给channel,写到通道当中
activeChannels->push_back(channel); //并且push_back进activeChannels
}
}
// Brings the epoll registration of `channel` in line with channel->events().
//
// Per the original author's note, the usual call chain is
// Channel::enableReading() -> Channel::update() -> EventLoop::updateChannel()
// -> this function (definitions not shown here).
//
// channel->index() holds one of the states kNew / kAdded / kDeleted:
//  - kNew:     not in channels_ yet; it is inserted and registered with ADD.
//  - kDeleted: still in channels_ but not registered with epoll; it is
//              registered again with ADD.
//  - kAdded:   registered; it is unregistered with DEL when it has no events
//              of interest left, otherwise changed with MOD.
// Must be called in the loop thread.
void EPollPoller::updateChannel(Channel* channel)
{
  Poller::assertInLoopThread();
  const int index = channel->index();  // a fresh channel starts at -1 (kNew) per the original note
  // Logged at TRACE like poll() and removeChannel(): this runs on every
  // change of interest and is too chatty for INFO.
  LOG_TRACE << "fd = " << channel->fd()
            << " events = " << channel->events() << " index = " << index;
  if (index == kNew || index == kDeleted)
  {
    // a new one, add with EPOLL_CTL_ADD
    int fd = channel->fd();
    if (index == kNew)
    {
      assert(channels_.find(fd) == channels_.end());
      channels_[fd] = channel;
    }
    else // index == kDeleted
    {
      assert(channels_.find(fd) != channels_.end());
      assert(channels_[fd] == channel);
    }
    channel->set_index(kAdded);
    update(EPOLL_CTL_ADD, channel);
  }
  else
  {
    // update existing one with EPOLL_CTL_MOD/DEL
    int fd = channel->fd();
    (void)fd;  // only used by the asserts below; silences NDEBUG builds
    assert(channels_.find(fd) != channels_.end());
    assert(channels_[fd] == channel);
    assert(index == kAdded);
    if (channel->isNoneEvent())
    {
      // Nothing left to watch: take it out of epoll but keep it in
      // channels_, which is exactly what kDeleted stands for.
      update(EPOLL_CTL_DEL, channel);
      channel->set_index(kDeleted);
    }
    else
    {
      update(EPOLL_CTL_MOD, channel);
    }
  }
}
// Forgets the channel completely: erases it from channels_, unregisters it
// from epoll if it is still registered (kAdded), and resets its index to kNew.
// The channel must have no events of interest. Must be called in the loop thread.
void EPollPoller::removeChannel(Channel* channel)
{
  Poller::assertInLoopThread();
  int fd = channel->fd();
  LOG_TRACE << "fd = " << fd;

  // Preconditions: known to this poller, identical object, nothing enabled.
  assert(channels_.find(fd) != channels_.end());
  assert(channels_[fd] == channel);
  assert(channel->isNoneEvent());

  const int state = channel->index();
  assert(state == kAdded || state == kDeleted);

  const size_t erased = channels_.erase(fd);
  assert(erased == 1);
  (void)erased;  // only read by the assert above

  // A kDeleted channel has already been taken out of epoll by updateChannel().
  const bool stillRegistered = (state == kAdded);
  if (stillRegistered)
  {
    update(EPOLL_CTL_DEL, channel);
  }
  channel->set_index(kNew);
}
void EPollPoller::update(int operation, Channel* channel)
{
printf("-------%s,line.%d-------\n",__FUNCTION__,__LINE__);
struct epoll_event event; //存放数据的结构体
memZero(&event, sizeof event);
event.events = channel->events(); //注册的事件
event.data.ptr = channel;
int fd = channel->fd();
LOG_INFO << "epoll_ctl op = " << operationToString(operation)
<< " fd = " << fd << " event = { " << channel->eventsToString() << " }";
if (::epoll_ctl(epollfd_, operation, fd, &event) < 0)//epoll_ctl失败返回-1
{
if (operation == EPOLL_CTL_DEL)
{
LOG_SYSERR << "epoll_ctl op =" << operationToString(operation) << " fd =" << fd;
}
else
{
LOG_SYSFATAL << "epoll_ctl op =" << operationToString(operation) << " fd =" << fd;
}
}
}
// Translates an EPOLL_CTL_* operation code into a short label for log lines.
// Any other value trips an assert in debug builds and yields
// "Unknown Operation" otherwise.
const char* EPollPoller::operationToString(int op)
{
  if (op == EPOLL_CTL_ADD)
  {
    return "ADD";
  }
  if (op == EPOLL_CTL_DEL)
  {
    return "DEL";
  }
  if (op == EPOLL_CTL_MOD)
  {
    return "MOD";
  }
  assert(false && "ERROR op");
  return "Unknown Operation";
}