简单的互斥管理
既然并发需要锁,但是又要避免死锁的发生,为了方便我们开发总是需要使用一些简单的方法.如果每次使用mutex
还需要我们自己手动lock unlock
属实也不太方便,所以c++还提供了一些进阶的方法
类 | 说明 |
lock_guard | 实现严格基于作用域的互斥量所有权 |
unique_lock | 实现可移动的互斥量所有权 |
shared_lock | 实现可移动的共享互斥量所有权 |
scoped_lock | 用于多个互斥量的免死锁 |
锁定策略 | 说明 |
defer_lock | 不获得互斥量的所有权 |
try_to_lock | 尝试获得互斥量所有权但是不阻塞 |
adopt_lock | 调用方已拥有互斥量所有权 |
使用这些方法做个小例子
#include<iostream> #include<thread> #include<mutex> using namespace std; int v=0; std::mutex v_mutex; void incre(){ std::lock_guard<std::mutex> lock(v_mutex); ++v; std::cout<<std::this_thread::get_id()<<":"<<v<<std::endl; } int main(){ std::cout<<"origin v:"<<v<<std::endl; std::thread t1(incre); std::thread t2(incre); t1.join(); t2.join(); std::cout<<"done v:"<<v<<std::endl; } 复制代码
使用lock_guard自动为互斥量加锁与解锁,在构造的时候就自动加锁,在生命周期结束的时候自动解锁.
对于上面的几种锁,如果要锁住多个互斥量可以这样写
- lock_guard
lock(mutex1,mutex2); lock_guard lock1(mutex1,adopt_lock); lock_guard lock2(mutex2,adopt_lock); 复制代码
- unique_lock
unique_lock lock1(mutex1,defer_lock); unique_lock lock2(mutex2,defer_lock); lock(mutex1,mutex2); 复制代码
- scoped_lock
scoped_lock lockAll(mutex1,mutex2); 复制代码
条件变量
更常见的情况是当满足某个条件时才继续执行,否则就等待.这个时候就需要使用条件变量condition_variable
,它得配合unique_lock
一起使用.
#include<iostream> #include<thread> #include<mutex> using namespace std; int v=0; static mutex mtx; void change(int change_v){ std::unique_lock<std::mutex> lock(mtx); v=change_v; cout<<v<<endl; lock.unlock(); lock.lock(); v+=1; cout<<v<<endl; } int main(){ std::thread t1(change,2),t2(change,5); t1.join(); t2.join(); return 0; }
配合上条件变量我们就可以写一个生产者和消费者的模型
#include<iostream> #include<queue> #include<chrono> #include<mutex> #include<thread> #include<condition_variable> using namespace std; int main(){ queue<int> produced; mutex mtx; condition_variable cond; bool notified=false; auto producer=[&](){ for(int i=0;;i++){ std::this_thread::sleep_for(std::chrono::milliseconds(900)); std::unique_lock<std::mutex> lock(mtx); //生产 cout<<"Producing:"<<i<<endl; produced.push(i); notified=true; cond.notify_all(); } }; auto consumer=[&](){ while(true){ std::unique_lock<std::mutex> lock(mtx); while(!notified){ cond.wait(lock); } lock.unlock(); //消费 std::this_thread::sleep_for(std::chrono::milliseconds(1000)); lock.lock(); while(!produced.empty()){ cout<<"Consuming..."<<endl; int xxx=produced.front(); xxx+=100; cout<<"xxx="<<xxx<<endl; produced.pop(); } notified=false; } }; std::thread p(producer); std::thread cs[2]; for(int i=0;i<2;++i){ cs[i]=std::thread(consumer); } p.join(); for(int i=0;i<2;++i){ cs[i].join(); } return 0; } 复制代码
当然这里也会发现一些问题,比如我们消费的耗时太久那就会导致生产队列一直增长,内存占用越来越高
原子操作
我们除了锁,还可以使用原子操作来防止并发读写问题.可以用atomic
来实例化一些原子操作对象,并且很多运算符都经过重载过方便使用.
异步
async
异步可以让耗时的操作不影响当前主进程的执行,而是单独的启动一个新的线程来运行任务
#include<iostream> #include<future> #include<thread> #include<mutex> #include<cmath> using namespace std; static const int MAX=10e8; static double sum=0; void worker(int min,int max){ cout<<"The Thread ID is:"<<std::this_thread::get_id()<<endl; for(int i=min;i<max;i++){ sum+=sqrt(i); } } int main(){ sum=0; cout<<"The Thread ID is:"<<std::this_thread::get_id()<<endl; auto f1=async(launch::async,worker,0,MAX); //auto f1=async(worker,0,MAX); cout<<"Async trigger"<<endl; f1.wait(); cout<<"Async Finish!!! result="<<sum<<endl; } 复制代码
future
上面说异步是我们希望开辟一个线程去执行某操作,但是我们并不想停下来等它执行结束,而是未来某个时间能得到结果.有了future
之后就可以很简单地获得异步任务结果.
#include<iostream> #include<future> #include<thread> using namespace std; int main(){ std::packaged_task<int()>task([](){return 111;}); std::future<int> result=task.get_future(); std::thread(std::move(task)).detach(); cout<<"Waiting ..."<<endl; result.wait(); cout<<"Done!!!"<<endl<<"Result is: "<<result.get()<<endl; return 0; } 复制代码
这里用一个线程去得到返回值,然后当我们需要的时候使用get
方法就可以得到结果
当然还有一些更进阶的使用方式,当我们需要某个函数计算结果并且将任务结束和结果返回分离开这个时候还需要使用promise
搭配future
使用
#include<iostream> #include<future> #include<thread> #include<cmath> #include<vector> using namespace std; static int MAX=10e7; double concurrent_worker(int min, int max) { double sum = 0; for (int i = min; i <= max; i++) { sum += sqrt(i); } return sum; } void concurrent_task(int min, int max, promise<double>* result) { vector<future<double>> results; unsigned concurrent_count = thread::hardware_concurrency(); min = 0; for (int i = 0; i < concurrent_count; i++) { packaged_task<double(int, int)> task(concurrent_worker); results.push_back(task.get_future()); int range = max / concurrent_count * (i + 1); thread t(std::move(task), min, range); t.detach(); min = range + 1; } cout << "threads create finish" << endl; double sum = 0; for (auto& r : results) { sum += r.get(); } result->set_value(sum); cout << "concurrent_task finish" << endl; } int main() { auto start_time = chrono::steady_clock::now(); promise<double> sum; concurrent_task(0, MAX, &sum); auto end_time = chrono::steady_clock::now(); auto ms = chrono::duration_cast<chrono::milliseconds>(end_time - start_time).count(); cout << "Concurrent task finish, " << ms << " ms consumed." << endl; cout << "Result: " << sum.get_future().get() << endl; return 0; }