1 组成
- 线程(生产者)
- 发布任务
- 队列
- 任务上下文
- 任务执行函数
- 线程池(消费者)
- 取出任务
- 执行任务
- 线程调度方式
2 作用
- 复用线程资源
- 节省线程创建的销毁的开销
- 可以异步处理生产者发布的任务
- 提高处理多个任务的效率
3 封装
3.1 变量对象
- 任务队列
- 工作线程数组(向量)
- 互斥锁
- 条件变量
- 状态标志
- 线程数量
- 原子变量(任务数量)
3.2 任务结构体
struct TaskFunc { TaskFunc(expireTime) : _expireTime(expireTime) {} std::function<void()> _func; uint64_t _expireTime = 0; } typedef std::shared_pte<TaskFunc> TaskFuncPtr;
3.3 构造函数
- 初始化线程数量
- 初始化状态标志
ThreadPool::ThreadPool() :thread_num_(1), terminate_(false) {}
3.4 析构函数
- 执行Stop函数
ThreadPool::~ThreadPool() { Stop(); }
3.5 线程池初始化
- 支持手动设置线程数量
bool ThreadPool::Init(int64_t num) { std::unique_lock<std::mutex> lock(mutex_); if(!threads_.empty()) return false; thread_num_ = num; return true; }
3.6 线程池停止
- 修改状态标志
- 唤醒所有线程,让他们在后台执行完后退出,释放
- 清空线程数组
void ThreadPool::Stop() { { std::unique_lock<std::mutex> lock(mutex_); terminate_ = true; condition_.notify_all(); } for(int i = 0; i < threads_.size(); i++) { if(threads_[i]->joinable()) threads_[i]->join(); delete threads_[i]; threads_[i] = nullptr; } std::unique_lock<std::mutex> lock(mutex_); threads_.clear(); }
3.7 获取线程池线程数量
int ThreadPool::GetThreadNum() { return threads_.size(); }
3.8 获取任务数量
int ThreadPool::GetTaskNum() { return tasks_.size(); }
3.9 线程池开启
- 创建thread_num个线程
- 用Run函数作为线程的执行函数
bool ThreadPool::Start() { std::unique_lock<std::mutex> lock(mutex_); if(!threads_.empty()) return false; for(size_t i = 0; i < thread_num_; i++) { threads_.push_back(new std::thread(&ThreadPool::Run, this)); } return true; }
3.10 传入任务
- 将(限定时间),任务函数,任务呢函数添加进任务队列
auto Exec(F &&f, Args... args) -> std::future<decltype(f(args...))> { Exec(0, f, args...); }
3.11 等待函数
- 如果任务队列为空,返回
- 如果传入时间小于0,等到所有任务执行完
- 如果传入时间大于0,规定时间执行任务,返回队列是否为空
bool ThreadPool::WaitForAllDone(int timewait) { std::unique_lock<std::mutex> lock(mutex_); if(tasks_.empty()) return true; if(timewait < 0) { condition_.wait(lock, [this]{return tasks_.empty();}); return true; } else { return condition_.wait_for(lock, std::chrono::milliseconds(timewait), [this]{return tasks_.empty();}); } }
3.12 获取任务
- 如果任务队列为空,释放互斥锁等待;直到有任务或者认为 停止线程池
- 如果任务队列不为空,获取并且释放一个任务
bool ThreadPool::Get(TaskFuncPtr &task) { std::unique_lock<std::mutex> lock(mutex_); if(tasks_.empty()) { condition_.wait(lock, [this] { return terminate_ || !tasks_.empty(); }); } if(terminate_) return false; if(!tasks_.empty()) { task = std::move(tasks_.front()); tasks_.pop(); return true; } return false; }
3.13 执行任务
- 如果成功获取任务,执行任务+1,执行任务
- 执行任务结束后,执行任务-1
- 通知等待函数
void ThreadPool::Run() { while (!terminate_) { TaskFuncPtr task; bool ok = Get(task); if(ok) { atomic_++; try { if(task->_expireTime > 0 && task->_expireTime > GetNowMs()) { //任务超时处理 } else task->_func; } catch (...) {} atomic_--; std::unique_lock<std::mutex> lock(mutex_); if(atomic_ == 0 && tasks_.empty()) condition_.notify_all(); } } }