p-queue 源码解读

简介: 前言在钉钉表格的开放场景中,需要对用户在沙箱中执行的操作进行合理的调度,我们选择了 p-queue 这个简洁但功能强大的工具库。最近需要对不同类型的任务进行更复杂的并发调度,希望通过学习源码的方式,掌握其中的核心逻辑,确保业务功能的顺利实现。什么是 p-queuep-queue 是一个异步队列管理库。对于需要控制执行速度或数量的异步操作很有用,比如:与 REST API 交互或执行 CPU / 内

前言

在钉钉表格的开放场景中,需要对用户在沙箱中执行的操作进行合理的调度,我们选择了 p-queue 这个简洁但功能强大的工具库。

最近需要对不同类型的任务进行更复杂的并发调度,希望通过学习源码的方式,掌握其中的核心逻辑,确保业务功能的顺利实现。

什么是 p-queue

p-queue 是一个异步队列管理库。

对于需要控制执行速度或数量的异步操作很有用,比如:与 REST API 交互或执行 CPU / 内存密集型任务。

核心功能

add 方法

add 方法是 PQueue 中最核心的方法之一,其主要作用是向队列中添加任务并执行。

下面是 add 方法的具体实现:

async add<TaskResultType>(function_: Task<TaskResultType>, options: Partial<EnqueueOptionsType> = {}): Promise<TaskResultType | void> {
  // 设置默认的选项
  options = {
    timeout: this.timeout,
    throwOnTimeout: this.#throwOnTimeout,
    ...options,
  };

return new Promise((resolve, reject) => {
  // 将任务加入队列
  this.#queue.enqueue(async () => {
    this.#pending++;
    this.#intervalCount++;

    try {
      // 执行任务
      let operation = function_({signal: options.signal});

      if (options.timeout) {
        operation = pTimeout(Promise.resolve(operation), options.timeout);
      }

      if (options.signal) {
        operation = Promise.race([operation, this.#throwOnAbort(options.signal)]);
      }

      const result = await operation;
      resolve(result);
      this.emit('completed', result);
    } catch (error: unknown) {
      if (error instanceof TimeoutError && !options.throwOnTimeout) {
        resolve();
        return;
      }

      reject(error);
      this.emit('error', error);
    } finally {
      this.#next();
    }
  }, options);

  // 发送任务添加事件
  this.emit('add');

  // 尝试启动另一个任务
  this.#tryToStartAnother();
});
}

从上面的代码可以看出,add 方法的主要逻辑如下:

  1. 接收用户提供的任务和选项参数。 
  2. 将任务加入队列,并尝试启动另一个任务。 
  3. 递增队列状态计数器。 
  4. 执行任务,并尝试获取配置中的超时和信号配置,更新操作处理。 
  5. 等待操作完成后,触发 completed 事件。 
  6. 任务执行完毕后,尝试启动队列中的下一个任务。 

调用 add 方法时,参数中的任务会被加入到队列中,并等待被执行。当队列为空或有空闲时,队列中的任务会被取出并执行。

通过这种方式,PQueue 可以在限制的并发数的条件下,按照 FIFO 的顺序执行任务。

tryToStartAnother 方法

#tryToStartAnother(): boolean {
  if (this.#queue.size === 0) {
  if (this.#intervalId) {
    clearInterval(this.#intervalId);
  }

  this.#intervalId = undefined;

  this.emit('empty');

  if (this.#pending === 0) {
    this.emit('idle');
  }

  return false;
}

if (!this.#isPaused) {
  const canInitializeInterval = !this.#isIntervalPaused;
  if (this.#doesIntervalAllowAnother && this.#doesConcurrentAllowAnother) {
    const job = this.#queue.dequeue();
    if (!job) {
      return false;
    }

    this.emit('active');
    job();

    if (canInitializeInterval) {
      this.#initializeIntervalIfNeeded();
    }

    return true;
  }
}

return false;
}

tryToStartAnother 是另一个一个核心方法。

  1. 先判断任务队列是否已经没有可执行的任务,若是,则进行状态的清理并触发相应事件通知并返回 
  2. 再判断队列是否在暂停状态,若是,直接返回 
  3. 再根据 interval 和 internalCap 判断是否允许在当前间隔内执行新的任务(doesIntervalAllowAnother),若同时满足 concurrency 配置项的限制(doesConcurrentAllowAnother),就从任务队列中取出一个执行 

tryToStartAnother 方法的作用是在任务队列空闲时尽可能启动一个新的任务,并确保能够满足并发数和任务执行间隔的限制。

任务队列的优先级控制

enqueue(run: RunFunction, options?: Partial<PriorityQueueOptions>): void {
  options = {
    priority: 0,
    ...options,
  };

const element = {
  priority: options.priority,
  run,
};

if (this.size && this.#queue[this.size - 1]!.priority! >= options.priority!) {
  this.#queue.push(element);
  return;
}

const index = lowerBound(
  this.#queue, element,
  (a: Readonly<PriorityQueueOptions>, b: Readonly<PriorityQueueOptions>) => b.priority! - a.priority!,
);
this.#queue.splice(index, 0, element);
}

这是 PriorityQueue 中的 enqueue 方法,用于向队列中添加任务。options 对象包含一个可选的 priority 属性用于指定任务的优先级。

  1. 队列中已有任务,且新增的任务优先级比当前队列的最后一个任务低,则直接将新任务放在末尾 
  2. 如果新任务优先级比最后任务高,则通过 lowerBound 函数找到正确位置并插入队列
// Port of lower_bound from https://en.cppreference.com/w/cpp/algorithm/lower_bound
// Used to compute insertion index to keep queue sorted after insertion
export default function lowerBound(array: readonly T[], value: T, comparator: (a: T, b: T) => number): number {
  let first = 0;
  let count = array.length; 
  while (count > 0) {
    const step = Math.trunc(count / 2);
    let it = first + step;

    if (comparator(array[it]!, value) <= 0) {
      first = ++it;
      count -= step + 1;
    } else {
      count = step;
    }
  }

  return first;
} 

lowerBound 是一个二分查找算法的实现,该算法参考了 C++ 中的 lower_bound 算法。

算法维护两个指针 first 和 count,用于表示当前查找范围的左右端点。每次遍历时,计算中间位置 it,并将其与目标值进行比较。

如果目标值比中间元素大,则将 first 更新为 it + 1,并将 count 减去 step + 1;否则将 count 更新为 step,并不需要更新 first。

然后根据比较结果更新 first 和 count,以缩小查找范围。最终,当 count 为 0 时,first 就是插入位置的下标。

通过这种方式,保证高优先的任务都能够在执行队列的最前面。

小结

p-queue 的优点有:

  1. 控制并发:可以非常方便地控制任务并发量,避免过度消耗资源和导致程序崩溃的问题。 
  2. 支持优先级:可以为每个任务设置优先级,优先级越高的任务将会在队列中先被执行,可以优先处理重要任务。 
  3. 支持超时:支持为每个任务设置超时时间,当任务执行时间超过设定的超时时间时,p-queue 会自动结束该任务并抛出 TimeoutError 异常。 
  4. 可扩展性:API 简单易用,可以方便地扩展自己的定制功能,比如自定义任务执行函数、实现自定义的 queue class 等。 

不过对于任务数量非常多且有很多不同优先级的极端场景来说,对 priority 的排序可能会成为一个性能瓶颈。

但这种场景目前可以忽略不计,p-queue 是一个能够满足当前需求的异步队列管理库。

目录
相关文章
每日一道面试题之在 Queue 中 poll()和 remove()有什么区别?
每日一道面试题之在 Queue 中 poll()和 remove()有什么区别?
|
5月前
|
移动开发
poll(2) 源码分析
poll(2) 源码分析
|
6月前
|
算法 编译器 C++
priority_queue简单实现(优先级队列)(c++)
priority_queue介绍 pri_que是一个容器适配器,它的底层是其他容器,并由这些容器再封装而来。类似于heap的结构。默认为大堆。
60 0
|
6月前
|
存储 C++ 容器
【STL】priority_queue的底层原理及其实现
【STL】priority_queue的底层原理及其实现
|
6月前
|
编译器 C++ 容器
【STL】stack与queue的底层原理及其实现
【STL】stack与queue的底层原理及其实现
|
存储 安全 Java
【面试题精讲】Queue 与 Deque 的区别
【面试题精讲】Queue 与 Deque 的区别
|
6月前
|
存储 算法 C++
C++:stack、queue、priority_queue增删查改模拟实现、deque底层原理
C++:stack、queue、priority_queue增删查改模拟实现、deque底层原理
61 0
|
11月前
|
算法 索引 Python
数据结构与算法-(8)---队列(Queue)
数据结构与算法-(8)---队列(Queue)
61 1
|
存储 设计模式 C++
【C++杂货铺】探索stack和queue的底层实现
【C++杂货铺】探索stack和queue的底层实现
108 0
|
存储 算法 程序员
stack、queue、priority_queue的使用和简单实现【STL】
stack、queue、priority_queue的使用和简单实现【STL】
61 0