56 # 实现 pipe 方法进行拷贝

简介: 56 # 实现 pipe 方法进行拷贝

pipe 是异步的,可以实现读一点写一点,管道的优势:不会淹没可用内存,但是在导入的过程中无法获取到内容

const fs = require("fs");
const path = require("path");
fs.createReadStream(path.resolve(__dirname, "./56/name.txt")).pipe(fs.createWriteStream(path.resolve(__dirname, "./56/copy_name.txt")));

我们新建文件 name.txt 里面添加文本

kaimo123456789

下面在我们的可读流里添加 pipe 方法:上一节实现的可写流直接拿过来用即可

const EventEmitter = require("events");
const fs = require("fs");
class KaimoReadStream extends EventEmitter {
    constructor(path, opts = {}) {
        super();
        this.path = path;
        this.flags = opts.flags || "r";
        this.mode = opts.mode || 0o666;
        this.autoClose = opts.autoClose || true;
        this.start = opts.start || 0;
        this.end = opts.end;
        // 读取的数量默认是 64k 如果文件大于 64k 就可以采用流的方式
        this.highWaterMark = opts.highWaterMark || 64 * 1024;
        // 记录读取的偏移量
        this.pos = this.start;
        // 默认创建一个可读流,是非流动模式,不会触发 data 事件,如果用户监听了 data 事件后,需要变为流动模式
        // 是否是流动模式
        this.flowing = false;
        this.on("newListener", (type) => {
            // 如果用户监听了 data
            if (type === "data") {
                this.flowing = true;
                this.read();
            }
        });
        // 打开文件
        this.open();
    }
    open() {
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
            if (err) {
                return this.emit("error", err);
            }
            // 将 fd 保存到实例上,用于稍后的读取操作
            this.fd = fd;
            this.emit("open", fd);
        });
    }
    // 利用发布订阅来实现延迟执行
    read() {
        // 读取必须要等待文件打开完毕,如果打开了会触发 open 事件
        if (typeof this.fd !== "number") {
            // 如果没有 fd 就返回一个 open 的一次性事件,再去回调 read 方法
            return this.once("open", () => this.read());
        }
        console.log("KaimoReadStream---->", this.fd);
        // 真正开始读取
        const buffer = Buffer.alloc(this.highWaterMark);
        // 每次理论上应该读取 highWaterMark 个,但是用户能指定读取的位置
        // 应该读几个(不要读超了)
        let howMuchToRead = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
        fs.read(this.fd, buffer, 0, howMuchToRead, this.pos, (err, bytesRead) => {
            if (bytesRead) {
                this.pos += bytesRead;
                this.emit("data", buffer.slice(0, bytesRead));
                if (this.flowing) {
                    this.read();
                }
            } else {
                this.emit("end");
                if (this.autoClose) {
                    fs.close(this.fd, () => {
                        this.emit("close");
                    });
                }
            }
        });
    }
    pause() {
        this.flowing = false;
    }
    resume() {
        this.flowing = true;
        this.read();
    }
    // 管道
    pipe(dest) {
        this.on("data", (data) => {
            let flag = dest.write(data);
            console.log("pipe--flag-->", flag, data.toString());
            if (!flag) {
                this.pause();
            }
        });
        dest.on("drain", () => {
            this.resume();
        });
    }
}
module.exports = KaimoReadStream;

测试一下

const path = require("path");
// 默认读 4 写 1
const KaimoReadStream = require("./56/KaimoReadStream"); // 64k
const KaimoWriteStream = require("./56/KaimoWriteStream"); // 16k
let rs = new KaimoReadStream(path.resolve(__dirname, "./56/name.txt"), {
    highWaterMark: 4
});
// 先写1个,3个放缓存
let ws = new KaimoWriteStream(path.resolve(__dirname, "./56/copy_name2.txt"), {
    highWaterMark: 1
});
rs.pipe(ws);

目录
相关文章
|
2月前
|
算法 Unix Linux
select函数中的文件描述符(File Descriptor)范围
select函数中的文件描述符(File Descriptor)范围
17 0
select函数中的文件描述符(File Descriptor)范围
|
9月前
|
算法
File对象和相关方法02
File对象和相关方法02
44 0
|
9月前
|
Java
File对象和相关方法01
File对象和相关方法01
31 0
|
Unix Linux
C 程序来演示 fork() 和 pipe()
fork() 用于创建子进程。此子进程是原始(父)进程的副本。它是在类Unix操作系统上创建进程的主要方法。
60 0
|
物联网 Linux 开发者
Dup 文件描述符复制|学习笔记
快速学习 Dup 文件描述符复制
83 0
Dup 文件描述符复制|学习笔记
|
物联网 Linux 开发者
Dup2 文件描述符复制|学习笔记
快速学习 Dup2 文件描述符复制
46 0
读取文件内容: 1.通过open函数获取文件对象 2.执行read函数 需要指定读取的字符
读取文件内容: 1.通过open函数获取文件对象 2.执行read函数 需要指定读取的字符