简介
半同步半异步线程池用的比较多,实现也比较简单。
其中同步层包括同步服务层和排队层,指的是将接收的任务排队,将所有的任务排队到一个队列中,等待处理;
异步层指多个线程处理任务,异步处理层从同步层取出任务,并发处理任务。
同步队列
同步队列属于同步层的内容,主要作用是保证队列中共享数据线程安全,同时也提供新增任务的接口,以及提供取任务的接口。
这里使用C++11的锁、条件变量、右值引用、std::move和std::forward来实现。
同步队列主要包括三个函数,Take、Add和Stop。
Take函数
这里实现重载了两个Take函数,可支持一次获取多个任务,或者一次获取一个任务。
//可一次性获取多个任务,放在list中,减少互斥锁阻塞时间 void Take(std::list<T>& list) { std::unique_lock<std::mutex> locker(m_mutex); m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); }); if (m_needStop) { return; } list = std::move(m_queue); m_notFull.notify_one(); } //获取单个任务 void Take(T& t) { std::unique_lock<std::mutex> locker(m_mutex); m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); }); if (m_needStop) { return; } t = m_queue.front(); m_queue.pop_front(); m_notFull.notify_one(); }
先创建一个unique *lock 获取 mutex,然后再通过条件变量 m_*notEmpty 来等待判断式。判断式由两个条件组成,一个是停止的标志,另一个是不为空的条件,当不满足任何一个条件时,条件变量会释放 mutex 并将线程置于 waiting 状态,等待其他线程调用 notify_one/notify all 将其唤醒;当满足任何一个条件时,则继续往下执行后面的逻辑,即将队列中的任务取出,并唤醒一个正处于等待状态的添加任务的线程去添加任务。当处于 waiting 状态的线程被 notify_one 或notify all 唤醒时,条件变量会先重新获取 mutex,然后再检查条件是否满足,如果满足,则往下执行,如果不满足,则释放 mutex 继续等待。
Add函数
Add 的过程和 Take 的过程是类似的,也是先获取 mutex,然后检查条件是否满足,不满足条件时,释放 mutex 继续等待,如果满足条件,则将新的任务插入到队列中,并唤醒取任务的线程去取数据。
template<typename F> void Add(F &&x) { std::unique_lock<std::mutex> locker(m_mutex); m_notFull.wait(locker, [this] {return m_needStop || NotFull(); }); if (m_needStop) return; m_queue.emplace_back(std::forward<F>(x)); m_notEmpty.notify_one(); }
Stop函数
Stop 函数先获取 mutex,然后将停止标志置为 true。注意,为了保证线程安全,这里需要先获取 mutex,在将其标志置为 true 之后,再唤醒所有等待的线,因为等待的条件是m_needStop,并且满足条件,所以线程会继续往下执行。由于线程在 m_needStop 为 true 时会退出,所以所有的等待线程会相继退出。
另外一个值得注意的地方是,我们把 m notFull.notify_all0放到lock_guard 保护范围之外了,这里也可以将 m_notFull.notify all0)放到ockguard保护范围之内,放到外面是为了做一点优化。因为 notify_one 或 notify_all 会唤醒一个在等待的线程,线程被唤醒后会先获取 mutex 再检查条件是否满足,如果这时被 lock guard保护,被唤醒的线程则需要 lock guard 析构释放 mutex 才能获取(即stop函数执行完了才释放)。如果在 lock_guard 之外notify_one 或notify_all,被唤醒的线程获取锁的时候不需要等待 lock_guard 释放锁,性能会好一点,所以在执行 notify_one或notify_all 时不需要加锁保护。
void Stop() { { std::lock_guard<std::mutex> locker(m_mutex); m_needStop = true; } m_notFull.notify_all(); m_notEmpty.notify_all(); }
SyncQueue完整代码
”SyncQueue.h”
同步队列整体代码:
#pragma once #include <iostream> #include <list> #include <mutex> using namespace std; template<typename T> class SyncQueue { public: SyncQueue(int maxSize) :m_maxSize(maxSize), m_needStop(false) { } void Put(const T &x) { Add(x); } void Put(T &&x) { Add(std::forward<T>(x)); } //可一次性获取多个任务,放在list中,减少互斥锁阻塞时间 void Take(std::list<T>& list) { std::unique_lock<std::mutex> locker(m_mutex); m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); }); if (m_needStop) { return; } list = std::move(m_queue); m_notFull.notify_one(); } //获取单个任务 void Take(T& t) { std::unique_lock<std::mutex> locker(m_mutex); m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); }); if (m_needStop) { return; } t = m_queue.front(); m_queue.pop_front(); m_notFull.notify_one(); } void Stop() { { std::lock_guard<std::mutex> locker(m_mutex); m_needStop = true; } m_notFull.notify_all(); m_notEmpty.notify_all(); } bool Empty() { std::lock_guard<std::mutex> locker(m_mutex); return m_queue.empty(); } bool Full() { std::lock_guard<std::mutex> locker(m_mutex); return m_queue.size() == m_maxSize; } //可以获取任务数量 int Count() { return m_queue.size(); } private: bool NotFull() const { bool full = m_queue.size() >= m_maxSize; if (full) { cout << "缓冲区满了,需要等待。。。" << endl; } return !full; } bool NotEmpty() const { bool empty = m_queue.empty(); if (empty) { cout << "缓冲区空了,需要等待。。。,异步层的线程ID:" << this_thread::get_id() << endl; } return !empty; } template<typename F> void Add(F &&x) { std::unique_lock<std::mutex> locker(m_mutex); m_notFull.wait(locker, [this] {return m_needStop || NotFull(); }); if (m_needStop) return; m_queue.emplace_back(std::forward<F>(x)); m_notEmpty.notify_one(); } private: std::list<T> m_queue; //缓冲区 std::mutex m_mutex; //互斥量 std::condition_variable m_notEmpty; //不为空的条件变量 std::condition_variable m_notFull; //没有满的条件变量 int m_maxSize; //同步队列最大的size bool m_needStop; //停止的标志 };
线程池
“ThreadPool.h”
线程池ThreadPool有3个成员变量,一个是线程组,这个线程组中的线程是预先创建的,应该创建多少个线程由外面传人,一般建议创建 CPU 核数的线程以达到最优的效率,线程组循环从同步队列中取出任务并执行,如果线程池为空,线程组将处于等待状态,等待任务的到来。
另一个成员变量是同步队列,它不仅用来做线程同步,还用来限制同步队列的上限,这个上限也是由使用者设置的。
第三个成员变量是用来停止线程池的,为了保证线程安全,我们用到了原子变量 atomic bool。下一节中将展示使用这个半同步半异步的线程池的实例。
#include<list> #include<thread> #include<functional> #include<memory> #include<atomic> #include "SyncQueue.h" const int MaxTaskCount = 100; class ThreadPool { public: using Task = std::function<void()>; ThreadPool(int numThreads = std::thread::hardware_concurrency()) : m_queue(MaxTaskCount) { Start(numThreads); } ~ThreadPool(void) { Stop(); } void Stop() { //保证多线程情况下只调用一次 StopThreadGroup std::call_once(m_flag, [this] {StopThreadGroup(); }); } //可输入右值,例如lambda表达式 void AddTask(Task&& task) { m_queue.Put(std::forward<Task>(task)); } void AddTask(const Task& task) { m_queue.Put(task); } void Start(int numThreads) { m_running = true; //创建线程组 for (int i = 0; i < numThreads; ++i) { m_threadgroup.emplace_back(std::make_shared<std::thread>(&ThreadPool::RunInThread, this)); } } private: void RunInThread() { while (m_running) { //取任务分别执行 std::list<Task> list; m_queue.Take(list); for (auto& task : list) { if (!m_running) return; task(); } } } void StopThreadGroup() { m_queue.Stop(); //让同步队列中的线程停止 m_running = false; //置为false,让内部线程跳出循环并退出 for (auto thread : m_threadgroup) { if (thread) thread->join(); } m_threadgroup.clear(); } std::list<std::shared_ptr<std::thread>> m_threadgroup; //处理任务的线程组 SyncQueue<Task> m_queue; //同步队列 atomic_bool m_running; //是否停止的标志 std::once_flag m_flag; };
主函数测试
#include <iostream> #include "ThreadPool.h" using namespace std; void TestThdPool() { ThreadPool pool(2);//创建一个2个线程的线程池 //创建一个线程来添加10个任务1 std::thread thd1([&pool] { for (int i = 0; i < 10; i++) { auto thdId = this_thread::get_id(); pool.AddTask([thdId] {//添加任务可以使用lambda表达式,代码中实现了右值作为参数输入 cout << "同步线程1的线程ID:" << thdId << endl; }); } }); //创建一个线程来添加20个任务2 std::thread thd2([&pool] { for (int i = 0; i < 20; i++) { auto thdId = this_thread::get_id(); pool.AddTask([thdId] { cout << "同步线程2的线程ID:" << thdId << endl; }); } }); this_thread::sleep_for(std::chrono::seconds(2)); getchar(); pool.Stop(); thd1.join(); thd2.join(); } int main() { TestThdPool(); return 0; }
运行结果: