前言:
针对一些大型的秒杀活动,抢票业务,高并发是一个经常遇到的问题,后端人员时常会接触到消息队列这个中间件。对于前端人员而言,使用node开发业务,或者使用浏览器单线程异步渲染时也会遇到堵塞,页面卡死的现象,如何处理大量的数据同时加载或者数据同时请求便成为了老生常谈的话题。
此时一个异步的任务队列或许可以帮助我们缓解这些问题。
任务队列的特点:异步,解耦,削峰
异步是多个任务并发进行,互不依赖;解耦是将业务隔离开,保证任务的运行结果不会影响到其他任务,从而产生堵塞;削峰是指在系统请求量或者负载达到一定峰值时使用缓存延缓或者排队的手段,保证服务或者任务稳定进行
功能设计:
为了达到通过异步执行队列的目的,采用Promise.all()的方式执行队列的defer延时函数,代码中的解耦可以使用messageCenter进行发布订阅,队列的削峰可以使用状态和最大运行峰值来控制函数执行
流程设计:
接口设计:
/** * 单条队列 * defer: 待运行的异步函数 * params?: defer的参数,也可以用bind直接传递 * */ export interface IQueue { defer: Function name?: string } /** * 队列参数 * children: 队列列表 * name: 队列唯一标识 * result: 运行完成后的结果 * */ export interface IQueues { children: Array<Function> name: string result?: any[] } /** * 队列缓存 */ export type IQueueTemp = { [key: string]: IQueues } /** * 系统队列 */ export type IQueueList = Array<IQueue> /** * 队列状态 idle:空闲 pending:等待 fulfilled:完成 rejected:失败 */ export type IState = "idle" | "pending" | "fulfilled" | "rejected" /** * 任务队列参数 */ export type ITaskQueueProps = { maxLen: number } /** * 任务队列 */ export type ITaskQueue = { readonly fix: string props: ITaskQueueProps queueTemp: IQueueTemp queues: IQueueList state: IState push: (queue: IQueues) => Promise<void> unshift: (length: number) => IQueueList run: (reject: any) => unknown clear: () => void }
功能实现:
工具函数
MessageCenter
defer:扁平化Promise
/** * 优化Promise,避免Promise嵌套 * @returns {Promise,resolve,reject} */ private defer = () => { let resolve, reject return { promise: new Promise<void>((_resolve, _reject) => { resolve = _resolve reject = _reject }), resolve, reject } }
fixStr:根据队列名称存入缓存,添加消息订阅
/** * 混淆字符串 * @param str 需要混淆的字符 * @returns 混淆产物 */ private fixStr(str) { return `${this.fix}${str}` }
checkHandler:检测队列格式是否符合要求
/** * 检查参数是否符合标准 * @param queue 队列或队列集合 */ private checkHandler(queue: IQueues) { if (!queue) { throw new ReferenceError('queue is not defined') } if (!(queue.children instanceof Array) || typeof queue !== "object") { throw new TypeError(`queue should be an object and queue.children should be an array`); } const noFn = i => !i || typeof i !== "function" if (queue.children?.length === 0) throw new Error('queue.children.length can not be 0') if (queue.children?.find((i) => noFn(i))) throw new Error('queueList should have defer') }
stateProxy:设置与获取队列状态
/** * 设置、获取当前队列的状态 * @param state 队列状态,有值就修改当前状态,无值就获取当前状态 * @returns srate 队列的状态 */ private stateProxy = (state?: IState) => { state && (this.state = state) return this.state }
handlerSuccess:异步执行队列成功后回调
/** * 单次队列执行成功 * @param data { res, queues } * @returns */ private handlerSuccess = (data) => { this.stateProxy("fulfilled") return this.messageCenter.emit("run:success:handler", data) }
handlerError:异步执行队列抛错后回调
/** * 单次队列执行失败 * @param data { res, queues, error } * @returns */ private handlerError = (data) => { const { reject, error } = data this.stateProxy("rejected") reject && typeof reject === "function" && reject(error) return this.messageCenter.emit("run:error:handler", data) }
decoratorTaskQueue:使用装饰器调用任务队列
/** * 装饰器用法 * @param opts 同TaskQueue中constructor * @returns 混入类原型中 */ export const decoratorTaskQueue = (opts: ITaskQueueProps): ClassDecorator => { return <TFunction extends Function>(target: TFunction) => { if (!target.prototype.taskQueue) { target.prototype.taskQueue = new TaskQueue(opts) } } }
任务队列实现
其中init函数中注册了一些hooks,当执行push:handler时触发run函数执行队列中的异步函数,另外run函数与run:success:handler以及run:error:handler结合形成递归函数。finish函数是用来判断某一个队列是否全部执行完成
import { decoratorMessageCenter, MessageCenter } from "event-message-center" import { ITaskQueue, IQueueList, IQueues, IState, IQueueTemp, ITaskQueueProps } from "./type" @decoratorMessageCenter export class TaskQueue implements ITaskQueue { readonly fix: string = `@~&$` readonly messageCenter: MessageCenter props: ITaskQueueProps queues: IQueueList queueTemp: IQueueTemp state: IState /** * @param props: {maxLen:并发峰值} 削峰 */ constructor(props: ITaskQueueProps) { this.clear() props && this.defineProps(props, "props") this.init() } /** * 初始化 */ private init = () => { this.messageCenter.on("push:handler", this.run) this.messageCenter.on("run:success:handler", this.run) this.messageCenter.on("run:success:handler", this.finish) this.messageCenter.on("run:error:handler", this.run) this.messageCenter.on("run:error:handler", this.finish) } private defineProps = (props, key) => { Object.defineProperty(this, key, { value: props }) } /** * 进入队列 * @param queue: IQueues 单个队列 * @returns promise: Promise<void> 当前队列执行结束的异步操作 */ push = (queue: IQueues) => { this.checkHandler(queue) const { resolve, reject, promise } = this.defer() const queueName = this.fixStr(queue.name) this.queues = this.queues.concat(queue.children.map(defer => ({ defer, name: queueName }))) this.queueTemp[queueName] = { ...queue, result: [] } this.messageCenter.emit("push:handler", reject) this.messageCenter.on(queueName, resolve) return promise } /** * 移出队列 * @param length 移出数量 * @returns queues 移出的队列 */ unshift = (length) => { return this.queues.splice(0, length) } /** * 异步执行队列:函数的思路是无限递归,通过当前队列状态和数量判断是否执行 * @param reject 异常函数 * @returns void 0 */ run = async ({ reject }) => { if (this.stateProxy() === 'pending') return void 0 if (this.queues.length === 0) return this.stateProxy("idle") this.stateProxy("pending") const queues = this.unshift(this.props?.maxLen ?? 10) try { const res = await Promise.all(queues.map((item, i) => item.defer().catch(error => error))) return this.handlerSuccess({ res, queues }) } catch (error) { return this.handlerError({ reject, error, queues }) } } /** * 初始化整个队列,清除所有数据 */ clear = () => { this.queues = [] this.queueTemp = {} this.props = null this.stateProxy("idle") this.messageCenter.clear() } /** * 处理每次队列执行完成后的数据 * @param data { res, queues, error } 运行结束后的返回值及削峰后的初始队列,一一对应 */ private finish = ({ res = [], queues, error = 'err' }) => { const { queueTemp } = this queues.forEach((it, i) => { const item = queueTemp[it.name] item?.result.push(res[i] ?? error) if (item?.result?.length === item?.children?.length) { this.messageCenter.emit(it.name, item?.result) queueTemp[it.name] = null } }); } }
功能验证
创建异步函数:
根据传入的length生成异步函数,其中有部分函数会走Promise的reject抛错
const syncFn = (args) => { return new Promise((resolve, reject) => { if (args.includes('error')) { return setTimeout(reject.bind(null, args), 1000); } return setTimeout(resolve.bind(null, args), 500); }); }; const createFnList = (length, name) => { const task = { name, children: [], }; while (length--) { task.children.push(syncFn.bind(null, length % 3 === 1 ? 'error' + length : 'params')) } return task; };
创建四个任务:
const task = createFnList(10, "task1"); const task2 = createFnList(20, "task2"); const task3 = createFnList(10, "task3"); const task4 = createFnList(40, "task4");
试用任务队列:
import { TaskQueue, decoratorTaskQueue } from "task-queue-lib" @decoratorTaskQueue({ maxLen: 10 }) export class QueueExample { taskQueue: TaskQueue constructor() { this.taskQueue.push(task).then((res) => { console.log(res); // [ // 'params', 'params', // 'error7', 'params', // 'params', 'error4', // 'params', 'params', // 'error1', 'params' // ] }) this.taskQueue.push(task2).then((res) => { console.log(res); // [ // 'error19', 'params', 'params', // 'error16', 'params', 'params', // 'error13', 'params', 'params', // 'error10', 'params', 'params', // 'error7', 'params', 'params', // 'error4', 'params', 'params', // 'error1', 'params' // ] }) this.taskQueue.push(task3).then((res) => { console.log(res); // [ // 'params', 'params', // 'error7', 'params', // 'params', 'error4', // 'params', 'params', // 'error1', 'params' // ] }) this.taskQueue.push(task4).then((res) => { console.log(res); // [ // 'params', 'params', 'error37', 'params', // 'params', 'error34', 'params', 'params', // 'error31', 'params', 'params', 'error28', // 'params', 'params', 'error25', 'params', // 'params', 'error22', 'params', 'params', // 'error19', 'params', 'params', 'error16', // 'params', 'params', 'error13', 'params', // 'params', 'error10', 'params', 'params', // 'error7', 'params', 'params', 'error4', // 'params', 'params', 'error1', 'params' // ] }) } }
写在最后
以上就是文章的全部内容了,有兴趣的同学可以下载源码或者使用npm下载使用
仓库地址:TaskQueue: nodejs消息队列
npm:task-queue-lib - npm