fastdfs源码阅读:上传和下载(文件客户端逻辑)

简介: fastdfs源码阅读:上传和下载(文件客户端逻辑)

一、fastdfs上传

对于fdfs中的不同协议,它们的协议header都是一样的,只是不同功能它们之间协议body格式不同

1、请求和回应的协议格式

协议header部分:

  • pkg_len:body的长度,不包含header长度
  • cmd:命令(比如指明是上传还是下载…)
  • status:表示状态(比如是请求状态还是响应状态)

请求的协议body部分:

  • storage_index:表示 storage的索引号(上传的时候要传到哪个storage)
  • file_size:文件大小
  • file_ext_name:文件后缀名
  • file_content:文件内容

回应的协议body部分:

  • group_name:组名(下图中的group1)
  • remote_filename:文件名称(表示下图中M00开始的一段)

2、源码结构图

fdfs中上传部分的 代码结构

3、源码阅读

1)fastdfs/client/fdfs_upload_file.c

也就是上传命令fdfs_upload_file调用的地方

这部分内容主要有几个部分

1.解析参数

2.如果命令参数设置了storage、指定上传的盘符,就进行设置。如果没有设置,就通过连接tracker,进行自动分配storage和存储位置

3.上传文件(在storage_do_upload_file中实现)

static void usage(char *argv[])
{
  printf("Usage: %s <config_file> <local_filename> " \   
    "[storage_ip:port] [store_path_index]\n", argv[0]);//输出upload_file的用法, 带中括号的是可选的命令参数(可以不指定,系统会默认指定一个storage的ip和端口,如果指定,可以强制选一个storage),store_path_index是上传的盘符(可以指定或者默认)
}
//上传文件命令的用法 : fdfs_upload_file /etc/fdfs/client.conf ./filename.txt        比如会返回:group1/M00/00/00/xxxxxx.txt
int main(int argc, char *argv[])
{
  char *conf_filename;
  char *local_filename;
  char group_name[FDFS_GROUP_NAME_MAX_LEN + 1];
  ConnectionInfo *pTrackerServer;
  int result;
  int store_path_index;
  ConnectionInfo storageServer;
  char file_id[128];
  if (argc < 3)
  {
    usage(argv);
    return 1;
  }
  log_init();
  g_log_context.log_level = LOG_ERR;
  ignore_signal_pipe();
  conf_filename = argv[1];
  if ((result=fdfs_client_init(conf_filename)) != 0)
  {
    return result;
  }
  pTrackerServer = tracker_get_connection();//连接traceker
  if (pTrackerServer == NULL)
  {
    fdfs_client_destroy();
    return errno != 0 ? errno : ECONNREFUSED;
  }
  local_filename = argv[2];
  *group_name = '\0';
  if (argc >= 4) //参数大于等于4,说明指定了storage
  {
    const char *pPort;
    const char *pIpAndPort;
    pIpAndPort = argv[3];
    pPort = strchr(pIpAndPort, ':');
    if (pPort == NULL)
    {
      fdfs_client_destroy();
      fprintf(stderr, "invalid storage ip address and " \
        "port: %s\n", pIpAndPort);
      usage(argv);
      return 1;
    }
    storageServer.sock = -1;
    snprintf(storageServer.ip_addr, sizeof(storageServer.ip_addr), \
       "%.*s", (int)(pPort - pIpAndPort), pIpAndPort);
    storageServer.port = atoi(pPort + 1);
    if (argc >= 5)
    {
      store_path_index = atoi(argv[4]);
    }
    else
    {
      store_path_index = -1;
    }
  }
  else if ((result=tracker_query_storage_store(pTrackerServer, \    
                  &storageServer, group_name, &store_path_index)) != 0)     // 请求 // tracker_query_storage_store宏定义,本质tracker_query_storage_store_without_group ,请求tracker去获取storage
  {
    fdfs_client_destroy();
    fprintf(stderr, "tracker_query_storage fail, " \
      "error no: %d, error info: %s\n", \
      result, STRERROR(result));
    return result;
  }
  // 上传文件(最终调用的是storage_do_upload_file)
  result = storage_upload_by_filename1(pTrackerServer, \        
      &storageServer, store_path_index, \
      local_filename, NULL, \
      NULL, 0, group_name, file_id);
  if (result == 0)
  {
    printf("%s\n", file_id);
  }
  else
  {
    fprintf(stderr, "upload file fail, " \
      "error no: %d, error info: %s\n", \
      result, STRERROR(result));
  }
  tracker_close_connection_ex(pTrackerServer, true);
  fdfs_client_destroy();
  return result;
}

2) storage_do_upload_file

主要分为3个步骤

1.将上半部分协议的信息全部打包到buffer中去

2.先发送上传请求的协议的上半部分,将内存中的数据(buffer)进行发送调用tcpsenddata_nb

3.然后发送上传请求的协议的下半部分,通过文件读取并发送调用tcpsendfile

int storage_do_upload_file(ConnectionInfo *pTrackerServer, \
  ConnectionInfo *pStorageServer, const int store_path_index, \
  const char cmd, const int upload_type, const char *file_buff, \
  void *arg, const int64_t file_size, const char *master_filename, \
  const char *prefix_name, const char *file_ext_name, \
  const FDFSMetaData *meta_list, const int meta_count, \
  char *group_name, char *remote_filename)
{
  ...//以上都是对协议进行包装,如storage_index,file_size等
  long2buff(file_size, p);      // 把文件长度写入到buffer中       8字节
  p += FDFS_PROTO_PKG_LEN_SIZE;
  if (bUploadSlave)
  {
    ...
  }
  else
  {
    memset(p, 0, FDFS_FILE_EXT_NAME_MAX_LEN); // 扩展名  固定长度6字节,先置0
  }
  if (file_ext_name != NULL)
  {
    ...//拷贝扩展名
  }
  p += FDFS_FILE_EXT_NAME_MAX_LEN;
  ...
  long2buff((p - out_buff) + file_size - sizeof(TrackerHeader), \
    pHeader->pkg_len);            // body的长度 
  pHeader->cmd = cmd;
  pHeader->status = 0;
  // 请求上传文件(也就是fdfs上传协议的上半部分)
  if ((result=tcpsenddata_nb(pStorageServer->sock, out_buff, \
    p - out_buff, g_fdfs_network_timeout)) != 0)    
  {
    logError("file: "__FILE__", line: %d, " \
      "send data to storage server %s:%d fail, " \
      "errno: %d, error info: %s", __LINE__, \
      pStorageServer->ip_addr, pStorageServer->port, \
      result, STRERROR(result));
    break;
  }
  //请求上传文件(fdfs上传协议的下半部分)(数据部分)
  if (upload_type == FDFS_UPLOAD_BY_FILE)//一般都是按文件进行上传          
  {
    if ((result=tcpsendfile(pStorageServer->sock, file_buff, \  
      file_size, g_fdfs_network_timeout, \
      &total_send_bytes)) != 0)  // 这里才是真正发送文件 (使用了sendfile进行零拷贝)
    {
      break;
    }
  }
  else if (upload_type == FDFS_UPLOAD_BY_BUFF)   
  {
    if ((result=tcpsenddata_nb(pStorageServer->sock, \
      (char *)file_buff, file_size, \
      g_fdfs_network_timeout)) != 0)
    {
      logError("file: "__FILE__", line: %d, " \
        "send data to storage server %s:%d fail, " \
        "errno: %d, error info: %s", __LINE__, \
        pStorageServer->ip_addr, pStorageServer->port, \
        result, STRERROR(result));
      break;
    }
  }
  else //FDFS_UPLOAD_BY_CALLBACK
  {
    UploadCallback callback;
    callback = (UploadCallback)file_buff;
    if ((result=callback(arg, file_size, pStorageServer->sock))!=0)
    {
      break;
    }
  }
  pInBuff = in_buff;
  if ((result=fdfs_recv_response(pStorageServer, \      
    &pInBuff, sizeof(in_buff), &in_bytes)) != 0)  // 获取响应
  {
    logError("file: "__FILE__", line: %d, "
                "fdfs_recv_response fail, result: %d",
                __LINE__, result);
    break;
  }
  //以下是对获取响应的一些处理
  ...
  if (new_connection)
  {
    tracker_close_connection_ex(pStorageServer, result != 0);
  }
  return result;
}

从上面可以看到有三种发送的方式,但是一般都是默认选择FDFS_UPLOAD_BY_FILE,按文件上传

二、fastdfs下载

1、请求和回应的协议格式

  • file_offset:下载的文件起始偏移位置(比如偏移为0,那么就是从头开始下载)
  • download_bytes:要下载的字节数量(比如文件10MB,先下载2MB)

2、源码结构图

3、源码阅读

1)fastdfs/client/fdfs_download_file.c

使用命令fdfs_download_file下载文件,就会调用下面这部分内容。

这一部分主要是解析参数,比如设置下载的起始偏移位置和下载的字节数

如果没有设置,默认是从头开始下载

最后调用storage_do_download_file1_ex进行下载(发送下载请求和接受数据)

int main(int argc, char *argv[])
{
  char *conf_filename;
  char *local_filename;
  ConnectionInfo *pTrackerServer;
  int result;
  char file_id[128];
  int64_t file_size;
  int64_t file_offset;
  int64_t download_bytes;
  if (argc < 3)
  {
    printf("Usage: %s <config_file> <file_id> " \
      "[local_filename] [<download_offset> " \
      "<download_bytes>]\n", argv[0]);
    return 1;
  }
  log_init();
  g_log_context.log_level = LOG_ERR;
  ignore_signal_pipe();
  conf_filename = argv[1];
  if ((result=fdfs_client_init(conf_filename)) != 0)
  {
    return result;
  }
  pTrackerServer = tracker_get_connection();
  if (pTrackerServer == NULL)
  {
    fdfs_client_destroy();
    return errno != 0 ? errno : ECONNREFUSED;
  }
  snprintf(file_id, sizeof(file_id), "%s", argv[2]);
  file_offset = 0;
  download_bytes = 0;
  if (argc >= 4)
  {
    local_filename = argv[3];
    if (argc >= 6)
    {
      file_offset = strtoll(argv[4], NULL, 10);//文件的字节偏移
      download_bytes = strtoll(argv[5], NULL, 10);//下载字节数
    }
  }
  else
  {
    local_filename = strrchr(file_id, '/');
    if (local_filename != NULL)
    {
      local_filename++;  //skip /
    }
    else
    {
      local_filename = file_id;
    }
  }
  //在storage_do_download_file_ex实现下载
  result = storage_do_download_file1_ex(pTrackerServer, \
                NULL, FDFS_DOWNLOAD_TO_FILE, file_id, \
                file_offset, download_bytes, \
                &local_filename, NULL, &file_size);
  if (result != 0)
  {
    printf("download file fail, " \
      "error no: %d, error info: %s\n", \
      result, STRERROR(result));
  }
  tracker_close_connection_ex(pTrackerServer, true);
  fdfs_client_destroy();
  return 0;
}

2)storage_do_download_file_ex

主要有三部分

1.连接stoarge

2.打包协议信息,发送下载请求

3.下载文件(可以下载到磁盘,内存,获得通过回调函数的方式,默认是下载到磁盘FDFS_DOWNLOAD_TO_FILE

int storage_do_download_file_ex(ConnectionInfo *pTrackerServer, \
    ConnectionInfo *pStorageServer, \
    const int download_type, \
    const char *group_name, const char *remote_filename, \
    const int64_t file_offset, const int64_t download_bytes, \
    char **file_buff, void *arg, int64_t *file_size)
{
  ...
  *file_size = 0;
  if ((result=storage_get_read_connection(pTrackerServer, \
    &pStorageServer, group_name, remote_filename, \
    &storageServer, &new_connection)) != 0)     // 连接storage
  {
    return result;
  }
  do
  {
  ...//将一些请求协议的信息打包到buffer中
  pHeader->cmd = STORAGE_PROTO_CMD_DOWNLOAD_FILE;   // 请求storage下载
  if ((result=tcpsenddata_nb(pStorageServer->sock, out_buff, \
    out_bytes, g_fdfs_network_timeout)) != 0)   //发送下载请求
  {
    logError("file: "__FILE__", line: %d, " \
      "send data to storage server %s:%d fail, " \
      "errno: %d, error info: %s", __LINE__, \
      pStorageServer->ip_addr, pStorageServer->port, \
      result, STRERROR(result));
    break;
  }
  if (download_type == FDFS_DOWNLOAD_TO_FILE)//storage回应数据
  { // 请求接收header
    if ((result=fdfs_recv_header(pStorageServer, \    
      &in_bytes)) != 0)  //in_bytes 一般就是说我们body的长度(解析获取的header,可以获得body的长度)
    {
            logError("file: "__FILE__", line: %d, "
                    "fdfs_recv_header fail, result: %d",
                    __LINE__, result);
      break;
    }
    //in_bytes 是获取到的文件的大小
    if ((result=tcprecvfile(pStorageServer->sock, \   
        *file_buff, in_bytes, 0, \
        g_fdfs_network_timeout, \
        &total_recv_bytes)) != 0)  // 接收文件
    {
      break;
    }
  }
  else if (download_type == FDFS_DOWNLOAD_TO_BUFF)//如果要把数据读到内存里去
  {
    *file_buff = NULL;
    if ((result=fdfs_recv_response(pStorageServer, \
      file_buff, 0, &in_bytes)) != 0)
    {
            logError("file: "__FILE__", line: %d, "
                    "fdfs_recv_response fail, result: %d",
                    __LINE__, result);
      break;
    }
  }
  else//通过设置的回调函数去下载
  {
    DownloadCallback callback;
    char buff[2048];
    int recv_bytes;
    int64_t remain_bytes;
    if ((result=fdfs_recv_header(pStorageServer, \
      &in_bytes)) != 0)
    {
            logError("file: "__FILE__", line: %d, "
                    "fdfs_recv_header fail, result: %d",
                    __LINE__, result);
      break;
    }
    callback = (DownloadCallback)*file_buff;
    remain_bytes = in_bytes;
    while (remain_bytes > 0)
    {
      if (remain_bytes > sizeof(buff))
      {
        recv_bytes = sizeof(buff);
      }
      else
      {
        recv_bytes = remain_bytes;
      }
      if ((result=tcprecvdata_nb(pStorageServer->sock, buff, \
        recv_bytes, g_fdfs_network_timeout)) != 0)
      {
        logError("file: "__FILE__", line: %d, " \
          "recv data from storage server " \
          "%s:%d fail, " \
          "errno: %d, error info: %s", __LINE__, \
          pStorageServer->ip_addr, \
          pStorageServer->port, \
          result, STRERROR(result));
        break;
      }
      result = callback(arg, in_bytes, buff, recv_bytes);
      if (result != 0)
      {
        logError("file: "__FILE__", line: %d, " \
          "call callback function fail, " \
          "error code: %d", __LINE__, result);
        break;
      }
      remain_bytes -= recv_bytes;
    }
    if (remain_bytes != 0)
    {
      break;
    }
  }
  *file_size = in_bytes;
  } while (0);
  if (new_connection)
  {
    tracker_close_connection_ex(pStorageServer, result != 0);
  }
  return result;
}

4、补充

多线程下载

由于协议中有偏移和下载字节数,因此可以自己去实现使用多线程进程下载,分成很多段,用不同的线程去下载

这样做有什么好处?

1、比如服务器对每个下载请求都限速了,那么可以用多线程来提速

2、服务器如果此时负荷比较高,多个连接,可以多占用资源进行下载

断点续传

这是fdfs内部就实现的功能,将文件追加到上一次上传的文件的末尾

echo hello > test1.txt
echo world > test2.txt
fdfs_upload_appender /etc/fdfs/client.conf test1.txt
得到:group1/M00/00/00/rBMYd2Id2FmEay2-AAAAADY6MCA286.txt ,在fdfs_append_file的时候需要
fdfs_append_file /etc/fdfs/client.conf group1/M00/00/00/rBMYd2Id2FmEay2-AAAAADY6MCA286.txt test2.txt
在服务器相应的⽬录下查找对应的⽂件,⽤cat读取⽂件内容。
root@iZbp1h2l856zgoegc8rvnhZ:/home/fastdfs/storage/data/00/00# cat rBMYd2Id2FmEay2-AAAAADY6MCA286.txt
hello
world
  • fdfs_upload_appender 上传第⼀部分⽂件
  • fdfs_append_file 上传其他部分的⽂件

注意:

注意断点续传的顺序性

⽀持断点续传,但fastdfs并不⽀持多线程分⽚上传同⼀个⽂件

相关文章
|
3月前
|
JSON Rust 前端开发
【sheetjs】纯前端如何实现Excel导出下载和上传解析?
本文介绍了如何使用`sheetjs`的`xlsx`库在前端实现Excel的导出和上传。项目依赖包括Vite、React、SheetJS和Arco-Design。对于导出,从后端获取JSON数据,通过`json_to_sheet`、`book_new`和`writeFile`函数生成并下载Excel文件。对于上传,使用`read`函数将上传的Excel文件解析为JSON并发送至后端。完整代码示例可在GitHub仓库[fullee/sheetjs-demo](https://github.com/fullee/sheetjs-demo)中查看。
211 10
|
存储 NoSQL 算法
文件上传下载系列——如何实现文件秒传
文件上传下载系列——如何实现文件秒传
|
存储 前端开发 NoSQL
注册java实现文件分片上传并且断点续传
一、简单的分片上传 针对第一个问题,如果文件过大,上传到一半断开了,若重新开始上传的话,会很消耗时间,并且你也并不知道距离上次断开时,已经上传到哪一部分了。因此我们应该先对大文件进行分片处理,防止上面提到的问题。
|
存储 前端开发 JavaScript
后端文件上传以及下载功能实现
上一章讲到前端文件下载功能的实现,之前也讲过前端文件上传功能的实现,这一章就讲一下后端怎么接收前端上传的文件,以及怎么实现文件下载功能。
363 0
|
Java Maven
fastdfs-client-java实现图片文件上传
fastdfs-client-java实现图片文件上传
113 0
|
前端开发 JavaScript
请问:怎么实现大文件快速上传?
请问:怎么实现大文件快速上传?
181 0
|
缓存 前端开发
前端下载并生成文件
前端下载并生成文件
|
Java 应用服务中间件 Linux
springboot上传下载文件(1)(项目和文件资源放在同一个服务器上)
springboot上传下载文件(1)(项目和文件资源放在同一个服务器上)
308 0
springboot上传下载文件(1)(项目和文件资源放在同一个服务器上)
|
缓存 安全 JavaScript
如何实现上传文件到 nodejs 和文件下载
最近拿 next.js 做个全栈项目,需要文件上传和下载,这里记录下实现方式,也写一下使用原生 node 代码如何实现。