3.4 上传文件
先介绍一下md5,每个文件都有一个唯一的 MD5 值(比如2bf8170b42cc7124b04a8886c83a9c6f),就好比每个人的指纹都是唯一的一样,效验 MD5 就是用来确保文件在传输过程中未被修改过。也就是说,如果要上传文件的MD5和数据库的某个文件的MD5匹配,意味着这两个文件一样。那么就无需重复上传。
1)客户端在上传文件之前将文件的 MD5 码上传到服务器。
2)服务器端判断是否已存在此 MD5 码,如果存在,说明该文件已存在,则此文件无需再上传,在此文件的计数器加 1,说明此文件多了一个用户共用。
3)如果服务器没有此 MD5 码,说明上传的文件是新文件,则真正上传此文件。
我们先将处理上传新文件的逻辑。
1)先通过 nginx-upload-module 模块上传文件到临时目录
2)nginx-upload-module 模块上传完文件后,通知/api/upload 后端处理程序:
3)后端处理程序 ApiUpload 函数解析文件信息,然后将临时文件上传到 fastdfs
1、解析客户端的post请求
------WebKitFormBoundaryLheXCMpLubcS8BsC Content-Disposition: form-data; name="file"; filename="牛牛.png" Content-Type: image/png ------WebKitFormBoundaryLheXCMpLubcS8BsC Content-Disposition: form-data; name="user" handsome1 ------WebKitFormBoundaryLheXCMpLubcS8BsC Content-Disposition: form-data; name="md5" aa3a04152a85412779357dc008d67ae7 ------WebKitFormBoundaryLheXCMpLubcS8BsC Content-Disposition: form-data; name="size" 2292609 ------WebKitFormBoundaryLheXCMpLubcS8BsC--
post请求通过nginx-upload-module加工后到达后台server,后台server逐步从post请求中解析出相应的文件信息
//===============> 1. 解析post请求 <============ // boundary=----WebKitFormBoundaryjWE3qXXORSg2hZiB 找到起始位置 p1 = strstr(begin, "\r\n"); // 作用是返回字符串中首次出现子串的地址 if (p1 == NULL) { LogError("wrong no boundary!"); ret = -1; goto END; } //拷贝分界线 strncpy(boundary, begin, p1 - begin); // 缓存分界线, 比如:WebKitFormBoundary88asdgewtgewx boundary[p1 - begin] = '\0'; //字符串结束符 LogInfo("boundary: {}", boundary); //打印出来 // 查找文件名file_name begin = p1 + 2; // 2->\r\n p2 = strstr(begin, "name=\"file_name\""); //找到file_name字段 if (!p2) { LogError("wrong no file_name!"); ret = -1; goto END; } p2 = strstr(begin, "\r\n"); // 找到file_name下一行 p2 += 4; //下一行起始 begin = p2; // p2 = strstr(begin, "\r\n"); strncpy(file_name, begin, p2 - begin); LogInfo("file_name: {}", file_name); // 其他的类似 // 查找文件类型file_content_type // …… // 查找文件file_path // …… // 查找文件file_md5 // …… // 查找文件file_size // …… // 查找user // ……
2、根据文件后缀对临时文件做重命名
//===============> 2. 根据文件后缀对临时文件做重命名 <============ // 获取文件名后缀 GetFileSuffix(file_name, suffix); // 20230720-2.txt -> txt mp4, jpg, png strcat(new_file_path, file_path); // /root/tmp/1/0045118901 strcat(new_file_path, "."); // /root/tmp/1/0045118901. strcat(new_file_path, suffix); // /root/tmp/1/0045118901.txt // 重命名 修改文件名 ret = rename(file_path, new_file_path); /// /root/tmp/1/0045118901 -> /root/tmp/1/0045118901.txt if (ret < 0) { LogError("rename {} to {} failed", file_path, new_file_path); ret = -1; goto END; }
3、将该文件存入fastDFS中,并得到文件的file_id
//===============> 3. 将该文件存入fastDFS中,并得到文件的file_id <============ // file_id 例如 group1/M00/00/00/ctepQmIWLzWAHzHrAAAAKTIQHvk745.txt LogInfo("uploadFileToFastDfs, file_name:{}, new_file_path:{}", file_name, new_file_path); if (uploadFileToFastDfs(new_file_path, fileid) < 0) { LogError("uploadFileToFastDfs failed, unlink: {}", new_file_path); ret = unlink(new_file_path); if (ret != 0) { LogError("unlink: {} failed", new_file_path); // 删除失败则需要有个监控重新清除过期的临时文件,比如过期两天的都删除 } ret = -1; goto END; }
将这个本地文件上传到 后台分布式文件系统(fastdfs)中,具体来说通过多进程的方式,子进程通过execlp()进程替换执行fastdfs写的的客户端上传文件的程序
//fdfs_upload_file 客户端的配置文件(/etc/fdfs/client.conf) 要上传的文件 fdfs_upload_file /etc/fdfs/client.conf zxm.txt
/* -------------------------------------------*/ /** * @brief 将一个本地文件上传到 后台分布式文件系统中 * 对应 fdfs_upload_file /etc/fdfs/client.conf 完整文件路径 * * @param file_path (in) 本地文件的路径 * @param fileid (out)得到上传之后的文件ID路径 * * @returns * 0 succ, -1 fail */ /* -------------------------------------------*/ int uploadFileToFastDfs(char *file_path, char *fileid) { int ret = 0; pid_t pid; int fd[2]; //无名管道的创建 if (pipe(fd) < 0) // fd[0] → r; fd[1] → w 获取上传后返回的信息 fileid { LogError("pipe error"); ret = -1; goto END; } //创建进程 pid = fork(); // if (pid < 0) //进程创建失败 { LogError("fork error"); ret = -1; goto END; } if (pid == 0) //子进程 { //关闭读端 close(fd[0]); //将标准输出 重定向 写管道 dup2(fd[1], STDOUT_FILENO); // 往标准输出写的东西都会重定向到fd所指向的文件, // 当fileid产生时输出到管道fd[1] // fdfs_upload_file /etc/fdfs/client.conf 123.txt //通过execlp执行fdfs_upload_file //如果函数调用成功,进程自己的执行代码就会变成加载程序的代码,execlp()后边的代码也就不会执行了. execlp("fdfs_upload_file", "fdfs_upload_file", s_dfs_path_client.c_str(), file_path, NULL); // // 执行正常不会跑下面的代码 //执行失败 LogError("execlp fdfs_upload_file error"); close(fd[1]); } else //父进程 { //关闭写端 close(fd[1]); //从管道中去读数据 read(fd[0], fileid, TEMP_BUF_MAX_LEN); // 等待管道写入然后读取 LogInfo("fileid1: {}", fileid); //去掉一个字符串两边的空白字符 TrimSpace(fileid); if (strlen(fileid) == 0) { LogError("upload failed"); ret = -1; goto END; } LogInfo("fileid2: {}", fileid); wait(NULL); //等待子进程结束,回收其资源 close(fd[0]); } END: return ret; }
4、删除本地临时存放的上传文件
//================> 4. 删除本地临时存放的上传文件 <=============== LogInfo("unlink: {}", new_file_path); ret = unlink(new_file_path); if (ret != 0) { LogWarn("unlink: {} failed", new_file_path); // 删除失败则需要有个监控重新清除过期的临时文件,比如过期两天的都删除 }
5、得到文件所存放storage的host_name,拼接出完整的http地址
//================> 5. 得到文件所存放storage的host_name <================= // 拼接出完整的http地址 LogInfo("getFullurlByFileid, fileid: {}", fileid); if (getFullurlByFileid(fileid, fdfs_file_url) < 0) { LogError("getFullurlByFileid failed "); ret = -1; goto END; }
和把文件上传到fastdfs系统一样,都是多进程加管道通信
// 子进程 //将标准输出 重定向 写管道 dup2(fd[1], STDOUT_FILENO); /*读取存储文件的信息文件,利用fastdfs自带的fdfs_file_info进程*/ //使用“fdfs_file_info”可以查看到文件的详细存储信息,也是跟上客户端的配置文件以及储服务器返回给我们的文件的路径 execlp("fdfs_file_info", "fdfs_file_info", fdfs_cli_conf_path, fileid, NULL); // 父进程 //从管道中去读数据 read(fd[0], fdfs_file_stat_buf, TEMP_BUF_MAX_LEN); //拼接上传文件的完整url地址--->http://host_name/group1/M00/00/00/D12313123232312.png
6、将该文件的FastDFS相关信息存入mysql中
//===============> 将该文件的FastDFS相关信息存入mysql中 <====== LogInfo("storeFileinfo, url: {}", fdfs_file_url); // 把文件写入file_info if (storeFileinfo(db_conn, cache_conn, user, file_name, file_md5, long_file_size, fileid, fdfs_file_url) < 0) { LogError("storeFileinfo failed "); ret = -1; // 严谨而言,这里需要删除 已经上传的文件 goto END; } ret = 0; value["code"] = 0; str_json = value.toStyledString(); // json序列化, 直接用writer是紧凑方式,这里toStyledString是格式化更可读方式
3.5 上传文件之秒传
上节提到,文件上传时会先校验MD5,如果匹配,则说明服务器已经存在该文件,客户端不需要再去调用 upload 接口上传文件。达到秒传效果。本节介绍的就是秒传。
1、sql 语句,从文件信息表file_info获取此md5值文件的文件计数器 count(表示有多少个用户拥有这个MD5值的文件)
sprintf(sql_cmd, "select count from file_info where md5 = '%s'", md5);
2、若查询不到,秒传失败
3、若查询到,再查询此用户是否已经有此文件
◼ 如果存在,说明此用户已经保存此文件,不能能重复上传
◼ 如果不存在,修改file_info对应MD5文件的count字段,进行+1,表示多一个用户拥有。同时向用户文件列表user_file_list插入一条数据。
"insert into user_file_list(user, md5, create_time, file_name, " "shared_status, pv) values ('%s', '%s', '%s', '%s', %d, %d)", user, md5, time_str, filename, 0, 0);
//秒传处理 void handleDealMd5(const char *user, const char *md5, const char *filename, string &str_json) { Md5State md5_state = Md5Failed; int ret = 0; int file_ref_count = 0; char sql_cmd[SQL_MAX_LEN] = {0}; CDBManager *db_manager = CDBManager::getInstance(); CDBConn *db_conn = db_manager->GetDBConn("tuchuang_slave"); AUTO_REL_DBCONN(db_manager, db_conn); CacheManager *cache_manager = CacheManager::getInstance(); CacheConn *cache_conn = cache_manager->GetCacheConn("token"); AUTO_REL_CACHECONN(cache_manager, cache_conn); // 1、sql 语句,获取此md5值文件的文件计数器 count sprintf(sql_cmd, "select count from file_info where md5 = '%s'", md5); LogInfo("执行: {}", sql_cmd); //返回值: 0成功并保存记录集,1没有记录集,2有记录集但是没有保存,-1失败 file_ref_count = 0; ret = GetResultOneCount(db_conn, sql_cmd, file_ref_count); //执行sql语句 LogInfo("ret: {}, file_ref_count: {}", ret, file_ref_count); if (ret == 0) //2、有结果, 并且返回 file_info被引用的计数 file_ref_count { //2.1 查看此用户是否已经有此文件,如果存在说明此文件已上传,无需再上传 sprintf(sql_cmd, "select * from user_file_list where user = '%s' and md5 = '%s' " "and file_name = '%s'", user, md5, filename); LogInfo("执行: {}", sql_cmd); //返回值: 1: 表示已经存储了,有这个文件记录 ret = CheckwhetherHaveRecord(db_conn, sql_cmd); // 检测个人是否有记录 if (ret == 1) //如果有结果,说明此用户已经保存此文件 { LogWarn("user: {}-> filename: {}, md5: {}已存在", user, filename, md5); md5_state = Md5FileExit; // 此用户已经有该文件了,不能重复上传 goto END; } // 2.2 此用户没有此文件,修改file_info中的count字段,+1 (count文件引用计数),表示多了一个用户拥有该文件 sprintf(sql_cmd, "update file_info set count = %d where md5 = '%s'", file_ref_count + 1, md5); LogInfo("执行: {}", sql_cmd); if (!db_conn->ExecutePassQuery(sql_cmd)) { LogError("{} 操作失败", sql_cmd); md5_state = Md5Failed; // 更新文件引用计数失败这里也认为秒传失败,宁愿他再次上传文件 goto END; } // 2.3 同时向user_file_list用户文件列表插入一条数据 //当前时间戳 struct timeval tv; struct tm *ptm; char time_str[128]; //使用函数gettimeofday()函数来得到时间。它的精度可以达到微妙 gettimeofday(&tv, NULL); ptm = localtime(&tv.tv_sec); //把从1970-1-1零点零分到当前时间系统所偏移的秒数时间转换为本地时间 // strftime() // 函数根据区域设置格式化本地时间/日期,函数的功能将时间格式化,或者说格式化一个时间字符串 strftime(time_str, sizeof(time_str), "%Y-%m-%d %H:%M:%S", ptm); // 用户列表增加一个文件记录 sprintf(sql_cmd, "insert into user_file_list(user, md5, create_time, file_name, " "shared_status, pv) values ('%s', '%s', '%s', '%s', %d, %d)", user, md5, time_str, filename, 0, 0); LogInfo("执行: {}", sql_cmd); if (!db_conn->ExecuteCreate(sql_cmd)) { LogError("{} 操作失败", sql_cmd); md5_state = Md5Failed; // 恢复引用计数 sprintf(sql_cmd, "update file_info set count = %d where md5 = '%s'", file_ref_count, md5); LogInfo("执行: {}", sql_cmd); if (!db_conn->ExecutePassQuery(sql_cmd)) { LogError("{} 操作失败", sql_cmd); } goto END; } //查询用户文件数量, 用户数量+1 if (CacheIncrCount(cache_conn, FILE_USER_COUNT + string(user)) < 0) { LogWarn("CacheIncrCount failed"); // 这个可以在login的时候从mysql加载 } md5_state = Md5Ok; } else //3、没有结果,秒传失败 { LogInfo("秒传失败"); md5_state = Md5Failed; goto END; } END: /* 秒传文件: 秒传成功: {"code": 0} 秒传失败: {"code":1} 文件已存在:{"code": 5} */ int code = (int)md5_state; encodeMd5Json(code, str_json); }
3.6 获取共享文件列表或下载榜
分 3 个接口:
◼ 获取共享文件个数 /api/sharefiles?cmd=count
◼ 获取共享文件列表 /api/sharefiles?cmd=normal
◼ 获取共享文件下载排行榜 /api/sharefiles?cmd=pvdesc
1、共享文件个数 /api/sharefiles?cmd=count
获取共享文件数量,我们是先查redis,若有直接返回即可。若没有再查MySQL,并且把数据同步到redis。
int getShareFilesCount(CDBConn *db_conn, CacheConn *cache_conn, int &count) { int ret = 0; int64_t file_count = 0; // 先查看用户是否存在 string str_sql; // 1. 先从redis里面获取 if (CacheGetCount(cache_conn, FILE_PUBLIC_COUNT, file_count) < 0) { LogWarn("CacheGetCount FILE_PUBLIC_COUNT failed"); ret = -1; } // 2. 若数量为0,从mysql查询确定是否为0 if (file_count == 0) { // 2.1 从mysql加载 if (DBGetShareFilesCount(db_conn, count) < 0) { LogError("DBGetShareFilesCount failed"); return -1; } file_count = (int64_t)count; // 2.2 同步数据到redis if (CacheSetCount(cache_conn, FILE_PUBLIC_COUNT, file_count) < 0) // 失败了那下次继续从mysql加载 { LogError("CacheSetCount FILE_PUBLIC_COUNT failed"); return -1; } ret = 0; } // 3. 若数量不为0,直接返回 count = file_count; return ret; }
2、 获取共享文件列表 /api/sharefiles?cmd=normal
核心就是执行sql语句,然后把返回的数据解析成json打包。
str_sql = FormatString( "select share_file_list.*, file_info.url, file_info.size, file_info.type from file_info, \ share_file_list where file_info.md5 = share_file_list.md5 limit %d, %d", start, count); LogInfo("执行: {}", str_sql); result_set = db_conn->ExecuteQuery(str_sql.c_str()); if (result_set) { // 遍历所有的内容 // 获取大小 file_count = 0; while (result_set->Next()) { Json::Value file; file["user"] = result_set->GetString("user"); file["md5"] = result_set->GetString("md5"); file["file_name"] = result_set->GetString("file_name"); file["share_status"] = result_set->GetInt("share_status"); file["pv"] = result_set->GetInt("pv"); file["create_time"] = result_set->GetString("create_time"); file["url"] = result_set->GetString("url"); file["size"] = result_set->GetInt("size"); file["type"] = result_set->GetString("type"); files[file_count] = file; file_count++; } if (file_count > 0) root["files"] = files; ret = 0; delete result_set; } else { ret = -1; }
3、获取共享文件下载排行榜 /api/sharefiles?cmd=pvdesc
排行榜的逻辑比较简单,就是使用 redis 的 ZSET 做排行榜。
这里涉及到 mysql 和 redis,获取返回的是文件名和下载量。这里文件名可能重名,所以这里用了文件 md5+文件名作为唯一 ID。
1)先从 ZSET 获取排行榜,此时的 member 是 md5+文件名,score 是下载量 pv
2)然后将 member 的 md5+文件名 通过 HASH 查找对应的文件名 filename
3)将文件名 filename 和下载量 pv 返回给前端展示。
4)下载文件后,需要更新排行榜。
具体步骤:
a) mysql共享文件数量和redis共享文件数量对比,判断是否相等
b) 如果不相等,清空redis数据,从mysql中导入数据到redis (mysql和redis交互)
//===3、mysql共享文件数量和redis共享文件数量对比,判断是否相等 if (redis_num != sql_num) { //===4、如果不相等,清空redis数据,重新从mysql中导入数据到redis //(mysql和redis交互) // a) 清空redis有序数据 cache_conn->Del(FILE_PUBLIC_ZSET); // 删除集合 cache_conn->Del(FILE_NAME_HASH); // 删除hash, 理解 这里hash和集合的关系 // b) 从mysql中导入数据到redis // sql语句 strcpy( sql_cmd, "select md5, file_name, pv from share_file_list order by pv desc"); LogInfo("执行: {}", sql_cmd); pCResultSet = db_conn->ExecuteQuery(sql_cmd); if (!pCResultSet) { LogError("{} 操作失败", sql_cmd); ret = -1; goto END; } // mysql_fetch_row从使用mysql_store_result得到的结果结构中提取一行,并把它放到一个行结构中。 // 当数据用完或发生错误时返回NULL. while ( pCResultSet->Next()) { char field[1024] = {0}; string md5 = pCResultSet->GetString("md5"); // 文件的MD5 string file_name = pCResultSet->GetString("file_name"); // 文件名 int pv = pCResultSet->GetInt("pv"); sprintf(field, "%s%s", md5.c_str(), file_name.c_str()); //文件标示,md5+文件名 //增加有序集合成员 cache_conn->ZsetAdd(FILE_PUBLIC_ZSET, pv, field); //增加hash记录 cache_conn->Hset(FILE_NAME_HASH, field, file_name); } }
c) 从redis读取数据,给前端反馈相应信息