【网络篇】第十篇——线程池版的TCP网络程序

简介: 【网络篇】第十篇——线程池版的TCP网络程序

单纯多线程存在的问题

当前多线程版的服务器存在的问题:

  • 每当有新连接到来时,服务端的主线程都会重新为该客户端创建为其提供服务的新线程,而当服务结束后又会将该新线程销毁。这样做不仅麻烦,而且效率低下,每当连接到来的时候服务端才创建对应提供服务的线程。
  • 如果有大量的客户端连接请求,此时服务端要为每一个客户端创建对应的服务线程。计算机当中的线程越多,CPU的压力就越大,因为CPU要不断在这些线程之间来回切换,此时CPU在调度线程的时候,线程和线程之间切换的成本就会变得很高。此外,一旦线程太多,每一个线程再次被调度的周期就变长了,而线程是为客户端提供服务的,线程被调度的周期变长,客户端也迟迟得不到应答。

解决思路

针对这两个问题,对应的解决思路如下:

  • 可以在服务端预先创建一批线程,当有客户端请求连接时就让这些线程为客户端提供服务,此时客户端一来就有线程为其提供服务,而不是当客户端来了才创建对应的服务线程。
  • 当某个线程为客户端提供完服务后,不要让该线程退出,而是让该线程继续为下一个客户端提供服务,如果当前没有客户端连接请求,则可以让该线程先进入休眠状态,当有客户端连接到来时再将该线程唤醒。
  • 服务端创建的这一批线程的数量不能太多,此时CPU的压力也就不会太大。此外,如果有客户端连接到来,但此时这一批线程都在给其他客户端提供服务,这时服务端不应该再创建线程,而应该让这个新来的连接请求在全连接队列进行排队,等服务端这一批线程中有空闲线程后,再将该连接请求获取上来并为其提供服务。

引入线程池

实际要解决这里的问题我们就需要在服务端引入线程池,因为线程池的存在就是为了避免处理短时间任务时创建与销毁线程的代价,此外,线程池还能够保证内核充分利用,防止过分调度。

其中在线程池里面有一个任务队列,当有新的任务到来的时候,就可以将任务Push到线程池当中,在线程池当中我们默认创建了5个线程,这些线程不断检测任务队列当中是否有任务,如果有任务就拿出任务,然后调用该任务对应的Run函数对该任务进行处理,如果线程池当中没有任务那么当前线程就会进入休眠状态。


在博主的另一篇博客当中详细介绍并实现了线程池,这里就直接将线程池的代码接入到当前的TCP服务器,因此下面只会讲解线程池接入的方法,如果对线程池的实现有疑问的可以去阅读那篇博客。

#define NUM 5
//线程池
template<class T>
class ThreadPool
{
private:
  bool IsEmpty()
  {
    return _task_queue.size() == 0;
  }
  void LockQueue()
  {
    pthread_mutex_lock(&_mutex);
  }
  void UnLockQueue()
  {
      pthread_mutex_unlock(&_mutex);
  }
  void Wait()
  {
      pthread_cond_wait(&_cond, &_mutex);
  }
  void WakeUp()
  {
      pthread_cond_signal(&_cond);
  }
public:
  ThreadPool(int num = NUM)
    : _thread_num(num)
  {
    pthread_mutex_init(&_mutex, nullptr);
    pthread_cond_init(&_cond, nullptr);
  }
  ~ThreadPool()
  {
      pthread_mutex_destroy(&_mutex);
      pthread_cond_destroy(&_cond);
  }
  //线程池中线程的执行例程
  static void* Routine(void* arg)
  {
      pthread_detach(pthread_self());
      ThreadPool* self = (ThreadPool*)arg;
      //不断从任务队列获取任务进行处理
    while (true){
      self->LockQueue();
      while (self->IsEmpty()){
        self->Wait();
      }
      T task;
      self->Pop(task);
      self->UnLockQueue();
      task.Run(); //处理任务
    }
  }
  void ThreadPoolInit()
  {
    pthread_t tid;
    for (int i = 0; i < _thread_num; i++){
      pthread_create(&tid, nullptr, Routine, this); //注意参数传入this指针
    }
  }
  //往任务队列塞任务(主线程调用)
  void Push(const T& task)
  {
      LockQueue();
      _task_queue.push(task);
      UnLockQueue();
      WakeUp();
  }
  //从任务队列获取任务(线程池中的线程调用)
  void Pop(T& task)
  {
      task = _task_queue.front();
      _task_queue.pop();
  }
private:
  std::queue<T> _task_queue; //任务队列
  int _thread_num; //线程池中线程的数量
  pthread_mutex_t _mutex;
  pthread_cond_t _cond;
};

服务类新增线程池成员

现在服务端引入了线程池,因此在服务类当中需要新增一个指向线程池的指针成员:

  • 当实例化服务器对象时,先将这个线程池指针先初始化为空。
  • 当服务器初始化完毕后,再实际构造这个线程池对象,在构造线程池对象时可以指定线程池当中线程的个数,也可以不指定,此时默认线程的个数为5。

在启动服务器之前对线程池进行初始化,此时就会将线程池当中的若干线程创建出来,而这些线程创建出来后就会不断检测任务队列,从任务队列当中拿出任务进行处理。

现在当服务进程调用accept函数获取到一个连接请求后,就会根据该客户端的套接字、IP地址以及端口号构建出一个任务,然后调用线程池提供的Push接口将该任务塞入任务队列。


这实际也是一个生产者消费者模型,其中服务进程就作为了任务的生产者,而后端线程池当中的若干线程就不断从任务队列当中获取任务进行处理,它们承担的就是消费者的角色,其中生产者和消费者的交易场所就是线程池当中的任务队列。

class TcpServer
{
public:
  TcpServer(int port)
    : _listen_sock(-1)
    , _port(port)
    , _tp(nullptr)
  {}
  void InitServer()
  {
    //创建套接字
    _listen_sock = socket(AF_INET, SOCK_STREAM, 0);
    if (_listen_sock < 0){
      std::cerr << "socket error" << std::endl;
      exit(2);
    }
    //绑定
    struct sockaddr_in local;
    memset(&local, '\0', sizeof(local));
    local.sin_family = AF_INET;
    local.sin_port = htons(_port);
    local.sin_addr.s_addr = INADDR_ANY;
    if (bind(_listen_sock, (struct sockaddr*)&local, sizeof(local)) < 0){
      std::cerr << "bind error" << std::endl;
      exit(3);
    }
    //监听
    if (listen(_listen_sock, BACKLOG) < 0){
      std::cerr << "listen error" << std::endl;
      exit(4);
    }
    _tp = new ThreadPool<Task>(); //构造线程池对象
  }
  void Start()
  {
    _tp->ThreadPoolInit(); //初始化线程池
    for (;;){
      //获取连接
      struct sockaddr_in peer;
      memset(&peer, '\0', sizeof(peer));
      socklen_t len = sizeof(peer);
      int sock = accept(_listen_sock, (struct sockaddr*)&peer, &len);
      if (sock < 0){
        std::cerr << "accept error, continue next" << std::endl;
        continue;
      }
      std::string client_ip = inet_ntoa(peer.sin_addr);
      int client_port = ntohs(peer.sin_port);
      std::cout << "get a new link->" << sock << " [" << client_ip << "]:" << client_port << std::endl;
      Task task(sock, client_ip, client_port); //构造任务
      _tp->Push(task); //将任务Push进任务队列
    }
  }
private:
  int _listen_sock; //监听套接字
  int _port; //端口号
  ThreadPool<Task>* _tp; //线程池
};

设计任务类

现在我们要做的就是设计一个任务类,该任务类当中需要包含客户端对应的套接字、IP地址、端口号,表示该任务是为哪一个客户端提供服务,对应操作的套接字是哪一个。

此外,任务类当中需要包含一个Run方法,当线程池中的线程拿到任务后就会直接调用这个Run方法对该任务进行处理,而实际处理这个任务的方法就是服务类当中的Service函数,服务端就是通过调用Service函数为客户端提供服务的。


我们可以直接拿出服务类当中的Service函数,将其放到任务类当中作为任务类当中的Run方法,但这实际不利于软件分层。我们可以给任务类新增一个仿函数成员,当执行任务类当中的Run方法处理任务时就可以以回调的方式处理该任务。

class Task
{
public:
  Task()
  {}
  Task(int sock, std::string client_ip, int client_port)
    : _sock(sock)
    , _client_ip(client_ip)
    , _client_port(client_port)
  {}
  ~Task()
  {}
  //任务处理函数
  void Run()
  {
      _handler(_sock, _client_ip, _client_port); //调用仿函数
  }
private:
  int _sock; //套接字
  std::string _client_ip; //IP地址
  int _client_port; //端口号
  Handler _handler; //处理方法
};

注意: 当任务队列当中有任务时,线程池当中的线程会先定义出一个Task对象,然后将这个Task对象作为输出型参数调用任务队列的Pop函数,从任务队列当中获取任务,因此Task类除了提供带参的构造函数以外,还需要提供一个无参的构造函数,方便我们可以定义无参对象。

设计Handler类

此时需要再设计一个Handler类,在Handler类当中对()操作符进行重载,将()操作符的执行动作重载为执行Service函数的代码。

class Handler
{
public:
  Handler()
  {}
  ~Handler()
  {}
  void operator()(int sock, std::string client_ip, int client_port)
  {
    char buffer[1024];
    while (true){
      ssize_t size = read(sock, buffer, sizeof(buffer)-1);
      if (size > 0){ //读取成功
        buffer[size] = '\0';
        std::cout << client_ip << ":" << client_port << "# " << buffer << std::endl;
        write(sock, buffer, size);
      }
      else if (size == 0){ //对端关闭连接
        std::cout << client_ip << ":" << client_port << " close!" << std::endl;
        break;
      }
      else{ //读取失败
        std::cerr << sock << " read error!" << std::endl;
        break;
      }
    }
    close(sock); //归还文件描述符
    std::cout << client_ip << ":" << client_port << " service done!" << std::endl;
  }
};

实际我们可以让服务器处理不同的任务,当前服务器只是在进行字符串的回显处理,而实际要怎么处理这个任务完全是由任务类当中的handler成员来决定的。

如果想要让服务器处理其他任务,只需要修改Handler类当中对()的重载函数就行了,而服务器的初始化、启动服务器以及线程池的代码都是不需要更改的,这就叫做把通信功能和业务逻辑在软件上做解耦。

代码测试

此时我们再重新编译服务端代码,并用以下监控脚本查看服务端的各个线程。

while :; do ps -aL|head -1&&ps -aL|grep tcp_server;echo "####################";sleep 1;done

运行服务端后,就算没有客户端发来连接请求,此时在服务端就已经有了6个线程,其中有一个是接收新连接的服务线程,而其余的5个是线程池当中为客户端提供服务的线程。

image.png

此时当客户端连接服务器后,服务端的主线程就会获取该客户端的连接请求,并将其封装为一个任务对象后塞入任务队列,此时线程池中的5个线程就会有一个线程从任务队列当中获取到该任务,并执行该任务的处理函数为客户端提供服务。

image.png

当第二个客户端发起连接请求时,服务端也会将其封装为一个任务类塞到任务队列,然后线程池当中的线程再从任务队列当中获取到该任务进行处理,此时也是不同的执行流为这两个客户端提供的服务,因此这两个客户端也是能够同时享受服务的。

image.png

与之前不同的是,无论现在有多少客户端发来请求,在服务端都只会有线程池当中的5个线程为之提供服务,线程池当中的线程个数不会随着客户端连接的增多而增多,这些线程也不会因为客户端的退出而退出。

相关文章
|
8天前
|
网络协议 网络安全 网络虚拟化
本文介绍了十个重要的网络技术术语,包括IP地址、子网掩码、域名系统(DNS)、防火墙、虚拟专用网络(VPN)、路由器、交换机、超文本传输协议(HTTP)、传输控制协议/网际协议(TCP/IP)和云计算
本文介绍了十个重要的网络技术术语,包括IP地址、子网掩码、域名系统(DNS)、防火墙、虚拟专用网络(VPN)、路由器、交换机、超文本传输协议(HTTP)、传输控制协议/网际协议(TCP/IP)和云计算。通过这些术语的详细解释,帮助读者更好地理解和应用网络技术,应对数字化时代的挑战和机遇。
38 3
|
8天前
|
存储 网络协议 安全
30 道初级网络工程师面试题,涵盖 OSI 模型、TCP/IP 协议栈、IP 地址、子网掩码、VLAN、STP、DHCP、DNS、防火墙、NAT、VPN 等基础知识和技术,帮助小白们充分准备面试,顺利踏入职场
本文精选了 30 道初级网络工程师面试题,涵盖 OSI 模型、TCP/IP 协议栈、IP 地址、子网掩码、VLAN、STP、DHCP、DNS、防火墙、NAT、VPN 等基础知识和技术,帮助小白们充分准备面试,顺利踏入职场。
25 2
|
19天前
|
网络协议 安全 Go
Go语言进行网络编程可以通过**使用TCP/IP协议栈、并发模型、HTTP协议等**方式
【10月更文挑战第28天】Go语言进行网络编程可以通过**使用TCP/IP协议栈、并发模型、HTTP协议等**方式
46 13
|
1月前
|
Java 应用服务中间件
面对海量网络请求,Tomcat线程池如何进行扩展?
【10月更文挑战第4天】本文详细探讨了Tomcat线程池相较于标准Java实用工具包(JUC)线程池的关键改进。首先,Tomcat线程池在启动时即预先创建全部核心线程,以应对启动初期的高并发请求。其次,通过重写阻塞队列的入队逻辑,Tomcat能够在任务数超过当前线程数但未达最大线程数时,及时创建非核心线程,而非等到队列满才行动。此外,Tomcat还引入了在拒绝策略触发后重新尝试入队的机制,以提高吞吐量。这些优化使得Tomcat线程池更适应IO密集型任务,有效提升了性能。
面对海量网络请求,Tomcat线程池如何进行扩展?
|
20天前
|
网络协议 算法 网络性能优化
计算机网络常见面试题(一):TCP/IP五层模型、TCP三次握手、四次挥手,TCP传输可靠性保障、ARQ协议
计算机网络常见面试题(一):TCP/IP五层模型、应用层常见的协议、TCP与UDP的区别,TCP三次握手、四次挥手,TCP传输可靠性保障、ARQ协议、ARP协议
|
27天前
|
Web App开发 缓存 网络协议
不为人知的网络编程(十八):UDP比TCP高效?还真不一定!
熟悉网络编程的(尤其搞实时音视频聊天技术的)同学们都有个约定俗成的主观论调,一提起UDP和TCP,马上想到的是UDP没有TCP可靠,但UDP肯定比TCP高效。说到UDP比TCP高效,理由是什么呢?事实真是这样吗?跟着本文咱们一探究竟!
51 10
|
1月前
|
Java Linux
【网络】高并发场景处理:线程池和IO多路复用
【网络】高并发场景处理:线程池和IO多路复用
45 2
|
1月前
|
网络协议 Java API
【网络】TCP回显服务器和客户端的构造,以及相关bug解决方法
【网络】TCP回显服务器和客户端的构造,以及相关bug解决方法
62 2
|
1月前
|
存储 网络协议 Java
【网络】UDP和TCP之间的差别和回显服务器
【网络】UDP和TCP之间的差别和回显服务器
65 1
|
6天前
|
SQL 安全 网络安全
网络安全与信息安全:关于网络安全漏洞、加密技术、安全意识等方面的知识分享
【10月更文挑战第40天】在数字化时代,网络安全和信息安全已成为我们生活中不可或缺的一部分。本文将介绍网络安全漏洞、加密技术以及安全意识等方面的知识,帮助读者更好地了解网络安全的重要性,并提供一些实用的技巧和建议,以保护个人和组织的信息安全。
29 6
下一篇
无影云桌面