标题:模拟微信第一篇,nodejs搭建一套高性能分布式的在线文件服务。
引言:
1、前言
对象入手了前端开发,为了让她对程序员工作有更深刻理解,准备展示一套前后端兼具的模拟微信开发。微信开发少不了在线文件服务(图片,头像,音乐,视频……),本想省事儿,直接采用FastDFS,或者是腾讯云的对象存储服务。奈何要考虑到从零开始,一步一个脚印的渲染,遂自己动手干吧。
其实用Java,或者golang来实现的话,操作起来更容易。但是呢,还是担心她可能看不懂原生后端语言,毕竟前端开发能看懂的语言就只有JavaScript了,故采用了Nodejs。为了操作的高效性与直观性,不会引入任何web依赖框架(例如:express,egg……),利用NodeJs自带的http模块纯手动实现。
埋个伏笔,建议点个关注,后面还有模拟微信通讯开发的奉献,包括服务端的数据接口,全栈javaScript来实现
2、高性能
成熟的在线文件服务,肯定需要高性能支撑的,每个客户端每时每刻都有文件资源交互的场景。文件传输的数据包要远大于API数据,所以对服务的性能要求较高,不能出现因为下载大文件而导致后续请求阻塞卡顿的场景。
2.1、Nodejs主线程
Node的创始人认为,Web服务器的高并发性能关键点在于事件驱动和异步I/O,并不是线程数的多少。Google发明的Chrome浏览器,使用V8引擎来解析javaScript程序,恰好满足了他的这两点需求,于是他把V8引擎移植到了服务器端,构建了基于服务端运行的JavaScript环境,成就了Nodejs,目的是花最小的硬件成本,追求更高的并发性能。
在大多数web服务模型中,使用的都是多线程来解决并发的问题。像用得最为广泛的Java SpringMVC,每个http请求对应着单独的线程。而每一个客户端连接创建的一个线程,需要耗费1M的内存(jdk默认值),也就是说,理论分析一个8G的服务器可以同时支持连接用户数小于8000,(除了线程创建,还有线程内其他资源开销)
但是Nodejs使用一个主线程处理所有请求,利用异步I/O(也称非阻塞I/O)和事件驱动。省去了创建线程的资源开销,理论分析一个8G的服务器可以同时支持连接用户数为3万~4万。
2.1.1、多线程同步I/O
并发请求多线程的特点:
虽然充分利用着硬件多核CPU的优势,但是对线程生命周期的管理和上下文的切换,从微观角度看,也是耗费效率的一环节。
2.1.2、单线程异步I/O
在传统的线程处理机制中,因为同步串行的原因,I/O会阻塞代码的执行;
Nodejs采用的非阻塞I/O,把异步操作丢给了底层的I/O线程池,使得主线程永远在执行计算操作,CPU的核心利用率永远是满状态,主线程通过事件轮询机制与IO线程池交互得到异步数据,当某个I/O执行完成后,会以事件通知的形式执行回调函数
故此,多线程同步I/O的web模型,并不一定比单线程异步I/O的web模型性能好,而且前者对于资源环境的要求很高。
但是Node只是解决了I/O的交互瓶颈,并没有提高I/O速度,只是资源调度的效率提升,要想解决I/O速度,还是应该升级服务器硬件环境,例如把硬盘换成SSD。
2.1.3、局限性
随着硬件资源的更新发展,全面升级为多核CPU,Node单线程模式下的上限就是单个CPU被打满了,无法享受到多核CPU下并发的处理,Nodejs通过提供cluster
、child_process
API 以牺牲内存,来创建子进程的方式来赋予Node应用程序进一步提升高并发的能力。
2.2、Node Cluster
为了充分发挥多核CPU的优势,Nodejs提供了cluster
模块。允许设立一个master进程,和若干个worker进程,由master进程负责监控和协调worker进程的相关状态。各个进程之间,变量和资源引用相对独立。
理想状态下:master数(1) + worker数 = cpu数。充分利用每一核
const cluster = require('cluster');
const cpus = require('os').cpus().length;
if (cluster.isMaster) {
for (let i = 1; i < cpus; i++) {
cluster.fork();
}
cluster.on('online', function (newWorker) {
console.log('new worker online: ' + newWorker.id)
});
} else {
console.log('worker内部,执行逻辑')
}
如上所示:获取cpu的核数,然后master循环依次fork出worker进程,fork出来的工作进程继续运行当前代码,但是cluster.isMaster
为false
,所以会执行到else
里面。
因为进程之间的资源不共享,也可以根据业务逻辑,再fork工作进程的时候,为其设置环境变量。
2.2.1、进程通信
worker之间,采用进程间通信交换信息。通信方式有点特殊:
- worker进程调用
send
函数,主动发送数据包; - master进程会接收到该数据包,然后遍历当前worker数量,广播调用
send
函数转发原数据包; - 每个worker都会接收到该数据包,然后执行相关逻辑;
- 也可以,在master中,指定具体的worker来发送,由被指定的worker自己接收(当然,这种场景少,基本都是广播);
- 相当于master是一个调度的中转站,worker之间无法直接通信;
如下所示:
- 1号worker主动发送了一条
refresh
的数据包; - master接收到该数据包后,遍历当前的worker集合,选出3号worker,原样转发给它;
- 3号worker接收到数据包后,解析,并且执行对应逻辑;
- 相当于3号worker是自己发给自己的,但必须经过master调度;
const cluster = require('cluster');
const cpus = require('os').cpus().length;
if (cluster.isMaster) {
for (let i = 1; i < cpus; i++) {
let env = Object.assign({}, process.env, {workerId: i})
let worker = cluster.fork(env);
worker.w_id = i
}
/**
* 2、master负责接收通信消息,
*/
cluster.on('message', (workers, msg) => {
/**
* 3、将原消息定向推给3号worker
*/
for (const id in cluster.workers) {
if (cluster.workers[id].w_id == 3) {
cluster.workers[id].send(msg, err => {
if (err) {
console.log('master send message error')
}
})
}
}
})
}
/**
* 1、由1号worker进程主动通信
*/
if (process.env.workerId == 1) {
cluster.worker.send({
cmd: 'refresh',
data: {host: '127.0.0.1'}
}, err => {
if (err) {
console.log(`send refresh error, workerId: ${process.env.workerId}`)
}
})
}
/**
* 4、3号worker接收通信消息
*/
if (process.env.workerId == 3) {
cluster.worker.on('message', msg => {
if (msg.cmd == 'refresh') {
console.log(`receive refresh:${JSON.stringify(msg.data)} `)
}
})
}
进程之间通信的方式,可以有效解决单节点模式下资源不共享的问题,例如:1、单机使用内存的限流,需要共享进程之间的请求计数;
2、单机设置IP黑名单的添加与移除,也要实时同步……
2.2.2、负载分发
cluster模块内置一个负载均衡器,默认采用Round-robin算法,由master负责分发,协调各个worker之间的负载。http服务运行时,master负责监听IP/端口,然后将接收到的请求根据负载算法分配给对应worker进程处理,这段话在官方文档写的很清楚了:
如下所示:
- 根据当前机器的CPU核数,fork出CPU核数 - 1的worker进程;
- worker进程负责处理http请求;
- 这里写法虽然是在worker中,开启了http服务,监听端口,但其实底层实现只有master负责监听IP/端口,然后请求交给worker处理;
- 不然的话,这么多进程监听同一个端口,肯定会报错端口已被占用的;
const cluster = require('cluster');
const http = require('http');
const cpus = require('os').cpus().length;
if (cluster.isMaster) {
for (let i = 1; i < cpus; i++) {
cluster.fork()
}
} else {
http.createServer().on('request', (request, response) => {
console.log("接收请求");
console.log(JSON.stringify(request.headers));
let body = [];
request.on('data', chunk => {
body.push(chunk);
});
request.on('end', () => {
console.dir(body.toString());
});
response.writeHead(200);
response.end('OK\n');
}).listen(8000);
}
3、在线文件服务
一款成熟的在线文件服务,需要提供文件上传,预览,下载的三大功能,从http协议的层面来分析,不管是哪一项功能,都是文件数据报文的传输,没有什么特殊的操作,区别就在于这三者的请求&响应,头信息各不一致。
3.3、文件上传
从本质上来分析,文件上传的主体功能是,客户端获取到文件后,将文件内容以数据包的形式发送给了后端。后端解析数据包,通过I/O流的方式,将文件以原样的名称和格式写入服务器本地硬盘。如何从数据包中提取文件名和文件格式,才是核心所在。
3.3.1、报文抓包
- 搭建一个简单的http服务,无需提供文件上传的处理功能,只要保证端口能通;
- postman发起post请求,body类型选择file,造两个file,一个上传简单的文本文件(
checkTls.sh
),另一个为空; - wireshark抓包如下:
POST /upload HTTP/1.1
User-Agent: PostmanRuntime/7.28.4
Accept: */*
Postman-Token: a786fccc-5d6e-4042-9d58-82c41dbbf510
Host: 127.0.0.1:80
Accept-Encoding: gzip, deflate, br
Connection: keep-alive
Content-Type: multipart/form-data; boundary=--------------------------242891718799640360995416
Content-Length: 1174
----------------------------242891718799640360995416
Content-Disposition: form-data; name=""; filename="checkTls.sh"
Content-Type: application/x-sh
#!/bin/bash
# ......https...............
if [ $# -ne 1 ]; then
echo "................................. ...... /check_ssl.sh www.baidu.com"
else
#...............host
host=$1
#..................
#end_date=`echo |openssl s_client -servername $host -connet $host:443 2nssl x509 -noout -dates|awk -F '=' '/notAfter/{print $2}'`
#....................................
end_data=`date +%s -d "$(echo |openssl s_client -servername $host -connect $host:443 2>/dev/null | openssl x509 -noout -dates|awk -F '=' '/notAfter/{print $2}')"`
#...............
new_date=$(date +%s)
#......SSL....................................
#......SSL..........................................
days=$(expr $(expr $end_data - $new_date) / 86400)
echo -e "\033[31m ......$host ...... ......${days}......... ............... \033[0m"
fi
----------------------------242891718799640360995416
Content-Disposition: form-data; name=""; filename=""
----------------------------242891718799640360995416--
HTTP/1.1 200 OK
Date: Thu, 20 Oct 2022 15:19:11 GMT
Connection: keep-alive
Keep-Alive: timeout=5
Content-Length: 20
{"name":"ikejcwang"}
3.3.2、请求头的设定
Content-Type: multipart/form-data; boundary=--------------------------242891718799640360995416
1、抓包可以看到,即使postman中没有设置Content-Type
,文件上传的request.headers['Content-Type']
被设置成了multipart/form-data
,这一点可以粗暴的记忆,文件上传操作的这个头信息不会变;
2、boundary代表分隔符的内容,设计理念是,post请求的报文体应该是一个完整的form表单,而不会单单只是一个文件,所以为了方便的从请求数据包中区分处理不同的表单项,特地出现这一分割线,本质就是格式化解析报文,重新构造表单内容。
3.3.3、请求体的设定
----------------------------242891718799640360995416
Content-Disposition: form-data; name=""; filename="checkTls.sh"
Content-Type: application/x-sh
#!/bin/bash
# ......https...............
if [ $# -ne 1 ]; then
echo "................................. ...... /check_ssl.sh www.baidu.com"
else
#...............host
host=$1
#..................
#end_date=`echo |openssl s_client -servername $host -connet $host:443 2nssl x509 -noout -dates|awk -F '=' '/notAfter/{print $2}'`
#....................................
end_data=`date +%s -d "$(echo |openssl s_client -servername $host -connect $host:443 2>/dev/null | openssl x509 -noout -dates|awk -F '=' '/notAfter/{print $2}')"`
#...............
new_date=$(date +%s)
#......SSL....................................
#......SSL..........................................
days=$(expr $(expr $end_data - $new_date) / 86400)
echo -e "\033[31m ......$host ...... ......${days}......... ............... \033[0m"
fi
这一段,是分割符分割出来完整的第一个文件内容,包含内容描述,文件名,内容类型,内容详情,所以需要格式化处理它~
1、Content-Disposition
为内容描述字段,name
为请求提交数据表单项的字段名,postman这里没填,所以为空,filename
为上传文件的文件名;
2、Content-Type
含义等同于请求头中的 Content-Type
,但是它会具体化文件的格式类型;
3、下面的一个空格之后,才是上传文件的内容详情(可以拿它来做操作),这里我上传的是文本,所以能够被解析出来,要是图片或者其他类型文件,解析的内容就无法辨识了;
4、第二个分割符表示,postman那里创建了两个字段,只不过有一个为空,没有填值,这里也给标记出来了。
3.4、文件预览
3.4.1、响应头的设定
1、判断响应头Content-Type
中,设定的类型,浏览器是否可以解析预览,如果可以的话,图片,媒体,文本,PDF……等可以直接在浏览器预览,无需先下载然后打开
以下是预览一个JS文件,设定的Content-Type
为application/javascript
,抓包响应数据如下所示:
HTTP/1.1 200 OK
Content-type: application/javascript;charset=utf-8
Date: Sat, 22 Oct 2022 06:48:29 GMT
Connection: keep-alive
Keep-Alive: timeout=5
Content-Length: 570
/**
* ............
*/
'use strict';
const os = require('os');
const fs = require('fs');
const settingFile = './etc/setting.json';
run();
function run() {
process.env.settingsFile = settingFile;
process.env.settings = fs.readFileSync(settingFile).toString();
process.workerCount = JSON.parse(process.env.settings).workerCount || Math.max(Math.min(process.env.CPUS_LIMIT || 16, os.cpus().length) - 1, 1); // .....................
process.ikeVersion = JSON.parse(process.env.settings).version || new Date().toISOString();
require('./lib/enging');
}
3.5、文件下载
3.5.1、响应头的设定
1、同样也是判断响应头Content-Type
是否是直接下载的类型:application/force-download
,是的话,直接将指定链接的文件下载到本地;
2、下载文件的名称,如果响应头有设置Content-Disposition
的话,则以其中的filename
为主,如果没有该头信息的话,浏览器会根据请求path来最终确定需要指定的文件名称;
以下是下载一个JS文件,设定的Content-Type
为application/force-download
,设定的Content-Disposition
为attachment; filename=start.js
,抓包响应数据如下所示:
HTTP/1.1 200 OK
Content-Type: application/force-download
Content-Disposition: attachment; filename=start.js
Date: Sat, 22 Oct 2022 06:53:36 GMT
Connection: keep-alive
Keep-Alive: timeout=5
Content-Length: 570
/**
* ............
*/
'use strict';
const os = require('os');
const fs = require('fs');
const settingFile = './etc/setting.json';
run();
function run() {
process.env.settingsFile = settingFile;
process.env.settings = fs.readFileSync(settingFile).toString();
process.workerCount = JSON.parse(process.env.settings).workerCount || Math.max(Math.min(process.env.CPUS_LIMIT || 16, os.cpus().length) - 1, 1); // .....................
process.ikeVersion = JSON.parse(process.env.settings).version || new Date().toISOString();
require('./lib/enging');
}
4、实现方案
4.1、业务未动,日志先行
虽然是简单的模拟,但也需要参照一款线上服务组件来开发,不管是针对后面的排错,拓展,开源……都是打下了标准的基础,这一点,在任何场景下都屡试不爽。
为了同样兼容Node
,cluster
多进程高性能的模式,采集日志也是如此,worker进程负责采集日志到本地缓冲区,然后以进程通讯的方式,全部传递给master落盘存储;
大致逻辑如下:
对外提供
WriteLog
函数,收集日志的类型,日志字段信息,由各个进程调用,采集到的日志信息暂时推送到当前进程的日志队列中:1、定时轮询这个队列,清空它,通过进程通信的方式将其发送给master处理;
2、为了避免高并发下,日志采集汹涌,通信发送的队列太长,每次往队列推送结束后,判断当前队列的长度是否大于配置项
日志队列最大长度
,如果大于的话,清空它,通过进程通信的方式将其发送给master处理;master进程接收到worker进程发送的日志数据包后落盘处理操作:
1、将日志解析,先推送到日志缓冲区,并没有进行实时落盘,减少I/O密集操作;
2、定时轮询缓冲区,15秒一次,清空它,执行落盘计划;
3、为了避免高并发下日志采集汹涌,缓冲区太大,I/O压力上升;每次推送到日志缓冲区后,判断当前的缓冲区长度是否大于配置项
日志最大缓冲长度
,如果大于的话,清空它,并且执行落盘操作;4、落盘操作,是否小时分割,是否需要创建新文件,内容追加……
通过对外函数,提供主动配置参数(同时设置默认配置):
日志最大缓冲长度;(日志落盘存储不是实时,为了避开I/O太密集的操作,通过一个缓冲区来接收);
日志队列最大长度(worker采集的日志并不是实时传递给master,而是有一个队列来存储,定时传递);
日志存储路径;
是否启用小时分割;
日志保存天数(定时清除历史日志);
禁止日志采集的类型(数组);
- 定时任务清除历史日志,防止磁盘打满,1小时轮询一次,默认保存时间30天。
这段代码,可以独立于项目存在,可以在任意场景都直接引入它;
/**
* 日志采集
*/
'use strict'
const fs = require('fs');
const cluster = require('cluster');
const path = require('path');
const logCache = {}; // 异步写入的日志缓存,主线程控制
const lineCaches = {}; // 线程之间的日志缓存,会put到logCache
const logPrefixFormat = 'yyyyMMdd';
const cleanTime = 3600000; // 清理日志的周期,毫秒粒度,1小时
const flushTime = 15000; // 刷新日志的周期,毫秒粒度,15秒
let maxLogBufferSize = 8 * 1024 * 1024; // 日志最大缓冲长度
let maxLineCacheSize = 16; // 行缓存的最大长度
let useHourSuffix = false; // 是否启用小时做后缀分割
let keepDays = 30; // 日志保存的天数
let logPath = path.resolve(__dirname); // 日志路径
let disableCategory = {}; // 禁止采集的日志类型
let maxColumnSize = 4000; // 行的最大长度
let META_CHARS = {
'\b': '\\b',
'\n': '\\n',
'\f': '\\f',
'\r': '\\r',
'"': '""'
};
let escapable = /[\b\n\f\r"]/g;
function escapeChar(char) {
return META_CHARS[char];
}
/**
* 添加日志到缓存中
* @param category
* @param line
*/
function putLine(category, line) {
let cat = logCache[category];
if (!cat) {
cat = logCache[category] = [];
cat.charCount = 0;
}
cat.push(line);
cat.charCount += line.length;
if (cat.charCount >= maxLogBufferSize) {
console.log('putLine exceed max log buffer size', maxLogBufferSize, 'flushing....');
exports.FlushLog(category);
}
}
/**
* 发送日志到缓存中,主线程直接put到缓存中,worker线程需要通过通信put
* @param category
* @param line
*/
function sendLine(category, line) {
if (cluster.isMaster) {
putLine(category, line);
} else {
cluster.worker.send({
cmd: 'WriteLog',
category: category,
line: line
}, err => {
if (err) {
console.error('Write Log Error: ' + err.toString())
}
})
}
}
/**
* 递归创建目录
* @param dir
*/
function mkdirp(dir) {
let dirNames = dir.split(path.sep);
let base = dirNames[0];
if (dir.indexOf('/') == 0) {
base = base + '/';
}
for (let i = 1; i < dirNames.length; i++) {
base = path.join(base, dirNames[i]);
!fs.existsSync(base) && fs.mkdirSync(base);
}
}
function flushLog(targetCategory) {
if (!cluster.isMaster) {
return;
}
if (!logPath) {
logPath = path.resolve('log');
}
mkdirp(logPath);
/**
* 刷新缓存,写日志的步骤
* @param category
*/
let checkCategory = category => {
let toWrite = logCache[category];
if (toWrite) {
delete logCache[category];
let now = new Date();
let file = logPath + '/' + now.Format(logPrefixFormat) + '.' + category + (useHourSuffix ? '_' + now.getHours() : '') + '.csv';
let s = fs.createWriteStream(file, {flags: 'a'});
s.on('error', err => {
console.error('Flush Log Error: ' + err.message);
});
toWrite.forEach(block => {
s.write(block);
s.write('\n');
});
toWrite.length = 0;
s.end();
}
}
if (targetCategory) {
checkCategory(targetCategory);
} else {
for (let category in logCache) {
checkCategory(category);
}
}
}
/**
* 周期执行函数,发送日志
*/
setInterval(async () => {
let needDelete = [];
let now = Date.now();
for (let category in lineCaches) {
let cat = lineCaches[category];
if (cat.firstLineTime < (now - 1000)) {
sendLine(category, cat.join('\n'));
needDelete.push(category);
}
}
for (let category of needDelete) {
delete lineCaches[category];
}
}, 1000);
/**
* 主线程的工作:
* 1、定期删除日志文件;
* 2、定时刷新日志到文件;
* 3、接收工作线程发来的日志消息并put到缓存
* 4、程序退出时,刷新日志到文件
*/
if (cluster.isMaster) {
setInterval(() => {
let oldEst = new Date(Date.now() - (keepDays * 1000 * 86400)).Format(logPrefixFormat);
if (logPath && fs.existsSync(logPath)) {
fs.readdir(logPath, (err, files) => {
if (!err && files) {
files.forEach(f => {
let m = f.match(/^([0-9]{8})/);
if (m && m[1] <= oldEst) {
fs.unlink(path.join(logPath, f), () => {
});
}
});
}
});
}
}, cleanTime);
setInterval(flushLog, flushTime);
cluster.on('online', worker => {
worker.on('message', msg => {
if (msg.cmd === 'WriteLog') {
putLine(msg.category, msg.line);
}
});
});
process.on('beforeExit', () => {
exports.FlushLog();
})
}
/**
* 为日期对象添加格式化函数
* @param fmt
* @constructor
*/
Date.prototype.Format = function (fmt) {
let options = {
"y+": this.getFullYear(), //年份
"M+": this.getMonth() + 1, //月份
"d+": this.getDate(), //日
"h+": this.getHours(), //小时
"m+": this.getMinutes(), //分
"s+": this.getSeconds(), //秒
"S+": this.getMilliseconds() //毫秒
}
let regexMap = {
"y+": /(y+)/,
"M+": /(M+)/,
"d+": /(d+)/,
"h+": /(h+)/,
"m+": /(m+)/,
"s+": /(s+)/,
"S+": /(S+)/
}
for (let k in options) {
if (regexMap[k].test(fmt)) {
fmt = fmt.replace(RegExp.$1, (RegExp.$1.length == 1) ? (options[k]) : (('0000' + options[k]).substr(-RegExp.$1.length)));
}
}
return fmt;
}
/**
* 设置日志最大缓冲长度
* @param size
* @constructor
*/
exports.SetMaxLogBufferSize = function (size) {
maxLogBufferSize = size;
}
/**
* 设置最大行缓存长度
* @param size
* @constructor
*/
exports.SetMaxLineCacheSize = function (size) {
maxLineCacheSize = size;
}
/**
* 设置日志路径
* @param _logPath
* @constructor
*/
exports.SetLogPath = function (_logPath) {
logPath = path.resolve(_logPath);
}
/**
* 设置是否启用小时分割
* @param _useHourSuffix
* @constructor
*/
exports.SetHourSuffix = function (_useHourSuffix) {
useHourSuffix = _useHourSuffix;
}
/**
* 设置日志保存的天数
* @param days
* @constructor
*/
exports.SetKeepDays = function (days) {
keepDays = days;
}
/**
* 获取日志存储路径
* @returns {*}
* @constructor
*/
exports.GetLogPath = function () {
return logPath;
}
/**
* 获取当前日期:yyyyMMdd
* @returns {*}
* @constructor
*/
exports.GetDayYMD = function () {
return new Date().Format(logPrefixFormat)
}
/**
* 设置禁止采集的日志类型
* @param categories
* @constructor
*/
exports.SetDisableCategory = function (categories = ['ALL']) {
disableCategory = {};
categories.forEach(c => {
if (c) {
disableCategory[c] = 1;
}
});
}
/**
* 设置行的最大长度
* @param maxSize
* @constructor
*/
exports.SetMaxColumnSize = function (maxSize) {
if (maxSize > 0) {
maxColumnSize = maxSize;
} else {
maxColumnSize = 4000;
}
}
/**
* 写日志
* @param category
* @param msg
* @constructor
*/
exports.WriteLog = function (category, msgList) {
if (!msgList || !category || msgList.length <= 0) {
return;
}
if (disableCategory['ALL'] || disableCategory[category]) {
return;
}
let outMsg = [];
outMsg.push('"');
outMsg.push(new Date().Format('yyyy-MM-dd hh:mm:ss'));
outMsg.push('",');
msgList.forEach(msg => {
outMsg.push('"');
if (msg) {
msg = String(msg);
if (msg.length > maxColumnSize) {
msg = msg.slice(0, maxColumnSize);
}
outMsg.push(msg.replace(escapable, escapeChar));
}
outMsg.push('"');
outMsg.push(',');
});
outMsg.length--; // 移除最后一个逗号
let line = outMsg.join('');
let cat = lineCaches[category];
if (!cat) {
cat = lineCaches[category] = [];
cat.firstLineTime = Date.now();
}
cat.push(line);
if (cat.length > maxLineCacheSize) {
sendLine(category, cat.join('\n'));
delete lineCaches[category];
}
}
/**
* 刷新日志缓存
* @param targetCategory
* @constructor
*/
exports.FlushLog = flushLog
4.2、配置项的设定
通过配置文件,可以决定部署架构方式,监听的IP/Port,数据存储,日志落盘……相关路径的设置,并且允许默认值填充;
nativeIp
作为分布式部署访问的前提,为当前机器对外提供访问的IP,会以哈希值填充到在线文件的路径中;- 允许对外提供当前启动目录下的
etc/settings.json
,通过它来引入配置项; - 配置允许授权文件上传客户端的AppId,appToken,可以配置多个key-value(防止恶意上传打满磁盘);
- 配置文件数据的存储位置;
- 配置文件下载,预览的前缀,集群模式下为
协议://域名
,作为上传成功的响应; - 配置日志加载项,对应上述日志代码;
'use strict'
const fs = require('fs');
const path = require('path');
const settingFile = './etc/settings.json';
process.env.settingsFile = settingFile;
process.env.settings = fs.existsSync(settingFile) ? fs.readFileSync(settingFile).toString() : '{}';
let default_settings = {
appName: 'ike_httpFileServer',
bindIP: '0.0.0.0',
bindPort: 8088,
nativeIp: '2.5.8.88',
secrets: {
ike: '4a5826a55872410499c27aeb860ac195', // 授权允许的app,key-value分别对呀appId与appToken
},
dataDir: path.resolve(__dirname, 'data'), // 数据文件的存放位置
dataUrlPrefix: 'http://9.135.218.88:8088', // 文件下载的前缀,协议+域名(ip)
log: {
maxLogBufferSize: 8 * 1024 * 1024, // 默认日志最大缓冲大小L:8M
maxLineCacheSize: 16, // 默认行缓存队列的最大长度:16行
maxColumnSize: 4000, // 默认一行日志的最大长度:4000
path: path.resolve(__dirname, 'log'), // 默认日志的保存路径,当前
useHourSuffix: false, // 默认不启用小时分割
keepDays: 30, // 默认日志的保存天数
}
}
let settings = Object.assign({}, default_settings, JSON.parse(process.env.settings));
exports.settings = settings;
4.3、核心代码
这里才是关键点,引入了上文的log.js
和settings.js
。
前期准备工作:
引入
log.js
,并依次初始化日志的配置项;根据配置文件的
DataDir
,LogPath
,依次初始化数据存储目录,日志落盘目录;根据当前运行机器的CPU核数,fork出CPU核数 - 1 的worker进程数,充分利用机器资源,打满并发;
master监控worker的健康状态,若有意外死掉的进程,重新fork起来,时刻保证CPU被用满;
worker在当前的
DataDir
中,又新建一层文件夹,以预设的当前进程的环境变量workerProcessId
命名;- 启动http服务,配置日志采集项;
监听http请求->文件上传:
文件上传的硬性要求,必须是post,path必须是
/upload
;文件上传请求的headers必须携带签名项:appid,apptoken,signature(用作授权的客户端校验);
文件上传报文的处理,按照上文抓包的分析,格式化处理,支持多文件上传;
文件落盘存储,新名称采用随机值存储(防止通过在线链接判断其目的);
文件落盘存储,路径为
DataDir/workerProcessId/DAY/
,指定存储目录/进程号/日期,方便查看上传成功后,响应值为key-value,key为上传的文件原始名称,value为在线链接的完整地址,在线链接的完整地址前面添加当前机器
nativeIp
的哈希值,作为分布式部署访问的条件;监听http请求->文件下载:
文件下载由于是读操作,所以此处鉴权,要求必须是get;
从请求的path中分离出文件路径,然后前面加上
DataDir
,判断文件是否存在;是否需要响应404;读取文件,将内容Buffer写入响应体中,设置响应头为文件类型,表示预览,浏览器不支持解析的类型直接为下载;
预留了直接下载的响应头注释,放开那几行代码即可。
- 根据请求的完成
finish
,error
,abort
的状态,来采集不同的日志信息;
'use strict'
const http = require('http')
const fs = require('fs');
const os = require('os');
const cluster = require('cluster');
const nodeUtil = require('util');
const mime = require('mime')
const URL = require('url');
const crypto = require('crypto')
const path = require('path');
const log = require('./log');
const settings = require('./settings').settings;
/**
* 随机数组,随机位数
*/
const randomArr = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z'];
const randomRange = 16;
/**
* 子进程数:当前机器cpu核数 - master进程数(1),默认cpu核数最大限制为16
*/
const workCount = Math.min(16, os.cpus().length) - 1;
/**
* 初始化日志的配置
*/
log.SetMaxColumnSize(settings.log.maxColumnSize);
log.SetMaxLogBufferSize(settings.log.maxLogBufferSize);
log.SetMaxLineCacheSize(settings.log.maxLineCacheSize);
log.SetHourSuffix(settings.log.useHourSuffix);
log.SetLogPath(settings.log.path);
log.SetKeepDays(settings.log.keepDays);
/**
* 初始化目录
*/
if (!fs.existsSync(settings.dataDir)) {
fs.mkdirSync(settings.dataDir)
}
if (!fs.existsSync(settings.log.path)) {
fs.mkdirSync(settings.log.path)
}
/**
* master进程负责fork子进程
*/
if (cluster.isMaster) {
let workProcessMap = {}; // 子进程的信息map
for (let i = 1; i < workCount; i++) {
console.log('for 循环:' + i, workCount)
let env = Object.assign({}, process.env, {
'workerProcessId': i
});
let worker = cluster.fork(env);
workProcessMap[worker.id] = env;
}
cluster.on('online', function (newWorker) {
console.log('new worker online: ' + newWorker.id)
});
/**
* 子进程断开
*/
cluster.on('disconnect', function (oldWorker) {
console.log('The worker #' + oldWorker.id + ' has disconnected');
});
/**
* 子进程下线,重新fork
*/
cluster.on('exit', function (oldWorker, code, signal) {
console.log(`worker ${oldWorker.id} died (${signal || code}). restarting...`);
let cp = cluster.fork(workProcessMap[oldWorker.id]);
console.log('worker restart created, ', cp.id);
workProcessMap[cp.id] = workProcessMap[oldWorker.id];
delete workProcessMap[oldWorker.id];
cp.workerProcessId = workProcessMap[cp.id].workerProcessId;
console.log('worker log id is', cp.workerProcessId);
cp.isMasterWorker = (cp.workerProcessId == '-1');
cp.isRealWorker = (cp.workerProcessId >= 0);
});
} else if (cluster.isWorker) {
if (process.env.workerProcessId) {
let dataDir = path.resolve(settings.dataDir, process.env.workerProcessId.toString());
if (!fs.existsSync(dataDir)) {
fs.mkdirSync(dataDir)
}
startServer()
}
}
function startServer() {
let server = http.createServer();
server.on('request', listenRequestEvent);
server.on('close', () => {
console.log('http Server has Stopped At:' + settings['bindPort'])
});
server.on('error', err => {
console.log('http Server error:' + err.toString());
setTimeout(() => {
process.exit(1);
}, 3000);
});
server.listen(settings['bindPort'], settings['bindIP'], settings['backlog'] || 8191, () => {
console.log('Started Http Server At: ' + settings['bindIP'] + ':' + settings['bindPort'])
})
}
/**
* 请求日志:
* 类型(aborted,finished,error),
* 结果(fail,success),
* 动作(上传还是下载),
* 请求头host,请求路径,客户端IP,耗时,异常信息
* @param fileLog
* @param type
*/
function requestLog(fileLog, type) {
fileLog.duration = Date.now() - fileLog.startTime
log.WriteLog('request', [
type, defaultStr(fileLog.result), defaultStr(fileLog.action), defaultStr(fileLog.host), defaultStr(fileLog.path),
defaultStr(fileLog.remoteAddress), defaultStr(fileLog.duration), defaultStr(fileLog.error)]);
}
/**
* 异常日志
* @param args
*/
function exceptionLog(...args) {
log.WriteLog('exception', [...args])
}
function defaultStr(v) {
return v ? v : ''
}
/**
* 监听请求事件(强制要求上传一定是post,path = /upload)
* @param request
* @param response
* @returns {Promise<void>}
*/
async function listenRequestEvent(request, response) {
request.fileLog = {
startTime: Date.now(),
host: request.headers['host'],
remoteAddress: request.remoteAddress,
};
request.on('aborted', () => {
request.fileLog.result = 'fail'
requestLog(request.fileLog, 'aborted');
});
request.on('finish', () => {
request.fileLog.result = 'success'
requestLog(request.fileLog, 'finish');
})
request.on('error', (err) => {
request.fileLog.result = 'fail'
request.fileLog.error = nodeUtil.inspect(err)
requestLog(request.fileLog, 'err');
})
try {
let sourceUrl = URL.parse(request.url, true);
if (request.method.toLowerCase() == 'post') {
request.fileLog.action = 'upload'
if (sourceUrl.pathname != '/upload') {
request.abort()
return
}
if (!checkHeaders(request.headers)) {
request.fileLog.result = 'fail';
request.fileLog.error = 'request headers checkout fail';
response.statusCode = 400;
response.end('request headers checkout fail, please retry')
return
}
upload(request).then(res => {
response.statusCode = 200;
response.setHeader('Content-Type', 'application/json')
response.end(JSON.stringify(res))
})
} else if (request.method.toLowerCase() == 'get') {
request.fileLog.action = 'download';
download(request, sourceUrl).then(res => {
// let temp = sourceUrl.pathname.split('/');
response.statusCode = 200;
response.setHeader('Content-type', mime.getType(res.filePath) + ';charset=utf-8')
// response.setHeader('Content-Type', 'application/force-download');
// response.setHeader('Content-Disposition', 'attachment; filename=haha' + temp[temp.length - 1]);
response.end(res.data)
}).catch(err => {
console.dir(err)
if (err.code) {
response.statusCode = err.code
} else {
response.statusCode = 500;
}
if (err.body) {
response.end(err.body.toString)
} else {
response.end(err.toString)
}
});
} else {
request.fileLog.action = 'unknown'
request.abort()
return
}
} catch (e) {
request.fileLog.error = nodeUtil.inspect(e)
request.abort()
return
}
}
/**
* 验证请求头的签名信息
* @param headers
* @returns {boolean}
*/
function checkHeaders(headers) {
if (!headers['appid'] || !headers['apptoken'] || !headers['signature']) {
return false;
}
let appId = headers['appid'];
if (!settings.secrets[appId]) {
return false
}
let appToken = headers['apptoken'];
let signature = hashStr(appId + appToken);
return headers['signature'] === signature
}
/**
* 哈希值计算
* @param str
* @returns {string}
*/
function hashStr(str) {
return crypto.createHash('sha256').update(str).digest('hex').toUpperCase()
}
/**
* 上传,兼容单文件和多文件的上传
* @param request
* @returns {Promise<void>}
*/
function upload(request) {
return new Promise((resolve, _) => {
let data = Buffer.alloc(0)
let separator = `--${request.headers['content-type'].split('boundary=')[1]}`
request.on('data', chunk => {
data = Buffer.concat([data, chunk])
})
request.on('end', () => {
let bufArr = uploadBufferSplit(data, separator).slice(1, -1)
let result = [];
let error = [];
bufArr.forEach(item => {
let [head, body] = uploadBufferSplit(item, '\r\n\r\n')
let headArr = uploadBufferSplit(head, '\r\n').slice(1) //可能存在两行head,用换行符'\r\n'分割一下,第一个元素是截取后剩下空buffer,剔除
let headerVal = parseUploadHeader(headArr[0].toString()) // header的第一行为Content-Disposition,通过它能拿到文件名
if (headerVal.filename) {
let viewPathName = `${process.env.workerProcessId}/${log.GetDayYMD()}` // 对外展示的路径
let dirPathName = path.resolve(settings.dataDir, viewPathName) // 真正存储在磁盘上的路径
try {
if (!fs.existsSync(dirPathName)) {
fs.mkdirSync(dirPathName)
}
let temp = headerVal.filename.split('.')
let createFileName = `${randomStr()}.${temp[temp.length - 1]}` // 随机文件名
let filePath = `${dirPathName}/${createFileName}`
let resultPath = `${viewPathName}/${createFileName}`
fs.writeFileSync(filePath, body.slice(0, -2))
result.push({
[headerVal.filename]: `${settings.dataUrlPrefix}/${hashStr(settings.nativeIp)}/${resultPath}`
})
} catch (e) {
error.push(`${headerVal.filename}: ${e}`)
}
} else {
error.push(`unable to parse filename: ${headerVal}`)
exceptionLog('upload', headerVal, 'unable to parse filename');
}
})
resolve({result, error})
})
})
}
/**
* 多文件上传的缓存数组分割
* @param buffer
* @param separator
* @returns {[]}
*/
function uploadBufferSplit(buffer, separator) {
const res = []
let offset = 0;
let index = buffer.indexOf(separator, 0)
while (index != -1) {
res.push(buffer.slice(offset, index))
offset = index + separator.length
index = buffer.indexOf(separator, index + separator.length)
}
res.push(buffer.slice(offset))
return res
}
/**
* 文件上传的头信息格式化,
* @param header
* @returns {{}}
*/
function parseUploadHeader(header) {
const [_, value] = header.split(': ')
const valueObj = {}
try {
value.split('; ').forEach(item => {
const [key, val = ''] = item.split('=')
valueObj[key] = val && JSON.parse(val)
})
} catch (e) {
exceptionLog('parseUploadHeader', value, e);
}
return valueObj
}
/**
* 下载
* @param request
* @param sourceUrl
* @returns {Promise<unknown>}
*/
async function download(request, sourceUrl) {
return new Promise((resolve, reject) => {
let tempArr = sourceUrl.pathname.split('/');
tempArr.splice(0, 2); // 删除第一个空格,和第二个nativeIp的哈希值
let filePath = `${settings.dataDir}/${tempArr.join('/')}`
if (!fs.existsSync(filePath)) {
reject({
code: 404,
body: 'file not found'
})
} else {
fs.readFile(filePath, (err, data) => {
if (err) {
reject({
code: 500,
body: nodeUtil.inspect(err)
})
} else {
resolve({filePath, data})
}
})
}
})
}
/**
* 随机字符串
* @returns {string}
*/
function randomStr() {
let result = ''
for (let i = 0; i < randomRange; i++) {
result += randomArr[Math.round(Math.random() * (randomArr.length - 1))];
}
return result;
}
4.4、测试
1、打开postman,请求头的设定,新增三个字段appid,apptoken,signature(为appid + apptoken 的sha256值大写);
2、body选择上传的文件,可以上传多个;
3、浏览器预览:
4、数据存储结构:
├── data
│ ├── 1
│ │ └── 20221023
│ │ ├── IMoCfUdNRaUMkVVy.sh
│ │ ├── KnJSPvlejvMoSrxH.sh
│ │ ├── QwOOJoUTCYdkmZby.pdf
│ │ ├── iJlEbBwgifNFMgXH.jpeg
│ │ ├── sEMrenkBNPThOUVy.pdf
│ │ └── tYRGuYkiwvJTGBjF.sh
│ ├── 2
│ │ ├── 20221018
│ │ │ ├── YGCYdDgPDMHKzfrg.jpeg
│ │ │ └── kgVjozYwRAYUSpqU.js
│ │ └── 20221023
│ │ ├── UdEbPfVsniDfORru.sh
│ │ └── jyfgVDrvJrIIkWpP.pdf
…………………………
4、分布式部署
分布式部署:鸡蛋不能放到一个篮子里,为了拓宽硬件机房的功能性,更多的场景不管是数据存储,还是在线服务,都是选择多节点组装成集群,采用分布式部署的架构,做到融灾与高性能的兼具性;
4.1、架构图
画了个流程图,大致如下所示:
- 部署多个文件服务节点,组成文件服务集群(即把上述编译好的js文件,放到多台机器上执行),文件服务节点之间不关联;
- 部署多个http网关节点,组成网关集群,通过网关可以设置安全组,防刷,限流,可以配置网关为文件服务集群的唯一白名单;
- 客户端的请求先到http网关,网关在分析请求后,选择转发到具体的文件服务节点;文件上传可以是随机均衡,可以是权重,也可以是会话保持,但是文件下载(预览),http网关必须根据请求path的
nativeIp
哈希值,从自身配置中找到具体的文件服务节点,然后定向转发,否则会报错404;
4.2、http网关
作为一款http网关,完全透传客户端的请求&响应报文(path,headers,body……),只不过在请求链路的中间,充当安全防控和文件预览的溯源的功能:
- 文件服务集群的
nativeIp
列表必须配置在网关集群上,这样子它才可以根据path中的哈希值做到定向转发; - 文件服务节点,文件上传响应值的链接前缀,必须配置成http网关集群的域名,这样透传给客户端的最终在线链接才是有效的;
- 其他的,跟绝大多数网关功能类似……
实现的话,逻辑比较简单,这里就不再续写代码了,本来只是通过“模拟微信开发,nodejs搭建一套高性能分布式的在线文件服务系统”作为芥子,引入分布式部署构建的架构图,而我本地自测和调试的时候,就没必要消耗额外的资源去部署啦,单节点即可。
5、写在文末
上述代码内容言简意赅,没有冗余项,也没有引入第三方框架依赖包,完全是以node原生的模式搭建,是可以直接拿出来使用的。但作者并不是专业的前端coder,本着带对象学习前端开发的方式,才开启了“模拟微信”这一征程,在征程结束之前,即对象还没有拿到一份正式的前端offer,代码是不会放到github的。
每次推送的章节,即便没有开源链接,代码也绝大不会穿插嵌套复杂,都可以直接贴出去引用~