C++11线程安全队列和安全栈

简介: C++11线程安全队列和安全栈
文章代码取自C++11并发编程指南,记录于此方便日后查看
#include "stdafx.h"
#include <thread>
#include <iostream>
#include <vector>
#include <algorithm>
#include <map>
#include <mutex>
#include <stack>
#include <string>
#include <exception>
#include <memory> // For std::shared_ptr<>
#include <queue>
#include <condition_variable>
#include <atomic>
using namespace std;
struct empty_stack : std::exception
{
  const char* what() const throw() {
    return "empty stack!";
  };
};
// 线程安全的栈 
template<typename T = int>
class threadsafe_stack
{
public:
  threadsafe_stack(){}
  threadsafe_stack(const threadsafe_stack& other)
  {
    // 在构造函数体中的执行拷贝
    std::lock_guard<std::mutex> lock(other.m);
    data = other.data; 
  }
  // 删除赋值运算符
  threadsafe_stack& operator=(const threadsafe_stack&) = delete;
  void push(T new_value)
  {
    std::lock_guard<std::mutex> lock(m);
    data.push(new_value);
  } 
        // 如果为空则抛出empty_stack异常
  // front和pop的功能
  std::shared_ptr<T> pop()
  {
    std::lock_guard<std::mutex> lock(m);
    // 在调用pop前,检查栈是否为空
    if (data.empty()) throw empty_stack(); 
    // 在修改堆栈前,分配出返回值
    std::shared_ptr<T> const res(std::make_shared<T>(data.top())); 
    data.pop();
    return res;
  } 
  void pop(T& value)
  {
    std::lock_guard<std::mutex> lock(m);
    if (data.empty()) throw empty_stack();
    value = data.top();
    data.pop();
  }
  bool empty() const
  {
    std::lock_guard<std::mutex> lock(m);
    return data.empty();
  }
private:
  std::stack<T> data;
  mutable std::mutex m;
};
template<typename T = int>
class threadsafe_queue
{
public:
  threadsafe_queue(){}
  threadsafe_queue(threadsafe_queue const& other)
  {
    std::lock_guard<std::mutex> lk(other.mut);
    data_queue = other.data_queue;
  } 
  void push(T new_value)
  {
    std::lock_guard<std::mutex> lk(mut);
    data_queue.push(new_value);
    data_cond.notify_one();
  }
  void wait_and_pop(T& value)
  {
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk, [this]{return !data_queue.empty(); });
    value = data_queue.front();
    data_queue.pop();
  } 
        std::shared_ptr<T> wait_and_pop()
  {
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk, [this]{return !data_queue.empty(); });
    std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
    data_queue.pop();
    return res;
  }
  bool try_pop(T& value)
  {
    std::lock_guard<std::mutex> lk(mut);
    if (data_queue.empty())
      return false;
    value = data_queue.front();
    data_queue.pop();
    return true;
  }
       std::shared_ptr<T> try_pop()
  {
    std::lock_guard<std::mutex> lk(mut);
    if (data_queue.empty())
      return std::shared_ptr<T>();
    std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
    data_queue.pop();
    return res;
  } 
  bool empty() const
  {
    std::lock_guard<std::mutex> lk(mut);
    return data_queue.empty();
  }
  int size() const
  {
    std::lock_guard<std::mutex> lk(mut);
    return data_queue.size();
  }
private:
  mutable std::mutex mut; // 1 互斥量必须是可变的
  std::queue<T> data_queue;
  std::condition_variable data_cond;
};
threadsafe_stack<int> MyStack;
threadsafe_queue<std::string> MyQueue;
atomic<bool> MyQueueFlag = false;
void PushStackThread(int StartNum/*起始号码*/, int Sum/*个数*/)
{
  for (int index = 0; index < Sum; ++index)
  {
    MyStack.push(StartNum++);
  }
}
void PopStackThread()
{
  while (!MyStack.empty())
  {
    try
    {
      shared_ptr<int> re = MyStack.pop();
      cout << *re << endl;
    }
    catch (empty_stack &ex)
    {
      cout << ex.what() << endl;
    }
  }
  cout << "end" << endl;
}
void Add(int &elem)
{
  elem = elem + 1000;
}
void ReadQueue()
{
  while (MyQueueFlag)
  {
    std::string Value;
    if (MyQueue.try_pop(Value))
    {
      cout << "try_pop value is " << Value << endl;
    }
    else
    {
      cout << "try_pop failed!" << endl;
      std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
//      shared_ptr<std::string> pValue = MyQueue.try_pop();
//      if (pValue)
//      {
//        cout << "try_pop shared_ptr value is " << pValue << endl;
//      }
//    auto pValueWait = MyQueue.wait_and_pop();
//    if (!pValueWait)
//    {
//      cout << "wait_and_pop shared_ptr value is " << pValueWait << endl;
//    }
//    std::string WaitValue;
//    MyQueue.wait_and_pop(WaitValue);
//    if (!WaitValue.empty())
//    {
//      cout << "wait_and_pop value is " << WaitValue << endl;
//    }
  } 
}
void WriteQueue()
{
  int Cnt = 0;
  while (MyQueueFlag)
  {
    if (MyQueue.size() > 1000)
    {
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
      continue;
    }
    char Value[64] = { 0 };
    sprintf_s(Value, 64, "%d", ++Cnt);
    MyQueue.push(std::string(Value));
  }
}
int _tmain(int argc, _TCHAR* argv[])
{
  bool IsEmptyMyStack = MyStack.empty();
#if 0 // 单线程
  for (int index = 0; index < 10; ++index)
  {
    MyStack.push(index + 1);
  }
  for (int index = 0; index < 10; ++index)
  {
    shared_ptr<int> pValue = MyStack.pop();
    cout << *pValue << endl;
  }
//#else // 多线程操作
  vector<std::thread> MyThreads;
  for (int index = 0; index < 10; ++index)
  {
    MyThreads.push_back(std::thread(PushStackThread, index * 10, 10));
  }
  // 等待所有线程结束
  for_each(MyThreads.begin(), MyThreads.end(), std::mem_fn(&std::thread::detach));
  // 至此堆栈中已经添加了100个数据
  vector<std::thread> MyPopThreads;
  for (int index = 0; index < 10; ++index)
  {
    MyPopThreads.push_back(std::thread(PopStackThread));
  }
  for_each(MyPopThreads.begin(), MyPopThreads.end(), std::mem_fn(&std::thread::join));
#endif
#if 0
  vector<int> MyAdd;
  for (int index = 0; index < 100; ++index)
  {
    MyAdd.push_back(index);
  }
  for_each(MyAdd.begin(), MyAdd.end(), Add);
#endif
  MyQueueFlag = true;
  std::thread t(WriteQueue);
  vector<std::thread> MyThreads;
  for (int index = 0; index < 100; index++)
  {
    MyThreads.push_back(std::thread(ReadQueue));
  }
  std::this_thread::sleep_for(std::chrono::minutes(1));
  MyQueueFlag = false;
  t.join();
  for_each(MyThreads.begin(), MyThreads.end(), mem_fn(&std::thread::join));
  return 0;
}
相关文章
|
1月前
|
缓存 安全 C++
C++无锁队列:解锁多线程编程新境界
【10月更文挑战第27天】
47 7
|
1月前
|
消息中间件 存储 安全
|
2月前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
58 1
C++ 多线程之初识多线程
|
2月前
|
存储 并行计算 安全
C++多线程应用
【10月更文挑战第29天】C++ 中的多线程应用广泛,常见场景包括并行计算、网络编程中的并发服务器和图形用户界面(GUI)应用。通过多线程可以显著提升计算速度和响应能力。示例代码展示了如何使用 `pthread` 库创建和管理线程。注意事项包括数据同步与互斥、线程间通信和线程安全的类设计,以确保程序的正确性和稳定性。
|
25天前
|
存储 设计模式 C++
【C++】优先级队列(容器适配器)
本文介绍了C++ STL中的线性容器及其适配器,包括栈、队列和优先队列的设计与实现。详细解析了`deque`的特点和存储结构,以及如何利用`deque`实现栈、队列和优先队列。通过自定义命名空间和类模板,展示了如何模拟实现这些容器适配器,重点讲解了优先队列的内部机制,如堆的构建与维护方法。
32 0
|
2月前
|
存储 前端开发 C++
C++ 多线程之带返回值的线程处理函数
这篇文章介绍了在C++中使用`async`函数、`packaged_task`和`promise`三种方法来创建带返回值的线程处理函数。
79 6
|
2月前
|
缓存 负载均衡 Java
c++写高性能的任务流线程池(万字详解!)
本文介绍了一种高性能的任务流线程池设计,涵盖多种优化机制。首先介绍了Work Steal机制,通过任务偷窃提高资源利用率。接着讨论了优先级任务,使不同优先级的任务得到合理调度。然后提出了缓存机制,通过环形缓存队列提升程序负载能力。Local Thread机制则通过预先创建线程减少创建和销毁线程的开销。Lock Free机制进一步减少了锁的竞争。容量动态调整机制根据任务负载动态调整线程数量。批量处理机制提高了任务处理效率。此外,还介绍了负载均衡、避免等待、预测优化、减少复制等策略。最后,任务组的设计便于管理和复用多任务。整体设计旨在提升线程池的性能和稳定性。
84 5
|
2月前
|
C++
C++ 多线程之线程管理函数
这篇文章介绍了C++中多线程编程的几个关键函数,包括获取线程ID的`get_id()`,延时函数`sleep_for()`,线程让步函数`yield()`,以及阻塞线程直到指定时间的`sleep_until()`。
37 0
C++ 多线程之线程管理函数
|
2月前
|
资源调度 Linux 调度
Linux C/C++之线程基础
这篇文章详细介绍了Linux下C/C++线程的基本概念、创建和管理线程的方法,以及线程同步的各种机制,并通过实例代码展示了线程同步技术的应用。
33 0
Linux C/C++之线程基础
|
2月前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
27 3