问题
Node.js 由这些关键字组成: 事件驱动、非阻塞I/O、高效、轻量。
于是在我们刚接触 Node.js 时,会有所疑问:
- 为什么在浏览器中运行的 JavaScript 能与操作系统进行如此底层的交互?
- Node 真的是单线程吗?
- 如果是单线程,他是如何处理高并发请求的?
- Node 事件驱动是如何实现的?
下来我们一起来解秘这是怎么一回事!
架构一览
上面的问题,都挺底层的,所以我们从 Node.js 本身入手,先来看看 Node.js 的结构。
- Node.js 标准库,这部分是由 Javascript编写的,即我们使用过程中直接能调用的 API。在源码中的 lib 目录下可以看到。
- Node bindings,这一层是 Javascript 与底层 C/C++ 能够沟通的关键,前者通过 bindings 调用后者,相互交换数据。
- 第三层是支撑 Node.js 运行的关键,由 C/C++ 实现。
- V8:Google 推出的 Javascript VM,也是 Node.js 为什么使用的是 JavaScript 的关键,它为 JavaScript 提供了在非浏览器端运行的环境,它的高效是 Node.js 之所以高效的原因之一。
- Libuv:它为 Node.js 提供了跨平台,线程池,事件池,异步 I/O 等能力,是 Node.js 如此强大的关键。
- C-ares:提供了异步处理 DNS 相关的能力。
- http_parser、OpenSSL、zlib 等:提供包括 http 解析、SSL、数据压缩等其他的能力。
单线程、异步
- 单线程:所有任务需要排队,前一个任务结束,才会执行后一个任务。如果前一个任务耗时很长,后一个任务就不得不一直等着。Node 单线程指的是 Node 在执行程序代码时,主线程是单线程。
- 异步:主线程之外,还维护了一个"事件队列"(Event queue)。当用户的网络请求或者其它的异步操作到来时,Node 都会把它放到 Event Queue 之中,此时并不会立即执行它,代码也不会被阻塞,继续往下走,直到主线程代码执行完毕。
注:
- JavaScript 是单线程的,Node 本身其实是多线程的,只是 I/O 线程使用的 CPU 比较少;还有个重要的观点是,除了用户的代码无法并行执行外,所有的 I/O (磁盘 I/O 和网络 I/O) 则是可以并行起来的。
- libuv 线程池默认打开 4 个,最多打开 128 个 线程。
事件循环
Nodejs 所谓的单线程,只是主线程是单线程。
- 主线程运行 V8 和 JavaScript
- 多个子线程通过
事件循环
被调度
可以抽象为:主线程对应于老板,正在工作。一旦发现有任务可以分配给职员(子线程)来做,将会把任务分配给底下的职员来做。同时,老板继续做自己的工作,等到职员(子线程)把任务做完,就会通过事件把结果回调给老板。老板又不停重复处理职员(子线程)子任务的完成情况。
老板(主线程)给职员(子线程)分配任务,当职员(子线程)把任务做完之后,通过事件把结果回调给老板。老板(主线程)处理回调结果,执行相应的 JavaScript。
更具体的解释请看下图:
1、每个 Node.js 进程只有一个主线程在执行程序代码,形成一个执行栈(execution context stack)。
2、Node.js 在主线程里维护了一个"事件队列"(Event queue),当用户的网络请求或者其它的异步操作到来时,Node 都会把它放到 Event Queue之中,此时并不会立即执行它,代码也不会被阻塞,继续往下走,直到主线程代码执行完毕。
3、主线程代码执行完毕完成后,然后通过 Event Loop,也就是事件循环机制,检查队列中是否有要处理的事件,这时要分两种情况:如果是非 I/O 任务,就亲自处理,并通过回调函数返回到上层调用;如果是 I/O 任务,就从 线程池
中拿出一个线程来处理这个事件,并指定回调函数,当线程中的 I/O 任务完成以后,就执行指定的回调函数,并把这个完成的事件放到事件队列的尾部,线程归还给线程池,等待事件循环。当主线程再次循环到该事件时,就直接处理并返回给上层调用。 这个过程就叫 事件循环 (Event Loop)
。
4、期间,主线程不断的检查事件队列中是否有未执行的事件,直到事件队列中所有事件都执行完了,此后每当有新的事件加入到事件队列中,都会通知主线程按顺序取出交 Event Loop 处理。
优缺点
Nodejs 的优点:I/O 密集型处理是 Nodejs 的强项,因为 Nodejs 的 I/O 请求都是异步的(如:sql 查询请求、文件流操作操作请求、http 请求...)
Nodejs 的缺点:不擅长 cpu 密集型的操作(复杂的运算、图片的操作)
总结
1、Nodejs 与操作系统交互,我们在 JavaScript 中调用的方法,最终都会通过 process.binding 传递到 C/C++ 层面,最终由他们来执行真正的操作。Node.js 即这样与操作系统进行互动。
2、Nodejs 所谓的单线程,只是主线程是单线程,所有的网络请求或者异步任务都交给了内部的线程池去实现,本身只负责不断的往返调度,由事件循环不断驱动事件执行。
3、Nodejs 之所以单线程可以处理高并发的原因,得益于 libuv 层的事件循环机制,和底层线程池实现。
4、Event loop 就是主线程从主线程的事件队列里面不停循环的读取事件,驱动了所有的异步回调函数的执行,Event loop 总共 7 个阶段,每个阶段都有一个任务队列,当所有阶段被顺序执行一次后,event loop 完成了一个 tick。
串联同步执行并发请求
就像上面说的:Node.js 在主线程里维护了一个"事件队列"(Event queue),当用户的网络请求或者其它的异步操作到来时,Node 都会把它放到 Event Queue之中,此时并不会立即执行它,代码也不会被阻塞,继续往下走,直到主线程代码执行完毕。
所以要串联同步执行并发请求的关键在于维护一个队列,队列的特点是 先进先出
,按队列里面的顺序执行就可以达到串联同步执行并发请求的目的。
方案
- 根据每个请求的 uniqueId 变量作为唯一令牌
- 队列里面维护一个结果数组和一个执行队列,把执行队列完成的
令牌与结果
存储在结果数组里面 - 根据唯一令牌,一直去获取执行完成的结果,间隔 200 毫秒,超时等待时间为 10 分钟
- 一直等待并获取结果,等待到有结果时,才返回给请求;并根据令牌把结果数组里面相应的项删除
队列
代码:
class Recorder { private list: any[]; private queueList: any[]; private intervalTimer; constructor() { this.list = []; this.queueList = []; this.intervalTimer = null; } // 根据 id 获取任务结果 public get(id: string) { let data; console.log('this.list: ', this.list); let index; for (let i = 0; i < this.list.length; i++) { const item = this.list[i]; if (id === item.id) { data = item.data; index = i; break; } } // 删除获取到结果的项 if (index !== undefined) { this.list.splice(index, 1); } return data; } public clear() { this.list = []; this.queueList = []; } // 添加项 public async addQueue(item: any) { this.queueList.push(item); } public async runQueue() { clearInterval(this.intervalTimer); if (!this.queueList.length) { // console.log('队列执行完毕'); return; } // 取出队列里面的最后一项 const item = this.queueList.shift(); console.log('item: ', item); // 执行队列的回调 const data = await item.callback(); console.log('回调执行完成: ', data); // 把结果放进 结果数组 this.list.push({ id: item.id, data }); } public interval() { clearInterval(this.intervalTimer); this.intervalTimer = setInterval(async () => { clearInterval(this.intervalTimer); // 一直执行里面的任务 await this.runQueue(); this.interval(); }, 200); } } const recorder = new Recorder(); recorder.interval(); export default recorder;
服务
下面模拟一个请求端口的的 Node 服务。
代码:
const Koa = require('koa') const Router = require('koa-router') const cuid = require('cuid'); const bodyParser = require('koa-bodyparser') import recorder from "./libs/recorder"; const MAX_WAITING_TIME = 60 * 5; // 最大等待时长 // web服务端口 const SERVER_PORT: number = 3000; const app = new Koa(); app.use(bodyParser()); const router = new Router(); /** * 程序睡眠 * @param time 毫秒 */ const timeSleep = (time: number) => { return new Promise((resolve) => { setTimeout(() => { resolve(""); }, time); }); }; /** * 程序睡眠 * @param second 秒 */ const sleep = (second: number) => { return timeSleep(second * 1000); }; router.post("/getPort", async (ctx, next) => { const { num } = ctx.request.body; const uniqueId = cuid(); console.log('uniqueId: ', uniqueId); recorder.addQueue({ id: uniqueId, callback: getPortFun(num) }); let waitTime = 0; while (!ctx.body) { await sleep(0.2); console.log('1'); const data: any = recorder.get(uniqueId); if (data) { ctx.body = { code: 0, data: data, msg: 'success' }; } waitTime++; // 超过最大时间就返回一个结果 if (waitTime > MAX_WAITING_TIME) { ctx.body = {}; } } }); // 返回一个函数 function getPortFun(num) { return () => { return new Promise((resolve) => { // 模拟异步程序 setTimeout(() => { console.log(`num${num}: `, num); resolve(num * num); }, num * 1000); }); }; } app.use(router.routes()).use(router.allowedMethods()); app.listen(SERVER_PORT);