上一节讲了 fs.createReadStream
创建一个可读流,那么怎么查看它的源码是怎么实现的?
我们可以采用打断点的方式:我们可以看到先执行了 lazyLoadStreams
如果没有 ReadStream
就会 require
内部的 internal/fs/streams
模块
通过 internal/fs/streams.js
我们就能大概的知道 ReadStream
的大致实现
另外我们还可以直接下载 node 的源代码去看 ReadStream
的实现过程
打开官网:https://nodejs.org/zh-cn/download,或者通过 https://nodejs.org/dist/ 找自己想要看的版本
点击下载源代码,然后解压用 vscode 打开,找到 node-v18.16.1\lib\internal\fs\streams.js
即可
下面自己来实现可读流:
const EventEmitter = require("events"); const fs = require("fs"); class KaimoReadStream extends EventEmitter { constructor(path, opts = {}) { super(); this.path = path; this.flags = opts.flags || "r"; this.mode = opts.mode || 0o666; this.autoClose = opts.autoClose || true; this.start = opts.start || 0; this.end = opts.end; // 读取的数量默认是 64k 如果文件大于 64k 就可以采用流的方式 this.highWaterMark = opts.highWaterMark || 64 * 1024; // 记录读取的偏移量 this.pos = this.start; // 默认创建一个可读流,是非流动模式,不会触发 data 事件,如果用户监听了 data 事件后,需要变为流动模式 // 是否是流动模式 this.flowing = false; this.on("newListener", (type) => { // 如果用户监听了 data if (type === "data") { this.flowing = true; this.read(); } }); // 打开文件 this.open(); } open() { fs.open(this.path, this.flags, this.mode, (err, fd) => { if (err) { return this.emit("error", err); } // 将 fd 保存到实例上,用于稍后的读取操作 this.fd = fd; this.emit("open", fd); }); } // 利用发布订阅来实现延迟执行 read() { // 读取必须要等待文件打开完毕,如果打开了会触发 open 事件 if (typeof this.fd !== "number") { // 如果没有 fd 就返回一个 open 的一次性事件,再去回调 read 方法 return this.once("open", () => this.read()); } console.log("KaimoReadStream---->", this.fd); // 真正开始读取 const buffer = Buffer.alloc(this.highWaterMark); // 每次理论上应该读取 highWaterMark 个,但是用户能指定读取的位置 // 应该读几个(不要读超了) let howMuchToRead = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark; fs.read(this.fd, buffer, 0, howMuchToRead, this.pos, (err, bytesRead) => { if (bytesRead) { this.pos += bytesRead; this.emit("data", buffer.slice(0, bytesRead)); if (this.flowing) { this.read(); } } else { this.emit("end"); if (this.autoClose) { fs.close(this.fd, () => { this.emit("close"); }); } } }); } pause() { this.flowing = false; } resume() { this.flowing = true; this.read(); } } module.exports = KaimoReadStream;
测试:
const KaimoReadStream = require("./47/KaimoReadStream"); const path = require("path"); let rs = new KaimoReadStream(path.resolve(__dirname, "./47/name.txt"), { flags: "r", // r 代表读取 encoding: null, // 默认 null,就是 buffer 类型的 mode: 0o666, // 模式:可读可写 autoClose: true, // fs.close start: 0, // 0 - 8 包前又包后 end: 8, highWaterMark: 3 // 每次读取的个数 }); // console.log(rs); let bufferArr = []; // 监听 open(文件流特殊的事件,普通流没有) rs.on("open", (fd) => { console.log("open---->", fd); }); // 监听 data rs.on("data", (data) => { console.log("data---->", data, data.toString()); bufferArr.push(data); // rs.pause 可以让可读流暂停触发 data 事件 rs.pause(); console.log("pause---->暂停"); // 再次触发 data 事件,可以使用 rs.resume setTimeout(() => { console.log("过 2s 再次触发 data 事件"); rs.resume(); }, 2000); }); // 监听 end rs.on("end", () => { console.log("end---->", Buffer.concat(bufferArr).toString()); }); // 监听 error rs.on("error", (err) => { console.log("err---->", err); }); // 监听 close (文件流特殊的事件,普通流没有) rs.on("close", () => { console.log("close---->"); });