简析LIVE555中的延时队列

简介:

最近在看LIVE555的源码,对其中的延时队列感觉有点乱,网上查询资料,于是就总结一下。

首先描述一下LIVE555中的延时队列的设计理念。如下图,A,B,C分别为时间轴上的三个事件点,而head表示当前时间点。

要描述一个事件发生的时间,通常可以有两种方法:一种方法直接描述事件发生的绝对时间;另一种方法则是可以描述和另一事件发生的相对时间。而LIVE555中采用的就是后者。  

LIVE555中,首先将所有的事件点以发生时间的先后进行排序,然后每个事件对应的时间都是相对于前一事件发生的时间差。比如B事件中存储的时间就是A事件触发后,再去触发B事件所需要的时间。这样,我们每次去查询这个队列中是否有事件被触发的时候,就只需要查询整个队列中的第一个事件就可以了。

 然后就是LIVE555中的实现方法了。整个延时队列是用DelayQueue这个类实现的,而它的基类DelayQueueEntry就是用来描述每个事件节点的。在DelayQueueEntry中的主要成员有以下几个:fDelayTimeRemaining表示的就是与前一事件之间的时间差;fNextfPrev就是指向时间轴上的下一个事件和前一个事件的指针;ftoken表示当前节点的标识;handleTimeout就是事件超时后的处理方法。

DelayQueue类里描述的则是具体的实现方法。首先是一些对这个队列进行的基本操作:addEntry实现的是在队列中增加事件节点removeEntry实现的是在队列中删除某事件节点;updateEntry实现的则是更新某事件的触发时间;而findEntryByToken则是根据节点的标识查找相应的事件。在此类中最常用的方法应该是synchronize,它实现的就是将整个事件队列和当前系统时间进行同步,检测有无事件已经被触发,如果触发并调用handleAlarm方法对相应事件进行处理。而属性fLastSyncTime表示的就是上次同步的系统时间,其实一般情况下,方法synchronize的实现方法其实就是简单地把队列上第一个事件节点存储的时间差减去当前系统时间和上次同步时间的差。

附:相关类结构:

=================================================================

==> 相关typedef定义

typedef void TaskFunc(void* clientData);

typedef void* TaskToken;

// 下面Timeval类有涉及

#ifdef TIME_BASE

typedef TIME_BASE time_base_seconds;

#else

typedef long time_base_seconds;

#endif

==> 相关类的说明(由于有些类很大,故不会完整贴出,故用说明)

///// A "Timeval" can be either an absolute time, or a time interval /////

class Timeval {

public:

  time_base_seconds seconds() const {

    return fTv.tv_sec;

  }

  time_base_seconds seconds() {

    return fTv.tv_sec;

  }

  time_base_seconds useconds() const {

return fTv.tv_usec;

  }

int operator>=(Timeval const& arg2) const; // >=为基础,推算出其余条件判断(<=<</span>、>等)的真假

  int operator<=(Timeval const& arg2) const {

    return arg2 >= *this;

  }

  int operator<</b>(Timeval const& arg2) const {

    return !(*this >= arg2);

  } 

int operator>(Timeval const& arg2) const {

    return arg2 < *this;

  }

  int operator==(Timeval const& arg2) const {

    return *this >= arg2 && arg2 >= *this;

  }

  int operator!=(Timeval const& arg2) const {

    return !(*this == arg2);

  }

  void operator+=(class DelayInterval const& arg2);

  void operator-=(class DelayInterval const& arg2);

  // returns ZERO iff arg2 >= arg1

protected:

  Timeval_r(time_base_seconds seconds, time_base_seconds useconds) {

    fTv.tv_sec = seconds; fTv.tv_usec = useconds;

  }

private:

  time_base_seconds& secs() {

    return (time_base_seconds&)fTv.tv_sec;

  }

  time_base_seconds& usecs() {

    return (time_base_seconds&)fTv.tv_usec;

  }

  struct timeval fTv; // 看到,所有的所有,其实是在为timeval这个结构体封装了一系列操作函数

};

++++++++++++++++++++++++++++++++++++++++++

// 下面这个类用以处理自197011日以来的绝对时间

class EventTime: public Timeval {

public:

  EventTime(unsigned secondsSinceEpoch = 0,

    unsigned usecondsSinceEpoch = 0)

    // We use the Unix standard epoch: January 1, 1970

    : Timeval_r(secondsSinceEpoch, usecondsSinceEpoch) {}

};

class DelayQueueEntry { // 通过它来链接所有的事件信息,组成队列(见下面DelayQueue类)

public:

  virtual ~DelayQueueEntry();

  intptr_t token() {

    return fToken;

  }

protected: // abstract base class

  DelayQueueEntry(DelayInterval delay);

  virtual void handleTimeout();

private:

  friend class DelayQueue;

  DelayQueueEntry* fNext;

  DelayQueueEntry* fPrev;

  DelayInterval fDeltaTimeRemaining;

  intptr_t fToken;

  static intptr_t tokenCounter;

};

class DelayQueue: public DelayQueueEntry {

public:

  DelayQueue();

  virtual ~DelayQueue();

  void addEntry(DelayQueueEntry* newEntry); // returns a token for the entry

  void updateEntry(DelayQueueEntry* entry, DelayInterval newDelay);

  void updateEntry(intptr_t tokenToFind, DelayInterval newDelay);

  void removeEntry(DelayQueueEntry* entry); // but doesn't delete it

  DelayQueueEntry* removeEntry(intptr_t tokenToFind); // but doesn't delete it

  DelayInterval const& timeToNextAlarm();

  void handleAlarm();

private:

  DelayQueueEntry* head() { return fNext; } // 返回DelayQueueEntry类中的fNext队头成员

  DelayQueueEntry* findEntryByToken(intptr_t token);

  void synchronize(); // bring the 'time remaining' fields up-to-date

  EventTime fLastSyncTime;

};

////////// A subclass of DelayQueueEntry,

//////////     used to implement BasicTaskScheduler0::scheduleDelayedTask()

class AlarmHandler: public DelayQueueEntry {

public:

  AlarmHandler(TaskFunc* proc, void* clientData, DelayInterval timeToDelay)

    : DelayQueueEntry(timeToDelay), fProc(proc), fClientData(clientData) {

  }

private: // redefined virtual functions

  virtual void handleTimeout() {

    (*fProc)(fClientData);

    DelayQueueEntry::handleTimeout();

  }

private:

  TaskFunc* fProc;

  void* fClientData;

};

目录
相关文章
|
8月前
|
消息中间件 存储 Java
RabbitMQ之延迟队列(手把手教你学习延迟队列)
【1月更文挑战第12天】延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列的。
1850 15
|
8月前
|
消息中间件 数据库
七、延时队列
七、延时队列
82 0
|
消息中间件 Java 数据库
RibbitMQ学习笔记延迟队列(一)
RibbitMQ学习笔记延迟队列
77 0
|
消息中间件 存储 NoSQL
RibbitMQ学习笔记延迟队列(二)
RibbitMQ学习笔记延迟队列
95 0
|
消息中间件 Java Kafka
15、RabbitMQ没有延时队列?学会这一招玩转延时队列
15、RabbitMQ没有延时队列?学会这一招玩转延时队列
252 0
15、RabbitMQ没有延时队列?学会这一招玩转延时队列
|
前端开发
【异步FIFO的一些小事·2】异步FIFO中异步走线延时约束的一些思考
【异步FIFO的一些小事·2】异步FIFO中异步走线延时约束的一些思考
229 0
【异步FIFO的一些小事·2】异步FIFO中异步走线延时约束的一些思考
【异步FIFO的一些小事·3】异步FIFO中指针走线延时的一些思考
【异步FIFO的一些小事·3】异步FIFO中指针走线延时的一些思考
153 0
【异步FIFO的一些小事·3】异步FIFO中指针走线延时的一些思考
|
消息中间件 Java Kafka
RabbitMQ没有延时队列?我就教你一招,玩转延时队列
延时队列:顾名思义,是一个用于做消息延时消费的队列。但是它也是一个普通队列,所以它具备普通队列的特性,相比之下,延时的特性就是它最大的特点。所谓的延时就是将我们需要的消息,延迟多久之后被消费。普通队列是即时消费的,延时队列是根据延时时间,多久之后才能消费的。
|
消息中间件
延时队列优化 (2)
在这里新增了一个队列QC,绑定关系如下,该队列不设置TTL时间
延时队列优化 (2)
|
消息中间件 JSON 数据库
rabbitMQ延时队列与TTL和DLX、延迟队列的相关介绍
TTL是Time To Live的缩写, 也就是生存时间。 RabbitMq支持对消息和队列设置TTL,对消息这设置是在发送的时候指定,对队列设置是从消息入队列开始计算, 只要超过了队列的超时时间配置, 那么消息会自动清除。 如果两种方式一起使用消息的TTL和队列的TTL之间较小的为准,也就是消息5s过期,队列是10s,那么5s的生效。 默认是没有过期时间的,表示消息没有过期时间;如果设置为0,表示消息在投递到消费者的时候直接被消费,否则丢弃。