九、原生 Node.js 流
- 9.1 总结:异步迭代和异步生成器
- 9.2 流
- 9.2.1 管道
- 9.2.2 文本编码
- 9.2.3 辅助函数:
readableToString()
- 9.2.4 一些初步说明
- 9.3 可读流
- 9.3.1 创建可读流
- 9.3.2 通过
for-await-of
从可读流中读取块 - 9.3.3 通过模块
'node:readlines'
从可读流中读取行
- 9.4 通过异步生成器转换可读流(ch_nodejs-streams.html#transforming-Readable-via-async-generator)
- 9.4.1 从异步可迭代对象中的块转换为编号行
- 9.5 可写流
- 9.5.1 为文件创建可写流
- 9.5.2 写入可写流
- 9.6 快速参考:与流相关的功能(ch_nodejs-streams.html#quick-reference-stream-related-functionality)
- 9.7 进一步阅读和本章的来源
本章是对 Node 的原生流的介绍。它们支持异步迭代,这使它们更容易使用,这也是我们在本章中主要使用的。
请注意,跨平台的web 流在§10“在 Node.js 上使用 web 流”中有所涵盖。我们在本书中主要使用这些。因此,如果您愿意,可以跳过当前章节。
9.1 总结:异步迭代和异步生成器
异步迭代是一种异步检索数据容器内容的协议(意味着当前的“任务”在检索项目之前可能会暂停)。
异步生成器有助于异步迭代。例如,这是一个异步生成器函数:
/** * @returns an asynchronous iterable */ async function* asyncGenerator(asyncIterable) { for await (const item of asyncIterable) { // input if (···) { yield '> ' + item; // output } } }
for-await-of
循环遍历输入的asyncIterable
。这个循环也适用于普通的异步函数。yield
将值提供给此生成器返回的异步可迭代对象。
在本章的其余部分,请仔细注意函数是异步函数还是异步生成器函数:
/** @returns a Promise */ async function asyncFunction() { /*···*/ } /** @returns an async iterable */ async function* asyncGeneratorFunction() { /*···*/ }
9.2 流
流是一种模式,其核心思想是“分而治之”大量数据:如果我们将其分割成较小的部分并一次处理一部分,我们就可以处理它。
Node.js 支持几种流,例如:
- 可读流是我们可以从中读取数据的流。换句话说,它们是数据的来源。一个例子是可读文件流,它允许我们读取文件的内容。
- 可写流是我们可以写入数据的流。换句话说,它们是数据的接收端。一个例子是可写文件流,它允许我们向文件写入数据。
- 转换流既可读又可写。作为可写流,它接收数据块,转换(更改或丢弃)它们,然后将它们作为可读流输出。
9.2.1 管道
为了在多个步骤中处理流数据,我们可以管道(连接)流:
- 输入通过可读流接收。
- 每个处理步骤都是通过转换流执行的。
- 对于最后的处理步骤,我们有两个选项:
- 我们可以将最近的可读流中的数据写入可写流。也就是说,可写流是我们管道的最后一个元素。
- 我们可以以其他方式处理最近的可读流中的数据。
部分(2)是可选的。
9.2.2 文本编码
创建文本流时,最好始终指定编码:
- Node.js 文档中有支持的编码及其默认拼写的列表- 例如:
- ‘utf8’
- ‘utf16le’
- ‘base64’
- 也允许一些不同的拼写。您可以使用
Buffer.isEncoding()
来检查哪些是:
> buffer.Buffer.isEncoding('utf8') true > buffer.Buffer.isEncoding('utf-8') true > buffer.Buffer.isEncoding('UTF-8') true > buffer.Buffer.isEncoding('UTF:8') false
编码的默认值是null
,等同于'utf8'
。
9.2.3 辅助函数:readableToString()
我们偶尔会使用以下辅助函数。您不需要理解它的工作原理,只需(大致)了解它的作用。
import * as stream from 'stream'; /** * Reads all the text in a readable stream and returns it as a string, * via a Promise. * @param {stream.Readable} readable */ function readableToString(readable) { return new Promise((resolve, reject) => { let data = ''; readable.on('data', function (chunk) { data += chunk; }); readable.on('end', function () { resolve(data); }); readable.on('error', function (err) { reject(err); }); }); }
此函数是通过基于事件的 API 实现的。稍后我们将看到一个更简单的方法 - 通过异步迭代。
9.2.4 一些初步说明
- 在本章中,我们只会使用文本流。
- 在示例中,我们偶尔会遇到
await
被用于顶层。在这种情况下,我们假设我们在模块内或在异步函数的主体内。 - 每当有换行符时,我们都支持:
- Unix:
'\n'
(LF) - Windows:
'\r\n'
(CR LF)当前平台的换行符可以通过模块os
中的常量EOL
访问。
9.3 可读流
9.3.1 创建可读流
9.3.1.1 从文件创建可读流
我们可以使用fs.createReadStream()
来创建可读流:
import * as fs from 'fs'; const readableStream = fs.createReadStream( 'tmp/test.txt', {encoding: 'utf8'}); assert.equal( await readableToString(readableStream), 'This is a test!\n');
9.3.1.2 Readable.from()
: 从可迭代对象创建可读流
静态方法Readable.from(iterable, options?)
创建一个可读流,其中包含iterable
中包含的数据。iterable
可以是同步可迭代对象或异步可迭代对象。参数options
是可选的,可以用于指定文本编码等其他内容。
import * as stream from 'stream'; function* gen() { yield 'One line\n'; yield 'Another line\n'; } const readableStream = stream.Readable.from(gen(), {encoding: 'utf8'}); assert.equal( await readableToString(readableStream), 'One line\nAnother line\n');
9.3.1.2.1 从字符串创建可读流
Readable.from()
接受任何可迭代对象,因此也可以用于将字符串转换为流:
import {Readable} from 'stream'; const str = 'Some text!'; const readable = Readable.from(str, {encoding: 'utf8'}); assert.equal( await readableToString(readable), 'Some text!');
目前,Readable.from()
将字符串视为任何其他可迭代对象,因此会迭代其代码点。从性能上讲,这并不理想,但对于大多数用例来说应该是可以的。我期望Readable.from()
经常与字符串一起使用,所以也许将来会有优化。
9.3.2 通过for-await-of
从可读流中读取块
每个可读流都是异步可迭代的,这意味着我们可以使用for-await-of
循环来读取其内容:
import * as fs from 'fs'; async function logChunks(readable) { for await (const chunk of readable) { console.log(chunk); } } const readable = fs.createReadStream( 'tmp/test.txt', {encoding: 'utf8'}); logChunks(readable); // Output: // 'This is a test!\n'
9.3.2.1 在字符串中收集可读流的内容
以下函数是本章开头所见函数的简化重新实现。
import {Readable} from 'stream'; async function readableToString2(readable) { let result = ''; for await (const chunk of readable) { result += chunk; } return result; } const readable = Readable.from('Good morning!', {encoding: 'utf8'}); assert.equal(await readableToString2(readable), 'Good morning!');
请注意,在这种情况下,我们必须使用异步函数,因为我们想要返回一个 Promise。
9.3.3 通过模块'node:readlines'
从可读流中读取行
内置模块'node:readline'
让我们可以从可读流中读取行:
import * as fs from 'node:fs'; import * as readline from 'node:readline/promises'; const filePath = process.argv[2]; // first command line argument const rl = readline.createInterface({ input: fs.createReadStream(filePath, {encoding: 'utf-8'}), }); for await (const line of rl) { console.log('>', line); } rl.close();
9.4 通过异步生成器转换可读流
异步迭代提供了一个优雅的替代方案,用于在多个步骤中处理流式数据的转换流:
- 输入是一个可读流。
- 第一个转换是通过一个异步生成器执行的,该生成器遍历可读流并在适当时产生。
- 可选地,我们可以通过使用更多的异步生成器来进一步转换。
- 最后,我们有几种处理最后一个生成器返回的异步可迭代对象的选项:
- 我们可以通过
Readable.from()
将其转换为可读流(稍后可以传输到可写流)。 - 我们可以使用异步函数来处理它。
- 等等。
总之,这些是这样的处理管道的组成部分:
可读的
→ 第一个异步生成器 [→ … → 最后一个异步生成器]
→ 可读或异步函数
9.4.1 从块到异步可迭代对象中的编号行
在下一个示例中,我们将看到一个刚刚解释过的处理管道的示例。
import {Readable} from 'stream'; /** * @param chunkIterable An asynchronous or synchronous iterable * over “chunks” (arbitrary strings) * @returns An asynchronous iterable over “lines” * (strings with at most one newline that always appears at the end) */ async function* chunksToLines(chunkIterable) { let previous = ''; for await (const chunk of chunkIterable) { let startSearch = previous.length; previous += chunk; while (true) { // Works for EOL === '\n' and EOL === '\r\n' const eolIndex = previous.indexOf('\n', startSearch); if (eolIndex < 0) break; // Line includes the EOL const line = previous.slice(0, eolIndex+1); yield line; previous = previous.slice(eolIndex+1); startSearch = 0; } } if (previous.length > 0) { yield previous; } } async function* numberLines(lineIterable) { let lineNumber = 1; for await (const line of lineIterable) { yield lineNumber + ' ' + line; lineNumber++; } } async function logLines(lineIterable) { for await (const line of lineIterable) { console.log(line); } } const chunks = Readable.from( 'Text with\nmultiple\nlines.\n', {encoding: 'utf8'}); await logLines(numberLines(chunksToLines(chunks))); // (A) // Output: // '1 Text with\n' // '2 multiple\n' // '3 lines.\n'
处理管道在 A 行设置。步骤是:
chunksToLines()
: 从具有块的异步可迭代对象转换为具有行的异步可迭代对象。numberLines()
: 从具有行的异步可迭代对象转换为具有编号行的异步可迭代对象。logLines()
: 记录异步可迭代对象中的项目。
观察:
chunksToLines()
和numberLines()
的输入和输出都是异步可迭代对象。这就是为什么它们是异步生成器(由async
和*
指示)。logLines()
的输入是异步可迭代对象。这就是为什么它是一个异步函数(由async
指示)。
9.5 可写流
9.5.1 创建文件的可写流
我们可以使用fs.createWriteStream()
来创建可写流:
const writableStream = fs.createWriteStream( 'tmp/log.txt', {encoding: 'utf8'});
9.5.2 向可写流写入数据
在本节中,我们将探讨向可写流写入数据的方法:
- 通过其方法
.write()
直接向可写流写入数据。 - 使用模块
stream
中的函数pipeline()
将可读流传输到可写流。
为了演示这些方法,我们使用它们来实现相同的函数writeIterableToFile()
。
可读流的.pipe()
方法也支持管道传输,但它有一个缺点,最好避免使用它。
9.5.2.1 writable.write(chunk)
在向流中写入数据时,有两种基于回调的机制可以帮助我们:
- 事件
'drain'
表示背压已经解除。 - 函数
finished()
在流:
- 不再可读或可写
- 已经遇到错误或过早关闭事件
在下一个示例中,我们将这些机制转换为 Promise,以便我们可以通过异步函数使用它们:
import * as util from 'util'; import * as stream from 'stream'; import * as fs from 'fs'; import {once} from 'events'; const finished = util.promisify(stream.finished); // (A) async function writeIterableToFile(iterable, filePath) { const writable = fs.createWriteStream(filePath, {encoding: 'utf8'}); for await (const chunk of iterable) { if (!writable.write(chunk)) { // (B) // Handle backpressure await once(writable, 'drain'); } } writable.end(); // (C) // Wait until done. Throws if there are errors. await finished(writable); } await writeIterableToFile( ['One', ' line of text.\n'], 'tmp/log.txt'); assert.equal( fs.readFileSync('tmp/log.txt', {encoding: 'utf8'}), 'One line of text.\n');
stream.finished()
的默认版本是基于回调的,但可以通过util.promisify()
(A 行)转换为基于 Promise 的版本。
我们使用了以下两种模式:
- 在处理背压的情况下向可写流写入数据(B 行):
if (!writable.write(chunk)) { await once(writable, 'drain'); }
- 关闭可写流并等待写入完成(C 行):
writable.end(); await finished(writable);
9.5.2.2 通过stream.pipeline()
将可读流传输到可写流
在 A 行,我们使用stream.pipeline()
的 Promise 版本将可读流readable
传输到可写流writable
:
import * as stream from 'stream'; import * as fs from 'fs'; const pipeline = util.promisify(stream.pipeline); async function writeIterableToFile(iterable, filePath) { const readable = stream.Readable.from( iterable, {encoding: 'utf8'}); const writable = fs.createWriteStream(filePath); await pipeline(readable, writable); // (A) } await writeIterableToFile( ['One', ' line of text.\n'], 'tmp/log.txt'); // ···
9.5.2.3 不推荐:readable.pipe(destination)
可读的.pipe()
方法也支持管道传输,但有一个警告:如果可读流发出错误,则可写流不会自动关闭。pipeline()
没有这个警告。
9.6 快速参考:与流相关的功能
模块os
:
const EOL: string
(自 0.7.8 起)(https://nodejs.org/api/os.html#os_os_eol)
包含当前平台使用的行尾字符序列。
模块buffer
:
Buffer.isEncoding(encoding: string): boolean
(自 0.9.1 起)(https://nodejs.org/api/buffer.html#buffer_class_method_buffer_isencoding_encoding)如果encoding
正确命名了受支持的 Node.js 文本编码之一,则返回true
。支持的编码包括:
'utf8'
'utf16le'
'ascii'
'latin1
'base64'
'hex'
(每个字节表示为两个十六进制字符)
模块stream
:
Readable.prototype[Symbol.asyncIterator](): AsyncIterableIterator<any>
(自 10.0.0 起)(https://nodejs.org/api/stream.html#stream_readable_symbol_asynciterator)
可读流是异步可迭代的。例如,您可以在异步函数或异步生成器中使用for-await-of
循环来迭代它们。finished(stream: ReadableStream | WritableStream | ReadWriteStream, callback: (err?: ErrnoException | null) => void): () => Promise<void>
(自 10.0.0 起)
当读取/写入完成或出现错误时,返回的 Promise 将被解决。
此 promisified 版本的创建方式如下:
const finished = util.promisify(stream.finished);
pipeline(...streams: Array<ReadableStream|ReadWriteStream|WritableStream>): Promise<void>
(自 10.0.0 起)
流之间的管道。当管道完成或出现错误时,返回的 Promise 将被解决。
此 promisified 版本的创建方式如下:
const pipeline = util.promisify(stream.pipeline);
Readable.from(iterable: Iterable<any> | AsyncIterable<any>, options?: ReadableOptions): Readable
(自 12.3.0 起)
将可迭代对象转换为可读流。
interface ReadableOptions { highWaterMark?: number; encoding?: string; objectMode?: boolean; read?(this: Readable, size: number): void; destroy?(this: Readable, error: Error | null, callback: (error: Error | null) => void): void; autoDestroy?: boolean; }
- 这些选项与
Readable
构造函数的选项相同,并在此处有文档记录。
模块fs
:
createReadStream(path: string | Buffer | URL, options?: string | {encoding?: string; start?: number}): ReadStream
(自 2.3.0 起)
创建可读流。还有更多选项可用。createWriteStream(path: PathLike, options?: string | {encoding?: string; flags?: string; mode?: number; start?: number}): WriteStream
(自 2.3.0 起)
使用.flags
选项,您可以指定是要写入还是追加,以及文件存在或不存在时会发生什么。还有更多选项可用。
本节中的静态类型信息基于Definitely Typed。
9.7 进一步阅读和本章的来源
- Node.js 文档中的“流与异步生成器和异步迭代器兼容性”章节
- “JavaScript for impatient programmers”中的“异步函数”章节
- “JavaScript for impatient programmers”中的“异步迭代”章节
十、在 Node.js 上使用 web 流
原文:
exploringjs.com/nodejs-shell-scripting/ch_web-streams.html
译者:飞龙
- 10.1 什么是 web 流?
- 10.1.1 流的种类
- 10.1.2 管道链
- 10.1.3 背压
- 10.1.4 Node.js 中对 web 流的支持
- 10.2 从 ReadableStreams 读取
- 10.2.1 通过 Readers 消费 ReadableStreams
- 10.2.2 通过异步迭代消费 ReadableStreams
- 10.2.3 将 ReadableStreams 管道到 WritableStreams
- 10.3 通过包装将数据源转换为 ReadableStreams
- 10.3.1 实现底层源的第一个示例
- 10.3.2 使用 ReadableStream 包装推送源或拉取源
- 10.4 写入 WritableStreams
- 10.4.1 通过 Writers 写入 WritableStreams
- 10.4.2 管道到 WritableStreams
- 10.5 通过包装将数据汇转换为 WritableStreams
- 10.5.1 示例:跟踪 ReadableStream
- 10.5.2 示例:收集写入字符串的 WriteStream 块
- 10.6 使用 TransformStreams
- 10.6.1 标准 TransformStreams
- 10.7 实现自定义 TransformStreams
- 10.7.1 示例:将任意块的流转换为行流
- 10.7.2 提示:异步生成器也非常适合转换流
- 10.8 更深入地了解背压
- 10.8.1 信号背压
- 10.8.2 对背压的反应
- 10.9 字节流
- 10.9.1 可读字节流
- 10.9.2 示例:填充随机数据的无限可读字节流
- 10.9.3 示例:压缩可读字节流
- 10.9.4 示例:通过
fetch()
读取网页
- 10.10 Node.js 特定的辅助函数
- 10.11 进一步阅读
Web 流 是一种标准的 流,现在在所有主要的 web 平台上都得到支持:web 浏览器、Node.js 和 Deno。(流是一种从各种来源顺序读取和写入数据的抽象,例如文件、托管在服务器上的数据等。)
例如,全局函数 fetch()
(用于下载在线资源)异步返回一个具有 web 流属性 .body
的 Response。
本章涵盖了 Node.js 上的 web 流,但我们所学的大部分内容都适用于支持它们的所有 web 平台。
10.1 什么是网络流?
让我们首先概述一下网络流的一些基本知识。之后,我们将快速转移到示例。
流是一种用于访问数据的数据结构,例如:
- 文件
- 托管在 Web 服务器上的数据
- 等等。
它们的两个好处是:
- 我们可以处理大量数据,因为流允许我们将它们分割成较小的片段(所谓的chunks),我们可以一次处理一个。
- 我们可以在处理不同数据时使用相同的数据结构,流。这样可以更容易地重用代码。
Web streams(“web”通常被省略)是一个相对较新的标准,起源于 Web 浏览器,但现在也受到 Node.js 和 Deno 的支持(如此MDN 兼容性表所示)。
在网络流中,chunks 通常是:
- 文本流:字符串
- 二进制流:Uint8Arrays(一种 TypedArray)
10.1.1 流的种类
有三种主要类型的网络流:
- 一个 ReadableStream 用于从source读取数据。执行此操作的代码称为consumer。
- 一个 WritableStream 用于向sink写入数据。执行此操作的代码称为producer。
- TransformStream 由两个流组成:
- 它从其writable side接收输入,即 WritableStream。
- 它将输出发送到其readable side,即 ReadableStream。
- 这个想法是通过“管道传输”TransformStream 来转换数据。也就是说,我们将数据写入可写端,并从可读端读取转换后的数据。以下 TransformStreams 内置在大多数 JavaScript 平台中(稍后会详细介绍):
- 因为 JavaScript 字符串是 UTF-16 编码的,所以在 JavaScript 中,UTF-8 编码的数据被视为二进制数据。
TextDecoderStream
将这样的数据转换为字符串。 TextEncoderStream
将 JavaScript 字符串转换为 UTF-8 数据。CompressionStream
将二进制数据压缩为 GZIP 和其他压缩格式。DecompressionStream
从 GZIP 和其他压缩格式中解压缩二进制数据。
ReadableStreams,WritableStreams 和 TransformStreams 可用于传输文本或二进制数据。在本章中,我们将主要进行前者。 字节流用于二进制数据,在最后简要提到。
10.1.2 管道链路
Piping是一种操作,它让我们将一个 ReadableStream 连接到一个 WritableStream:只要 ReadableStream 产生数据,此操作就会读取该数据并将其写入 WritableStream。如果我们连接了两个流,我们就可以方便地将数据从一个位置传输到另一个位置(例如复制文件)。但是,我们也可以连接多于两个流,并获得可以以各种方式处理数据的管道链路。这是一个管道链路的例子:
- 它以一个 ReadableStream 开始。
- 接下来是一个或多个 TransformStreams。
- 链路以 WritableStream 结束。
通过将前者连接到后者的可写端,将一个 ReadableStream 连接到 TransformStream。类似地,通过将前者的可读端连接到后者的可写端,将一个 TransformStream 连接到另一个 TransformStream。并且通过将前者的可读端连接到后者的可写端,将一个 TransformStream 连接到一个 WritableStream。
10.1.3 背压
管道链路中的一个问题是,成员可能会收到比它目前能处理的更多数据。 背压是解决这个问题的一种技术:它使数据的接收者能够告诉发送者应该暂时停止发送数据,以便接收者不会被压倒。
另一种看待背压的方式是作为一个信号,通过管道链路向后传播,从被压倒的成员到链路的开始。例如,考虑以下管道链路:
ReadableStream -pipeTo-> TransformStream -pipeTo-> WriteableStream
这是背压通过这个链路传播的方式:
- 最初,WriteableStream 发出信号,表明它暂时无法处理更多数据。
- 管道停止从 TransformStream 中读取。
- 输入在 TransformStream 中积累(被缓冲)。
- TransformStream 发出满的信号。
- 管道停止从 ReadableStream 中读取。
我们已经到达管道链的开头。因此,在 ReadableStream 中没有数据积累(也被缓冲),WritableStream 有时间恢复。一旦它恢复,它会发出信号表明它已准备好再次接收数据。该信号也会通过链返回,直到它到达 ReadableStream,数据处理恢复。
在这第一次对背压的探讨中,为了让事情更容易理解,省略了一些细节。这些将在以后进行讨论。
10.1.4 Node.js 中对 web 流的支持
在 Node.js 中,Web 流可以从两个来源获得:
- 来自模块
'node:stream/web'
- 通过全局变量(就像在 Web 浏览器中)
目前,只有一个 API 在 Node.js 中直接支持 web 流 – Fetch API:
const response = await fetch('https://example.com'); const readableStream = response.body;
对于其他事情,我们需要使用模块'node:stream'
中以下静态方法之一,将 Node.js 流转换为 Web 流,反之亦然:
- Node.js 的 Readable 可以转换为 WritableStreams,反之亦然:
Readable.toWeb(nodeReadable)
Readable.fromWeb(webReadableStream, options?)
- Node.js 的 Writable 可以转换为 ReadableStreams,反之亦然:
Writable.toWeb(nodeWritable)
Writable.fromWeb(webWritableStream, options?)
- Node.js 的 Duplex 可以转换为 TransformStreams,反之亦然:
Duplex.toWeb(nodeDuplex)
Duplex.fromWeb(webTransformStream, options?)
还有一个 API 部分支持 web 流:FileHandles 有方法.readableWebStream()
。
10.2 从 ReadableStreams 中读取
ReadableStreams 让我们从各种来源读取数据块。它们具有以下类型(随意浏览此类型及其属性的解释;当我们在示例中遇到它们时,它们将再次被解释):
interface ReadableStream<TChunk> { getReader(): ReadableStreamDefaultReader<TChunk>; readonly locked: boolean; [Symbol.asyncIterator](): AsyncIterator<TChunk>; cancel(reason?: any): Promise<void>; pipeTo( destination: WritableStream<TChunk>, options?: StreamPipeOptions ): Promise<void>; pipeThrough<TChunk2>( transform: ReadableWritablePair<TChunk2, TChunk>, options?: StreamPipeOptions ): ReadableStream<TChunk2>; // Not used in this chapter: tee(): [ReadableStream<TChunk>, ReadableStream<TChunk>]; } interface StreamPipeOptions { signal?: AbortSignal; preventClose?: boolean; preventAbort?: boolean; preventCancel?: boolean; }
这些属性的解释:
.getReader()
返回一个 Reader – 通过它我们可以从 ReadableStream 中读取。ReadableStreams 返回 Readers 类似于可迭代对象返回迭代器。.locked
: 一次只能有一个活动的 Reader 读取 ReadableStream。当一个 Reader 正在使用时,ReadableStream 被锁定,无法调用.getReader()
。[Symbol.asyncIterator](https://exploringjs.com/impatient-js/ch_async-iteration.html)
: 这个方法使得 ReadableStreams 可以异步迭代。目前只在一些平台上实现。.cancel(reason)
取消流,因为消费者对它不再感兴趣。reason
被传递给 ReadableStream 的底层源的.cancel()
方法(稍后会详细介绍)。返回的 Promise 在此操作完成时实现。.pipeTo()
将其 ReadableStream 的内容传送到 WritableStream。返回的 Promise 在此操作完成时实现。.pipeTo()
确保背压、关闭、错误等都正确地通过管道链传播。我们可以通过它的第二个参数指定选项:
.signal
让我们向这个方法传递一个 AbortSignal,这使我们能够通过 AbortController 中止管道传输。.preventClose
: 如果为true
,它会阻止在 ReadableStream 关闭时关闭 WritableStream。当我们想要将多个 ReadableStream 管道到同一个 WritableStream 时,这是有用的。- 其余选项超出了本章的范围。它们在web 流规范中有文档记录。
.pipeThrough()
将其 ReadableStream 连接到一个 ReadableWritablePair(大致是一个 TransformStream,稍后会详细介绍)。它返回生成的 ReadableStream(即 ReadableWritablePair 的可读端)。
以下小节涵盖了三种消费 ReadableStreams 的方式:
- 通过 Readers 进行读取
- 通过异步迭代进行读取
- 将 ReadableStreams 连接到 WritableStreams
10.2.1 通过 Reader 消费 ReadableStreams
我们可以使用Readers从 ReadableStreams 中读取数据。它们具有以下类型(随意浏览此类型及其属性的解释;当我们在示例中遇到它们时,它们将再次被解释):
interface ReadableStreamGenericReader { readonly closed: Promise<undefined>; cancel(reason?: any): Promise<void>; } interface ReadableStreamDefaultReader<TChunk> extends ReadableStreamGenericReader { releaseLock(): void; read(): Promise<ReadableStreamReadResult<TChunk>>; } interface ReadableStreamReadResult<TChunk> { done: boolean; value: TChunk | undefined; }
这些属性的解释:
.closed
:此 Promise 在流关闭后被满足。如果流出现错误或者在流关闭之前 Reader 的锁被释放,它将被拒绝。.cancel()
:在活动的 Reader 中,此方法取消关联的 ReadableStream。.releaseLock()
停用 Reader 并解锁其流。.read()
返回一个 Promise,用于 ReadableStreamReadResult(一个包装的块),它有两个属性:
.done
是一个布尔值,只要可以读取块,就为false
,在最后一个块之后为true
。.value
是块(或在最后一个块之后是undefined
)。
如果您了解迭代的工作原理,ReadableStreamReadResult 可能会很熟悉:ReadableStreams 类似于可迭代对象,Readers 类似于迭代器,而 ReadableStreamReadResults 类似于迭代器方法.next()
返回的对象。
以下代码演示了使用 Readers 的协议:
const reader = readableStream.getReader(); // (A) assert.equal(readableStream.locked, true); // (B) try { while (true) { const {done, value: chunk} = await reader.read(); // (C) if (done) break; // Use `chunk` } } finally { reader.releaseLock(); // (D) }
**获取 Reader。**我们不能直接从readableStream
中读取,我们首先需要获取一个Reader(行 A)。每个 ReadableStream 最多可以有一个 Reader。获取 Reader 后,readableStream
被锁定(行 B)。在我们可以再次调用.getReader()
之前,我们必须调用.releaseLock()
(行 D)。
读取块。.read()
返回一个带有属性.done
和.value
的对象的 Promise(行 C)。在读取最后一个块之后,.done
为true
。这种方法类似于 JavaScript 中异步迭代的工作方式。
10.2.1.1 示例:通过 ReadableStream 读取文件
在下面的示例中,我们从文本文件data.txt
中读取块(字符串):
import * as fs from 'node:fs'; import {Readable} from 'node:stream'; const nodeReadable = fs.createReadStream( 'data.txt', {encoding: 'utf-8'}); const webReadableStream = Readable.toWeb(nodeReadable); // (A) const reader = webReadableStream.getReader(); try { while (true) { const {done, value} = await reader.read(); if (done) break; console.log(value); } } finally { reader.releaseLock(); } // Output: // 'Content of text file\n'
我们将 Node.js Readable 转换为 web ReadableStream(行 A)。然后我们使用先前解释的协议来读取块。
10.2.1.2 示例:使用 ReadableStream 内容组装字符串
在下一个示例中,我们将所有 ReadableStream 的块连接成一个字符串并返回它:
/** * Returns a string with the contents of `readableStream`. */ async function readableStreamToString(readableStream) { const reader = readableStream.getReader(); try { let result = ''; while (true) { const {done, value} = await reader.read(); if (done) { return result; // (A) } result += value; } } finally { reader.releaseLock(); // (B) } }
方便的是,finally
子句总是被执行 - 无论我们如何离开try
子句。也就是说,如果我们返回一个结果(行 A),锁将被正确释放(行 B)。
10.2.2 通过异步迭代消费 ReadableStreams
ReadableStreams 也可以通过异步迭代进行消费:
const iterator = readableStream[Symbol.asyncIterator](); let exhaustive = false; try { while (true) { let chunk; ({done: exhaustive, value: chunk} = await iterator.next()); if (exhaustive) break; console.log(chunk); } } finally { // If the loop was terminated before we could iterate exhaustively // (via an exception or `return`), we must call `iterator.return()`. // Check if that was the case. if (!exhaustive) { iterator.return(); } }
值得庆幸的是,for-await-of
循环为我们处理了异步迭代的所有细节:
for await (const chunk of readableStream) { console.log(chunk); }
10.2.2.1 示例:使用异步迭代读取流
让我们重新尝试从文件中读取文本。这次,我们使用异步迭代而不是 Reader:
import * as fs from 'node:fs'; import {Readable} from 'node:stream'; const nodeReadable = fs.createReadStream( 'text-file.txt', {encoding: 'utf-8'}); const webReadableStream = Readable.toWeb(nodeReadable); for await (const chunk of webReadableStream) { console.log(chunk); } // Output: // 'Content of text file'
10.2.2.2 示例:使用 ReadableStream 内容组装字符串
我们以前使用 Reader 来组装一个包含 ReadableStream 内容的字符串。有了异步迭代,代码变得更简单了:
/** * Returns a string with the contents of `readableStream`. */ async function readableStreamToString2(readableStream) { let result = ''; for await (const chunk of readableStream) { result += chunk; } return result; }
10.2.2.3 注意事项:浏览器不支持对 ReadableStreams 进行异步迭代
目前,Node.js 和 Deno 支持对 ReadableStreams 进行异步迭代,但 Web 浏览器不支持:有一个 GitHub 问题链接到错误报告。
鉴于尚不完全清楚浏览器将如何支持异步迭代,包装比填充更安全。以下代码基于Chromium bug 报告中的建议:
async function* getAsyncIterableFor(readableStream) { const reader = readableStream.getReader(); try { while (true) { const {done, value} = await reader.read(); if (done) return; yield value; } } finally { reader.releaseLock(); } }
10.2.3 将可读流导入可写流
可读流有两种管道方法:
readableStream.pipeTo(writeableStream)
同步返回一个 Promisep
。它异步读取readableStream
的所有块,并将它们写入writableStream
。完成后,它会实现p
。
当我们探索可写流时,我们将看到.pipeTo()
的示例,因为它提供了一种方便的方式将数据传输到其中。readableStream.pipeThrough(transformStream)
将readableStream
导入transformStream.writable
并返回transformStream.readable
(每个 TransformStream 都有这些属性,它们指向其可写侧和可读侧)。另一种看待这个操作的方式是,我们通过连接transformStream
到readableStream
创建一个新的可读流。
当我们探索 TransformStreams 时,我们将看到.pipeThrough()
的示例,因为这是它们主要使用的方法。
10.3 将数据源通过包装转换为可读流
如果我们想通过一个可读流读取外部源,我们可以将其包装在一个适配器对象中,并将该对象传递给ReadableStream
构造函数。适配器对象被称为可读流的底层源(当我们更仔细地看 backpressure 时,将解释排队策略):
new ReadableStream(underlyingSource?, queuingStrategy?)
这是底层源的类型(随意浏览此类型及其属性的解释;当我们在示例中遇到它们时,它们将再次解释):
interface UnderlyingSource<TChunk> { start?( controller: ReadableStreamController<TChunk> ): void | Promise<void>; pull?( controller: ReadableStreamController<TChunk> ): void | Promise<void>; cancel?(reason?: any): void | Promise<void>; // Only used in byte streams and ignored in this section: type: 'bytes' | undefined; autoAllocateChunkSize: bigint; }
这是当可读流调用这些方法时:
- 在调用
ReadableStream
的构造函数后立即调用.start(controller)
。 - 每当可读流的内部队列中有空间时,都会调用
.pull(controller)
。直到队列再次满了为止,它会被重复调用。此方法只会在.start()
完成后调用。如果.pull()
没有入队任何内容,它将不会再次被调用。 - 如果可读流的消费者通过
readableStream.cancel()
或reader.cancel()
取消它,将调用.cancel(reason)
。reason
是传递给这些方法的值。
这些方法中的每一个都可以返回一个 Promise,并且在 Promise 解决之前不会采取进一步的步骤。如果我们想要做一些异步操作,这是有用的。
.start()
和.pull()
的参数controller
让它们访问流。它具有以下类型:
type ReadableStreamController<TChunk> = | ReadableStreamDefaultController<TChunk> | ReadableByteStreamController<TChunk> // ignored here ; interface ReadableStreamDefaultController<TChunk> { enqueue(chunk?: TChunk): void; readonly desiredSize: number | null; close(): void; error(err?: any): void; }
现在,块是字符串。我们稍后将介绍字节流,其中 Uint8Arrays 很常见。这些方法的作用是:
.enqueue(chunk)
将chunk
添加到可读流的内部队列。.desiredSize
指示.enqueue()
写入的队列中有多少空间。如果队列已满,则为零,如果超过了最大大小,则为负。因此,如果期望大小为零或负,则我们必须停止入队。
- 如果流关闭,其期望大小为零。
- 如果流处于错误模式,其期望大小为
null
。
.close()
关闭可读流。消费者仍然可以清空队列,但之后,流将结束。底层源调用此方法很重要-否则,读取其流将永远不会结束。.error(err)
将流置于错误模式:以后与它的所有交互都将以错误值err
失败。
10.3.1 实现底层源的第一个示例
在我们实现底层源的第一个示例中,我们只提供了.start()
方法。我们将在下一小节中看到.pull()
的用例。
const readableStream = new ReadableStream({ start(controller) { controller.enqueue('First line\n'); // (A) controller.enqueue('Second line\n'); // (B) controller.close(); // (C) }, }); for await (const chunk of readableStream) { console.log(chunk); } // Output: // 'First line\n' // 'Second line\n'
我们使用控制器创建一个具有两个块(行 A 和行 B)的流。关闭流很重要(行 C)。否则,for-await-of
循环永远不会结束!
请注意,这种入队的方式并不完全安全:存在超出内部队列容量的风险。我们很快将看到如何避免这种风险。
10.3.2 使用 ReadableStream 包装推送源或拉取源
一个常见的场景是将推送源或拉取源转换为 ReadableStream。源是推送还是拉取决定了我们将如何与 UnderlyingSource 连接到 ReadableStream:
- 推送源:这样的源在有新数据时通知我们。我们使用
.start()
来设置监听器和支持数据结构。如果我们收到太多数据,期望的大小不再是正数,我们必须告诉我们的源暂停。如果以后调用了.pull()
,我们可以取消暂停。对外部源在期望的大小变为非正数时暂停的反应称为应用背压。 - 拉取源:我们向这样的源请求新数据-通常是异步的。因此,我们通常在
.start()
中不做太多事情,并在调用.pull()
时检索数据。
接下来我们将看到两种来源的例子。
10.3.2.1 示例:从具有背压支持的推送源创建一个 ReadableStream
在下面的示例中,我们将一个 ReadableStream 包装在一个套接字周围-它向我们推送数据(它调用我们)。这个例子来自 web 流规范:
function makeReadableBackpressureSocketStream(host, port) { const socket = createBackpressureSocket(host, port); return new ReadableStream({ start(controller) { socket.ondata = event => { controller.enqueue(event.data); if (controller.desiredSize <= 0) { // The internal queue is full, so propagate // the backpressure signal to the underlying source. socket.readStop(); } }; socket.onend = () => controller.close(); socket.onerror = () => controller.error( new Error('The socket errored!')); }, pull() { // This is called if the internal queue has been emptied, but the // stream’s consumer still wants more data. In that case, restart // the flow of data if we have previously paused it. socket.readStart(); }, cancel() { socket.close(); }, }); }
10.3.2.2 示例:从拉取源创建一个 ReadableStream
工具函数iterableToReadableStream()
接受一个块的可迭代对象,并将其转换为一个 ReadableStream:
/** * @param iterable an iterable (asynchronous or synchronous) */ function iterableToReadableStream(iterable) { return new ReadableStream({ start() { if (typeof iterable[Symbol.asyncIterator] === 'function') { this.iterator = iterable[Symbol.asyncIterator](); } else if (typeof iterable[Symbol.iterator] === 'function') { this.iterator = iterable[Symbol.iterator](); } else { throw new Error('Not an iterable: ' + iterable); } }, async pull(controller) { if (this.iterator === null) return; // Sync iterators return non-Promise values, // but `await` doesn’t mind and simply passes them on const {value, done} = await this.iterator.next(); if (done) { this.iterator = null; controller.close(); return; } controller.enqueue(value); }, cancel() { this.iterator = null; controller.close(); }, }); }
让我们使用一个异步生成器函数来创建一个异步可迭代对象,并将该可迭代对象转换为一个 ReadableStream:
async function* genAsyncIterable() { yield 'how'; yield 'are'; yield 'you'; } const readableStream = iterableToReadableStream(genAsyncIterable()); for await (const chunk of readableStream) { console.log(chunk); } // Output: // 'how' // 'are' // 'you'
iterableToReadableStream()
也适用于同步可迭代对象:
const syncIterable = ['hello', 'everyone']; const readableStream = iterableToReadableStream(syncIterable); for await (const chunk of readableStream) { console.log(chunk); } // Output: // 'hello' // 'everyone'
可能会有一个静态的辅助方法ReadableStream.from()
,提供这个功能(请参阅其拉取请求以获取更多信息)。
10.4 向 WritableStreams 写入
WritableStreams 让我们向各种接收器写入数据块。它们具有以下类型(随意浏览此类型和其属性的解释;当我们在示例中遇到它们时,它们将再次解释):
interface WritableStream<TChunk> { getWriter(): WritableStreamDefaultWriter<TChunk>; readonly locked: boolean; close(): Promise<void>; abort(reason?: any): Promise<void>; }
这些属性的解释:
.getWriter()
返回一个 Writer-通过它我们可以向 WritableStream 写入数据的对象。.locked
:WritableStream 一次只能有一个活动的 Writer。当一个 Writer 正在使用时,WritableStream 被锁定,无法调用.getWriter()
。.close()
关闭流:
- 底层接收器(稍后会详细介绍)在关闭之前仍将接收所有排队的块。
- 从现在开始,所有的写入尝试都将无声地失败(没有错误)。
- 该方法返回一个 Promise,如果接收器成功写入所有排队的块并关闭,将实现该 Promise。如果在这些步骤中发生任何错误,它将被拒绝。
.abort()
中止流:
- 它将流置于错误模式。
- 返回的 Promise 在接收器成功关闭时实现,如果发生错误则拒绝。
以下小节涵盖了向 WritableStreams 发送数据的两种方法:
- 通过 Writers 向 WritableStreams 写入
- 将数据传输到 WritableStreams
10.4.1 通过 Writers 向 WritableStreams 写入
我们可以使用Writers向 WritableStreams 写入。它们具有以下类型(随意浏览此类型和其属性的解释;当我们在示例中遇到它们时,它们将再次解释):
interface WritableStreamDefaultWriter<TChunk> { readonly desiredSize: number | null; readonly ready: Promise<undefined>; write(chunk?: TChunk): Promise<void>; releaseLock(): void; close(): Promise<void>; readonly closed: Promise<undefined>; abort(reason?: any): Promise<void>; }
这些属性的解释:
.desiredSize
指示 WriteStream 队列中有多少空间。如果队列已满,则为零,如果超过最大大小,则为负数。因此,如果期望的大小为零或负数,我们必须停止写入。
- 如果流关闭,它的期望大小为零。
- 如果流处于错误模式,它的期望大小为
null
。
.ready
返回一个 Promise,在期望的大小从非正数变为正数时实现。这意味着没有背压活动,可以写入数据。如果期望的大小后来再次变为非正数,则会创建并返回一个新的待处理 Promise。.write()
将一个块写入流。它返回一个 Promise,在写入成功后实现,如果有错误则拒绝。.releaseLock()
释放 Writer 对其流的锁定。.close()
具有与关闭 Writer 流相同的效果。.closed
返回一个 Promise,在流关闭时被实现。.abort()
具有与中止 Writer 流相同的效果。
以下代码显示了使用 Writers 的协议:
const writer = writableStream.getWriter(); // (A) assert.equal(writableStream.locked, true); // (B) try { // Writing the chunks (explained later) } finally { writer.releaseLock(); // (C) }
我们不能直接向writableStream
写入,我们首先需要获取一个Writer(A 行)。每个 WritableStream 最多只能有一个 Writer。在获取了 Writer 之后,writableStream
被锁定(B 行)。在我们可以再次调用.getWriter()
之前,我们必须调用.releaseLock()
(C 行)。
有三种写入块的方法。
10.4.1.1 写入方法 1:等待.write()
(处理背压效率低下)
第一种写入方法是等待每个.write()
的结果:
await writer.write('Chunk 1'); await writer.write('Chunk 2'); await writer.close();
由.write()
返回的 Promise 在我们传递给它的块成功写入时实现。“成功写入”具体意味着什么取决于 WritableStream 的实现方式 - 例如,对于文件流,该块可能已发送到操作系统,但仍然驻留在缓存中,因此实际上尚未写入磁盘。
由.close()
返回的 Promise 在流关闭时实现。
这种写入方法的一个缺点是等待写入成功意味着队列没有被使用。因此,数据吞吐量可能会较低。
10.4.1.2 写入方法 2:忽略.write()
拒绝(忽略背压)
在第二种写入方法中,我们忽略了.write()
返回的 Promise,只等待.close()
返回的 Promise:
writer.write('Chunk 1').catch(() => {}); // (A) writer.write('Chunk 2').catch(() => {}); // (B) await writer.close(); // reports errors
.write()
的同步调用将块添加到 WritableStream 的内部队列中。通过不等待返回的 Promises,我们不必等待每个块被写入。但是,等待.close()
确保队列为空,并且所有写入都成功后我们才继续。
在 A 行和 B 行调用.catch()
是必要的,以避免在写入过程中出现问题时出现有关未处理的 Promise 拒绝的警告。这样的警告通常会记录在控制台上。我们可以忽略.write()
报告的错误,因为.close()
也会向我们报告这些错误。
通过使用一个忽略 Promise 拒绝的辅助函数,可以改进先前的代码:
ignoreRejections( writer.write('Chunk 1'), writer.write('Chunk 2'), ); await writer.close(); // reports errors function ignoreRejections(...promises) { for (const promise of promises) { promise.catch(() => {}); } }
这种方法的一个缺点是忽略了背压:我们只是假设队列足够大,可以容纳我们写入的所有内容。
10.4.1.3 写入方法 3:等待.ready
(高效处理背压)
在这种写入方法中,我们通过等待 Writer getter.ready
来有效地处理背压:
await writer.ready; // reports errors // How much room do we have? console.log(writer.desiredSize); writer.write('Chunk 1').catch(() => {}); await writer.ready; // reports errors // How much room do we have? console.log(writer.desiredSize); writer.write('Chunk 2').catch(() => {}); await writer.close(); // reports errors
.ready
中的 Promise 在流从有背压到无背压的转换时实现。
10.4.1.4 示例:通过 Writer 写入文件
在这个例子中,我们通过 WritableStream 创建一个文本文件data.txt
:
import * as fs from 'node:fs'; import {Writable} from 'node:stream'; const nodeWritable = fs.createWriteStream( 'new-file.txt', {encoding: 'utf-8'}); // (A) const webWritableStream = Writable.toWeb(nodeWritable); // (B) const writer = webWritableStream.getWriter(); try { await writer.write('First line\n'); await writer.write('Second line\n'); await writer.close(); } finally { writer.releaseLock() }
在 A 行,我们为文件data.txt
创建了一个 Node.js 流。在 B 行,我们将这个流转换为 web 流。然后我们使用 Writer 将字符串写入其中。
10.4.2 向 WritableStreams 进行管道传输
除了使用 Writers,我们还可以通过将 ReadableStreams 传输到 WritableStreams 来向 WritableStreams 写入:
await readableStream.pipeTo(writableStream);
由.pipeTo()
返回的 Promise 在传输成功完成时实现。
10.4.2.1 管道传输是异步进行的
管道传输是在当前任务完成或暂停后执行的。以下代码演示了这一点:
const readableStream = new ReadableStream({ // (A) start(controller) { controller.enqueue('First line\n'); controller.enqueue('Second line\n'); controller.close(); }, }); const writableStream = new WritableStream({ // (B) write(chunk) { console.log('WRITE: ' + JSON.stringify(chunk)); }, close() { console.log('CLOSE WritableStream'); }, }); console.log('Before .pipeTo()'); const promise = readableStream.pipeTo(writableStream); // (C) promise.then(() => console.log('Promise fulfilled')); console.log('After .pipeTo()'); // Output: // 'Before .pipeTo()' // 'After .pipeTo()' // 'WRITE: "First line\n"' // 'WRITE: "Second line\n"' // 'CLOSE WritableStream' // 'Promise fulfilled'
在 A 行我们创建一个 ReadableStream。在 B 行我们创建一个 WritableStream。
我们可以看到.pipeTo()
(行 C)立即返回。在一个新的任务中,块被读取和写入。然后writableStream
被关闭,最后,promise
被实现。
10.4.2.2 示例:将数据管道到文件的可写流
在下面的示例中,我们为一个文件创建一个 WritableStream,并将一个 ReadableStream 管道传递给它:
const webReadableStream = new ReadableStream({ // (A) async start(controller) { controller.enqueue('First line\n'); controller.enqueue('Second line\n'); controller.close(); }, }); const nodeWritable = fs.createWriteStream( // (B) 'data.txt', {encoding: 'utf-8'}); const webWritableStream = Writable.toWeb(nodeWritable); // (C) await webReadableStream.pipeTo(webWritableStream); // (D)
在 A 行,我们创建了一个 ReadableStream。在 B 行,我们为文件data.txt
创建了一个 Node.js 流。在 C 行,我们将这个流转换为 web 流。在 D 行,我们将我们的webReadableStream
管道传递给文件的 WritableStream。
10.4.2.3 示例:将两个 ReadableStreams 写入到一个 WritableStream
在下面的示例中,我们将两个 ReadableStreams 写入单个 WritableStream。
function createReadableStream(prefix) { return new ReadableStream({ async start(controller) { controller.enqueue(prefix + 'chunk 1'); controller.enqueue(prefix + 'chunk 2'); controller.close(); }, }); } const writableStream = new WritableStream({ write(chunk) { console.log('WRITE ' + JSON.stringify(chunk)); }, close() { console.log('CLOSE'); }, abort(err) { console.log('ABORT ' + err); }, }); await createReadableStream('Stream 1: ') .pipeTo(writableStream, {preventClose: true}); // (A) await createReadableStream('Stream 2: ') .pipeTo(writableStream, {preventClose: true}); // (B) await writableStream.close(); // Output // 'WRITE "Stream 1: chunk 1"' // 'WRITE "Stream 1: chunk 2"' // 'WRITE "Stream 2: chunk 1"' // 'WRITE "Stream 2: chunk 2"' // 'CLOSE'
我们告诉.pipeTo()
在 ReadableStream 关闭后不关闭 WritableStream(行 A 和行 B)。因此,在行 A 之后,WritableStream 保持打开状态,我们可以将另一个 ReadableStream 管道传递给它。
10.5 将数据接收端通过包装转换为可写流
如果我们想通过 WritableStream 写入到外部接收端,我们可以将其包装在一个适配器对象中,并将该对象传递给WritableStream
的构造函数。适配器对象被称为 WritableStream 的底层接收端(当我们更仔细地看反压时,排队策略将在稍后解释):
new WritableStream(underlyingSink?, queuingStrategy?)
这是底层接收端的类型(随意浏览此类型和其属性的解释;当我们在示例中遇到它们时,它们将再次解释):
interface UnderlyingSink<TChunk> { start?( controller: WritableStreamDefaultController ): void | Promise<void>; write?( chunk: TChunk, controller: WritableStreamDefaultController ): void | Promise<void>; close?(): void | Promise<void>;; abort?(reason?: any): void | Promise<void>; }
这些属性的解释:
.start(controller)
在我们调用WritableStream
的构造函数后立即调用。如果我们做一些异步操作,我们可以返回一个 Promise。在这个方法中,我们可以准备写入。.write(chunk, controller)
当一个新的块准备写入外部接收端时调用。我们可以通过返回一个 Promise 来施加反压,一旦反压消失就会实现。.close()
在调用writer.close()
后调用,并且所有排队的写入都成功。在这个方法中,我们可以在写入后进行清理。- 如果调用了
writeStream.abort()
或writer.abort()
,则会调用.abort(reason)
。reason
是传递给这些方法的值。
.start()
和.write()
的参数controller
让它们错误 WritableStream。它具有以下类型:
interface WritableStreamDefaultController { readonly signal: AbortSignal; error(err?: any): void; }
.signal
是一个 AbortSignal,如果我们想在流被中止时中止写入或关闭操作,我们可以监听它。.error(err)
错误 WritableStream:它被关闭,并且以后所有与它的交互都会失败,错误值为err
。
10.5.1 示例:跟踪一个可读流
在下一个示例中,我们将一个 ReadableStream 管道到一个 WritableStream,以便检查 ReadableStream 如何生成块:
const readableStream = new ReadableStream({ start(controller) { controller.enqueue('First chunk'); controller.enqueue('Second chunk'); controller.close(); }, }); await readableStream.pipeTo( new WritableStream({ write(chunk) { console.log('WRITE ' + JSON.stringify(chunk)); }, close() { console.log('CLOSE'); }, abort(err) { console.log('ABORT ' + err); }, }) ); // Output: // 'WRITE "First chunk"' // 'WRITE "Second chunk"' // 'CLOSE'
10.5.2 示例:收集写入到 WriteStream 的块到一个字符串中
在下一个示例中,我们创建了WriteStream
的一个子类,它将所有写入的块收集到一个字符串中。我们可以通过.getString()
方法访问该字符串:
class StringWritableStream extends WritableStream { #string = ''; constructor() { super({ // We need to access the `this` of `StringWritableStream`. // Hence the arrow function (and not a method). write: (chunk) => { this.#string += chunk; }, }); } getString() { return this.#string; } } const stringStream = new StringWritableStream(); const writer = stringStream.getWriter(); try { await writer.write('How are'); await writer.write(' you?'); await writer.close(); } finally { writer.releaseLock() } assert.equal( stringStream.getString(), 'How are you?' );
这种方法的一个缺点是我们混合了两个 API:WritableStream
的 API 和我们新的字符串流 API。另一种选择是委托给 WritableStream 而不是扩展它:
function StringcreateWritableStream() { let string = ''; return { stream: new WritableStream({ write(chunk) { string += chunk; }, }), getString() { return string; }, }; } const stringStream = StringcreateWritableStream(); const writer = stringStream.stream.getWriter(); try { await writer.write('How are'); await writer.write(' you?'); await writer.close(); } finally { writer.releaseLock() } assert.equal( stringStream.getString(), 'How are you?' );
这个功能也可以通过类来实现(而不是作为对象的工厂函数)。
10.6 使用 TransformStreams
一个 TransformStream:
- 通过其writable side接收输入,即 WritableStream。
- 然后可能会或可能不会转换这个输入。
- 结果可以通过一个 ReadableStream 来读取,它的可读端。
使用 TransformStreams 最常见的方式是“管道传递”它们:
const transformedStream = readableStream.pipeThrough(transformStream);
.pipeThrough()
将readableStream
管道到transformStream
的可写端,并返回其可读端。换句话说:我们已经创建了一个新的ReadableStream
,它是readableStream
的转换版本。
.pipeThrough()
不仅接受 TransformStreams,还接受任何具有以下形式的对象:
interface ReadableWritablePair<RChunk, WChunk> { readable: ReadableStream<RChunk>; writable: WritableStream<WChunk>; }
10.6.1 标准 TransformStreams
Node.js 支持以下标准 TransformStreams:
- 编码(WHATWG 标准)–
TextEncoderStream
和TextDecoderStream
:
- 这些流支持 UTF-8,但也支持许多“旧编码”。
- 一个 Unicode 代码点被编码为多达四个 UTF-8 代码单元(字节)。在字节流中,编码的代码点可能会跨越块。
TextDecoderStream
可以正确处理这些情况。 - 大多数 JavaScript 平台都可以使用(
TextEncoderStream
,TextDecoderStream
)。
- 当前支持的压缩格式:
deflate
(ZLIB 压缩数据格式),deflate-raw
(DEFLATE 算法),gzip
(GZIP 文件格式)。 - 在许多 JavaScript 平台上都可以使用(
CompressionStream
,DecompressionStream
)。
10.6.1.1 示例:解码一系列 UTF-8 编码的字节流
在下面的示例中,我们解码了一系列 UTF-8 编码的字节流:
const response = await fetch('https://example.com'); const readableByteStream = response.body; const readableStream = readableByteStream .pipeThrough(new TextDecoderStream('utf-8')); for await (const stringChunk of readableStream) { console.log(stringChunk); }
response.body
是一个 ReadableByteStream,其块是Uint8Array
的实例(TypedArrays)。我们通过TextDecoderStream
将该流传输,以获得具有字符串块的流。
请注意,单独翻译每个字节块(例如通过TextDecoder
)是行不通的,因为一个 Unicode 代码点在 UTF-8 中被编码为多达四个字节,而这些字节可能不都在同一个块中。
10.6.1.2 示例:创建一个用于标准输入的可读文本流
以下 Node.js 模块记录通过标准输入发送给它的所有内容:
// echo-stdin.mjs import {Readable} from 'node:stream'; const webStream = Readable.toWeb(process.stdin) .pipeThrough(new TextDecoderStream('utf-8')); for await (const chunk of webStream) { console.log('>>>', chunk); }
我们可以通过存储在process.stdin
中的流访问标准输入(process
是一个全局 Node.js 变量)。如果我们不为此流设置编码并通过Readable.toWeb()
进行转换,我们将获得一个字节流。我们通过 TextDecoderStream 将其传输,以获得一个文本流。
请注意,我们逐步处理标准输入:一旦另一个块可用,我们就会记录它。换句话说,我们不会等到标准输入完成。当数据要么很大要么只是间歇性发送时,这是很有用的。
10.7 实现自定义 TransformStreams
我们可以通过将 Transformer 对象传递给TransformStream
的构造函数来实现自定义 TransformStream。这样的对象具有以下类型(随意略过此类型和其属性的解释;当我们在示例中遇到它们时,它们将再次解释):
interface Transformer<TInChunk, TOutChunk> { start?( controller: TransformStreamDefaultController<TOutChunk> ): void | Promise<void>; transform?( chunk: TInChunk, controller: TransformStreamDefaultController<TOutChunk> ): void | Promise<void>; flush?( controller: TransformStreamDefaultController<TOutChunk> ): void | Promise<void>; }
这些属性的解释:
.start(controller)
在我们调用TransformStream
的构造函数之后立即调用。在这里,我们可以在转换开始之前准备好一些东西。.transform(chunk, controller)
执行实际的转换。它接收一个输入块,并可以使用其参数controller
来排队一个或多个转换后的输出块。它也可以选择不排队任何内容。.flush(controller)
在所有输入块成功转换后调用。在这里,我们可以在转换完成后执行清理工作。
这些方法中的每一个都可以返回一个 Promise,并且在 Promise 解决之前不会采取进一步的步骤。如果我们想要执行一些异步操作,这是很有用的。
参数controller
具有以下类型:
interface TransformStreamDefaultController<TOutChunk> { enqueue(chunk?: TOutChunk): void; readonly desiredSize: number | null; terminate(): void; error(err?: any): void; }
.enqueue(chunk)
将chunk
添加到 TransformStream 的可读端(输出)。.desiredSize
返回可读端(输出)的 TransformStream 内部队列的期望大小。.terminate()
关闭可读端(输出)并错误可写端(输入)的 TransformStream。如果转换器对可写端(输入)的剩余块不感兴趣并希望跳过它们,则可以使用它。.error(err)
错误 TransformStream:以后所有与它的交互都将以错误值err
失败。
TransformStream 中的背压如何?该类将背压从其可读端(输出)传播到其可写端(输入)。假设转换不会改变数据量太多。因此,Transform 可以忽略背压。但是,可以通过transformStreamDefaultController.desiredSize
检测到它,并通过从transformer.transform()
返回一个 Promise 来传播它。
10.7.1 示例:将任意块的流转换为行流
TransformStream
的以下子类将流转换为每个块都包含一行文本的流。也就是说,除了最后一个块可能以行尾(EOL)字符串结束之外,每个块都以行尾(EOL)字符串结束:Unix(包括 macOS)上为'\n'
,Windows 上为'\r\n'
。
class ChunksToLinesTransformer { #previous = ''; transform(chunk, controller) { let startSearch = this.#previous.length; this.#previous += chunk; while (true) { // Works for EOL === '\n' and EOL === '\r\n' const eolIndex = this.#previous.indexOf('\n', startSearch); if (eolIndex < 0) break; // Line includes the EOL const line = this.#previous.slice(0, eolIndex+1); controller.enqueue(line); this.#previous = this.#previous.slice(eolIndex+1); startSearch = 0; } } flush(controller) { // Clean up and enqueue any text we’re still holding on to if (this.#previous.length > 0) { controller.enqueue(this.#previous); } } } class ChunksToLinesStream extends TransformStream { constructor() { super(new ChunksToLinesTransformer()); } } const stream = new ReadableStream({ async start(controller) { controller.enqueue('multiple\nlines of\ntext'); controller.close(); }, }); const transformStream = new ChunksToLinesStream(); const transformed = stream.pipeThrough(transformStream); for await (const line of transformed) { console.log('>>>', JSON.stringify(line)); } // Output: // '>>> "multiple\n"' // '>>> "lines of\n"' // '>>> "text"'
请注意,Deno 的内置TextLineStream
提供类似的功能。
提示:我们也可以通过异步生成器进行这种转换。它将异步迭代 ReadableStream 并返回一个包含行的异步可迭代对象。其实现在§9.4“通过异步生成器转换可读流”中显示。
10.7.2 提示:异步生成器也非常适合转换流
由于 ReadableStreams 是异步可迭代的,我们可以使用异步生成器来转换它们。这导致非常优雅的代码:
const stream = new ReadableStream({ async start(controller) { controller.enqueue('one'); controller.enqueue('two'); controller.enqueue('three'); controller.close(); }, }); async function* prefixChunks(prefix, asyncIterable) { for await (const chunk of asyncIterable) { yield '> ' + chunk; } } const transformedAsyncIterable = prefixChunks('> ', stream); for await (const transformedChunk of transformedAsyncIterable) { console.log(transformedChunk); } // Output: // '> one' // '> two' // '> three'
10.8 仔细观察背压
让我们仔细观察背压。考虑以下管道链:
rs.pipeThrough(ts).pipeTo(ws);
rs
是一个 ReadableStream,ts
是一个 TransformStream,ws
是一个 WritableStream。这些是由前一个表达式创建的连接(.pipeThrough
使用.pipeTo
将rs
连接到ts
的可写端):
rs -pipeTo-> ts{writable,readable} -pipeTo-> ws
观察:
rs
的基础源可以被视为在rs
之前的管道链成员。ws
的基础接收器可以被视为在ws
之后的管道链成员。- 每个流都有一个内部缓冲区:ReadableStreams 在其基础源之后进行缓冲。WritableStreams 在其基础接收器之前进行缓冲。
假设ws
的基础接收器速度慢,ws
的缓冲区最终满了。然后发生以下步骤:
ws
发出满的信号。pipeTo
停止从ts.readable
读取。ts.readable
发出满的信号。ts
停止从ts.writable
移动块到ts.readable
。ts.writable
发出满的信号。pipeTo
停止从rs
读取。rs
向其基础源发出满的信号。- 基础源暂停。
这个例子说明我们需要两种功能:
- 接收数据的实体需要能够发出背压信号。
- 发送数据的实体需要对信号做出反应,施加背压。
让我们探索这些功能在 web 流 API 中是如何实现的。
10.8.1 发出背压
背压由接收数据的实体发出信号。Web 流有两个这样的实体:
- WritableStream 通过 Writer 方法
.write()
接收数据。 - 当其基础源调用 ReadableStreamDefaultController 方法
.enqueue()
时,ReadableStream 接收数据。
在这两种情况下,输入都通过队列进行缓冲。施加背压的信号是队列已满。让我们看看如何检测到这一点。
这些是队列的位置:
- 一个 WritableStream 的队列在 WritableStreamDefaultController 中内部存储(参见 web 流标准)。
- 一个 ReadableStream 的队列在 ReadableStreamDefaultController 中内部存储(参见 web 流标准)。
队列的期望大小是一个数字,表示队列中还有多少空间:
- 如果队列中仍有空间,则为正。
- 如果队列已达到其最大大小,则为零。
- 如果队列已超过其最大大小,则为负。
因此,如果期望的大小为零或更少,我们必须施加背压。它可以通过包含队列的对象的 getter.desiredSize
获得。
期望的大小是如何计算的?通过指定所谓的排队策略的对象。ReadableStream
和WritableStream
具有默认的排队策略,可以通过它们的构造函数的可选参数进行覆盖。接口QueuingStrategy
有两个属性:
- 方法
.size(chunk)
返回chunk
的大小。
- 队列的当前大小是它包含的块的大小之和。
- 属性
.highWaterMark
指定队列的最大大小。
队列的期望大小是高水位标记减去队列的当前大小。
10.8.2 对背压的反应
发送数据的实体需要对信号背压做出反应,通过施加背压。
10.8.2.1 通过 Writer 写入 WritableStream 的代码
- 我们可以在
writer.ready
中等待 Promise。在等待期间,我们被阻塞,期望的背压得到了实现。一旦队列中有空间,Promise 就会被实现。当writer.desiredSize
的值大于零时,实现会被触发。 - 或者,我们可以等待
writer.write()
返回的 Promise。如果我们这样做,队列甚至不会被填满。
如果我们愿意,我们还可以根据writer.desiredSize
来确定我们的块的大小。
10.8.2.2 ReadableStream 的底层源
可以传递给 ReadableStream 的底层源对象包装了外部源。在某种程度上,它也是管道链的成员;在其 ReadableStream 之前的成员。
- 只有在队列中有空间时,才会要求底层拉取源提供新数据。在没有空间时,会自动施加背压,因为没有数据被拉取。
- 在入队后,底层推送源应检查
controller.desiredSize
:如果为零或更少,则应通过暂停其外部源来施加背压。
10.8.2.3 WritableStream 的底层接收端
可以传递给 WritableStream 的底层接收端对象包装了外部接收端。在某种程度上,它也是管道链的成员;在其 WritableStream 之后的成员。
每个外部接收端以不同的方式(在某些情况下根本不)信号背压。底层接收端可以通过从方法.write()
返回一个被实现的 Promise 来施加背压,一旦写入完成。在web 流标准中有一个例子,演示了这是如何工作的。
10.8.2.4 一个 transformStream(.writable
→
.readable
)
TransformStream 通过为前者实现底层接收端和为后者实现底层源,将其可写端连接到其可读端。它具有一个内部插槽.[[backpressure]]
,指示内部背压当前是否处于活动状态。
- 可写端的底层接收器的
.write()
方法会异步等待,直到没有内部背压,然后将另一个块提供给 TransformStream 的转换器(web streams 标准:TransformStreamDefaultSinkWriteAlgorithm
)。然后转换器可以通过其 TransformStreamDefaultController 加入一些内容。请注意,.write()
返回一个 Promise,在方法完成时会被满足。在此之前,WriteStream 通过其队列缓冲传入的写请求。因此,可写端的背压通过该队列及其期望的大小来表示。 - 如果通过 TransformStreamDefaultController 将一个块加入队列,并且可读端的队列变满了,TransformStream 的背压就会被激活(web streams 标准:
TransformStreamDefaultControllerEnqueue
)。 - 如果从读取器中读取了一些内容,
ReadableStream
的背压可能会被取消(web streams 标准:ReadableStreamDefaultReaderRead
):
- 如果队列中现在有空间,可能是时候调用底层源的
.pull()
了(web streams 标准:.[[PullSteps]]
)。 - 可读端的底层源的
.pull()
会取消背压(web streams 标准:TransformStreamDefaultSourcePullAlgorithm
)。
10.8.2.5 .pipeTo()
(ReadableStream →
WritableStream)
.pipeTo()
通过读取器从 ReadableStream 读取块,并通过写入器将它们写入 WritableStream。当writer.desiredSize
为零或更小时,它会暂停(web streams 标准:ReadableStreamPipeTo
的第 15 步)。
10.9 字节流
到目前为止,我们只使用过文本流,流的块是字符串。但是 web streams API 也支持字节流,用于二进制数据,其中块是 Uint8Arrays(TypedArrays):
ReadableStream
有一个特殊的'bytes'
模式。WritableStream
本身不关心块是字符串还是 Uint8Arrays。因此,实例是文本流还是字节流取决于底层接收器可以处理什么类型的块。TransformStream
可以处理什么类型的块也取决于其 Transformer。
接下来,我们将学习如何创建可读的字节流。
10.9.1 可读的字节流
ReadableStream
构造函数创建的流的类型取决于可选的属性.type
和可选的第一个参数underlyingSource
:
- 如果
.type
被省略或没有提供底层源,则新实例是一个文本流。 - 如果
.type
是字符串'bytes'
,则新实例是一个字节流:
const readableByteStream = new ReadableStream({ type: 'bytes', async start() { /*...*/ } // ... });
如果一个 ReadableStream 处于'bytes'
模式,会发生什么变化?
在默认模式下,底层源可以返回任何类型的块。在字节模式下,块必须是 ArrayBufferViews,即 TypedArrays(例如 Uint8Arrays)或 DataViews。
此外,可读的字节流可以创建两种读取器:
.getReader()
返回一个ReadableStreamDefaultReader
的实例。.getReader({mode: 'byob'})
返回一个ReadableStreamBYOBReader
的实例。
“BYOB” 代表 “Bring Your Own Buffer”,意味着我们可以传递一个缓冲区(ArrayBufferView)给 reader.read()
。之后,该 ArrayBufferView 将被分离并且不再可用。但是.read()
返回其数据在一个新的 ArrayBufferView 中,该 ArrayBufferView 具有相同的类型并访问相同的 ArrayBuffer 的相同区域。
此外,可读的字节流具有不同的控制器:它们是ReadableByteStreamController
的实例(而不是ReadableStreamDefaultController
)。除了强制底层源将 ArrayBufferViews(TypedArrays 或 DataViews)入队之外,它还通过其属性.byobRequest
支持 ReadableStreamBYOBReaders。底层源将其数据写入存储在此属性中的 BYOBRequest。Web 流标准在其“创建流的示例”部分中有两个使用.byobRequest
的示例。
10.9.2 示例:填充随机数据的无限可读的字节流
在下一个示例中,创建一个无限可读的字节流,用随机数据填充其块(灵感来自:example4.mjs
in “在 Node.js 中实现 Web 流 API”)。
import {promisify} from 'node:util'; import {randomFill} from 'node:crypto'; const asyncRandomFill = promisify(randomFill); const readableByteStream = new ReadableStream({ type: 'bytes', async pull(controller) { const byobRequest = controller.byobRequest; await asyncRandomFill(byobRequest.view); byobRequest.respond(byobRequest.view.byteLength); }, }); const reader = readableByteStream.getReader({mode: 'byob'}); const buffer = new Uint8Array(10); // (A) const firstChunk = await reader.read(buffer); // (B) console.log(firstChunk);
由于readableByteStream
是无限的,我们无法循环读取它。这就是为什么我们只读取它的第一个块(B 行)。
我们在 A 行创建的缓冲区在 B 行之后被传输,因此无法读取。
10.9.3 示例:压缩可读的字节流
在下面的示例中,我们创建一个可读的字节流,并将其通过一个将其压缩为 GZIP 格式的流:
const readableByteStream = new ReadableStream({ type: 'bytes', start(controller) { // 256 zeros controller.enqueue(new Uint8Array(256)); controller.close(); }, }); const transformedStream = readableByteStream.pipeThrough( new CompressionStream('gzip')); await logChunks(transformedStream); async function logChunks(readableByteStream) { const reader = readableByteStream.getReader(); try { while (true) { const {done, value} = await reader.read(); if (done) break; console.log(value); } } finally { reader.releaseLock(); } }
10.9.4 示例:通过fetch()
读取网页
fetch()
的结果解析为一个响应对象,其属性.body
是一个可读的字节流。我们通过TextDecoderStream
将该字节流转换为文本流:
const response = await fetch('https://example.com'); const readableByteStream = response.body; const readableStream = readableByteStream.pipeThrough( new TextDecoderStream('utf-8')); for await (const stringChunk of readableStream) { console.log(stringChunk); }
10.10 Node.js 特定的辅助函数
Node.js 是唯一支持以下辅助函数的 Web 平台,它称之为实用消费者:
import { arrayBuffer, blob, buffer, json, text, } from 'node:stream/consumers';
这些函数将 Web ReadableStreams、Node.js Readables 和 AsyncIterators 转换为被满足的 Promise:
- ArrayBuffers(
arrayBuffer()
) - Blobs(
blob()
) - Node.js 缓冲区(
buffer()
) - JSON 对象(
json()
) - 字符串(
text()
)
假定二进制数据为 UTF-8 编码:
import * as streamConsumers from 'node:stream/consumers'; const readableByteStream = new ReadableStream({ type: 'bytes', start(controller) { // TextEncoder converts strings to UTF-8 encoded Uint8Arrays const encoder = new TextEncoder(); const view = encoder.encode('"😀"'); assert.deepEqual( view, Uint8Array.of(34, 240, 159, 152, 128, 34) ); controller.enqueue(view); controller.close(); }, }); const jsonData = await streamConsumers.json(readableByteStream); assert.equal(jsonData, '😀');
字符串流按预期工作:
import * as streamConsumers from 'node:stream/consumers'; const readableByteStream = new ReadableStream({ start(controller) { controller.enqueue('"😀"'); controller.close(); }, }); const jsonData = await streamConsumers.json(readableByteStream); assert.equal(jsonData, '😀');
10.11 进一步阅读
本节提到的所有材料都是本章的来源。
本章不涵盖 Web 流 API 的每个方面。您可以在此处找到更多信息:
- “WHATWG 流标准” by Adam Rice, Domenic Denicola, Mattias Buelens, and 吉野剛史 (Takeshi Yoshino)
- “Web Streams API” in Node.js 文档
更多材料:
- Web 流 API:
- “在 Node.js 中实现 Web 流 API” by James M. Snell
- “流 API” 在 MDN 上
- “流-权威指南” by Thomas Steiner
- 背压:
- “Node.js 流中的背压” by Vladimir Topolev
- “流中的背压” in Node.js 文档
- Unicode(代码点,UTF-8,UTF-16 等):“Unicode 简介”章节 in “JavaScript for impatient programmers”
- “异步迭代”章节 in “JavaScript for impatient programmers”
- “Typed Arrays:处理二进制数据”章节 in “JavaScript for impatient programmers”
十一、流配方
exploringjs.com/nodejs-shell-scripting/ch_stream-recipes.html
- 11.1 写入标准输出(stdout)
- 11.1.1 通过
console.log()
写入 stdout - 11.1.2 通过 Node.js 流写入 stdout
- 11.1.3 通过 Web 流写入 stdout
- 11.2 写入标准错误(stderr)
- 11.3 从标准输入(stdin)读取
- 11.3.1 通过 Node.js 流从 stdin 读取
- 11.3.2 通过 Web 流从 stdin 读取
- 11.3.3 通过模块
'node:readline'
从 stdin 读取
- 11.4 Node.js 流配方
- 11.5 Web 流配方
11.1 写入标准输出(stdout)
这是写入 stdout 的三个选项:
- 我们可以通过
console.log()
写入它。 - 我们可以通过 Node.js 流写入它。
- 我们可以通过 Web 流写入它。
11.1.1 通过console.log()
写入 stdout
console.log(format, ...args)
写入 stdout 并始终附加换行符'\n'
(即使在 Windows 上也是如此)。第一个参数可以包含占位符,这些占位符的解释方式与util.format()
相同:
console.log('String: %s Number: %d Percent: %%', 'abc', 123); const obj = {one: 1, two: 2}; console.log('JSON: %j Object: %o', obj, obj); // Output: // 'String: abc Number: 123 Percent: %' // 'JSON: {"one":1,"two":2} Object: { one: 1, two: 2 }'
第一个参数之后的所有参数始终显示在输出中,即使没有足够的占位符。
11.1.2 通过 Node.js 流写入 stdout
process.stdout
是stream.Readable
的一个实例。这意味着我们可以像使用其他 Node.js 流一样使用它-例如:
process.stdout.write('two'); process.stdout.write(' words'); process.stdout.write('\n');
前面的代码等同于:
console.log('two words');
请注意,这种情况下末尾没有换行符,因为console.log()
总是会添加一个。
如果我们使用.write()
来处理大量数据,我们应该考虑回压,如§9.5.2.1“writable.write(chunk)
”中所解释的那样。
以下配方适用于process.stdout
:§11.4“Node.js 流配方”。
11.1.3 通过 Web 流写入 stdout
我们可以将process.stdout
转换为 Web 流并写入其中:
import {Writable} from 'node:stream'; const webOut = Writable.toWeb(process.stdout); const writer = webOut.getWriter(); try { await writer.write('First line\n'); await writer.write('Second line\n'); await writer.close(); } finally { writer.releaseLock() }
以下配方适用于webOut
:§11.5“Web 流配方”。
11.2 写入标准错误(stderr)
写入 stderr 的工作方式与写入 stdout 类似:
- 我们可以通过
console.error()
写入它。 - 我们可以通过 Node.js 流写入它。
- 我们可以通过 Web 流写入它。
有关更多信息,请参阅前一节。
11.3 从标准输入(stdin)读取
这些是从 stdin 读取的选项:
- 我们可以通过 Node.js 流从中读取。
- 我们可以通过 Web 流从中读取。
- 我们可以使用模块
'node:readline'
。
11.3.1 通过 Node.js 流从 stdin 读取
process.stdin
是stream.Writable
的一个实例。这意味着我们可以像使用其他 Node.js 流一样使用它:
// Switch to text mode (otherwise we get chunks of binary data) process.stdin.setEncoding('utf-8'); for await (const chunk of process.stdin) { console.log('>', chunk); }
以下配方适用于webIn
:§11.4“Node.js 流配方”。
11.3.2 通过 Web 流从 stdin 读取
我们首先必须将process.stdin
转换为 Web 流:
import {Readable} from 'node:stream'; // Switch to text mode (otherwise we get chunks of binary data) process.stdin.setEncoding('utf-8'); const webIn = Readable.toWeb(process.stdin); for await (const chunk of webIn) { console.log('>', chunk); }
以下配方适用于webIn
:§11.5“Web 流配方”。
11.3.3 通过模块'node:readline'
从 stdin 读取
内置模块'node:readline'
允许我们提示用户以交互方式输入信息-例如:
import * as fs from 'node:fs'; import * as readline from 'node:readline/promises'; const rl = readline.createInterface({ input: process.stdin, output: process.stdout, }); const filePath = await rl.question('Please enter a file path: '); fs.writeFileSync(filePath, 'Hi!', {encoding: 'utf-8'}) rl.close();
有关模块'node:readline'
的更多信息,请参见:
- §9.3.3“通过模块’node:readlines’从可读流中读取行”
- 官方文档。
11.4 Node.js 流配方
可读流:
- §9.3.1.2“
Readable.from()
: 从可迭代对象创建可读流” - §9.3.2“通过
for-await-of
从可读流中读取块”
- §9.3.2.1“在字符串中收集可读流的内容”
- §9.3.3“通过模块’node:readlines’从可读流中读取行”
- §9.4“通过异步生成器转换可读流”
- §9.4.1“在异步可迭代对象中从块转换为编号行”
可写流:
- §9.5.2“写入可写流”
- §9.5.2.2“通过
stream.pipeline()
将可读流传输到可写流”
11.5 网络流配方
从中创建一个 ReadableStream:
- 字符串:§10.3.1“实现基础源的第一个示例”
- 可迭代对象:§10.3.2.2“示例:从拉取源创建一个 ReadableStream”
从 ReadableStream 中读取:
- §10.2.1“通过读取器消耗 ReadableStreams”
- §10.2.2“通过异步迭代消耗 ReadableStreams”
- §10.2.2.2“示例:组装包含 ReadableStream 内容的字符串”
- §10.2.3“将 ReadableStreams 传输到 WritableStreams”
转换 ReadableStreams:
- §10.6“使用 TransformStreams”
- §10.7.2“提示:异步生成器也非常适合转换流”
- §10.7.1“示例:将任意块的流转换为行流”
使用 WritableStreams:
- §10.4“写入可写流”
- §10.5.2“示例:在字符串中收集写入到 WriteStream 的块”
十二、在子进程中运行 shell 命令
原文:
exploringjs.com/nodejs-shell-scripting/ch_nodejs-child-process.html
译者:飞龙
- 12.1 本章概述](ch_nodejs-child-process.html#overview-of-this-chapter)
- 12.1.1 Windows vs. Unix
- 12.1.2 我们在示例中经常使用的功能
- 12.2 异步生成进程:
spawn()
- 12.2.1
spawn()
的工作原理 - 12.2.2 何时执行 shell 命令?](ch_nodejs-child-process.html#when-is-the-shell-command-executed)
- 12.2.3 仅命令模式 vs. 参数模式](ch_nodejs-child-process.html#spawn-argument-modes)
- 12.2.4 向子进程的 stdin 发送数据
- 12.2.5 手动进行管道传输](ch_nodejs-child-process.html#piping-manually)
- 12.2.6 处理不成功的退出(包括错误)
- 12.2.7 等待子进程退出](ch_nodejs-child-process.html#waiting-for-the-exit-of-a-child-process)
- 12.2.8 终止子进程](ch_nodejs-child-process.html#terminating-child-processes)
- 12.3 同步生成进程:
spawnSync()
- 12.3.1 何时执行 shell 命令?](ch_nodejs-child-process.html#when-is-the-shell-command-executed-1)
- 12.3.2 从 stdout 读取](ch_nodejs-child-process.html#reading-from-stdout)
- 12.3.3 向子进程的 stdin 发送数据
- 12.3.4 处理不成功的退出(包括错误)
- 12.4 基于
spawn()
的异步辅助函数
- 12.4.1
exec()
- 12.4.2
execFile()
- 12.5 基于
spawnAsync()
的同步辅助函数
- 12.5.1
execSync()
- 12.5.2
execFileSync()
- 12.6 有用的库](ch_nodejs-child-process.html#useful-libraries)
- 12.6.1 tinysh:用于生成 shell 命令的辅助程序
- 12.6.2 node-powershell:通过 Node.js 执行 Windows PowerShell 命令
- 12.7 在模块
'node:child_process'
的功能之间进行选择
在本章中,我们将探讨如何通过模块'node:child_process'
从 Node.js 执行 shell 命令。
12.1 本章概述
模块'node:child_process'
有一个用于执行 shell 命令(在生成的子进程中)的函数,有两个版本:
- 一个异步版本的
spawn()
。 - 一个同步版本的
spawnSync()
。
我们将首先探讨spawn()
,然后是spawnSync()
。最后,我们将看一下基于它们并且相对类似的以下函数:
- 基于
spawn()
:
exec()
execFile()
- 基于
spawnSync()
:
execSync()
execFileSync()
12.1.1 Windows vs. Unix
本章中显示的代码在 Unix 上运行,但我也在 Windows 上进行了测试-其中大部分代码需要进行轻微更改(例如以'\r\n'
而不是'\n'
结尾)。
12.1.2 我们在示例中经常使用的功能
以下功能在示例中经常出现。这就是为什么在这里解释一次:
- 断言:对于原始值使用
assert.equal()
,对于对象使用assert.deepEqual()
。示例中从未显示必要的导入:
import * as assert from 'node:assert/strict';
- 函数
Readable.toWeb()
将 Node 的原生stream.Readable
转换为 web 流(ReadableStream
的实例)。这在§10“在 Node.js 上使用 web 流”中有解释。示例中始终导入Readable
。 - 异步函数
readableStreamToString()
会消耗可读的 web 流并返回一个字符串(包装在 Promise 中)。这在 web 流章节中有解释。假定这个函数在示例中是可用的。
12.2 异步生成进程:spawn()
12.2.1 spawn()
的工作原理
spawn( command: string, args?: Array<string>, options?: Object ): ChildProcess
spawn()
异步地在新进程中执行命令:该进程与 Node 的主 JavaScript 进程并行运行,我们可以通过各种方式与其通信(通常通过流)。
接下来,有关spawn()
的参数和结果的文档。如果您喜欢通过示例学习,可以跳过该内容,继续阅读后面的小节。
12.2.1.1 参数:command
command
是一个包含 shell 命令的字符串。有两种使用该参数的模式:
- 仅命令模式:省略
args
,command
包含整个 shell 命令。我们甚至可以使用 shell 功能,如在多个可执行文件之间进行管道传输,将 I/O 重定向到文件,变量和通配符。
options.shell
必须为true
,因为我们需要一个 shell 来处理 shell 功能。
- 参数模式:
command
仅包含命令的名称,args
包含其参数。
- 如果
options.shell
为true
,则参数中的许多元字符会被解释,并且通配符和变量名称等功能会起作用。 - 如果
options.shell
为false
,则字符串会直接使用,我们不必转义元字符。
这两种模式在本章后面进行了演示。
12.2.1.2 参数:options
以下options
最有趣:
.shell: boolean|string
(默认值:false
)是否应使用 shell 来执行命令?
- 在 Windows 上,此选项几乎总是应为
true
。例如,否则无法执行.bat
和.cmd
文件。 - 在 Unix 上,只有核心 shell 功能(例如管道,I/O 重定向,文件名通配符和变量)在
.shell
为false
时不可用。 - 如果
.shell
为true
,我们必须小心处理用户输入并对其进行清理,因为很容易执行任意代码。如果我们想将其用作非元字符,则还必须转义元字符。 - 我们还可以将
.shell
设置为 shell 可执行文件的路径。然后 Node.js 将使用该可执行文件来执行命令。如果我们将.shell
设置为true
,Node.js 将使用:
- Unix:
'/bin/sh'
- Windows:
process.env.ComSpec
.cwd: string | URL
指定在执行命令时要使用的当前工作目录(CWD)。.stdio: Array<string|Stream>|string
配置标准 I/O 的设置方式。下面会有解释。.env: Object
(默认值:process.env
)让我们为子进程指定 shell 变量。提示:
- 查看
process.env
(例如在 Node.js REPL 中)以查看存在哪些变量。 - 我们可以使用扩展运算符来非破坏性地覆盖现有变量 - 或者如果尚不存在,则创建它:
{env: {...process.env, MY_VAR: 'Hi!'}}
.signal: AbortSignal
如果我们创建了一个 AbortControllerac
,我们可以将ac.signal
传递给spawn()
,并通过ac.abort()
中止子进程。这在本章后面有演示。.timeout: number
如果子进程的执行时间超过.timeout
毫秒,则会被终止。
12.2.1.3 options.stdio
子进程的每个标准 I/O 流都有一个数字 ID,称为文件描述符:
- 标准输入(stdin)的文件描述符为 0。
- 标准输出(stdout)的文件描述符为 1。
- 标准错误(stderr)的文件描述符为 2。
可能会有更多的文件描述符,但这很少见。
options.stdio
配置子进程的流是否以及如何被管道连接到父进程的流。它可以是一个数组,其中每个元素配置等于其索引的文件描述符。可以使用以下值作为数组元素:
'pipe'
:
- 索引 0:将
childProcess.stdin
管道连接到子进程的 stdin。请注意,尽管其名称如此,但前者是属于父进程的流。 - 索引 1:将子进程的 stdout 管道连接到
childProcess.stdout
。 - 索引 2:将子进程的 stderr 管道连接到
childProcess.stderr
。
'ignore'
:忽略子进程的流。'inherit'
:将子进程的流管道连接到父进程的相应流。
- 例如,如果我们希望子进程的 stderr 被记录到控制台,我们可以在索引 2 处使用
'inherit'
。
- 原生 Node.js 流:管道到该流或从该流。
- 还支持其他值,但这超出了本章的范围。
除了通过数组指定options.stdio
之外,我们还可以缩写:
'pipe'
等同于['pipe', 'pipe', 'pipe']
(options.stdio
的默认值)。'ignore'
等同于['ignore', 'ignore', 'ignore']
。'inherit'
等同于['inherit', 'inherit', 'inherit']
。
12.2.1.4 结果:ChildProcess
的实例
spawn()
返回ChildProcess
的实例。
有趣的数据属性:
.exitCode: number | null
包含子进程退出时的代码:
- 0(零)表示正常退出。
- 大于零的数字表示发生了错误。
null
表示进程尚未退出。
.signalCode: string | null
子进程被杀死的 POSIX 信号,或者如果没有被杀死则为null
。有关更多信息,请参阅下面的.kill()
方法的描述。- 流:根据标准 I/O 的配置方式(请参阅前面的小节),以下流变得可用:
.stdin
.stdout
.stderr
.pid: number | undefined
子进程的进程标识符(PID)。如果生成失败,.pid
为undefined
。在调用spawn()
后立即可用此值。
有趣的方法:
.kill(signalCode?: number | string = 'SIGTERM'): boolean
向子进程发送 POSIX 信号(通常导致进程终止):
signal
的 man 页面包含值的列表。- Windows 不支持信号,但 Node.js 模拟了其中一些 - 例如:
SIGINT
,SIGTERM
和SIGKILL
。有关更多信息,请参阅Node.js 文档。
- 此方法在本章后面进行了演示。
有趣的事件:
.on('exit', (exitCode: number|null, signalCode: string|null) => {})
此事件在子进程结束后发出:
- 回调参数为我们提供了退出代码或信号代码:其中一个始终为非空。
- 由于多个进程可能共享相同的流,因此其标准 I/O 流可能仍然打开。事件
'close'
在子进程退出后通知我们所有 stdio 流都已关闭。
.on('error', (err: Error) => {})
如果进程无法被生成(请参阅示例后面)或子进程无法被杀死,则最常见地发出此事件。在此事件之后可能会或可能不会发出'exit'
事件。
我们稍后将看到如何将事件转换为可以等待的 Promise。
12.2.2 shell 命令何时执行?
在使用异步spawn()
时,命令的子进程是异步启动的。以下代码演示了这一点:
import {spawn} from 'node:child_process'; spawn( 'echo', ['Command starts'], { stdio: 'inherit', shell: true, } ); console.log('After spawn()');
这是输出:
After spawn() Command starts
12.2.3 仅命令模式 vs. 参数模式
在本节中,我们以两种方式指定相同的命令调用:
- 仅命令模式:我们通过第一个参数
command
提供整个调用。 - 参数模式:我们通过第一个参数
command
提供命令,通过第二个参数args
提供参数。
12.2.3.1 仅命令模式
import {Readable} from 'node:stream'; import {spawn} from 'node:child_process'; const childProcess = spawn( 'echo "Hello, how are you?"', { shell: true, // (A) stdio: ['ignore', 'pipe', 'inherit'], // (B) } ); const stdout = Readable.toWeb( childProcess.stdout.setEncoding('utf-8')); // Result on Unix assert.equal( await readableStreamToString(stdout), 'Hello, how are you?\n' // (C) ); // Result on Windows: '"Hello, how are you?"\r\n'
每个带参数的仅命令生成都需要.shell
为true
(A 行)-即使它像这个这么简单。
在 B 行,我们告诉spawn()
如何处理标准 I/O:
- 忽略标准输入。
- 将子进程的标准输出管道到
childProcess.stdout
(属于父进程的流)。 - 将子进程的标准错误输出管道到父进程的标准错误输出。
在这种情况下,我们只对子进程的输出感兴趣。因此,一旦我们处理了输出,我们就完成了。在其他情况下,我们可能需要等到子进程退出。如何做到这一点,稍后会有演示。
在仅命令模式下,我们看到 shell 的更多特殊之处 - 例如,Windows 命令 shell 输出包括双引号(最后一行)。
12.2.3.2 参数模式
import {Readable} from 'node:stream'; import {spawn} from 'node:child_process'; const childProcess = spawn( 'echo', ['Hello, how are you?'], { shell: true, stdio: ['ignore', 'pipe', 'inherit'], } ); const stdout = Readable.toWeb( childProcess.stdout.setEncoding('utf-8')); // Result on Unix assert.equal( await readableStreamToString(stdout), 'Hello, how are you?\n' ); // Result on Windows: 'Hello, how are you?\r\n'
12.2.3.3 args
中的元字符
让我们探讨一下如果args
中有元字符会发生什么:
import {Readable} from 'node:stream'; import {spawn} from 'node:child_process'; async function echoUser({shell, args}) { const childProcess = spawn( `echo`, args, { stdio: ['ignore', 'pipe', 'inherit'], shell, } ); const stdout = Readable.toWeb( childProcess.stdout.setEncoding('utf-8')); return readableStreamToString(stdout); } // Results on Unix assert.equal( await echoUser({shell: false, args: ['$USER']}), // (A) '$USER\n' ); assert.equal( await echoUser({shell: true, args: ['$USER']}), // (B) 'rauschma\n' ); assert.equal( await echoUser({shell: true, args: [String.raw`\$USER`]}), // (C) '$USER\n' );
- 如果我们不使用 shell,例如美元符号(
$
)等元字符没有效果(A 行)。 - 在 shell 中,
$USER
被解释为一个变量(B 行)。 - 如果我们不想要这个,我们必须通过反斜杠转义美元符号(C 行)。
其他元字符(如星号(*
))也会产生类似的效果。
这是 Unix shell 元字符的两个例子。Windows shell 有它们自己的元字符和它们自己的转义方式。
12.2.3.4 一个更复杂的 shell 命令
让我们使用更多的 shell 特性(这需要仅命令模式):
import {Readable} from 'node:stream'; import {spawn} from 'node:child_process'; import {EOL} from 'node:os'; const childProcess = spawn( `(echo cherry && echo apple && echo banana) | sort`, { stdio: ['ignore', 'pipe', 'inherit'], shell: true, } ); const stdout = Readable.toWeb( childProcess.stdout.setEncoding('utf-8')); assert.equal( await readableStreamToString(stdout), 'apple\nbanana\ncherry\n' );
12.2.4 将数据发送到子进程的标准输入
到目前为止,我们只读取了子进程的标准输出。但是我们也可以将数据发送到标准输入:
import {Readable, Writable} from 'node:stream'; import {spawn} from 'node:child_process'; const childProcess = spawn( `sort`, // (A) { stdio: ['pipe', 'pipe', 'inherit'], } ); const stdin = Writable.toWeb(childProcess.stdin); // (B) const writer = stdin.getWriter(); // (C) try { await writer.write('Cherry\n'); await writer.write('Apple\n'); await writer.write('Banana\n'); } finally { writer.close(); } const stdout = Readable.toWeb( childProcess.stdout.setEncoding('utf-8')); assert.equal( await readableStreamToString(stdout), 'Apple\nBanana\nCherry\n' );
我们使用 shell 命令sort
(A 行)来为我们对文本行进行排序。
在 B 行,我们使用Writable.toWeb()
将本机 Node.js 流转换为网络流(更多信息,请参见§10“在 Node.js 上使用网络流”)。
如何通过写入器(C 行)向 WritableStream 写入也在网络流章节中有解释。
12.2.5 手动进行管道传输
我们之前让 shell 执行以下命令:
(echo cherry && echo apple && echo banana) | sort
在下面的例子中,我们手动进行管道传输,从 echo(A 行)到 sorting(B 行):
import {Readable, Writable} from 'node:stream'; import {spawn} from 'node:child_process'; const echo = spawn( // (A) `echo cherry && echo apple && echo banana`, { stdio: ['ignore', 'pipe', 'inherit'], shell: true, } ); const sort = spawn( // (B) `sort`, { stdio: ['pipe', 'pipe', 'inherit'], shell: true, } ); //==== Transferring chunks from echo.stdout to sort.stdin ==== const echoOut = Readable.toWeb( echo.stdout.setEncoding('utf-8')); const sortIn = Writable.toWeb(sort.stdin); const sortInWriter = sortIn.getWriter(); try { for await (const chunk of echoOut) { // (C) await sortInWriter.write(chunk); } } finally { sortInWriter.close(); } //==== Reading sort.stdout ==== const sortOut = Readable.toWeb( sort.stdout.setEncoding('utf-8')); assert.equal( await readableStreamToString(sortOut), 'apple\nbanana\ncherry\n' );
例如echoOut
这样的 ReadableStreams 是异步可迭代的。这就是为什么我们可以使用for-await-of
循环来读取它们的chunks(流数据的片段)。更多信息,请参见§10“在 Node.js 上使用网络流”。
12.2.6 处理不成功的退出(包括错误)
有三种主要的不成功的退出方式:
- 子进程无法生成。
- Shell 中发生了错误。
- 一个进程被终止。
12.2.6.1 子进程无法生成
以下代码演示了如果子进程无法生成会发生什么。在这种情况下,原因是 shell 的路径没有指向可执行文件(A 行)。
import {spawn} from 'node:child_process'; const childProcess = spawn( 'echo hello', { stdio: ['inherit', 'inherit', 'pipe'], shell: '/bin/does-not-exist', // (A) } ); childProcess.on('error', (err) => { // (B) assert.equal( err.toString(), 'Error: spawn /bin/does-not-exist ENOENT' ); });
这是我们第一次使用事件来处理子进程。在 B 行,我们为'error'
事件注册了一个事件监听器。当前代码片段完成后,子进程开始。这有助于防止竞争条件:当我们开始监听时,我们可以确保事件尚未被触发。
12.2.6.2 Shell 中发生了错误
如果 shell 代码包含错误,我们不会收到'error'
事件(B 行),而是会收到一个带有非零退出代码的'exit'
事件(A 行):
import {Readable} from 'node:stream'; import {spawn} from 'node:child_process'; const childProcess = spawn( 'does-not-exist', { stdio: ['inherit', 'inherit', 'pipe'], shell: true, } ); childProcess.on('exit', async (exitCode, signalCode) => { // (A) assert.equal(exitCode, 127); assert.equal(signalCode, null); const stderr = Readable.toWeb( childProcess.stderr.setEncoding('utf-8')); assert.equal( await readableStreamToString(stderr), '/bin/sh: does-not-exist: command not found\n' ); } ); childProcess.on('error', (err) => { // (B) console.error('We never get here!'); });
12.2.6.3 进程被终止
如果在 Unix 上终止进程,退出代码是null
(C 行),信号代码是一个字符串(D 行):
import {Readable} from 'node:stream'; import {spawn} from 'node:child_process'; const childProcess = spawn( 'kill $$', // (A) { stdio: ['inherit', 'inherit', 'pipe'], shell: true, } ); console.log(childProcess.pid); // (B) childProcess.on('exit', async (exitCode, signalCode) => { assert.equal(exitCode, null); // (C) assert.equal(signalCode, 'SIGTERM'); // (D) const stderr = Readable.toWeb( childProcess.stderr.setEncoding('utf-8')); assert.equal( await readableStreamToString(stderr), '' // (E) ); });
请注意,没有错误输出(E 行)。
子进程不是自己终止(A 行),我们也可以暂停它更长时间,然后通过我们在 B 行记录的进程 ID 手动终止它。
如果我们在 Windows 上杀死一个子进程会发生什么?
exitCode
是1
。signalCode
是null
。
12.2.7 等待子进程退出
有时我们只想等到命令执行完毕。这可以通过事件和 Promise 来实现。
12.2.7.1 通过事件等待
import * as fs from 'node:fs'; import {spawn} from 'node:child_process'; const childProcess = spawn( `(echo first && echo second) > tmp-file.txt`, { shell: true, stdio: 'inherit', } ); childProcess.on('exit', (exitCode, signalCode) => { // (A) assert.equal(exitCode, 0); assert.equal(signalCode, null); assert.equal( fs.readFileSync('tmp-file.txt', {encoding: 'utf-8'}), 'first\nsecond\n' ); });
我们使用标准的 Node.js 事件模式,并为 'exit'
事件注册了一个监听器(A 行)。
12.2.7.2 通过 Promises 等待
import * as fs from 'node:fs'; import {spawn} from 'node:child_process'; const childProcess = spawn( `(echo first && echo second) > tmp-file.txt`, { shell: true, stdio: 'inherit', } ); const {exitCode, signalCode} = await onExit(childProcess); // (A) assert.equal(exitCode, 0); assert.equal(signalCode, null); assert.equal( fs.readFileSync('tmp-file.txt', {encoding: 'utf-8'}), 'first\nsecond\n' );
我们在 A 行使用的辅助函数 onExit()
返回一个 Promise,如果触发了 'exit'
事件,它就会被满足:
export function onExit(eventEmitter) { return new Promise((resolve, reject) => { eventEmitter.once('exit', (exitCode, signalCode) => { if (exitCode === 0) { // (B) resolve({exitCode, signalCode}); } else { reject(new Error( `Non-zero exit: code ${exitCode}, signal ${signalCode}`)); } }); eventEmitter.once('error', (err) => { // (C) reject(err); }); }); }
如果 eventEmitter
失败,返回的 Promise 被拒绝,await
在 A 行抛出异常。onExit()
处理两种失败情况:
exitCode
不是零(B 行)。发生了这种情况:
- 如果有 shell 错误。那么
exitCode
大于零。 - 如果在 Unix 上杀死子进程。那么
exitCode
是null
,signalCode
是非空的。
- 在 Windows 上杀死子进程会产生一个 shell 错误。
- 一个
'error'
事件被触发(C 行)。如果孩子进程无法被生成,就会发生这种情况。
12.2.8 终止子进程
12.2.8.1 通过 AbortController 终止子进程
在这个例子中,我们使用 AbortController 来终止一个 shell 命令:
import {spawn} from 'node:child_process'; const abortController = new AbortController(); // (A) const childProcess = spawn( `echo Hello`, { stdio: 'inherit', shell: true, signal: abortController.signal, // (B) } ); childProcess.on('error', (err) => { assert.equal( err.toString(), 'AbortError: The operation was aborted' ); }); abortController.abort(); // (C)
我们创建一个 AbortController(A 行),将其信号传递给 spawn()
(B 行),并通过 AbortController 终止 shell 命令(C 行)。
子进程是异步启动的(在当前代码片段执行后)。这就是为什么我们可以在进程甚至开始之前中止,以及为什么在这种情况下我们看不到任何输出。
12.2.8.2 通过 .kill()
终止子进程
在下一个例子中,我们通过方法 .kill()
终止一个子进程(最后一行):
import {spawn} from 'node:child_process'; const childProcess = spawn( `echo Hello`, { stdio: 'inherit', shell: true, } ); childProcess.on('exit', (exitCode, signalCode) => { assert.equal(exitCode, null); assert.equal(signalCode, 'SIGTERM'); }); childProcess.kill(); // default argument value: 'SIGTERM'
再次,在孩子进程开始之前我们就杀死了它(异步!),并且没有输出。
12.3 同步生成进程:spawnSync()
spawnSync( command: string, args?: Array<string>, options?: Object ): Object
spawnSync()
是 spawn()
的同步版本 - 它会等待子进程退出,然后同步返回一个对象。
参数大多与spawn()
相同。options
有一些额外的属性 - 例如:
.input: string | TypedArray | DataView
如果这个属性存在,它的值将被发送到子进程的标准输入。.encoding: string
(默认:'buffer'
)
指定用于所有标准 I/O 流的编码。
该函数返回一个对象。它最有趣的属性是:
.stdout: Buffer | string
包含写入子进程标准输出流的内容。.stderr: Buffer | string
包含写入子进程标准错误流的内容。.status: number | null
包含子进程的退出代码或null
。退出代码或信号代码中的一个是非空的。.signal: string | null
包含孩子进程的信号代码或null
。退出代码或信号代码中的一个是非空的。.error?: Error
只有在生成失败时才会创建这个属性,然后包含一个错误对象。
使用异步的 spawn()
时,子进程并行运行,我们可以通过流读取标准 I/O。相反,同步的 spawnSync()
收集流的内容并将其同步返回给我们(见下一小节)。
12.3.1 shell 命令何时执行?
使用同步的 spawnSync()
时,命令的子进程是同步启动的。以下代码演示了这一点:
import {spawnSync} from 'node:child_process'; spawnSync( 'echo', ['Command starts'], { stdio: 'inherit', shell: true, } ); console.log('After spawnSync()');
这是输出:
Command starts After spawnSync()
12.3.2 从标准输出读取
以下代码演示了如何读取标准输出:
import {spawnSync} from 'node:child_process'; const result = spawnSync( `echo rock && echo paper && echo scissors`, { stdio: ['ignore', 'pipe', 'inherit'], // (A) encoding: 'utf-8', // (B) shell: true, } ); console.log(result); assert.equal( result.stdout, // (C) 'rock\npaper\nscissors\n' ); assert.equal(result.stderr, null); // (D)
在 A 行,我们使用 options.stdio
告诉 spawnSync()
我们只对标准输出感兴趣。我们忽略标准输入,并将标准错误传输到父进程。
因此,我们只能得到标准输出的结果属性(C 行),标准错误的属性是 null
(D 行)。
由于我们无法访问spawnSync()
内部使用的流来处理子进程的标准 I/O,我们通过options.encoding
(B 行)告诉它使用哪种编码。
12.3.3 向子进程的 stdin 发送数据
我们可以通过选项属性.input
(A 行)向子进程的标准输入流发送数据:
import {spawnSync} from 'node:child_process'; const result = spawnSync( `sort`, { stdio: ['pipe', 'pipe', 'inherit'], encoding: 'utf-8', input: 'Cherry\nApple\nBanana\n', // (A) } ); assert.equal( result.stdout, 'Apple\nBanana\nCherry\n' );
12.3.4 处理不成功的退出(包括错误)
有三种主要的不成功的退出情况(当退出代码不为零时):
- 子进程无法被生成。
- shell 中发生错误。
- 进程被终止。
12.3.4.1 子进程无法生成
如果生成失败,spawn()
会发出一个'error'
事件。相比之下,spawnSync()
将result.error
设置为一个错误对象:
import {spawnSync} from 'node:child_process'; const result = spawnSync( 'echo hello', { stdio: ['ignore', 'inherit', 'pipe'], encoding: 'utf-8', shell: '/bin/does-not-exist', } ); assert.equal( result.error.toString(), 'Error: spawnSync /bin/does-not-exist ENOENT' );
12.3.4.2 shell 中发生错误
如果在 shell 中发生错误,退出代码result.status
大于零,result.signal
为null
:
import {spawnSync} from 'node:child_process'; const result = spawnSync( 'does-not-exist', { stdio: ['ignore', 'inherit', 'pipe'], encoding: 'utf-8', shell: true, } ); assert.equal(result.status, 127); assert.equal(result.signal, null); assert.equal( result.stderr, '/bin/sh: does-not-exist: command not found\n' );
12.3.4.3 进程被终止
如果在 Unix 上终止子进程,result.signal
包含信号的名称,result.status
为null
:
import {spawnSync} from 'node:child_process'; const result = spawnSync( 'kill $$', { stdio: ['ignore', 'inherit', 'pipe'], encoding: 'utf-8', shell: true, } ); assert.equal(result.status, null); assert.equal(result.signal, 'SIGTERM'); assert.equal(result.stderr, ''); // (A)
请注意,没有输出发送到标准错误流(A 行)。
如果我们在 Windows 上终止一个子进程:
result.status
为 1result.signal
为null
result.stderr
为''
12.4 基于spawn()
的异步辅助函数
在本节中,我们将看到基于spawn()
的两个异步函数:
exec()
execFile()
在本章中,我们忽略了fork()
。引用Node.js 文档:
fork()
生成一个新的 Node.js 进程,并调用一个指定的模块,建立了一个 IPC 通信通道,允许在父进程和子进程之间发送消息。
12.4.1 exec()
exec( command: string, options?: Object, callback?: (error, stdout, stderr) => void ): ChildProcess
exec()
在新生成的 shell 中运行一个命令。与spawn()
的主要区别在于:
- 除了返回一个 ChildProcess,
exec()
还通过回调函数传递结果:错误对象或 stdout 和 stderr 的内容。 - 错误原因:子进程无法生成,shell 错误,子进程被终止。
- 相比之下,
spawn()
只在子进程无法被生成时发出'error'
事件。另外两种失败是通过退出代码和(在 Unix 上)信号代码来处理的。
- 没有参数
args
。 options.shell
的默认值为true
。
import {exec} from 'node:child_process'; const childProcess = exec( 'echo Hello', (error, stdout, stderr) => { if (error) { console.error('error: ' + error.toString()); return; } console.log('stdout: ' + stdout); // 'stdout: Hello\n' console.error('stderr: ' + stderr); // 'stderr: ' } );
exec()
可以通过util.promisify()
转换为基于 Promise 的函数:
- ChildProcess 成为返回的 Promise 的属性。
- Promise 的解决方式如下:
- 完成值:
{stdout, stderr}
- 拒绝值:与回调函数的参数
error
相同,但有两个额外的属性:.stdout
和.stderr
。
import * as util from 'node:util'; import * as child_process from 'node:child_process'; const execAsync = util.promisify(child_process.exec); try { const resultPromise = execAsync('echo Hello'); const {childProcess} = resultPromise; const obj = await resultPromise; console.log(obj); // { stdout: 'Hello\n', stderr: '' } } catch (err) { console.error(err); }
12.4.2 execFile()
execFile(file, args?, options?, callback?): ChildProcess
与exec()
类似,具有以下区别:
- 支持参数
args
。 options.shell
的默认值为false
。
与exec()
类似,execFile()
可以通过util.promisify()
转换为基于 Promise 的函数。
12.5 基于spawnAsync()
的同步辅助函数
12.5.1 execSync()
execSync( command: string, options?: Object ): Buffer | string
execSync()
在一个新的子进程中运行一个命令,并同步等待该进程退出。与spawnSync()
的主要区别在于:
- 只返回 stdout 的内容。
- 三种失败通过异常报告:子进程无法生成,shell 错误,子进程被终止。
- 相比之下,
spawnSync()
的结果只有一个.error
属性,如果子进程无法被生成。另外两种失败是通过退出代码和(在 Unix 上)信号代码来处理的。
- 没有参数
args
。 options.shell
的默认值为true
。
import {execSync} from 'node:child_process'; try { const stdout = execSync('echo Hello'); console.log('stdout: ' + stdout); // 'stdout: Hello\n' } catch (err) { console.error('Error: ' + err.toString()); }
12.5.2 execFileSync()
execFileSync(file, args?, options?): Buffer | string
与execSync()
类似,但有以下区别:
- 支持参数
args
。 options.shell
的默认值是false
。
12.6 有用的库
12.6.1 tinysh:生成 shell 命令的辅助程序
tinysh由 Anton Medvedev 是一个帮助生成 shell 命令的小型库-例如:
import sh from 'tinysh'; console.log(sh.ls('-l')); console.log(sh.cat('README.md'));
我们可以通过使用.call()
将对象作为this
传递来覆盖默认选项:
sh.tee.call({input: 'Hello, world!'}, 'file.txt');
我们可以使用任何属性名称,tinysh 会使用该名称执行 shell 命令。它通过代理实现了这一壮举。这是实际库的略微修改版本:
import {execFileSync} from 'node:child_process'; const sh = new Proxy({}, { get: (_, bin) => function (...args) { // (A) return execFileSync(bin, args, { encoding: 'utf-8', shell: true, ...this // (B) } ); }, });
在 A 行中,我们可以看到如果从sh
获取名为bin
的属性,则返回一个调用execFileSync()
并使用bin
作为第一个参数的函数。
在 B 行中传播this
使我们能够通过.call()
指定选项。默认值首先出现,以便可以通过this
进行覆盖。
12.6.2 node-powershell:通过 Node.js 执行 Windows PowerShell 命令
在 Windows 上使用node-powershell 库的示例如下:
import { PowerShell } from 'node-powershell'; PowerShell.$`echo "hello from PowerShell"`;
12.7 在模块'node:child_process'
的函数之间进行选择
一般约束:
- 在执行命令时,其他异步任务是否应该运行?
- 使用任何异步函数。
- 您是否只执行一个命令(没有后台异步任务)?
- 使用任何同步函数。
- 您想通过流访问子进程的 stdin 或 stdout 吗?
- 只有异步函数才能让您访问流:在这种情况下,
spawn()
更简单,因为它没有提供传递错误和标准 I/O 内容的回调。
- 您想在字符串中捕获 stdout 或 stderr 吗?
- 异步选项:
exec()
和execFile()
- 同步选项:
spawnSync()
,execSync()
,execFileSync()
异步函数-在spawn()
和exec()
或execFile()
之间进行选择:
exec()
和execFile()
有两个好处:
- 由于它们都通过第一个回调参数报告,因此更容易处理失败。
- 获取 stdout 和 stderr 作为字符串更容易-由于回调。
- 如果这些好处对您不重要,您可以选择
spawn()
。它的签名更简单,没有(可选的)回调。
同步函数-在spawnSync()
和execSync()
或execFileSync()
之间进行选择:
execSync()
和execFileSync()
有两个特点:
- 它们返回一个包含 stdout 内容的字符串。
- 由于它们都通过异常报告,因此更容易处理失败。
- 如果您需要比
execSync()
和execFileSync()
通过它们的返回值和异常提供的更多信息,则选择spawnSync()
。
在exec()
和execFile()
之间进行选择(选择execSync()
和execFileSync()
时适用相同的参数):
options.shell
在exec()
中的默认值为true
,但在execFile()
中为false
。execFile()
支持args
,exec()
不支持。
第四部分:处理包
原文:
exploringjs.com/nodejs-shell-scripting/pt_packages.html
译者:飞龙
接下来:13 安装 npm 包并运行 bin 脚本
十三、安装 npm 包并运行 bin 脚本
原文:
exploringjs.com/nodejs-shell-scripting/ch_installing-packages.html
译者:飞龙
- 13.1 全局安装 npm 注册表包
- 13.1.1 哪些包是全局安装的?
npm ls -g
(ch_installing-packages.html#which-packages-are-installed-globally-npm-ls–g) - 13.1.2 全局安装的包在哪里?
npm root -g
(ch_installing-packages.html#where-are-packages-installed-globally-npm-root–g) - 13.1.3 全局安装的 shell 脚本在哪里?
npm bin -g
(ch_installing-packages.html#where-are-shell-scripts-installed-globally-npm-bin–g) - 13.1.4 全局安装的包在哪里?npm 安装前缀
- 13.1.5 更改全局安装包的位置
- 13.2 在本地安装 npm 注册表包
- 13.2.1 在本地安装 bin 脚本
- 13.3 安装未发布的包
- 13.3.1
npm link
: 全局安装未发布的包 - 13.3.2
npm link
: 在本地安装全局链接的包 - 13.3.3
npm link
: 撤消链接 - 13.3.4 通过本地路径安装未发布的包(ch_installing-packages.html#installing-unpublished-packages-via-local-paths)
- 13.3.5 安装未发布包的其他方法
- 13.4
npx
: 在不安装的情况下运行 npm 包中的 bin 脚本
- 13.4.1 npx 缓存
package.json
属性 "bin"
允许 npm 包指定它提供的 shell 脚本(有关更多信息,请参见§14“创建跨平台 shell 脚本”)。如果我们安装了这样的包,Node.js 会确保我们可以从命令行访问这些 shell 脚本(称为bin 脚本)。在本章中,我们探讨了两种安装带有 bin 脚本的包的方法:
- 在本地安装带有 bin 脚本的包意味着将其安装为包内的依赖项。这些脚本只能在该包内访问。
- 全局安装带有 bin 脚本的包意味着将其安装在“全局位置”,以便脚本可以在任何地方访问-无论是当前用户还是系统的所有用户(取决于 npm 的设置方式)。
我们探讨了所有这些的含义以及我们如何在安装后运行 bin 脚本。
13.1 全局安装 npm 注册表包
包cowsay
具有以下 package.json
属性:
"bin": { "cowsay": "./cli.js", "cowthink": "./cli.js" },
要全局安装此包,我们使用 npm install -g
:
npm install -g cowsay
注意:在 Unix 上,我们可能需要使用 sudo
(我们很快将学会如何避免这样做):
sudo npm install -g cowsay
之后,我们可以在命令行中使用 cowsay
和 cowthink
命令。
请注意,只有 bin 脚本在全局可用。当 Node.js 在node_modules
目录中查找裸模块规范时,包会被忽略。
13.1.1 哪些包是全局安装的? npm ls -g
我们可以检查全局安装的包以及它们的位置:
% npm ls -g /usr/local/lib ├── corepack@0.12.1 ├── cowsay@1.5.0 └── npm@8.15.0
在 Windows 上,安装路径是 %AppData%\npm
,例如:
>echo %AppData%\npm C:\Users\jane\AppData\Roaming\npm
13.1.2 全局安装的包在哪里? npm root -g
macOS 上的结果:
% npm root -g /usr/local/lib/node_modules
Windows 上的结果:
>npm root -g C:\Users\jane\AppData\Roaming\npm\node_modules
13.1.3 全局安装的 shell 脚本在哪里? npm bin -g
npm bin -g
告诉我们 npm 全局安装 shell 脚本的位置。它还确保该目录在 shell PATH 中可用。
macOS 上的结果:
% npm bin -g /usr/local/bin % which cowsay /usr/local/bin/cowsay
在 Windows 命令 shell 上的结果:
>npm bin -g C:\Users\jane\AppData\Roaming\npm >where cowsay C:\Users\jane\AppData\Roaming\npm\cowsay C:\Users\jane\AppData\Roaming\npm\cowsay.cmd
没有文件名扩展名的可执行文件cowsay
是针对基于 Unix 的 Windows 环境(如 Cygwin、MinGW 和 MSYS)的。
Windows PowerShell 返回gcm cowsay
的路径:
C:\Users\jane\AppData\Roaming\npm\cowsay.ps1
13.1.4 全局安装的包在哪里?npm 安装前缀
npm 的安装前缀决定了全局安装包和 bin 脚本的安装位置。
这是 macOS 上的安装前缀:
% npm config get prefix /usr/local
因此:
- 包安装在
/usr/local/lib/node_modules
中 - Bin 脚本安装在
/usr/local/bin
中
这是 Windows 上的安装前缀:
>npm config get prefix C:\Users\jane\AppData\Roaming\npm
因此:
- 包安装在
C:\Users\jane\AppData\Roaming\npm\node_modules
中 - Bin 脚本安装在
C:\Users\jane\AppData\Roaming\npm
中
13.1.5 改变全局安装包位置
在这一部分,我们将研究两种改变全局安装包位置的方法:
- 更改 npm 安装前缀
- 使用 Node.js 版本管理器
13.1.5.1 改变 npm 安装前缀
改变全局安装包位置的一种方法是改变 npm 的安装前缀。
Unix:
mkdir ~/npm-global npm config set prefix '~/npm-global'
Windows 命令 shell:
mkdir "%UserProfile%\npm-global" npm config set prefix "%UserProfile%\npm-global"
Windows PowerShell:
mkdir "$env:UserProfile\npm-global" npm config set prefix "$env:UserProfile\npm-global"
配置数据保存在主目录中的.npmrc
文件中。
从现在开始,全局安装将被添加到我们刚刚指定的目录中。
之后,我们仍然需要将npm bin -g
目录添加到我们的 shell PATH 中,以便我们的 shell 可以找到我们全局安装的 bin 脚本。
**更改 npm 前缀的一个缺点:**如果我们告诉 npm 升级自己,它现在也会安装到新位置。
13.1.5.2 使用 Node.js 版本管理器
Node.js 版本管理器可以让我们同时安装多个 Node.js 版本并在它们之间切换。流行的版本管理器包括:
13.2 安装 npm 注册包到本地
要本地安装 npm 注册包(如cowsay
),我们需要执行以下操作:
cd my-package/ npm install cowsay
这将向package.json
添加以下数据:
"dependencies": { "cowsay": "¹.5.0", ··· }
此外,该包被下载到以下目录:
my-package/node_modules/cowsay/
在 Unix 上,npm 为 bin 脚本添加了这些符号链接:
my-package/node_modules/.bin/cowsay -> ../cowsay/cli.js my-package/node_modules/.bin/cowthink -> ../cowsay/cli.js
在 Windows 上,npm 将这些文件添加到my-package\node_modules\.bin\
中:
cowsay cowsay.cmd cowsay.ps1 cowthink cowthink.cmd cowthink.ps1
没有扩展名的文件是针对基于 Unix 的 Windows 环境(如 Cygwin、MinGW 和 MSYS)的脚本。
npm bin
告诉我们本地安装的 bin 脚本的位置 - 例如:
% npm bin /Users/john/my-package/node_modules/.bin
注意:本地安装的包始终安装在package.json
文件旁边的node_modules
目录中。如果当前目录中不存在package.json
,npm 会在祖先目录中搜索并在那里安装包。要检查 npm 在本地安装包的位置,我们可以使用npm root
命令 - 例如(Unix):
% cd $HOME % npm root /Users/john/node_modules
John 的主目录中没有package.json
,但 npm 无法在祖先目录中安装任何内容,这就是为什么npm root
显示这个目录。在当前位置本地安装包将导致创建package.json
并像往常一样进行安装。
13.2.1 运行本地安装的 bin 脚本
(本小节中的所有命令都在my-package
目录中执行。)
13.2.1.1 直接运行 bin 脚本
我们可以从 shell 中如下运行cowsay
:
./node_modules/.bin/cowsay Hello
在 Unix 上,我们可以设置一个辅助程序:
alias npm-exec='PATH=$(npm bin):$PATH'
然后以下命令有效:
npm-exec cowsay Hello
13.2.1.2 通过包脚本运行 bin 脚本
我们还可以在package.json
中添加一个包脚本:
{ ··· "scripts": { "cowsay": "cowsay" }, ··· }
现在我们可以在 shell 中执行这个命令:
npm run cowsay Hello
这是因为 npm 在 Unix 上临时将以下条目添加到$PATH
中:
/Users/john/my-package/node_modules/.bin /Users/john/node_modules/.bin /Users/node_modules/.bin /node_modules/.bin
在 Windows 上,类似的条目被添加到%Path%
或$env:Path
中:
C:\Users\jane\my-package\node_modules\.bin C:\Users\jane\node_modules\.bin C:\Users\node_modules\.bin C:\node_modules\.bin
以下命令列出了包脚本运行时存在的环境变量及其值:
npm run env
13.2.1.3 通过 npx 运行 bin 脚本
在一个包内,可以使用 npx 来访问 bin 脚本:
npx cowsay Hello npx cowthink Hello
稍后再详细介绍 npx。
13.3 安装未发布的包
有时,我们有一个包,要么我们还没有发布,要么永远不会发布,并且想要安装它。
13.3.1 npm link
:全局安装未发布的包
假设我们有一个未发布的包,其名称是 @my-scope/unpublished-package
,存储在目录 /tmp/unpublished-package/
中。我们可以按如下方式全局提供它:
cd /tmp/unpublished-package/ npm link
如果我们这样做:
- npm 将一个符号链接添加到全局的
node_modules
(由npm root -g
返回)- 例如:
/usr/local/lib/node_modules/@my-scope/unpublished-package -> ../../../../../tmp/unpublished-package
- 在 Unix 上,npm 还会从全局 bin 目录(由
npm bin -g
返回)到每个 bin 脚本添加一个符号链接。该链接不是直接的,而是通过全局node_modules
目录:
/usr/local/bin/my-command -> ../lib/node_modules/@my-scope/unpublished-package/src/my-command.js
- 在 Windows 上,它添加了通常的 3 个脚本(通过相对路径引用全局
node_modules
中的链接包):
C:\Users\jane\AppData\Roaming\npm\my-command C:\Users\jane\AppData\Roaming\npm\my-command.cmd C:\Users\jane\AppData\Roaming\npm\my-command.ps1
由于链接包的引用方式,其中的任何更改都会立即生效。当它发生变化时,无需重新链接它。
要检查全局安装是否成功,我们可以使用 npm ls -g
列出所有全局安装的包。
13.3.2 npm link
:在本地安装全局链接的包
在我们全局安装了未发布的包之后(参见前一小节),我们可以选择在我们的一个包中(可以是已发布的或未发布的)中将其安装为本地包:
cd /tmp/other-package/ npm link @my-scope/unpublished-package
这创建了以下链接:
/tmp/other-package/node_modules/@my-scope/unpublished-package -> ../../../unpublished-package
默认情况下,未发布的包不会被添加为 package.json
的依赖项。其背后的原因是 npm link
经常用于临时使用注册表包的未发布版本- 这些不应该出现在依赖项中。
13.3.3 npm link
:取消链接
取消本地链接:
cd /tmp/other-package/ npm uninstall @my-scope/unpublished-package
取消全局链接:
cd /tmp/unpublished-package/ npm uninstall -g
13.3.4 通过本地路径安装未发布的包
另一种在本地安装未发布的包的方法是使用 npm install
并通过本地路径引用它(而不是通过包名):
cd /tmp/other-package/ npm install ../unpublished-package
这有两个效果。
首先,创建以下符号链接:
/tmp/other-package/node_modules/@my-scope/unpublished-package -> ../../../unpublished-package
其次,将依赖项添加到 package.json
中:
"dependencies": { "@my-scope/unpublished-package": "file:../unpublished-package", ··· }
这种安装未发布的包的方法也适用于全局:
cd /tmp/unpublished-package/ npm install -g .
13.3.5 安装未发布的包的其他方法
- Yalc 让我们将包发布到本地的“Yalc 仓库”(类似本地注册表)。从该仓库中,我们可以将包安装为依赖项,例如,一个名为
my-package/
的包。它们被复制到目录my-package/.yalc
中,并且file:
或link:
依赖项被添加到package.json
中。 relative-deps
支持package.json
中的"relativeDependencies"
,如果存在的话,会覆盖正常的依赖关系。与npm link
和本地路径安装相比:
- 正常的依赖关系不需要更改。
- 相对依赖项被安装为来自 npm 注册表的依赖项(而不是通过符号链接)。
relative-deps
还有助于保持本地安装的相对依赖项及其原始依赖项同步。npx link
是npm link
的一个更安全的版本,它不需要全局安装,还有其他好处。
13.4 npx
:在不安装它们的情况下运行 npm 包中的 bin 脚本
npx 是一个与 npm 捆绑在一起的用于运行 bin 脚本的 shell 命令。
它最常见的用法是:
npx <package-name> arg1 arg2 ...
这个命令将名称为 package-name
的包安装到 npx 缓存中,并运行与包同名的 bin 脚本- 例如:
npx cowsay Hello
这意味着我们可以在不先安装它们的情况下运行 bin 脚本。npx 最适用于一次性调用 bin 脚本- 例如,许多框架提供用于设置新项目的 bin 脚本,这些通常通过 npx 运行。
npx 第一次使用包后,它将在其缓存中可用,并且后续调用速度更快。但是,我们无法确定包在缓存中停留的时间有多长。因此,npx 不能替代全局或本地安装 bin 脚本。
如果一个包带有与其包名称不同的 bin 脚本,我们可以像这样访问它们:
npx --package=<package-name> <bin-script> arg1 arg2 ...
例如:
npx --package=cowsay cowthink Hello
13.4.1 npx 缓存
npx 的缓存位于哪里?
在 Unix 上,我们可以通过以下命令找到:
npx --package=cowsay node -p \ "process.env.PATH.split(':').find(p => p.includes('_npx'))"
返回类似于这样的路径:
/Users/john/.npm/_npx/8f497369b2d6166e/node_modules/.bin
在 Windows 上,我们可以使用(一行分成两行):
npx --package=cowsay node -p "process.env.Path.split(';').find(p => p.includes('_npx'))"
返回类似于这样的路径(单个路径分成两行):
C:\Users\jane\AppData\Local\npm-cache\_npx\ 8f497369b2d6166e\node_modules\.bin
请注意,npx 的缓存与 npm 用于安装模块的缓存不同:
- Unix:
- npm 缓存:
$HOME/.npm/_cacache/
- npx 缓存:
$HOME/.npm/_npx/
- Windows(PowerShell):
- npm 缓存:
$env:UserProfile\AppData\Local\npm-cache\_npx\
- npx 缓存:
$env:UserProfile\AppData\Local\npm-cache\_cacache\
两个缓存的父目录可以通过以下方式确定:
npm config get cache
有关 npm 缓存的更多信息,请参阅npm 文档。
与 npx 缓存相比,npm 缓存中的数据永远不会被删除,只会被添加。我们可以在 Unix 上通过以下方式检查其大小:
du -sh $(npm config get cache)/_cacache/
在 Windows PowerShell 上:
DiskUsage /d:0 "$(npm config get cache)\_cacache"