fastdfs源码阅读:文件传输原理与网络IO模型(accept线程、work线程(网络io处理)、dio线程(文件io处理))

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析DNS,个人版 1个月
全局流量管理 GTM,标准版 1个月
简介: fastdfs源码阅读:文件传输原理与网络IO模型(accept线程、work线程(网络io处理)、dio线程(文件io处理))

一、fastdfs网络IO模型的结构

fdfs文件服务器主要有3种线程,accept线程、work线程(网络io处理)、dio线程(处理文件)

accept新连接,有个专门的accept线程去处理。每个线程池处理自己的事,比如在业务中,还要设计一个视频解码的功能,要另开个线程池,处理专门的任务。而不是把所有逻辑都放在一个线程池里面。

nio是net io的意思 (网络io)

dio是data io的意思 (文件io)

  • 1.accept线程在接受新的连接后,从对象池中取出一个任务对象,封装客户端信息,通过轮询的方式发送给对应的work线程,写入管道pipe(传输的是对象的地址),pipe是能够触发work线程中的epoll。(在接受新连接前,在work线程中就已经把pipe加入了监听)。在网络io线程中,就可以读取到新的连接任务对象,在storage_nio_init中重新绑定读写事件。
  • 2.文件io线程,通过从队列中取任务,然后写入到磁盘中。
    那么如何通知文件io线程呢,直接往文件处理队列(阻塞队列)中添加即可,在文件io线程中就能取到(锁和条件变量,取消阻塞)。没有直接在文件io线程中检测网络io事件,主要是为了将功能解耦合,让work线程(处理网络io)去做这件事,每个线程只做自己的事,使得逻辑清楚。
  • 3.dio线程不会直接给nio线程设置各种读写事件,⽽是通过
    FDFS_STORAGE_STAGE_NIO_INIT、FDFS_STORAGE_STAGE_NIO_RECV、
    FDFS_STORAGE_STAGE_NIO_SEND、FDFS_STORAGE_STAGE_NIO_CLOSE、
    FDFS_STORAGE_STAGE_DIO_THREAD等状态 + 通过pipe通知nio线程响应storage_recv_notify_read
    进⾏io事件的处理。

只要文件io线程读取完了文件处理队列中所有的数据,那么就会请求fd加入可写事件(通过pipe向work线程发送信号,触发epoll去做这件事),让网络io去读取新的事件。

不同线程之间通过以下方式进行通信

1.队列 (阻塞队列,锁+条件变量)

2.管道(通过pipe创建)

二、服务端的一些逻辑

新连接:

首先通过accept线程去接受新的连接incomingsock,让后去获取它的信息,并从对象池取出一个pTask,将新的连接fd以及它的信息封装成pTask。通过管道将pTask发送出去,在work线程epoll中就已经监听了该管道的读fd,accept线程发送的pipe,在work线程中epoll能检测到pipe事件,并读取信息。然后将该客户端fd读事件加入到epoll中。

上传:

work线程的epoll检测到客户端发送的信息,通过解析协议,获取它的数据,如果其中的CMD是上传的标志,那么就会执行client_sock_read–>storage_deal_task

–>storage_upload_file–>storage_write_to_file

然后会将协议解析的数据和上传回调函数(保存到服务器磁盘)dio_write_file,上传完成的回调函数storage_upload_file_done_callback等信息都封装到pTask,然后将它加入队列中。

然后dio线程中队列就能检测到新的任务,就会执行deal_func也就是dio_write_file,进行写入磁盘。

三、源码阅读

1、fastdfs/storage/fdfs_storage.c

这一部分是进行启动storage服务端

  • 一些初始化工作(如创建socket对象(进行listen),等)
  • 创建work线程(读写网络io)
  • 创建dio线程(dataIO线程,也就是处理文件的,比如将上传的内容写入到磁盘)
  • 创建accept线程(用于接受新的连接)

当然还有写同步的功能,但不在本文介绍。

int main(int argc, char *argv[])
{
  ...
  sock = socketServer(g_bind_addr, g_server_port, &result);//socket、bind、listen
  if (sock < 0)
  {
    logCrit("exit abnormally!\n");
        delete_pid_file(pidFilename);
    log_destroy();
    return result;
  }
  ...
  if ((result=storage_service_init()) != 0)//storage_service初始化,包含work线程初始化(网络IO部分)
  {
    logCrit("file: "__FILE__", line: %d, " \
      "storage_service_init fail, program exit!", __LINE__);
    g_continue_flag = false;
    return result;
  }
  ...
  if ((result=storage_dio_init()) != 0)//初始化dio线程(dataIO线程,就也是处理文件的)
  {
    logCrit("exit abnormally!\n");
    log_destroy();
    return result;
  }
  log_set_cache(true);
  bTerminateFlag = false;
  accept_stage = ACCEPT_STAGE_DOING;
  storage_accept_loop(sock);//初始化accept线程
  accept_stage = ACCEPT_STAGE_DONE;
  ...
}

2、storage_accept_loop

创建accept线程(数量由g_accept_threads决定)

线程的执行函数为accept_thread_entrance

void storage_accept_loop(int server_sock)
{
  if (g_accept_threads > 1)
  {
    pthread_t tid;
    pthread_attr_t thread_attr;
    int result;
    int i;
    if ((result=init_pthread_attr(&thread_attr, g_thread_stack_size)) != 0)
    {
      logWarning("file: "__FILE__", line: %d, " \
        "init_pthread_attr fail!", __LINE__);
    }
    else
    {
      for (i=1; i<g_accept_threads; i++)
      {
      if ((result=pthread_create(&tid, &thread_attr, \
        accept_thread_entrance, \
        (void *)(long)server_sock)) != 0)//创建accept线程
      {
        logError("file: "__FILE__", line: %d, " \
        "create thread failed, startup threads: %d, " \
        "errno: %d, error info: %s", \
        __LINE__, i, result, STRERROR(result));
        break;
      }
      }
      pthread_attr_destroy(&thread_attr);
    }
  }
  accept_thread_entrance((void *)(long)server_sock);
}

1)accept_thread_entrance

流程:

其中获取任务对象是从对象池中获取,然后 将新连接的信息封装到任务对象。

通过轮询的方式指定work线程,然后将任务对象的地址发送(write)给该work线程

源码:

static void *accept_thread_entrance(void* arg)
{
  int server_sock;
  int incomesock;
  struct sockaddr_in inaddr;
  socklen_t sockaddr_len;
  in_addr_t client_addr;
  char szClientIp[IP_ADDRESS_SIZE];
  long task_addr;
  struct fast_task_info *pTask;
  StorageClientInfo *pClientInfo;
  struct storage_nio_thread_data *pThreadData;
  server_sock = (long)arg;
  while (g_continue_flag)
  {
    sockaddr_len = sizeof(inaddr);
    incomesock = accept(server_sock, (struct sockaddr*)&inaddr, \
          &sockaddr_len);//accept
    if (incomesock < 0) //error
    {
      if (!(errno == EINTR || errno == EAGAIN))
      {
        logError("file: "__FILE__", line: %d, " \
          "accept failed, " \
          "errno: %d, error info: %s", \
          __LINE__, errno, STRERROR(errno));
      }
      continue;
    }
    client_addr = getPeerIpaddr(incomesock, \
        szClientIp, IP_ADDRESS_SIZE);
    if (g_allow_ip_count >= 0)
    {
      if (bsearch(&client_addr, g_allow_ip_addrs, \
          g_allow_ip_count, sizeof(in_addr_t), \
          cmp_by_ip_addr_t) == NULL)
      {
        logError("file: "__FILE__", line: %d, " \
          "ip addr %s is not allowed to access", \
          __LINE__, szClientIp);
        close(incomesock);
        continue;
      }
    }
    if (tcpsetnonblockopt(incomesock) != 0)
    {
      close(incomesock);
      continue;
    }
    pTask = free_queue_pop();   // 取task对象(从对象池中取)
    if (pTask == NULL)
    {
      logError("file: "__FILE__", line: %d, "
        "malloc task buff fail, you should "
        "increase the parameter \"max_connections\" "
                "in storage.conf, or check your applications "
                "for connection leaks", __LINE__);
      close(incomesock);
      continue;
    }
    pClientInfo = (StorageClientInfo *)pTask->arg;  // 封装客户端信息
    pTask->event.fd = incomesock; // socket fd 
    pClientInfo->stage = FDFS_STORAGE_STAGE_NIO_INIT; // 初始化client的状态
    pClientInfo->nio_thread_index = pTask->event.fd % g_work_threads;//通过轮询的方式,发送给对应的work线程
    pThreadData = g_nio_thread_data + pClientInfo->nio_thread_index;//g_nio_thread_data是一个全局的信息(指针),加上一个index就可以指向,具体的线程信息
    strcpy(pTask->client_ip, szClientIp);
    task_addr = (long)pTask;
    if (write(pThreadData->thread_data.pipe_fds[1], &task_addr, \
      sizeof(task_addr)) != sizeof(task_addr))//写入管道          
    {
      close(incomesock);
      free_queue_push(pTask);//如果写入失败,就把对象重新放入对象池中
      logError("file: "__FILE__", line: %d, " \
        "call write failed, " \
        "errno: %d, error info: %s", \
        __LINE__, errno, STRERROR(errno));
    }
        else
        {
            int current_connections;
            current_connections = __sync_add_and_fetch(&g_storage_stat.connection.
                    current_count, 1);//连接数量+1 (CAS)
            if (current_connections > g_storage_stat.connection.max_count) {
                g_storage_stat.connection.max_count = current_connections;
            }
            ++g_stat_change_count;
        }
  }
  return NULL;
}

注意pipe_fds[1]是管道的写端,pipe_fds[0]是读端。因此如果pipe_fds[0]加入epoll的话,往pipe_fds[1]中写入数据,那么epoll就能监听到。

3、storage_service_init

这部分的主要内容是创建work线程,work线程的执行函数为work_thread_entrance

int storage_service_init()
{
  ...
  bytes = sizeof(struct storage_nio_thread_data) * g_work_threads;//work线程默认个数是4,也可以从配置文件中读出来
  g_nio_thread_data = (struct storage_nio_thread_data *)malloc(bytes);
  if (g_nio_thread_data == NULL)
  {
    logError("file: "__FILE__", line: %d, " \
      "malloc %d bytes fail, errno: %d, error info: %s", \
      __LINE__, bytes, errno, STRERROR(errno));
    return errno != 0 ? errno : ENOMEM;
  }
  memset(g_nio_thread_data, 0, bytes);
  g_storage_thread_count = 0;
  pDataEnd = g_nio_thread_data + g_work_threads;
  for (pThreadData=g_nio_thread_data; pThreadData<pDataEnd; pThreadData++)
  {
    ...
    if (pipe(pThreadData->thread_data.pipe_fds) != 0)//创建管道
    {
      result = errno != 0 ? errno : EPERM;
      logError("file: "__FILE__", line: %d, " \
        "call pipe fail, " \
        "errno: %d, error info: %s", \
        __LINE__, result, STRERROR(result));
      break;
    }
    ...
    if ((result=pthread_create(&tid, &thread_attr, \
      work_thread_entrance, pThreadData)) != 0)//创建work线程
    {
      logError("file: "__FILE__", line: %d, " \
        "create thread failed, startup threads: %d, " \
        "errno: %d, error info: %s", \
        __LINE__, g_storage_thread_count, \
        result, STRERROR(result));
      break;
    }
    else
    {
      if ((result=pthread_mutex_lock(&g_storage_thread_lock)) != 0)
      {
        logError("file: "__FILE__", line: %d, " \
          "call pthread_mutex_lock fail, " \
          "errno: %d, error info: %s", \
          __LINE__, result, STRERROR(result));
      }
      g_storage_thread_count++;//创建线程成功,因此+1
      if ((result=pthread_mutex_unlock(&g_storage_thread_lock)) != 0)
      {
        logError("file: "__FILE__", line: %d, " \
          "call pthread_mutex_lock fail, " \
          "errno: %d, error info: %s", \
          __LINE__, result, STRERROR(result));
      }
    }
  }
  ...
}

1)work_thread_entrance

ioevent_loop是 事件循环所在

static void *work_thread_entrance(void* arg)
{
  int result;
  struct storage_nio_thread_data *pThreadData;
  pThreadData = (struct storage_nio_thread_data *)arg;
  ...
  // 事件循环所在
  ioevent_loop(&pThreadData->thread_data, storage_recv_notify_read,
    task_finish_clean_up, &g_continue_flag);
  ioevent_destroy(&pThreadData->thread_data.ev_puller);
  ...
  return NULL;
}

2)ioevent_loop

事件循环所在

比如将pipe_fds[0]管道的读端fd加入epoll管理

进行epoll_wait

如果事件触发,就执行相应的回调函数

int ioevent_loop(struct nio_thread_data *pThreadData,
  IOEventCallback recv_notify_callback, TaskCleanUpCallback
  clean_up_callback, volatile bool *continue_flag)
{
  int result;
  struct ioevent_notify_entry ev_notify;
  FastTimerEntry head;
  struct fast_task_info *task;
  time_t last_check_time;
  int count;
  memset(&ev_notify, 0, sizeof(ev_notify));
  ev_notify.event.fd = FC_NOTIFY_READ_FD(pThreadData);  // socket fd
  ev_notify.event.callback = recv_notify_callback;      // 对应的是 storage_recv_notify_read
  ev_notify.thread_data = pThreadData;  // 自己所属的线程
  if (ioevent_attach(&pThreadData->ev_puller,
    pThreadData->pipe_fds[0], IOEVENT_READ,
    &ev_notify) != 0)             // 管道添加到epoll管理
  {
    result = errno != 0 ? errno : ENOMEM;
    logCrit("file: "__FILE__", line: %d, " \
      "ioevent_attach fail, " \
      "errno: %d, error info: %s", \
      __LINE__, result, STRERROR(result));
    return result;
  }
    pThreadData->deleted_list = NULL;
  last_check_time = g_current_time;
  while (*continue_flag)
  {
    pThreadData->ev_puller.iterator.count = ioevent_poll(   // 实际是调用epoll_wait
                &pThreadData->ev_puller);
    if (pThreadData->ev_puller.iterator.count > 0)
    {
      deal_ioevents(&pThreadData->ev_puller);   // 真正有数据来进入该函数(执行回调函数)
    }
    else if (pThreadData->ev_puller.iterator.count < 0)
    {
      result = errno != 0 ? errno : EINVAL;
      if (result != EINTR)
      {
        logError("file: "__FILE__", line: %d, " \
          "ioevent_poll fail, " \
          "errno: %d, error info: %s", \
          __LINE__, result, STRERROR(result));
        return result;
      }
    }
    ...
  }
  return 0;
}

3)storage_recv_notify_read

// 这里的socket实际是pipe
void storage_recv_notify_read(int sock, short event, void *arg)// 数据服务器socket事件回调,比如说在上传文件时,接收了一部分之后,调用storage_nio_notify(pTask)
{
  struct fast_task_info *pTask;
  StorageClientInfo *pClientInfo;//注意这个参数是不同的,一个是跟踪服务器参数,一个是数据服务器参数
  long task_addr; // 读取task的地址
  int64_t remain_bytes;
  int bytes;
  int result;
  while (1)         // 循环读取task任务
  {
    if ((bytes=read(sock, &task_addr, sizeof(task_addr))) < 0)    // 读取task任务
    {
      if (!(errno == EAGAIN || errno == EWOULDBLOCK))
      {
        logError("file: "__FILE__", line: %d, " \
          "call read failed, " \
          "errno: %d, error info: %s", \
          __LINE__, errno, STRERROR(errno));
      }
      break;        // 没有task可读
    }
    else if (bytes == 0)
    {
      logError("file: "__FILE__", line: %d, " \
        "call read failed, end of file", __LINE__);
      break;
    }
    pTask = (struct fast_task_info *)task_addr;     // 还原任务
    pClientInfo = (StorageClientInfo *)pTask->arg;
    if (pTask->event.fd < 0)  //quit flag, 这个是对应的 socket fd
    {
      return;
    }
    /* //logInfo("=====thread index: %d, pTask->event.fd=%d", \
      pClientInfo->nio_thread_index, pTask->event.fd);
    */
    if (pClientInfo->stage & FDFS_STORAGE_STAGE_DIO_THREAD)
    {
      pClientInfo->stage &= ~FDFS_STORAGE_STAGE_DIO_THREAD;
    }
    switch (pClientInfo->stage)
    {
      case FDFS_STORAGE_STAGE_NIO_INIT: //数据服务器服务端socket接收过来的任务的pClientInfo->stage=FDFS_STORAGE_STAGE_NIO_INIT
        result = storage_nio_init(pTask);  //因此在这里在重新绑定读写事件 //每连接一个客户端,在这里都会触发这个动作
        break;
      case FDFS_STORAGE_STAGE_NIO_RECV:
        pTask->offset = 0;        //在次接受包体时pTask->offset偏移量被重置
        remain_bytes = pClientInfo->total_length - \  
                 pClientInfo->total_offset;//任务的长度=包的总长度-包的总偏移量
        if (remain_bytes > pTask->size) //总是试图将余下的自己一次接收收完
        {
          pTask->length = pTask->size;    // pTask->size 是每次最大的数据接收长度
        }
        else
        {
          pTask->length = remain_bytes;
        }
        if (set_recv_event(pTask) == 0)
        {
          client_sock_read(pTask->event.fd,   // 通过socket fd读取数据
            IOEVENT_READ, pTask);   // 读取数据
        }
        result = 0;
        break;
      case FDFS_STORAGE_STAGE_NIO_SEND:
        result = storage_send_add_event(pTask); // 数据发送
        break;
      case FDFS_STORAGE_STAGE_NIO_CLOSE:
        result = EIO;   //close this socket
        break;
      default:
        logError("file: "__FILE__", line: %d, " \
          "invalid stage: %d", __LINE__, \
          pClientInfo->stage);
        result = EINVAL;
        break;
    }
    if (result != 0)
    {
      ioevent_add_to_deleted_list(pTask);   // 如果出错再将对应的task加入到删除队列进行处理
    }
  }
}
(1)client_sock_read

从socket读取数据(也可以是pipe)

大部分内容都是读取数据的操作,最后一步才是关键的

recv读取完数据后,分为两种,

1、storage_deal_task(pTask)

这里面可以解析协议中的CMD,后续调用到storage_upload_file进行文件上传的初始化工作

2、data数据处理

读取完的数据交由dio(dataIO线程)进行处理storage_dio_queue_push(pTask),放入队列中,那么dio线程就能读取到,并执行dio_write_file写入磁盘中

static void client_sock_read(int sock, short event, void *arg)
{
  ...
  while (1)
  {
    if (pClientInfo->total_length == 0) //recv header //初始时pClientInfo->total_length=0 pTask->offset=0
    {
      recv_bytes = sizeof(TrackerHeader) - pTask->offset;
    }
    else  // 至少读到了10个字节后 sizeof(TrackerHeader)
    {
      recv_bytes = pTask->length - pTask->offset; //在次接受上传文件的数据包时,因为发生storage_nio_notify(pTask)
    }
    /*
    logInfo("total_length=%"PRId64", recv_bytes=%d, "
      "pTask->length=%d, pTask->offset=%d",
      pClientInfo->total_length, recv_bytes, 
      pTask->length, pTask->offset);
    */
    bytes = recv(sock, pTask->data + pTask->offset, recv_bytes, 0);   // 根据buffer情况读取数据
    if (bytes < 0)
    {
      if (errno == EAGAIN || errno == EWOULDBLOCK)
      {
      }
      else if (errno == EINTR)
      {
        continue;
      }
      else
      {
        logError("file: "__FILE__", line: %d, " \
          "client ip: %s, recv failed, " \
          "errno: %d, error info: %s", \
          __LINE__, pTask->client_ip, \
          errno, STRERROR(errno));
        task_finish_clean_up(pTask);
      }
      return;
    }
    else if (bytes == 0)
    {
      logDebug("file: "__FILE__", line: %d, " \
        "client ip: %s, recv failed, " \
        "connection disconnected.", \
        __LINE__, pTask->client_ip);
      task_finish_clean_up(pTask);
      return;
    }
    if (pClientInfo->total_length == 0) //header
    {  // 要来解析header
      if (pTask->offset + bytes < sizeof(TrackerHeader)) // 还没有读够 header
      {
        pTask->offset += bytes;   
        return;
      }
      pClientInfo->total_length=buff2long(((TrackerHeader *) \  //确定包data的总长度:比如下载文件时,接收的包,就只有包的长度, 这里不包括header
            pTask->data)->pkg_len);
      if (pClientInfo->total_length < 0)
      {
        logError("file: "__FILE__", line: %d, " \
          "client ip: %s, pkg length: " \
          "%"PRId64" < 0", \
          __LINE__, pTask->client_ip, \
          pClientInfo->total_length);
        task_finish_clean_up(pTask);
        return;
      }
       //包的总长度=包头+包体的长度  //设想发送的场景:包头+包体+包体+...(其中在包头里面含有多个包体的总长度)
      pClientInfo->total_length += sizeof(TrackerHeader);   //因为默认的接收缓冲只有K,所以会分次发送, 计算出来包括header的长度
      if (pClientInfo->total_length > pTask->size)
      {
        pTask->length = pTask->size;  //如果包的总长大于包的分配的长度,那么任务长度等于任务分配的长度, 读到对应的数据就去触发dio
      }
      else
      {
        pTask->length = pClientInfo->total_length;  //确定任务的长度
      }
    }
    pTask->offset += bytes;  // offset增加
    if (pTask->offset >= pTask->length) //recv current pkg done  //接收到当前包完成
    {
      if (pClientInfo->total_offset + pTask->length >= \  //上次操作接收的总的偏移量+这次接收的数据长度,如果大于包的总长度,那么说明包接收完毕
          pClientInfo->total_length)
      {
        /* current req recv done */
        pClientInfo->stage = FDFS_STORAGE_STAGE_NIO_SEND;
        pTask->req_count++;
      }
      if (pClientInfo->total_offset == 0)
      { // 说明还没有开始处理
        pClientInfo->total_offset = pTask->length;   //数据服务器进行处理
        storage_deal_task(pTask); // 解析header 以及我们协议附加的信息
      }
      else
      {
        pClientInfo->total_offset += pTask->length;  //否则继续写文件
        /* continue write to file */
        storage_dio_queue_push(pTask); // 比如文件增加
      }
      return;
    }
  }
  return;
}
(2)client_sock_write

服务端发送数据write给客户端,对于客户端来说,就是下载文件

一个下载的任务,要分成好几个ptask,让网络io线程去发送(work线程)。

如果ptask的数据发送完了(ptask->offset>=ptask->length),那么看总任务的是不是都发送完了。如果总任务都发送完成了,那么就切换 接受RECV状态。

如果总任务还没发送,那么就加入队列中storage_dio_queue_push(pTask),让dio线程再去读取数据。

static void client_sock_write(int sock, short event, void *arg)
{
  int bytes;
  struct fast_task_info *pTask;
        StorageClientInfo *pClientInfo;
  pTask = (struct fast_task_info *)arg;
        pClientInfo = (StorageClientInfo *)pTask->arg;
  if (pTask->canceled)
  {
    return;
  }
  if (event & IOEVENT_TIMEOUT)
  {
    logError("file: "__FILE__", line: %d, "
      "client ip: %s, send timeout, offset: %d, "
            "remain bytes: %d", __LINE__, pTask->client_ip,
            pTask->offset, pTask->length - pTask->offset);
    task_finish_clean_up(pTask);
    return;
  }
  if (event & IOEVENT_ERROR)
  {
    logDebug("file: "__FILE__", line: %d, "
      "client ip: %s, recv error event: %d, "
      "close connection", __LINE__, pTask->client_ip, event);
    task_finish_clean_up(pTask);
    return;
  }
  while (1)
  {
    fast_timer_modify(&pTask->thread_data->timer,
      &pTask->event.timer, g_current_time +
      g_fdfs_network_timeout);
    bytes = send(sock, pTask->data + pTask->offset, \
        pTask->length - pTask->offset,  0);
    //printf("%08X sended %d bytes\n", (int)pTask, bytes);
    if (bytes < 0)
    {
      if (errno == EAGAIN || errno == EWOULDBLOCK)
      {
        set_send_event(pTask);
      }
      else if (errno == EINTR)
      {
        continue;
      }
      else
      {
        logError("file: "__FILE__", line: %d, " \
          "client ip: %s, recv failed, " \
          "errno: %d, error info: %s", \
          __LINE__, pTask->client_ip, \
          errno, STRERROR(errno));
        task_finish_clean_up(pTask);
      }
      return;
    }
    else if (bytes == 0)
    {
      logWarning("file: "__FILE__", line: %d, " \
        "send failed, connection disconnected.", \
        __LINE__);
      task_finish_clean_up(pTask);
      return;
    }
    pTask->offset += bytes;
    if (pTask->offset >= pTask->length)
    {
      if (set_recv_event(pTask) != 0)
      {
        return;
      }
      pClientInfo->total_offset += pTask->length;
      if (pClientInfo->total_offset>=pClientInfo->total_length)
      {
        if (pClientInfo->total_length == sizeof(TrackerHeader)
          && ((TrackerHeader *)pTask->data)->status == EINVAL)
        {
          logDebug("file: "__FILE__", line: %d, "\
            "close conn: #%d, client ip: %s", \
            __LINE__, pTask->event.fd,
            pTask->client_ip);
          task_finish_clean_up(pTask);
          return;
        }
        /*  response done, try to recv again */
        pClientInfo->total_length = 0;
        pClientInfo->total_offset = 0;
        pTask->offset = 0;
        pTask->length = 0;
        pClientInfo->stage = FDFS_STORAGE_STAGE_NIO_RECV;
      }
      else  //continue to send file content
      {
        pTask->length = 0;
        /* continue read from file */
        storage_dio_queue_push(pTask);    // 继续发送数据
      }
      return;
    }
  }
}

4)storage_upload_file

如果client_sock_read读取数据,协议中CMD为上传的指令,那么在work进程中(nio)中

执行该函数storage_upload_file,初始化要保存的文件信息,并且storage_upload_file末尾,通过storage_write_to_file将写入磁盘任务,放入队列中storage_dio_queue_push,让dio线程去执行

上传文件是在dio线程中执行dio_write_file,它是作为回调函数执行的。

static int storage_upload_file(struct fast_task_info *pTask, bool bAppenderFile)
{
  StorageClientInfo *pClientInfo;
  StorageFileContext *pFileContext;
  DisconnectCleanFunc clean_func;
  char *p;
  char filename[128];
  char file_ext_name[FDFS_FILE_PREFIX_MAX_LEN + 1];
  int64_t nInPackLen;
  int64_t file_offset;
  int64_t file_bytes;
  int crc32;
  int store_path_index;
  int result;
  int filename_len;
  pClientInfo = (StorageClientInfo *)pTask->arg;
  pFileContext =  &(pClientInfo->file_context);     // 对应一个文件上下文
  nInPackLen = pClientInfo->total_length - sizeof(TrackerHeader);
  if (nInPackLen < 1 + FDFS_PROTO_PKG_LEN_SIZE + 
      FDFS_FILE_EXT_NAME_MAX_LEN)
  {
    logError("file: "__FILE__", line: %d, " \
      "cmd=%d, client ip: %s, package size " \
      "%"PRId64" is not correct, " \
      "expect length >= %d", __LINE__, \
      STORAGE_PROTO_CMD_UPLOAD_FILE, \
      pTask->client_ip,  nInPackLen, \
      1 + FDFS_PROTO_PKG_LEN_SIZE + \
      FDFS_FILE_EXT_NAME_MAX_LEN);
    return EINVAL;
  }
  p = pTask->data + sizeof(TrackerHeader);  // 跳过header
  store_path_index = *p++;        // store_path_index 解析store path值
  if (store_path_index == -1)       // 
  {
    if ((result=storage_get_storage_path_index( \
      &store_path_index)) != 0)
    {
      logError("file: "__FILE__", line: %d, " \
        "get_storage_path_index fail, " \
        "errno: %d, error info: %s", __LINE__, \
        result, STRERROR(result));
      return result;
    }
  }
  else if (store_path_index < 0 || store_path_index >= \
    g_fdfs_store_paths.count)
  {
    logError("file: "__FILE__", line: %d, " \
      "client ip: %s, store_path_index: %d " \
      "is invalid", __LINE__, \
      pTask->client_ip, store_path_index);
    return EINVAL;
  }
  file_bytes = buff2long(p);    // 解析处理要上传的文件大小
  p += FDFS_PROTO_PKG_LEN_SIZE;
  if (file_bytes < 0 || file_bytes != nInPackLen - \
      (1 + FDFS_PROTO_PKG_LEN_SIZE + \
       FDFS_FILE_EXT_NAME_MAX_LEN))
  {
    logError("file: "__FILE__", line: %d, " \
      "client ip: %s, pkg length is not correct, " \
      "invalid file bytes: %"PRId64 \
      ", total body length: %"PRId64, \
      __LINE__, pTask->client_ip, file_bytes, nInPackLen);
    return EINVAL;
  }
  memcpy(file_ext_name, p, FDFS_FILE_EXT_NAME_MAX_LEN);
  *(file_ext_name + FDFS_FILE_EXT_NAME_MAX_LEN) = '\0';
  p += FDFS_FILE_EXT_NAME_MAX_LEN;
  if ((result=fdfs_validate_filename(file_ext_name)) != 0)    // 检验扩展名
  {
    logError("file: "__FILE__", line: %d, " \
      "client ip: %s, file_ext_name: %s " \
      "is invalid!", __LINE__, \
      pTask->client_ip, file_ext_name);
    return result;
  }
  pFileContext->calc_crc32 = true;
  pFileContext->calc_file_hash = g_check_file_duplicate;  // 是否要检测文件唯一性
  pFileContext->extra_info.upload.start_time = g_current_time;
  strcpy(pFileContext->extra_info.upload.file_ext_name, file_ext_name);
  storage_format_ext_name(file_ext_name, \
      pFileContext->extra_info.upload.formatted_ext_name);
  pFileContext->extra_info.upload.trunk_info.path. \
        store_path_index = store_path_index;
  pFileContext->extra_info.upload.file_type = _FILE_TYPE_REGULAR;         // 常规文件
  pFileContext->sync_flag = STORAGE_OP_TYPE_SOURCE_CREATE_FILE;         // 创建文件
  pFileContext->timestamp2log = pFileContext->extra_info.upload.start_time;   // 时间戳
  pFileContext->op = FDFS_STORAGE_FILE_OP_WRITE;
  if (bAppenderFile)
  {
    pFileContext->extra_info.upload.file_type |= \        // 是否为追加文件模式
          _FILE_TYPE_APPENDER;    
  }
  else
  {
    if (g_if_use_trunk_file && trunk_check_size( \
      TRUNK_CALC_SIZE(file_bytes)))
    {
      pFileContext->extra_info.upload.file_type |= \    // 附加信息
            _FILE_TYPE_TRUNK;           // 设置为trunk文件模式
    }
  }
  if (pFileContext->extra_info.upload.file_type & _FILE_TYPE_TRUNK)   // trunk文件
  {
    FDFSTrunkFullInfo *pTrunkInfo;
    pFileContext->extra_info.upload.if_sub_path_alloced = true;
    pTrunkInfo = &(pFileContext->extra_info.upload.trunk_info);
    if ((result=trunk_client_trunk_alloc_space( \       // TRUNK_CALC_SIZE(file_bytes) = trunk header + file size
      TRUNK_CALC_SIZE(file_bytes), pTrunkInfo)) != 0)
    {
      return result;
    }
    clean_func = dio_trunk_write_finish_clean_up;
    file_offset = TRUNK_FILE_START_OFFSET((*pTrunkInfo));
    pFileContext->extra_info.upload.if_gen_filename = true;
    trunk_get_full_filename(pTrunkInfo, pFileContext->filename, \
        sizeof(pFileContext->filename));
    pFileContext->extra_info.upload.before_open_callback = \
          dio_check_trunk_file_when_upload;
    pFileContext->extra_info.upload.before_close_callback = \
          dio_write_chunk_header;
    pFileContext->open_flags = O_RDWR | g_extra_open_file_flags;
  }
  else
  {
    char reserved_space_str[32];
    if (!storage_check_reserved_space_path(g_fdfs_store_paths.paths \
      [store_path_index].total_mb, g_fdfs_store_paths.paths \
      [store_path_index].free_mb - (file_bytes/FDFS_ONE_MB), \
      g_avg_storage_reserved_mb))
    {
      logError("file: "__FILE__", line: %d, " \
        "no space to upload file, "
        "free space: %d MB is too small, file bytes: " \
        "%"PRId64", reserved space: %s", \
        __LINE__, g_fdfs_store_paths.paths[store_path_index].\
        free_mb, file_bytes, \
        fdfs_storage_reserved_space_to_string_ex( \
          g_storage_reserved_space.flag, \
          g_avg_storage_reserved_mb, \
          g_fdfs_store_paths.paths[store_path_index]. \
          total_mb, g_storage_reserved_space.rs.ratio,\
          reserved_space_str));
      return ENOSPC;
    }
    crc32 = rand();
    *filename = '\0';
    filename_len = 0;
    pFileContext->extra_info.upload.if_sub_path_alloced = false;
    if ((result=storage_get_filename(pClientInfo, \     // 获取file id
      pFileContext->extra_info.upload.start_time, \   
      file_bytes, crc32, pFileContext->extra_info.upload.\
      formatted_ext_name, filename, &filename_len, \  // 生成的文件需要扩展名
      pFileContext->filename)) != 0)
    {
      return result;
    }
    clean_func = dio_write_finish_clean_up;
    file_offset = 0;
    pFileContext->extra_info.upload.if_gen_filename = true;
    pFileContext->extra_info.upload.before_open_callback = NULL;
    pFileContext->extra_info.upload.before_close_callback = NULL;
    pFileContext->open_flags = O_WRONLY | O_CREAT | O_TRUNC \
            | g_extra_open_file_flags;
  }
    pFileContext->continue_callback = storage_nio_notify;   // 处理完毕后
    return storage_write_to_file(pTask, file_offset, file_bytes, \
            p - pTask->data, dio_write_file, \
      storage_upload_file_done_callback, \
      clean_func, store_path_index);
}

4、storage_dio_init

主要是对dio线程进行初始化

线程的执行函数是dio_thread_entrance

int storage_dio_init()
{
  ...
  for (pThreadData=g_dio_thread_data; pThreadData<pDataEnd; pThreadData++)
  {
    ...
    for (pContext=pThreadData->contexts; pContext<pContextEnd; \
      pContext++)
    {
      if ((result=blocked_queue_init(&(pContext->queue))) != 0)
      {
        return result;
      }
      if ((result=pthread_create(&tid, &thread_attr, \
          dio_thread_entrance, pContext)) != 0)
      {
        logError("file: "__FILE__", line: %d, " \
          "create thread failed, " \
          "startup threads: %d, " \
          "errno: %d, error info: %s", \
          __LINE__, g_dio_thread_count, \
          result, STRERROR(result));
        return result;
      }
      else
      {
        pthread_mutex_lock(&g_dio_thread_lock);
        g_dio_thread_count++;
        pthread_mutex_unlock(&g_dio_thread_lock);
      }
    }
  }
  pthread_attr_destroy(&thread_attr);
  return result;
}

1)dio_thread_entrance

dio线程要做的事,就是从阻塞队列中,一旦获取数据,就执行回调函数

static void *dio_thread_entrance(void* arg) 
{
  ... 
  while (g_continue_flag)
  {
    while ((pTask=blocked_queue_pop(&(pContext->queue))) != NULL)
    {
      ((StorageClientInfo *)pTask->arg)->deal_func(pTask);//执行回调函数
    }
  }
  ...
}

2)dio_write_file

在dio线程中执行,将上传的文件写入磁盘中

int dio_write_file(struct fast_task_info *pTask)
{
  StorageClientInfo *pClientInfo;
  StorageFileContext *pFileContext;
  int result;
  int write_bytes;
  char *pDataBuff;
  pClientInfo = (StorageClientInfo *)pTask->arg;
  pFileContext = &(pClientInfo->file_context);
  result = 0;
  do
  {
  if (pFileContext->fd < 0)
  {
    if (pFileContext->extra_info.upload.before_open_callback!=NULL)
    {
      result = pFileContext->extra_info.upload. \
          before_open_callback(pTask);
      if (result != 0)
      {
        break;
      }
    }
    if ((result=dio_open_file(pFileContext)) != 0)
    {
      break;
    }
  }
  pDataBuff = pTask->data + pFileContext->buff_offset;    // 跳过header以及附加信息, 在deal task的时候赋值的 pFileContext->buff_offset
  write_bytes = pTask->length - pFileContext->buff_offset;  // 
  if (fc_safe_write(pFileContext->fd, pDataBuff, write_bytes) != write_bytes)
  {
    result = errno != 0 ? errno : EIO;
    logError("file: "__FILE__", line: %d, " \
      "write to file: %s fail, fd=%d, write_bytes=%d, " \
      "errno: %d, error info: %s", \
      __LINE__, pFileContext->filename, \
      pFileContext->fd, write_bytes, \
      result, STRERROR(result));
  }
  pthread_mutex_lock(&g_dio_thread_lock);
  g_storage_stat.total_file_write_count++;
  if (result == 0)
  {
    g_storage_stat.success_file_write_count++;
  }
  pthread_mutex_unlock(&g_dio_thread_lock);
  if (result != 0)
  {
    break;
  }
  if (pFileContext->calc_crc32)
  {
    pFileContext->crc32 = CRC32_ex(pDataBuff, write_bytes, \
          pFileContext->crc32);
  }
  if (pFileContext->calc_file_hash)
  {
    if (g_file_signature_method == STORAGE_FILE_SIGNATURE_METHOD_HASH)
    {
      CALC_HASH_CODES4(pDataBuff, write_bytes, \
          pFileContext->file_hash_codes)
    }
    else
    {
      my_md5_update(&pFileContext->md5_context, \
        (unsigned char *)pDataBuff, write_bytes);
    }
  }
  /*
  logInfo("###dio write bytes: %d, pTask->length=%d, buff_offset=%d", \
    write_bytes, pTask->length, pFileContext->buff_offset);
  */
  pFileContext->offset += write_bytes;    // 增加写入文件的字数数量
  if (pFileContext->offset < pFileContext->end)   // pFileContext->end实际是指文件的大小。
  {
    pFileContext->buff_offset = 0;        // 为什么设置为0?因为下一次传输的数据全部为文件内容了
        pFileContext->continue_callback(pTask); // 等待下一次的继续触发,比如 storage_nio_notify
  }
  else              // 文件已经写入完毕
  {
    if (pFileContext->calc_crc32)
    {
      pFileContext->crc32 = CRC32_FINAL( \
            pFileContext->crc32);
    }
    if (pFileContext->calc_file_hash)
    {
      if (g_file_signature_method == STORAGE_FILE_SIGNATURE_METHOD_HASH)
      {
        FINISH_HASH_CODES4(pFileContext->file_hash_codes)
      }
      else
      {
        my_md5_final((unsigned char *)(pFileContext-> \
        file_hash_codes), &pFileContext->md5_context);
      }
    }
    if (pFileContext->extra_info.upload.before_close_callback != NULL)
    {
      result = pFileContext->extra_info.upload. \
          before_close_callback(pTask);
    }
    /* file write done, close it */
    close(pFileContext->fd);
    pFileContext->fd = -1;
    if (pFileContext->done_callback != NULL)
    {
      pFileContext->done_callback(pTask, result);// 比如 storage_upload_file_done_callback
    }
  }
  return 0;
  } while (0);
  pClientInfo->clean_func(pTask);
  if (pFileContext->done_callback != NULL)
  {
    pFileContext->done_callback(pTask, result);
  }
  return result;
}
相关文章
|
9天前
|
消息中间件 存储 Serverless
函数计算产品使用问题之怎么访问网络附加存储(NAS)存储模型文件
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
9天前
|
Kubernetes 负载均衡 网络安全
Kubernetes 网络模型与实践
【8月更文第29天】Kubernetes(K8s)是当今容器编排领域的佼佼者,它提供了一种高效的方式来管理容器化应用的部署、扩展和运行。Kubernetes 的网络模型是其成功的关键因素之一,它支持服务发现、负载均衡和集群内外通信等功能。本文将深入探讨 Kubernetes 的网络模型,并通过实际代码示例来展示服务发现和服务网格的基本概念及其实现。
28 1
|
5天前
|
网络协议 数据安全/隐私保护 网络架构
计算机网络模型
【9月更文挑战第2天】
37 24
|
3天前
|
网络协议 C语言
C语言 网络编程(十四)并发的TCP服务端-以线程完成功能
这段代码实现了一个基于TCP协议的多线程服务器和客户端程序,服务器端通过为每个客户端创建独立的线程来处理并发请求,解决了粘包问题并支持不定长数据传输。服务器监听在IP地址`172.17.140.183`的`8080`端口上,接收客户端发来的数据,并将接收到的消息添加“-回传”后返回给客户端。客户端则可以循环输入并发送数据,同时接收服务器回传的信息。当输入“exit”时,客户端会结束与服务器的通信并关闭连接。
|
3天前
|
算法
基于GA遗传优化的离散交通网络双层规划模型设计matlab仿真
该程序基于GA遗传优化设计了离散交通网络的双层规划模型,以路段收费情况的优化为核心,并通过一氧化碳排放量评估环境影响。在MATLAB2022a版本中进行了验证,显示了系统总出行时间和区域排放最小化的过程。上层模型采用多目标优化策略,下层则确保总阻抗最小,实现整体最优解。
|
6天前
|
分布式计算 负载均衡 监控
p2p网络架构模型
P2P(Peer-to-Peer)模式是一种网络架构模型,在这种模型中,每个节点(peer)既是服务的提供者也是服务的消费者。这意味着每个参与的节点都可以直接与其他节点通信,并且可以相互提供资源和服务,例如文件共享、流媒体传输等。
16 6
|
3天前
|
网络协议 安全 网络安全
C语言 网络编程(四)常见网络模型
这段内容介绍了目前被广泛接受的三种网络模型:OSI七层模型、TCP五层模型以及TCP/IP四层模型,并简述了多个网络协议的功能与特性,包括HTTP、HTTPS、FTP、DNS、SMTP、TCP、UDP、IP、ICMP、ARP、RARP及SSH协议等,同时提到了ssh的免费开源实现openssh及其在Linux系统中的应用。
|
3天前
|
Linux C语言
C语言 文件IO (系统调用)
本文介绍了Linux系统调用中的文件I/O操作,包括文件描述符、`open`、`read`、`write`、`lseek`、`close`、`dup`、`dup2`等函数,以及如何获取文件属性信息(`stat`)、用户信息(`getpwuid`)和组信息(`getgrgid`)。此外还介绍了目录操作函数如`opendir`、`readdir`、`rewinddir`和`closedir`,并提供了相关示例代码。系统调用直接与内核交互,没有缓冲机制,效率相对较低,但实时性更高。
|
3天前
|
C语言
C语言 网络编程(九)并发的UDP服务端 以线程完成功能
这是一个基于UDP协议的客户端和服务端程序,其中服务端采用多线程并发处理客户端请求。客户端通过UDP向服务端发送登录请求,并根据登录结果与服务端的新子线程进行后续交互。服务端在主线程中接收客户端请求并创建新线程处理登录验证及后续通信,子线程创建新的套接字并与客户端进行数据交换。该程序展示了如何利用线程和UDP实现简单的并发服务器架构。
|
11天前
|
存储 监控 Linux
性能分析之从 IO 高定位到具体文件
【8月更文挑战第21天】性能分析之从 IO 高定位到具体文件
19 0
性能分析之从 IO 高定位到具体文件
下一篇
DDNS