【C++并发编程】std::future、std::async、std::packaged_task与std::promise的深度探索(二)https://developer.aliyun.com/article/1464317
六、并行类和线程池
并行库
std::future
是C++标准库的一部分,它表示将来可能在其他线程上计算出的一个值。std::future
本身并不直接涉及线程池。然而,它通常与如std::async
等机制结合使用,这些机制可以利用线程池执行异步任务。
事实上,std::async
的行为取决于给它的参数。如果传入参数 std::launch::async
,它将在新线程中执行任务。如果传入参数 std::launch::deferred
,任务将在调用 std::future::get()
时同步运行。无论如何,std::async
的实现可以使用线程池,这取决于标准库的实现和系统限制。
总之,std::future
并不直接与线程池有关,但它可以与使用线程池的异步执行机制一起使用。
C++标准库中,并没有直接提供线程池功能。std::future
和 std::async
只提供了一种基本的异步执行方式,所以在C++标准库中,你无法直接控制线程池的细节,例如工作线程数量、可调参数等。要实现这种控制,你可以创建自定义线程池,或使用已有的开源线程池库。
std::packaged_task
也可以与线程池一起使用,但它本身并不是一个线程池实现。std::packaged_task
是一个在C++中包装可调用对象的类模板,它允许将函数与一个 std::future
结合使用。当该可调用对象(函数、lambda表达式或函数对象)被调用时,std::packaged_task
会将结果存储起来,并使关联的 std::future
变得就绪。
你可以使用 std::packaged_task
创建任务,然后将这些任务提交给一个线程池。这使得在线程池中执行的任务能够返回一个 std::future
对象,从而对任务结果进行异步访问。
选择权衡
线程池通常更适用于长时间运行的任务,因为线程池意味着在执行时间较长的任务时可以复用线程资源。这样就能避免频繁地创建和销毁线程所带来的性能损失。线程池还允许你控制并发线程的数量来满足特定性能需求或系统限制。
而对于短时间且不频繁的任务,使用并行库(如C++标准库中的 std::async
、Intel TBB、Microsoft PPL和C++ Boost.Asio库)可能更恰当。这些库在只需执行少量任务时可以提供简便的接口,并避免为管理线程池带来额外的复杂性。并行库通常会处理线程创建和销毁的资源管理问题,因此对于这些罕见的任务是一个不错的选择。
请注意,在具体选择如何并发执行任务时,任务的性质(如任务是否有优先级、是否需要同步之类)以及所使用的库(它们会有不同的功能和优化)也是应该考虑的因素。
自定义线程池
以下是一个简单的自定义线程池示例:
#include <iostream> #include <vector> #include <queue> #include <thread> #include <mutex> #include <condition_variable> class ThreadPool { public: ThreadPool(size_t num_threads); ~ThreadPool(); void enqueue(std::function<void()> task); private: void worker(); std::vector<std::thread> workers; std::queue<std::function<void()>> tasks; std::mutex tasks_mutex; std::condition_variable tasks_cv; bool stop; }; ThreadPool::ThreadPool(size_t num_threads) : stop(false) { for (size_t i = 0; i < num_threads; ++i) { workers.emplace_back(&ThreadPool::worker, this); } } ThreadPool::~ThreadPool() { { std::unique_lock<std::mutex> lock(tasks_mutex); stop = true; } tasks_cv.notify_all(); for (auto &worker : workers) { worker.join(); } } void ThreadPool::enqueue(std::function<void()> task) { { std::unique_lock<std::mutex> lock(tasks_mutex); tasks.push(task); } tasks_cv.notify_one(); } void ThreadPool::worker() { while (true) { std::function<void()> task; { std::unique_lock<std::mutex> lock(tasks_mutex); tasks_cv.wait(lock, [this]() { return !tasks.empty() || stop; }); if (stop && tasks.empty()) { return; } task = tasks.front(); tasks.pop(); } task(); } }
通过以上自定义线程池实现,你可以自由地控制线程池的大小,以及对任务队列进行管理。此外,还有许多开源线程池库可供选择,例如 Intel TBB,Microsoft PPL和C++ Boost.Asio库。这些库为多线程编程提供了更多的优化和高级控制。
并行库对线程池的帮助
C++中的并行类,包括std::thread、std::future、std::async、std::packaged_task和std::promise等,可以用来实现线程池,这对于提高多核处理器的利用率,减少线程创建和销毁的开销,以及提高程序的响应性能具有重要的帮助。下面我们详细讨论这些类如何辅助实现线程池。
1. std::thread
std::thread 是 C++ 的线程库中的基础,它可以用来创建和管理线程。在实现线程池时,我们通常会创建一组线程并保存在容器中(例如std::vector)。这些线程在创建时会开始执行一个特定的函数,这个函数通常是一个无限循环,不断从任务队列中取出任务并执行。
2. std::future和std::promise
std::future 和 std::promise 可以用来传递和获取任务的结果。在实现线程池时,我们通常会为每个任务创建一个 std::promise 对象,并将对应的 std::future 对象返回给调用者。当任务完成时,工作线程将结果设置到 std::promise 对象中,调用者可以通过 std::future 对象获取结果。
3. std::async
std::async 是一种简单的异步编程工具,它可以用来启动一个异步任务并返回一个 std::future 对象。虽然 std::async 本身并不适合用来实现线程池(因为它总是创建新的线程),但是我们可以借鉴它的设计来简化线程池的接口。具体来说,我们可以提供一个类似于 std::async 的函数,这个函数接受一个可调用对象和一组参数,将它们封装成任务并添加到任务队列中,然后返回一个 std::future 对象。
4. std::packaged_task
std::packaged_task 可以看作是一个包装了可调用对象的类,它将可调用对象和一个 std::promise 对象绑定在一起。当调用 std::packaged_task 对象时,它会调用内部的可调用对象,并将结果保存到 std::promise 对象中。在实现线程池时,我们可以用 std::packaged_task 来封装任务,这样就可以将任何可调用对象转换为一个可以放入任务队列的统一类型。
这些并行类提供了创建线程、异步执行任务和传递任务结果等基础功能,使得我们可以在 C++ 中实现高效的线程池。而线程池的使用可以更好地控制线程的数量,避免过多的线程创建和销毁带来的开销,提高多核处理器的利用率,从而提高程序的性能。
类名 | 功能描述 | 实现线程池的作用 | 用户编程的角度 | 实用性 |
std::thread | 用于创建和管理线程 | 线程池的基础,负责执行任务 | 简单易用,但需要手动管理线程生命周期 | 高 |
std::future | 用于获取异步任务的结果 | 提供任务结果的获取方式,使调用者可以等待任务完成并获取结果 | 提供了一种安全且简单的方式来获取异步任务的结果 | 高 |
std::promise | 用于设置异步任务的结果 | 提供任务结果的设置方式,使工作线程可以设置任务的结果 | 需要与 std::future 配合使用,使用稍复杂 | 中 |
std::async | 用于启动异步任务 | 可以借鉴其设计来简化线程池的接口 | 非常简单易用,但不适合用于实现线程池 | 中 |
std::packaged_task | 用于封装任务 | 可以将任何可调用对象封装为任务,使任务可以被放入队列 | 简化了任务的创建和结果的传递,但需要手动管理其生命周期 | 高 |
并行库与线程池结合
以下是使用 std::thread
,std::future
,std::promise
,std::async
和 std::packaged_task
的自定义线程池实现。
#include <iostream> #include <vector> #include <queue> #include <thread> #include <mutex> #include <condition_variable> #include <functional> #include <future> class ThreadPool { public: // 构造函数: 创建指定数量的工作线程 // Constructor: creates the specified number of worker threads ThreadPool(size_t num_threads); // 析构函数: 关闭所有线程并释放资源 // Destructor: stops all threads and releases resources ~ThreadPool(); // 任务入队函数: 将任务添加到任务队列中 // Enqueue function: adds a task to the task queue template <typename F, typename... Args> auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>; private: // 工作线程执行函数 // Worker thread execution function void worker(); std::vector<std::thread> workers; // 工作线程 std::queue<std::function<void()>> tasks; // 任务队列 std::mutex tasks_mutex; // 保护任务队列的互斥锁 std::condition_variable tasks_cv; // 通知工作线程的条件变量 bool stop; // 标记线程池是否停止 }; ThreadPool::ThreadPool(size_t num_threads) : stop(false) { for (size_t i = 0; i < num_threads; ++i) { workers.emplace_back(&ThreadPool::worker, this); } } ThreadPool::~ThreadPool() { { std::unique_lock<std::mutex> lock(tasks_mutex); stop = true; } tasks_cv.notify_all(); for (auto &worker : workers) { worker.join(); } } template <typename F, typename... Args> auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> { using return_type = typename std::result_of<F(Args...)>::type; // 创建 packaged_task,包装任务,将任务与 future 关联 auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...)); std::future<return_type> result = task->get_future(); { // 将任务包装为 std::function,并添加到任务队列 std::unique_lock<std::mutex> lock(tasks_mutex); tasks.emplace([task](){ (*task)(); }); } tasks_cv.notify_one(); // 通知一个工作线程 return result; } void ThreadPool::worker() { while (true) { std::function<void()> task; // 从任务队列中获取任务 { std::unique_lock<std::mutex> lock(tasks_mutex); tasks_cv.wait(lock, [this]() { return !tasks.empty() || stop; }); // 如果线程池已停止且没有剩余任务,则退出 if (stop && tasks.empty()) { return; } task = tasks.front(); tasks.pop(); } // 执行任务 task(); } }
此实现中,请注意以下关键部分:
- 构造函数初始化线程池并创建指定数量的工作线程。
enqueue()
函数是任务入队方法,可以将任务添加到任务队列中。它会创建一个std::packaged_task
并将任务与关联的std::future
对象关联起来。该方法返回一个std::future
对象,调用者可以使用它来获取异步任务的结果。- 线程池内的工作线程会等待并从任务队列中获取任务。执行完任务后,任务所对应的
std::future
对象将变为就绪状态,可以获取任务结果。 - 析构函数会停止所有工作线程并释放资源。
这个线程池提供了基本的线程管理功能,你可以根据需要进行扩展以支持其他功能,例如控制线程数量或提供任务优先级。
结语
在我们的探索过程中,我们已经深入了解了C++并发编程的强大功能和广泛应用。然而,学习这些技术只是开始。真正的力量来自于你如何将它们融入到你的日常工作中,以提高效率和生产力。
心理学告诉我们,学习是一个持续且积极参与的过程。所以,我鼓励你不仅要阅读和理解这些命令,还要动手实践它们。尝试创建自己的命令,逐步掌握C++并发编程,使其成为你日常工作的一部分。
同时,请记住分享是学习过程中非常重要的一环。如果你发现本博客对你有帮助,请不吝点赞并留下评论。分享你自己在使用C++并发编程时遇到的问题或者有趣的经验,可以帮助更多人从中学习。
此外,我也欢迎你收藏本博客,并随时回来查阅。因为复习和反复实践也是巩固知识、提高技能的关键。
最后,请记住:每个人都可以通过持续学习和实践成为C++并发编程专家。我期待看到你在这个旅途中取得更大进步!