基于C++11的线程池

简介: 基于C++11的线程池

1 组成

  • 线程(生产者)
  • 发布任务
  • 队列
  • 任务上下文
  • 任务执行函数
  • 线程池(消费者)
  • 取出任务
  • 执行任务
  • 线程调度方式

2 作用

  • 复用线程资源
  • 节省线程创建的销毁的开销
  • 可以异步处理生产者发布的任务
  • 提高处理多个任务的效率

3 封装

3.1 变量对象

  • 任务队列
  • 工作线程数组(向量)
  • 互斥锁
  • 条件变量
  • 状态标志
  • 线程数量
  • 原子变量(任务数量)

3.2 任务结构体

struct TaskFunc {
    TaskFunc(expireTime) : _expireTime(expireTime) {}
    std::function<void()> _func;
    uint64_t _expireTime = 0;
}
typedef std::shared_pte<TaskFunc> TaskFuncPtr;

3.3 构造函数

  • 初始化线程数量
  • 初始化状态标志
ThreadPool::ThreadPool() :thread_num_(1), terminate_(false) {}

3.4 析构函数

  • 执行Stop函数
ThreadPool::~ThreadPool() {
    Stop();
}

3.5 线程池初始化

  • 支持手动设置线程数量
bool ThreadPool::Init(int64_t num) {
    std::unique_lock<std::mutex> lock(mutex_);
    if(!threads_.empty())
        return false;
    thread_num_ = num;
    return true;
}

3.6 线程池停止

  • 修改状态标志
  • 唤醒所有线程,让他们在后台执行完后退出,释放
  • 清空线程数组
void ThreadPool::Stop() {
    {
        std::unique_lock<std::mutex> lock(mutex_);
        terminate_ = true;
        condition_.notify_all();
    }
    for(int i = 0; i < threads_.size(); i++) {
        if(threads_[i]->joinable())
            threads_[i]->join();
        delete threads_[i];
        threads_[i] = nullptr;
    }
    std::unique_lock<std::mutex> lock(mutex_);
    threads_.clear();
}

3.7 获取线程池线程数量

int ThreadPool::GetThreadNum() {
    return threads_.size();
}

3.8 获取任务数量

int ThreadPool::GetTaskNum() {
    return tasks_.size();
}

3.9 线程池开启

  • 创建thread_num个线程
  • 用Run函数作为线程的执行函数
bool ThreadPool::Start() {
    std::unique_lock<std::mutex> lock(mutex_);
    if(!threads_.empty())
        return false;
    for(size_t i = 0; i < thread_num_; i++) {
        threads_.push_back(new std::thread(&ThreadPool::Run, this));
    }
    return true;
}

3.10 传入任务

  • 将(限定时间),任务函数,任务呢函数添加进任务队列
auto Exec(F &&f, Args... args) -> std::future<decltype(f(args...))> {
        Exec(0, f, args...);
}

3.11 等待函数

  • 如果任务队列为空,返回
  • 如果传入时间小于0,等到所有任务执行完
  • 如果传入时间大于0,规定时间执行任务,返回队列是否为空
bool ThreadPool::WaitForAllDone(int timewait) {
    std::unique_lock<std::mutex> lock(mutex_);
    if(tasks_.empty())
        return true;
    if(timewait < 0) {
        condition_.wait(lock, [this]{return tasks_.empty();});
        return true;
    }
    else {
        return condition_.wait_for(lock, std::chrono::milliseconds(timewait),
            [this]{return tasks_.empty();});
    }
}

3.12 获取任务

  • 如果任务队列为空,释放互斥锁等待;直到有任务或者认为 停止线程池
  • 如果任务队列不为空,获取并且释放一个任务
bool ThreadPool::Get(TaskFuncPtr &task) {
    std::unique_lock<std::mutex> lock(mutex_);
    if(tasks_.empty()) {
        condition_.wait(lock, [this] {
            return terminate_ || !tasks_.empty();
        });
    }
    if(terminate_)
        return false;
    if(!tasks_.empty()) {
        task = std::move(tasks_.front());
        tasks_.pop();
        return true;
    }
    return false;
}

3.13 执行任务

  • 如果成功获取任务,执行任务+1,执行任务
  • 执行任务结束后,执行任务-1
  • 通知等待函数
void ThreadPool::Run() {
    while (!terminate_) {
        TaskFuncPtr task;
        bool ok = Get(task);
        if(ok) {
            atomic_++;
            try {
                if(task->_expireTime > 0 && task->_expireTime > GetNowMs()) {
                    //任务超时处理
                }
                else
                    task->_func;
            }
            catch (...) {}
            atomic_--;
            std::unique_lock<std::mutex> lock(mutex_);
            if(atomic_ == 0 && tasks_.empty())
                condition_.notify_all();
        }
    }
}

相关文章
|
1月前
|
缓存 安全 C++
C++无锁队列:解锁多线程编程新境界
【10月更文挑战第27天】
49 7
|
1月前
|
消息中间件 存储 安全
|
2月前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
58 1
C++ 多线程之初识多线程
|
2月前
|
存储 并行计算 安全
C++多线程应用
【10月更文挑战第29天】C++ 中的多线程应用广泛,常见场景包括并行计算、网络编程中的并发服务器和图形用户界面(GUI)应用。通过多线程可以显著提升计算速度和响应能力。示例代码展示了如何使用 `pthread` 库创建和管理线程。注意事项包括数据同步与互斥、线程间通信和线程安全的类设计,以确保程序的正确性和稳定性。
|
2月前
|
存储 前端开发 C++
C++ 多线程之带返回值的线程处理函数
这篇文章介绍了在C++中使用`async`函数、`packaged_task`和`promise`三种方法来创建带返回值的线程处理函数。
79 6
|
2月前
|
缓存 负载均衡 Java
c++写高性能的任务流线程池(万字详解!)
本文介绍了一种高性能的任务流线程池设计,涵盖多种优化机制。首先介绍了Work Steal机制,通过任务偷窃提高资源利用率。接着讨论了优先级任务,使不同优先级的任务得到合理调度。然后提出了缓存机制,通过环形缓存队列提升程序负载能力。Local Thread机制则通过预先创建线程减少创建和销毁线程的开销。Lock Free机制进一步减少了锁的竞争。容量动态调整机制根据任务负载动态调整线程数量。批量处理机制提高了任务处理效率。此外,还介绍了负载均衡、避免等待、预测优化、减少复制等策略。最后,任务组的设计便于管理和复用多任务。整体设计旨在提升线程池的性能和稳定性。
84 5
|
2月前
|
C++
C++ 多线程之线程管理函数
这篇文章介绍了C++中多线程编程的几个关键函数,包括获取线程ID的`get_id()`,延时函数`sleep_for()`,线程让步函数`yield()`,以及阻塞线程直到指定时间的`sleep_until()`。
37 0
C++ 多线程之线程管理函数
|
2月前
|
资源调度 Linux 调度
Linux C/C++之线程基础
这篇文章详细介绍了Linux下C/C++线程的基本概念、创建和管理线程的方法,以及线程同步的各种机制,并通过实例代码展示了线程同步技术的应用。
33 0
Linux C/C++之线程基础
|
5月前
|
算法 编译器 C++
开发与运维线程问题之在C++的原子操作中memory_order如何解决
开发与运维线程问题之在C++的原子操作中memory_order如何解决
51 2
|
4月前
|
Dart 编译器 API
Dart ffi 使用问题之在C++线程中无法直接调用Dart函数的问题如何解决
Dart ffi 使用问题之在C++线程中无法直接调用Dart函数的问题如何解决