C++11线程安全队列和安全栈

简介: C++11线程安全队列和安全栈
文章代码取自C++11并发编程指南,记录于此方便日后查看
#include "stdafx.h"
#include <thread>
#include <iostream>
#include <vector>
#include <algorithm>
#include <map>
#include <mutex>
#include <stack>
#include <string>
#include <exception>
#include <memory> // For std::shared_ptr<>
#include <queue>
#include <condition_variable>
#include <atomic>
using namespace std;
struct empty_stack : std::exception
{
  const char* what() const throw() {
    return "empty stack!";
  };
};
// 线程安全的栈 
template<typename T = int>
class threadsafe_stack
{
public:
  threadsafe_stack(){}
  threadsafe_stack(const threadsafe_stack& other)
  {
    // 在构造函数体中的执行拷贝
    std::lock_guard<std::mutex> lock(other.m);
    data = other.data; 
  }
  // 删除赋值运算符
  threadsafe_stack& operator=(const threadsafe_stack&) = delete;
  void push(T new_value)
  {
    std::lock_guard<std::mutex> lock(m);
    data.push(new_value);
  } 
        // 如果为空则抛出empty_stack异常
  // front和pop的功能
  std::shared_ptr<T> pop()
  {
    std::lock_guard<std::mutex> lock(m);
    // 在调用pop前,检查栈是否为空
    if (data.empty()) throw empty_stack(); 
    // 在修改堆栈前,分配出返回值
    std::shared_ptr<T> const res(std::make_shared<T>(data.top())); 
    data.pop();
    return res;
  } 
  void pop(T& value)
  {
    std::lock_guard<std::mutex> lock(m);
    if (data.empty()) throw empty_stack();
    value = data.top();
    data.pop();
  }
  bool empty() const
  {
    std::lock_guard<std::mutex> lock(m);
    return data.empty();
  }
private:
  std::stack<T> data;
  mutable std::mutex m;
};
template<typename T = int>
class threadsafe_queue
{
public:
  threadsafe_queue(){}
  threadsafe_queue(threadsafe_queue const& other)
  {
    std::lock_guard<std::mutex> lk(other.mut);
    data_queue = other.data_queue;
  } 
  void push(T new_value)
  {
    std::lock_guard<std::mutex> lk(mut);
    data_queue.push(new_value);
    data_cond.notify_one();
  }
  void wait_and_pop(T& value)
  {
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk, [this]{return !data_queue.empty(); });
    value = data_queue.front();
    data_queue.pop();
  } 
        std::shared_ptr<T> wait_and_pop()
  {
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk, [this]{return !data_queue.empty(); });
    std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
    data_queue.pop();
    return res;
  }
  bool try_pop(T& value)
  {
    std::lock_guard<std::mutex> lk(mut);
    if (data_queue.empty())
      return false;
    value = data_queue.front();
    data_queue.pop();
    return true;
  }
       std::shared_ptr<T> try_pop()
  {
    std::lock_guard<std::mutex> lk(mut);
    if (data_queue.empty())
      return std::shared_ptr<T>();
    std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
    data_queue.pop();
    return res;
  } 
  bool empty() const
  {
    std::lock_guard<std::mutex> lk(mut);
    return data_queue.empty();
  }
  int size() const
  {
    std::lock_guard<std::mutex> lk(mut);
    return data_queue.size();
  }
private:
  mutable std::mutex mut; // 1 互斥量必须是可变的
  std::queue<T> data_queue;
  std::condition_variable data_cond;
};
threadsafe_stack<int> MyStack;
threadsafe_queue<std::string> MyQueue;
atomic<bool> MyQueueFlag = false;
void PushStackThread(int StartNum/*起始号码*/, int Sum/*个数*/)
{
  for (int index = 0; index < Sum; ++index)
  {
    MyStack.push(StartNum++);
  }
}
void PopStackThread()
{
  while (!MyStack.empty())
  {
    try
    {
      shared_ptr<int> re = MyStack.pop();
      cout << *re << endl;
    }
    catch (empty_stack &ex)
    {
      cout << ex.what() << endl;
    }
  }
  cout << "end" << endl;
}
void Add(int &elem)
{
  elem = elem + 1000;
}
void ReadQueue()
{
  while (MyQueueFlag)
  {
    std::string Value;
    if (MyQueue.try_pop(Value))
    {
      cout << "try_pop value is " << Value << endl;
    }
    else
    {
      cout << "try_pop failed!" << endl;
      std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
//      shared_ptr<std::string> pValue = MyQueue.try_pop();
//      if (pValue)
//      {
//        cout << "try_pop shared_ptr value is " << pValue << endl;
//      }
//    auto pValueWait = MyQueue.wait_and_pop();
//    if (!pValueWait)
//    {
//      cout << "wait_and_pop shared_ptr value is " << pValueWait << endl;
//    }
//    std::string WaitValue;
//    MyQueue.wait_and_pop(WaitValue);
//    if (!WaitValue.empty())
//    {
//      cout << "wait_and_pop value is " << WaitValue << endl;
//    }
  } 
}
void WriteQueue()
{
  int Cnt = 0;
  while (MyQueueFlag)
  {
    if (MyQueue.size() > 1000)
    {
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
      continue;
    }
    char Value[64] = { 0 };
    sprintf_s(Value, 64, "%d", ++Cnt);
    MyQueue.push(std::string(Value));
  }
}
int _tmain(int argc, _TCHAR* argv[])
{
  bool IsEmptyMyStack = MyStack.empty();
#if 0 // 单线程
  for (int index = 0; index < 10; ++index)
  {
    MyStack.push(index + 1);
  }
  for (int index = 0; index < 10; ++index)
  {
    shared_ptr<int> pValue = MyStack.pop();
    cout << *pValue << endl;
  }
//#else // 多线程操作
  vector<std::thread> MyThreads;
  for (int index = 0; index < 10; ++index)
  {
    MyThreads.push_back(std::thread(PushStackThread, index * 10, 10));
  }
  // 等待所有线程结束
  for_each(MyThreads.begin(), MyThreads.end(), std::mem_fn(&std::thread::detach));
  // 至此堆栈中已经添加了100个数据
  vector<std::thread> MyPopThreads;
  for (int index = 0; index < 10; ++index)
  {
    MyPopThreads.push_back(std::thread(PopStackThread));
  }
  for_each(MyPopThreads.begin(), MyPopThreads.end(), std::mem_fn(&std::thread::join));
#endif
#if 0
  vector<int> MyAdd;
  for (int index = 0; index < 100; ++index)
  {
    MyAdd.push_back(index);
  }
  for_each(MyAdd.begin(), MyAdd.end(), Add);
#endif
  MyQueueFlag = true;
  std::thread t(WriteQueue);
  vector<std::thread> MyThreads;
  for (int index = 0; index < 100; index++)
  {
    MyThreads.push_back(std::thread(ReadQueue));
  }
  std::this_thread::sleep_for(std::chrono::minutes(1));
  MyQueueFlag = false;
  t.join();
  for_each(MyThreads.begin(), MyThreads.end(), mem_fn(&std::thread::join));
  return 0;
}
目录
打赏
0
0
0
0
1
分享
相关文章
|
3月前
|
【C++数据结构——栈与队列】顺序栈的基本运算(头歌实践教学平台习题)【合集】
本关任务:编写一个程序实现顺序栈的基本运算。开始你的任务吧,祝你成功!​ 相关知识 初始化栈 销毁栈 判断栈是否为空 进栈 出栈 取栈顶元素 1.初始化栈 概念:初始化栈是为栈的使用做准备,包括分配内存空间(如果是动态分配)和设置栈的初始状态。栈有顺序栈和链式栈两种常见形式。对于顺序栈,通常需要定义一个数组来存储栈元素,并设置一个变量来记录栈顶位置;对于链式栈,需要定义节点结构,包含数据域和指针域,同时初始化栈顶指针。 示例(顺序栈): 以下是一个简单的顺序栈初始化示例,假设用C语言实现,栈中存储
178 77
【c++丨STL】priority_queue(优先级队列)的使用与模拟实现
本文介绍了STL中的容器适配器`priority_queue`(优先级队列)。`priority_queue`根据严格的弱排序标准设计,确保其第一个元素始终是最大元素。它底层使用堆结构实现,支持大堆和小堆,默认为大堆。常用操作包括构造函数、`empty`、`size`、`top`、`push`、`pop`和`swap`等。我们还模拟实现了`priority_queue`,通过仿函数控制堆的类型,并调用封装容器的接口实现功能。最后,感谢大家的支持与关注。
79 1
|
3月前
|
【C++数据结构——栈与队列】环形队列的基本运算(头歌实践教学平台习题)【合集】
【数据结构——栈与队列】环形队列的基本运算(头歌实践教学平台习题)【合集】初始化队列、销毁队列、判断队列是否为空、进队列、出队列等。本关任务:编写一个程序实现环形队列的基本运算。(6)出队列序列:yzopq2*(5)依次进队列元素:opq2*(6)出队列序列:bcdef。(2)依次进队列元素:abc。(5)依次进队列元素:def。(2)依次进队列元素:xyz。开始你的任务吧,祝你成功!(4)出队一个元素a。(4)出队一个元素x。
84 13
【C++数据结构——栈与队列】环形队列的基本运算(头歌实践教学平台习题)【合集】
JAVA线程池有哪些队列? 以及它们的适用场景案例
不同的线程池队列有着各自的特点和适用场景,在实际使用线程池时,需要根据具体的业务需求、系统资源状况以及对任务执行顺序、响应时间等方面的要求,合理选择相应的队列来构建线程池,以实现高效的任务处理。
156 12
|
3月前
|
【C++数据结构——栈与队列】链栈的基本运算(头歌实践教学平台习题)【合集】
本关任务:编写一个程序实现链栈的基本运算。开始你的任务吧,祝你成功!​ 相关知识 初始化栈 销毁栈 判断栈是否为空 进栈 出栈 取栈顶元素 初始化栈 概念:初始化栈是为栈的使用做准备,包括分配内存空间(如果是动态分配)和设置栈的初始状态。栈有顺序栈和链式栈两种常见形式。对于顺序栈,通常需要定义一个数组来存储栈元素,并设置一个变量来记录栈顶位置;对于链式栈,需要定义节点结构,包含数据域和指针域,同时初始化栈顶指针。 示例(顺序栈): 以下是一个简单的顺序栈初始化示例,假设用C语言实现,栈中存储整数,最大
63 9
|
3月前
|
C++
【C++数据结构——栈和队列】括号配对(头歌实践教学平台习题)【合集】
【数据结构——栈和队列】括号配对(头歌实践教学平台习题)【合集】(1)遇到左括号:进栈Push()(2)遇到右括号:若栈顶元素为左括号,则出栈Pop();否则返回false。(3)当遍历表达式结束,且栈为空时,则返回true,否则返回false。本关任务:编写一个程序利用栈判断左、右圆括号是否配对。为了完成本关任务,你需要掌握:栈对括号的处理。(1)遇到左括号:进栈Push()开始你的任务吧,祝你成功!测试输入:(()))
64 7
【JaveEE】——多线程中使用顺序表,队列,哈希表
多线程环境下使用ArrayList(同步机制,写时拷贝),使用队列,哈希表(高频)ConcurrentHashMap(缩小锁粒度,CAS,扩容优化)
【JavaEE】——单例模式引起的多线程安全问题:“饿汉/懒汉”模式,及解决思路和方法(面试高频)
单例模式下,“饿汉模式”,“懒汉模式”,单例模式下引起的线程安全问题,解锁思路和解决方法
【JavaEE】——线程的安全问题和解决方式
【JavaEE】——线程的安全问题和解决方式。为什么多线程运行会有安全问题,解决线程安全问题的思路,synchronized关键字的运用,加锁机制,“锁竞争”,几个变式
|
5月前
|
C++无锁队列:解锁多线程编程新境界
【10月更文挑战第27天】
281 7