p-queue 源码解读

简介: 前言在钉钉表格的开放场景中,需要对用户在沙箱中执行的操作进行合理的调度,我们选择了 p-queue 这个简洁但功能强大的工具库。最近需要对不同类型的任务进行更复杂的并发调度,希望通过学习源码的方式,掌握其中的核心逻辑,确保业务功能的顺利实现。什么是 p-queuep-queue 是一个异步队列管理库。对于需要控制执行速度或数量的异步操作很有用,比如:与 REST API 交互或执行 CPU / 内

前言

在钉钉表格的开放场景中,需要对用户在沙箱中执行的操作进行合理的调度,我们选择了 p-queue 这个简洁但功能强大的工具库。

最近需要对不同类型的任务进行更复杂的并发调度,希望通过学习源码的方式,掌握其中的核心逻辑,确保业务功能的顺利实现。

什么是 p-queue

p-queue 是一个异步队列管理库。

对于需要控制执行速度或数量的异步操作很有用,比如:与 REST API 交互或执行 CPU / 内存密集型任务。

核心功能

add 方法

add 方法是 PQueue 中最核心的方法之一,其主要作用是向队列中添加任务并执行。

下面是 add 方法的具体实现:

async add<TaskResultType>(function_: Task<TaskResultType>, options: Partial<EnqueueOptionsType> = {}): Promise<TaskResultType | void> {
  // 设置默认的选项
  options = {
    timeout: this.timeout,
    throwOnTimeout: this.#throwOnTimeout,
    ...options,
  };

return new Promise((resolve, reject) => {
  // 将任务加入队列
  this.#queue.enqueue(async () => {
    this.#pending++;
    this.#intervalCount++;

    try {
      // 执行任务
      let operation = function_({signal: options.signal});

      if (options.timeout) {
        operation = pTimeout(Promise.resolve(operation), options.timeout);
      }

      if (options.signal) {
        operation = Promise.race([operation, this.#throwOnAbort(options.signal)]);
      }

      const result = await operation;
      resolve(result);
      this.emit('completed', result);
    } catch (error: unknown) {
      if (error instanceof TimeoutError && !options.throwOnTimeout) {
        resolve();
        return;
      }

      reject(error);
      this.emit('error', error);
    } finally {
      this.#next();
    }
  }, options);

  // 发送任务添加事件
  this.emit('add');

  // 尝试启动另一个任务
  this.#tryToStartAnother();
});
}

从上面的代码可以看出,add 方法的主要逻辑如下:

  1. 接收用户提供的任务和选项参数。 
  2. 将任务加入队列,并尝试启动另一个任务。 
  3. 递增队列状态计数器。 
  4. 执行任务,并尝试获取配置中的超时和信号配置,更新操作处理。 
  5. 等待操作完成后,触发 completed 事件。 
  6. 任务执行完毕后,尝试启动队列中的下一个任务。 

调用 add 方法时,参数中的任务会被加入到队列中,并等待被执行。当队列为空或有空闲时,队列中的任务会被取出并执行。

通过这种方式,PQueue 可以在限制的并发数的条件下,按照 FIFO 的顺序执行任务。

tryToStartAnother 方法

#tryToStartAnother(): boolean {
  if (this.#queue.size === 0) {
  if (this.#intervalId) {
    clearInterval(this.#intervalId);
  }

  this.#intervalId = undefined;

  this.emit('empty');

  if (this.#pending === 0) {
    this.emit('idle');
  }

  return false;
}

if (!this.#isPaused) {
  const canInitializeInterval = !this.#isIntervalPaused;
  if (this.#doesIntervalAllowAnother && this.#doesConcurrentAllowAnother) {
    const job = this.#queue.dequeue();
    if (!job) {
      return false;
    }

    this.emit('active');
    job();

    if (canInitializeInterval) {
      this.#initializeIntervalIfNeeded();
    }

    return true;
  }
}

return false;
}

tryToStartAnother 是另一个一个核心方法。

  1. 先判断任务队列是否已经没有可执行的任务,若是,则进行状态的清理并触发相应事件通知并返回 
  2. 再判断队列是否在暂停状态,若是,直接返回 
  3. 再根据 interval 和 internalCap 判断是否允许在当前间隔内执行新的任务(doesIntervalAllowAnother),若同时满足 concurrency 配置项的限制(doesConcurrentAllowAnother),就从任务队列中取出一个执行 

tryToStartAnother 方法的作用是在任务队列空闲时尽可能启动一个新的任务,并确保能够满足并发数和任务执行间隔的限制。

任务队列的优先级控制

enqueue(run: RunFunction, options?: Partial<PriorityQueueOptions>): void {
  options = {
    priority: 0,
    ...options,
  };

const element = {
  priority: options.priority,
  run,
};

if (this.size && this.#queue[this.size - 1]!.priority! >= options.priority!) {
  this.#queue.push(element);
  return;
}

const index = lowerBound(
  this.#queue, element,
  (a: Readonly<PriorityQueueOptions>, b: Readonly<PriorityQueueOptions>) => b.priority! - a.priority!,
);
this.#queue.splice(index, 0, element);
}

这是 PriorityQueue 中的 enqueue 方法,用于向队列中添加任务。options 对象包含一个可选的 priority 属性用于指定任务的优先级。

  1. 队列中已有任务,且新增的任务优先级比当前队列的最后一个任务低,则直接将新任务放在末尾 
  2. 如果新任务优先级比最后任务高,则通过 lowerBound 函数找到正确位置并插入队列
// Port of lower_bound from https://en.cppreference.com/w/cpp/algorithm/lower_bound
// Used to compute insertion index to keep queue sorted after insertion
export default function lowerBound(array: readonly T[], value: T, comparator: (a: T, b: T) => number): number {
  let first = 0;
  let count = array.length; 
  while (count > 0) {
    const step = Math.trunc(count / 2);
    let it = first + step;

    if (comparator(array[it]!, value) <= 0) {
      first = ++it;
      count -= step + 1;
    } else {
      count = step;
    }
  }

  return first;
} 

lowerBound 是一个二分查找算法的实现,该算法参考了 C++ 中的 lower_bound 算法。

算法维护两个指针 first 和 count,用于表示当前查找范围的左右端点。每次遍历时,计算中间位置 it,并将其与目标值进行比较。

如果目标值比中间元素大,则将 first 更新为 it + 1,并将 count 减去 step + 1;否则将 count 更新为 step,并不需要更新 first。

然后根据比较结果更新 first 和 count,以缩小查找范围。最终,当 count 为 0 时,first 就是插入位置的下标。

通过这种方式,保证高优先的任务都能够在执行队列的最前面。

小结

p-queue 的优点有:

  1. 控制并发:可以非常方便地控制任务并发量,避免过度消耗资源和导致程序崩溃的问题。 
  2. 支持优先级:可以为每个任务设置优先级,优先级越高的任务将会在队列中先被执行,可以优先处理重要任务。 
  3. 支持超时:支持为每个任务设置超时时间,当任务执行时间超过设定的超时时间时,p-queue 会自动结束该任务并抛出 TimeoutError 异常。 
  4. 可扩展性:API 简单易用,可以方便地扩展自己的定制功能,比如自定义任务执行函数、实现自定义的 queue class 等。 

不过对于任务数量非常多且有很多不同优先级的极端场景来说,对 priority 的排序可能会成为一个性能瓶颈。

但这种场景目前可以忽略不计,p-queue 是一个能够满足当前需求的异步队列管理库。

目录
相关文章
|
安全 网络协议 算法
AH 协议详解
【2月更文挑战第25天】
|
5月前
|
API 开发者
1688买家/卖家店铺订单API接口指南
1688店铺订单API提供订单查询、详情获取、状态更新等功能,支持与ERP、CRM系统集成。可按条件筛选订单、获取商品及收货信息,同步发货与物流状态,并进行取消订单等操作。使用时需注意密钥授权、调用频率及异常处理,提升订单管理效率。
|
5月前
|
JSON 供应链 数据挖掘
1688买家/卖家店铺订单API说明
1688订单API是阿里巴巴B2B平台的核心接口,支持订单全生命周期管理。采用RESTful架构,返回JSON数据,可查询订单状态、商品及物流等50+字段,适用于电商整合与数据分析。支持分页、多条件筛选与状态更新,助力自动化运营。
|
9月前
|
存储 人工智能 自然语言处理
无影AgentBay来了!给AI智能体装上“超级大脑”
阿里云在WAIC上发布专为AI Agents打造的“超级大脑”——无影AgentBay。该云端电脑支持多系统切换,集成视觉理解、自然语言控制等多项AI能力,提供高性能算力与企业级安全保障,助力AI开发者高效构建智能应用。
720 1
无影AgentBay来了!给AI智能体装上“超级大脑”
|
机器学习/深度学习 自动驾驶 搜索推荐
今日热门论文推荐:多模态CoT综述、BlobCtrl、Being-0、DreamRenderer、WideRange4D 等
这篇调查论文是首个系统回顾多模态思维链(MCoT)推理的综述。论文阐明了相关基础概念和定义,提供了全面的分类法,并从不同角度对当前方法进行了深入分析。MCoT将思维链推理的优势扩展到多模态环境中,设计了各种方法和创新推理范式来解决图像、视频、语音、音频、3D和结构化数据等不同模态的独特挑战,在机器人技术、医疗保健、自动驾驶和多模态生成等应用中取得了广泛成功。
368 1
|
Java 开发者 Spring
java springboot监听事件和处理事件
通过上述步骤,开发者可以在Spring Boot项目中轻松实现事件的发布和监听。事件机制不仅解耦了业务逻辑,还提高了系统的可维护性和扩展性。掌握这一技术,可以显著提升开发效率和代码质量。
411 33
|
运维 监控 Ubuntu
【运维】如何在Ubuntu中设置一个内存守护进程来确保内存不会溢出
通过设置内存守护进程,可以有效监控和管理系统内存使用情况,防止内存溢出带来的系统崩溃和服务中断。本文介绍了如何在Ubuntu中编写和配置内存守护脚本,并将其设置为systemd服务。通过这种方式,可以在内存使用超过设定阈值时自动采取措施,确保系统稳定运行。
602 4
|
JavaScript Java 测试技术
基于springboot+vue.js+uniapp的疫情防控自动售货机系统附带文章源码部署视频讲解等
基于springboot+vue.js+uniapp的疫情防控自动售货机系统附带文章源码部署视频讲解等
340 2
|
Windows
office2007激活方法-序列号及不用激活码的激活方法
Office 2007不用Office 2007激活码的激活方法 推荐序列号: KGFVY-7733B-8WCK9-KTG64-BC7D8 V9MTG-3GX8P-D3Y4R-68BQ8-4Q8VD G86H2-GT9T2-MQWDD-8JDVH-HB...
5120 0

热门文章

最新文章