C++11线程安全队列和安全栈

简介: C++11线程安全队列和安全栈
文章代码取自C++11并发编程指南,记录于此方便日后查看
#include "stdafx.h"
#include <thread>
#include <iostream>
#include <vector>
#include <algorithm>
#include <map>
#include <mutex>
#include <stack>
#include <string>
#include <exception>
#include <memory> // For std::shared_ptr<>
#include <queue>
#include <condition_variable>
#include <atomic>
using namespace std;
struct empty_stack : std::exception
{
  const char* what() const throw() {
    return "empty stack!";
  };
};
// 线程安全的栈 
template<typename T = int>
class threadsafe_stack
{
public:
  threadsafe_stack(){}
  threadsafe_stack(const threadsafe_stack& other)
  {
    // 在构造函数体中的执行拷贝
    std::lock_guard<std::mutex> lock(other.m);
    data = other.data; 
  }
  // 删除赋值运算符
  threadsafe_stack& operator=(const threadsafe_stack&) = delete;
  void push(T new_value)
  {
    std::lock_guard<std::mutex> lock(m);
    data.push(new_value);
  } 
        // 如果为空则抛出empty_stack异常
  // front和pop的功能
  std::shared_ptr<T> pop()
  {
    std::lock_guard<std::mutex> lock(m);
    // 在调用pop前,检查栈是否为空
    if (data.empty()) throw empty_stack(); 
    // 在修改堆栈前,分配出返回值
    std::shared_ptr<T> const res(std::make_shared<T>(data.top())); 
    data.pop();
    return res;
  } 
  void pop(T& value)
  {
    std::lock_guard<std::mutex> lock(m);
    if (data.empty()) throw empty_stack();
    value = data.top();
    data.pop();
  }
  bool empty() const
  {
    std::lock_guard<std::mutex> lock(m);
    return data.empty();
  }
private:
  std::stack<T> data;
  mutable std::mutex m;
};
template<typename T = int>
class threadsafe_queue
{
public:
  threadsafe_queue(){}
  threadsafe_queue(threadsafe_queue const& other)
  {
    std::lock_guard<std::mutex> lk(other.mut);
    data_queue = other.data_queue;
  } 
  void push(T new_value)
  {
    std::lock_guard<std::mutex> lk(mut);
    data_queue.push(new_value);
    data_cond.notify_one();
  }
  void wait_and_pop(T& value)
  {
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk, [this]{return !data_queue.empty(); });
    value = data_queue.front();
    data_queue.pop();
  } 
        std::shared_ptr<T> wait_and_pop()
  {
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk, [this]{return !data_queue.empty(); });
    std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
    data_queue.pop();
    return res;
  }
  bool try_pop(T& value)
  {
    std::lock_guard<std::mutex> lk(mut);
    if (data_queue.empty())
      return false;
    value = data_queue.front();
    data_queue.pop();
    return true;
  }
       std::shared_ptr<T> try_pop()
  {
    std::lock_guard<std::mutex> lk(mut);
    if (data_queue.empty())
      return std::shared_ptr<T>();
    std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
    data_queue.pop();
    return res;
  } 
  bool empty() const
  {
    std::lock_guard<std::mutex> lk(mut);
    return data_queue.empty();
  }
  int size() const
  {
    std::lock_guard<std::mutex> lk(mut);
    return data_queue.size();
  }
private:
  mutable std::mutex mut; // 1 互斥量必须是可变的
  std::queue<T> data_queue;
  std::condition_variable data_cond;
};
threadsafe_stack<int> MyStack;
threadsafe_queue<std::string> MyQueue;
atomic<bool> MyQueueFlag = false;
void PushStackThread(int StartNum/*起始号码*/, int Sum/*个数*/)
{
  for (int index = 0; index < Sum; ++index)
  {
    MyStack.push(StartNum++);
  }
}
void PopStackThread()
{
  while (!MyStack.empty())
  {
    try
    {
      shared_ptr<int> re = MyStack.pop();
      cout << *re << endl;
    }
    catch (empty_stack &ex)
    {
      cout << ex.what() << endl;
    }
  }
  cout << "end" << endl;
}
void Add(int &elem)
{
  elem = elem + 1000;
}
void ReadQueue()
{
  while (MyQueueFlag)
  {
    std::string Value;
    if (MyQueue.try_pop(Value))
    {
      cout << "try_pop value is " << Value << endl;
    }
    else
    {
      cout << "try_pop failed!" << endl;
      std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
//      shared_ptr<std::string> pValue = MyQueue.try_pop();
//      if (pValue)
//      {
//        cout << "try_pop shared_ptr value is " << pValue << endl;
//      }
//    auto pValueWait = MyQueue.wait_and_pop();
//    if (!pValueWait)
//    {
//      cout << "wait_and_pop shared_ptr value is " << pValueWait << endl;
//    }
//    std::string WaitValue;
//    MyQueue.wait_and_pop(WaitValue);
//    if (!WaitValue.empty())
//    {
//      cout << "wait_and_pop value is " << WaitValue << endl;
//    }
  } 
}
void WriteQueue()
{
  int Cnt = 0;
  while (MyQueueFlag)
  {
    if (MyQueue.size() > 1000)
    {
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
      continue;
    }
    char Value[64] = { 0 };
    sprintf_s(Value, 64, "%d", ++Cnt);
    MyQueue.push(std::string(Value));
  }
}
int _tmain(int argc, _TCHAR* argv[])
{
  bool IsEmptyMyStack = MyStack.empty();
#if 0 // 单线程
  for (int index = 0; index < 10; ++index)
  {
    MyStack.push(index + 1);
  }
  for (int index = 0; index < 10; ++index)
  {
    shared_ptr<int> pValue = MyStack.pop();
    cout << *pValue << endl;
  }
//#else // 多线程操作
  vector<std::thread> MyThreads;
  for (int index = 0; index < 10; ++index)
  {
    MyThreads.push_back(std::thread(PushStackThread, index * 10, 10));
  }
  // 等待所有线程结束
  for_each(MyThreads.begin(), MyThreads.end(), std::mem_fn(&std::thread::detach));
  // 至此堆栈中已经添加了100个数据
  vector<std::thread> MyPopThreads;
  for (int index = 0; index < 10; ++index)
  {
    MyPopThreads.push_back(std::thread(PopStackThread));
  }
  for_each(MyPopThreads.begin(), MyPopThreads.end(), std::mem_fn(&std::thread::join));
#endif
#if 0
  vector<int> MyAdd;
  for (int index = 0; index < 100; ++index)
  {
    MyAdd.push_back(index);
  }
  for_each(MyAdd.begin(), MyAdd.end(), Add);
#endif
  MyQueueFlag = true;
  std::thread t(WriteQueue);
  vector<std::thread> MyThreads;
  for (int index = 0; index < 100; index++)
  {
    MyThreads.push_back(std::thread(ReadQueue));
  }
  std::this_thread::sleep_for(std::chrono::minutes(1));
  MyQueueFlag = false;
  t.join();
  for_each(MyThreads.begin(), MyThreads.end(), mem_fn(&std::thread::join));
  return 0;
}
相关文章
|
2月前
|
存储 监控 Java
|
2月前
|
Java
【Java集合类面试十二】、HashMap为什么线程不安全?
HashMap在并发环境下执行put操作可能导致循环链表的形成,进而引起死循环,因而它是线程不安全的。
|
2月前
|
安全 算法 Java
【Java集合类面试二】、 Java中的容器,线程安全和线程不安全的分别有哪些?
这篇文章讨论了Java集合类的线程安全性,列举了线程不安全的集合类(如HashSet、ArrayList、HashMap)和线程安全的集合类(如Vector、Hashtable),同时介绍了Java 5之后提供的java.util.concurrent包中的高效并发集合类,如ConcurrentHashMap和CopyOnWriteArrayList。
【Java集合类面试二】、 Java中的容器,线程安全和线程不安全的分别有哪些?
|
2月前
|
Java 调度
基于C++11的线程池
基于C++11的线程池
|
2月前
|
数据采集 Java Python
python 递归锁、信号量、事件、线程队列、进程池和线程池、回调函数、定时器
python 递归锁、信号量、事件、线程队列、进程池和线程池、回调函数、定时器
|
2月前
|
存储 安全 Java
解锁Java并发编程奥秘:深入剖析Synchronized关键字的同步机制与实现原理,让多线程安全如磐石般稳固!
【8月更文挑战第4天】Java并发编程中,Synchronized关键字是确保多线程环境下数据一致性与线程安全的基础机制。它可通过修饰实例方法、静态方法或代码块来控制对共享资源的独占访问。Synchronized基于Java对象头中的监视器锁实现,通过MonitorEnter/MonitorExit指令管理锁的获取与释放。示例展示了如何使用Synchronized修饰方法以实现线程间的同步,避免数据竞争。掌握其原理对编写高效安全的多线程程序极为关键。
53 1
|
3月前
|
算法 编译器 C++
开发与运维线程问题之在C++的原子操作中memory_order如何解决
开发与运维线程问题之在C++的原子操作中memory_order如何解决
35 2
|
3月前
|
缓存 安全 Java
多线程线程池问题之为什么手动创建的线程池比使用Executors类提供的线程池更安全
多线程线程池问题之为什么手动创建的线程池比使用Executors类提供的线程池更安全
|
2月前
|
Dart 编译器 API
Dart ffi 使用问题之在C++线程中无法直接调用Dart函数的问题如何解决
Dart ffi 使用问题之在C++线程中无法直接调用Dart函数的问题如何解决
|
2月前
|
Dart API C语言
Dart ffi 使用问题之想在C/C++中创建异步线程来调用Dart方法,如何操作
Dart ffi 使用问题之想在C/C++中创建异步线程来调用Dart方法,如何操作