一. 什么是线程池?
线程池一种线程使用模式。线程过多会带来调度开销,进而影响整个进程的缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时线程创建和销毁线程的代价。线程池不仅能够保证内核充分利用多线程,还能防止过分调度。此外,可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
线程池的应用场景
需要大量的线程来完成任务,且完成任务的时间比较短的场景。 比如web服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间长太多了。
对性能要求苛刻的应用可以使用线程池。比如要求服务器迅速响应客户请求。
接受突发性的大量请求,但不至于使服务器因此突然产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,但短时间内产生大量线程可能使内存到达极限,从而出现错误。
二. 为什么要有线程池?
降低资源消耗。通过重复利用已创建好的线程来降低线程创建和销毁时给系统带来的消耗。
提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即得到处理。
提高线程的可管理性。我们可以对线程池里的线程进行统一的分配,调优和监控。
问题:创建的线程越多性能越高,对吗?
答:不对。线程数量越多,可能会导致线程切换越频繁, 进而还有可能导致程序运行效率降低。多线程程序的运行效率, 呈现为正态分布, 线程数量从最开始的1开始逐渐增加, 程序的运行效率也逐渐变高, 直到线程数量达到一个临界值, 然后再次增加线程数量时, 程序的运行效率会减小(主要是由于频繁地线程切换影响到了整体线程运行效率)。
三. 模拟实现一个线程池
1. 线程池实现原理
线程池通过一个线程安全的阻塞任务队列加上一个或一个以上的线程实现。
线程池中的线程可以从阻塞任务队列中获取任务然后进行任务处理。
当线程都处于繁忙状态时可以将任务加入阻塞队列中,等到其它的线程空闲后进行处理。
2. 线程池基本框架
线程池的主体是一个任务队列和n个线程:
任务队列用来保存外部传入的需要解决的任务。
而线程池中的n个线程负责从任务队列中拿出任务并解决。这n个线程统一在线程池的构造函数中创建。
另外还需要一把互斥锁和一个条件变量:
互斥锁用来保护任务队列的数据安全,即维护多线程从任务队列中pop任务时的互斥关系。
条件变量用来维护多线程之间的同步关系,当任务队列为空时要求线程释放互斥锁并在条件变量下等待,这时任务队列中每插入一个任务就唤醒一个线程。
PS:类型模板参数T由我们创建线程池对象时显示传入,它代表我们要处理的任务的类型。
// 线程池中线程个数的缺省值 const size_t NUM = 5; template<class T> class ThreadPool { public: // 构造函数,负责初始化成员变量和创建好所有线程 ThreadPool(const size_t num = NUM) :_threadNum(num) ,_tids(num) { pthread_mutex_init(&_lock, nullptr); pthread_cond_init(&_empty, nullptr); pthread_t id; for(size_t i = 0; i < _threadNum; ++i) { pthread_create(&id, nullptr, Routine, this); _tids[i] = id; } } // 析构函数中负责销毁互斥锁、条件变量和线程池中的所有线程 ~ThreadPool() { pthread_mutex_destroy(&_lock); pthread_cond_destroy(&_empty); for(size_t i = 0; i < _threadNum; ++i) { pthread_cancel(_tids[i]); } } // 任务队列判空 bool Empty() { return _taskQueue.empty(); } // 申请任务队列的互斥锁 void Lock() { pthread_mutex_lock(&_lock); } // 释放任务队列的互斥锁 void UnLock() { pthread_mutex_unlock(&_lock); } // 任务队列为空时,线程在_empty条件变量下等待 void Wait() { pthread_cond_wait(&_empty, &_lock); } // 唤醒一个在_empty下等待的线程 void WakeUp() { pthread_cond_signal(&_empty); } private: queue<T> _taskQueue; // 任务队列,用于存储push进来的任务 size_t _threadNum; // 记录线程池中线程的数量 vector<pthread_t> _tids;// 记录池中所有线程的线程id pthread_mutex_t _lock; // 保证任务队列数据安全的互斥锁 pthread_cond_t _empty; // 队列为空时线程在该条件变量下等待 };
3. 线程需要做的事
每一个线程死循环地执行:申请锁 -> 从任务队列中拿取数据 -> 释放锁 -> 解决任务。
PS:从任务队列中拿取任务前要做检查,如果队列为空则需要把该线程放到_empty条件变量下阻塞等待并释放互斥锁,直到任务队列中有新任务插入时再唤醒_empty这个条件变量下等待的线程去重新申请锁和从任务队列中拿取任务。
template<class T> class ThreadPool { public: // ... private: // ... // 线程池中的线程执行该函数不断地从任务队列中拿任务并解决任务 static void* Routine(void* arg) { pthread_detach(pthread_self()); ThreadPool* argThis = (ThreadPool*)arg; while(true) { argThis->Lock(); while(argThis->Empty()) { argThis->Wait(); } T task; argThis->Pop(task); argThis->UnLock(); task.Run();// 任务类对象专门有个Run函数来解决任务 } } };
问题1:为什么要把线程执行的函数设为static?
因为我们是在线程池对象的构造函数中去创建的线程和在析构函数中去销毁这些线程,所以最好把线程执行的函数Routine也封装到线程池这个类中,可是线程执行函数只能有一个void* 类型的参数,如果我们把Routine函数写到了类中就会存在两个参数导致编译不通过(作为类的非静态成员函数,它的第一个位置的参数默认是这个类对象的this指针,而线程执行函数本身规定只能有一个void*类型的参数)
对此我们的解决办法是把线程执行函数设为类的静态成员函数并在创建线程时把对象的this指针传入作为执行函数的唯一参数传入,这样在Routine函数内部也可以通过this指针访问到类对象的所有成员了。
问题2:判断任务队列是否为空时,为什么用while而不用if语句呢?
要知道当任务队列为空时就不允许线程再到队列中拿取任务了,此时已经申请到锁的线程必须在条件变量下等待,直到队列中有了新任务后再唤醒。这里如果用if语句给任务队列判空的话,万一这个线程执行条件变量等待的操作失败了,就会结束if语句继续往后执行取任务的操作,那么整个逻辑就乱套了。如果用while循环语句的话就能够保证,不论线程等待成功与否,继续往后执行的条件一定是队列中有任务了。
4. 从任务队列中插入、拿取任务
外部调用线程池对象的void ThreadPool::Push(...)函数,向任务队列中插入一个任务。
池中的线程调用线程池对象的void ThreadPool ::Pop(...)函数,从任务队列中拿出一个任务并通过输出型参数返回。
template<class T> class ThreadPool { public: // ... // 外部调用该函数向任务队列中插入一个任务 void Push(const T& task) { Lock(); _taskQueue.push(task); UnLock(); WakeUp(); } private: // ... // 线程池中的线程调用该成员函数获得任务队列中的一个任务 void Pop(T& task) { task = _taskQueue.front(); _taskQueue.pop(); } };
5. 线程池类完整代码
// 线程池线程个数缺省值 const size_t NUM = 5; template<class T> class ThreadPool { public: // 构造函数,负责初始化成员变量和创建线程 ThreadPool(const size_t num = NUM) :_threadNum(num) ,_tids(num) { pthread_mutex_init(&_lock, nullptr); pthread_cond_init(&_empty, nullptr); pthread_t id; for(size_t i = 0; i < _threadNum; ++i) { pthread_create(&id, nullptr, Routine, this); _tids[i] = id; } } // 析构函数中负责销毁互斥锁、条件变量和线程池中的线程 ~ThreadPool() { pthread_mutex_destroy(&_lock); pthread_cond_destroy(&_empty); for(size_t i = 0; i < _threadNum; ++i) { pthread_cancel(_tids[i]); } } // 任务队列判空 bool Empty() { return _taskQueue.empty(); } // 申请任务队列的互斥锁 void Lock() { pthread_mutex_lock(&_lock); } // 释放任务队列的互斥锁 void UnLock() { pthread_mutex_unlock(&_lock); } // 任务队列为空时,线程在_empty条件变量下等待 void Wait() { pthread_cond_wait(&_empty, &_lock); } // 唤醒一个在_empty下等待的线程 void WakeUp() { pthread_cond_signal(&_empty); } // 外部调用该函数向任务队列中插入一个任务 void Push(const T& task) { Lock(); _taskQueue.push(task); UnLock(); WakeUp(); } private: // 线程池中的线程调用该成员函数获得任务队列中的一个任务 void Pop(T& task) { task = _taskQueue.front(); _taskQueue.pop(); } // 线程池中的线程执行该函数不断从任务队列中拿任务并解决任务 static void* Routine(void* arg) { pthread_detach(pthread_self()); ThreadPool* argThis = (ThreadPool*)arg; while(true) { argThis->Lock(); while(argThis->Empty()) { argThis->Wait(); } T task; argThis->Pop(task); argThis->UnLock(); task.Run(); } } // 成员变量 vector<pthread_t> _tids;// 记录池中所有线程的线程id size_t _threadNum; // 记录线程池中线程的数量 queue<T> _taskQueue; // 任务队列,用于存储push进来的任务 pthread_mutex_t _lock; // 保证任务队列数据安全的互斥锁 pthread_cond_t _empty; // 队列为空时线程在该条件变量下等待 };
6. 自己定义的任务类
每个线程池对象可以解决一种类型的任务,这个任务的相关数据和解决逻辑需要我们自己写,解决逻辑部分需要专门写到类的公有成员函数Task::Run()中,这样线程池中的线程在拿到任务后就可以统一执行该函数来解决任务。
下面我们自己写一个任务类,它的功能是进行正整数的加减乘除、取模运算:
class Task { public: // 构造函数要求创建任务对象时显示传入相关数据 Task(int x, int y, char op) :_x(x) ,_y(y) ,_op(op) {} // 解决任务的Run函数 void Run() { int ret = 0; switch(_op) { case '+': ret = _x + _y; break; case '-': ret = _x - _y; break; case '*': ret = _x * _y; break; case '/': if(_y == 0) { cerr<<"div is error"<<endl; break; } else { ret = _x / _y; break; } case '%': if(_y == 0) { cerr<<"div is error"<<endl; break; } else { ret = _x % _y; break; } default: cerr<<"operation error"<<endl; break; } cout<<"Thread["<<pthread_self()<<"]:"<<_x<<' '<<_op<<' '<<_y<<' '<<'='<<' '<<ret<<endl; } // 运算数以及运算符 private: int _x; int _y; char _op; };
7. 测试线程池
结合上面声明的线程池类和任务类,接下来对我们写的线程池进行测试。在主线程中执行以下逻辑:
创建一个任务类型为class Task的线程池对象。
通过种子随机生成不同的运算数和运算符来构造Task对象。
把Task对象不断地插入到任务队列中,让线程池中的线程去解决任务。
int main() { srand((size_t)time(nullptr)); char opAll[5] = {'+', '-', '*', '/', '%'}; // 创建一个任务类型为Task的线程池对象 ThreadPool<Task> tp; // 不断地向线程池对象中派发任务,让里面的线程来解决这个任务 while(true) { int x = rand()%100 + 1; int y = rand()%100 + 1; char op = opAll[rand()%5]; tp.Push(Task(x, y, op)); sleep(1); } return 0; }
编译运行:
四. 学习总结