muduo源码剖析之TimerQueue类

简介: 通过timerfd实现的定时器功能,为EventLoop扩展了一系列runAt,runEvery,runEvery等函数TimerQueue中通过std::set维护所有的Timer,也可以使用优先队列实现muduo的TimerQueue是基于timerfd_create实现,这样超时很容易和epoll结合起来。等待超时事件保存在set集合中,注意set集合的有序性,从小到大排列,整个对TimerQueue的处理也就是对set集合的操作。

简介

TimerQueue

​ 通过timerfd实现的定时器功能,为EventLoop扩展了一系列runAt,runEvery,runEvery等函数TimerQueue中通过std::set维护所有的Timer,也可以使用优先队列实现

muduo的TimerQueue是基于timerfd_create实现,这样超时很容易和epoll结合起来。等待超时事件保存在set集合中,注意set集合的有序性,从小到大排列,整个对TimerQueue的处理也就是对set集合的操作。实现TimerQueue用了3个set,分别是等待超时事件set,活跃事件set,被撤销定时set。主要是STL的一些操作。

主要成员及属性解析

主要接口

  • addTimer

向定时器中添加Timer
Timer是一个封装了回调函数和时间的类
通过内部实现addTimerInLoop保证线程安全

  • cancel

从定时器中移除某个Timer

  • 核心实现:getExpired

从timers_集合中移除已经到期的Timer

  • 核心实现:handleRead

向timerfdChannel注册的回调函数
在timerfd触发可读时间时,也就是定时器到期的时候执行
会调用getExpired,并依次执行返回的所有Timer中的回调

主要成员

  • timerfdChannel_

用来唤醒计时器的Channel,维护了一个timerfd,注册了TimerQueue::handleRead回调

  • std::set> timers_

使用std::set容器来取得最近将要超时的Timer,从而决定是否resetTimerfd

TimerQueue执行用户回调的时序图:

img

源码剖析

TimerQueue.h

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is an internal header file, you should not include this.

#ifndef MUDUO_NET_TIMERQUEUE_H
#define MUDUO_NET_TIMERQUEUE_H

#include <set>
#include <vector>

#include "muduo/base/Mutex.h"
#include "muduo/base/Timestamp.h"
#include "muduo/net/Callbacks.h"
#include "muduo/net/Channel.h"

namespace muduo
{
   
   
namespace net
{
   
   

class EventLoop;
class Timer;
class TimerId;

///
/// A best efforts timer queue.
/// No guarantee that the callback will be on time.
///
class TimerQueue : noncopyable
{
   
   
 public:
  explicit TimerQueue(EventLoop* loop);
  ~TimerQueue();

  ///
  /// Schedules the callback to be run at given time,
  /// repeats if @c interval > 0.0.
  ///
  /// Must be thread safe. Usually be called from other threads.
  //添加定时事件
  TimerId addTimer(TimerCallback cb,
                   Timestamp when,
                   double interval);
  //取消定时事件
  void cancel(TimerId timerId);

 private:

  // FIXME: use unique_ptr<Timer> instead of raw pointers.
  // This requires heterogeneous comparison lookup (N3465) from C++14
  // so that we can find an T* in a set<unique_ptr<T>>.
  typedef std::pair<Timestamp, Timer*> Entry;
  typedef std::set<Entry> TimerList;
  typedef std::pair<Timer*, int64_t> ActiveTimer;
  typedef std::set<ActiveTimer> ActiveTimerSet;

  //在workloop中执行实际添加的定时事件的回调函数
  void addTimerInLoop(Timer* timer);

  //在workloop中执行实际删除定时事件的回调函数
  void cancelInLoop(TimerId timerId);

  // called when timerfd alarms
  //定时器超时时候被调用处理事件的回调函数
  void handleRead();

  // move out all expired timers
  //获取所有的超时事件
  std::vector<Entry> getExpired(Timestamp now);

  //重置刷新定时器列表
  void reset(const std::vector<Entry>& expired, Timestamp now);

  //将一个定时事件加入到定时列表中
  bool insert(Timer* timer);

  //所属loop的指针
  EventLoop* loop_;

  //timerfd文件描述符
  const int timerfd_;

  //存储timerfd事件的channel
  Channel timerfdChannel_;

  // Timer list sorted by expiration
  //按过期时间排序的定时器列表
  TimerList timers_;

  // for cancel()
  //存储活跃定时事件的容器
  ActiveTimerSet activeTimers_;

  //正在处理超时事件的标志
  bool callingExpiredTimers_; /* atomic */

  //存储所有需要取消定时事件的容器
  ActiveTimerSet cancelingTimers_;
};

}  // namespace net
}  // namespace muduo
#endif  // MUDUO_NET_TIMERQUEUE_H

TimerQueue.cc

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)

#ifndef __STDC_LIMIT_MACROS
#define __STDC_LIMIT_MACROS
#endif

#include "muduo/net/TimerQueue.h"

#include "muduo/base/Logging.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/Timer.h"
#include "muduo/net/TimerId.h"

#include <sys/timerfd.h>
#include <unistd.h>

namespace muduo
{
   
   
namespace net
{
   
   
namespace detail
{
   
   

//创建一个timerfd
int createTimerfd()
{
   
   
  //创建一个timerFd,设置为单调,非阻塞,fork+exec关闭fd
  int timerfd = ::timerfd_create(CLOCK_MONOTONIC,
                                 TFD_NONBLOCK | TFD_CLOEXEC);
  if (timerfd < 0)
  {
   
   
    LOG_SYSFATAL << "Failed in timerfd_create";
  }
  return timerfd;
}

//计算定时器触发还有多长
struct timespec howMuchTimeFromNow(Timestamp when)
{
   
   
  //当前时间与when的时间间隔
  int64_t microseconds = when.microSecondsSinceEpoch()
                         - Timestamp::now().microSecondsSinceEpoch();
  if (microseconds < 100)//100起步
  {
   
   
    microseconds = 100;
  }
  struct timespec ts;
  //秒
  ts.tv_sec = static_cast<time_t>(
      microseconds / Timestamp::kMicroSecondsPerSecond);
  //纳秒
  ts.tv_nsec = static_cast<long>(
      (microseconds % Timestamp::kMicroSecondsPerSecond) * 1000);
  return ts;
}

void readTimerfd(int timerfd, Timestamp now)
{
   
   
  //在timerfd在读取8字节
  uint64_t howmany;
  ssize_t n = ::read(timerfd, &howmany, sizeof howmany);
  LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString();
  if (n != sizeof howmany)
  {
   
   
    LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8";
  }
}

//重置定时器超时时间戳
void resetTimerfd(int timerfd, Timestamp expiration)
{
   
   
  // wake up loop by timerfd_settime()
  struct itimerspec newValue;
  struct itimerspec oldValue;
  memZero(&newValue, sizeof newValue);
  memZero(&oldValue, sizeof oldValue);

  newValue.it_value = howMuchTimeFromNow(expiration);
  //设置下一次定时事件的到达时间
  int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue);
  if (ret)
  {
   
   
    LOG_SYSERR << "timerfd_settime()";
  }
}

}  // namespace detail
}  // namespace net
}  // namespace muduo

using namespace muduo;
using namespace muduo::net;
using namespace muduo::net::detail;

TimerQueue::TimerQueue(EventLoop* loop)
  : loop_(loop),
    timerfd_(createTimerfd()),
    timerfdChannel_(loop, timerfd_),
    timers_(),
    callingExpiredTimers_(false)
{
   
   
  //设置读事件的回调函数
  timerfdChannel_.setReadCallback(
      std::bind(&TimerQueue::handleRead, this));
  // we are always reading the timerfd, we disarm it with timerfd_settime.
  //监听读事件
  timerfdChannel_.enableReading();
}

TimerQueue::~TimerQueue()
{
   
   
  //取消所有的监听事件
  timerfdChannel_.disableAll();
  //将自己从polller中移除
  timerfdChannel_.remove();
  //关闭fd
  ::close(timerfd_);
  // do not remove channel, since we're in EventLoop::dtor();
  //释放定时器列表中所有的定时器
  for (const Entry& timer : timers_)
  {
   
   
    delete timer.second;
  }
}

//添加定时事件
TimerId TimerQueue::addTimer(TimerCallback cb,//超时回调
                             Timestamp when,//时间戳
                             double interval)//时间间隔
{
   
   
  //创建一个定时器
  Timer* timer = new Timer(std::move(cb), when, interval);
  //添加定时器
  loop_->runInLoop(
      std::bind(&TimerQueue::addTimerInLoop, this, timer));

  return TimerId(timer, timer->sequence());
}

//取消一个定时事件
void TimerQueue::cancel(TimerId timerId)
{
   
   
  loop_->runInLoop(
      std::bind(&TimerQueue::cancelInLoop, this, timerId));
}

void TimerQueue::addTimerInLoop(Timer* timer)
{
   
   
  loop_->assertInLoopThread();
  //将定时事件插入定时器列表
  bool earliestChanged = insert(timer);

  //如果这个定时事件将会是最先被触发的,那么就重新设置定时器超时时间戳
  if (earliestChanged)
  {
   
   
    resetTimerfd(timerfd_, timer->expiration());
  }
}

//取消一个定时器
void TimerQueue::cancelInLoop(TimerId timerId)
{
   
   
  loop_->assertInLoopThread();
  assert(timers_.size() == activeTimers_.size());
  //寻找这个定时器
  ActiveTimer timer(timerId.timer_, timerId.sequence_);
  ActiveTimerSet::iterator it = activeTimers_.find(timer);
  if (it != activeTimers_.end())//找到了
  {
   
   
    //删除+释放
    size_t n = timers_.erase(Entry(it->first->expiration(), it->first));
    assert(n == 1); (void)n;
    delete it->first; // FIXME: no delete please
    activeTimers_.erase(it);
  }
  //如果此时正在处理过期事件,那么就将该定时器加入到过期事件列表
  else if (callingExpiredTimers_)
  {
   
   
    cancelingTimers_.insert(timer);
  }
  assert(timers_.size() == activeTimers_.size());
}

//timer读事件的回调函数
void TimerQueue::handleRead()
{
   
   
  loop_->assertInLoopThread();
  Timestamp now(Timestamp::now());
  //read timer
  readTimerfd(timerfd_, now);

  //获取所有超时的定时事件
  std::vector<Entry> expired = getExpired(now);

  callingExpiredTimers_ = true;
  //释放所有过期事件
  cancelingTimers_.clear();
  // safe to callback outside critical section
  //执行所有超时事件的回调函数
  for (const Entry& it : expired)
  {
   
   
    it.second->run();
  }
  callingExpiredTimers_ = false;

  //重置定时器列表
  reset(expired, now);
}

//获取所有超时的定时事件
std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
{
   
   
  assert(timers_.size() == activeTimers_.size());
  std::vector<Entry> expired;
  //获取当前时间戳
  Entry sentry(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
  //返回指向第一个不小于给定值的元素的迭代器
  //也就是通过二分搜索找到第一个时间戳大于当前时间点的迭代器
  TimerList::iterator end = timers_.lower_bound(sentry);
  assert(end == timers_.end() || now < end->first);
  //back_inserter(expired)将获取vector尾部可插入元素的迭代器
  //将所有超时事件都复制到vector中
  std::copy(timers_.begin(), end, back_inserter(expired));
  //删除所有超时事件
  timers_.erase(timers_.begin(), end);

  //将所有超时事件都从活跃事件列表中摘除
  for (const Entry& it : expired)
  {
   
   
    ActiveTimer timer(it.second, it.second->sequence());
    size_t n = activeTimers_.erase(timer);
    assert(n == 1); (void)n;
  }

  assert(timers_.size() == activeTimers_.size());
  return expired;
}

//重置定时器列表
//expired中存放过期的定时器事件
void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now)
{
   
   
  Timestamp nextExpire;

  for (const Entry& it : expired)
  {
   
   
    ActiveTimer timer(it.second, it.second->sequence());

    if (it.second->repeat()
        && cancelingTimers_.find(timer) == cancelingTimers_.end())
    {
   
   
    //如果这个定时事件设置了重复执行,并且没有被取消,
      it.second->restart(now);//刷新时间戳
      insert(it.second);//加入定时器列表
    }
    else//否则释放掉
    {
   
   
      // FIXME move to a free list
      delete it.second; // FIXME: no delete please
    }
  }

  //如果定时器列表非空,就获取首节点的时间戳
  if (!timers_.empty())
  {
   
   
    nextExpire = timers_.begin()->second->expiration();
  }
  //刷新重置timer的下一次超时时间戳
  if (nextExpire.valid())
  {
   
   
    resetTimerfd(timerfd_, nextExpire);
  }
}

//将一个定时事件加入到定时列表中
bool TimerQueue::insert(Timer* timer)
{
   
   
  loop_->assertInLoopThread();
  assert(timers_.size() == activeTimers_.size());
  //此变量用来标记这个待插入的定时事件是不是会最早触发的
  bool earliestChanged = false;
  //获取到该定时器的时间戳
  Timestamp when = timer->expiration();

  TimerList::iterator it = timers_.begin();
  if (it == timers_.end() || when < it->first)
  {
   
   
    earliestChanged = true;
  }
  {
   
   
    //将定时事件插入事件列表
    std::pair<TimerList::iterator, bool> result
      = timers_.insert(Entry(when, timer));
    assert(result.second); (void)result;
  }
  {
   
   
    //将定时事件插入活跃事件列表
    std::pair<ActiveTimerSet::iterator, bool> result
      = activeTimers_.insert(ActiveTimer(timer, timer->sequence()));
    assert(result.second); (void)result;
  }

  assert(timers_.size() == activeTimers_.size());
  return earliestChanged;
}
目录
相关文章
|
6月前
|
API
muduo源码剖析之SocketOps类
对socket设置API的封装。比较简单,已经编写注释。
38 0
|
6月前
muduo源码剖析之Socket类
封装了一个sockfd相关的设置。比较简单,已经编写注释。
54 0
|
6月前
muduo源码剖析之Acceptor监听类
Acceptor类用于创建套接字,设置套接字选项,调用socket()->bind()->listen()->accept()函数,接受连接,然后调用TcpServer设置的connect事件的回调。listen()//在TcpServer::start中调用封装了一个listen fd相关的操作,用于mainLoop接受器封装,实质上就是对Channel的多一层封装监听连接 当新连接进入时,调用Socket::accept创建套接字,触发TcpServer的回调TcpServer通过该接口设置回调,
44 0
|
6月前
|
安全 API
muduo源码剖析之EventLoop事件循环类
EventLoop.cc就相当于一个reactor,多线程之间的函数调用(用eventfd唤醒),epoll处理,超时队列处理,对channel的处理。运行loop的进程被称为IO线程,EventLoop提供了一些API确保相应函数在IO线程中调用,确保没有用互斥量保护的变量只能在IO线程中使用,也封装了超时队列的基本操作。
76 0
|
6月前
muduo源码剖析之Connector客户端连接类
Connector负责主动发起连接,不负责创建socket,只负责连接的建立,外部调用Connector::start就可以发起连接,Connector具有重连的功能和停止连接的功能,连接成功建立后返回到TcpClient。
54 0
|
6月前
muduo源码剖析之InetAddress
InetAddress 类在 muduo 网络库中被广泛使用,用于表示网络中的通信实体的地址信息,例如服务器地址、客户端地址等。通过 InetAddress 类,我们可以方便地操作 IP 地址和端口号,实现网络通信的功能。InetAddress 类是 muduo 网络库中的一个重要类,用于表示网络中的 IP 地址和端口号。源码比较简单,已经编写详细注释。
82 0
|
5月前
|
存储 并行计算 算法
深入解析Java并发库(JUC)中的Phaser:原理、应用与源码分析
深入解析Java并发库(JUC)中的Phaser:原理、应用与源码分析
|
6月前
|
前端开发
muduo源码剖析之AsyncLogging异步日志类
AsyncLogging是muduo的日志,程序如果直接让文件写日志可能会发生阻塞,muduo前端设计了2个BufferPtr,分别是currentBuffer_和nextBuffer_,还有一个存放BufferPtr的vector(buffers_)。多个前端线程往currentBuffer_写数据,currentBuffer_写满了将其放入buffers_,通知后端线程读。前端线程将currentBuffer_和nextBuffer_替换继续写currentBuffer_。
74 0
|
5月前
|
API C++
c++进阶篇——初窥多线程(三)cpp中的线程类
C++11引入了`std::thread`,提供对并发编程的支持,简化多线程创建并增强可移植性。`std::thread`的构造函数包括默认构造、移动构造及模板构造(支持函数、lambda和对象)。`thread::get_id()`获取线程ID,`join()`确保线程执行完成,`detach()`使线程独立,`joinable()`检查线程状态,`operator=`仅支持移动赋值。`thread::hardware_concurrency()`返回CPU核心数,可用于高效线程分配。
|
5月前
|
存储 设计模式 缓存
面试官:说说Netty对象池的实现原理?
Netty 作为一个高性能的网络通讯框架,它内置了很多恰夺天工的设计,目的都是为了将网络通讯的性能做到极致,其中「对象池技术」也是实现这一目标的重要技术。 ## 1.什么是对象池技术? 对象池技术是一种重用对象以减少对象创建和销毁带来的开销的方法。在对象池中,只有第一次访问时会创建对象,并将其维护在内存中,当再次需要使用对象时,会直接从对象池中获取对象,并在使用完毕后归还给对象池,而不是频繁地创建和销毁对象。 使用对象池技术的优点有以下几个: 1. **提高性能**:复用对象可以减少对象的创建和销毁次数,降低系统开销,提高系统性能和吞吐量。 2. **减少内存碎片**:对象池可以避免
39 0