【源码共读】大并发量如何控制并发数

简介: 【源码共读】大并发量如何控制并发数


在常见的网络请求的过程中,我们都会遇到并发的情况,如果一次性发起过多的请求,会导致服务器压力过大,甚至会导致服务器崩溃,所以我们需要控制并发的数量,这样才能保证服务器的正常运行。


今天带来的就是并发控制的库p-limit

使用


根据README的介绍,我们可以通过p-limit来创建一个限制并发的函数,然后通过这个函数来执行我们的异步任务。

import pLimit from 'p-limit';
const limit = pLimit(1);
const input = [
   limit(() => fetchSomething('foo')),
   limit(() => fetchSomething('bar')),
   limit(() => doSomething())
];
// Only one promise is run at once
const result = await Promise.all(input);
console.log(result);

源码分析


我们先来看一下p-limit的源码:

import Queue from 'yocto-queue';
export default function pLimit(concurrency) {
   if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) {
      throw new TypeError('Expected `concurrency` to be a number from 1 and up');
   }
   const queue = new Queue();
   let activeCount = 0;
   const next = () => {
      activeCount--;
      if (queue.size > 0) {
         queue.dequeue()();
      }
   };
   const run = async (fn, resolve, args) => {
      activeCount++;
      const result = (async () => fn(...args))();
      resolve(result);
      try {
         await result;
      } catch {}
      next();
   };
   const enqueue = (fn, resolve, args) => {
      queue.enqueue(run.bind(undefined, fn, resolve, args));
      (async () => {
         // This function needs to wait until the next microtask before comparing
         // `activeCount` to `concurrency`, because `activeCount` is updated asynchronously
         // when the run function is dequeued and called. The comparison in the if-statement
         // needs to happen asynchronously as well to get an up-to-date value for `activeCount`.
         await Promise.resolve();
         if (activeCount < concurrency && queue.size > 0) {
            queue.dequeue()();
         }
      })();
   };
   const generator = (fn, ...args) => new Promise(resolve => {
      enqueue(fn, resolve, args);
   });
   Object.defineProperties(generator, {
      activeCount: {
         get: () => activeCount,
      },
      pendingCount: {
         get: () => queue.size,
      },
      clearQueue: {
         value: () => {
            queue.clear();
         },
      },
   });
   return generator;
}

加上注释和换行只有68行代码,非常简单,我们来一行一行的分析:


可以看到最开始就导入了yocto-queue这个库,这个库之前有分析过:【源码共读】yocto-queue 一个微型队列数据结构


这个库就是一个队列的数据结构,不懂的可以直接将这个理解为数组就好;


跟着使用的代码来看,最开始就是通过pLimit来创建一个限制并发的函数,这个函数接收一个参数concurrency,然后返回一个函数,来看看这一步的代码:

function pLimit(concurrency) {
   if (
        !((Number.isInteger(concurrency)
        || concurrency === Number.POSITIVE_INFINITY)
        && concurrency > 0)
    ) {
      throw new TypeError('Expected `concurrency` to be a number from 1 and up');
   }
   const generator = (fn, ...args) => new Promise(resolve => {
      enqueue(fn, resolve, args);
   });
   return generator;
}

首先这个函数接收一个参数concurrency,然后判断这个参数是否是一个大于0的整数,如果不是就抛出一个错误;


返回的函数很简单,就是接收一个函数fn和参数args,然后返回一个Promise;


然后调用返回的generator函数就会执行enqueue函数,对应的代码如下:

const enqueue = (fn, resolve, args) => {
    queue.enqueue(run.bind(undefined, fn, resolve, args));
    (async () => {
        // This function needs to wait until the next microtask before comparing
        // `activeCount` to `concurrency`, because `activeCount` is updated asynchronously
        // when the run function is dequeued and called. The comparison in the if-statement
        // needs to happen asynchronously as well to get an up-to-date value for `activeCount`.
        await Promise.resolve();
        if (activeCount < concurrency && queue.size > 0) {
            queue.dequeue()();
        }
    })();
};

这个函数接收三个参数,fnresolveargs,然后将run函数放入队列中;


这里run使用bind是因为并不需要立即执行,参考:function.prototype.bind()

然后立即执行一个异步函数,这个里面首先会等待下一个微任务,注释解释了这个原因,因为activeCount是异步更新的,所以需要等待下一个微任务才能获取到最新的值;


然后判断activeCount是否小于concurrency,并且队列中有任务,如果满足条件就会将队列中的任务取出来执行,这一步就是并发的核心了;


这里的queue.dequeue()()执行的是run函数,这里容易理解错误,所以框起来。


接下来看看run函数:

const run = async (fn, resolve, args) => {
    activeCount++;
    const result = (async () => fn(...args))();
    resolve(result);
    try {
        await result;
    } catch {
    }
    next();
};

run函数就是用来执行异步并发任务的;


首先activeCount加1,表示当前并发数加1;


然后执行fn函数,这个函数就是我们传入的异步函数,然后将结果赋值给result,注意现在的result是一个处在pending状态的Promise


然后将result传入resolve函数,这个resolve函数就是enqueue函数中返回的Promiseresolve函数;


然后等待result的状态发生改变,这里使用了try...catch,因为result可能会出现异常,所以需要捕获异常;


最后执行next函数,这个函数就是用来处理并发数的,对应的代码如下:

const next = () => {
    activeCount--;
    if (queue.size > 0) {
        queue.dequeue()();
    }
};

首先activeCount减1,表示当前并发数减1;


然后判断队列中是否还有任务,如果有就取出来执行;


queue.dequeue()可以理解为[].shift(),取出队列中的第一个任务,由于确定里面是一个函数,所以直接执行就可以了;


最后面还看到了使用Object.definePropertiesgenerator函数添加了几个属性,来看看:

Object.defineProperties(generator, {
    activeCount: {
        get: () => activeCount,
    },
    pendingCount: {
        get: () => queue.size,
    },
    clearQueue: {
        value: () => {
            queue.clear();
        },
    },
});
  • activeCount:当前并发数
  • pendingCount:队列中的任务数
  • clearQueue:清空队列


这些属性都是只读的,可以让我们在外部知道当前的并发数和队列中的任务数,并且手动清空队列;


动手实现


接下来我们来动手实现一个,抛开队列直接使用数组 + class实现一个简易版:

class PLimit {
    constructor(concurrency) {
        this.concurrency = concurrency;
        this.activeCount = 0;
        this.queue = [];
        return (fn, ...args) => {
            return new Promise(resolve => {
               this.enqueue(fn, resolve, args);
            });
        }
    }
    enqueue(fn, resolve, args) {
        this.queue.push(this.run.bind(this, fn, resolve, args));
        (async () => {
            await Promise.resolve();
            if (this.activeCount < this.concurrency && this.queue.length > 0) {
                this.queue.shift()();
            }
        })();
    }
    async run(fn, resolve, args) {
        this.activeCount++;
        const result = (async () => fn(...args))();
        resolve(result);
        try {
            await result;
        } catch {
        }
        this.next();
    }
    next() {
        this.activeCount--;
        if (this.queue.length > 0) {
            this.queue.shift()();
        }
    }
}

image.png

一共十个并发的任务,每个任务花费 2秒,控制并发数为 2 时,一共花费 10秒。


总结


这篇文章主要介绍了Promise的并发控制,主要是通过队列来实现的。


并发控制的核心就是控制并发数,所以我们需要一个队列来存储任务,然后控制并发数,当并发数小于最大并发数时,就从队列中取出任务执行,这样就可以控制并发数了。


等待上一个任务的执行通过await来实现,这样就可以保证每次都只有可控的并发数在执行。


代码量并不多,但是内部的操作还有细节处理都是知识点。


目录
相关文章
|
小程序 JavaScript 关系型数据库
微信小程序远程连接阿里云服务器mysql——我与阿里云的相遇
第一次接触阿里云服务器:用小程序通过nodejs连接远程服务器
2226 0
微信小程序远程连接阿里云服务器mysql——我与阿里云的相遇
|
2月前
|
人工智能 运维 Cloud Native
阿里云Serverless计算产品入选Gartner®报告「领导者」象限!
近日,Gartner® 发布了 2025 年度全球《云原生应用平台魔力象限》报告,阿里云凭借 Serverless 应用引擎 SAE(以下简称 SAE)和函数计算 FC,成为亚太地区唯一入选「领导者象限」的科技公司。
279 16
|
算法 程序员 编译器
美丽的代码:规范go应用代码注释
【6月更文挑战第30天】本文介绍注释应与代码同步,避免误导,且关键点解释。使用LLVM构建编译器示例展示Go语言规范。注释虽有局限,但在解释复杂逻辑、业务规则时仍有其价值。程序员需平衡注释与代码的关系,创造更优的代码。
1221 0
美丽的代码:规范go应用代码注释
|
11月前
|
Linux 虚拟化 数据安全/隐私保护
AlmaLinux 9.5 正式版发布 - RHEL 二进制兼容免费发行版
AlmaLinux 9.5 正式版发布 - RHEL 二进制兼容免费发行版
355 11
AlmaLinux 9.5 正式版发布 - RHEL 二进制兼容免费发行版
|
分布式计算 资源调度 Hadoop
Hadoop 2.0 与 Hadoop 1.x 有何不同?
【8月更文挑战第12天】
300 4
|
缓存 Java 测试技术
探讨Java中遍历Map集合的最快方式
探讨Java中遍历Map集合的最快方式
310 1
|
存储 资源调度 前端开发
JavaScript 使用axios库发送 post请求给后端, 给定base64格式的字符串数据和一些其他参数, 使用表单方式提交, 并使用onUploadProgress显示进度
使用 Axios 发送包含 Base64 数据和其他参数的 POST 请求时,可以通过 `onUploadProgress` 监听上传进度。由于整个请求体被视为一个单元,所以进度可能不够精确,但可以模拟进度反馈。前端示例代码展示如何创建一个包含 Base64 图片数据和额外参数的 `FormData` 对象,并在上传时更新进度条。后端使用如 Express 和 Multer 可处理 Base64 数据。注意,实际进度可能不如文件上传精确,显示简单加载状态可能更合适。
|
JavaScript Java 测试技术
基于SpringBoot+Vue的毕业设计管理系统的详细设计和实现(源码+lw+部署文档+讲解等)
基于SpringBoot+Vue的毕业设计管理系统的详细设计和实现(源码+lw+部署文档+讲解等)
565 0
|
XML 监控 大数据
基于Guava布隆过滤器优化海量字符串去重策略
**Guava Bloom Filter实践:** 在大数据场景下,利用布隆过滤器进行高效字符串去重。Guava提供易用的BloomFilter实现,通过添加Guava依赖,设定预期元素数和误报率来创建过滤器。尽管可能产生误报,但不会漏报,常用于初期快速判断。添加元素,使用`mightContain`查询,若可能存在,再用精确数据结构确认。优化涉及选择哈希函数、调整误报率和避免重复添加。