面试项目说实现了一个后端多线程网络服务器框架应该怎样写

简介: 面试项目说实现了一个后端多线程网络服务器框架应该怎样写

线程池引言

池分类

  1. 线程池
  2. 数据库连接池
  3. 内存池
  4. 异步请求池

池化优势

缓冲, 重复利用, 大大减少重建,  节约资源, 提高效率, 提高利用率

核心优势在哪里?

  1. 提前创建, 申请, 反复利用, 而不是重新创建, 申请.
  2. 反复利用所以利用率高, 也节约了资源
  3. 提前创建, 而不是临时创建, 省去了创建时间, 提高了效率

用在何处

  1. 频繁需要申请释放处。  反正经常用, 我何不提前创建好, 等待你用, 用完我也不扔掉, 继续等你其他时候用.
  2. 多线程处:均可以考虑抛入线程池, 减少线程频繁创建销毁
  3. 注意:频繁是核心.
  4. 生活例子: 蓄水池, 酒池肉林, 好处何在? 方便吧, 提前放好, 随用随取

线程池组件

线程队列:

提前开启线程,

多线程同步消费任务队列中的任务


多线程同步消费:加锁 + 条件消费,  同步等待, 一个线程需要消费必须同时具备两个条件, 1.获取锁 2. 满足条件  (存在任务)


同步:核心在于条件等待, 等待着一个条件满足,然后同步触发一个事件, 阻塞函数, 同步等待.


同步优势:对于共享资源的有序消费, 多线程之间互相等待, 互相同步, 稳定消费


任务队列

也可叫做阻塞队列:   blocking queue 学名


阻塞队列作用:异步解耦合, 任务放入任务队列中, 立刻返回, 在工作线程繁忙的时候,不至于需要等待线程空闲.


线程队列

组织成双向队列的多线程. 工作线程, 消费者线程, 提前开启, 等待处理任务


生活实例:办事窗口


线程池

将上述两大组件组合在一起   +  mutex锁 +  cond条件变量  实现同步消费  

Reactor分解

何为Reactor

反应堆, 事件反应堆, 反射堆:   将对io的操作封装成对事件的操作

Reactor组件

  1. 多路复用器 收集反应事件 epoll
  2. 事件处理器 回调处理机制
  3. 利用回调封装事件循环

网络IO处理分解

  1. io检测封装: epoll活跃io事件收集
  2. io操作封装, 读写io
  3. 对数据的解析操作封装, parser 业务逻辑

Reactor抛入线程池的方式

single reactor thread + worker threadpool

抛线程池方式1: 将parser业务处理抛入线程池

单线程reactor + 工作线程池

做法: 将业务逻辑处理单独抛入一个工作线程池进行处理.  实现网络IO跟业务的解耦合


使用场景:    相比IO, 业务逻辑处理耗时相当严重. 比如说写日志呀,  XML 文件的解析、数据库记录的查找、文件资料的读取和传输、计算型工作的处理等,它们会拖慢整个反应堆模式的执行效率。此时我们就可以将其单独抛到另外的Thread pool 中去执行业务需求.


好处:反应堆线程仅仅处理网络IO  而 decode、compute、enode 型工作放置到另外的线程池中, 两者解耦,在业务处理耗时情况下大大提高效率


抛线程池方式2: 将IO操作 + parser都抛入线程池处理

使用场景:IO操作跟parser的处理都相当耗时的情境下, 将其放在事件循环中会拖慢整个事件循环的进程。


好处:事件循环可以最快的响应活跃事件.


缺陷:针对IO操作: 我们可能存在对于fd的一个共享问题. 一个线程在操作fd,另外一个线程给fd关闭了,这个就是一个大的问题.   (核心在于可能出现fd的多线程共用的问题)


处理方式: 要么不应该使多个线程共享同一个fd,要么对fd进行简单的加锁操作。

充分利用多核CPU,主从Reactor

单Reactor的时候,  reactor 反应堆同时分发Acceptor 上的连接建立事件和已建立连接的 I/O 事件。这样对于客户端接入量不高的情况下是完全OK的.


但是一旦客户端接入量特别大的情况下, reactor既要分发连接建立,又分发已建立连接的 I/O,有点忙不过来,在实战中的表现可能就是客户端连接成功率偏低。


引出   ---   主从Reactor模式, 多Reactor模型, 将 acceptor 上的连接建立事件和已建立连接的 I/O 事件分离


核心思想:  main Reactor只负责分发连接建立事件, sub Reactor 来负责已经建立连接的事件的分发.


sub Reactor的数量设置:  依据CPU数量而定


优势:服务器稳定性大大提升, 客户端连接成功率大大提高

面试项目书写小技巧 (文末彩蛋)

对比书写, 分版本书写, 不同的版本优势是什么, 加入了什么技术, 带来了什么样的好处, 获益是什么


eg:本文中的reactor.


我在简历项目上书写, 实现了一个多线程reactor网络服务器框架


最开始使用什么样的技术, 实现了单线程reactor的框架, 然后对比单线程reacor跟多线程reactor, 表明加入线程池  +  主从reactor之后的优势所在.   带来了什么样的提升, 服务器稳定性提升呀, 性能抗压性提升等.                 (此处仅为小杰己见, 可能会有益处)  


如下是我在网上找到的一个大佬写的单Reactor + 线程池的实现, 我觉得写的很棒, 其中的代码很多都是特别值得我们去品味的

如下是小杰自己封装的一款单线程 reactor + 方式1 抛入线程池.

threadpool.h

#ifndef _THREADPOOL_H_
#define _THREADPOOL_H_
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
typedef void (*Func)(void*);
/*
  线程池组件
  1. 任务队列 (阻塞队列)
  2. 工作队列 (消化任务队列中的任务)
  3. 管理组件, 管理平衡工作 + 任务队列  (线程池)
*/
#define LL_ADD(item, list) do {\
  item->pre = NULL;\
  item->next = list;\
  list = item;\
} while (0)
#define LL_REMOVE(item, list) do {\
  if (item->pre != NULL) item->pre->next = item->next;\
  if (item->next != NULL) item->next->pre = item->pre;\
  if (item == list) list = item->next;\
  item->pre = item->next = NULL;\
} while (0)
typedef struct Task {
  Func task_run; //执行task任务, 处理user_data
  void* user_data;
  struct Task* pre;
  struct Task* next;
} Task;
//工作线程, 消化执行 task
typedef struct Wocker {
  pthread_t tid;
  int terminate;//终止, 停止工作
  struct Mannger* pool; 
  struct Wocker* pre;
  struct Wocker* next;
} Wocker;
//管理组件, 管理平衡上述的Task + Wocker
typedef struct Mannger {
  struct Wocker* wockers;//工作队列
  struct Task* tasks;//任务队列
  pthread_mutex_t lock;
  pthread_cond_t cond;
} ThreadPool;
//线程执行函数, 核心所在
void* thread_routine(void* arg) {
  Wocker* wocker = (Wocker*)arg;
  while (1) {
    pthread_mutex_lock(&wocker->pool->lock);
    if (wocker->pool->tasks == NULL) {
      if (wocker->terminate) break;//中断
      pthread_cond_wait(&wocker->pool->cond, &wocker->pool->lock);
    }
    //至此说明获取到锁了
    if (wocker->terminate) {
      pthread_mutex_unlock(&wocker->pool->lock);
      break;
    }
    Task* task = wocker->pool->tasks;
    if (task != NULL) {
      LL_REMOVE(task, wocker->pool->tasks);
    }
    pthread_mutex_unlock(&wocker->pool->lock);
    task->task_run(task);
  }
  free(wocker);//delete掉
}
//创建线程池, 开启消费者线程
ThreadPool* thread_pool_create(int thread_num) {
  ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
  pool->wockers = NULL;
    pool->tasks = NULL;
  pthread_mutex_init(&pool->lock, NULL);
  pthread_cond_init(&pool->cond, NULL);
  int i = 0;
  for (i = 0; i < thread_num; ++i) {
    Wocker* wocker = (Wocker*)calloc(sizeof(Wocker), 1);
    wocker->pool = pool;
    pthread_create(&wocker->tid, NULL, thread_routine, (void*)wocker);
    LL_ADD(wocker, pool->wockers);
  }
  return pool;
}
void thread_pool_destroy(ThreadPool* pool) {
  if (pool == NULL) return ;
  Wocker* wocker = NULL;
  for (wocker = pool->wockers; wocker != NULL; wocker = wocker->next) {
    wocker->terminate = 1;//中断运行, 所有的工作线程中断工作
  }
  pthread_mutex_lock(&pool->lock);
  pthread_cond_broadcast(&pool->cond);//核心所在
  //广播让所有的工作线程退出工作
  pthread_mutex_unlock(&pool->lock);
  pthread_cond_destroy(&pool->cond);
  pthread_mutex_destroy(&pool->lock);
  //释放所有资源
  //free_all(pool);
}
void thread_pool_push_task(ThreadPool* pool, Task* task) {
    pthread_mutex_lock(&pool->lock);
    LL_ADD(task, pool->tasks);
    pthread_cond_signal(&pool->cond);//通知有任务可以消费了
    pthread_mutex_unlock(&pool->lock);
}
#endif

reactor.h

#ifndef _REACTOR_H_
#define _REACTOR_H_
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include "threadpool.h"
//简易reactor封装, 实现事件循环, 事件驱动
//io检测封装: epoll事件收集
//事件驱动, 事件循环封装, 设置回调
//IO操作封装, 设置收发缓冲区 + 处理返回
//reactor进一步升级, 更符合业务需求.
//网络 跟 业务需求 隔离解除耦合性
#define MAX_N 512 
typedef struct sockaddr SA;
typedef int (*CallBack)(int fd, int events, void* arg);
#define BUFFSIZE 1024
void err_exit(const char* reason) {
  fprintf(stderr, "%s : %d : %s\n", reason, errno, strerror(errno));
  exit(EXIT_FAILURE);
}
//封装epoll
typedef struct reactor {
  int epfd;
  struct epoll_event* events;//容器, 收集活跃事件
  int stop;
  ThreadPool* pool;
} reactor;
typedef struct sockitem {
  int sockfd;
  //封装回调, 事件驱动
  CallBack callback;
  struct reactor* eventloop;
  //每条连接读写缓冲区封装
  char recvbuffer[BUFFSIZE];
  int rlen;
  char sendbuffer[BUFFSIZE];
  int slen;
} sockitem;
//事件处理器声明
int accept_cb(int fd, int events, void* arg);
int recv_cb(int fd, int events, void* arg);
int send_cb(int fd, int events, void* arg);
struct reactor* init_reactor(int n) {
  struct reactor* eventloop = (struct reactor*)malloc(sizeof(struct reactor));
  eventloop->epfd = epoll_create(1);
  eventloop->events = (struct epoll_event*)malloc(sizeof(struct epoll_event) * n);
  eventloop->stop = 0;
  eventloop->pool = thread_pool_create(10);
  return eventloop; 
}
void release_reactor(struct reactor* eventloop) {
  if (NULL == eventloop) return;
  close(eventloop->epfd); //关闭epfd
  free(eventloop);
  return ;
}
struct sockitem* init_sockitem(struct reactor* eventloop, int sockfd, CallBack callback) {
  struct sockitem* si = (struct sockitem*)malloc(sizeof(struct sockitem));
  si->sockfd = sockfd;
  si->callback = callback;
  si->eventloop = eventloop;
  return si;
}
void setnoblock(int fd) {
  int flag = fcntl(fd, F_GETFL, 0);
  if (-1 == fcntl(fd, F_SETFL, flag | O_NONBLOCK)) {
    err_exit("fcntl");
  }
}
int add_event(struct reactor* eventloop, int events, sockitem* si) {
  struct epoll_event ev;
  ev.events = events;
  ev.data.ptr = si;
  if (-1 == epoll_ctl(eventloop->epfd, EPOLL_CTL_ADD, si->sockfd, &ev)) {
    err_exit("epoll_ctl add");
  }
  return 0;
}
int del_event(struct reactor* eventloop, sockitem* si) {
  if (NULL == si) return 1;
  if (-1 == epoll_ctl(eventloop->epfd, EPOLL_CTL_DEL, si->sockfd, NULL)) {
    err_exit("epoll_ctl del");
  }
  free(si);
  return 0;
}
int mod_event(struct reactor* eventloop, int events, sockitem* si) {
  struct epoll_event ev;
  ev.events = events;
  ev.data.ptr = si;
  if (-1 == epoll_ctl(eventloop->epfd, EPOLL_CTL_MOD, si->sockfd, &ev)) {
    err_exit("epoll_ctl mod");
  }
  return 0;
}
int accept_cb(int fd, int events, void* arg) {
  struct sockitem* si = (struct sockitem*)arg;
  socklen_t cli_addr_len = sizeof(struct sockaddr_in);
  struct sockaddr_in cli_addr;
  char ip_buff[INET_ADDRSTRLEN] = {0};
  int cli_fd = accept(fd, (SA*)&cli_addr, &cli_addr_len);
  setnoblock(cli_fd);//非阻塞, 避免因为一条连接的事件阻塞其他连接得不到处理
  if (-1 == cli_fd) {
    err_exit("accept");
  }
  printf("recv from ip %s at port %d\n", inet_ntop(AF_INET, &cli_addr.sin_addr, ip_buff, sizeof(ip_buff)),
    ntohs(cli_addr.sin_port));
  //封装si事件
  struct sockitem* newsi = init_sockitem(si->eventloop, cli_fd, recv_cb);
  //将si 放入eventloop事件循环
  add_event(si->eventloop, EPOLLIN | EPOLLET, newsi);
  return cli_fd;
}
void task_run(void* arg) {
  Task* task = (Task*)arg;
  //拿取任务执行
  struct sockitem* si = (struct sockitem*)task->user_data;
  char* data = si->recvbuffer;
  //处理数据, 进行运算
  int i = 0;
  char op;
  int lhs = 0, rhs = 0;
  int flag = 0;//标记lhs, rhs
  int ans = 0;
  while (data[i]) {
    if (data[i] >= '0' && data[i] <= '9') {
      if (!flag) {
        for (; data[i] >= '0' && data[i] <= '9'; ++i) {
          lhs = lhs * 10 + (data[i] - '0');
        }
        flag = 1;
        i -= 1;
      } else {
        for (; data[i] >= '0' && data[i] <= '9'; ++i) {
          rhs = rhs * 10 + (data[i] - '0');
        }
        break;
      }
    } else if (data[i] == ' ') {
      i += 1;
          continue;
    } else {
      op = data[i];
    } 
    i += 1;
  }
  switch(op) {
    case '+' : {
      ans = lhs + rhs;
    } break;
    case '-' : {
      ans = lhs - rhs;
    } break;
    case '*' : {
      ans = lhs * rhs;
    } break;
    case '/' : {
      ans = lhs / rhs;
    } break;
  }
    printf("%d %c %d = %d\n", lhs, op, rhs, ans);
  sprintf(si->sendbuffer, "%d %c %d = %d\n", lhs, op, rhs, ans);
  //数据写入到sendbuffer种去
  si->slen = strlen(si->sendbuffer);
  memset(si->recvbuffer, 0, BUFFSIZE);//清空读缓冲区.
  si->rlen = 0;
  si->callback = send_cb;
  mod_event(si->eventloop, EPOLLOUT | EPOLLET, si);
  free(task);
}
int recv_cb(int fd, int events, void* arg) {
  struct sockitem* si = (struct sockitem*)arg;
  struct epoll_event ev;
    int ret = 0;
  while (1) {
    ret = recv(fd, si->recvbuffer, BUFFSIZE, 0);
    if (ret < 0) {
      if (errno == EINTR) {//信号打断
        continue; 
      }
      if (errno == EWOULDBLOCK) {//写缓冲区满了
        break;
      }
      del_event(si->eventloop, si);
      close(fd);
      err_exit("recv");//出错了
    } else if (ret == 0) {
      //对端断开连接
      printf("fd %d disconnect\n", fd);
      del_event(si->eventloop, si);
      close(fd);
      return 0;
    } else {
      break;
    }
  }
  //接收到数据之后进行处理, 将其抛入到线程池处理
  #if 0
  //打印接收到的数据
  printf("recv: %s, %d Bytes\n", si->recvbuffer, ret);
  //设置sendbuffer
  si->rlen = ret;
  memcpy(si->sendbuffer, si->recvbuffer, si->rlen);
  si->slen = si->rlen;
    //清空recvbuffer
    memset(si->recvbuffer, 0,BUFFSIZE);
  si->callback = send_cb;
  mod_event(si->eventloop, EPOLLOUT | EPOLLET, si);
  #elif 1
  //如何抛入到线程池进行处理?
  /*
    假设业务逻辑是: 将字符串转换为算式进行计算
  */
  printf("recv: %s, %d Bytes\n", si->recvbuffer, ret);
  si->rlen = ret;
  Task* task = (Task*)malloc(sizeof(Task));
  //先创建任务结构体
  task->next = task->pre = NULL;
  task->task_run = task_run;
  task->user_data = (void*)si; 
  thread_pool_push_task(si->eventloop->pool, task);
  #endif
  return 0;
}
int send_cb(int fd, int events, void* arg) {
  struct sockitem* si = (struct sockitem*)arg;
  while (1) {
    int n = send(fd, si->sendbuffer, BUFFSIZE, 0);
    if (-1 == n) {
      if (errno == EINTR) {//信号打断
        continue; 
      }
      if (errno == EWOULDBLOCK) {//写缓冲区满了
        break;
      }
      //出错了
      err_exit("send");
    }
    printf("send %d bytes\n", n);
    //每一次send之后从新将senbuffer置为空
    memset(si->sendbuffer, 0, BUFFSIZE);
    break;//正常写完
  }
  si->callback = recv_cb;
  mod_event(si->eventloop, EPOLLIN | EPOLLET, si);
}
//事件循环一次
void eventloop_once(struct reactor* eventloop) {
  int nready = epoll_wait(eventloop->epfd, eventloop->events, MAX_N, -1);
  int i = 0;
  for (i = 0; i < nready; ++i) {
    int mask = 0;
    struct epoll_event* ev= &eventloop->events[i];
    if (ev->events & EPOLLIN) mask |= EPOLLIN;
        if (ev->events & EPOLLOUT) mask |= EPOLLOUT;
        //将EPOLLERR + EPOLLHUP 的处理交付到io函数中进行处理, 放到回调中处理
        if (ev->events & EPOLLERR) mask |= EPOLLIN|EPOLLOUT;
        if (ev->events & EPOLLHUP) mask |= EPOLLIN|EPOLLOUT; 
        if (mask & EPOLLIN) {
          struct sockitem* si = (struct sockitem*)ev->data.ptr;
          si->callback(si->sockfd, mask, si);
        }
        if (mask & EPOLLOUT) {
          struct sockitem* si = (struct sockitem*)ev->data.ptr;
          si->callback(si->sockfd, mask, si);
        }
  }
}
//开启事件循环
void start_eventloop(struct reactor* eventloop) {
  while (!eventloop->stop) {
    eventloop_once(eventloop);
  }
}
//停止事件循环
void stop_eventloop(struct reactor* eventloop) {
  eventloop->stop = 1;
}
#endif

reator.c

#include "reactor.h"
int init_sock(short port) {
  int sockfd = socket(AF_INET, SOCK_STREAM, 0);
  if (-1 == sockfd) {
    err_exit("socket");
  }
  struct sockaddr_in addr;
  memset(&addr, 0, sizeof(addr));
  addr.sin_family = AF_INET;
  addr.sin_port = htons(port);
  addr.sin_addr.s_addr = INADDR_ANY;
  if (-1 == bind(sockfd, (SA*)&addr, sizeof(addr))) {
    err_exit("bind");
  }
  if (-1 == listen(sockfd, 5)) {
    err_exit("listen");
  }
  return sockfd;
}
int main(int argc, char* argv[]) {
  if (2 != argc) {
    fprintf(stderr, "usage: %s <port>", argv[0]);
    exit(EXIT_FAILURE);
  }
  short port = atoi(argv[1]);
  int sockfd = init_sock(port);
  //至此完成了网络连接了
  struct reactor* mainloop = init_reactor(MAX_N);
  //封装listen的 
  struct sockitem* si = init_sockitem(mainloop, sockfd, accept_cb);
  add_event(mainloop, EPOLLIN, si);//对于监视套接字一般是水平触发
  start_eventloop(mainloop);
  //回收资源
  release_reactor(mainloop);
  return 0;
}
相关文章
|
2月前
|
弹性计算 监控 负载均衡
|
2月前
|
数据采集 存储 JSON
Python网络爬虫:Scrapy框架的实战应用与技巧分享
【10月更文挑战第27天】本文介绍了Python网络爬虫Scrapy框架的实战应用与技巧。首先讲解了如何创建Scrapy项目、定义爬虫、处理JSON响应、设置User-Agent和代理,以及存储爬取的数据。通过具体示例,帮助读者掌握Scrapy的核心功能和使用方法,提升数据采集效率。
147 6
|
2月前
|
负载均衡 监控 应用服务中间件
配置Nginx反向代理时如何指定后端服务器的权重?
配置Nginx反向代理时如何指定后端服务器的权重?
177 61
|
1月前
|
机器学习/深度学习 算法 PyTorch
基于图神经网络的大语言模型检索增强生成框架研究:面向知识图谱推理的优化与扩展
本文探讨了图神经网络(GNN)与大型语言模型(LLM)结合在知识图谱问答中的应用。研究首先基于G-Retriever构建了探索性模型,然后深入分析了GNN-RAG架构,通过敏感性研究和架构改进,显著提升了模型的推理能力和答案质量。实验结果表明,改进后的模型在多个评估指标上取得了显著提升,特别是在精确率和召回率方面。最后,文章提出了反思机制和教师网络的概念,进一步增强了模型的推理能力。
67 4
基于图神经网络的大语言模型检索增强生成框架研究:面向知识图谱推理的优化与扩展
|
2月前
|
人工智能 自然语言处理
WebDreamer:基于大语言模型模拟网页交互增强网络规划能力的框架
WebDreamer是一个基于大型语言模型(LLMs)的网络智能体框架,通过模拟网页交互来增强网络规划能力。它利用GPT-4o作为世界模型,预测用户行为及其结果,优化决策过程,提高性能和安全性。WebDreamer的核心在于“做梦”概念,即在实际采取行动前,用LLM预测每个可能步骤的结果,并选择最有可能实现目标的行动。
75 1
WebDreamer:基于大语言模型模拟网页交互增强网络规划能力的框架
|
1月前
|
运维 监控 负载均衡
slb后端服务器故障
slb后端服务器故障
56 13
|
1月前
|
开发框架 .NET PHP
网站应用项目如何选择阿里云服务器实例规格+内存+CPU+带宽+操作系统等配置
对于使用阿里云服务器的搭建网站的用户来说,面对众多可选的实例规格和配置选项,我们应该如何做出最佳选择,以最大化业务效益并控制成本,成为大家比较关注的问题,如果实例、内存、CPU、带宽等配置选择不合适,可能会影响到自己业务在云服务器上的计算性能及后期运营状况,本文将详细解析企业在搭建网站应用项目时选购阿里云服务器应考虑的一些因素,以供参考。
|
2月前
|
JSON 数据处理 Swift
Swift 中的网络编程,主要介绍了 URLSession 和 Alamofire 两大框架的特点、用法及实际应用
本文深入探讨了 Swift 中的网络编程,主要介绍了 URLSession 和 Alamofire 两大框架的特点、用法及实际应用。URLSession 由苹果提供,支持底层网络控制;Alamofire 则是在 URLSession 基础上增加了更简洁的接口和功能扩展。文章通过具体案例对比了两者的使用方法,帮助开发者根据需求选择合适的网络编程工具。
46 3
|
2月前
|
JavaScript
使用node.js搭建一个express后端服务器
Express 是 Node.js 的一个库,用于搭建后端服务器。本文将指导你从零开始构建一个简易的 Express 服务器,包括项目初始化、代码编写、服务启动与项目结构优化。通过创建 handler 和 router 文件夹分离路由和处理逻辑,使项目更清晰易维护。最后,通过 Postman 测试确保服务正常运行。
125 1
|
2月前
|
存储 安全 网络安全
网络安全法律框架:全球视角下的合规性分析
网络安全法律框架:全球视角下的合规性分析
67 1

热门文章

最新文章