worker_threads 多线程

简介: worker_threads 多线程

Node.js 中的 worker_threads 模块
worker_threads 模块是 Node.js 中用于创建多线程处理的工具。

尽管 JavaScript 是单线程的,但有时候在处理计算密集型任务或长时间运行的操作时,单线程的运行会导致主线程被阻塞,影响服务器性能。

为了解决这种问题,worker_threads 模块允许我们在同一个进程内创建并运行多个线程,每个线程有自己的事件循环,但共享进程的内存空间。

基本概念
主线程:主线程是 Node.js 程序默认执行代码的地方,通常是单线程运行,执行同步和异步的事件循环。
Worker(工作线程):工作线程是与主线程平行执行的额外线程,用于处理复杂、长时间运行的任务,不会阻塞主线程的执行。
何时使用 worker_threads?
当需要处理 CPU 密集型 任务(如大型计算、图像处理、数据加密等)时。
当需要保持 异步 I/O 操作的同时,不阻塞主线程时。
基本使用方法

  1. 创建一个简单的 Worker
    我们可以通过 Worker 类创建工作线程。每个工作线程运行一个独立的 JavaScript 文件。

// main.js
const { Worker } = require('worker_threads');

// 创建一个新的 Worker,并指定 worker 执行的脚本文件
const worker = new Worker('./worker.js');

// 监听 worker 发回的消息
worker.on('message', (message) => {
console.log(Received from worker: ${message});
});

// 向 worker 发送消息
worker.postMessage('Start task');

// worker.js
const { parentPort } = require('worker_threads');

// 监听来自主线程的消息
parentPort.on('message', (message) => {
console.log(Worker received: ${message});

// 进行一些耗时操作
let result = 0;
for (let i = 0; i < 1e9; i++) {
result += i;
}

// 将结果发回主线程
parentPort.postMessage(result);
});

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
在这个例子中,主线程(main.js)创建了一个 Worker 线程(worker.js),并通过 parentPort 与其通信。主线程可以向 Worker 发送任务,Worker 在处理完后将结果返回给主线程。

  1. 数据通信
    主线程和 Worker 通过 postMessage() 和 message 事件来传递数据。可以发送任意可以序列化的 JavaScript 数据类型,如字符串、对象、数组等。

主线程向 Worker 发送消息:
worker.postMessage('Some data');
1
Worker 向主线程发送消息:
parentPort.postMessage('Some result');
1

  1. 共享内存(SharedArrayBuffer)
    worker_threads 支持通过 SharedArrayBuffer 来在多个线程之间共享内存。这种机制可以避免频繁的消息传递开销,提高性能。

// main.js
const { Worker } = require('worker_threads');

const sharedBuffer = new SharedArrayBuffer(4); // 分配 4 字节的共享内存
const sharedArray = new Int32Array(sharedBuffer);

const worker = new Worker('./worker.js', { workerData: sharedBuffer });

worker.on('message', () => {
console.log('Modified shared array:', sharedArray);
});

// worker.js
const { parentPort, workerData } = require('worker_threads');

const sharedArray = new Int32Array(workerData);

// 修改共享数组
sharedArray[0] = 42;

parentPort.postMessage('Shared data modified');

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
这里,SharedArrayBuffer 是共享内存的核心,它允许主线程和 Worker 线程访问相同的内存空间。我们用 Int32Array 对内存进行操作,修改数据后,主线程可以立即读取结果,无需通过消息传递。

  1. 工作线程与主线程的生命周期
    启动和终止:

当创建一个 Worker 实例时,线程会自动启动。
当 Worker 执行完所有任务或调用 worker.terminate() 时,线程会退出。
自动终止:
如果工作线程的事件循环为空(没有待处理的事件),Worker 会自动退出。

worker.terminate().then(() => {
console.log('Worker terminated');
});
1
2
3

  1. 错误处理
    在多线程环境下,处理错误尤为重要。我们可以使用 error 事件来捕获线程中的错误。

worker.on('error', (err) => {
console.error('Worker error:', err);
});
1
2
3
如果 Worker 出现错误,会触发 error 事件,主线程可以处理这个错误。

Worker 线程池
虽然 worker_threads 允许我们创建多个 Worker,但直接为每个任务创建一个新的 Worker 可能效率较低。为此,我们可以创建一个 线程池,通过复用 Worker 来处理多个任务。

线程池实现(简单示例):
const { Worker } = require('worker_threads');

class ThreadPool {
constructor(size) {
this.size = size;
this.workers = [];
this.tasks = [];

// 初始化线程池
for (let i = 0; i < size; i++) {
  this.workers.push(this.createWorker());
}

}

createWorker() {
const worker = new Worker('./worker.js');
worker.on('message', () => {
this.executeNextTask(worker);
});
return worker;
}

executeNextTask(worker) {
if (this.tasks.length === 0) {
return;
}
const task = this.tasks.shift();
worker.postMessage(task);
}

runTask(task) {
const availableWorker = this.workers.find(w => w.isIdle);

if (availableWorker) {
  availableWorker.isIdle = false;
  availableWorker.postMessage(task);
} else {
  this.tasks.push(task);
}

}
}

const pool = new ThreadPool(4);

pool.runTask('Task 1');
pool.runTask('Task 2');

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
在这个简单的示例中,我们创建了一个大小为 4 的线程池,任务可以通过 runTask 方法提交到线程池中。线程池会依次执行任务,并复用空闲的线程。

与其他多线程解决方案的比较
child_process 模块:允许在 Node.js 中创建独立的进程,进程间通过消息传递进行通信,但资源隔离更强,消耗较大。相比之下,worker_threads 在线程间共享内存,创建成本和通信成本较低。
异步操作:虽然 Node.js 的异步 I/O 可以通过事件驱动模型来处理大量任务,但对于 CPU 密集型任务,异步操作并不适合,此时可以使用 worker_threads 来实现并行计算。
总结
worker_threads 是 Node.js 中用于多线程处理的核心工具。
它允许在单个进程内创建多个线程,线程间可以通过消息传递和共享内存进行通信。
非常适合用于处理计算密集型任务,避免主线程的阻塞。
虽然 worker_threads 增强了并行计算的能力,但需要合理管理线程的创建和销毁,避免线程资源的浪费。

相关文章
|
23天前
|
弹性计算 人工智能 架构师
阿里云携手Altair共拓云上工业仿真新机遇
2024年9月12日,「2024 Altair 技术大会杭州站」成功召开,阿里云弹性计算产品运营与生态负责人何川,与Altair中国技术总监赵阳在会上联合发布了最新的“云上CAE一体机”。
阿里云携手Altair共拓云上工业仿真新机遇
|
15天前
|
存储 关系型数据库 分布式数据库
GraphRAG:基于PolarDB+通义千问+LangChain的知识图谱+大模型最佳实践
本文介绍了如何使用PolarDB、通义千问和LangChain搭建GraphRAG系统,结合知识图谱和向量检索提升问答质量。通过实例展示了单独使用向量检索和图检索的局限性,并通过图+向量联合搜索增强了问答准确性。PolarDB支持AGE图引擎和pgvector插件,实现图数据和向量数据的统一存储与检索,提升了RAG系统的性能和效果。
|
20天前
|
机器学习/深度学习 算法 大数据
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
2024“华为杯”数学建模竞赛,对ABCDEF每个题进行详细的分析,涵盖风电场功率优化、WLAN网络吞吐量、磁性元件损耗建模、地理环境问题、高速公路应急车道启用和X射线脉冲星建模等多领域问题,解析了问题类型、专业和技能的需要。
2572 22
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
|
18天前
|
人工智能 IDE 程序员
期盼已久!通义灵码 AI 程序员开启邀测,全流程开发仅用几分钟
在云栖大会上,阿里云云原生应用平台负责人丁宇宣布,「通义灵码」完成全面升级,并正式发布 AI 程序员。
|
3天前
|
JSON 自然语言处理 数据管理
阿里云百炼产品月刊【2024年9月】
阿里云百炼产品月刊【2024年9月】,涵盖本月产品和功能发布、活动,应用实践等内容,帮助您快速了解阿里云百炼产品的最新动态。
阿里云百炼产品月刊【2024年9月】
|
2天前
|
存储 人工智能 搜索推荐
数据治理,是时候打破刻板印象了
瓴羊智能数据建设与治理产品Datapin全面升级,可演进扩展的数据架构体系为企业数据治理预留发展空间,推出敏捷版用以解决企业数据量不大但需构建数据的场景问题,基于大模型打造的DataAgent更是为企业用好数据资产提供了便利。
159 2
|
19天前
|
机器学习/深度学习 算法 数据可视化
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码
2024年中国研究生数学建模竞赛C题聚焦磁性元件磁芯损耗建模。题目背景介绍了电能变换技术的发展与应用,强调磁性元件在功率变换器中的重要性。磁芯损耗受多种因素影响,现有模型难以精确预测。题目要求通过数据分析建立高精度磁芯损耗模型。具体任务包括励磁波形分类、修正斯坦麦茨方程、分析影响因素、构建预测模型及优化设计条件。涉及数据预处理、特征提取、机器学习及优化算法等技术。适合电气、材料、计算机等多个专业学生参与。
1570 16
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码
|
21天前
|
编解码 JSON 自然语言处理
通义千问重磅开源Qwen2.5,性能超越Llama
击败Meta,阿里Qwen2.5再登全球开源大模型王座
942 14
|
3天前
|
Linux 虚拟化 开发者
一键将CentOs的yum源更换为国内阿里yum源
一键将CentOs的yum源更换为国内阿里yum源
185 2
|
16天前
|
人工智能 开发框架 Java
重磅发布!AI 驱动的 Java 开发框架:Spring AI Alibaba
随着生成式 AI 的快速发展,基于 AI 开发框架构建 AI 应用的诉求迅速增长,涌现出了包括 LangChain、LlamaIndex 等开发框架,但大部分框架只提供了 Python 语言的实现。但这些开发框架对于国内习惯了 Spring 开发范式的 Java 开发者而言,并非十分友好和丝滑。因此,我们基于 Spring AI 发布并快速演进 Spring AI Alibaba,通过提供一种方便的 API 抽象,帮助 Java 开发者简化 AI 应用的开发。同时,提供了完整的开源配套,包括可观测、网关、消息队列、配置中心等。
711 12