Node.js Stream流
Stream 流是什么?
流的概念
- 流是一组有序的,有起点和终点的字节数据传输手段
- 它不关心文件的整体内容,只关注是否从文件中读到了数据,以及读到数据之后的处理
- 流是一个抽象接口,被 Node 中的很多对象所实现。比如HTTP 服务器request和response对象都是流。
流(Stream)在 NodeJs 中是个十分基础的概念,很多基础模块都是基于流实现的,扮演着十分重要的角色。同时流也是是一个十分难以理解的概念,这主要是相关的文档比较缺少,对于 NodeJs 初学者来说,理解流往往需要花很多时间理解。文章中的代码地址:github.com/Maricaya/no…
图解 Stream
stream 就像是水流,但默认是没有水的。 stream.write 可以让水流中有水,也就是写入数据。
左上角是产生数据的一段,称为 source(源头)。 最下面是得到数据的一段 sink(水池)。 从上向下流动的小圆点是每次写的小数据,称为 chunk(块)。
Stream 实例
用 Stream 创建大文件
我们首先创建一个可写流,向文件里多次写入内容。 最后记得关闭流,得到一个大文件。
// 引入文件模块 const fs = require('fs'); const stream = fs.createWriteStream('./../big_file.txt'); for (let i = 0; i < 1000000; i++) { stream.write(`这是第${i}行内容\n`); } stream.end() console.log('done')
使用 readFile
我们先使用 fs.readFile 来读取文件内容,看看会发生什么。
const fs = require('fs') const http = require('http') const server = http.createServer() server.on('request', (request, response) => { fs.readFile('./../big_file.txt', (error, data) => { if (error) throw error response.end(data) console.log('done') }) }) server.listen(8889) console.log(8889)
当我们访问 http://localhost:8889
的时候,服务器会异步读取这个大文件。
看起来一切正常,没什么毛病。
但是,我们用任务管理器查看 Node.js 的内存,大概占用 130Mb。
服务器接收 1 次请求,占用 130 Mb;那如果接受 10 次请求,就是占用 1G。 对服务器的内存消耗是很大的。
怎么解决这个问题呢?使用 Stream。
使用 Stream
我们来试试用 Stream 改写上面的例子。
创建一个可读流 createReadStream
, 再把文件 stream
和 response stream
通过管道 pipe
相连。
const fs = require('fs') const http = require('http') const server = http.createServer() server.on('request', (request, response) => { const stream = fs.createReadStream('./big_file.txt') stream.pipe(response) }) server.listen(8888)
我们再次查看 node.js 内存占用,基本不会高于 30Mb。
因为每次只传递一小段数据,不会占用很大内存。
管道 pipe
两个流可以用一个管道相连,stream1 的末尾连接上 stream2 的开端。
只要 stream1 有数据,就会流到 stream2。
比如上面的代码:
const stream = fs.createReadStream('./big_file.txt') stream.pipe(response)
stream
就是一个文件流,下面的 stream 就是我们的 http 流 response。 本来这两个流是没有关系的,现在我们想把文件流的数据传递给 http 流。 很简单,用 pipe 连接就行啦。
常用代码
stream1.pipe(stream2)
- stream1 是发出数据的流,一个可读流。
- stream2 是写入数据的流,一个可写流。
链式操作
一个水流可以经过无限个管道,数据流也一样。
有两种写法:
a.pipe(b).pipe(c) // 等价于 a.pipe(b) b.pipe(c)
管道原理
管道也可以认为是两个事件的封装
- 监听 data 事件,stream1 一有数据就塞给 stream2
- 监听 end 事件,当 stream1 停了,就停掉 stream2
stream1.on('data', (chunk) => { stream2.write(chunk) }) stream1.on('end', () => { stream2.end() })
Node.js 中有四种基本的流类型
一共分为四类
名称 | 特点 |
Readable | 可读 |
Writable | 可写 |
Duplex | 可读可写(双向) |
Transform | 可读可写(变化) |
了解 Stream 的原型链,可以更容易地记忆 Stream 的 API。
- Readable - 可读的流 (例如 fs.createReadStream())。
- Writable - 可写的流 (例如 fs.createWriteStream()).
- Duplex - 可读写的流(双工流) (例如 net.Socket).
- Transform - 转换流 在读写过程中可以修改和变换数据的 Duplex 流 (例如 zlib.createDeflate())
fs.createReadStream(path)
如果 s = fs.createReadStream(path)
,那么s
的对象层级为:
- 自身的属性,由
fs.ReadStream
构造函数构造的 - 原型:
stream.Readable.prototype
- 二级原型:
stream.Stream.prototype
- 三级原型:
events.EventEmitter.prototype
这是所有 Stream 都继承的原型 - 四级原型:
Object.prototype
也就是所有对象都继承的原型
Stream 支持的事件和方法
了解了原型链之,我们来看看 Stream 支持的事件和方法。
大概有个印象就好,用到的时候再去查。
Readable Stream | Writable Stream | |
事件 | data, end, error,close,readable | drain(这一次写完),finish(整个写完),error,close,pipe,unpipe |
方法 | pipe() unpipe() read()... | write() destroy() ... |
Stream 分类
一共分为四类
名称 | 特点 |
Readable | 可读 |
Writable | 可写 |
Duplex | 可读可写(双向) |
Transform | 可读可写(变化) |
Readable、Writable 都是单向的,其他两个是双向的。
可读、可写好理解,剩下两个有什么区别呢?
Duplex 可以读写,但是读的内容和写的内容是相互独立的,没有交叉。
而 Transform 是自己写自己读。
比如一个 babel,把es6转换为,我们在左边写入 es6,从右边读取 es5。 就像洗车一样,黑车进去,白车出来。
Readable Stream 可读流
静止态 paused 和流动态 flowing
可读流有两个状态 paused 和 flowing。
可以把可读流看成内容生产者,不发内容时就是静止态,内容恢复发送时就是流动态。
- 可读流默认处于 paused 态。
- 一旦添加 data 事件监听,它就变为 flowing 态。
- 删掉 data 事件监听,paused 态。
- pause() 可以将它变为 paused。
- resume() 可以将它变为 flowing。
const http = require('http') const fs = require('fs') const server = http.createServer() server,on('request', (request, response) => { // 默认处于 paused 态 const stream = fs.createReadStream('./big_file.txt') stream.pipe(response) stream.pause(); // 暂停 setTimeout(() => { // 恢复 stream.resume() }, 3000) }) server.listen(8888);
Writable Stream
drain 流干了事件
表示可以加点水了,也就是可以继续写入数据了。 我们调用 stream.write(chunk)
的时候,可能会得到 false。
false 的意思是你写太快了,积压数据。
这个时候我们就不能再 write 了,要监听 drain。
等 drain 事件触发了,我们才能继续 write。
光看这些有点难以理解,可以看看 官网上的例子:
const fs = require('fs'); // 将 data 写入文件 1000000 次 function writeOneMillionTimes(writer, data) { let i = 1000000; write(); function write() { let ok = true; do { i--; if (i === 0) { // 最后一次写入 writer.write(data); } else { // 在这里判断是不是可以继续写 // ok 为 false 的意思是你写太快了,数据积压 ok = writer.write(data); if (ok === false) { console.log('不能再写了') } } } while (i > 0 && ok); if (i > 0) { // 干涸了,可以继续写入 writer.once('drain', () => { console.log('干涸了') write() }); } } } const write = fs.createWriteStream('./../big_file.txt') writeOneMillionTimes(write, 'hello world')
finish 事件
在调用 stream.end()
之后,而且缓冲区数据都已经传给底层系统之后,触发 finish 事件。
我们往文件中写入数据时,不是直接存入硬盘中,而是先放入缓冲区。 当数据到达一定大小后,才会写入硬盘。
创建自己的流
下面我们来看看,如何创建自己的流。
按照流的四个类型依次讲解。
Writable
const {Writable} = require('stream') const outStream = new Writable({ // 如果别人调用,我们做什么 write(chunk, encoding, callback) { console.log(chunk.toString()) // 进入下一个流程 callback() } }) process.stdin.pipe(outStream);
保存文件为 writable.js 然后用 node 运行。 不管你输入什么,都会得到相同的结果。
Readable
一次读取所有数据
const {Readable} = require('stream') const inStream = new Readable() inStream.push('hello world') // 写入数据 inStream.push('hello node') inStream.push(null) // 没有数据了 // 将这个可读流,导入到可写流 process.stdout。 inStream.pipe(process.stdout)
先把所有数据都 push 进 inStream,再使用管道导入到可写流 process.stdout
中。
这样,当我们用 node 运行文件时,我们可以从 inStream 中读取所有数据,并且打印出来。
这种方式非常简单,但是并不高效。
更好的方式是按需 push,当用户需要的时候,我们才读取数据。
调用 read 才会给一次数据
这种写法数据是按需供给的,对方调用 read
,我们才会给一次数据。
例如下面这个例子,我们一次 push
一个字符,从字符码 65(代表 A) 开始。
当用户读取时,会持续触发 read
,我们会 push
更多地字符。
当字符全部 push
完毕,我们 push null
,停止 Stream。
const {Readable} = require('stream') const inStream = new Readable({ read(size) { const char = String.fromCharCode(this.currentCharCode++) this.push(char); console.log(`推了${char}`) // 这个时候停止 if (this.currentCharCode > 90) { // Z this.push(null) } } }) inStream.currentCharCode = 65 // A inStream.pipe(process.stdout)
Duplex Stream
看完了可读流、可写流,Duplex Stream
就简单多了。
同时实现 write
和 read
方法就好啦。
const {Duplex} = require('stream') const inoutStream = new Duplex({ write(chunk, encoding, callback) { console.log(chunk.toString()) callback() }, read(size) { this.push(String.fromCharCode(this.currentCharCode++)) if (this.currentCharCode > 90) { this.pull(null) } } }) inoutStream.currentCharCode = 65; process.stdin.pipe(inoutStream).pipe(process.stdout);
Transform Stream
对于 Transform Stream,我们实现 transform 方法就好了,它结合了可读、可写两个方法。
这里有一个简单的 transform 例子,会以大写的格式打印任何你键入的字符:
const {Transform} = require('stream') const upperCaseTr = new Transform({ transform(chunk, encoding, callback) { // 1. 读数据 chunk.toString() // 2. 写数据 this.push(xxx) this.push(chunk.toString().toUpperCase()) callback(); } }) // 监听用户输入,调用 upperCaseTr // 转化完成后,输出 process.stdin.pipe(upperCaseTr) .pipe(process.stdout)
Node.js 内置的 Transform Stream
比如面试中经常说的优化方案:gzip 压缩。
在 Node.js 中用 4 行代码就可以实现
const fs = require('fs') const zlib = require('zlib') const file = process.argv[2] fs.createReadStream(file) .pipe(zlib.createGzip()) .on('data', () => process.stdout.write(".")) // 打出进度条 .pipe(fs.createWriteStream(file + ".gz"))
Stream 在 Node.js 中随处可见
Readable Stream | Writeable Stream |
HTTP Response 客户端 | HTTP Request 客户端 |
HTTP Request 服务端 | HTTP Response 服务端 |
fs read stream | fs write stream |
zlib stream | zlib stream |
TCP sockets | TCP sockets |
child process stdout & stderr | child process stdin |
process.stdin | process.stdout,process.stderr |
... | ... |
数据积压问题
在 Stream 中,还有一个非常重要的问题:数据积压。
如果数据太多,堵住了,怎么解决。
Node.js 官网有一篇专门的文章解释如何解决,大家碰到问题的时候可以翻出来看看。
我在这里就不赘述了,地址在这里