前言
以前的工作中并没有涉及到这块,群里然叔老师出了一个如何实现一个并发函数的打卡题,以前想过前端并发有点懵,后来仔细想想其实浏览器的并发请求控制就已经实现了这个功能,我可以抽象分析一下,然后以模拟的方式去实现它。
谷歌浏览器的并发数是6个,有六个请求正在处理,那么其它的任务就会排队,等六个请求任务中的某一个完成之后,就会立马插入进去执行。这样一来,就清楚了。
设计
设计理念:控制六个任务(线程),结构化任务模型,递归的去执行任务,可以在执行完毕后直接打印结果,也可以在所有任务执行完毕,再打印结果。
参数配置:至少两个,比如任务列表、并发数。
初始化任务:需要将普通任务处理成指定规范格式的任务,不规范就不好控制了噢。
任务状态:等待await、运行中run、执行成功 success、执行失败bad。
代码我已经实现了噢,两种方式,借用Promise、Promise.all、Promise.race 实际上都可以。
实现
// 随机的任务
const paramTasks = Array(~~(Math.random() * 100)).fill(1).map((item, i) => 'task-' + i);
// 并发数
let paramToncurrentNum = 6
// 开始执行任务
function startTasks(tasks, concurrentNum = 6, successCallback) {
// 状态枚举
const TaskState = {
await: 'await',
run: 'run',
success: 'success',
bad: 'bad'
}
// 初始化状态任务
const initStateTask = (task) => {
return {
task: typeof task === 'function' ? task: () => task,
state: TaskState.await, // await, success, bad
result: null,
threadName: null,
}
}
// 初始化为带状态和结果的任务
const stateTasks = [...tasks].map((item) => initStateTask(item))
// 开始执行任务
const startTask = (taskList, threadName) => {
const task = taskList.find((item) => item.state === TaskState.await)
if (task) {
task.state = TaskState.run
return new Promise((resolve, reject) => {
try {
setTimeout(() => {
task.state = TaskState.success
task.result = task.task()
task.threadName = threadName
resolve(task)
console.log(`任务:${task.result} 执行完毕!执行线程:${task.threadName}!`)
})
} catch (e) {
task.state = TaskState.bad
task.result = e
reject(task)
console.log(`任务:${task.result} 执行失败!执行线程:${task.threadName}!`)
}
}).finally(() => {
const task = taskList.find((item) => item.state === TaskState.await)
if (task) {
return startTask(taskList, threadName)
}
})
}
}
// 根据并发数,推进任务
let list = []
concurrentNum = concurrentNum > stateTasks.length ? stateTasks.length : concurrentNum
while(concurrentNum --) {
const threadName = `线程ID:${concurrentNum + 1}`
list.push(startTask(stateTasks, threadName))
}
// 指定并发数的任务执行完毕后,就调用回调函数,把结果返回
// return Promise.all(list).then(() => stateTasks).then(successCallback).catch(() => successCallback(stateTasks))
return Promise.race(list).then(() => stateTasks).then(successCallback).catch(() => successCallback(stateTasks))
}
startTasks(paramTasks,paramToncurrentNum, function (tasks) {
console.log('tasks', tasks)
})
总结
使用Promise.race,会造成所有任务未结束,就直接拿到最终反馈的任务列表对象,由于对象是引用类型,所以等等也能看到所有结果。
使用Promise.all,可以等到所有任务结束之后,再拿到最终反馈的任务列表对象。
通过结果化任务模型,可以让你最终得到的反馈列表非常的清晰。
拓展一下
如果换成fetch请求api接口,可以改成这种,这样可以保证所有请求发送完毕后,拿到所有结果,无论是正常还是异常的都能看到。
function sendRequest(urls: string[], max: number, callback: () => void) {
// 状态枚举
const TaskState = {
await: 'await',
run: 'run',
success: 'success',
bad: 'bad'
}
// 初始化状态任务
const initStateTask = (url) => {
return {
url: url,
state: TaskState.await, // await, run,success, bad
result: null,
threadName: null,
}
}
// 初始化为带状态和结果的任务
const stateTasks = [...urls].map((item) => initStateTask(item))
// 开始执行任务
const startTask = (taskList, threadName) => {
const task = taskList.find((item) => item.state === TaskState.await)
if (task) {
task.state = TaskState.run
return new Promise((resolve, reject) => {
fetch(task.url).then(r => r.json()).then(r => {
task.state = TaskState.success
task.result = r
task.threadName = threadName
resolve(task)
console.log(`任务:${task.result} 执行完毕!执行线程为:${task.threadName}!`)
}).catch(e => {
task.state = TaskState.bad
task.result = e
reject(task)
console.log(`任务:${task.result} 执行失败!执行线程为:${task.threadName}!`)
})
}).finally(() => {
const task = taskList.find((item) => item.state === TaskState.await)
if (task) {
return startTask(taskList, threadName)
}
})
}
}
// 根据并发数,推进任务
let list = []
let concurrentNum = max
while(concurrentNum --) {
const threadName = `线程ID:${concurrentNum + 1}`
list.push(startTask(stateTasks, threadName))
}
return Promise.all(list).then(() => stateTasks).then(callback).catch(() => callback(stateTasks))
}
sendRequest(Array(~~(Math.random() * 100)).fill(1).map((item, i) => '/api/url-' + i),6, function (tasks) {
console.log('tasks', tasks)
})