Node.js躬行记(23)——Worker threads

简介: Node.js躬行记(23)——Worker threads

Node.js 官方提供了 ClusterChild process 创建子进程,通过 Worker threads 模块创建子线程。但前者无法共享内存,通信必须使用 JSON 格式,有一定的局限性和性能问题。后者更轻量,并且可以共享内存,通过传输 ArrayBuffer 实例或共享 SharedArrayBuffer 实例来做到这一点,即数据格式没有太多要求。但是要注意,数据中不能包含函数。

  Worker threads 从 Node V12 开始成为正式标准,其对于执行 CPU 密集型的操作很有用,而对 I/O 密集型工作没有多大帮助。 Node.js 内置的异步 I/O 操作要比它效率更高。注意,Worker threads 是基于 Node.js 架构的多工作线程,如下图所示。在每个工作线程中,都会包含 V8 和 libuv,即都包含Event Loop。

  


一、线程池


  创建、执行、销毁一个 Worker 的开销是很大的,所以需要实现一个线程池(Worker Pool),在初始化时创建有限数量的 Worker 并加载单一的 worker.js,主线程和 Worker 可进行进程间通信,当所有任务完成后,这些 Worker 将会被统一销毁。

  在 Worker 中通过 parentPort.postMessage() 向主线程发送消息,而在主线程中可以通过 worker.on('message') 接收发送过来的消息,worker 是一个 Worker 实例,例如 new Worker(filePath)。

  下面是一个官方示例,isMainThread 可判断当前是否是主线程,workerData 是传递给 Worker 的数据。

const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
if (isMainThread) {
  module.exports = function parseJSAsync(script) {
    return new Promise((resolve, reject) => {
      const worker = new Worker(__filename, {
        workerData: script
      });
      worker.on('message', resolve);
      worker.on('error', reject);
      worker.on('exit', (code) => {
        if (code !== 0)
          reject(new Error(`Worker stopped with exit code ${code}`));
      });
    });
  };
} else {
  const script = workerData;
  parentPort.postMessage(script);
}

  下面是一个线程池示例,参考自《worker_threads 初体验》一文,做了微调,具体在此不在赘述,可阅读原文或注释。

// 获取当前设备的 CPU 线程数目,作为 numberOfThreads 的默认值。
const { length: cpusLength } = require('os').cpus();
const { Worker } = require('worker_threads');
class WorkerPool {
  constructor(workerPath, options = {}, numberOfThreads = cpusLength) {
    if (numberOfThreads < 1) {
      throw new Error('Number of threads should be greater or equal than 1!');
    }
    this.workerPath = workerPath;
    this.numberOfThreads = numberOfThreads;
    // 任务队列
    this._queue = [];
    // Worker 索引
    this._workersById = {};
    // Worker 激活状态索引
    this._activeWorkersById = {};
    // 创建 Workers
    for (let i = 0; i < this.numberOfThreads; i++) {
      const worker = new Worker(workerPath, options);
      this._workersById[i] = worker;
      // 将这些 Worker 设置为未激活状态
      this._activeWorkersById[i] = false;
    }
  }
  /**
   * 检查空闲的 Worker
   */
  getInactiveWorkerId() {
    for (let i = 0; i < this.numberOfThreads; i++) {
      if (!this._activeWorkersById[i]) return i;
    }
    return -1;
  }
  /**
   * 调用 Worker 执行,目的是在指定的 Worker 里执行指定的任务
   */
  runWorker(workerId, taskObj) {
    const worker = this._workersById[workerId];
    // 当任务执行完毕后执行
    const doAfterTaskIsFinished = () => {
      // 去除所有的 Listener,不然一次次添加不同的 Listener 会内存溢出(OOM)
      worker.removeAllListeners('message');
      worker.removeAllListeners('error');
      // 将这个 Worker 设为未激活状态
      this._activeWorkersById[workerId] = false;
      if (this._queue.length) {
        // 任务队列非空,使用该 Worker 执行任务队列中第一个任务
        this.runWorker(workerId, this._queue.shift());
      }
    };
    // 将这个 Worker 设置为激活状态
    this._activeWorkersById[workerId] = true;
    // 设置两个回调,用于 Worker 的监听器
    const messageCallback = result => {
      taskObj.cb(null, result);
      doAfterTaskIsFinished();
    };
    const errorCallback = error => {
      taskObj.cb(error);
      doAfterTaskIsFinished();
    };
    // 为 Worker 添加 'message' 和 'error' 两个 Listener
    worker.once('message', messageCallback);
    worker.once('error', errorCallback);
    // 将数据传给 Worker 供其获取和执行
    worker.postMessage(taskObj.data);
  }
  /**
   * 运行线程
   */
  run(data) {
    // Promise 是个好东西
    return new Promise((resolve, reject) => {
      // 调用 getInactiveWorkerId() 获取一个空闲的 Worker
      const availableWorkerId = this.getInactiveWorkerId();
      const taskObj = {
        data,
        cb: (error, result) => {
          // 虽然 Workers 需要使用 Listener 和 Callback,但这不能阻止我们使用 Promise,对吧?
          // 不,你不能 util.promisify(taskObj) 。人不能,至少不应该。
          if (error) reject(error);
          return resolve(result);
        }
      };
      if (availableWorkerId === -1) {
        // 当前没有空闲的 Workers 了,把任务丢进队列里,这样一旦有 Workers 空闲时就会开始执行。
        this._queue.push(taskObj);
        return null;
      }
      // 有一个空闲的 Worker,用它执行任务
      this.runWorker(availableWorkerId, taskObj);
    })
  }
  /**
   * 销毁
   */
   destroy(force = false) {
    for (let i = 0; i < this.numberOfThreads; i++) {
      if (this._activeWorkersById[i] && !force) {
        // 通常情况下,不应该在还有 Worker 在执行的时候就销毁它,这一定是什么地方出了问题,所以还是抛个 Error 比较好
        // 不过保留一个 force 参数,总有人用得到的
        throw new Error(`The worker ${i} is still runing!`);
      }
      // 销毁这个 Worker
      this._workersById[i].terminate();
    }
  }
}
module.exports = WorkerPool;


二、实践


  之所以需要多线程,是为了解决一个优化需求。就是有一个接口,里面有很多查询数据库(MySQL和MongoDB)的操作,单条语句并不会慢,但累加后整体的响应速度就会变慢,那么就想通过多线程,同时处理一些查询语句,然后整合结果。

  先对线程池做最简单的处理,创建 worker.js,接收 userId。

const { isMainThread, parentPort } = require('worker_threads');
// 不是主线程时执行
if (!isMainThread) {
  parentPort.on('message', async ({userId }) => {
    console.log('postMessage', userId);
    parentPort.postMessage(userId);
  });
}

  然后初始化线程池,将数组中的 userId 传递给 Worker,pool.run({ userId: item })。

const WorkerPool = require('./workerPool');
const { join } = require('path');
async function workerMain(services) {
  const workerPath = join(__dirname + '/worker.js');
  // 初始化一个 Worker Pool
  const pool = new WorkerPool(workerPath);
  Promise.all([4,12,13,15].map(async item => {
    await pool.run({ userId: item });
  })).then(json => {
    // 销毁线程池
    pool.destroy();
  });
}

  输出顺序没有按照数组的顺序,并且每次的输出顺序还都是不同的,由此可知,代码是并发运行的。

postMessage 12
postMessage 4
postMessage 15
postMessage 13

  那么接下来就引入数据库查询的代码,公司项目基于 sequelize.js 封装了增删改查的逻辑,通过 services 变量可以调用相关的操作。在主线程中,计划将 services 传递到 Worker 中。

async function workerMain(services) {
  // Worker Threads 不能共享实例以及带函数的对象
  const workerPath = join(__dirname + '/worker.js', { workerData: services });
  // 初始化一个 Worker Pool
  const pool = new WorkerPool(workerPath);
  // 省略代码......
}

  然而报错了,大致是下面这个意思,无法克隆,因为对象中包含函数,就会引发错误。

node:internal/worker:349     
ReflectApply(this[kPublicPort].postMessage, this[kPublicPort], args);
could not be cloned.

  想以通信的方式实现数据库的并发查询,目前看来不能完成。

  其实可以在 worker.js 中单独引入 services, 不过由于我们在脚本文件中采用了 import 语法,因此在执行时会报错,SyntaxError: Cannot use import statement outside a module。

const { isMainThread, parentPort, workerData } = require('worker_threads');
const services = require('../services');
// 不是主线程时执行
if (!isMainThread) {
  // 省略代码......
}

  还有一种解决方案,其成本就比较高,就是单独再实现一套服务层,也就是说再封装一层符合Node.js 模块化语法的数据库操作集合。

 

相关文章
|
4月前
|
Web App开发 JavaScript 前端开发
[译] 深入理解 Node.js 中的 Worker 线程
[译] 深入理解 Node.js 中的 Worker 线程
|
5月前
|
缓存 JavaScript 前端开发
JavaScript进阶 - Web Workers与Service Worker
【7月更文挑战第4天】JavaScript的Web Workers和Service Worker增强了Web性能。Web Workers处理后台多线程,减轻主线程负担,但通信有开销,受同源策略限制。Service Worker则用于离线缓存和推送通知,需管理其生命周期、更新策略,并确保安全。两者都带来了挑战,但也极大提升了用户体验。通过理解和优化,开发者能构建更高效、安全的Web应用。
146 2
|
5月前
|
缓存 前端开发 JavaScript
JavaScript进阶 - Web Workers与Service Worker
【7月更文挑战第10天】在Web开发中,Web Workers和Service Worker提升性能。Workers运行后台任务,防止界面冻结。Web Workers处理计算密集型任务,Service Worker则缓存资源实现离线支持。常见问题包括通信故障、资源限制、注册错误及缓存更新。通过示例代码展示了两者用法,并强调生命周期管理和错误处理的重要性。善用这些技术,可构建高性能的Web应用。
118 0
|
5月前
|
缓存 JavaScript 前端开发
JavaScript进阶 - Web Workers与Service Worker
【7月更文挑战第6天】JavaScript的Web Workers和Service Worker增强了浏览器的性能处理和离线功能。Web Workers处理后台计算,减轻主线程压力,但通信有开销,受同源策略限制。Service Worker则能拦截网络请求,支持离线缓存和推送通知,但其生命周期和权限管理需谨慎处理。通过理解它们的工作原理和限制,开发者能创建更流畅、更健壮的Web应用。
121 0
|
5月前
|
JavaScript 前端开发 API
JS案例:前端Iframe及Worker通信解决思路
JS案例:前端Iframe及Worker通信解决思路
103 0
|
7月前
|
移动开发 JavaScript 前端开发
Web Worker:JavaScript的后台任务解决方案
Web Worker:JavaScript的后台任务解决方案
|
7月前
|
移动开发 JavaScript 前端开发
【JavaScript技术专栏】Web Worker在JavaScript中的应用
【4月更文挑战第30天】HTML5的Web Worker API解决了JavaScript单线程性能瓶颈问题,允许在后台线程运行JS代码。本文介绍了Web Worker的基本概念、类型、用法和应用场景,如复杂计算、图像处理和数据同步。通过实例展示了搜索建议、游戏开发和实时数据分析等应用,并提醒注意其无法直接访问DOM、需消息传递通信以及移动端资源管理。Web Worker为前端开发提供了多线程能力,提升了Web应用性能和用户体验。
80 0
|
Kubernetes Linux 网络安全
|
前端开发 JavaScript
【worker】js中的多线程
因为下个项目中要用到一些倒计时的功能,所以就提前准备了一下,省的到时候出现一下界面不友好和一些其他的事情。正好趁着这个机会也加深一下html5中的多线程worker的用法和理解。 Worker简介     JavaScript 语言采用的是单线程模型,也就是说,所有任务只能在一个线程上完成,一次只能做一件事。
1351 0
|
JavaScript 前端开发 HTML5
web worker 是运行在后台的 JavaScript,不会影响页面的性能。
http://www.w3school.com.cn/html5/html_5_webworkers.asp
1016 0