前言
在钉钉表格的开放场景中,需要对用户在沙箱中执行的操作进行合理的调度,我们选择了 p-queue 这个简洁但功能强大的工具库。
最近需要对不同类型的任务进行更复杂的并发调度,希望通过学习源码的方式,掌握其中的核心逻辑,确保业务功能的顺利实现。
什么是 p-queue
p-queue 是一个异步队列管理库。
对于需要控制执行速度或数量的异步操作很有用,比如:与 REST API 交互或执行 CPU / 内存密集型任务。
核心功能
add 方法
add 方法是 PQueue 中最核心的方法之一,其主要作用是向队列中添加任务并执行。
下面是 add 方法的具体实现:
async add<TaskResultType>(function_: Task<TaskResultType>, options: Partial<EnqueueOptionsType> = {}): Promise<TaskResultType | void> {
// 设置默认的选项
options = {
timeout: this.timeout,
throwOnTimeout: this.#throwOnTimeout,
...options,
};
return new Promise((resolve, reject) => {
// 将任务加入队列
this.#queue.enqueue(async () => {
this.#pending++;
this.#intervalCount++;
try {
// 执行任务
let operation = function_({signal: options.signal});
if (options.timeout) {
operation = pTimeout(Promise.resolve(operation), options.timeout);
}
if (options.signal) {
operation = Promise.race([operation, this.#throwOnAbort(options.signal)]);
}
const result = await operation;
resolve(result);
this.emit('completed', result);
} catch (error: unknown) {
if (error instanceof TimeoutError && !options.throwOnTimeout) {
resolve();
return;
}
reject(error);
this.emit('error', error);
} finally {
this.#next();
}
}, options);
// 发送任务添加事件
this.emit('add');
// 尝试启动另一个任务
this.#tryToStartAnother();
});
}
从上面的代码可以看出,add 方法的主要逻辑如下:
-
接收用户提供的任务和选项参数。
-
将任务加入队列,并尝试启动另一个任务。
-
递增队列状态计数器。
-
执行任务,并尝试获取配置中的超时和信号配置,更新操作处理。
-
等待操作完成后,触发 completed 事件。
-
任务执行完毕后,尝试启动队列中的下一个任务。
调用 add 方法时,参数中的任务会被加入到队列中,并等待被执行。当队列为空或有空闲时,队列中的任务会被取出并执行。
通过这种方式,PQueue 可以在限制的并发数的条件下,按照 FIFO 的顺序执行任务。
tryToStartAnother 方法
#tryToStartAnother(): boolean {
if (this.#queue.size === 0) {
if (this.#intervalId) {
clearInterval(this.#intervalId);
}
this.#intervalId = undefined;
this.emit('empty');
if (this.#pending === 0) {
this.emit('idle');
}
return false;
}
if (!this.#isPaused) {
const canInitializeInterval = !this.#isIntervalPaused;
if (this.#doesIntervalAllowAnother && this.#doesConcurrentAllowAnother) {
const job = this.#queue.dequeue();
if (!job) {
return false;
}
this.emit('active');
job();
if (canInitializeInterval) {
this.#initializeIntervalIfNeeded();
}
return true;
}
}
return false;
}
tryToStartAnother 是另一个一个核心方法。
-
先判断任务队列是否已经没有可执行的任务,若是,则进行状态的清理并触发相应事件通知并返回
-
再判断队列是否在暂停状态,若是,直接返回
-
再根据 interval 和 internalCap 判断是否允许在当前间隔内执行新的任务(doesIntervalAllowAnother),若同时满足 concurrency 配置项的限制(doesConcurrentAllowAnother),就从任务队列中取出一个执行
tryToStartAnother 方法的作用是在任务队列空闲时尽可能启动一个新的任务,并确保能够满足并发数和任务执行间隔的限制。
任务队列的优先级控制
enqueue(run: RunFunction, options?: Partial<PriorityQueueOptions>): void {
options = {
priority: 0,
...options,
};
const element = {
priority: options.priority,
run,
};
if (this.size && this.#queue[this.size - 1]!.priority! >= options.priority!) {
this.#queue.push(element);
return;
}
const index = lowerBound(
this.#queue, element,
(a: Readonly<PriorityQueueOptions>, b: Readonly<PriorityQueueOptions>) => b.priority! - a.priority!,
);
this.#queue.splice(index, 0, element);
}
这是 PriorityQueue 中的 enqueue 方法,用于向队列中添加任务。options 对象包含一个可选的 priority 属性用于指定任务的优先级。
-
队列中已有任务,且新增的任务优先级比当前队列的最后一个任务低,则直接将新任务放在末尾
-
如果新任务优先级比最后任务高,则通过 lowerBound 函数找到正确位置并插入队列
// Port of lower_bound from https://en.cppreference.com/w/cpp/algorithm/lower_bound
// Used to compute insertion index to keep queue sorted after insertion
export default function lowerBound(array: readonly T[], value: T, comparator: (a: T, b: T) => number): number {
let first = 0;
let count = array.length;
while (count > 0) {
const step = Math.trunc(count / 2);
let it = first + step;
if (comparator(array[it]!, value) <= 0) {
first = ++it;
count -= step + 1;
} else {
count = step;
}
}
return first;
}
lowerBound 是一个二分查找算法的实现,该算法参考了 C++ 中的 lower_bound 算法。
算法维护两个指针 first 和 count,用于表示当前查找范围的左右端点。每次遍历时,计算中间位置 it,并将其与目标值进行比较。
如果目标值比中间元素大,则将 first 更新为 it + 1,并将 count 减去 step + 1;否则将 count 更新为 step,并不需要更新 first。
然后根据比较结果更新 first 和 count,以缩小查找范围。最终,当 count 为 0 时,first 就是插入位置的下标。
通过这种方式,保证高优先的任务都能够在执行队列的最前面。
小结
p-queue 的优点有:
-
控制并发:可以非常方便地控制任务并发量,避免过度消耗资源和导致程序崩溃的问题。
-
支持优先级:可以为每个任务设置优先级,优先级越高的任务将会在队列中先被执行,可以优先处理重要任务。
-
支持超时:支持为每个任务设置超时时间,当任务执行时间超过设定的超时时间时,p-queue 会自动结束该任务并抛出 TimeoutError 异常。
-
可扩展性:API 简单易用,可以方便地扩展自己的定制功能,比如自定义任务执行函数、实现自定义的 queue class 等。
不过对于任务数量非常多且有很多不同优先级的极端场景来说,对 priority 的排序可能会成为一个性能瓶颈。
但这种场景目前可以忽略不计,p-queue 是一个能够满足当前需求的异步队列管理库。