nodejs中的并发编程

简介:

nodejs中的并发编程
阅读目录

从sleep的实现说起
多线程同步
参考

从sleep的实现说起
在nodejs中,如果要实现sleep的功能主要是通过“setTimeout + promise”实现,也可以通过“循环空转”来解决。前者是利用定时器实现任务的延迟执行,并通过promise链管理任务间的时序与依赖,本质上nodejs的执行线程并没有真正的sleep,事件循环以及v8仍在运行,是仅仅表现在业务逻辑上sleep;而后者的实现则无疑实在浪费CPU性能,有点类似自旋锁,不符合大多数场景。

若要实现引擎层面(运行时)的sleep,事情在ECMAScript Latest Draft (ECMA-262)出现之后开始有了转机。ECMA262规定了 Atomics.wait,它会将调用该方法的代理(引擎)陷入等待队列并让其sleep,直到被notify或者超时。该规范在8.10.0以上版本的nodejs上被实现。

事实上,Atomics.wait 的出现主要解决浏览器或nodejs的worker之间数据同步的问题。浏览器上的web-worker、正式被nodejs@12纳入的worker-threads模块,这些都是ECMAScript多线程模型的具体实现。既然出现多线程那么线程间的同步也就不可避免的被提到,在前端以及nodejs范围内可以使用Atomics.wait和notify来解决。

说的有些跑题,回到本节,如何实现运行时的sleep呢?很简单,利用Atomics.wait的等待超时机制:

let sharedBuf = new SharedArrayBuffer(4);
let sharedArr = new Int32Array(sharedBuf);
// 睡眠n秒
let sleep = function(n){

Atomics.wait(sharedArr, 0, 0, n * 1000);

}
此处的sleep并不是异步方法,它会阻塞执行线程直到超时,因此需要根据业务场景来使用该sleep模型。
关于Atomics.wait的具体使用方法,下文会着重讲解。

多线程同步
虽然nodejs多线程使用场景不是很多,但是一旦涉及到多线程,那么线程间同步就必不可少,否则无法解决临界区的问题。不过nodejs的work_threads对线程的创建不同于c或者java,它使用libuv的API创建线程 “uv_thread_create”,但是在此之前需要初始化一些设施如MessagePort、v8实例设置等,因此创建一个thread并不是一个轻量级的操作,需要结合场景酌情创建适量的threads。

回到正题,多线程间的同步一般需要依赖锁,而锁的实现需要依赖于全局变量。在nodejs的work_threads实现中,主线程无法设置全局变量,因此可以通过Atomics实现。正如上例中所示,Atomics.wait依赖 SharedArrayBuffer,这是共享内存的ArrayBuffer,threads之间可通过它共享数据,可真正操作ArrayBuffer时并不直接使用该对象,而是TypeArray。如Atomics.wait,第一个参数必须是Int32Array对象,而该对象指向的缓冲区为SharedArrayBuffer。当线程A因为Atomics.wait而阻塞后,可通过其它线程B调用Atomics.notify进行唤醒从而让线程A的v8继续执行。

let { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
var sab = new SharedArrayBuffer(1024);
var int32 = new Int32Array(sab);
if (isMainThread) {

const  worker  =  new Worker(__filename, {
    workerData: sab
});
worker.on('message', (d) => {
    console.log('parent receive message:', d);
});
worker.on('error', (e) => {
    console.error('parent receive error', e);
});
worker.on('exit', (code) => {
    if (code !==  0)
        console.error(new  Error(`工作线程使用退出码 ${code} 停止`));
});

Atomics.wait(int32, 0, 0); // A
console.log(int32[0]); // C: 123

} else {

let buf = workerData;
let arrs = new Int32Array(buf);
Atomics.store(arrs, 0, 123); 
Atomics.notify(arrs, 0); // B

}
上例中,主线程创建thread后,在A处进行阻塞;在新线程中,通过原子操作Atomics.store修改SharedArrayBuffer的第一项为123后,于B处唤醒阻塞在SharedArrayBuffer第一项的其它线程;此时主线程被唤醒,执行console.log(int32[0]),输出被新线程修改后的SharedArrayBuffer第一项数据123。


分析一个公平、排它、不可重入锁的实现,它使用Atomics.wait/notify/compareExchange完成线程的同步。

main-thread.js

let Lock = require('./lock').Lock;
let { Worker } = require('worker_threads');
const sharedBuffer = new SharedArrayBuffer(1 * Int32Array.BYTES_PER_ELEMENT);
const sharedArray = new Int32Array(sharedBuffer);
let worker = new Worker('./worker-lock.js', {

workerData:  sharedBuffer

});
Lock.initialize(sharedArray, 0);
const lock = new Lock(sharedArray, 0);
// 获取锁
lock.lock();

// 3s后释放锁
setTimeout(() => {

lock.unlock(); // (B)

}, 3000)
worker-thread.js

let Lock = require('./lock').Lock;
let { parentPort, workerData } = require('worker_threads');
const sharedArray = new Int32Array(workerData);
const lock = new Lock(sharedArray, 0);

console.log('Waiting for lock...'); // (A)
// 获取锁
lock.lock(); // (B) blocks!
console.log('Unlocked'); // (C)
主线程初始化互斥锁,同时创建线程,主线程获取锁后三秒钟释放;
worker线程尝试获取锁,此时锁已被主线程获取,因此worker线程在此阻塞,等待3s后主线程释放锁被唤醒,继续执行输出。

lock.js

const UNLOCKED = 0;
const LOCKED_NO_WAITERS = 1;
const LOCKED_POSSIBLE_WAITERS = 2;
const NUMINTS = 1;

class Lock {

// 'iab' must be a Int32Array mapping shared memory.
// 'ibase' must be a valid index in iab, the first of NUMINTS reserved for the lock.
constructor(iab, ibase) {
    if (!(iab  instanceof  Int32Array  &&  ibase|0  ===  ibase  &&  ibase  >=  0  &&  ibase+NUMINTS  <=  iab.length)) {
        throw  new  Error(`Bad arguments to Lock constructor: ${iab}  ${ibase}`);
    }
    this.iab  =  iab;
    this.ibase  =  ibase;
}
static  initialize(iab, ibase) {
    if (!(iab  instanceof  Int32Array  &&  ibase|0  ===  ibase  &&  ibase  >=  0  &&  ibase+NUMINTS  <=  iab.length)) {
        throw  new  Error(`Bad arguments to Lock constructor: ${iab}  ${ibase}`);
    }
    Atomics.store(iab, ibase, UNLOCKED);
    return  ibase;
}
// Acquire the lock, or block until we can. Locking is not recursive:
lock() {
    const  iab  =  this.iab;
    const  stateIdx  =  this.ibase;
    var  c;
    if ((c  =  Atomics.compareExchange(iab, stateIdx, UNLOCKED, LOCKED_NO_WAITERS)) !==  UNLOCKED) { // A
        do {
            if (c  ===  LOCKED_POSSIBLE_WAITERS
            ||  Atomics.compareExchange(iab, stateIdx, LOCKED_NO_WAITERS, LOCKED_POSSIBLE_WAITERS) !==  UNLOCKED) {
                Atomics.wait(iab, stateIdx, LOCKED_POSSIBLE_WAITERS, Number.POSITIVE_INFINITY);
            }
        } while ((c  =  Atomics.compareExchange(iab, stateIdx, UNLOCKED, LOCKED_POSSIBLE_WAITERS)) !==  UNLOCKED); // B
    }
}
tryLock() {
    const  iab  =  this.iab;
    const  stateIdx  =  this.ibase;
    return  Atomics.compareExchange(iab, stateIdx, UNLOCKED, LOCKED_NO_WAITERS) ===  UNLOCKED;
}
unlock() {
    const  iab  =  this.iab;
    const  stateIdx  =  this.ibase;
    var  v0  =  Atomics.sub(iab, stateIdx, 1);
    // Wake up a waiter if there are any
    if (v0  !==  LOCKED_NO_WAITERS) {
        Atomics.store(iab, stateIdx, UNLOCKED);
        Atomics.notify(iab, stateIdx, 1);
    }
}
toString() {
    return  "Lock:{ibase:"  +  this.ibase  +"}";
}

}
exports.Lock = Lock;
当进程A尝试获取锁成功时,A处判断语句为false,因此由compareExchange设置状态为LOCKED_NO_WAITERS,直接执行其后续逻辑;
若进程B此时执行lock获取锁时,A处判断为true,进入do while循环体,在wait处sleep;
进程A通过unlock释放锁,会将锁状态置为UNLOCKED,同时唤醒阻塞的进程B;
进程B执行循环判断语句B,此时为false,跳出循环执行B的逻辑。

当然,也可通过tryLock实现自旋锁或者其他逻辑实现非阻塞等待。

参考
libuv漫谈之线程
Atomics
Atomics MDN

原文地址https://www.cnblogs.com/accordion/p/12533305.html

相关文章
|
6月前
|
数据采集 消息中间件 Java
python并发编程:什么是并发编程?python对并发编程有哪些支持?
python并发编程:什么是并发编程?python对并发编程有哪些支持?
41 0
|
6月前
|
Python
Python中的并发编程与多线程
在当今高并发的网络应用环境中,如何充分利用计算资源来提高程序的执行效率是一个关键问题。本文将探讨Python中的并发编程技术,重点介绍了多线程的使用方法和注意事项,帮助读者更好地理解并发编程在Python中的应用。
|
6天前
|
并行计算 数据处理 调度
Python中的并发编程:探索多线程与多进程的奥秘####
本文深入探讨了Python中并发编程的两种主要方式——多线程与多进程,通过对比分析它们的工作原理、适用场景及性能差异,揭示了在不同应用需求下如何合理选择并发模型。文章首先简述了并发编程的基本概念,随后详细阐述了Python中多线程与多进程的实现机制,包括GIL(全局解释器锁)对多线程的影响以及多进程的独立内存空间特性。最后,通过实例演示了如何在Python项目中有效利用多线程和多进程提升程序性能。 ####
|
5月前
|
Python
Python中的并发编程(7)异步编程
Python中的并发编程(7)异步编程
|
6月前
|
安全 Go 开发者
Golang深入浅出之-Go语言并发编程面试:Goroutine简介与创建
【4月更文挑战第22天】Go语言的Goroutine是其并发模型的核心,是一种轻量级线程,能低成本创建和销毁,支持并发和并行执行。创建Goroutine使用`go`关键字,如`go sayHello(&quot;Alice&quot;)`。常见问题包括忘记使用`go`关键字、不正确处理通道同步和关闭、以及Goroutine泄漏。解决方法包括确保使用`go`启动函数、在发送完数据后关闭通道、设置Goroutine退出条件。理解并掌握这些能帮助开发者编写高效、安全的并发程序。
89 1
|
6月前
|
并行计算 Python
Python并发编程与多线程
Python编程中,多线程和并发编程是优化复杂任务执行的关键。借助标准库中的`threading`模块,可实现多线程,如示例所示,创建线程并执行函数。然而,由于全局解释器锁(GIL),多线程在CPU密集型任务中并不高效。对于I/O密集型任务,多线程仍能提高效率。为充分利用多核,可采用多进程(如`multiprocessing`模块)或异步编程。选择技术时需依据任务类型和性能需求。
|
6月前
|
API 开发者 Python
深入浅出Python协程:提升并发编程效率
在当今高速发展的互联网时代,高并发成为了软件开发中的一个重要需求。本文将引领读者深入理解Python中的协程(Coroutine)概念,探讨其在并发编程中的应用及优势。我们将从协程的基础概念出发,通过实例讲解如何使用asyncio库来编写高效的异步代码。文章旨在帮助读者掌握协程的工作原理和使用方法,从而在实际开发中能够更好地利用Python进行高效的并发编程。
|
6月前
|
安全 开发者 Python
Python中的并发编程:多线程与多进程
在Python中,实现并发操作对于提升程序性能至关重要。本文将介绍Python中的并发编程技术,重点讨论多线程和多进程的应用场景、优缺点以及最佳实践。
|
6月前
|
并行计算 Python
Python中的并发编程:多线程与多进程的比较
在Python编程中,实现并发操作是提升程序性能的重要手段之一。本文将探讨Python中的多线程与多进程两种并发编程方式的优劣及适用场景,帮助读者更好地选择合适的方法来提高程序运行效率。
|
6月前
|
开发者 Python
深入浅出Python协程:提高并发性能的利器
本文旨在深入探讨Python中的协程机制,一种轻量级的并发编程解决方案。与传统的多线程和多进程相比,协程提供了更高效的并发性能,尤其是在I/O密集型应用中。我们将从协程的基本概念入手,解析其工作原理,并通过实例讲解如何在Python中使用协程来优化程序性能。文章还将对比协程与其他并发模型的优缺点,帮助读者全面理解协程在现代软件开发中的应用价值。
54 3