C++11支持语言级别的多线程编程,可以跨平台运行,支持windows/linux/mac等。
主要涉及:
1.thread/mutex/condition_variable
2.lock_quard/unique_lock
3.automic:原子类型,基于CAS操作的原子类型,线程安全的
4.sleep_for
C++的thread本质上还是调用系统支持的函数,windows(createThread)、linux(pthread_create)进行多线程。
初识多线程
1.如何创建启动一个线程?thread来创建一个线程对象,需要线程所需要的线程函数和参数;线程自动开启。
2.子线程如何结束?子线程函数运行完成,线程就结束了。
3.主线程如何处理子线程?join和detach方法。
#include <iostream> #include <string> #include<thread> void threadHandle1() { // 让子线程睡眠两秒,this_thread获取当前线程,chrono计时的函数 std::this_thread::sleep_for(std::chrono::seconds(2)); std::cout << "hello,thread1" << std::endl; } void threadHandle2(int time) { // 让子线程睡眠两秒,this_thread获取当前线程,chrono计时的函数 std::this_thread::sleep_for(std::chrono::seconds(time)); std::cout << "hello,thread2" << std::endl; } int main() { // 创建了一个线程对象t1,传入一个线程函数,新线程就开始运行了 std::thread t1(threadHandle1); // join是子线程等待主线程结束,主线程继续往下执行, // detach则是分离线程,子线程和主线程无关联,可以独立运行,等主线程结束,整个程序结束,所有子线程都自动结束了 // 传入参数的情况 std::thread t2(threadHandle2, 2); t1.join(); t2.join(); std::cout << "main thread hello" << std::endl; return 0; }
mutex和lock
多线程程序中可能会出现,竞态条件:多线程程序执行的结果是一致的,不会随着CPU对线程不同的调用顺序,而产生不同的运行结果。所以需要引入互斥锁来防止多个线程之间的对资源的访问正确性。
初识mutex互斥锁,使用lock和unlock函数完成进程互斥,将会导致程序中断将会导致mutex的内存释放问题。具体如下所示:
// 模拟车站卖票的程序 #include <iostream> #include <string> #include<thread> #include<list> #include<mutex> using namespace std; // 一共有tickCount张票 int tickCount = 10; // 定义全局互斥锁 mutex mtx; void sellTicket(int index) { // mtx.lock(); //1、这样就只会存在一个窗口在卖票,因为while循环只支持一个线程访问 while(tickCount>0) { /* 直接使用mutex.lock()和mutex.unlock()将会出现一个问题, 当函数还未unlock时就因为程序中间return或者error结束后, 导致mutex资源无法释放的问题。 */ mtx.lock(); //2、仅在临界区代码段 -> 原子操作 -> 线程间互斥操作 -> mutex // 在锁里面加判断是防止当一个进程1面临tickCount=1时,还为-1成功, // 另一个线程2进入while循环,只是在mtx.lock阻塞了,等进程1-1成功后, // 其实进程2获取到的tickCount已经由1->0,但是还是会进行tickCount-- // 导致了最终卖出-1张票 if(tickCount>0) { cout << "窗口:" << index << "卖出第" << tickCount << "张票。" << endl; tickCount--; } mtx.unlock(); //2、 this_thread::sleep_for(chrono::milliseconds(2)); } // mtx.unlock(); // 1、 } /* 多线程程序: 竞态条件:多线程程序执行的结果是一致的,不会随着CPU对线程不同的调用顺序,而产生不同的运行结果。 */ int main() { list<thread> tlist; int thread_num = 3; for (int i = 0; i < thread_num;i++) { tlist.push_back(thread(sellTicket, i)); } for (thread &t : tlist) { t.join(); } return 0; }
所以进一步引入lock_guard(不可能用在函数参数传递或者返回过程中,也不能赋值使用,只能用于简单的加锁解锁的临界代码段当中)和unique_lock(一般用于进程通信,和condition_varible联合使用)在作用域结束后自动析构,类似于智能指针。如下代码所示
void sellTicket2(int index) { while (tickCount > 0) { // mtx.lock(); { // lock_guard函数删除了拷贝构造函数和操作符=重载,类似于scoped_ptr,但保留了构造函数 lock_guard<mutex> lock(mtx); //在这个局部作用域中,程序结束后自动析构,如果中间return了也会析构 if (tickCount > 0) { cout << "窗口:" << index << "卖出第" << tickCount << "张票。" << endl; tickCount--; } } // mtx.unlock(); this_thread::sleep_for(chrono::milliseconds(2)); } } void sellTicket3(int index) { while (tickCount > 0) { // mtx.lock(); { // lock_guard函数删除了拷贝构造函数和操作符=重载,类似于scoped_ptr,但保留了构造函数 // lock_guard<mutex> lock(mtx); // 在这个局部作用域中,程序结束后自动析构,如果中间return了也会析构 unique_lock<mutex> temp_lock(mtx); // 类似于unique_ptr,虽然删除了拷贝构造函数和操作符=重载,但是扩展了右值引用 if (tickCount > 0) { cout << "窗口:" << index << "卖出第" << tickCount << "张票。" << endl; tickCount--; } } // mtx.unlock(); this_thread::sleep_for(chrono::milliseconds(2)); } }
unique_lock和condition_variable使用连用:
mutex mtx; condition_variable cv; unique_lock<mutex> lck(mtx); cv.wait(lck); //1、wait的作用使线程进入等待状态;2、lck.unlock可以把mtx给释放掉 // 通知cv上等待的线程,条件成立了,可以往下运行了 //其他在cv上等待的线程,收到通知,从等待状态->阻塞状态->获取互斥锁->线程执行 cv.notify_all();
线程同步通信
多线程编程存在的问题:
1.线程间的互斥;防止资源的访问出现问题。竞态条件 -> 临界区代码段 -> 原子操作 -> 互斥锁mutex(lock_guard、unique_lock)/强两级的无锁实现CAS
2.线程间的同步通信。生产者和消费者线程模型。
互斥
多线程执行共享变量的这段代码可能会导致竞争状态,因此我们将此段代码称为临界区(criticalsection),它是执行共享资源的代码片段,一定不能给多线程同时执行。
所以我们希望这段代码是互斥(mutualexclusion)的,也就说执行临界区(criticalsection)代码段的只能有一个线程,其他线程阻塞等待,达到排队效果。
互斥并不只是针对多线程的竞争条件,同时还可用于多进程,避免共享资源混乱。
同步
互斥解决了「多进程/线程」对临界区使用的问题,但是它没有解决「多进程/线程」协同工作的问题
我们都知道在多线程里,每个线程一定是顺序执行的,它们各自独立,以不可预知的速度向前推进,但有时候我们希望多个线程能密切合作,以实现一个共同的任务。
所谓同步,就是「多进程/线程间」在一些关键点上可能需要互相等待与互通消息,这种相互制约的等待与互通信息称为「进程/线程」同步。
生产者、消费者线程通信
这里涉及到两个线程之间的通信,生产者和消费者线程互相告知,lock_guard无法实现进程间通信这样复杂的工作,所以使用unique_lock和condition_variable进行搭配使用可以实现进程间的通信。代码如下所示:
#include <iostream> #include <string> #include<thread> #include<mutex> #include<condition_variable> #include<queue> using namespace std; // 定义互斥锁,用户线程间互斥 mutex mtx; // 定义条件变量,用于线程间的同步通信 condition_variable cv; // 最常见的问题就是消费者线程消费的更快,生产者线程还没生产出来就开始消费了 class Queue { public: void put(int val) { // lock_guard<mutex> lock(mtx); unique_lock<mutex> lck(mtx); while(!que.empty()) { // que不为空,生产者应该通知消费者去消费,消费完了在生产 // 生产者进程应该进入阻塞状态,并把mtx互斥锁 cv.wait(lck); } que.push(val); /* notify_one:通知另外的一个线程 notify_all:通知另外的所有线程 */ // 通知其他的所有线程,生产了一个物品,可以进行消费了, // 其他线程得到该通知就会从 等待状态 -> 阻塞状态 -> 获取互斥锁才能继续执行。 cv.notify_all(); cout << "生产者 生产:" << val << "号物品" << endl; } int get() { // lock_guard<mutex> lock(mtx); unique_lock<mutex> lck(mtx); while(que.empty()) { // 消费者发现que是空的,通知生产者线程生产物品 // 进入等待状态,把互斥锁mutex进行释放 cv.wait(lck); } int val = que.front(); que.pop(); cv.notify_all(); //消费完了,通知其他线程进行生产 cout << "消费者 消费:" << val << "号物品" << endl; return val; } private: queue<int> que; }; void producer(Queue* que) { for (int i = 0; i <= 10;i++) { que->put(i); this_thread::sleep_for(chrono::milliseconds(100)); } } void consumer(Queue* que) { for (int i = 0; i <= 10; i++) { que->get(); this_thread::sleep_for(chrono::milliseconds(100)); } } int main() { Queue que; thread t1(producer, &que); thread t2(consumer,&que); t1.join(); t2.join(); return 0; }
CAS操作
互斥锁是比较重的,临界区代码做的事情如果很复杂,互斥锁使用便很麻烦。但是使用CAS来实现某些代码操作的原子特性便是足够了,CAS是无锁的。使用的头文件为atomic,其实本质上也就是将某些类型设置为原子类型变量,导致只有一个线程可以独立使用。如下示例所示:
#include <iostream> #include <string> #include<atomic> #include<list> #include<thread> using namespace std; /* 使用lock_guard实现临界代码段的互斥访问 lock_guard<mutex> lock(mtx); Count++; */ volatile std::atomic_bool isReady = {false}; volatile std::atomic_int number = {0}; void task() { while(!isReady) { // 让线程让出当前的CPU时间片,等待下一次调度 this_thread::yield(); } for (int i = 0; i < 100;i++) { number++; } } int main() { list<thread> tlist; for (int i = 0; i < 10;i++) { tlist.push_back(thread(task)); } // 让主线程睡眠三秒 this_thread::sleep_for(chrono::seconds(3)); cout << "number = " << number << endl; isReady = true; cout << "number = " << number << endl; for(thread &t:tlist) { t.join(); } cout << "number = " << number << endl; return 0; } /*output:: number = 0 number = 1000 number = 1000 */