C++11线程安全队列和安全栈

简介: C++11线程安全队列和安全栈
文章代码取自C++11并发编程指南,记录于此方便日后查看
#include "stdafx.h"
#include <thread>
#include <iostream>
#include <vector>
#include <algorithm>
#include <map>
#include <mutex>
#include <stack>
#include <string>
#include <exception>
#include <memory> // For std::shared_ptr<>
#include <queue>
#include <condition_variable>
#include <atomic>
using namespace std;
struct empty_stack : std::exception
{
  const char* what() const throw() {
    return "empty stack!";
  };
};
// 线程安全的栈 
template<typename T = int>
class threadsafe_stack
{
public:
  threadsafe_stack(){}
  threadsafe_stack(const threadsafe_stack& other)
  {
    // 在构造函数体中的执行拷贝
    std::lock_guard<std::mutex> lock(other.m);
    data = other.data; 
  }
  // 删除赋值运算符
  threadsafe_stack& operator=(const threadsafe_stack&) = delete;
  void push(T new_value)
  {
    std::lock_guard<std::mutex> lock(m);
    data.push(new_value);
  } 
        // 如果为空则抛出empty_stack异常
  // front和pop的功能
  std::shared_ptr<T> pop()
  {
    std::lock_guard<std::mutex> lock(m);
    // 在调用pop前,检查栈是否为空
    if (data.empty()) throw empty_stack(); 
    // 在修改堆栈前,分配出返回值
    std::shared_ptr<T> const res(std::make_shared<T>(data.top())); 
    data.pop();
    return res;
  } 
  void pop(T& value)
  {
    std::lock_guard<std::mutex> lock(m);
    if (data.empty()) throw empty_stack();
    value = data.top();
    data.pop();
  }
  bool empty() const
  {
    std::lock_guard<std::mutex> lock(m);
    return data.empty();
  }
private:
  std::stack<T> data;
  mutable std::mutex m;
};
template<typename T = int>
class threadsafe_queue
{
public:
  threadsafe_queue(){}
  threadsafe_queue(threadsafe_queue const& other)
  {
    std::lock_guard<std::mutex> lk(other.mut);
    data_queue = other.data_queue;
  } 
  void push(T new_value)
  {
    std::lock_guard<std::mutex> lk(mut);
    data_queue.push(new_value);
    data_cond.notify_one();
  }
  void wait_and_pop(T& value)
  {
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk, [this]{return !data_queue.empty(); });
    value = data_queue.front();
    data_queue.pop();
  } 
        std::shared_ptr<T> wait_and_pop()
  {
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk, [this]{return !data_queue.empty(); });
    std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
    data_queue.pop();
    return res;
  }
  bool try_pop(T& value)
  {
    std::lock_guard<std::mutex> lk(mut);
    if (data_queue.empty())
      return false;
    value = data_queue.front();
    data_queue.pop();
    return true;
  }
       std::shared_ptr<T> try_pop()
  {
    std::lock_guard<std::mutex> lk(mut);
    if (data_queue.empty())
      return std::shared_ptr<T>();
    std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
    data_queue.pop();
    return res;
  } 
  bool empty() const
  {
    std::lock_guard<std::mutex> lk(mut);
    return data_queue.empty();
  }
  int size() const
  {
    std::lock_guard<std::mutex> lk(mut);
    return data_queue.size();
  }
private:
  mutable std::mutex mut; // 1 互斥量必须是可变的
  std::queue<T> data_queue;
  std::condition_variable data_cond;
};
threadsafe_stack<int> MyStack;
threadsafe_queue<std::string> MyQueue;
atomic<bool> MyQueueFlag = false;
void PushStackThread(int StartNum/*起始号码*/, int Sum/*个数*/)
{
  for (int index = 0; index < Sum; ++index)
  {
    MyStack.push(StartNum++);
  }
}
void PopStackThread()
{
  while (!MyStack.empty())
  {
    try
    {
      shared_ptr<int> re = MyStack.pop();
      cout << *re << endl;
    }
    catch (empty_stack &ex)
    {
      cout << ex.what() << endl;
    }
  }
  cout << "end" << endl;
}
void Add(int &elem)
{
  elem = elem + 1000;
}
void ReadQueue()
{
  while (MyQueueFlag)
  {
    std::string Value;
    if (MyQueue.try_pop(Value))
    {
      cout << "try_pop value is " << Value << endl;
    }
    else
    {
      cout << "try_pop failed!" << endl;
      std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
//      shared_ptr<std::string> pValue = MyQueue.try_pop();
//      if (pValue)
//      {
//        cout << "try_pop shared_ptr value is " << pValue << endl;
//      }
//    auto pValueWait = MyQueue.wait_and_pop();
//    if (!pValueWait)
//    {
//      cout << "wait_and_pop shared_ptr value is " << pValueWait << endl;
//    }
//    std::string WaitValue;
//    MyQueue.wait_and_pop(WaitValue);
//    if (!WaitValue.empty())
//    {
//      cout << "wait_and_pop value is " << WaitValue << endl;
//    }
  } 
}
void WriteQueue()
{
  int Cnt = 0;
  while (MyQueueFlag)
  {
    if (MyQueue.size() > 1000)
    {
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
      continue;
    }
    char Value[64] = { 0 };
    sprintf_s(Value, 64, "%d", ++Cnt);
    MyQueue.push(std::string(Value));
  }
}
int _tmain(int argc, _TCHAR* argv[])
{
  bool IsEmptyMyStack = MyStack.empty();
#if 0 // 单线程
  for (int index = 0; index < 10; ++index)
  {
    MyStack.push(index + 1);
  }
  for (int index = 0; index < 10; ++index)
  {
    shared_ptr<int> pValue = MyStack.pop();
    cout << *pValue << endl;
  }
//#else // 多线程操作
  vector<std::thread> MyThreads;
  for (int index = 0; index < 10; ++index)
  {
    MyThreads.push_back(std::thread(PushStackThread, index * 10, 10));
  }
  // 等待所有线程结束
  for_each(MyThreads.begin(), MyThreads.end(), std::mem_fn(&std::thread::detach));
  // 至此堆栈中已经添加了100个数据
  vector<std::thread> MyPopThreads;
  for (int index = 0; index < 10; ++index)
  {
    MyPopThreads.push_back(std::thread(PopStackThread));
  }
  for_each(MyPopThreads.begin(), MyPopThreads.end(), std::mem_fn(&std::thread::join));
#endif
#if 0
  vector<int> MyAdd;
  for (int index = 0; index < 100; ++index)
  {
    MyAdd.push_back(index);
  }
  for_each(MyAdd.begin(), MyAdd.end(), Add);
#endif
  MyQueueFlag = true;
  std::thread t(WriteQueue);
  vector<std::thread> MyThreads;
  for (int index = 0; index < 100; index++)
  {
    MyThreads.push_back(std::thread(ReadQueue));
  }
  std::this_thread::sleep_for(std::chrono::minutes(1));
  MyQueueFlag = false;
  t.join();
  for_each(MyThreads.begin(), MyThreads.end(), mem_fn(&std::thread::join));
  return 0;
}
相关文章
|
12天前
|
缓存 安全 Java
7张图带你轻松理解Java 线程安全,java缓存机制面试
7张图带你轻松理解Java 线程安全,java缓存机制面试
|
14天前
|
安全 前端开发 程序员
|
14天前
|
存储 设计模式 C语言
C++中的栈和队列
C++中的栈和队列
15 0
|
1天前
|
安全 Java 编译器
多线程问题(二)(安全问题)
多线程问题(二)(安全问题)
9 0
|
4天前
|
算法 C++
c++算法学习笔记 (15) 单调栈与单调队列
c++算法学习笔记 (15) 单调栈与单调队列
|
4天前
|
算法 C++
c++算法学习笔记 (14) 栈与队列
c++算法学习笔记 (14) 栈与队列
|
5天前
|
安全 Go 对象存储
C++多线程编程:并发与同步的实战应用
本文介绍了C++中的多线程编程,包括基础知识和实战应用。C++借助`&lt;thread&gt;`库支持多线程,通过`std::thread`创建线程执行任务。文章探讨了并发与同步的概念,如互斥锁(Mutex)用于保护共享资源,条件变量(Condition Variable)协调线程等待与通知,以及原子操作(Atomic Operations)保证线程安全。实战部分展示了如何使用多线程进行并发计算,利用`std::async`实现异步任务并获取结果。多线程编程能提高效率,但也需注意数据竞争和同步问题,以确保程序的正确性。
|
5天前
|
存储 安全 Java
Java多线程安全风险-Java多线程(2)
Java多线程安全风险-Java多线程(2)
9 1
|
14天前
|
Java Linux 调度
|
14天前
|
安全 C++
C++多线程编程:并发与同步
C++多线程编程:并发与同步
14 0