muduo源码剖析之TimerQueue类

简介: 通过timerfd实现的定时器功能,为EventLoop扩展了一系列runAt,runEvery,runEvery等函数TimerQueue中通过std::set维护所有的Timer,也可以使用优先队列实现muduo的TimerQueue是基于timerfd_create实现,这样超时很容易和epoll结合起来。等待超时事件保存在set集合中,注意set集合的有序性,从小到大排列,整个对TimerQueue的处理也就是对set集合的操作。

简介

TimerQueue

​ 通过timerfd实现的定时器功能,为EventLoop扩展了一系列runAt,runEvery,runEvery等函数TimerQueue中通过std::set维护所有的Timer,也可以使用优先队列实现

muduo的TimerQueue是基于timerfd_create实现,这样超时很容易和epoll结合起来。等待超时事件保存在set集合中,注意set集合的有序性,从小到大排列,整个对TimerQueue的处理也就是对set集合的操作。实现TimerQueue用了3个set,分别是等待超时事件set,活跃事件set,被撤销定时set。主要是STL的一些操作。

主要成员及属性解析

主要接口

  • addTimer

向定时器中添加Timer
Timer是一个封装了回调函数和时间的类
通过内部实现addTimerInLoop保证线程安全

  • cancel

从定时器中移除某个Timer

  • 核心实现:getExpired

从timers_集合中移除已经到期的Timer

  • 核心实现:handleRead

向timerfdChannel注册的回调函数
在timerfd触发可读时间时,也就是定时器到期的时候执行
会调用getExpired,并依次执行返回的所有Timer中的回调

主要成员

  • timerfdChannel_

用来唤醒计时器的Channel,维护了一个timerfd,注册了TimerQueue::handleRead回调

  • std::set> timers_

使用std::set容器来取得最近将要超时的Timer,从而决定是否resetTimerfd

TimerQueue执行用户回调的时序图:

img

源码剖析

TimerQueue.h

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is an internal header file, you should not include this.

#ifndef MUDUO_NET_TIMERQUEUE_H
#define MUDUO_NET_TIMERQUEUE_H

#include <set>
#include <vector>

#include "muduo/base/Mutex.h"
#include "muduo/base/Timestamp.h"
#include "muduo/net/Callbacks.h"
#include "muduo/net/Channel.h"

namespace muduo
{
   
   
namespace net
{
   
   

class EventLoop;
class Timer;
class TimerId;

///
/// A best efforts timer queue.
/// No guarantee that the callback will be on time.
///
class TimerQueue : noncopyable
{
   
   
 public:
  explicit TimerQueue(EventLoop* loop);
  ~TimerQueue();

  ///
  /// Schedules the callback to be run at given time,
  /// repeats if @c interval > 0.0.
  ///
  /// Must be thread safe. Usually be called from other threads.
  //添加定时事件
  TimerId addTimer(TimerCallback cb,
                   Timestamp when,
                   double interval);
  //取消定时事件
  void cancel(TimerId timerId);

 private:

  // FIXME: use unique_ptr<Timer> instead of raw pointers.
  // This requires heterogeneous comparison lookup (N3465) from C++14
  // so that we can find an T* in a set<unique_ptr<T>>.
  typedef std::pair<Timestamp, Timer*> Entry;
  typedef std::set<Entry> TimerList;
  typedef std::pair<Timer*, int64_t> ActiveTimer;
  typedef std::set<ActiveTimer> ActiveTimerSet;

  //在workloop中执行实际添加的定时事件的回调函数
  void addTimerInLoop(Timer* timer);

  //在workloop中执行实际删除定时事件的回调函数
  void cancelInLoop(TimerId timerId);

  // called when timerfd alarms
  //定时器超时时候被调用处理事件的回调函数
  void handleRead();

  // move out all expired timers
  //获取所有的超时事件
  std::vector<Entry> getExpired(Timestamp now);

  //重置刷新定时器列表
  void reset(const std::vector<Entry>& expired, Timestamp now);

  //将一个定时事件加入到定时列表中
  bool insert(Timer* timer);

  //所属loop的指针
  EventLoop* loop_;

  //timerfd文件描述符
  const int timerfd_;

  //存储timerfd事件的channel
  Channel timerfdChannel_;

  // Timer list sorted by expiration
  //按过期时间排序的定时器列表
  TimerList timers_;

  // for cancel()
  //存储活跃定时事件的容器
  ActiveTimerSet activeTimers_;

  //正在处理超时事件的标志
  bool callingExpiredTimers_; /* atomic */

  //存储所有需要取消定时事件的容器
  ActiveTimerSet cancelingTimers_;
};

}  // namespace net
}  // namespace muduo
#endif  // MUDUO_NET_TIMERQUEUE_H

TimerQueue.cc

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)

#ifndef __STDC_LIMIT_MACROS
#define __STDC_LIMIT_MACROS
#endif

#include "muduo/net/TimerQueue.h"

#include "muduo/base/Logging.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/Timer.h"
#include "muduo/net/TimerId.h"

#include <sys/timerfd.h>
#include <unistd.h>

namespace muduo
{
   
   
namespace net
{
   
   
namespace detail
{
   
   

//创建一个timerfd
int createTimerfd()
{
   
   
  //创建一个timerFd,设置为单调,非阻塞,fork+exec关闭fd
  int timerfd = ::timerfd_create(CLOCK_MONOTONIC,
                                 TFD_NONBLOCK | TFD_CLOEXEC);
  if (timerfd < 0)
  {
   
   
    LOG_SYSFATAL << "Failed in timerfd_create";
  }
  return timerfd;
}

//计算定时器触发还有多长
struct timespec howMuchTimeFromNow(Timestamp when)
{
   
   
  //当前时间与when的时间间隔
  int64_t microseconds = when.microSecondsSinceEpoch()
                         - Timestamp::now().microSecondsSinceEpoch();
  if (microseconds < 100)//100起步
  {
   
   
    microseconds = 100;
  }
  struct timespec ts;
  //秒
  ts.tv_sec = static_cast<time_t>(
      microseconds / Timestamp::kMicroSecondsPerSecond);
  //纳秒
  ts.tv_nsec = static_cast<long>(
      (microseconds % Timestamp::kMicroSecondsPerSecond) * 1000);
  return ts;
}

void readTimerfd(int timerfd, Timestamp now)
{
   
   
  //在timerfd在读取8字节
  uint64_t howmany;
  ssize_t n = ::read(timerfd, &howmany, sizeof howmany);
  LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString();
  if (n != sizeof howmany)
  {
   
   
    LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8";
  }
}

//重置定时器超时时间戳
void resetTimerfd(int timerfd, Timestamp expiration)
{
   
   
  // wake up loop by timerfd_settime()
  struct itimerspec newValue;
  struct itimerspec oldValue;
  memZero(&newValue, sizeof newValue);
  memZero(&oldValue, sizeof oldValue);

  newValue.it_value = howMuchTimeFromNow(expiration);
  //设置下一次定时事件的到达时间
  int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue);
  if (ret)
  {
   
   
    LOG_SYSERR << "timerfd_settime()";
  }
}

}  // namespace detail
}  // namespace net
}  // namespace muduo

using namespace muduo;
using namespace muduo::net;
using namespace muduo::net::detail;

TimerQueue::TimerQueue(EventLoop* loop)
  : loop_(loop),
    timerfd_(createTimerfd()),
    timerfdChannel_(loop, timerfd_),
    timers_(),
    callingExpiredTimers_(false)
{
   
   
  //设置读事件的回调函数
  timerfdChannel_.setReadCallback(
      std::bind(&TimerQueue::handleRead, this));
  // we are always reading the timerfd, we disarm it with timerfd_settime.
  //监听读事件
  timerfdChannel_.enableReading();
}

TimerQueue::~TimerQueue()
{
   
   
  //取消所有的监听事件
  timerfdChannel_.disableAll();
  //将自己从polller中移除
  timerfdChannel_.remove();
  //关闭fd
  ::close(timerfd_);
  // do not remove channel, since we're in EventLoop::dtor();
  //释放定时器列表中所有的定时器
  for (const Entry& timer : timers_)
  {
   
   
    delete timer.second;
  }
}

//添加定时事件
TimerId TimerQueue::addTimer(TimerCallback cb,//超时回调
                             Timestamp when,//时间戳
                             double interval)//时间间隔
{
   
   
  //创建一个定时器
  Timer* timer = new Timer(std::move(cb), when, interval);
  //添加定时器
  loop_->runInLoop(
      std::bind(&TimerQueue::addTimerInLoop, this, timer));

  return TimerId(timer, timer->sequence());
}

//取消一个定时事件
void TimerQueue::cancel(TimerId timerId)
{
   
   
  loop_->runInLoop(
      std::bind(&TimerQueue::cancelInLoop, this, timerId));
}

void TimerQueue::addTimerInLoop(Timer* timer)
{
   
   
  loop_->assertInLoopThread();
  //将定时事件插入定时器列表
  bool earliestChanged = insert(timer);

  //如果这个定时事件将会是最先被触发的,那么就重新设置定时器超时时间戳
  if (earliestChanged)
  {
   
   
    resetTimerfd(timerfd_, timer->expiration());
  }
}

//取消一个定时器
void TimerQueue::cancelInLoop(TimerId timerId)
{
   
   
  loop_->assertInLoopThread();
  assert(timers_.size() == activeTimers_.size());
  //寻找这个定时器
  ActiveTimer timer(timerId.timer_, timerId.sequence_);
  ActiveTimerSet::iterator it = activeTimers_.find(timer);
  if (it != activeTimers_.end())//找到了
  {
   
   
    //删除+释放
    size_t n = timers_.erase(Entry(it->first->expiration(), it->first));
    assert(n == 1); (void)n;
    delete it->first; // FIXME: no delete please
    activeTimers_.erase(it);
  }
  //如果此时正在处理过期事件,那么就将该定时器加入到过期事件列表
  else if (callingExpiredTimers_)
  {
   
   
    cancelingTimers_.insert(timer);
  }
  assert(timers_.size() == activeTimers_.size());
}

//timer读事件的回调函数
void TimerQueue::handleRead()
{
   
   
  loop_->assertInLoopThread();
  Timestamp now(Timestamp::now());
  //read timer
  readTimerfd(timerfd_, now);

  //获取所有超时的定时事件
  std::vector<Entry> expired = getExpired(now);

  callingExpiredTimers_ = true;
  //释放所有过期事件
  cancelingTimers_.clear();
  // safe to callback outside critical section
  //执行所有超时事件的回调函数
  for (const Entry& it : expired)
  {
   
   
    it.second->run();
  }
  callingExpiredTimers_ = false;

  //重置定时器列表
  reset(expired, now);
}

//获取所有超时的定时事件
std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
{
   
   
  assert(timers_.size() == activeTimers_.size());
  std::vector<Entry> expired;
  //获取当前时间戳
  Entry sentry(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
  //返回指向第一个不小于给定值的元素的迭代器
  //也就是通过二分搜索找到第一个时间戳大于当前时间点的迭代器
  TimerList::iterator end = timers_.lower_bound(sentry);
  assert(end == timers_.end() || now < end->first);
  //back_inserter(expired)将获取vector尾部可插入元素的迭代器
  //将所有超时事件都复制到vector中
  std::copy(timers_.begin(), end, back_inserter(expired));
  //删除所有超时事件
  timers_.erase(timers_.begin(), end);

  //将所有超时事件都从活跃事件列表中摘除
  for (const Entry& it : expired)
  {
   
   
    ActiveTimer timer(it.second, it.second->sequence());
    size_t n = activeTimers_.erase(timer);
    assert(n == 1); (void)n;
  }

  assert(timers_.size() == activeTimers_.size());
  return expired;
}

//重置定时器列表
//expired中存放过期的定时器事件
void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now)
{
   
   
  Timestamp nextExpire;

  for (const Entry& it : expired)
  {
   
   
    ActiveTimer timer(it.second, it.second->sequence());

    if (it.second->repeat()
        && cancelingTimers_.find(timer) == cancelingTimers_.end())
    {
   
   
    //如果这个定时事件设置了重复执行,并且没有被取消,
      it.second->restart(now);//刷新时间戳
      insert(it.second);//加入定时器列表
    }
    else//否则释放掉
    {
   
   
      // FIXME move to a free list
      delete it.second; // FIXME: no delete please
    }
  }

  //如果定时器列表非空,就获取首节点的时间戳
  if (!timers_.empty())
  {
   
   
    nextExpire = timers_.begin()->second->expiration();
  }
  //刷新重置timer的下一次超时时间戳
  if (nextExpire.valid())
  {
   
   
    resetTimerfd(timerfd_, nextExpire);
  }
}

//将一个定时事件加入到定时列表中
bool TimerQueue::insert(Timer* timer)
{
   
   
  loop_->assertInLoopThread();
  assert(timers_.size() == activeTimers_.size());
  //此变量用来标记这个待插入的定时事件是不是会最早触发的
  bool earliestChanged = false;
  //获取到该定时器的时间戳
  Timestamp when = timer->expiration();

  TimerList::iterator it = timers_.begin();
  if (it == timers_.end() || when < it->first)
  {
   
   
    earliestChanged = true;
  }
  {
   
   
    //将定时事件插入事件列表
    std::pair<TimerList::iterator, bool> result
      = timers_.insert(Entry(when, timer));
    assert(result.second); (void)result;
  }
  {
   
   
    //将定时事件插入活跃事件列表
    std::pair<ActiveTimerSet::iterator, bool> result
      = activeTimers_.insert(ActiveTimer(timer, timer->sequence()));
    assert(result.second); (void)result;
  }

  assert(timers_.size() == activeTimers_.size());
  return earliestChanged;
}
目录
相关文章
|
6月前
|
API
muduo源码剖析之SocketOps类
对socket设置API的封装。比较简单,已经编写注释。
39 0
|
6月前
muduo源码剖析之Socket类
封装了一个sockfd相关的设置。比较简单,已经编写注释。
55 0
|
6月前
muduo源码剖析之Acceptor监听类
Acceptor类用于创建套接字,设置套接字选项,调用socket()->bind()->listen()->accept()函数,接受连接,然后调用TcpServer设置的connect事件的回调。listen()//在TcpServer::start中调用封装了一个listen fd相关的操作,用于mainLoop接受器封装,实质上就是对Channel的多一层封装监听连接 当新连接进入时,调用Socket::accept创建套接字,触发TcpServer的回调TcpServer通过该接口设置回调,
44 0
|
6月前
|
安全 API
muduo源码剖析之EventLoop事件循环类
EventLoop.cc就相当于一个reactor,多线程之间的函数调用(用eventfd唤醒),epoll处理,超时队列处理,对channel的处理。运行loop的进程被称为IO线程,EventLoop提供了一些API确保相应函数在IO线程中调用,确保没有用互斥量保护的变量只能在IO线程中使用,也封装了超时队列的基本操作。
78 0
|
6月前
muduo源码剖析之InetAddress
InetAddress 类在 muduo 网络库中被广泛使用,用于表示网络中的通信实体的地址信息,例如服务器地址、客户端地址等。通过 InetAddress 类,我们可以方便地操作 IP 地址和端口号,实现网络通信的功能。InetAddress 类是 muduo 网络库中的一个重要类,用于表示网络中的 IP 地址和端口号。源码比较简单,已经编写详细注释。
83 0
|
6月前
muduo源码剖析之Connector客户端连接类
Connector负责主动发起连接,不负责创建socket,只负责连接的建立,外部调用Connector::start就可以发起连接,Connector具有重连的功能和停止连接的功能,连接成功建立后返回到TcpClient。
57 0
|
6月前
muduo源码剖析之channel通道类
channel是muduo中的事件分发器,它只属于一个EventLoop,Channel类中保存着IO事件的类型以及对应的回调函数,每个channel只负责一个文件描述符,但它并不拥有这个文件描述符。channel是在epoll和TcpConnection之间起沟通作用,故也叫做通道,其它类通过调用channel的setCallbcak来和建立channel沟通关系。
104 0
|
5月前
|
存储 并行计算 算法
深入解析Java并发库(JUC)中的Phaser:原理、应用与源码分析
深入解析Java并发库(JUC)中的Phaser:原理、应用与源码分析
|
6月前
|
前端开发
muduo源码剖析之AsyncLogging异步日志类
AsyncLogging是muduo的日志,程序如果直接让文件写日志可能会发生阻塞,muduo前端设计了2个BufferPtr,分别是currentBuffer_和nextBuffer_,还有一个存放BufferPtr的vector(buffers_)。多个前端线程往currentBuffer_写数据,currentBuffer_写满了将其放入buffers_,通知后端线程读。前端线程将currentBuffer_和nextBuffer_替换继续写currentBuffer_。
75 0
|
5月前
|
API C++
c++进阶篇——初窥多线程(三)cpp中的线程类
C++11引入了`std::thread`,提供对并发编程的支持,简化多线程创建并增强可移植性。`std::thread`的构造函数包括默认构造、移动构造及模板构造(支持函数、lambda和对象)。`thread::get_id()`获取线程ID,`join()`确保线程执行完成,`detach()`使线程独立,`joinable()`检查线程状态,`operator=`仅支持移动赋值。`thread::hardware_concurrency()`返回CPU核心数,可用于高效线程分配。