源码地址:https://github.com/sindresorhus/p-limit
前言
并发场景在所有的开发过程中都是会遇到的。像面向对象语言 C#、Java 经常都是使用多线程的技术来解决并发的场景。但是 Node 是单线程的,对于解决并发基本是通过异步方式。还有使用 node 提供的 Cluster 模块来完成多进程的。
注意点:Node 中的异步是为了解决不堵塞主线程,而不是为了解决并发问题。
Promise.all
Promise.all 应该是很多人都会使用到的并发问题的解决,以及其他的 Promise.series 等。Promise.all 最后的结果是所有的异步执行后的结果集合。
Promise.all([...]).then(result=>{
....
})
但当并发数量超过一定的数量时可能就会导致下游的承受能力不足以支撑你想要并发的数量。就想数据库连接池,如果开启太多的话一个可能造成浪费,另一个原因可能就是数据库没法提供那么大的连接数让你使用。所以我们也要限制并发的数量。
p-limit
对于 Promise.all 如果只是少量的,完全可以支持。但是超量时该如何限制并发数,这里 P-limit 结合 Promise 就完美解决这个问题。
const fns=[....]; // 方法集合
const limit = pLimit(10)
Promise.all(fns.map(fn=> limit(async()=>{
await fn()
})))
从上面的源码可以看出其还是在使用 Promise.all 处理多个函数的执行,每个函数还是异步执行。但注意每个要执行的函数都是在 limit方法内,执行 limit(fn) 会在调用 fn后返回一个 promise 。当然在执行前会先通过 plimit 获取一个 limit方法。
再深入 plmit 的源码里,limit如何控制并发数。
首先获取一个 limit 方法时会初始化一个队列。使用 limit(fn)时由内部的 generator 函数生成的。它其实就是将函数压入队列内。可以认真观察当 activeCount<concurrency && queue.size <0 时就会从队列取出函数并执行它。
export default function pLimit(concurrency) {
if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) {
throw new TypeError('Expected `concurrency` to be a number from 1 and up');
}
const queue = new Queue();
let activeCount = 0;
//..... 省略一部分内容
const enqueue = (fn, resolve, args) => {
queue.enqueue(run.bind(undefined, fn, resolve, args));
(async () => {
await Promise.resolve();
if (activeCount < concurrency && queue.size > 0) {
queue.dequeue()();
}
})();
};
const generator = (fn, ...args) => new Promise(resolve => {
enqueue(fn, resolve, args);
});
Object.defineProperties(generator, {
activeCount: {
get: () => activeCount,
},
pendingCount: {
get: () => queue.size,
},
clearQueue: {
value: () => {
queue.clear();
},
},
});
return generator;
}
所以整个过程其实就是在内部使用了一个队列和活跃数量 activeCount 来控制,无论多少个需要执行的并发,都会先放入队列中再判断正在执行的任务数量是否小于并发数,如果小于并发数就可以从队列中取出执行。每个函数都是异步执行,执行玩就从队列中获取下一个任务继续执行。
接下来就是操作实践。写了几个简单的方法然后执行结果,即便我方法有很多个也不受影响。