以寡治众各个击破,超大文件分片上传之构建基于Vue.js3.0+Ant-desgin+Tornado6纯异步IO高效写入服务

简介: 分治算法是一种很古老但很务实的方法。本意即使将一个较大的整体打碎分成小的局部,这样每个小的局部都不足以对抗大的整体。战国时期,秦国破坏合纵的连横即是一种分而治之的手段;十九世纪,比利时殖民者占领卢旺达, 将卢旺达的种族分为胡图族与图西族,以图进行分裂控制,莫不如是。

分治算法是一种很古老但很务实的方法。本意即使将一个较大的整体打碎分成小的局部,这样每个小的局部都不足以对抗大的整体。战国时期,秦国破坏合纵的连横即是一种分而治之的手段;十九世纪,比利时殖民者占领卢旺达, 将卢旺达的种族分为胡图族与图西族,以图进行分裂控制,莫不如是。

21世纪,人们往往会在Leetcode平台上刷分治算法题,但事实上,从工业角度上来看,算法如果不和实际业务场景相结合,算法就永远是虚无缥缈的存在,它只会出现在开发者的某一次不经意的面试中,而真实的算法,并不是虚空的,它应该能帮助我们解决实际问题,是的,它应该落地成为实体。

大文件分片上传就是这样一个契合分治算法的场景,现而今,视频文件的体积越来越大,高清视频体积大概2-4g不等,但4K视频的分辨率是标准高清的四倍,需要四倍的存储空间——只需两到三分钟的未压缩4K 电影,或者电影预告片的长度,就可以达到500GB。 8K视频文件更是大得难以想象,而现在12K正在出现,如此巨大的文件,该怎样设计一套合理的数据传输方案?这里我们以前后端分离项目为例,前端使用Vue.js3.0配合ui库Ant-desgin,后端采用并发异步框架Tornado实现大文件的分片无阻塞传输与异步IO写入服务。

前端分片

首先,安装Vue3.0以上版本:

npm install -g @vue/cli

安装异步请求库axios:

npm install axios --save

随后,安装Ant-desgin:

npm i --save ant-design-vue@next -S

Ant-desgin虽然因为曾经的圣诞节“彩蛋门”事件而声名狼藉,但客观地说,它依然是业界不可多得的优秀UI框架之一。

接着在项目程序入口文件引入使用:

import { createApp } from 'vue'  
import App from './App.vue'  
import { router } from './router/index'  
  
  
import axios from 'axios'  
import qs from 'qs'  
  
import Antd from 'ant-design-vue';  
import 'ant-design-vue/dist/antd.css';  
  
  
const app = createApp(App)  
  
  
app.config.globalProperties.axios = axios;  
app.config.globalProperties.upload_dir = "https://localhost/static/";  
  
app.config.globalProperties.weburl = "http://localhost:8000";  
  
app.use(router);  
app.use(Antd);  
  
app.mount('#app')

随后,参照Ant-desgin官方文档:https://antdv.com/components/overview-cn 构建上传控件:

<a-upload  
  
    @change="fileupload"  
    :before-upload="beforeUpload"  
  >  
    <a-button>  
      <upload-outlined></upload-outlined>  
      上传文件  
    </a-button>  
  </a-upload>

注意这里需要将绑定的before-upload强制返回false,设置为手动上传:

beforeUpload:function(file){  
  
  
      return false;  
  
}

接着声明分片方法:

fileupload:function(file){  
  
  
      var size = file.file.size;//总大小  
   
  
      var shardSize = 200 * 1024; //分片大小  
   
      this.shardCount = Math.ceil(size / shardSize); //总片数  
  
      console.log(this.shardCount);  
   
   
      for (var i = 0; i < this.shardCount; ++i) {  
   
        //计算每一片的起始与结束位置  
   
        var start = i * shardSize;  
   
        var end = Math.min(size, start + shardSize);  
  
        var tinyfile = file.file.slice(start, end);  
  
        let data = new FormData();  
        data.append('file', tinyfile);  
        data.append('count',i);  
        data.append('filename',file.file.name);  
  
        const axiosInstance = this.axios.create({withCredentials: false});  
  
        axiosInstance({  
            method: 'POST',  
            url:'http://localhost:8000/upload/',  //上传地址  
            data:data  
        }).then(data =>{  
  
           this.finished += 1;  
  
           console.log(this.finished);  
  
           if(this.finished == this.shardCount){  
            this.mergeupload(file.file.name);  
           }  
  
        }).catch(function(err) {  
            //上传失败  
        });  
  
  
  
      }  
  
  
  
    }

具体分片逻辑是,大文件总体积按照单片体积的大小做除法并向上取整,获取到文件的分片个数,这里为了测试方便,将单片体积设置为200kb,可以随时做修改。

随后,分片过程中使用Math.min方法计算每一片的起始和结束位置,再通过slice方法进行切片操作,最后将分片的下标、文件名、以及分片本体异步发送到后台。

当所有的分片请求都发送完毕后,封装分片合并方法,请求后端发起合并分片操作:

mergeupload:function(filename){  
  
  
      this.myaxios(this.weburl+"/upload/","put",{"filename":filename}).then(data =>{  
  
              console.log(data);  
  
        });  
  
}

至此,前端分片逻辑就完成了。

后端异步IO写入

为了避免同步写入引起的阻塞,安装aiofiles库:

pip3 install aiofiles

aiofiles用于处理asyncio应用程序中的本地磁盘文件,配合Tornado的异步非阻塞机制,可以有效的提升文件写入效率:

import aiofiles  
  
# 分片上传  
class SliceUploadHandler(BaseHandler):  
      
    async def post(self):  
  
  
        file = self.request.files["file"][0]  
        filename = self.get_argument("filename")  
        count = self.get_argument("count")  
  
        filename = '%s_%s' % (filename,count) # 构成该分片唯一标识符  
  
        contents = file['body'] #异步读取文件  
        async with aiofiles.open('./static/uploads/%s' % filename, "wb") as f:  
            await f.write(contents)  
  
        return {"filename": file.filename,"errcode":0}

这里后端获取到分片实体、文件名、以及分片标识后,将分片文件以文件名\_分片标识的格式异步写入到系统目录中,以一张378kb大小的png图片为例,分片文件应该顺序为200kb和178kb,如图所示:

当分片文件都写入成功后,触发分片合并接口:

import aiofiles  
  
# 分片上传  
class SliceUploadHandler(BaseHandler):  
      
    async def post(self):  
  
  
        file = self.request.files["file"][0]  
        filename = self.get_argument("filename")  
        count = self.get_argument("count")  
  
        filename = '%s_%s' % (filename,count) # 构成该分片唯一标识符  
  
        contents = file['body'] #异步读取文件  
        async with aiofiles.open('./static/uploads/%s' % filename, "wb") as f:  
            await f.write(contents)  
  
        return {"filename": file.filename,"errcode":0}  
  
  
    async def put(self):  
  
        filename = self.get_argument("filename")  
        chunk = 0  
  
        async with aiofiles.open('./static/uploads/%s' % filename,'ab') as target_file:  
  
            while True:  
                try:  
                    source_file = open('./static/uploads/%s_%s' % (filename,chunk), 'rb')  
                    await target_file.write(source_file.read())  
                    source_file.close()  
                except Exception as e:  
                    print(str(e))  
                    break  
  
                chunk = chunk + 1  
        self.finish({"msg":"ok","errcode":0})

这里通过文件名进行寻址,随后遍历合并,注意句柄写入模式为增量字节码写入,否则会逐层将分片文件覆盖,同时也兼具了断点续写的功能。有些逻辑会将分片个数传入后端,让后端判断分片合并个数,其实并不需要,因为如果寻址失败,会自动抛出异常并且跳出循环,从而节约了一个参数的带宽占用。

轮询服务

在真实的超大文件传输场景中,由于网络或者其他因素,很可能导致分片任务中断,此时就需要通过降级快速响应,返回托底数据,避免用户的长时间等待,这里我们使用基于Tornado的Apscheduler库来调度分片任务:

pip install apscheduler

随后编写job.py轮询服务文件:

from datetime import datetime  
from tornado.ioloop import IOLoop, PeriodicCallback  
from tornado.web import RequestHandler, Application  
from apscheduler.schedulers.tornado import TornadoScheduler  
  
  
scheduler = None  
job_ids   = []  
  
# 初始化  
def init_scheduler():  
    global scheduler  
    scheduler = TornadoScheduler()  
    scheduler.start()  
    print('[Scheduler Init]APScheduler has been started')  
  
# 要执行的定时任务在这里  
def task1(options):  
    print('{} [APScheduler][Task]-{}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'), options))  
  
  
class MainHandler(RequestHandler):  
    def get(self):  
        self.write('<a href="/scheduler?job_id=1&action=add">add job</a><br><a href="/scheduler?job_id=1&action=remove">remove job</a>')  
  
  
class SchedulerHandler(RequestHandler):  
    def get(self):  
        global job_ids  
        job_id = self.get_query_argument('job_id', None)  
        action = self.get_query_argument('action', None)  
        if job_id:  
            # add  
            if 'add' == action:  
                if job_id not in job_ids:  
                    job_ids.append(job_id)  
                    scheduler.add_job(task1, 'interval', seconds=3, id=job_id, args=(job_id,))  
                    self.write('[TASK ADDED] - {}'.format(job_id))  
                else:  
                    self.write('[TASK EXISTS] - {}'.format(job_id))  
            # remove  
            elif 'remove' == action:  
                if job_id in job_ids:  
                    scheduler.remove_job(job_id)  
                    job_ids.remove(job_id)  
                    self.write('[TASK REMOVED] - {}'.format(job_id))  
                else:  
                    self.write('[TASK NOT FOUND] - {}'.format(job_id))  
        else:  
            self.write('[INVALID PARAMS] INVALID job_id or action')  
  
  
if __name__ == "__main__":  
    routes    = [  
        (r"/", MainHandler),  
        (r"/scheduler/?", SchedulerHandler),  
    ]  
    init_scheduler()  
    app       = Application(routes, debug=True)  
    app.listen(8888)  
    IOLoop.current().start()

每一次分片接口被调用后,就建立定时任务对分片文件进行监测,如果分片成功就删除分片文件,同时删除任务,否则就启用降级预案。

结语

分治法对超大文件进行分片切割,同时并发异步发送,可以提高传输效率,降低传输时间,和之前的一篇:聚是一团火散作满天星,前端Vue.js+elementUI结合后端FastAPI实现大文件分片上传,逻辑上有异曲同工之妙,但手法上却略有不同,确是颇有相互借镜之处,最后代码开源于Github:https://github.com/zcxey2911/Tornado6\_Vuejs3\_Edu,与众亲同飨。

相关文章
|
2月前
|
JavaScript API
深入探索fs.WriteStream:Node.js文件写入流的全面解析
深入探索fs.WriteStream:Node.js文件写入流的全面解析
|
8天前
|
开发框架 并行计算 算法
揭秘Python并发神器:IO密集型与CPU密集型任务的异步革命,你竟还傻傻分不清?
揭秘Python并发神器:IO密集型与CPU密集型任务的异步革命,你竟还傻傻分不清?
26 4
|
11天前
|
前端开发 JavaScript API
前端JS读取文件内容并展示到页面上
前端JavaScript使用FileReader API读取文件内容,支持文本类型文件。在文件读取成功后,可以通过onload事件处理函数获取文件内容,然后展示到页面上。
15 2
前端JS读取文件内容并展示到页面上
|
8天前
|
JavaScript 前端开发 数据安全/隐私保护
混淆指定js文件
【9月更文挑战第26天】JavaScript 混淆旨在保护代码知识产权、减小文件体积和提高安全性。方法包括变量名和函数名混淆、代码压缩、控制流平坦化及字符串加密。常用工具如 UglifyJS 和 JScrambler 可实现这些功能。然而,混淆可能带来兼容性和调试困难等问题,需谨慎使用并确保法律合规。
|
15天前
|
Java 大数据 API
Java 流(Stream)、文件(File)和IO的区别
Java中的流(Stream)、文件(File)和输入/输出(I/O)是处理数据的关键概念。`File`类用于基本文件操作,如创建、删除和检查文件;流则提供了数据读写的抽象机制,适用于文件、内存和网络等多种数据源;I/O涵盖更广泛的输入输出操作,包括文件I/O、网络通信等,并支持异常处理和缓冲等功能。实际开发中,这三者常结合使用,以实现高效的数据处理。例如,`File`用于管理文件路径,`Stream`用于读写数据,I/O则处理复杂的输入输出需求。
|
9天前
|
移动开发 JavaScript 前端开发
js之操作文件| 12-5
js之操作文件| 12-5
|
23天前
|
存储 JSON JavaScript
学习node.js十三,文件的上传于下载
学习node.js十三,文件的上传于下载
|
8天前
|
JavaScript 前端开发 开发者
深入浅出 Vue.js:构建响应式前端应用
Vue.js 是一个流行的前端框架,以其简洁、高效和易学著称。它采用响应式和组件化设计,简化了交互式用户界面的构建。本文详细介绍 Vue.js 的核心概念、基本用法及如何构建响应式前端应用,包括实例、模板、响应式数据和组件等关键要素,并介绍了项目结构、Vue CLI、路由管理和状态管理等内容,帮助开发者高效地开发现代化前端应用。
|
1月前
|
Linux C语言
C语言 文件IO (系统调用)
本文介绍了Linux系统调用中的文件I/O操作,包括文件描述符、`open`、`read`、`write`、`lseek`、`close`、`dup`、`dup2`等函数,以及如何获取文件属性信息(`stat`)、用户信息(`getpwuid`)和组信息(`getgrgid`)。此外还介绍了目录操作函数如`opendir`、`readdir`、`rewinddir`和`closedir`,并提供了相关示例代码。系统调用直接与内核交互,没有缓冲机制,效率相对较低,但实时性更高。
|
10天前
|
算法 Java 程序员
解锁Python高效之道:并发与异步在IO与CPU密集型任务中的精准打击策略!
在数据驱动时代,高效处理大规模数据和高并发请求至关重要。Python凭借其优雅的语法和强大的库支持,成为开发者首选。本文将介绍Python中的并发与异步编程,涵盖并发与异步的基本概念、IO密集型任务的并发策略、CPU密集型任务的并发策略以及异步IO的应用。通过具体示例,展示如何使用`concurrent.futures`、`asyncio`和`multiprocessing`等库提升程序性能,帮助开发者构建高效、可扩展的应用程序。
26 0
下一篇
无影云桌面