一、fastdfs上传
对于fdfs中的不同协议,它们的协议header都是一样的,只是不同功能它们之间协议body格式不同
1、请求和回应的协议格式
协议header部分:
- pkg_len:body的长度,不包含header长度
- cmd:命令(比如指明是上传还是下载…)
- status:表示状态(比如是请求状态还是响应状态)
请求的协议body部分:
- storage_index:表示 storage的索引号(上传的时候要传到哪个storage)
- file_size:文件大小
- file_ext_name:文件后缀名
- file_content:文件内容
回应的协议body部分:
- group_name:组名(下图中的group1)
- remote_filename:文件名称(表示下图中M00开始的一段)
2、源码结构图
fdfs中上传部分的 代码结构
3、源码阅读
1)fastdfs/client/fdfs_upload_file.c
也就是上传命令fdfs_upload_file调用的地方
这部分内容主要有几个部分
1.解析参数
2.如果命令参数设置了storage、指定上传的盘符,就进行设置。如果没有设置,就通过连接tracker,进行自动分配storage和存储位置
3.上传文件(在storage_do_upload_file中实现)
static void usage(char *argv[]) { printf("Usage: %s <config_file> <local_filename> " \ "[storage_ip:port] [store_path_index]\n", argv[0]);//输出upload_file的用法, 带中括号的是可选的命令参数(可以不指定,系统会默认指定一个storage的ip和端口,如果指定,可以强制选一个storage),store_path_index是上传的盘符(可以指定或者默认) } //上传文件命令的用法 : fdfs_upload_file /etc/fdfs/client.conf ./filename.txt 比如会返回:group1/M00/00/00/xxxxxx.txt int main(int argc, char *argv[]) { char *conf_filename; char *local_filename; char group_name[FDFS_GROUP_NAME_MAX_LEN + 1]; ConnectionInfo *pTrackerServer; int result; int store_path_index; ConnectionInfo storageServer; char file_id[128]; if (argc < 3) { usage(argv); return 1; } log_init(); g_log_context.log_level = LOG_ERR; ignore_signal_pipe(); conf_filename = argv[1]; if ((result=fdfs_client_init(conf_filename)) != 0) { return result; } pTrackerServer = tracker_get_connection();//连接traceker if (pTrackerServer == NULL) { fdfs_client_destroy(); return errno != 0 ? errno : ECONNREFUSED; } local_filename = argv[2]; *group_name = '\0'; if (argc >= 4) //参数大于等于4,说明指定了storage { const char *pPort; const char *pIpAndPort; pIpAndPort = argv[3]; pPort = strchr(pIpAndPort, ':'); if (pPort == NULL) { fdfs_client_destroy(); fprintf(stderr, "invalid storage ip address and " \ "port: %s\n", pIpAndPort); usage(argv); return 1; } storageServer.sock = -1; snprintf(storageServer.ip_addr, sizeof(storageServer.ip_addr), \ "%.*s", (int)(pPort - pIpAndPort), pIpAndPort); storageServer.port = atoi(pPort + 1); if (argc >= 5) { store_path_index = atoi(argv[4]); } else { store_path_index = -1; } } else if ((result=tracker_query_storage_store(pTrackerServer, \ &storageServer, group_name, &store_path_index)) != 0) // 请求 // tracker_query_storage_store宏定义,本质tracker_query_storage_store_without_group ,请求tracker去获取storage { fdfs_client_destroy(); fprintf(stderr, "tracker_query_storage fail, " \ "error no: %d, error info: %s\n", \ result, STRERROR(result)); return result; } // 上传文件(最终调用的是storage_do_upload_file) result = storage_upload_by_filename1(pTrackerServer, \ &storageServer, store_path_index, \ local_filename, NULL, \ NULL, 0, group_name, file_id); if (result == 0) { printf("%s\n", file_id); } else { fprintf(stderr, "upload file fail, " \ "error no: %d, error info: %s\n", \ result, STRERROR(result)); } tracker_close_connection_ex(pTrackerServer, true); fdfs_client_destroy(); return result; }
2) storage_do_upload_file
主要分为3个步骤
1.将上半部分协议的信息全部打包到buffer中去
2.先发送上传请求的协议的上半部分,将内存中的数据(buffer)进行发送调用tcpsenddata_nb
3.然后发送上传请求的协议的下半部分,通过文件读取并发送调用tcpsendfile
int storage_do_upload_file(ConnectionInfo *pTrackerServer, \ ConnectionInfo *pStorageServer, const int store_path_index, \ const char cmd, const int upload_type, const char *file_buff, \ void *arg, const int64_t file_size, const char *master_filename, \ const char *prefix_name, const char *file_ext_name, \ const FDFSMetaData *meta_list, const int meta_count, \ char *group_name, char *remote_filename) { ...//以上都是对协议进行包装,如storage_index,file_size等 long2buff(file_size, p); // 把文件长度写入到buffer中 8字节 p += FDFS_PROTO_PKG_LEN_SIZE; if (bUploadSlave) { ... } else { memset(p, 0, FDFS_FILE_EXT_NAME_MAX_LEN); // 扩展名 固定长度6字节,先置0 } if (file_ext_name != NULL) { ...//拷贝扩展名 } p += FDFS_FILE_EXT_NAME_MAX_LEN; ... long2buff((p - out_buff) + file_size - sizeof(TrackerHeader), \ pHeader->pkg_len); // body的长度 pHeader->cmd = cmd; pHeader->status = 0; // 请求上传文件(也就是fdfs上传协议的上半部分) if ((result=tcpsenddata_nb(pStorageServer->sock, out_buff, \ p - out_buff, g_fdfs_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "send data to storage server %s:%d fail, " \ "errno: %d, error info: %s", __LINE__, \ pStorageServer->ip_addr, pStorageServer->port, \ result, STRERROR(result)); break; } //请求上传文件(fdfs上传协议的下半部分)(数据部分) if (upload_type == FDFS_UPLOAD_BY_FILE)//一般都是按文件进行上传 { if ((result=tcpsendfile(pStorageServer->sock, file_buff, \ file_size, g_fdfs_network_timeout, \ &total_send_bytes)) != 0) // 这里才是真正发送文件 (使用了sendfile进行零拷贝) { break; } } else if (upload_type == FDFS_UPLOAD_BY_BUFF) { if ((result=tcpsenddata_nb(pStorageServer->sock, \ (char *)file_buff, file_size, \ g_fdfs_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "send data to storage server %s:%d fail, " \ "errno: %d, error info: %s", __LINE__, \ pStorageServer->ip_addr, pStorageServer->port, \ result, STRERROR(result)); break; } } else //FDFS_UPLOAD_BY_CALLBACK { UploadCallback callback; callback = (UploadCallback)file_buff; if ((result=callback(arg, file_size, pStorageServer->sock))!=0) { break; } } pInBuff = in_buff; if ((result=fdfs_recv_response(pStorageServer, \ &pInBuff, sizeof(in_buff), &in_bytes)) != 0) // 获取响应 { logError("file: "__FILE__", line: %d, " "fdfs_recv_response fail, result: %d", __LINE__, result); break; } //以下是对获取响应的一些处理 ... if (new_connection) { tracker_close_connection_ex(pStorageServer, result != 0); } return result; }
从上面可以看到有三种发送的方式,但是一般都是默认选择FDFS_UPLOAD_BY_FILE,按文件上传
二、fastdfs下载
1、请求和回应的协议格式
- file_offset:下载的文件起始偏移位置(比如偏移为0,那么就是从头开始下载)
- download_bytes:要下载的字节数量(比如文件10MB,先下载2MB)
2、源码结构图
3、源码阅读
1)fastdfs/client/fdfs_download_file.c
使用命令fdfs_download_file下载文件,就会调用下面这部分内容。
这一部分主要是解析参数,比如设置下载的起始偏移位置和下载的字节数
如果没有设置,默认是从头开始下载
最后调用storage_do_download_file1_ex进行下载(发送下载请求和接受数据)
int main(int argc, char *argv[]) { char *conf_filename; char *local_filename; ConnectionInfo *pTrackerServer; int result; char file_id[128]; int64_t file_size; int64_t file_offset; int64_t download_bytes; if (argc < 3) { printf("Usage: %s <config_file> <file_id> " \ "[local_filename] [<download_offset> " \ "<download_bytes>]\n", argv[0]); return 1; } log_init(); g_log_context.log_level = LOG_ERR; ignore_signal_pipe(); conf_filename = argv[1]; if ((result=fdfs_client_init(conf_filename)) != 0) { return result; } pTrackerServer = tracker_get_connection(); if (pTrackerServer == NULL) { fdfs_client_destroy(); return errno != 0 ? errno : ECONNREFUSED; } snprintf(file_id, sizeof(file_id), "%s", argv[2]); file_offset = 0; download_bytes = 0; if (argc >= 4) { local_filename = argv[3]; if (argc >= 6) { file_offset = strtoll(argv[4], NULL, 10);//文件的字节偏移 download_bytes = strtoll(argv[5], NULL, 10);//下载字节数 } } else { local_filename = strrchr(file_id, '/'); if (local_filename != NULL) { local_filename++; //skip / } else { local_filename = file_id; } } //在storage_do_download_file_ex实现下载 result = storage_do_download_file1_ex(pTrackerServer, \ NULL, FDFS_DOWNLOAD_TO_FILE, file_id, \ file_offset, download_bytes, \ &local_filename, NULL, &file_size); if (result != 0) { printf("download file fail, " \ "error no: %d, error info: %s\n", \ result, STRERROR(result)); } tracker_close_connection_ex(pTrackerServer, true); fdfs_client_destroy(); return 0; }
2)storage_do_download_file_ex
主要有三部分
1.连接stoarge
2.打包协议信息,发送下载请求
3.下载文件(可以下载到磁盘,内存,获得通过回调函数的方式,默认是下载到磁盘FDFS_DOWNLOAD_TO_FILE)
int storage_do_download_file_ex(ConnectionInfo *pTrackerServer, \ ConnectionInfo *pStorageServer, \ const int download_type, \ const char *group_name, const char *remote_filename, \ const int64_t file_offset, const int64_t download_bytes, \ char **file_buff, void *arg, int64_t *file_size) { ... *file_size = 0; if ((result=storage_get_read_connection(pTrackerServer, \ &pStorageServer, group_name, remote_filename, \ &storageServer, &new_connection)) != 0) // 连接storage { return result; } do { ...//将一些请求协议的信息打包到buffer中 pHeader->cmd = STORAGE_PROTO_CMD_DOWNLOAD_FILE; // 请求storage下载 if ((result=tcpsenddata_nb(pStorageServer->sock, out_buff, \ out_bytes, g_fdfs_network_timeout)) != 0) //发送下载请求 { logError("file: "__FILE__", line: %d, " \ "send data to storage server %s:%d fail, " \ "errno: %d, error info: %s", __LINE__, \ pStorageServer->ip_addr, pStorageServer->port, \ result, STRERROR(result)); break; } if (download_type == FDFS_DOWNLOAD_TO_FILE)//storage回应数据 { // 请求接收header if ((result=fdfs_recv_header(pStorageServer, \ &in_bytes)) != 0) //in_bytes 一般就是说我们body的长度(解析获取的header,可以获得body的长度) { logError("file: "__FILE__", line: %d, " "fdfs_recv_header fail, result: %d", __LINE__, result); break; } //in_bytes 是获取到的文件的大小 if ((result=tcprecvfile(pStorageServer->sock, \ *file_buff, in_bytes, 0, \ g_fdfs_network_timeout, \ &total_recv_bytes)) != 0) // 接收文件 { break; } } else if (download_type == FDFS_DOWNLOAD_TO_BUFF)//如果要把数据读到内存里去 { *file_buff = NULL; if ((result=fdfs_recv_response(pStorageServer, \ file_buff, 0, &in_bytes)) != 0) { logError("file: "__FILE__", line: %d, " "fdfs_recv_response fail, result: %d", __LINE__, result); break; } } else//通过设置的回调函数去下载 { DownloadCallback callback; char buff[2048]; int recv_bytes; int64_t remain_bytes; if ((result=fdfs_recv_header(pStorageServer, \ &in_bytes)) != 0) { logError("file: "__FILE__", line: %d, " "fdfs_recv_header fail, result: %d", __LINE__, result); break; } callback = (DownloadCallback)*file_buff; remain_bytes = in_bytes; while (remain_bytes > 0) { if (remain_bytes > sizeof(buff)) { recv_bytes = sizeof(buff); } else { recv_bytes = remain_bytes; } if ((result=tcprecvdata_nb(pStorageServer->sock, buff, \ recv_bytes, g_fdfs_network_timeout)) != 0) { logError("file: "__FILE__", line: %d, " \ "recv data from storage server " \ "%s:%d fail, " \ "errno: %d, error info: %s", __LINE__, \ pStorageServer->ip_addr, \ pStorageServer->port, \ result, STRERROR(result)); break; } result = callback(arg, in_bytes, buff, recv_bytes); if (result != 0) { logError("file: "__FILE__", line: %d, " \ "call callback function fail, " \ "error code: %d", __LINE__, result); break; } remain_bytes -= recv_bytes; } if (remain_bytes != 0) { break; } } *file_size = in_bytes; } while (0); if (new_connection) { tracker_close_connection_ex(pStorageServer, result != 0); } return result; }
4、补充
多线程下载
由于协议中有偏移和下载字节数,因此可以自己去实现使用多线程进程下载,分成很多段,用不同的线程去下载
这样做有什么好处?
1、比如服务器对每个下载请求都限速了,那么可以用多线程来提速
2、服务器如果此时负荷比较高,多个连接,可以多占用资源进行下载
断点续传
这是fdfs内部就实现的功能,将文件追加到上一次上传的文件的末尾
echo hello > test1.txt echo world > test2.txt fdfs_upload_appender /etc/fdfs/client.conf test1.txt 得到:group1/M00/00/00/rBMYd2Id2FmEay2-AAAAADY6MCA286.txt ,在fdfs_append_file的时候需要 fdfs_append_file /etc/fdfs/client.conf group1/M00/00/00/rBMYd2Id2FmEay2-AAAAADY6MCA286.txt test2.txt 在服务器相应的⽬录下查找对应的⽂件,⽤cat读取⽂件内容。 root@iZbp1h2l856zgoegc8rvnhZ:/home/fastdfs/storage/data/00/00# cat rBMYd2Id2FmEay2-AAAAADY6MCA286.txt hello world
- fdfs_upload_appender 上传第⼀部分⽂件
- fdfs_append_file 上传其他部分的⽂件
注意:
注意断点续传的顺序性
⽀持断点续传,但fastdfs并不⽀持多线程分⽚上传同⼀个⽂件








