原文标题:Understanding Streams in Node.js
原文链接:https://nodesource.com/blog/understanding-streams-in-nodejs
什么是流
流是Node.js中的一个基本概念。流是一种数据处理方法,用于按顺序将输入读写到输出中。
流是一种高效处理读/写文件、网络通信或任何端到端信息交换的方法。
流的特别之处在于,它不是向传统方式那样把文件一次性读取到内存中,而是逐段读取数据块,在不将其全部保存在内存中的情况下,处理文件的内容。
这使得流在处理大量数据时非常强大,例如,文件大小可能大于可用内存空间,因此无法将整个文件读入内存以进行处理。这个时候,流就派上用场了。
使用流处理较小的数据块,可以读取较大的文件。
让我们以一个“流”服务为例,比如YouTube或Netflix:这些服务不会让你一次性下载视频和音频。你的浏览器会以流的方式连续不断的接受视频、音频数据,让你机会可以立即收看视频,收听音频。
然而,流不仅仅可以用于处理媒体或大数据。它们还赋予我们代码中“可组合性”的能力。考虑到可组合性的设计意味着可以以某种方式组合多个组件以产生相同类型的结果。在Node.js中,可以通过使用流在其他较小的代码片段之间传送数据来合成功能强大的代码片段。
流的优势
和其他数据处理方式相比,流有两个主要的优势:
- 内存效率:在处理数据之前,不需要在内存中加载大量数据
- 时间效率:一有数据,就可以开始处理数据了,而不用等到整个数据传输完成后才开始处理。
Node.js中有四种流
- Writable: 可以写入数据的流。例如,
fs.createWriteStream()
允许我们使用流将数据写入文件。 - Readable: 可以从中读取数据的流。例如:
fs.createReadStream()
允许我们读取文件的内容。 - Duplex: 可读写的流。例如,
net.Socket
。 - Transform: 可以在数据写入和读取时修改或转换数据的流。例如,在文件压缩的实例中,可以在文件中写入压缩数据和读取解压缩数据。
如果你在工作中已经用到了Node.js,那么你一定使用过流。例如,在基于Node.js的HTTP服务器中,request
是可读流,response
是可写流。你可能已经使用了fs模块,它允许你处理可读和可写的文件流。当你使用Express
的时候,你都在使用流与客户端进行交互。另外,由于TCP套接字、TLS堆栈和其他连接都基于Node.js流的,每个数据库连接驱动程序中都在使用流。
实例
如何创建可读流
首先引入可读流,然后初始化。
const Stream = require('stream')const readableStream = new Stream.Readable()
流已经初始化了,我们往流中写入数据
readableStream.push('ping!')readableStream.push('pong!')
async iterator
强烈建议在操作流的时候使用async iterator. 根据Axel Rauschmayer的说法,async iterator是一种异步检索数据容器内容的协议(这意味着当前的“任务”可能在检索项目之前暂停)。另外,必须提到流异步迭代器实现在内部使用的是“可读”事件。
从可读流中读取数据时,你可以使用async iterator:
import * as fs from 'fs'; async function logChunks(readable) { for await (const chunk of readable) { console.log(chunk); }} const readable = fs.createReadStream( 'tmp/test.txt', {encoding: 'utf8'});logChunks(readable); // Output:// 'This is a test!\n'
也可以在字符串中收集可读流的内容:
import {Readable} from 'stream'; async function readableToString2(readable) { let result = ''; for await (const chunk of readable) { result += chunk; } return result;} const readable = Readable.from('Good morning!', {encoding: 'utf8'});assert.equal(await readableToString2(readable), 'Good morning!');
记住不要将异步函数与EventEmitter混合使用,因为目前无法在事件处理函数中捕获到rejection,从而导致难以跟踪的错误和内存泄漏。当前的最佳实践是始终将异步函数的内容包装在try/catch块中并处理错误,但这很容易出错。这个问题的解决方案,可以看下这个pull request:https://github.com/nodejs/node/pull/27867
了解更多关于Node.js stream 和 async iteration,可以参考这篇文章:
https://2ality.com/2019/11/nodejs-streams-async-iteration.html
Readable.from(): 通过迭代器创建可读流
Readable.from()用于从迭代器中创建可独流。迭代器可以是异步的或者同步的。参数选项是可选的,可以用于指定文本编码。
const { Readable } = require('stream'); async function * generate() { yield 'hello'; yield 'streams';} const readable = Readable.from(generate()); readable.on('data', (chunk) => { console.log(chunk);});
两种读取模式
根据Streams API,可读流可以在两种模式中有效地运行:流动和暂停。可读流可以处于对象模式,也可以不处于流动模式或暂停模式。
- 在流动模式下,数据从底层系统自动读取,并通过EventEmitter接口使用事件提供给应用程序。
- 在暂停模式下,必须显式调用stream.read()方法才能从流中读取数据块。
在流动模式下,要从流中读取数据,可以监听数据事件并添加一个回调函数。当数据块可用时,可读流发出一个data事件并执行回调。请看下面的代码片段:
var fs = require("fs");var data = ''; var readerStream = fs.createReadStream('file.txt'); //Create a readable stream readerStream.setEncoding('UTF8'); // Set the encoding to be utf8. // Handle stream events --> data, end, and errorreaderStream.on('data', function(chunk) { data += chunk;}); readerStream.on('end',function() { console.log(data);}); readerStream.on('error', function(err) { console.log(err.stack);}); console.log("Program Ended");
函数调用fs.createReadStream()提供了一个可读的流。最初,流处于静态。只要监听data事件并附加回调函数,它就会开始流动。之后,数据块被读取并传递给回调函数。流实现者决定data事件的发出频率。例如,每读取几KBs的数据,HTTP请求就可能发出一个data事件。从文件中读取数据时,可以决定在读取一行后发出data事件。
当没有要读取的数据时(到达end),流会发出end事件。在上面的代码片段中,我们侦听此事件以在到达结尾时获得通知。
另外,如果有错误发生,流会触发一个error事件。
在暂停模式下,只需重复调用流实例上的read(),直到读取了每个数据块,如下例所示:
var fs = require('fs');var readableStream = fs.createReadStream('file.txt');var data = '';var chunk; readableStream.on('readable', function() { while ((chunk=readableStream.read()) != null) { data += chunk; }}); readableStream.on('end', function() { console.log(data)});
read函数的作用是:从内部缓冲区中读取一些数据并返回。当没有要读取的内容时,返回空值。所以,在while循环中,我们检查null并终止循环。注意,当可以从流中读取数据块时,会发出readable事件。
所有可读流都以暂停模式开始,但可以通过以下方式之一切换到流动模式:
- 添加data事件的回调函数
- 调用
stream.resume()
方法。 - 调用
stream.pipe()
方法将数据发送到可写文件。
可读流能够通过以下方式,重新切换到暂停模式:
- 如果没有管道目标,调用
stream.pause()
方法。 - 如果有管道目的地,则删除所有管道目的地。可以通过调用
stream.unpipe()
方法删除多个管道目标。
要记住的重要概念是,在提供使用或忽略该数据的机制之前,可读流不会生成数据。如果消费机制被禁用或取消,可读流将尝试停止生成数据。监听readable事件会自动使流停止流动,并通过readable.read()消耗数据。如果删除了readable事件回调函数,并且如果存在data事件回调函数,则流将重新开始流动。
如何创建可写流
要将数据写入可写流,需要对流实例调用write()。如下例所示:
var fs = require('fs');var readableStream = fs.createReadStream('file1.txt');var writableStream = fs.createWriteStream('file2.txt'); readableStream.setEncoding('utf8'); readableStream.on('data', function(chunk) { writableStream.write(chunk);});
上面的代码很简单。它只是从输入流读取数据块,然后使用write()写入目标。此函数返回一个布尔值,表示操作是否成功。如果为true,则写入成功,您可以继续写入更多数据。如果返回false,则表示发生了错误,你目前无法写入任何内容。
调用writable.end()方法表示不再向可写文件写入数据。该方法有一个可选参数,可以传入finish
事件的回调函数。
// Write 'hello, ' and then end with 'world!'.const fs = require('fs');const file = fs.createWriteStream('example.txt');file.write('hello, ');file.end('world!');// Writing more now is not allowed!
使用可写流可以从可读流中读取数据:
const Stream = require('stream') const readableStream = new Stream.Readable()const writableStream = new Stream.Writable() writableStream._write = (chunk, encoding, next) => { console.log(chunk.toString()) next()} readableStream.pipe(writableStream) readableStream.push('ping!')readableStream.push('pong!') writableStream.end()
你还可以使用async iterators写入可写流。
import * as util from 'util';import * as stream from 'stream';import * as fs from 'fs';import {once} from 'events'; const finished = util.promisify(stream.finished); // (A) async function writeIterableToFile(iterable, filePath) { const writable = fs.createWriteStream(filePath, {encoding: 'utf8'}); for await (const chunk of iterable) { if (!writable.write(chunk)) { // (B) // Handle backpressure await once(writable, 'drain'); } } writable.end(); // (C) // Wait until done. Throws if there are errors. await finished(writable);} await writeIterableToFile( ['One', ' line of text.\n'], 'tmp/log.txt');assert.equal( fs.readFileSync('tmp/log.txt', {encoding: 'utf8'}), 'One line of text.\n');
stream.finished()
默认是回调函数形式的,可以通过util.promisify变成基于Promise的。(第A行)
在这个例子中,通过以下两种模式使用可写流:
在处理Backpressure写入可写流(第B行):
译者注:在数据流从上游生产者向下游消费者传输的过程中,上游生产速度大于下游消费速度,导致下游的 Buffer 溢出,这种现象就叫做 Backpressure 出现
if (!writable.write(chunk)) { await once(writable, 'drain');}
关闭可写流,等待写入完成(第C行)。
writable.end();await finished(writable);
pipeline()
管道是一种机制,我们将一个流的输出作为另一个流的输入。它通常用于从一个流获取数据并将该流的输出传递给另一个流。管道作业没有限制。换言之,管道用于分多个步骤处理流式数据。
Node.js 10.x中引入了stream.pipeline()。这是一个模块方法,用于在流之间进行管道传输,并在管道完成时正确清理和提供回调。
下面是使用pipeline的一个例子:
const { pipeline } = require('stream');const fs = require('fs');const zlib = require('zlib'); // Use the pipeline API to easily pipe a series of streams// together and get notified when the pipeline is fully done.// A pipeline to gzip a potentially huge video file efficiently: pipeline( fs.createReadStream('The.Matrix.1080p.mkv'), zlib.createGzip(), fs.createWriteStream('The.Matrix.1080p.mkv.gz'), (err) => { if (err) { console.error('Pipeline failed', err); } else { console.log('Pipeline succeeded'); } });
应该使用pipeline
而不是pipe
,因为pipe
不安全。
stream模块
Node.js的stream模块为所有流API构建提供了基础。
Stream模块是Node.js中默认提供的内置模块。流是EventEmitter类的一个实例,该类在节点中异步处理事件。因此,流本质上是基于事件的。
stream模块对于创建新类型的流实例非常有用。通常不需要使用流模块来使用流。
stream驱动的Node.js Api
由于流的优势,许多Node.js核心模块都提供了本地流处理功能,最显著的是:
net.Socket
是stream所基于的主要node api,它是以下大多数api的基础process.stdin
返回连接到stdin的流。process.stdout
返回连接到stdout的流。process.stderr
返回连接到stderr的流。fs.createReadStream()
创建文件的可读流。fs.createWriteStream()
创建文件的可写流。net.connect()
启动一个基于流的连接。http.request()
返回一个http.ClientRequest类的一个实例,该类是一个可写流。zlib.createGzip()
使用gzip将数据压缩到流中。zlib.createGunzip()
解压gzip流。zlib.createDeflate()
使用deflate压缩算法将数据压缩到流中。zlib.createInflate()
解压deflate流。
结束