【C++ 包裹类 std::thread】探索C++11 std::thread:如何使用它来创建、销毁和管理线程

简介: 【C++ 包裹类 std::thread】探索C++11 std::thread:如何使用它来创建、销毁和管理线程

std::thread 构造函数

//默认构造函数  
thread() noexcept;
//初始化构造函数  
template <class Fn, class... Args>
explicit thread(Fn&& fn, Args&&... args);
//拷贝构造函数 [deleted]  
thread(const thread&) = delete;
//Move 构造函数  
thread(thread&& x) noexcept;

  • 默认构造函数,创建一个空的 std::thread 执行对象。
  • 初始化构造函数,创建一个 std::thread 对象,该 std::thread 对象可被 joinable,新产生的线程会调用 fn 函数,该函数的参数由 args 给出。
  • 拷贝构造函数(被禁用),意味着 std::thread 对象不可拷贝构造。
  • Move 构造函数,move 构造函数(move 语义是 C++11 新出现的概念,详见附录),调用成功之后 x 不代表任何 std::thread 执行对象。
  • 📌注意:可被 joinablestd::thread 对象必须在他们销毁之前被主线程 join 或者将其设置为 detached.

std::thread 赋值操作

//Move 赋值操作  
thread& operator=(thread&& rhs) noexcept;
//拷贝赋值操作 [deleted]  
thread& operator=(const thread&) = delete;

  • Move 赋值操作(1),如果当前对象不可 joinable,需要传递一个右值引用(rhs)给 move 赋值操作;如果当前对象可被 joinable,则会调用 terminate() 报错。
  • 拷贝赋值操作(2),被禁用,因此 std::thread 对象不可拷贝赋值。
  • 示例
 #include <stdio.h>
 #include <stdlib.h>

 #include <chrono>    // std::chrono::seconds
 #include <iostream>  // std::cout
 #include <thread>    // std::thread, std::this_thread::sleep_for

 void thread_task(int n) {
     std::this_thread::sleep_for(std::chrono::seconds(n));
     std::cout << "hello thread "
         << std::this_thread::get_id()
         << " paused " << n << " seconds" << std::endl;
 }

 int main(int argc, const char *argv[])
 {
     std::thread threads[5];
     std::cout << "Spawning 5 threads...\n";
     for (int i = 0; i < 5; i++) {
         threads[i] = std::thread(thread_task, i + 1);
     }
     std::cout << "Done spawning threads! Now wait for them to join\n";
     for (auto& t: threads) {
         t.join();
     }
     std::cout << "All threads joined.\n";

     return EXIT_SUCCESS;
 }

std::thread其他成员函数

get_id: 获取线程 ID

  1. 如果该std::thread是joinable状态(joinable状态后面会提到),那么就返回一个独一无二的(unique)的当前std::thread的id(std::thread::id).
  2. 如果该std::thread是not joinable状态,返回std::thread::id();

joinable: 检查线程是否可被 join

该函数返回一个bool值:

  1. 如果当前std::thread对象是default-constructor构造出来的返回false.
  2. 如果当前std::thread对象被移动到了其他std::thread对象返回false.
  3. 如果当前std::thread对象的detach()和join()被调用过了返回false.

检查当前的线程对象是否表示了一个活动的执行线程,由默认构造函数创建的线程是不能被 join 的。
另外,如果某个线程 已经执行完任务,但是没有被 join 的话,该线程依然会被认为是一个活动的执行线程,因此也是可以被 join 的。

detach: Detach 线程

Detach 线程。 将当前线程对象所代表的执行实例与该线程对象分离,使得线程的执行可以单独进行。一旦线程执行完毕,它所分配的资源将会被释放。


调用 detach 函数之后:

  • *this 不再代表任何的线程执行实例。
  • joinable() == false
  • get_id() == std::thread::id()

📌另外,如果出错或者 joinable() == false,则会抛出 std::system_error

join: Detach 线程

block(阻塞)调用join()的线程,直到std::thread所在的线程完成,与此同时std::thread独显被设置为not joinable.

swap: Swap 线程

Swap 线程,交换两个线程对象所代表的底层句柄(underlying handles)。

native_handle: 返回 native handle

返回 native handle(由于 std::thread 的实现和操作系统相关,因此该函数返回与 std::thread 具体实现相关的线程句柄,例如在符合 Posix 标准的平台下(如 Unix/Linux)是 Pthread 库)。

  • 示例

 #include <thread>
 #include <iostream>
 #include <chrono>
 #include <cstring>
 #include <pthread.h>

 std::mutex iomutex;
 void f(int num)
 {
   std::this_thread::sleep_for(std::chrono::seconds(1));

  sched_param sch;
  int policy; 
  pthread_getschedparam(pthread_self(), &policy, &sch);
  std::lock_guard<std::mutex> lk(iomutex);
  std::cout << "Thread " << num << " is executing at priority "
            << sch.sched_priority << '\n';
 }

 int main()
 {
   std::thread t1(f, 1), t2(f, 2);

   sched_param sch;
   int policy; 
   pthread_getschedparam(t1.native_handle(), &policy, &sch);
   sch.sched_priority = 20;
   if(pthread_setschedparam(t1.native_handle(), SCHED_FIFO, &sch)) {
       std::cout << "Failed to setschedparam: " << std::strerror(errno) << '\n';
   }

   t1.join();
   t2.join();
 }

hardware_concurrency [static]

检测硬件并发特性,返回当前平台的线程实现所支持的线程并发数目,但返回值仅仅只作为系统提示(hint)。

  • 示例
 #include <iostream>
 #include <thread>

 int main() {
   unsigned int n = std::thread::hardware_concurrency();
   std::cout << n << " concurrent threads are supported.\n";
 }

std::this_thread 命名空间中相关辅助函数

  • get_id: 返回一个独一无二的当前线程的ID(一般也就是unsigned int类型).
  • yield: 使正在处于运行状态的该线程,回到可运行状态,以允许其他具有相同优先级的线程获得先执行的机会。但是很有可能该线程刚刚回到可执行状态又被再次执行。
  • sleep_until: 线程休眠至某个指定的时刻(time point),该线程才被重新唤醒
  • sleep_for: 线程休眠某个指定的时间片(time span),该线程才被重新唤醒,不过由于线程调度等原因,实际休眠时间可能比 sleep_duration 所表示的时间片更长。

std::thread的可联结性

一个 std::thread 对象只可能处于可联结或不可联结两种状态之一。即 std::thread 对象是否与某个有效的底层线程关联。


  • 可联结:当线程可运行、已经运行或处于阻塞时是可联结的。但如果某个底层线程已经执行完任务,但是没有被 join 的话,该线程依然会被认为是一个活动的执行线程,仍然处于 joinable 状态。
  • 不可联结:不带参构造的std::thread对象为不可联结,因为底层线程还没创建;已经移动的std::thread对象为不可联结;已经调用join或detach的对象为不可联结状态。
  • joinable():判断是否可以成功使用join()或者 detach() ,返回true 表示可以,否则返回 false

std::thread 对象析构

std::thread 对象析构时,会先判断joinable(),如果可联结,则程序会直接被终止(terminate)。
因此,在创建 thread 对象以后,要在随后的某个地方显式地调用 join 或 detach 以便让 std::thread 处于不可联结状态。

std::thread的注意事项

  • 在创建线程时,要确保传递给线程的函数或可调用对象的生命周期至少和线程一样长,否则可能会导致未定义行为或访问非法内存。
  • 不要在多个线程中共享数据,除非你已经确保线程安全,否则可能会导致竞态条件和数据竞争。
  • 在创建线程时,要注意线程的优先级和调度策略,这可能会影响程序的性能和响应时间。
  • 要注意线程的同步和通信,可以使用互斥锁、条件变量等同步机制来确保线程之间的正确交互。
  • 避免在程序退出前没有结束线程,这可能会导致资源泄漏或程序挂起。
  • 使用std::thread时,要遵守RAII(Resource Acquisition Is Initialization)原则,确保资源的正确释放。
  • 在使用std::thread时,要了解可移植性问题,因为不同操作系统和编译器可能有不同的实现细节。

总之,要注意线程的生命周期、同步和通信、资源管理等问题,才能确保使用std::thread的安全和有效。

std::thread 实现一个简单的线程池示例

#include <iostream>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <thread>

class ThreadPool {
public:
    ThreadPool(size_t num_threads) {
        for (size_t i = 0; i < num_threads; ++i) {
            threads_.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(this->mutex_);
                        this->condition_.wait(lock, [this] {
                            return !this->tasks_.empty() || this->stop_;
                        });
                        if (this->stop_ && this->tasks_.empty()) {
                            return;
                        }
                        task = std::move(this->tasks_.front());
                        this->tasks_.pop();
                    }
                    task();
                }
            });
        }
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(mutex_);
            stop_ = true;
        }
        condition_.notify_all();
        for (auto& thread : threads_) {
            thread.join();
        }
    }

    template <typename Func, typename... Args>
    void add_task(Func&& func, Args&&... args) {
        auto task = std::make_shared<std::function<void()>>(std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
        {
            std::unique_lock<std::mutex> lock(mutex_);
            tasks_.emplace([task]() {
                (*task)();
            });
        }
        condition_.notify_one();
    }

private:
    std::vector<std::thread> threads_;
    std::queue<std::function<void()>> tasks_;
    std::mutex mutex_;
    std::condition_variable condition_;
    bool stop_ = false;
};

void print(int i) {
    std::cout << "Thread " << std::this_thread::get_id() << " prints " << i << std::endl;
}

int main() {
    ThreadPool pool(4);
    for (int i = 0; i < 8; ++i) {
        pool.add_task(print, i);
    }
    return 0;
}

在这个例子中,线程池类ThreadPool包含了一个线程向量,一个任务队列,一个互斥量和一个条件变量。构造函数会创建指定数量的线程,并将它们设为等待任务。添加任务的函数add_task将函数和参数绑定为一个可调用对象,并用std::function包装成一个任务添加到队列中。添加任务后通过condition_variable通知等待的线程来处理任务。当析构函数被调用时,将设置标志stop_并通知所有线程退出。

封装一个功能全面的std::thread 类

#ifndef THREADWRAPPER_H
#define THREADWRAPPER_H
#include <iostream>
#include <thread>
#include <mutex>
#include <string>
#include <functional>
#include <stdexcept>
#ifdef _WIN32
    #include <Windows.h>
#else
    #include <sched.h>
    #include <pthread.h>
    #include <signal.h>
#endif
class ThreadWrapper {
public:
    // 函数: ThreadWrapper()
    // 描述: 构造函数,初始化成员变量
    // 参数: 无
    // 返回值: 无
    ThreadWrapper() : m_thread_(), m_mainFunc(nullptr),m_name("") {}
    // 函数: start()
    // 描述: 启动线程,使用m_mainFunc作为线程的执行函数
    // 参数: 无
    // 返回值: 无
    void start() {
        if (m_isRunning.load(std::memory_order_acquire)) {
            std::cerr << "Thread is already running." << std::endl;
            return;
        }
        if (!m_mainFunc) {
            std::cerr << "No main function has been set." << std::endl;
            return;
        }
        m_isRunning.store(true);
        std::thread tempThread([this]() {
            m_mainFunc();
            m_isRunning.store(false);
            //print thread quit info
            std::cout << "Thread " << m_name << " quit." << std::endl;
        });
        try {
            m_thread_ = std::move(tempThread);
        } catch (...) {
            std::cerr << "Caught exception while moving thread." << std::endl;
            return ;
        }
        if (!m_name.empty()) {
            setName(m_name);        
        }
    }
    bool isRunning() const {
        return this->m_isRunning.load();
    }
    // 函数: stop()
    // 描述: 停止线程,阻塞直至线程执行完成
    // 参数: 无
    // 返回值: 无
    inline  void stop() {
        if (m_thread_.joinable()) {
            m_thread_.join();
        }
    }
    // 函数: setmain(const std::function<void()>& func)
    // 描述: 设置线程的主函数
    // 参数: func - 线程执行的函数
    // 返回值: 无
    inline  void setmain(const std::function<void()>& func) {
        m_mainFunc = func;
    }
    // 函数: getThread()
    // 描述: 返回线程对象的引用
    // 参数: 无
    // 返回值: 线程对象的引用
    inline  std::thread& getThread() {
        return m_thread_;
    }

    // 函数: detach()
    // 描述: 设置线程的分离状态,分离状态的线程在终止时会自动清理其资源
    // 参数: 无
    // 返回值: 无
    inline  void detach() {
        if (m_thread_.joinable()) {
            m_thread_.detach();
        }
    }
    // 函数: getlimit(size_t stackSize)
    // 描述: 设置线程的栈大小
    // 参数: stackSize - 栈大小(字节)
    // 返回值: 无
    inline void getlimit(size_t &stackSize) {
        this->executeIfRunning([&]() {
            _getlimit(stackSize);
        });
    }
    // 函数: setname(const std::string& name)
    // 描述: 设置线程名称
    // 参数: name - 线程名称
    // 返回值: 无
    void setName(const std::string& name) {
        m_name = name;
        this->executeIfRunning([&]() {
            _setName(name);
        });
    }

    // 函数: setAffinity(const std::vector<int>& cpus)
    // 描述: 设置线程的CPU亲和性
    // 参数: cpus - 要绑定的CPU核心序列
    // 返回值: 无  
    void setAffinity(const std::vector<int>& cpus) {
        this->executeIfRunning([&]() {
            _setAffinity(cpus);
        });     
    }
// Function: setPriority
// Description: Set the scheduling policy and priority of the thread
// Parameters:
//   policy - The scheduling policy (ignored on Windows)
//   priority - The priority of the thread (1 to 7, where 1 is lowest and 7 is highest)
// Returns: None  
    void setPriority(int policy,int priority) {
        this->executeIfRunning([&]() {
            _setPriority(policy,priority);
        });
    }
    // 函数: setSignalMask(const std::vector<int>& signals)
    // 描述: 设置线程信号屏蔽集,Windows并没有类似于Linux中的信号处理机制,它使用的是事件和消息机制。
    // 参数: signals - 要屏蔽的信号列表
    // 返回值: 无 
    void setSignalMask(const std::vector<int>& signals) {
#ifndef _WIN32
        sigset_t signal_mask;
        sigemptyset(&signal_mask);

        for (const auto& sig : signals) {
            sigaddset(&signal_mask, sig);
        }

        int result = pthread_sigmask(SIG_BLOCK, &signal_mask, nullptr);
        if (result != 0) {
            throw std::runtime_error("Failed to set thread signal mask.");
        }
#endif
    }      
private:
    // Function: executeIfRunning
    // Description: Executes a function if the thread is running
    // Parameters:
    //   func - The function to execute
    // Returns: None
    void executeIfRunning(std::function<void()> func) {
        if (m_isRunning.load(std::memory_order_acquire)) {
            func();
        } else {
            std::cerr << "Thread is not running." << std::endl;
        }
    }
    void _getlimit(size_t &stackSize) {
#ifdef _WIN32
            DWORD dwStackSize = 0;
            if (GetThreadStackLimits(&dwStackSize, &stackSize)) {
                std::cout << "Thread stack size: " << stackSize << std::endl;
            } else {
                std::cerr << "Failed to get thread stack size." << std::endl;
            }
#else
        pthread_attr_t attr;
        if (pthread_getattr_np(m_thread_.native_handle(), &attr) == 0) {
            if (pthread_attr_getstacksize(&attr, &stackSize) == 0) {
                std::cout << "Thread stack size: " << stackSize << std::endl;
            } else {
                std::cerr << "Failed to get thread stack size." << std::endl;
            }
            pthread_attr_destroy(&attr);
        } else {
            std::cerr << "Failed to get thread attributes." << std::endl;
        }
#endif
    }
    void _setName(const std::string& name) {
#ifdef _WIN32
            std::wstring wname(m_name.begin(), m_name.end());
            SetThreadDescription(m_thread_.native_handle(), wname.c_str());
#else
            std::string short_name = m_name.substr(0, 15);
            int result = pthread_setname_np(m_thread_.native_handle(), short_name.c_str());
            if (result != 0) {
                throw std::runtime_error("Failed to set thread name.");
            }
#endif
    }
// Function: setPriority
// Description: Set the scheduling policy and priority of the thread
// Parameters:
//   policy - The scheduling policy (ignored on Windows)
//   priority - The priority of the thread (1 to 7, where 1 is lowest and 7 is highest)
// Returns: None  
    void _setPriority(int policy,int priority) {
#ifdef _WIN32
    // Check if the priority is in the valid range
    if (priority < 1 || priority > 7) {
        throw std::runtime_error("Invalid priority value.");
    }

    // Convert POSIX priority to Windows priority
    int win_priority;
    switch (priority) {
        case 1: win_priority = THREAD_PRIORITY_IDLE; break;
        case 2: win_priority = THREAD_PRIORITY_LOWEST; break;
        case 3: win_priority = THREAD_PRIORITY_BELOW_NORMAL; break;
        case 4: win_priority = THREAD_PRIORITY_NORMAL; break;
        case 5: win_priority = THREAD_PRIORITY_ABOVE_NORMAL; break;
        case 6: win_priority = THREAD_PRIORITY_HIGHEST; break;
        case 7: win_priority = THREAD_PRIORITY_TIME_CRITICAL; break;
        default: throw std::runtime_error("Invalid priority value.");
    }

    if (!SetThreadPriority(reinterpret_cast<HANDLE>(m_thread_.native_handle()), win_priority)) {
        throw std::runtime_error("Failed to set thread priority.");
    }
#else
    // Check if the policy is valid
    if (policy != SCHED_FIFO && policy != SCHED_RR && policy != SCHED_OTHER) {
        throw std::runtime_error("Invalid scheduling policy.");
    }

    // Check if the priority is in the valid range for the given policy
    int min_priority = sched_get_priority_min(policy);
    int max_priority = sched_get_priority_max(policy);
    if (priority < min_priority || priority > max_priority) {
        throw std::runtime_error("Invalid priority value.");
    }

    sched_param sch_params;
    sch_params.sched_priority = priority;

    int result = pthread_setschedparam(m_thread_.native_handle(), policy, &sch_params);
    if (result != 0) {
        throw std::runtime_error("Failed to set thread priority.");
    }
#endif
}
    // 函数: setAffinity(const std::vector<int>& cpus)
    // 描述: 设置线程的CPU亲和性
    // 参数: cpus - 要绑定的CPU核心序列
    // 返回值: 无  
    void _setAffinity(const std::vector<int>& cpus) {
        unsigned int num_cores = std::thread::hardware_concurrency();
        if (num_cores == 0) {
            throw std::runtime_error("Failed to determine the number of available cores.");
        }
#ifdef _WIN32
        DWORD_PTR mask = 0;
        for (const auto& cpu : cpus) {
            if (cpu < 0 || static_cast<unsigned int>(cpu) >= num_cores) {
                throw std::runtime_error("Invalid core number specified.");
            }
            mask |= (1 << cpu);
        }

        DWORD_PTR result = SetThreadAffinityMask(m_thread_.native_handle(), mask);
        if (result == 0) {
            throw std::runtime_error("Failed to set thread CPU affinity.");
        }
#else
        cpu_set_t cpuset;
        CPU_ZERO(&cpuset);

        for (const auto& cpu : cpus) {
            if (cpu < 0 || static_cast<unsigned int>(cpu) >= num_cores) {
                throw std::runtime_error("Invalid core number specified.");
            }
            CPU_SET(cpu, &cpuset);
        }

        int result = pthread_setaffinity_np(m_thread_.native_handle(), sizeof(cpu_set_t), &cpuset);
        if (result != 0) {
            throw std::runtime_error("Failed to set thread CPU affinity.");
        }
#endif
}


    std::thread m_thread_; // 线程对象
    std::function<void()> m_mainFunc; // 线程的主函数
    std::string m_name; // 线程名称
    std::atomic<bool> m_isRunning; // Atomic flag to track if the thread is running

};

#endif


目录
相关文章
|
7天前
|
存储 编译器 C++
【c++】类和对象(中)(构造函数、析构函数、拷贝构造、赋值重载)
本文深入探讨了C++类的默认成员函数,包括构造函数、析构函数、拷贝构造函数和赋值重载。构造函数用于对象的初始化,析构函数用于对象销毁时的资源清理,拷贝构造函数用于对象的拷贝,赋值重载用于已存在对象的赋值。文章详细介绍了每个函数的特点、使用方法及注意事项,并提供了代码示例。这些默认成员函数确保了资源的正确管理和对象状态的维护。
33 4
|
8天前
|
存储 编译器 Linux
【c++】类和对象(上)(类的定义格式、访问限定符、类域、类的实例化、对象的内存大小、this指针)
本文介绍了C++中的类和对象,包括类的概念、定义格式、访问限定符、类域、对象的创建及内存大小、以及this指针。通过示例代码详细解释了类的定义、成员函数和成员变量的作用,以及如何使用访问限定符控制成员的访问权限。此外,还讨论了对象的内存分配规则和this指针的使用场景,帮助读者深入理解面向对象编程的核心概念。
27 4
|
15天前
|
缓存 安全 C++
C++无锁队列:解锁多线程编程新境界
【10月更文挑战第27天】
30 7
|
15天前
|
消息中间件 存储 安全
|
22天前
|
存储 并行计算 安全
C++多线程应用
【10月更文挑战第29天】C++ 中的多线程应用广泛,常见场景包括并行计算、网络编程中的并发服务器和图形用户界面(GUI)应用。通过多线程可以显著提升计算速度和响应能力。示例代码展示了如何使用 `pthread` 库创建和管理线程。注意事项包括数据同步与互斥、线程间通信和线程安全的类设计,以确保程序的正确性和稳定性。
|
1月前
|
存储 编译器 对象存储
【C++打怪之路Lv5】-- 类和对象(下)
【C++打怪之路Lv5】-- 类和对象(下)
27 4
|
1月前
|
编译器 C语言 C++
【C++打怪之路Lv4】-- 类和对象(中)
【C++打怪之路Lv4】-- 类和对象(中)
23 4
|
30天前
|
存储 安全 C++
【C++打怪之路Lv8】-- string类
【C++打怪之路Lv8】-- string类
21 1
|
1月前
|
存储 前端开发 C++
C++ 多线程之带返回值的线程处理函数
这篇文章介绍了在C++中使用`async`函数、`packaged_task`和`promise`三种方法来创建带返回值的线程处理函数。
45 6
|
1月前
|
C++
C++ 多线程之线程管理函数
这篇文章介绍了C++中多线程编程的几个关键函数,包括获取线程ID的`get_id()`,延时函数`sleep_for()`,线程让步函数`yield()`,以及阻塞线程直到指定时间的`sleep_until()`。
23 0
C++ 多线程之线程管理函数