C++11:多线程

简介: C++11:多线程

1、std::thread

// 创建std::thread对象,新线程调用threadFun函数,参数由 args 给出
 template<class Fn, class... Args> 
 explicit thread(Fn&& fn, Args&&... args);

特点:不能复制控制,只能移动(成功后则不再表示 thread 对象)

thread t1(threadFun, std::ref(x)); // std::ref 表示引用传递
 thread t2(std::move(t1)); // t1 线程失去所有权
 //t1.join(); // error,t1不再是 thread 执行对象
 t2.join(); //t3拥有控制权

注:std::ref 与 & 的区别

  • & 是类型说明符
  • std::ref 是一个函数,返回 std::reference_wrapper 类型,模拟引用传递(不是引用)。在函数式编程(如std::bind、std::thread)默认是对参数的直接拷贝。只有在模板自动推导类型时,ref 用包装类型 std::reference_wrapper 代替原来会被识别的值类型。

成员函数

  • get_id() 获取线程ID,返回类型 std::thread::id 对象
  • joinable() 判断线程是否可以加入等待。新创建的线程是 joinable,可被 joinable 的 thread 对象必须在销毁之前被主线程 join 或者将其设置为 detached。
  • join() 等该线程执行完成后才返回
  • detach() 线程解绑独立运行。调用后,目标线程驻留后台运行,成为了守护线程。与之相关的 std::thread 对象失去对目标线程的关联,无法再通过 std::thread 对象取得该线程的控制权。当线程主函数执行完之后,线程就结束了。运行时库负责清理与该线程相关的资源。detach 后的线程的特点:
  • *this 不再代表任何的线程执行实例。
  • joinable() == false
  • get_id() == std::thread::id()
  • yield() 让出 cpu 时间片
  • 1sleep_until() 睡眠直到某个时间点
  • sleep_for() 睡眠某段时间

实例:线程封装

子类继承父类,实现具体的业务逻辑处理。

// thread.h
 #ifndef THREAD_H
 #define THREAD_H
 #include <thread>
 class Thread {
 public:
     Thread(); // 构造函数
     virtual ~Thread(); // 析构函数
     bool start(); //启动线程
     void stop(); //停止线程
     bool isAlive() const; // 线程是否存活.
     std::thread::id id() { return pThread->get_id(); }
     std::thread* getThread() { return pThread; }
     void join();  // 等待当前线程结束, 不能在当前线程上调用
     void detach(); //能在当前线程上调用
     static size_t CURRENT_THREADID();
 protected:
     void threadEntry(); // 线程创建的函数
     virtual void run() = 0; // 子线程实现具体的业务逻辑方法
 protected:
     bool  _running; //是否在运行
     std::thread *pThread;
 };
 #endif 
 // thread.cc
 #include "thread.h"
 #include <sstream>
 #include <iostream>
 #include <exception>
 Thread::Thread() 
 : _running(false), pThread(NULL)
 {}
 Thread::~Thread(){
     // 调用析构函数的之前,子线程要么是join则触发detach
     // 此时是一个比较危险的动作,用户必须知道他在做什么
     if (pThread != NULL) {  
         if (pThread->joinable()) {
             std::cout << "~Thread detach\n";
             pThread->detach();
         }
         delete pThread;
         pThread = NULL;
     }
     std::cout << "~Thread()" << std::endl;
 }
 bool Thread::start() {
     // 已经在运行了
     if (_running) {
         return false;
     }
     try {
         // 创建线程并启动
         pThread = new std::thread(&Thread::threadEntry, this);
     }
     catch (...) {
         throw  "[ZERO_Thread::start] thread start error";
     }
     return true;
 }
 void Thread::stop() {
     _running = false;
 }
 bool Thread::isAlive() const {
     return _running;
 }
 void Thread::join() {
     if (pThread->joinable()) {
         pThread->join();
     }
 }
 void Thread::detach() {
     pThread->detach();
 }
 size_t Thread::CURRENT_THREADID() {
     // 声明为 thread_local 的本地变量在线程中是持续存在的。
     // 具有static变量一样的初始化特征和生命周期,即使它不被声明为static。
     static thread_local size_t threadId = 0;
     if (threadId == 0) {
         std::stringstream ss;
         ss << std::this_thread::get_id();
         threadId = strtol(ss.str().c_str(), NULL, 0);
     }
     return threadId;
 }
 // 创建新新线程时调用的threadFunc
 void Thread::threadEntry() {
     _running = true;
     try {
         run();   // 函数运行所在 调用子类的run函数
     }
     catch (std::exception &ex) {
         _running = false;
         throw ex;
     }
     catch (...) {
         _running = false;
         throw;
     }
     _running = false;
 }
 // main.cc
 #include <iostream>
 #include <chrono>
 #include "thread.h"
 using namespace std;
 class A : public Thread {
 public:
     void run() {
         while (_running) {
             cout << "print A " << endl;
             std::this_thread::sleep_for(std::chrono::seconds(5));
         }
         cout << "----- leave A " << endl;
     }
 };
 class B : public Thread
 {
 public:
     void run() {
         while (_running) {
             cout << "print B " << endl;
             std::this_thread::sleep_for(std::chrono::seconds(2));
         }
         cout << "----- leave B " << endl;
     }
 };
 int main(){
     {
         A a;
         a.start();
         B b;
         b.start();
         std::this_thread::sleep_for(std::chrono::seconds(5));
         a.stop(); // 否则线程的run方法不会停止,线程不会退出
         a.join(); 
         b.stop();
         b.join();  
     }
     cout << "Hello World!" << endl;
     system("pause");
     return 0;
 }

2、互斥量

互斥锁的种类

  • std::mutex,独占互斥量,不能递归使用
  • std::time_mutex,带超时的独占互斥量,不能递归使用
  • std::recursive_mutex,递归互斥量,不带超时功能
  • std::recursive_timed_mutex,带超时的递归互斥量

std::mutex

特点:不允许拷贝构造,也不允许 move 拷贝,初始状态是 unlocked。

  • lock(),调用线程将锁住该互斥量。有锁 -> 其他(阻塞),自己(死锁 deadlock)
  • unlock(), 解锁,释放对互斥量的所有权。
  • try_lock(),尝试锁住互斥量。有锁 -> 其他(不阻塞),自己(死锁)

lock_guard 与 unique_lock

都能实现自动加锁和解锁(RAII类,自动释放资源),但 unique_lock 可以临时解锁和上锁。

std::lock_guard

  • 构造函数中进行加锁,析构函数中进行解锁。
  • look_guard 仅用于互斥。

std::unique_lock

  • unique_lock 是通用互斥包装器,允许延迟锁定、锁定的有时限尝试、递归锁定、所有权转移和与条件变量 (notify + wait) 一同使用。
  • 使用更加灵活,功能更加强大。可以临时解锁 unlock() 再上锁 lock(),而不必等到析构自动解锁。
  • 需要付出更多的时间、性能成本。
#include <iostream>
 #include <deque>
 #include <thread>
 #include <mutex>
 #include <condition_variable>
 std::deque<int> q;
 std::mutex mu;
 std::condition_variable cond;
 int count = 0;
 void producer() {
     while (true) {
         {   // 离开作用域后自动析构
             std::unique_lock<std::mutex> locker(mu); // 不能替换成lock_guard
             std::cout << "fun1 lock" << std::endl;
             q.push_front(count++);
             //locker.unlock(); // 没必要
             cond.notify_one();  
         }
         std::this_thread::sleep_for(std::chrono::seconds(1));
     }
 }
 void consumer() {
     while (true) {
         std::unique_lock<std::mutex> locker(mu);
         std::cout << "fun2 lock" << std::endl;
         std::cout << "fun2 wait into" << std::endl;
         cond.wait(locker, []() {return !q.empty(); });
         std::cout << "fun2 wait leave" << std::endl;
         auto data = q.back();
         q.pop_back();
         // locker.unlock(); //没必要
         std::cout << "thread2 get value form thread1: " << data << std::endl;
     }
 }
 int main() {
     std::thread t1(producer);
     std::thread t2(consumer);
     t1.join();
     t2.join();
     return 0;
 }

3、条件变量

条件变量 std::condition_variable:实现线程同步,即线程间需要按照预定的先后次序顺序进行的行为.


条件变量的使用

  • 拥有条件变量的线程获取互斥量
  • wait 循环检查某个条件,如果条件不满足则阻塞直到条件满足
  • 某个线程满足条件执行完之后调用 notify 唤醒等待线程。

wait 函数

必须使用 unique_lock 对象,需要临时上锁和解锁。

  • 上半部:1、条件变量上排队 -> 2、解锁 -> 3、阻塞
  • 下半部:1、被唤醒 notify -> 2、加锁(锁没被使用,加锁成功;锁正在使用,阻塞,直至锁的释放,加锁)-> 3、函数返回
// 唤醒后,加锁获取互斥量,继续执行
 void wait (unique_lock<mutex>& lck); // unique_lock对象
 // 唤醒后,加锁获取互斥量,判断pred,若为false,则解锁阻塞;若为true,则继续执行
 template <class Predicate> 
 void wait (unique_lock<mutex>& lck, Predicate pred); // Predicate 对象(等待条件)

notify 函数

// 解锁正在等待当前条件的线程中的随机一个
 void notify_one() noexcept;
 // 解锁正在等待当前条件的所有线程
 void notify_all() noexcept;

例:生产者-消费者

// sync_queue.h
 #ifndef SIMPLE_SYNC_QUEUE_H
 #define SIMPLE_SYNC_QUEUE_H
 #include <thread>
 #include <condition_variable>
 #include <mutex>
 #include <list>
 #include <iostream>
 template<typename T>
 class SimpleSyncQueue {
 public:
     SimpleSyncQueue() {}
     void put(const T& x) {
         std::lock_guard<std::mutex> locker(_mutex);
         _queue.push_back(x);
         _notEmpty.notify_one();
     }
     void take(T& x) {
         std::unique_lock<std::mutex> locker(_mutex);
         _notEmpty.wait(locker, [this] {return !_queue.empty(); });  
         x = _queue.front();
         _queue.pop_front();
     }
     bool empty() {
         std::lock_guard<std::mutex> locker(_mutex);
         return _queue.empty();
     }
     size_t size() {
         std::lock_guard<std::mutex> locker(_mutex);
         return _queue.size();
     }
 private:
     std::list<T> _queue;
     std::mutex _mutex;
     std::condition_variable _notEmpty;
 };
 #endif // SIMPLE_SYNC_QUEUE_H
 // main.cc
 #include <iostream>
 #include <thread>
 #include <iostream>
 #include <mutex>
 #include "sync_queue.h"
 using namespace std;
 SimpleSyncQueue<int> syncQueue;
 void PutDatas() {
     for (int i = 0; i < 20; ++i) {
         syncQueue.put(i);
     }
 }
 void TakeDatas() {
     int x = 0;
     for (int i = 0; i < 20; ++i) {
         syncQueue.take(x);
         std::cout << x << std::endl;
     }
 }
 int main() {
     std::thread t1(PutDatas);
     std::thread t2(TakeDatas);
     t1.join();
     t2.join();
     std::cout << "main finish\n";
     system("pause");
     return 0;
 }


4、原子变量

atomic 不能被中断的操作。store() 赋值, load() 读取。

内存顺序(memory_order)

枚举值 定义规则
memory_order_relaxed 不对执行顺序做任何保障
memory_order_acquire 本线程中,所有后续的读操作均在本条原子操作完成后执行
memory_order_release 本线程中,所有之前的写操作完成后才能执行本条原子操作
memory_order_acq_rel 同时包含memory_order_acquire和memory_order_release标记
memory_order_consume 本线程中,所有后续的有关本原子类型的操作,必须在本条原子操作完成后执行
memory_order_seq_cst 全部存取都按顺序执行
#include <iostream>  
 #include <atomic>      
 #include <thread>     
 //std::atomic<int> count = 0; //error
 std::atomic<int> count(0); // 准确初始化,atomic 线程安全
 void set_count(int x) {
     std::cout << "set_count:" << x << std::endl;
     count.store(x, std::memory_order_relaxed); // set value atomically
 }
 void print_count() {
     int x;
     do {
         x = count.load(std::memory_order_relaxed);  // get value atomically
         std::cout << "--- wait ---" << std::endl;
     } while (x == 0);
     std::cout << "count: " << x << std::endl;
 }
 int main() {
     std::thread t1(print_count);
     std::thread t2(set_count, 10);
     t1.join();
     t2.join();
     std::cout << "main finish\n";
     return 0;
 }

5、异步操作

5.1、std::future

future 期望,当前线程持有 future 时,期待从 future 获取到想要的结果和返回,可以把future当做异步函数的返回值。

一旦 future 就绪,就无法复位,代表的是一次性事件。

  • std::future: 异步指向某个任务,然后通过 future 去获取任务函数的返回结果。当需要返回值时,对 future 使用 get() 方法线程就会阻塞直到 future 就绪,然后返回该值。
  • std::aysnc 异步运行某个任务函数,返回一个 future 对象
async(_Fty&& _Fnarg, _ArgTypes&&... _Args)

future 的类型

  • std::future:仅有一个实例指向其关联事件,std::unique_ptr
  • std::shared_future:可以有多个实例指向同一个关联事件,std::shared_ptr
#include <iostream>
 #include <future>
 #include <thread>
 using namespace std;
 int add() {
     std::this_thread::sleep_for(std::chrono::seconds(5)); // 用来测试异步延迟
     std::cout << "find_result_to_add" << std::endl;
     return 1 + 1;
 }
 int add2(int a, int b) {
     std::this_thread::sleep_for(std::chrono::seconds(5)); // 用来测试异步延迟
     return a + b;
 }
 void do_other_things() {
     std::cout << "do_other_things" << std::endl;
 }
 int main() {
     // std::future<T>
     // std::future<int> result = std::async(find_result_to_add); 1、指定类型
     // 2、decltype,自动推导函数返回类型
     // std::future<decltype(add())> result = std::async(add); 
     auto result = std::async(add);  // 3、auto,推荐写法
     do_other_things(); // std::async异步线程运行,不阻塞主线程函数的
     std::cout << "result: " << result.get() << std::endl;  // get()阻塞
     // std::future<decltype(add2(int, int))> result2 = std::async(add2, 10, 20); //error,把参数值传递进来
     std::future<decltype (add2(0, 0))> result2 = std::async(add2, 10, 20);
     std::cout << "result2: " << result2.get() << std::endl;  // get()阻塞
     system("pause");
     return 0;
 }


5.2、std::packaged_task

将任务和 future 绑定在一起的模板,是一种对任务的封装。可以调用get_future()方法获得 packaged_task 对象绑定的函数的返回值类型的future。模板类型是函数签名

#include <iostream>
 #include <future>
 #include <thread>
 using namespace std;
 int add(int a, int b, int c) {
     std::cout << "call add\n";
     return a + b + c;
 }
 int main() {
     // 封装任务,待执行
     std::packaged_task<int(int, int, int)> task(add);  // 模板类型:函数签名
     std::future<int> result = task.get_future(); // 待执行,这里只是获取 future
     //任务开始执行!
     task(1, 1, 2); // 必须先执行任务,否则在get()获取future的值时会一直阻塞
     std::cout << "result:" << result.get() << std::endl;
     return 0;
 }

5.3、std::promise

promise 承诺,当线程创建 promise 的同时创建一个 future,这个 promise 向线程承诺它必定会被人手动设置一个值,future 就是获取其返回的手段。两者配合,在线程间传递数据。

#include <future>
 #include <string>
 #include <thread>
 #include <iostream>
 using namespace std;
 void print(std::promise<std::string>& p) {
     p.set_value("helloWorld"); // promise设置值,返回 future
 }
 void do_some_other_things() {
     std::cout << "do_some_other_things" << std::endl;
 }
 int main() {
     std::promise<std::string> promise; 
     std::future<std::string> result = promise.get_future(); // 获取待返回的future
     std::thread t(print, std::ref(promise));  // 线程设置,传引用 promise
     do_some_other_things();
     // 在主线程等待promise的返回结果,返回helloWorld
     cout << "result " << result.get() << endl;
     t.join();
     return 0;
 }

6、线程池的实现

threadpool.h

#ifndef THREADPOOL_H
 #define THREADPOOL_H
 #include <future>
 #include <functional>
 #include <iostream>
 #include <queue>
 #include <mutex>
 #include <memory>
 #ifdef WIN32
 #include <windows.h>
 #else
 #include <sys/time.h>
 #endif
 using namespace std;
 void getNow(timeval *tv);
 int64_t getNowMs();
 #define TNOW    getNow()
 #define TNOWMS  getNowMs()
 /////////////////////////////////////////////////
 /**
  * @file thread_pool.h
  * @brief c++11线程池类
  * 使用说明:
  * ThreadPool tpool; // 封装线程池
  * tpool.init(5); // 初始化线程池线程数
  * tpool.start(); // 启动线程方式
  * tpool.exec(testFunction, 10); // 将任务丢到线程池中,返回future异步获取结果
  * tpool.waitForAllDone(1000); // 等待线程池结束,参数<0时, 表示无限等待
  * tpool.stop(); //外部结束线程池
  */
 class ThreadPool {
 protected:
     // 任务结构体 
     struct TaskFunc {
         TaskFunc(uint64_t expireTime): _expireTime(expireTime){}
         std::function<void()> _func; // 要执行的任务函数
         int64_t _expireTime = 0; //超时的绝对时间
     };
     // 指向任务结构体的指针
     typedef shared_ptr<TaskFunc> TaskFuncPtr;
 public:
     /** @brief 构造函数 */
     ThreadPool();
     /** @brief 析构函数*/
     virtual ~ThreadPool();
     /** @brief 初始化 @param num 工作线程个数 */
     bool init(size_t num);
     /** @brief 获取线程个数 @return size_t 线程个数 */
     size_t getThreadNum() {
         std::unique_lock<std::mutex> lock(_mutex);
         return _threads.size();
     }
     /** @brief 获取当前线程池的任务数 @return size_t 线程池的任务数 */
     size_t getJobNum() {
         std::unique_lock<std::mutex> lock(_mutex);
         return _tasks.size();
     }
     /** @brief 停止所有线程, 会等待所有线程结束 */
     void stop();
     /** @brief 创建并启动线程 */
     bool start(); 
     /**
     * @brief 用线程池启用任务(F是function, Args是参数)
     * @param bind function
     * @return 返回任务的future对象, 可以通过这个对象来获取返回值
     */
     template <class F, class... Args>
     auto exec(F&& f, Args&&... args) -> std::future<decltype(f(args...))> 
     {
         return exec(0, f, args...);
     }
     /**
     * @brief 用线程池启用任务(F是function, Args是参数)
     * @param 超时时间,单位ms (为0时不做超时控制);若任务超时,此任务将被丢弃
     * @param bind function
     * @return 返回任务的future对象, 可以通过这个对象来获取返回值
     */
     template <class F, class... Args>
     auto exec(int64_t timeoutMs, F&& f, Args&&... args) -> std::future<decltype(f(args...))>
     {
         // 获取现在时间
         int64_t expireTime = (timeoutMs == 0 ? 0 : TNOWMS + timeoutMs);  
         //定义返回值类型,推导返回值
         using RetType = decltype(f(args...));
         // 封装任务,绑定函数和函数的参数
         auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
         // 封装任务指针,设置过期时间
         TaskFuncPtr fPtr = std::make_shared<TaskFunc>(expireTime);  
         // 封装具体的执行函数
         fPtr->_func = [task]() { 
             (*task)();
         };
         std::unique_lock<std::mutex> lock(_mutex);
         // 插入任务
         _tasks.push(fPtr);
         // 唤醒阻塞的线程,可以考虑只有任务队列为空的情况再去notify              
         _condition.notify_one();
         // 返回绑定任务的 future
         return task->get_future();;
     }
     /**
     * @brief 等待当前任务队列中, 所有工作全部结束(队列无任务).
     * @param millsecond 等待的时间(ms), -1:永远等待
     * @return true, 所有工作都处理完毕;false,超时退出
     */
     bool waitForAllDone(int millsecond = -1);
 protected:
     /** @brief 获取任务  @return TaskFuncPtr */
     bool get(TaskFuncPtr&task);
     /** @brief 线程池是否退出 */
     bool isTerminate() { return _bTerminate; }
     /** @brief 线程运行态 */
     void run();
 protected:
     queue<TaskFuncPtr> _tasks; //任务队列
     std::vector<std::thread*> _threads; // 工作线程
     std::mutex _mutex;
     std::condition_variable _condition;
     size_t _threadNum; // 线程的数量 
     bool _bTerminate; // 线程池终止标志位
     std::atomic<int> _atomic{ 0 }; // 原子操作
 };
 #endif // THREADPOOL_H

http://threadpool.cc

#include "threadpool.h"
 ThreadPool::ThreadPool()
 : _threadNum(1), _bTerminate(false)
 {}
 ThreadPool::~ThreadPool() {
     stop();
 }
 bool ThreadPool::init(size_t num) {
     std::unique_lock<std::mutex> lock(_mutex);
     if (!_threads.empty()) {
         return false;
     }
     // 设置线程数量
     _threadNum = num; 
     return true;
 }
 void ThreadPool::stop() {
     {
         std::unique_lock<std::mutex> lock(_mutex);  //加锁
         _bTerminate = true;     // 触发退出
         _condition.notify_all(); // 唤醒其他线程
     }
     for (size_t i = 0; i < _threads.size(); i++) {
         if(_threads[i]->joinable()) { 
             _threads[i]->join(); // 等线程推出
         }
         delete _threads[i];
         _threads[i] = NULL;
     }
     std::unique_lock<std::mutex> lock(_mutex); // 加锁
     _threads.clear();
 }
 bool ThreadPool::start() {
     std::unique_lock<std::mutex> lock(_mutex);
     // 线程已经启动,不能再次启动
     if (!_threads.empty()) {
         return false;
     }
     for (size_t i = 0; i < _threadNum; i++) {
         _threads.push_back(new thread(&ThreadPool::run, this));
     }
     return true;
 }
 bool ThreadPool::get(TaskFuncPtr& task) {
     std::unique_lock<std::mutex> lock(_mutex); // 加锁
     // 没有任务了
     if (_tasks.empty()) {
         _condition.wait(lock, [this] { return _bTerminate  // 要么终止线程池 bTerminate_设置为true
                     || !_tasks.empty();  // 要么补充了新的任务,任务队列不为空
         }); // notify -> 1.退出线程池; 2.任务队列不为空(补充了新的任务)
     }
     // 线程池终止了
     if (_bTerminate) {
         return false;
     }
     // 有任务存在   
     if (!_tasks.empty()) {
         task = std::move(_tasks.front());  // 移动语义,获取一个任务
         _tasks.pop(); // 释放已被移动的任务
         return true; 
     }
     return false;
 }
 void ThreadPool::run() { // 执行任务的线程
     //调用处理部分
     while (!isTerminate()) { // 先判断是不是要停止
         TaskFuncPtr task;
         // 1、读取任务
         bool ok = get(task); 
         // 拿到了任务
         if (ok) {
             // 任务的执行是原子操作,一气呵成
             ++_atomic;
             try {
                 if (task->_expireTime != 0 && task->_expireTime < TNOWMS) {
                     //超时任务,是否需要处理?
                 }
                 else {
                     // 2、执行任务
                     task->_func();  
                 }
             }
             catch (...) 
             {}
             --_atomic;
             // 至此,任务全都执行完毕了
             std::unique_lock<std::mutex> lock(_mutex);
             // 3、检测是否所有任务都运行完毕
             if (_atomic == 0 && _tasks.empty())  {
                 _condition.notify_all();  // 这里只是为了通知waitForAllDone
             }
         }
     }
 }
 bool ThreadPool::waitForAllDone(int millsecond) { // 指定超时的时间1000ms
     std::unique_lock<std::mutex> lock(_mutex);
     if (_tasks.empty()) {
         return true;
     }
     // 超时,任务队列已清空
     if (millsecond < 0) {
         _condition.wait(lock, [this] { return _tasks.empty(); });
         return true;
     }
     // 不超时,1.等待时间结束,2.任务队列已清空
     else {
         return _condition.wait_for(lock, std::chrono::milliseconds(millsecond), [this] { return _tasks.empty(); });
     }
 }
 int gettimeofday(struct timeval &tv) {
 #if WIN32
     time_t clock;
     struct tm tm;
     SYSTEMTIME wtm;
     GetLocalTime(&wtm);
     tm.tm_year   = wtm.wYear - 1900;
     tm.tm_mon   = wtm.wMonth - 1;
     tm.tm_mday   = wtm.wDay;
     tm.tm_hour   = wtm.wHour;
     tm.tm_min   = wtm.wMinute;
     tm.tm_sec   = wtm.wSecond;
     tm. tm_isdst  = -1;
     clock = mktime(&tm);
     tv.tv_sec = clock;
     tv.tv_usec = wtm.wMilliseconds * 1000;
     return 0;
 #else
     return ::gettimeofday(&tv, 0);
 #endif
 }
 void getNow(timeval *tv) {
 #if TARGET_PLATFORM_IOS || TARGET_PLATFORM_LINUX
     int idx = _buf_idx;
     *tv = _t[idx];
     if(fabs(_cpu_cycle - 0) < 0.0001 && _use_tsc) {
         addTimeOffset(*tv, idx);
     }
     else {
         TC_Common::gettimeofday(*tv);
     }
 #else
     gettimeofday(*tv);
 #endif
 }
 int64_t getNowMs() {
     struct timeval tv;
     getNow(&tv);
     return tv.tv_sec * (int64_t)1000 + tv.tv_usec / 1000;
 }

http://main.cc

#include <iostream>
 #include "threadpool.h"
 using namespace std;
 void func0() {
     cout << "func0()" << endl;
 }
 void func1(int a) {
     cout << "func1 int =" << a << endl;
 }
 void func2(int a, string b) {
     cout << "func2() a=" << a << ", b=" << b<< endl;
 }
 void test1() {// 简单测试线程池
     ThreadPool threadpool; // 封装一个线程池
     threadpool.init(1); // 设置线程的数量
     threadpool.start(); // 启动线程池,创建线程,开始调度
     // 装入要执行的任务
     threadpool.exec(1000,func0); 
     threadpool.exec(func1, 10);
     threadpool.exec(func2, 20, "hello"); // 插入任务
     threadpool.waitForAllDone(); 
     threadpool.stop();     
 }
 int func1_future(int a) {
     cout << "func1() a=" << a << endl;
     return a;
 }
 string func2_future(int a, string b) {
     cout << "func1() a=" << a << ", b=" << b<< endl;
     return b;
 }
 // 测试任务函数返回值
 void test2() {
     ThreadPool threadpool;
     threadpool.init(1);
     threadpool.start(); // 启动线程池
     // 假如要执行的任务
     std::future<decltype (func1_future(0))> result1 = threadpool.exec(func1_future, 10);
     auto result2 = threadpool.exec(func2_future, 20, "hello");
     std::cout << "result1: " << result1.get() << std::endl;
     std::cout << "result2: " << result2.get() << std::endl;
     threadpool.waitForAllDone();
     threadpool.stop();
 }
 class Test {
 public:
     int test(int i){
         cout << _name << ", i = " << i << endl;
         return i;
     }
     void setName(string name){
         _name = name;
     }
     string _name;
 };
 // 测试类对象函数的绑定
 void test3() { 
     ThreadPool threadpool;
     threadpool.init(1);
     threadpool.start(); // 启动线程池
     Test t1;
     Test t2;
     t1.setName("Test1");
     t2.setName("Test2");
     auto f1 = threadpool.exec(std::bind(&Test::test, &t1, std::placeholders::_1), 10);
     auto f2 = threadpool.exec(std::bind(&Test::test, &t2, std::placeholders::_1), 20);
     threadpool.waitForAllDone();
     cout << "t1 " << f1.get() << endl;
     cout << "t2 " << f2.get() << endl;
 }
 void func2_1(int a, int b) {
     cout << "func2_1 a + b = " << a+b << endl;
 }
 int func2_1(string a, string b) {
     cout << "func2_1 a + b = " << a << b<< endl;
     return 0;
 }
 // 简单测试线程池 threadpool;
 void test4() {    
     ThreadPool threadpool; 
     threadpool.init(1);             
     threadpool.start();              
     // 假如要执行的任务
     threadpool.exec((void(*)(int, int))func2_1, 10, 20); // 插入任务
     threadpool.exec((int(*)(string, string))func2_1, "Tom", " and Jerry");
     threadpool.waitForAllDone(); // 等待都执行完退出
     threadpool.stop(); // 这里才是真正执行退出
 }
 int main() {
     // test1(); // 简单测试线程池
     // test2(); // 测试任务函数返回值
     // test3(); // 测试类对象函数的绑定
     test4();
     cout << "main finish!" << endl;
     return 0;
 }
相关文章
|
15天前
|
缓存 安全 C++
C++无锁队列:解锁多线程编程新境界
【10月更文挑战第27天】
30 7
|
15天前
|
消息中间件 存储 安全
|
1月前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
41 1
C++ 多线程之初识多线程
|
21天前
|
存储 并行计算 安全
C++多线程应用
【10月更文挑战第29天】C++ 中的多线程应用广泛,常见场景包括并行计算、网络编程中的并发服务器和图形用户界面(GUI)应用。通过多线程可以显著提升计算速度和响应能力。示例代码展示了如何使用 `pthread` 库创建和管理线程。注意事项包括数据同步与互斥、线程间通信和线程安全的类设计,以确保程序的正确性和稳定性。
|
1月前
|
存储 前端开发 C++
C++ 多线程之带返回值的线程处理函数
这篇文章介绍了在C++中使用`async`函数、`packaged_task`和`promise`三种方法来创建带返回值的线程处理函数。
45 6
|
1月前
|
缓存 负载均衡 Java
c++写高性能的任务流线程池(万字详解!)
本文介绍了一种高性能的任务流线程池设计,涵盖多种优化机制。首先介绍了Work Steal机制,通过任务偷窃提高资源利用率。接着讨论了优先级任务,使不同优先级的任务得到合理调度。然后提出了缓存机制,通过环形缓存队列提升程序负载能力。Local Thread机制则通过预先创建线程减少创建和销毁线程的开销。Lock Free机制进一步减少了锁的竞争。容量动态调整机制根据任务负载动态调整线程数量。批量处理机制提高了任务处理效率。此外,还介绍了负载均衡、避免等待、预测优化、减少复制等策略。最后,任务组的设计便于管理和复用多任务。整体设计旨在提升线程池的性能和稳定性。
74 5
|
1月前
|
C++
C++ 多线程之线程管理函数
这篇文章介绍了C++中多线程编程的几个关键函数,包括获取线程ID的`get_id()`,延时函数`sleep_for()`,线程让步函数`yield()`,以及阻塞线程直到指定时间的`sleep_until()`。
23 0
C++ 多线程之线程管理函数
|
1月前
|
资源调度 Linux 调度
Linux C/C++之线程基础
这篇文章详细介绍了Linux下C/C++线程的基本概念、创建和管理线程的方法,以及线程同步的各种机制,并通过实例代码展示了线程同步技术的应用。
29 0
Linux C/C++之线程基础
|
3月前
|
Java 调度
基于C++11的线程池
基于C++11的线程池
|
3月前
|
Dart 编译器 API
Dart ffi 使用问题之在C++线程中无法直接调用Dart函数的问题如何解决
Dart ffi 使用问题之在C++线程中无法直接调用Dart函数的问题如何解决