线程池
线程池一种线程的使用模式。本质上创建一个新线程是需要成本的并且线程过多会带来调度开销,进而影响缓存局部性和整体性能,所以线程池的思想就是提前创建好一批线程,要用的时候直接就可以用,不需要等到需要的时候再创建。
线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。不仅能够保证内核的充分利用,还能防止过分调度。
可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量
应用场景
- 需要大量的线程来完成任务,且完成任务的时间比较短,例如WEB服务器完成网页请求这样的任务。
- 对性能要求苛刻的应用,例如要求服务器迅速响应客户请求
- 接受突发性的大量请求,但不至于是服务器因此产生大量线程的应用
基于线程池实现生产者消费者模型
要求
- 创建固定数量的线程池,循环从任务队列中获取任务对象。任务队列没有任务时所有线程阻塞等待
- 获取到任务对象后,执行任务对象中的任务接口
- 主线程负责往任务队列中插入任务,线程池中的线程等待获取
- 对线程库进行封装,使用类对象实现线程对应操作
Task.hpp (任务对象)
#include <iostream> #include <string> #include <functional> #include <cstdio> // 任务类 class CPTask { // 调用的计算方法,根据传入的字符参数决定 typedef std::function<int(int, int, char)> func_t; public: CPTask() { } CPTask(int x, int y, char op, func_t func) : _x(x), _y(y), _op(op), _func(func) { } // 实现传入的函数调用 std::string operator()() { int count = _func(_x, _y, _op); // 将结果以自定义的字符串形式返回 char res[2048]; snprintf(res, sizeof res, "%d %c %d = %d", _x, _op, _y, count); return res; } // 显示出当前传入的参数 std::string tostring() { char res[1024]; snprintf(res, sizeof res, "%d %c %d = ", _x, _op, _y); return res; } private: int _x; int _y; char _op; func_t _func; }; // 负责计算的任务函数 int Math(int x, int y, char c) { int count; switch (c) { case '+': count = x + y; break; case '-': count = x - y; break; case '*': count = x * y; break; case '/': { if (y == 0) { std::cout << "div zero" << std::endl; count = -1; } else count = x / y; break; } default: break; } return count; } class SCTask { // 获取保存数据的方法 typedef std::function<void(std::string)> func_t; public: SCTask() { } SCTask(const std::string &str, func_t func) : _str(str), _func(func) { } void operator()() { _func(_str); } private: std::string _str; func_t _func; };
MyThread.hpp (封装线程库)
#pragma once #include <iostream> #include <pthread.h> #include <string> #include <cassert> #include <functional> class Thread { private: // 线程调用方法 // 静态类内成员函数不含this指针 // 因此参数直接传入this指针 static void *start_routine(void *args) { Thread *_this = (Thread *)args; return _this->run(); } public: typedef std::function<void *(void *)> func_; // 构造函数 Thread() { // 保存线程名 _name = "thread-No."; _name += std::to_string(_num++); } // 线程启动(创建线程并调用回调函数) void start(func_ func, void *args = nullptr) { _func = func; _args = args; // 创建线程成功后直接回调线程的函数 int n = pthread_create(&_tid, nullptr, start_routine, this); assert(n == 0); } // 传入回调函数的函数参数 void *run() { return _func(_args); } // 线程等待 void join() { int n = pthread_join(_tid, nullptr); assert(n == 0); } // 返回线程名 std::string ThreadName() { return _name; } ~Thread() { } private: std::string _name; // 线程名 pthread_t _tid; // 线程id func_ _func; // 线程调用方法 void *_args; // 线程调用方法参数 static int _num; // 线程计数,用于实现线程名 }; // 初始化线程数为1 int Thread::_num = 1;
ThreadPool.hpp (线程池)
#pragma once #include "MyThread.hpp" #include "Task.hpp" #include <unistd.h> #include <vector> #include <queue> // 线程池类定义位于下面,因此属性类想要获取到 // 就必须在前面声明 template <class T> class Pool; // 线程的属性类 template <class T> class ThreadData { public: Pool<T> *_pool; // 线程所在的线程池,获取到线程的this指针 std::string _name; // 线程的名字 ThreadData(Pool<T> *pool, const std::string &name) : _pool(pool), _name(name) { } }; // 线程池的类 template <class T> class Pool { private: static void *TaskPlay(void *argc) { // 获取到当前线程的线程属性类的对象 // 通过线程属性类里面存储的this指针可以访问当前的非静态成员变量 ThreadData<T> *_this = (ThreadData<T> *)argc; // 定义任务对象 T t; while (1) { // 加锁和条件变量判断 pthread_mutex_lock(&_this->_pool->_mutex); while (_this->_pool->_Taskqueue.empty()) pthread_cond_wait(&_this->_pool->_cond, &_this->_pool->_mutex); // 线程获取任务 t = _this->_pool->_Taskqueue.front(); _this->_pool->_Taskqueue.pop(); // 解锁 pthread_mutex_unlock(&_this->_pool->_mutex); // 线程将任务拿到后并不需要在临界资源里执行,出来后再执行为其他线程进入临界资源腾出时间 std::cout << _this->_name << " 处理完成: " << t() << std::endl; } return nullptr; } public: // 将指定数量的线程创建 // 初始化锁和条件变量 // 线程数量缺省值为5,默认线程数为5 Pool(const int num = 5) : _num(num) { pthread_mutex_init(&_mutex, nullptr); pthread_cond_init(&_cond, nullptr); for (int i = 0; i < _num; ++i) { _threads.push_back(new Thread()); } } // 线程池启动 void start() { // 每个线程都创建一个线程属性类对象 // 将当前的this指针和创建线程后的线程名字传入 // 调用线程的启动函数时将线程的调用函数和线程属性类对象传入 // 根据线程属性类里的成员可以获取到线程的名字和在静态成员函数中获取到当前this指针 for (auto &t : _threads) { ThreadData<T> *td = new ThreadData<T>(this, t->ThreadName()); t->start(TaskPlay, (void *)td); std::cout << "start....." << std::endl; } } // 往线程池中插入任务到任务队列 // 为了保护任务队列需要加锁操作 // 往队列中插入任务完成后就可以唤醒线程执行任务了 void push(const T &in) { pthread_mutex_lock(&_mutex); _Taskqueue.push(in); pthread_cond_signal(&_cond); pthread_mutex_unlock(&_mutex); } ~Pool() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_cond); for (auto &e : _threads) delete e; } private: int _num; // 线程池里的线程数 std::vector<Thread *> _threads; // 存放所有线程的地址的数组 std::queue<T> _Taskqueue; // 存放线程的任务的队列,线程从此队列拿任务 pthread_mutex_t _mutex; // 定义一把锁,为了保护临界资源 pthread_cond_t _cond; // 定义条件变量 };
main.cc
#include "ThreadPool.hpp" int main() { srand(time(nullptr)); std::string s("+-*/"); // 创建线程池,传参为10,则创建10个线程 Pool<CPTask> *p = new Pool<CPTask>(10); // 调用线程池启动方法 p->start(); while(1) { // 随机生成任务类的参数 int x = rand() % 100; int y = rand() % 100; int sindex = rand() % s.size(); // Math方法在任务类头文件中 CPTask t(x, y, s[sindex], Math); std::cout << "录入任务:" << t.tostring() << std::endl; // 将任务类插入到线程池的任务队列中 p->push(t); sleep(1); } return 0; }
实现效果
最终成功实现主线程录入任务,线程池中的线程获取任务后执行。