文章代码取自C++11并发编程指南,记录于此方便日后查看 #include "stdafx.h" #include <thread> #include <iostream> #include <vector> #include <algorithm> #include <map> #include <mutex> #include <stack> #include <string> #include <exception> #include <memory> // For std::shared_ptr<> #include <queue> #include <condition_variable> #include <atomic> using namespace std; struct empty_stack : std::exception { const char* what() const throw() { return "empty stack!"; }; }; // 线程安全的栈 template<typename T = int> class threadsafe_stack { public: threadsafe_stack(){} threadsafe_stack(const threadsafe_stack& other) { // 在构造函数体中的执行拷贝 std::lock_guard<std::mutex> lock(other.m); data = other.data; } // 删除赋值运算符 threadsafe_stack& operator=(const threadsafe_stack&) = delete; void push(T new_value) { std::lock_guard<std::mutex> lock(m); data.push(new_value); } // 如果为空则抛出empty_stack异常 // front和pop的功能 std::shared_ptr<T> pop() { std::lock_guard<std::mutex> lock(m); // 在调用pop前,检查栈是否为空 if (data.empty()) throw empty_stack(); // 在修改堆栈前,分配出返回值 std::shared_ptr<T> const res(std::make_shared<T>(data.top())); data.pop(); return res; } void pop(T& value) { std::lock_guard<std::mutex> lock(m); if (data.empty()) throw empty_stack(); value = data.top(); data.pop(); } bool empty() const { std::lock_guard<std::mutex> lock(m); return data.empty(); } private: std::stack<T> data; mutable std::mutex m; }; template<typename T = int> class threadsafe_queue { public: threadsafe_queue(){} threadsafe_queue(threadsafe_queue const& other) { std::lock_guard<std::mutex> lk(other.mut); data_queue = other.data_queue; } void push(T new_value) { std::lock_guard<std::mutex> lk(mut); data_queue.push(new_value); data_cond.notify_one(); } void wait_and_pop(T& value) { std::unique_lock<std::mutex> lk(mut); data_cond.wait(lk, [this]{return !data_queue.empty(); }); value = data_queue.front(); data_queue.pop(); } std::shared_ptr<T> wait_and_pop() { std::unique_lock<std::mutex> lk(mut); data_cond.wait(lk, [this]{return !data_queue.empty(); }); std::shared_ptr<T> res(std::make_shared<T>(data_queue.front())); data_queue.pop(); return res; } bool try_pop(T& value) { std::lock_guard<std::mutex> lk(mut); if (data_queue.empty()) return false; value = data_queue.front(); data_queue.pop(); return true; } std::shared_ptr<T> try_pop() { std::lock_guard<std::mutex> lk(mut); if (data_queue.empty()) return std::shared_ptr<T>(); std::shared_ptr<T> res(std::make_shared<T>(data_queue.front())); data_queue.pop(); return res; } bool empty() const { std::lock_guard<std::mutex> lk(mut); return data_queue.empty(); } int size() const { std::lock_guard<std::mutex> lk(mut); return data_queue.size(); } private: mutable std::mutex mut; // 1 互斥量必须是可变的 std::queue<T> data_queue; std::condition_variable data_cond; }; threadsafe_stack<int> MyStack; threadsafe_queue<std::string> MyQueue; atomic<bool> MyQueueFlag = false; void PushStackThread(int StartNum/*起始号码*/, int Sum/*个数*/) { for (int index = 0; index < Sum; ++index) { MyStack.push(StartNum++); } } void PopStackThread() { while (!MyStack.empty()) { try { shared_ptr<int> re = MyStack.pop(); cout << *re << endl; } catch (empty_stack &ex) { cout << ex.what() << endl; } } cout << "end" << endl; } void Add(int &elem) { elem = elem + 1000; } void ReadQueue() { while (MyQueueFlag) { std::string Value; if (MyQueue.try_pop(Value)) { cout << "try_pop value is " << Value << endl; } else { cout << "try_pop failed!" << endl; std::this_thread::sleep_for(std::chrono::milliseconds(50)); } // shared_ptr<std::string> pValue = MyQueue.try_pop(); // if (pValue) // { // cout << "try_pop shared_ptr value is " << pValue << endl; // } // auto pValueWait = MyQueue.wait_and_pop(); // if (!pValueWait) // { // cout << "wait_and_pop shared_ptr value is " << pValueWait << endl; // } // std::string WaitValue; // MyQueue.wait_and_pop(WaitValue); // if (!WaitValue.empty()) // { // cout << "wait_and_pop value is " << WaitValue << endl; // } } } void WriteQueue() { int Cnt = 0; while (MyQueueFlag) { if (MyQueue.size() > 1000) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); continue; } char Value[64] = { 0 }; sprintf_s(Value, 64, "%d", ++Cnt); MyQueue.push(std::string(Value)); } } int _tmain(int argc, _TCHAR* argv[]) { bool IsEmptyMyStack = MyStack.empty(); #if 0 // 单线程 for (int index = 0; index < 10; ++index) { MyStack.push(index + 1); } for (int index = 0; index < 10; ++index) { shared_ptr<int> pValue = MyStack.pop(); cout << *pValue << endl; } //#else // 多线程操作 vector<std::thread> MyThreads; for (int index = 0; index < 10; ++index) { MyThreads.push_back(std::thread(PushStackThread, index * 10, 10)); } // 等待所有线程结束 for_each(MyThreads.begin(), MyThreads.end(), std::mem_fn(&std::thread::detach)); // 至此堆栈中已经添加了100个数据 vector<std::thread> MyPopThreads; for (int index = 0; index < 10; ++index) { MyPopThreads.push_back(std::thread(PopStackThread)); } for_each(MyPopThreads.begin(), MyPopThreads.end(), std::mem_fn(&std::thread::join)); #endif #if 0 vector<int> MyAdd; for (int index = 0; index < 100; ++index) { MyAdd.push_back(index); } for_each(MyAdd.begin(), MyAdd.end(), Add); #endif MyQueueFlag = true; std::thread t(WriteQueue); vector<std::thread> MyThreads; for (int index = 0; index < 100; index++) { MyThreads.push_back(std::thread(ReadQueue)); } std::this_thread::sleep_for(std::chrono::minutes(1)); MyQueueFlag = false; t.join(); for_each(MyThreads.begin(), MyThreads.end(), mem_fn(&std::thread::join)); return 0; }