JS案例:实现一个简单的任务队列-TaskQueue

简介: JS案例:实现一个简单的任务队列-TaskQueue

前言:

针对一些大型的秒杀活动,抢票业务,高并发是一个经常遇到的问题,后端人员时常会接触到消息队列这个中间件。对于前端人员而言,使用node开发业务,或者使用浏览器单线程异步渲染时也会遇到堵塞,页面卡死的现象,如何处理大量的数据同时加载或者数据同时请求便成为了老生常谈的话题。

此时一个异步的任务队列或许可以帮助我们缓解这些问题。

任务队列的特点:异步,解耦,削峰


异步是多个任务并发进行,互不依赖;解耦是将业务隔离开,保证任务的运行结果不会影响到其他任务,从而产生堵塞;削峰是指在系统请求量或者负载达到一定峰值时使用缓存延缓或者排队的手段,保证服务或者任务稳定进行


功能设计:

为了达到通过异步执行队列的目的,采用Promise.all()的方式执行队列的defer延时函数,代码中的解耦可以使用messageCenter进行发布订阅,队列的削峰可以使用状态和最大运行峰值来控制函数执行


流程设计:

1.png


接口设计:

/**
 * 单条队列
 * defer: 待运行的异步函数
 * params?: defer的参数,也可以用bind直接传递
 * 
 */
export interface IQueue {
    defer: Function
    name?: string
}
/**
 * 队列参数
 * children: 队列列表
 * name: 队列唯一标识
 * result: 运行完成后的结果
 * 
 */
export interface IQueues {
    children: Array<Function>
    name: string
    result?: any[]
}
/**
 * 队列缓存
 */
export type IQueueTemp = {
    [key: string]: IQueues
}
/**
 * 系统队列
 */
export type IQueueList = Array<IQueue>
/**
 * 队列状态 idle:空闲 pending:等待 fulfilled:完成 rejected:失败
 */
export type IState = "idle" | "pending" | "fulfilled" | "rejected"
/**
 * 任务队列参数
 */
export type ITaskQueueProps = {
    maxLen: number
}
/**
 * 任务队列
 */
export type ITaskQueue = {
    readonly fix: string
    props: ITaskQueueProps
    queueTemp: IQueueTemp
    queues: IQueueList
    state: IState
    push: (queue: IQueues) => Promise<void>
    unshift: (length: number) => IQueueList
    run: (reject: any) => unknown
    clear: () => void
}


功能实现:

工具函数

MessageCenter


defer:扁平化Promise

   /**
     * 优化Promise,避免Promise嵌套
     * @returns {Promise,resolve,reject}
     */
    private defer = () => {
        let resolve, reject
        return {
            promise: new Promise<void>((_resolve, _reject) => {
                resolve = _resolve
                reject = _reject
            }),
            resolve, reject
        }
    }

fixStr:根据队列名称存入缓存,添加消息订阅

    /**
     * 混淆字符串
     * @param str 需要混淆的字符 
     * @returns 混淆产物
     */
    private fixStr(str) {
        return `${this.fix}${str}`
    }

checkHandler:检测队列格式是否符合要求

   /**
     * 检查参数是否符合标准
     * @param queue 队列或队列集合
     */
    private checkHandler(queue: IQueues) {
        if (!queue) {
            throw new ReferenceError('queue is not defined')
        }
        if (!(queue.children instanceof Array) || typeof queue !== "object") {
            throw new TypeError(`queue should be an object and queue.children should be an array`);
        }
        const noFn = i => !i || typeof i !== "function"
        if (queue.children?.length === 0) throw new Error('queue.children.length can not be 0')
        if (queue.children?.find((i) => noFn(i))) throw new Error('queueList should have defer')
    }


stateProxy:设置与获取队列状态

    /**
     * 设置、获取当前队列的状态
     * @param state 队列状态,有值就修改当前状态,无值就获取当前状态
     * @returns srate 队列的状态
     */
    private stateProxy = (state?: IState) => {
        state && (this.state = state)
        return this.state
    }

handlerSuccess:异步执行队列成功后回调

    /**
     * 单次队列执行成功
     * @param data { res, queues }
     * @returns 
     */
    private handlerSuccess = (data) => {
        this.stateProxy("fulfilled")
        return this.messageCenter.emit("run:success:handler", data)
    }

handlerError:异步执行队列抛错后回调

    /**
     * 单次队列执行失败
     * @param data { res, queues, error }
     * @returns 
     */
    private handlerError = (data) => {
        const { reject, error } = data
        this.stateProxy("rejected")
        reject && typeof reject === "function" && reject(error)
        return this.messageCenter.emit("run:error:handler", data)
    }

decoratorTaskQueue:使用装饰器调用任务队列

/**
 * 装饰器用法
 * @param opts  同TaskQueue中constructor
 * @returns 混入类原型中
 */
export const decoratorTaskQueue = (opts: ITaskQueueProps): ClassDecorator => {
    return <TFunction extends Function>(target: TFunction) => {
        if (!target.prototype.taskQueue) {
            target.prototype.taskQueue = new TaskQueue(opts)
        }
    }
}

任务队列实现

其中init函数中注册了一些hooks,当执行push:handler时触发run函数执行队列中的异步函数,另外run函数与run:success:handler以及run:error:handler结合形成递归函数。finish函数是用来判断某一个队列是否全部执行完成

import { decoratorMessageCenter, MessageCenter } from "event-message-center"
import {
    ITaskQueue, IQueueList, IQueues, IState, IQueueTemp, ITaskQueueProps
} from "./type"
@decoratorMessageCenter
export class TaskQueue implements ITaskQueue {
    readonly fix: string = `@~&$`
    readonly messageCenter: MessageCenter
    props: ITaskQueueProps
    queues: IQueueList
    queueTemp: IQueueTemp
    state: IState
    /**
     * @param props: {maxLen:并发峰值} 削峰
     */
    constructor(props: ITaskQueueProps) {
        this.clear()
        props && this.defineProps(props, "props")
        this.init()
    }
    /**
     * 初始化
     */
    private init = () => {
        this.messageCenter.on("push:handler", this.run)
        this.messageCenter.on("run:success:handler", this.run)
        this.messageCenter.on("run:success:handler", this.finish)
        this.messageCenter.on("run:error:handler", this.run)
        this.messageCenter.on("run:error:handler", this.finish)
    }
    private defineProps = (props, key) => {
        Object.defineProperty(this, key, { value: props })
    }
    /**
     * 进入队列
     * @param queue: IQueues 单个队列
     * @returns promise: Promise<void> 当前队列执行结束的异步操作
     */
    push = (queue: IQueues) => {
        this.checkHandler(queue)
        const { resolve, reject, promise } = this.defer()
        const queueName = this.fixStr(queue.name)
        this.queues = this.queues.concat(queue.children.map(defer => ({ defer, name: queueName })))
        this.queueTemp[queueName] = { ...queue, result: [] }
        this.messageCenter.emit("push:handler", reject)
        this.messageCenter.on(queueName, resolve)
        return promise
    }
    /**
     * 移出队列
     * @param length 移出数量
     * @returns queues 移出的队列
     */
    unshift = (length) => {
        return this.queues.splice(0, length)
    }
    /**
     * 异步执行队列:函数的思路是无限递归,通过当前队列状态和数量判断是否执行
     * @param reject 异常函数
     * @returns void 0
     */
    run = async ({ reject }) => {
        if (this.stateProxy() === 'pending') return void 0
        if (this.queues.length === 0) return this.stateProxy("idle")
        this.stateProxy("pending")
        const queues = this.unshift(this.props?.maxLen ?? 10)
        try {
            const res = await Promise.all(queues.map((item, i) => item.defer().catch(error => error)))
            return this.handlerSuccess({ res, queues })
        } catch (error) {
            return this.handlerError({ reject, error, queues })
        }
    }
    /**
     * 初始化整个队列,清除所有数据
     */
    clear = () => {
        this.queues = []
        this.queueTemp = {}
        this.props = null
        this.stateProxy("idle")
        this.messageCenter.clear()
    }
    /**
     * 处理每次队列执行完成后的数据
     * @param data { res, queues, error } 运行结束后的返回值及削峰后的初始队列,一一对应
     */
    private finish = ({ res = [], queues, error = 'err' }) => {
        const { queueTemp } = this
        queues.forEach((it, i) => {
            const item = queueTemp[it.name]
            item?.result.push(res[i] ?? error)
            if (item?.result?.length === item?.children?.length) {
                this.messageCenter.emit(it.name, item?.result)
                queueTemp[it.name] = null
            }
        });
    }
}


功能验证

创建异步函数:


根据传入的length生成异步函数,其中有部分函数会走Promise的reject抛错

const syncFn = (args) => {
    return new Promise((resolve, reject) => {
        if (args.includes('error')) {
            return setTimeout(reject.bind(null, args), 1000);
        }
        return setTimeout(resolve.bind(null, args), 500);
    });
};
const createFnList = (length, name) => {
    const task = {
        name,
        children: [],
    };
    while (length--) {
        task.children.push(syncFn.bind(null, length % 3 === 1 ? 'error' + length : 'params'))
    }
    return task;
};


创建四个任务:

const task = createFnList(10, "task1");
const task2 = createFnList(20, "task2");
const task3 = createFnList(10, "task3");
const task4 = createFnList(40, "task4");

试用任务队列:

import { TaskQueue, decoratorTaskQueue } from "task-queue-lib"
@decoratorTaskQueue({ maxLen: 10 })
export class QueueExample {
    taskQueue: TaskQueue
    constructor() {
        this.taskQueue.push(task).then((res) => {
            console.log(res);
            // [
            //     'params', 'params',
            //     'error7', 'params',
            //     'params', 'error4',
            //     'params', 'params',
            //     'error1', 'params'
            // ]
        })
        this.taskQueue.push(task2).then((res) => {
            console.log(res);
            // [
            //     'error19', 'params', 'params',
            //     'error16', 'params', 'params',
            //     'error13', 'params', 'params',
            //     'error10', 'params', 'params',
            //     'error7',  'params', 'params',
            //     'error4',  'params', 'params',
            //     'error1',  'params'
            // ]
        })
        this.taskQueue.push(task3).then((res) => {
            console.log(res);
            // [
            //     'params', 'params',
            //     'error7', 'params',
            //     'params', 'error4',
            //     'params', 'params',
            //     'error1', 'params'
            // ]
        })
        this.taskQueue.push(task4).then((res) => {
            console.log(res);
            // [
            //     'params',  'params',  'error37', 'params',
            //     'params',  'error34', 'params',  'params',
            //     'error31', 'params',  'params',  'error28',
            //     'params',  'params',  'error25', 'params',
            //     'params',  'error22', 'params',  'params',
            //     'error19', 'params',  'params',  'error16',
            //     'params',  'params',  'error13', 'params',
            //     'params',  'error10', 'params',  'params',
            //     'error7',  'params',  'params',  'error4',
            //     'params',  'params',  'error1',  'params'
            // ]
        })
    }
}


写在最后

以上就是文章的全部内容了,有兴趣的同学可以下载源码或者使用npm下载使用


仓库地址:TaskQueue: nodejs消息队列


npm:task-queue-lib - npm


相关文章
|
JavaScript 前端开发
js事件队列
js事件队列
199 55
|
JavaScript 前端开发 API
详解队列在前端的应用,深剖JS中的事件循环Eventloop,再了解微任务和宏任务
该文章详细讲解了队列数据结构在前端开发中的应用,并深入探讨了JavaScript的事件循环机制,区分了宏任务和微任务的执行顺序及其对前端性能的影响。
|
数据采集 Web App开发 JavaScript
利用Selenium和XPath抓取JavaScript动态加载内容的实践案例
利用Selenium和XPath抓取JavaScript动态加载内容的实践案例
|
7月前
|
JavaScript 前端开发 Java
深入理解 JavaScript 中的 Array.find() 方法:原理、性能优势与实用案例详解
Array.find() 是 JavaScript 数组方法中一个非常实用和强大的工具。它不仅提供了简洁的查找操作,还具有性能上的独特优势:返回的引用能够直接影响原数组的数据内容,使得数据更新更加高效。通过各种场景的展示,我们可以看到 Array.find() 在更新、条件查找和嵌套结构查找等场景中的广泛应用。 在实际开发中,掌握 Array.find() 的特性和使用技巧,可以让代码更加简洁高效,特别是在需要直接修改原数据内容的情形。 只有锻炼思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一
|
7月前
|
监控 JavaScript 前端开发
MutationObserver详解+案例——深入理解 JavaScript 中的 MutationObserver:原理与实战案例
MutationObserver 是一个非常强大的 API,提供了一种高效、灵活的方式来监听和响应 DOM 变化。它解决了传统 DOM 事件监听器的诸多局限性,通过异步、批量的方式处理 DOM 变化,大大提高了性能和效率。在实际开发中,合理使用 MutationObserver 可以帮助我们更好地控制 DOM 操作,提高代码的健壮性和可维护性。 只有锻炼思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
MutationObserver详解+案例——深入理解 JavaScript 中的 MutationObserver:原理与实战案例
|
8月前
|
JavaScript 前端开发 索引
40个JS常用使用技巧案例
大家好,我是V哥。在日常开发中,JS是解决页面交互的利器。V哥总结了40个实用的JS小技巧,涵盖数组操作、对象处理、函数使用等,并附带案例代码和解释。从数组去重到异步函数,这些技巧能显著提升开发效率。先赞再看后评论,腰缠万贯财进门。关注威哥爱编程,全栈开发就你行!
216 16
|
8月前
|
JavaScript 前端开发
Node.js 中实现多任务下载的并发控制策略
Node.js 中实现多任务下载的并发控制策略
179 15
|
12月前
|
存储 JavaScript 前端开发
js事件队列
【10月更文挑战第15天】
163 6
|
前端开发 JavaScript API
JavaScript 的宏任务和微任务有什么区别
【9月更文挑战第6天】JavaScript 的宏任务和微任务有什么区别
227 4
|
12月前
|
JavaScript 前端开发 调度
在JavaScript中异步任务里的微任务和宏任务的特点和生命周期
在JavaScript中异步任务里的微任务和宏任务的特点和生命周期
116 0