线程池-手写线程池C++11版本(生产者-消费者模型)

简介: 线程池-手写线程池C++11版本(生产者-消费者模型)

本项目是基于C++11的线程池。使用了许多C++的新特性,包含不限于模板函数泛型编程、std::future、std::packaged_task、std::bind、std::forward完美转发、std::make_shared智能指针、decltype类型推断、std::unique_lock锁等C++11新特性功能。


代码结构

本项目线程池功能分以下几个函数去实现:

threadpool.init(isize_t num);设置线程的数量

threadpool::get(TaskFuncPtr& task);读取任务队列中的任务

threadpool::run();通过get()读取任务并执行

threadpool.start(); 启动线程池,并通过run()执行任务

threadpool.exec();封装任务到任务队列中

threadpool.waitForAllDone();等待所有任务执行完成

threadpool.stop();分离线程,释放内存

threadpool.init

init的功能是初始化线程池,主要是设置线程的数量到类的成员变量中。

bool ZERO_ThreadPool::init(size_t num)
{
  std::unique_lock<std::mutex> lock(mutex_);
  if (!threads_.empty())
  {
    return false;
  }
  threadNum_ = num;
  return true;
}

threadNum_:保存线程的数量,在init函数中被赋值

此处使用unique_lock或lock_guard的加锁方式都能实现自动加锁和解锁。但是unique_lock可以进行临时解锁和再上锁,而lock_guard不行,特殊情况下还是必须使用unique_lock(用到条件变量的情况)。(lock_guard比较简单,相对来说性能要好一点)

threadpool::get

从任务队列中获取获取任务,这里其实就是我们的消费者模块

bool ZERO_ThreadPool::get(TaskFuncPtr& task)
{
  std::unique_lock<std::mutex> lock(mutex_);
  if (tasks_.empty()) //判断任务是否存在
  {
    //要终止线程池   bTerminate_设置为true,任务队列不为空
    condition_.wait(lock, [this] { return bTerminate_ || !tasks_.empty(); });
  }
  if (bTerminate_)
    return false;
  if (!tasks_.empty())
  {
    task = std::move(tasks_.front());  // 使用了移动语义
    tasks_.pop(); //释放资源,释放一个任务
    return true;
  }
  return false;
}
  • 条件变量condition_.wait(lock, [this] { return bTerminate_ || !tasks_.empty(); });是需要一直等待条件完成才退出。即任务终止,或者任务队列不为空时,就会退出条件变量的阻塞状态,继续执行下面的逻辑。
  • task = std::move(tasks_.front()); 使用了移动语义,将 tasks_.front() 的内容移动到了 task 中。可以减少内容拷贝。移动完之后tasks_.front() 的内容会变为未指定的状态,所以直接pop掉就好了。

threadpool::run

这里是运行我们的任务部分。包括调用get在任务队列中获取任务,以及执行任务。

void ZERO_ThreadPool::run()  // 执行任务的线程
{
  //调用处理部分
  while (!isTerminate()) // 判断是不是要停止
  {
    TaskFuncPtr task;
    bool ok = get(task);        // 读取任务
    if (ok)
    {
      ++atomic_;
      try
      {
        if (task->_expireTime != 0 && task->_expireTime < TNOWMS)
        {//如果设置了超时,并且超时了,就需要执行本逻辑
        //超时任务,本代码未实现,有需要可实现在此处
        }
        else
        {
          task->_func();  // 执行任务
        }
      }
      catch (...)
      {
      }
      --atomic_;
      }
    }
  }
}

atomic_:运行一个任务,该参数+1;执行完毕,该参数-1。这里是为了待会停止线程池时判断是否还有运行中的任务(未完成的线程)。

threadpool.start

创建线程,并把线程池存入vector中,后面释放线程池时,好一一释放线程。

bool ZERO_ThreadPool::start()
{
  std::unique_lock<std::mutex> lock(mutex_);
  if (!threads_.empty())
  {
    return false;
  }
  for (size_t i = 0; i < threadNum_; i++)
  {
    threads_.push_back(new thread(&ZERO_ThreadPool::run, this));
  }
  return true;
}

threads_.push_back(new thread(&ZERO_ThreadPool::run, this));创建线程,线程的回调函数为run。

threadpool.exec

exec是将我们的任务存入任务队列中,这段代码是本项目最难的,用了很多C++的新特性。

/*
  template <class F, class... Args>
  它是c++里新增的最强大的特性之一,它对参数进行了高度泛化,它能表示0到任意个数、任意类型的参数
  auto exec(F &&f, Args &&... args) -> std::future<decltype(f(args...))>
  std::future<decltype(f(args...))>:返回future,调用者可以通过future获取返回值
  返回值后置
  */
  template <class F, class... Args>
  auto exec(int64_t timeoutMs, F&& f, Args&&... args) -> std::future<decltype(f(args...))>//接受一个超时时间 `timeoutMs`,一个可调用对象 `f` 和其它参数 `args...`,并返回一个 `std::future` 对象,该对象可以用于获取任务执行的结果。
  {
    int64_t expireTime = (timeoutMs == 0 ? 0 : TNOWMS + timeoutMs);  // 根据超时时间计算任务的过期时间 `expireTime`,如果超时时间为 0,则任务不会过期。
    //定义返回值类型
    using RetType = decltype(f(args...));  // 使用 `decltype` 推导出 `f(args...)` 的返回值类型,并将其定义为 `RetType`(这里的using和typedef功能一样,就是为一个类型起一个别名)。
    // 封装任务 使用 `std::packaged_task` 将可调用对象 `f` 和其它参数 `args...` 封装成一个可执行的函数,并将其存储在一个 `std::shared_ptr` 对象 `task` 中。
    auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    TaskFuncPtr fPtr = std::make_shared<TaskFunc>(expireTime);  // 封装任务指针,设置过期时间 创建一个 `TaskFunc` 对象,并将任务的过期时间 `expireTime` 传递给它。
    fPtr->_func = [task]() {  // 具体执行的函数 将封装好的任务函数存储在 `TaskFunc` 对象的 `_func` 成员中,该函数会在任务执行时被调用。
      (*task)();
    };
    std::unique_lock<std::mutex> lock(mutex_);
    tasks_.push(fPtr);              // 将任务插入任务队列中
    condition_.notify_one();        // 唤醒阻塞的线程,可以考虑只有任务队列为空的情况再去notify
    return task->get_future();; //返回一个 `std::future` 对象,该对象可以用于获取任务执行的结果。
  }

使用了可变参数模板函数。

tasks_:保存任务的队列

condition_.notify_one():保存一个任务唤醒一个条件变量

std::future : 异步指向某个任务,然后通过future特性去获取任务函数的返回结果。

std::bind:将参数列表和函数绑定,生成一个新的可调用对象

std::packaged_task:将任务和feature绑定在一起的模板,是一种封装对任务的封装。

本函数用到了泛型编程模板函数,输入参数有3个:一个超时时间 timeoutMs,一个可调用对象 f 和参数 args...。采用返回值后置的方式返回一个std::future对象。这里采用返回值后置是为了方便使用decltype(f(args…)推导数据类型。

auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward(f), std::forward(args)…));是将我们传进来的任务函数和参数bind成一个对象,这个对象可以看作是一整个函数,其返回值就是RetType 类型,并且没有输入参数。所以用std::packaged_task<RetType()>这样的格式来打包封装。封装好的对象用智能指针(std::make_shared)来管理。

同时还要创建一个TaskFunc的对象,同样用智能指针管理,这个对象包括两项内容,一个就是超时时间,一个就是我们封装好的task对象。

通过TaskFuncPtr fPtr = std::make_shared(expireTime);和fPtr->_func = task {(*task)();};两条代码将这两项传进去。

最后会通过task->get_future()返回我们任务函数执行的结果返回值。

threadpool.waitForAllDone

等待所有任务执行完成。

bool ZERO_ThreadPool::waitForAllDone(int millsecond)
{
  std::unique_lock<std::mutex> lock(mutex_);
  if (tasks_.empty() && atomic_ == 0)
    return true;
  if (millsecond < 0)
  {
    condition_.wait(lock, [this] { return tasks_.empty() && atomic_ == 0; });
    return true;
  }
  else
  {
    return condition_.wait_for(lock, std::chrono::milliseconds(millsecond), [this] { return tasks_.empty() && atomic_ == 0; });
  }
}

使用条件变量来等待任务执行完成。支持超时执行功能。

此处unique_lock的使用是必须的: 条件变量condition_在wait时会进行unlock再进入休眠, lock_guard并无该操作接口

threadpool.stop

终止线程池。会调用waitForAllDone等待所有任务执行完成再终止。

void ZERO_ThreadPool::stop()
{
  {
    std::unique_lock<std::mutex> lock(mutex_);
    bTerminate_ = true;
    condition_.notify_all();
  }
  waitForAllDone();
  for (size_t i = 0; i < threads_.size(); i++)
  {
    if (threads_[i]->joinable())
    {
      threads_[i]->join();
    }
    delete threads_[i];
    threads_[i] = NULL;
  }
  std::unique_lock<std::mutex> lock(mutex_);
  threads_.clear();
}

通过join等线程执行完成后才返回。

主函数调用

class Test
{
public:
  int test(int i) {
    cout << _name << ", i = " << i << endl;
    Sleep(1000);
    return i;
  }
  void setName(string name) {
    _name = name;
  }
  string _name;
};
void test3() // 测试类对象函数的绑定
{
  ZERO_ThreadPool threadpool;
  threadpool.init(2);
  threadpool.start(); // 启动线程池
  Test t1;
  Test t2;
  t1.setName("Test1");
  t2.setName("Test2");
  auto f1 = threadpool.exec(std::bind(&Test::test, &t1, std::placeholders::_1), 10);
  auto f2 = threadpool.exec(std::bind(&Test::test, &t2, std::placeholders::_1), 20);
  cout << "t1 " << f1.get() << endl;
  cout << "t2 " << f2.get() << endl;
  threadpool.stop();
}
int main()
{
  test3(); // 测试类对象函数的绑定
  cout << "main finish!" << endl;
  return 0;
}

运行结果:


目录
相关文章
|
2月前
|
存储 NoSQL Redis
Redis 新版本引入多线程的利弊分析
【10月更文挑战第16天】Redis 新版本引入多线程是一个具有挑战性和机遇的改变。虽然多线程带来了一些潜在的问题和挑战,但也为 Redis 提供了进一步提升性能和扩展能力的可能性。在实际应用中,我们需要根据具体的需求和场景,综合评估多线程的利弊,谨慎地选择和使用 Redis 的新版本。同时,Redis 开发者也需要不断努力,优化和完善多线程机制,以提供更加稳定、高效和可靠的 Redis 服务。
56 1
|
2月前
|
Linux 编译器 测试技术
【C++】CentOS环境搭建-快速升级G++版本
通过上述任一方法,您都可以在CentOS环境中高效地升级G++至所需的最新版本,进而利用C++的新特性,提升开发效率和代码质量。
191 64
|
19天前
|
机器学习/深度学习 人工智能 自然语言处理
C++构建 GAN 模型:生成器与判别器平衡训练的关键秘籍
生成对抗网络(GAN)是AI领域的明星,尤其在C++中构建时,平衡生成器与判别器的训练尤为关键。本文探讨了GAN的基本架构、训练原理及平衡训练的重要性,提出了包括合理初始化、精心设计损失函数、动态调整学习率、引入正则化技术和监测训练过程在内的五大策略,旨在确保GAN模型在C++环境下的高效、稳定训练,以生成高质量的结果,推动AI技术的发展。
45 10
|
2月前
|
Linux 编译器 测试技术
【C++】CentOS环境搭建-快速升级G++版本
通过上述任一方法,您都可以在CentOS环境中高效地升级G++至所需的最新版本,进而利用C++的新特性,提升开发效率和代码质量。
237 63
|
1月前
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
112 38
|
27天前
|
Java
.如何根据 CPU 核心数设计线程池线程数量
IO 密集型:核心数*2 计算密集型: 核心数+1 为什么加 1?即使当计算密集型的线程偶尔由于缺失故障或者其他原因而暂停时,这个额外的线程也能确保 CPU 的时钟周期不会被浪费。
34 4
|
1月前
|
Java
线程池内部机制:线程的保活与回收策略
【10月更文挑战第24天】 线程池是现代并发编程中管理线程资源的一种高效机制。它不仅能够复用线程,减少创建和销毁线程的开销,还能有效控制并发线程的数量,提高系统资源的利用率。本文将深入探讨线程池中线程的保活和回收机制,帮助你更好地理解和使用线程池。
86 2
|
1月前
|
Prometheus 监控 Cloud Native
JAVA线程池监控以及动态调整线程池
【10月更文挑战第22天】在 Java 中,线程池的监控和动态调整是非常重要的,它可以帮助我们更好地管理系统资源,提高应用的性能和稳定性。
95 4
|
1月前
|
Prometheus 监控 Cloud Native
在 Java 中,如何使用线程池监控以及动态调整线程池?
【10月更文挑战第22天】线程池的监控和动态调整是一项重要的任务,需要我们结合具体的应用场景和需求,选择合适的方法和策略,以确保线程池始终处于最优状态,提高系统的性能和稳定性。
254 2
|
2月前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
24 1