文件处理一直都是前端人的心头病,如何控制好文件大小,文件太大上传不了,文件下载时间太长,tcp直接给断开了😱😱😱等
效果
为了方便大家有意义的学习,这里就先放效果图,如果不满足直接返回就行,不浪费大家的时间。
文件上传
文件上传实现,分片上传,暂停上传,恢复上传,文件合并等
文件下载
为了方便测试,我上传了1个1g的大文件拿来下载,前端用的是流的方式来保存文件的,具体的可以看这个api TransformStream
正文
本项目的地址是: https://github.com/cll123456/deal-big-file 需要的自提
上传
请带着以下问题来阅读下面的文章
1.如何计算文件的hash,怎么做计算hash是最快的
2.文件分片的方式有哪些
3.如何控制分片上传的http请求(控制并发),大文件的碎片太多,直接把网络打垮
4.如何暂停上传
5.如何恢复上传等
计算文件hash
在计算文件hash的方式,主要有以下几种: 分片全量计算hash、抽样计算hash。
在这两种方式上,分别又可以使用web-work和浏览器空闲(requestIdleCallback)来实现.
- web-work有不明白的可以看这里: https://juejin.cn/post/7091068088975622175
- requestIdleCallback 有不明白的可以看这里: https://juejin.cn/post/7069597252473815053
接下来咋们来计算文件的hash,计算文件的hash需要使用 spark-md5这个库,
全量计算文件hash
export async function calcHashSync(file: File) { // 对文件进行分片,每一块文件都是分为2MB,这里可以自己来控制 const size = 2 * 1024 * 1024; let chunks: any[] = []; let cur = 0; while (cur < file.size) { chunks.push({ file: file.slice(cur, cur + size) }); cur += size; } // 可以拿到当前计算到第几块文件的进度 let hashProgress = 0 return new Promise(resolve => { const spark = new SparkMD5.ArrayBuffer(); let count = 0; const loadNext = (index: number) => { const reader = new FileReader(); reader.readAsArrayBuffer(chunks[index].file); reader.onload = e => { // 累加器 不能依赖index, count++; // 增量计算md5 spark.append(e.target?.result as ArrayBuffer); if (count === chunks.length) { // 通知主线程,计算结束 hashProgress = 100; resolve({ hashValue: spark.end(), progress: hashProgress }); } else { // 每个区块计算结束,通知进度即可 hashProgress += 100 / chunks.length // 计算下一个 loadNext(count); } }; }; // 启动 loadNext(0); }); }
全量计算文件hash,在文件小的时候计算是很快的,但是在文件大的情况下,计算文件的hash就会非常慢,并且影响主进程哦🙄🙄🙄
抽样计算文件hash
抽样就是取文件的一部分来继续,原理如下:
/** * 抽样计算hash值 大概是1G文件花费1S的时间 * * 采用抽样hash的方式来计算hash * 我们在计算hash的时候,将超大文件以2M进行分割获得到另一个chunks数组, * 第一个元素(chunks[0])和最后一个元素(chunks[-1])我们全要了 * 其他的元素(chunks[1,2,3,4....])我们再次进行一个分割,这个时候的分割是一个超小的大小比如2kb,我们取* 每一个元素的头部,尾部,中间的2kb。 * 最终将它们组成一个新的文件,我们全量计算这个新的文件的hash值。 * @param file {File} * @returns */ export async function calcHashSample(file: File) { return new Promise(resolve => { const spark = new SparkMD5.ArrayBuffer(); const reader = new FileReader(); // 文件大小 const size = file.size; let offset = 2 * 1024 * 1024; let chunks = [file.slice(0, offset)]; // 前面2mb的数据 let cur = offset; while (cur < size) { // 最后一块全部加进来 if (cur + offset >= size) { chunks.push(file.slice(cur, cur + offset)); } else { // 中间的 前中后去两个字节 const mid = cur + offset / 2; const end = cur + offset; chunks.push(file.slice(cur, cur + 2)); chunks.push(file.slice(mid, mid + 2)); chunks.push(file.slice(end - 2, end)); } // 前取两个字节 cur += offset; } // 拼接 reader.readAsArrayBuffer(new Blob(chunks)); // 最后100K reader.onload = e => { spark.append(e.target?.result as ArrayBuffer); resolve({ hashValue: spark.end(), progress: 100 }); }; }); }
这个设计是不是发现挺灵活的,真是个人才哇
在这两个的基础上,咋们还可以分别使用web-worker和requestIdleCallback来实现,源代码在hereヾ(≧▽≦*)o
这里把我电脑配置说一下,公司给我分的电脑配置比较lower, 8g内存的老机器。计算(3.3g文件的)hash的结果如下:
结果很显然,全量无论怎么弄,都是比抽样的更慢。
文件分片的方式
这里可能大家会说,文件分片方式不就是等分吗,其实还可以根据网速上传的速度来实时调整分片的大小哦!
const handleUpload1 = async (file:File) => { if (!file) return; const fileSize = file.size let offset = 2 * 1024 * 1024 let cur = 0 let count = 0 // 每一刻的大小需要保存起来,方便后台合并 const chunksSize = [0, 2 * 1024 * 1024] const obj = await calcHashSample(file) as { hashValue: string }; fileHash.value = obj.hashValue; //todo 判断文件是否存在存在则不需要上传,也就是秒传 while (cur < fileSize) { const chunk = file.slice(cur, cur + offset) cur += offset const chunkName = fileHash.value + "-" + count; const form = new FormData(); form.append("chunk", chunk); form.append("hash", chunkName); form.append("filename", file.name); form.append("fileHash", fileHash.value); form.append("size", chunk.size.toString()); let start = new Date().getTime() // todo 上传单个碎片 const now = new Date().getTime() const time = ((now - start) / 1000).toFixed(4) let rate = Number(time) / 10 // 速率有最大和最小 可以考虑更平滑的过滤 比如1/tan if (rate < 0.5) rate = 0.5 if (rate > 2) rate = 2 offset = parseInt((offset / rate).toString()) chunksSize.push(offset) count++ } //todo 可以发送合并操作了 }
🥉🥉🥉ATTENTION!!! 如果是这样上传的文件碎片,如果中途断开是无法续传的(每一刻的网速都是不一样的),除非每一次上传都把 chunksSize(分片的数组)保存起来哦
控制http请求(控制并发)
控制http的请求咋们可以换一种想法,是不是就是控制异步任务呢?
/** * 异步控制池 - 异步控制器 * @param concurrency 最大并发次数 * @param iterable 异步控制的函数的参数 * @param iteratorFn 异步控制的函数 */ export async function* asyncPool<IN, OUT>(concurrency: number, iterable: ReadonlyArray<IN>, iteratorFn: (item: IN, iterable?: ReadonlyArray<IN>) => Promise<OUT>): AsyncIterableIterator<OUT> { // 传教set来保存promise const executing = new Set<Promise<IN>>(); // 消费函数 async function consume() { const [promise, value] = await Promise.race(executing) as unknown as [Promise<IN>, OUT]; executing.delete(promise); return value; } // 遍历参数变量 for (const item of iterable) { const promise = (async () => await iteratorFn(item, iterable))().then( value => [promise, value] ) as Promise<IN>; executing.add(promise); // 超出最大限制,需要等待 if (executing.size >= concurrency) { yield await consume(); } } // 存在的时候继续消费promise while (executing.size) { yield await consume(); } }
暂停请求
暂停请求,其实也很简单,在原生的XMLHttpRequest 里面有一个方法是 xhr?.abort(),在发送请求的同时,在发送请求的时候,咋们用一个数组给他装起来,然后就可以自己直接调用abort方法了。
在封装request的时候,咋们要求传入一个requestList就好:
export function request({ url, method = "post", data, onProgress = e => e, headers = {}, requestList }: IRequest) { return new Promise((resolve, reject) => { const xhr = new XMLHttpRequest(); xhr.upload.onprogress = onProgress // 发送请求 xhr.open(method, baseUrl + url); // 放入其他的参数 Object.keys(headers).forEach(key => xhr.setRequestHeader(key, headers[key]) ); xhr.send(data); xhr.onreadystatechange = e => { // 请求是成功的 if (xhr.readyState === 4) { if (xhr.status === 200) { if (requestList) { // 成功后删除列表 const i = requestList.findIndex(req => req === xhr) requestList.splice(i, 1) } // 获取服务响应的结构 const resp = JSON.parse(xhr.response); // 这个code是后台规定的,200是正确的响应,500是异常 if (resp.code === 200) { // 成功操作 resolve({ data: (e.target as any)?.response }); } else { reject('报错了 大哥') } } else if (xhr.status === 500) { reject('报错了 大哥') } } }; // 存入请求 requestList?.push(xhr) }); }
有了请求数组后,那么咋们想暂时直接遍历请求数组,调用 abort方法
恢复上传
恢复上传是判断有哪些碎片上已经存在的,存在的就不需要上传了,不存在的继续上传。所以咋们要一个接口,verify 传入文件的hash,文件名称,判断文件是否存在或者说是上传了多少。
/** * 验证文件是否存在 * @param req * @param res */ async handleVerify(req: http.IncomingMessage, res: http.ServerResponse) { // 解析post请求数据 const data = await resolvePost(req) as { filename: string, hash: string } const { filename, hash } = data // 获取文件后缀名称 const ext = extractExt(filename) const filePath = path.resolve(this.UPLOAD_DIR, `${hash}${ext}`) // 文件是否存在 let uploaded = false let uploadedList: string[] = [] if (fse.existsSync(filePath)) { uploaded = true } else { // 文件没有完全上传完毕,但是可能存在部分切片上传完毕了 uploadedList = await getUploadedList(path.resolve(this.UPLOAD_DIR, hash)) } res.end( JSON.stringify({ code: 200, uploaded, uploadedList // 过滤诡异的隐藏文件 }) ) }
注意,这里还需要在每一次验证的时候需要去删除片段的最后几块文件,防止最后几块文件是不完全上传的残杂.
合并文件
合并文件很好理解,就是把所有的碎片进行合并,但是有一个地方需要注意的是,咋们不能把所有的文件都读到内存中进行合并,而是使用流的方式来进行合并,边读边写入文件。写入文件的时候需要保证顺序,不然文件可能就会损坏了。
这一部分代码会比较多,感兴趣的同学可以看源码
文件下载
对于文件下载的话,后端其实很简单,就是返回一个流就行,如下:
/** * 文件下载 * @param req * @param res */ async handleDownload(req: http.IncomingMessage, res: http.ServerResponse) { // 解析get请求参数 const resp: UrlWithParsedQuery = await resolveGet(req) // 获取文件名称 const filePath = path.resolve(this.UPLOAD_DIR, resp.query.filename as string) // 判断文件是否存在 if (fse.existsSync(filePath)) { // 创建流来读取文件并下载 const stream = fse.createReadStream(filePath) // 写入文件 stream.pipe(res) } }
对于前端的话,咋们需要使用一个库,就是 streamsaver
,这个库调用了 TransformStream
api来实现浏览器中把文件用流的方式保存在本地的。有了这个后,那就非常简单的使用啦😄😄😃
const downloadFile = async () => { // StreamSaver // 下载的路径 const url = 'http://localhost:4001/download?filename=b0d9a1481fc2b815eb7dbf78f2146855.zip' // 创建一个文件写入流 const fileStream = streamSaver.createWriteStream('b0d9a1481fc2b815eb7dbf78f2146855.zip') // 发送请求下载 fetch(url).then(res => { const readableStream = res.body // more optimized if (window.WritableStream && readableStream?.pipeTo) { return readableStream.pipeTo(fileStream) .then(() => console.log('done writing')) } const writer = fileStream.getWriter() const reader = res.body?.getReader() const pump: any = () => reader?.read() .then(res => res.done ? writer.close() : writer.write(res.value).then(pump)) pump() }) }