002. 使用最小堆实现高性能定时器实现

简介: 002. 使用最小堆实现高性能定时器实现

一. 原理

  • 定时器原理

– 任务的容器(要求:数据结构有序或相对有序;能快速查找最近触发的定时任务) + 触发条件(可以通过带有time_out的系统函数,如epoll_wait的最后一个参数);

  • 最小堆的特点

– 是一颗完全二叉树;

– 某个节点的值总是小于等于它子节点的值(即定位到最小值的时间复杂度为O(1),这点可以作为定时器的容器);

– 堆中任意一个节点的子树都是最小堆;

  • 最小堆添加节点

– ① 向二叉树的最高层的最左侧添加一个节点;

– ② 然后考虑上升操作(即和父节点比较,如果值大于父节点,就交换二者位置);

  • 最小堆删除节点

– ① 在堆中找到要删除的节点;

– ② 将①中找到的节点和最高层的最左侧节点交换;

– ③ 先考虑交换后的节点能否执行下降操作,否则执行上升操作;

– ④ 删除最后一个节点;

二. 案例1:在linux平台下纯C代码实现定时器;

minheap.h

#ifndef MARK_MINHEAP_H
#define MARK_MINHEAP_H
#include <stdint.h>
#include <stdlib.h>
typedef struct timer_entry_s timer_entry_t;
typedef void (*timer_handler_pt)(timer_entry_t *ev);
struct timer_entry_s {
    uint32_t time;
    uint32_t min_heap_idx;
    timer_handler_pt handler;
    void *privdata;
};
typedef struct min_heap {
    timer_entry_t **p; // 二维指针,指向一维指针的地址,所以此处其实就是执行了一个以为数组
    uint32_t n, a; // n 为实际元素个数  a 为容量
} min_heap_t;
void            min_heap_ctor_(min_heap_t* s);
void            min_heap_dtor_(min_heap_t* s);
void            min_heap_elem_init_(timer_entry_t* e);
int             min_heap_elt_is_top_(const timer_entry_t *e);
int             min_heap_empty_(min_heap_t* s);
unsigned        min_heap_size_(min_heap_t* s);
timer_entry_t*  min_heap_top_(min_heap_t* s);
int             min_heap_reserve_(min_heap_t* s, unsigned n);
int             min_heap_push_(min_heap_t* s, timer_entry_t* e);
timer_entry_t*  min_heap_pop_(min_heap_t* s);
int             min_heap_adjust_(min_heap_t *s, timer_entry_t* e);
int             min_heap_erase_(min_heap_t* s, timer_entry_t* e);
void            min_heap_shift_up_(min_heap_t* s, unsigned hole_index, timer_entry_t* e);
void            min_heap_shift_up_unconditional_(min_heap_t* s, unsigned hole_index, timer_entry_t* e);
void            min_heap_shift_down_(min_heap_t* s, unsigned hole_index, timer_entry_t* e);
#endif // MARK_MINHEAP_H

minheap.c

#include "minheap.h"
#define min_heap_elem_greater(a, b) \
    ((a)->time > (b)->time)
// 创建最小堆
void min_heap_ctor_(min_heap_t* s) { s->p = 0; s->n = 0; s->a = 0; }
// 
void min_heap_dtor_(min_heap_t* s) { if (s->p) free(s->p); }
void min_heap_elem_init_(timer_entry_t* e) { e->min_heap_idx = -1; }
int min_heap_empty_(min_heap_t* s) { return 0u == s->n; }
unsigned min_heap_size_(min_heap_t* s) { return s->n; }
timer_entry_t* min_heap_top_(min_heap_t* s) { return s->n ? *s->p : 0; }
int min_heap_push_(min_heap_t* s, timer_entry_t* e)
{
    if (min_heap_reserve_(s, s->n + 1))
        return -1; // 如果空间分配失败,则返回,程序终止。
    // 插入新任务,并且上升调整。
    min_heap_shift_up_(s, s->n++, e);
    return 0;
}
timer_entry_t* min_heap_pop_(min_heap_t* s)
{
    if (s->n)
    {
        timer_entry_t* e = *s->p;
        // s->p[--s->n]是该堆的最后一个元素,相当于将最后一个元素填入到最小元素的位置,而后进行下降操作。
        min_heap_shift_down_(s, 0u, s->p[--s->n]);
        e->min_heap_idx = -1;
        return e;
    }
    return 0;
}
int min_heap_elt_is_top_(const timer_entry_t *e)
{
    return e->min_heap_idx == 0;
}
int min_heap_erase_(min_heap_t* s, timer_entry_t* e)
{
    if (-1 != e->min_heap_idx)
    {
        timer_entry_t *last = s->p[--s->n];
        unsigned parent = (e->min_heap_idx - 1) / 2;
        /* we replace e with the last element in the heap.  We might need to
           shift it upward if it is less than its parent, or downward if it is
           greater than one or both its children. Since the children are known
           to be less than the parent, it can't need to shift both up and
           down. */
        if (e->min_heap_idx > 0 && min_heap_elem_greater(s->p[parent], last))
            min_heap_shift_up_unconditional_(s, e->min_heap_idx, last);
        else
            min_heap_shift_down_(s, e->min_heap_idx, last);
        e->min_heap_idx = -1;
        return 0;
    }
    return -1;
}
int min_heap_adjust_(min_heap_t *s, timer_entry_t *e)
{
    if (-1 == e->min_heap_idx) {
        return min_heap_push_(s, e);
    } else {
        unsigned parent = (e->min_heap_idx - 1) / 2;
        /* The position of e has changed; we shift it up or down
         * as needed.  We can't need to do both. */
        if (e->min_heap_idx > 0 && min_heap_elem_greater(s->p[parent], e))
            min_heap_shift_up_unconditional_(s, e->min_heap_idx, e);
        else
            min_heap_shift_down_(s, e->min_heap_idx, e);
        return 0;
    }
}
int min_heap_reserve_(min_heap_t* s, unsigned n)
{
    // 当需要插入新的任务时,如果当前容量小于插入后的任务数量,执行如下if操作,申请内存空间。
    if (s->a < n)
    {
        // 相当于声明了一个*p[],即一个指向一个timer_entry_t数组的指针。
        timer_entry_t** p; 
        // 若容量==0,初始申请8个节点,若容量!=0,初始申请原来容量的2倍。
        unsigned a = s->a ? s->a * 2 : 8;
        // [MJ]不知道干嘛的?
        if (a < n)
            a = n;
        // 重新开辟一个容量为a的一维数组,数组中的每个元素指向一个timer_entry_s。
        if (!(p = (timer_entry_t**)realloc(s->p, a * sizeof *p)))
            return -1;
        // 将新开辟的地址赋值给min_heap_t中指向的一维数组的地址。
        s->p = p;
        // 更新容量。
        s->a = a;
    }
    return 0;
}
void min_heap_shift_up_unconditional_(min_heap_t* s, unsigned hole_index, timer_entry_t* e)
{
    unsigned parent = (hole_index - 1) / 2;
    do
    {
    (s->p[hole_index] = s->p[parent])->min_heap_idx = hole_index;
    hole_index = parent;
    parent = (hole_index - 1) / 2;
    } while (hole_index && min_heap_elem_greater(s->p[parent], e));
    (s->p[hole_index] = e)->min_heap_idx = hole_index;
}
void min_heap_shift_up_(min_heap_t* s, unsigned hole_index, timer_entry_t* e)
{
    // hole_index:新插入元素所在的位置下标
    unsigned parent = (hole_index - 1) / 2;
    while (hole_index && min_heap_elem_greater(s->p[parent], e))
    {
        // 如果父节点的值>新插入节点的值,则将父节点下沉,插入的子节点上升;
        (s->p[hole_index] = s->p[parent])->min_heap_idx = hole_index;
        hole_index = parent;
        parent = (hole_index - 1) / 2;
    }
    // 最后将新插入的节点放入到对应的位置,并且设置对应的在堆中的index
    (s->p[hole_index] = e)->min_heap_idx = hole_index;
}
void min_heap_shift_down_(min_heap_t* s, unsigned hole_index, timer_entry_t* e)
{
    // hole_index 为开始下沉的位置。e是该堆的最后一个元素
    unsigned min_child = 2 * (hole_index + 1); // 右孩子
    while (min_child <= s->n)
    {
        // 如果当前min_child是堆的最后一个值或者当前值比左子树值大,则min_child指向左子树,也就是指向小的。
        min_child -= min_child == s->n || min_heap_elem_greater(s->p[min_child], s->p[min_child - 1]);
        // 如果当前层的值都比最后一个元素大,则不需要下沉了,退出循环
        if (!(min_heap_elem_greater(e, s->p[min_child])))
            break;
        (s->p[hole_index] = s->p[min_child])->min_heap_idx = hole_index;
        hole_index = min_child;
        min_child = 2 * (hole_index + 1);
    }
    // 将最后一个元素填入到合适的位置,完成下沉。
    (s->p[hole_index] = e)->min_heap_idx = hole_index;
}

mh-timer.h

#ifndef MARK_MINHEAP_TIMER_H
#define MARK_MINHEAP_TIMER_H
#if defined(__APPLE__)
#include <AvailabilityMacros.h>
#include <sys/time.h>
#include <mach/task.h>
#include <mach/mach.h>
#else
#include <time.h>
#endif
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <stdbool.h>
#include <stdint.h>
#include "minheap.h"
static min_heap_t min_heap; // 声明最小堆
static uint32_t
current_time() {
  uint32_t t;
#if !defined(__APPLE__) || defined(AVAILABLE_MAC_OS_X_VERSION_10_12_AND_LATER)
  struct timespec ti;
  clock_gettime(CLOCK_MONOTONIC, &ti);
  t = (uint32_t)ti.tv_sec * 1000;
  t += ti.tv_nsec / 1000000;
#else
  struct timeval tv;
  gettimeofday(&tv, NULL);
  t = (uint32_t)tv.tv_sec * 1000;
  t += tv.tv_usec / 1000;
#endif
  return t;
}
void init_timer() {
    min_heap_ctor_(&min_heap);
}
// 向定时器中添加任务
timer_entry_t * add_timer(uint32_t msec, timer_handler_pt callback) {
    timer_entry_t *te = (timer_entry_t *)malloc(sizeof(*te));
    if (!te) {
        return NULL;
    }
    memset(te, 0, sizeof(timer_entry_t));
    te->handler = callback;
    te->time = current_time() + msec;
    if (0 != min_heap_push_(&min_heap, te)) {
        // 如果push失败,则释放创建的最小堆,结束程序;
        free(te);
        return NULL;
    }
    printf("add timer time = %u now = %u\n", te->time, current_time());
    return te;
}
bool del_timer(timer_entry_t *e) {
    return 0 == min_heap_erase_(&min_heap, e);
}
int find_nearest_expire_timer() {
    timer_entry_t *te = min_heap_top_(&min_heap);
    if (!te) return -1;
    int diff = (int) te->time - (int)current_time();
    return diff > 0 ? diff : 0;
}
void expire_timer() {
    uint32_t cur = current_time();
    for (;;) {
        // 如果到目标时间,则会将该时间点的未执行的任务都执行了。
        timer_entry_t *te = min_heap_top_(&min_heap);
        if (!te) break;
        if (te->time > cur) break; // 还未到处理时间
        te->handler(te);
        min_heap_pop_(&min_heap);
        free(te); // 释放节点任务
    }
}
#endif // MARK_MINHEAP_TIMER_H

mh-timer.c

#include <stdio.h>
#include <sys/epoll.h>
#include "mh-timer.h"
// 任务函数
void hello_world(timer_entry_t *te) {
    printf("hello world time = %u\n", te->time);
}
int main() {
    init_timer(); // 初始化定时器。
    //参数1的单位为ms
    add_timer(3000, hello_world); // 向定时器中添加任务。
    add_timer(3000, hello_world); // 向定时器中添加任务。
    add_timer(3001, hello_world); // 向定时器中添加任务。
    add_timer(9000, hello_world); // 向定时器中添加任务。
    int epfd = epoll_create(1);
    struct epoll_event events[512];
    for (;;) {
        int nearest = find_nearest_expire_timer(); // 找到最近到期的任务。
        int n = epoll_wait(epfd, events, 512, nearest); // 成功返回需要处理的事件数目。失败返回0,表示等待超时
        for (int i=0; i < n; i++) {
            // 
        }
        expire_timer();
    }
    return 0;
}
// gcc mh-timer.c minheap.c -o mh -I./

运行命令

gcc mh-timer.c minheap.c -o mh -I./
./mh


目录
相关文章
|
1月前
|
消息中间件 NoSQL 应用服务中间件
定时器方案,红黑树,时间轮
定时器方案,红黑树,时间轮
40 0
|
2月前
|
安全 Java
掌握优先级队列:提升效率的关键技巧
掌握优先级队列:提升效率的关键技巧
|
2月前
|
算法 Java 程序员
【算法训练-队列 一】【结构特性】用两个栈实现队列
【算法训练-队列 一】【结构特性】用两个栈实现队列
23 0
|
1月前
|
消息中间件 应用服务中间件 网络安全
定时器方案:红黑树、最小堆和时间轮的原理
定时器方案:红黑树、最小堆和时间轮的原理
39 0
|
1月前
|
存储 NoSQL Linux
定时器的实现方案:红黑树和多级时间轮
定时器的实现方案:红黑树和多级时间轮
|
3月前
|
存储 算法 NoSQL
定时器方案之红黑树、最小堆、时间轮
定时器方案之红黑树、最小堆、时间轮
70 0
|
3月前
|
存储 NoSQL 应用服务中间件
分别基于红黑树、timefd、多级时间轮实现定时器-1
分别基于红黑树、timefd、多级时间轮实现定时器
26 0
|
3月前
|
存储 缓存 索引
分别基于红黑树、timefd、多级时间轮实现定时器-2
分别基于红黑树、timefd、多级时间轮实现定时器
29 0
|
4月前
|
存储 算法 安全
【算法基础】栈和队列及常见变种与使用,双栈、动态栈、栈的迭代器,双端队列、优先队列、并发队列、延迟队列的使用
【算法基础】栈和队列及常见变种与使用,双栈、动态栈、栈的迭代器,双端队列、优先队列、并发队列、延迟队列的使用
68 0
【算法基础】栈和队列及常见变种与使用,双栈、动态栈、栈的迭代器,双端队列、优先队列、并发队列、延迟队列的使用
|
9月前
|
数据库
每日一博 - 使用环形队列实现高效的延时消息
每日一博 - 使用环形队列实现高效的延时消息
49 0