JS案例:实现一个简单的任务队列-TaskQueue

简介: JS案例:实现一个简单的任务队列-TaskQueue

前言:

针对一些大型的秒杀活动,抢票业务,高并发是一个经常遇到的问题,后端人员时常会接触到消息队列这个中间件。对于前端人员而言,使用node开发业务,或者使用浏览器单线程异步渲染时也会遇到堵塞,页面卡死的现象,如何处理大量的数据同时加载或者数据同时请求便成为了老生常谈的话题。

此时一个异步的任务队列或许可以帮助我们缓解这些问题。

任务队列的特点:异步,解耦,削峰


异步是多个任务并发进行,互不依赖;解耦是将业务隔离开,保证任务的运行结果不会影响到其他任务,从而产生堵塞;削峰是指在系统请求量或者负载达到一定峰值时使用缓存延缓或者排队的手段,保证服务或者任务稳定进行


功能设计:

为了达到通过异步执行队列的目的,采用Promise.all()的方式执行队列的defer延时函数,代码中的解耦可以使用messageCenter进行发布订阅,队列的削峰可以使用状态和最大运行峰值来控制函数执行


流程设计:

1.png


接口设计:

/**
 * 单条队列
 * defer: 待运行的异步函数
 * params?: defer的参数,也可以用bind直接传递
 * 
 */
export interface IQueue {
    defer: Function
    name?: string
}
/**
 * 队列参数
 * children: 队列列表
 * name: 队列唯一标识
 * result: 运行完成后的结果
 * 
 */
export interface IQueues {
    children: Array<Function>
    name: string
    result?: any[]
}
/**
 * 队列缓存
 */
export type IQueueTemp = {
    [key: string]: IQueues
}
/**
 * 系统队列
 */
export type IQueueList = Array<IQueue>
/**
 * 队列状态 idle:空闲 pending:等待 fulfilled:完成 rejected:失败
 */
export type IState = "idle" | "pending" | "fulfilled" | "rejected"
/**
 * 任务队列参数
 */
export type ITaskQueueProps = {
    maxLen: number
}
/**
 * 任务队列
 */
export type ITaskQueue = {
    readonly fix: string
    props: ITaskQueueProps
    queueTemp: IQueueTemp
    queues: IQueueList
    state: IState
    push: (queue: IQueues) => Promise<void>
    unshift: (length: number) => IQueueList
    run: (reject: any) => unknown
    clear: () => void
}


功能实现:

工具函数

MessageCenter


defer:扁平化Promise

   /**
     * 优化Promise,避免Promise嵌套
     * @returns {Promise,resolve,reject}
     */
    private defer = () => {
        let resolve, reject
        return {
            promise: new Promise<void>((_resolve, _reject) => {
                resolve = _resolve
                reject = _reject
            }),
            resolve, reject
        }
    }

fixStr:根据队列名称存入缓存,添加消息订阅

    /**
     * 混淆字符串
     * @param str 需要混淆的字符 
     * @returns 混淆产物
     */
    private fixStr(str) {
        return `${this.fix}${str}`
    }

checkHandler:检测队列格式是否符合要求

   /**
     * 检查参数是否符合标准
     * @param queue 队列或队列集合
     */
    private checkHandler(queue: IQueues) {
        if (!queue) {
            throw new ReferenceError('queue is not defined')
        }
        if (!(queue.children instanceof Array) || typeof queue !== "object") {
            throw new TypeError(`queue should be an object and queue.children should be an array`);
        }
        const noFn = i => !i || typeof i !== "function"
        if (queue.children?.length === 0) throw new Error('queue.children.length can not be 0')
        if (queue.children?.find((i) => noFn(i))) throw new Error('queueList should have defer')
    }


stateProxy:设置与获取队列状态

    /**
     * 设置、获取当前队列的状态
     * @param state 队列状态,有值就修改当前状态,无值就获取当前状态
     * @returns srate 队列的状态
     */
    private stateProxy = (state?: IState) => {
        state && (this.state = state)
        return this.state
    }

handlerSuccess:异步执行队列成功后回调

    /**
     * 单次队列执行成功
     * @param data { res, queues }
     * @returns 
     */
    private handlerSuccess = (data) => {
        this.stateProxy("fulfilled")
        return this.messageCenter.emit("run:success:handler", data)
    }

handlerError:异步执行队列抛错后回调

    /**
     * 单次队列执行失败
     * @param data { res, queues, error }
     * @returns 
     */
    private handlerError = (data) => {
        const { reject, error } = data
        this.stateProxy("rejected")
        reject && typeof reject === "function" && reject(error)
        return this.messageCenter.emit("run:error:handler", data)
    }

decoratorTaskQueue:使用装饰器调用任务队列

/**
 * 装饰器用法
 * @param opts  同TaskQueue中constructor
 * @returns 混入类原型中
 */
export const decoratorTaskQueue = (opts: ITaskQueueProps): ClassDecorator => {
    return <TFunction extends Function>(target: TFunction) => {
        if (!target.prototype.taskQueue) {
            target.prototype.taskQueue = new TaskQueue(opts)
        }
    }
}

任务队列实现

其中init函数中注册了一些hooks,当执行push:handler时触发run函数执行队列中的异步函数,另外run函数与run:success:handler以及run:error:handler结合形成递归函数。finish函数是用来判断某一个队列是否全部执行完成

import { decoratorMessageCenter, MessageCenter } from "event-message-center"
import {
    ITaskQueue, IQueueList, IQueues, IState, IQueueTemp, ITaskQueueProps
} from "./type"
@decoratorMessageCenter
export class TaskQueue implements ITaskQueue {
    readonly fix: string = `@~&$`
    readonly messageCenter: MessageCenter
    props: ITaskQueueProps
    queues: IQueueList
    queueTemp: IQueueTemp
    state: IState
    /**
     * @param props: {maxLen:并发峰值} 削峰
     */
    constructor(props: ITaskQueueProps) {
        this.clear()
        props && this.defineProps(props, "props")
        this.init()
    }
    /**
     * 初始化
     */
    private init = () => {
        this.messageCenter.on("push:handler", this.run)
        this.messageCenter.on("run:success:handler", this.run)
        this.messageCenter.on("run:success:handler", this.finish)
        this.messageCenter.on("run:error:handler", this.run)
        this.messageCenter.on("run:error:handler", this.finish)
    }
    private defineProps = (props, key) => {
        Object.defineProperty(this, key, { value: props })
    }
    /**
     * 进入队列
     * @param queue: IQueues 单个队列
     * @returns promise: Promise<void> 当前队列执行结束的异步操作
     */
    push = (queue: IQueues) => {
        this.checkHandler(queue)
        const { resolve, reject, promise } = this.defer()
        const queueName = this.fixStr(queue.name)
        this.queues = this.queues.concat(queue.children.map(defer => ({ defer, name: queueName })))
        this.queueTemp[queueName] = { ...queue, result: [] }
        this.messageCenter.emit("push:handler", reject)
        this.messageCenter.on(queueName, resolve)
        return promise
    }
    /**
     * 移出队列
     * @param length 移出数量
     * @returns queues 移出的队列
     */
    unshift = (length) => {
        return this.queues.splice(0, length)
    }
    /**
     * 异步执行队列:函数的思路是无限递归,通过当前队列状态和数量判断是否执行
     * @param reject 异常函数
     * @returns void 0
     */
    run = async ({ reject }) => {
        if (this.stateProxy() === 'pending') return void 0
        if (this.queues.length === 0) return this.stateProxy("idle")
        this.stateProxy("pending")
        const queues = this.unshift(this.props?.maxLen ?? 10)
        try {
            const res = await Promise.all(queues.map((item, i) => item.defer().catch(error => error)))
            return this.handlerSuccess({ res, queues })
        } catch (error) {
            return this.handlerError({ reject, error, queues })
        }
    }
    /**
     * 初始化整个队列,清除所有数据
     */
    clear = () => {
        this.queues = []
        this.queueTemp = {}
        this.props = null
        this.stateProxy("idle")
        this.messageCenter.clear()
    }
    /**
     * 处理每次队列执行完成后的数据
     * @param data { res, queues, error } 运行结束后的返回值及削峰后的初始队列,一一对应
     */
    private finish = ({ res = [], queues, error = 'err' }) => {
        const { queueTemp } = this
        queues.forEach((it, i) => {
            const item = queueTemp[it.name]
            item?.result.push(res[i] ?? error)
            if (item?.result?.length === item?.children?.length) {
                this.messageCenter.emit(it.name, item?.result)
                queueTemp[it.name] = null
            }
        });
    }
}


功能验证

创建异步函数:


根据传入的length生成异步函数,其中有部分函数会走Promise的reject抛错

const syncFn = (args) => {
    return new Promise((resolve, reject) => {
        if (args.includes('error')) {
            return setTimeout(reject.bind(null, args), 1000);
        }
        return setTimeout(resolve.bind(null, args), 500);
    });
};
const createFnList = (length, name) => {
    const task = {
        name,
        children: [],
    };
    while (length--) {
        task.children.push(syncFn.bind(null, length % 3 === 1 ? 'error' + length : 'params'))
    }
    return task;
};


创建四个任务:

const task = createFnList(10, "task1");
const task2 = createFnList(20, "task2");
const task3 = createFnList(10, "task3");
const task4 = createFnList(40, "task4");

试用任务队列:

import { TaskQueue, decoratorTaskQueue } from "task-queue-lib"
@decoratorTaskQueue({ maxLen: 10 })
export class QueueExample {
    taskQueue: TaskQueue
    constructor() {
        this.taskQueue.push(task).then((res) => {
            console.log(res);
            // [
            //     'params', 'params',
            //     'error7', 'params',
            //     'params', 'error4',
            //     'params', 'params',
            //     'error1', 'params'
            // ]
        })
        this.taskQueue.push(task2).then((res) => {
            console.log(res);
            // [
            //     'error19', 'params', 'params',
            //     'error16', 'params', 'params',
            //     'error13', 'params', 'params',
            //     'error10', 'params', 'params',
            //     'error7',  'params', 'params',
            //     'error4',  'params', 'params',
            //     'error1',  'params'
            // ]
        })
        this.taskQueue.push(task3).then((res) => {
            console.log(res);
            // [
            //     'params', 'params',
            //     'error7', 'params',
            //     'params', 'error4',
            //     'params', 'params',
            //     'error1', 'params'
            // ]
        })
        this.taskQueue.push(task4).then((res) => {
            console.log(res);
            // [
            //     'params',  'params',  'error37', 'params',
            //     'params',  'error34', 'params',  'params',
            //     'error31', 'params',  'params',  'error28',
            //     'params',  'params',  'error25', 'params',
            //     'params',  'error22', 'params',  'params',
            //     'error19', 'params',  'params',  'error16',
            //     'params',  'params',  'error13', 'params',
            //     'params',  'error10', 'params',  'params',
            //     'error7',  'params',  'params',  'error4',
            //     'params',  'params',  'error1',  'params'
            // ]
        })
    }
}


写在最后

以上就是文章的全部内容了,有兴趣的同学可以下载源码或者使用npm下载使用


仓库地址:TaskQueue: nodejs消息队列


npm:task-queue-lib - npm


相关文章
|
5月前
|
JavaScript 前端开发
js事件队列
js事件队列
146 55
|
4月前
|
JavaScript 前端开发 API
详解队列在前端的应用,深剖JS中的事件循环Eventloop,再了解微任务和宏任务
该文章详细讲解了队列数据结构在前端开发中的应用,并深入探讨了JavaScript的事件循环机制,区分了宏任务和微任务的执行顺序及其对前端性能的影响。
|
5月前
|
数据采集 Web App开发 JavaScript
利用Selenium和XPath抓取JavaScript动态加载内容的实践案例
利用Selenium和XPath抓取JavaScript动态加载内容的实践案例
|
3月前
|
存储 JavaScript 前端开发
js事件队列
【10月更文挑战第15天】
57 6
|
3月前
|
JavaScript 前端开发 调度
在JavaScript中异步任务里的微任务和宏任务的特点和生命周期
在JavaScript中异步任务里的微任务和宏任务的特点和生命周期
51 0
|
3月前
|
前端开发 JavaScript
JavaScript动态渲染页面爬取——CSS位置偏移反爬案例分析与爬取实战
JavaScript动态渲染页面爬取——CSS位置偏移反爬案例分析与爬取实战
43 0
|
4月前
|
前端开发 JavaScript API
JavaScript 的宏任务和微任务有什么区别
【9月更文挑战第6天】JavaScript 的宏任务和微任务有什么区别
103 4
|
5月前
|
JavaScript 前端开发 UED
Javaweb之javascript的小案例的详细解析
通过上述步骤,我们得到了一个动态更新的实时时钟,这个简单的JavaScript案例展示了定时器的使用方法,并讲解了如何处理日期和时间。这个案例说明了JavaScript在网页中添加动态内容与交互的能力。对于涉足JavaWeb开发的学习者来说,理解和运用这些基础知识非常重要。
46 11
|
5月前
|
存储 前端开发 JavaScript
JavaScript 并发任务控制
【8月更文挑战第31天】JavaScript 并发任务控制
56 2
|
5月前
|
运维 JavaScript 安全
自动化运维:使用Ansible简化日常任务深入理解Node.js事件循环和异步编程
【8月更文挑战第27天】在快节奏的技术环境中,自动化不再是奢侈品,而是必需品。本文将引导你通过Ansible实现自动化运维,从基础到高级应用,解锁高效管理服务器群的秘诀,让你的IT操作更加流畅和高效。