C++11的半同步半异步线程池

简介: C++11的半同步半异步线程池

简介

半同步半异步线程池用的比较多,实现也比较简单。

其中同步层包括同步服务层和排队层,指的是将接收的任务排队,将所有的任务排队到一个队列中,等待处理;

异步层指多个线程处理任务,异步处理层从同步层取出任务,并发处理任务。

同步队列

同步队列属于同步层的内容,主要作用是保证队列中共享数据线程安全,同时也提供新增任务的接口,以及提供取任务的接口。

这里使用C++11的锁、条件变量、右值引用、std::move和std::forward来实现。

同步队列主要包括三个函数,Take、Add和Stop。

Take函数

这里实现重载了两个Take函数,可支持一次获取多个任务,或者一次获取一个任务。

//可一次性获取多个任务,放在list中,减少互斥锁阻塞时间
  void Take(std::list<T>& list)
  {
    std::unique_lock<std::mutex> locker(m_mutex);
    m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });
    if (m_needStop)
    {
      return;
    }
    list = std::move(m_queue);
    m_notFull.notify_one();
  }
  //获取单个任务
  void Take(T& t)
  {
    std::unique_lock<std::mutex> locker(m_mutex);
    m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });
    if (m_needStop)
    {
      return;
    }
    t = m_queue.front();
    m_queue.pop_front();
    m_notFull.notify_one();
  }

先创建一个unique *lock 获取 mutex,然后再通过条件变量 m_*notEmpty 来等待判断式。判断式由两个条件组成,一个是停止的标志,另一个是不为空的条件,当不满足任何一个条件时,条件变量会释放 mutex 并将线程置于 waiting 状态,等待其他线程调用 notify_one/notify all 将其唤醒;当满足任何一个条件时,则继续往下执行后面的逻辑,即将队列中的任务取出,并唤醒一个正处于等待状态的添加任务的线程去添加任务。当处于 waiting 状态的线程被 notify_one 或notify all 唤醒时,条件变量会先重新获取 mutex,然后再检查条件是否满足,如果满足,则往下执行,如果不满足,则释放 mutex 继续等待。

Add函数

Add 的过程和 Take 的过程是类似的,也是先获取 mutex,然后检查条件是否满足,不满足条件时,释放 mutex 继续等待,如果满足条件,则将新的任务插入到队列中,并唤醒取任务的线程去取数据。

template<typename F>
  void Add(F &&x)
  {
    std::unique_lock<std::mutex> locker(m_mutex);
    m_notFull.wait(locker, [this] {return m_needStop || NotFull(); });
    if (m_needStop)
      return;
    m_queue.emplace_back(std::forward<F>(x));
    m_notEmpty.notify_one();
  }

Stop函数

Stop 函数先获取 mutex,然后将停止标志置为 true。注意,为了保证线程安全,这里需要先获取 mutex,在将其标志置为 true 之后,再唤醒所有等待的线,因为等待的条件是m_needStop,并且满足条件,所以线程会继续往下执行。由于线程在 m_needStop 为 true 时会退出,所以所有的等待线程会相继退出。

另外一个值得注意的地方是,我们把 m notFull.notify_all0放到lock_guard 保护范围之外了,这里也可以将 m_notFull.notify all0)放到ockguard保护范围之内,放到外面是为了做一点优化。因为 notify_one 或 notify_all 会唤醒一个在等待的线程,线程被唤醒后会先获取 mutex 再检查条件是否满足,如果这时被 lock guard保护,被唤醒的线程则需要 lock guard 析构释放 mutex 才能获取(即stop函数执行完了才释放)。如果在 lock_guard 之外notify_one 或notify_all,被唤醒的线程获取锁的时候不需要等待 lock_guard 释放锁,性能会好一点,所以在执行 notify_one或notify_all 时不需要加锁保护。

void Stop()
  {
    {
      std::lock_guard<std::mutex> locker(m_mutex);
      m_needStop = true;
    }
    m_notFull.notify_all();
    m_notEmpty.notify_all();
  }

SyncQueue完整代码

”SyncQueue.h”

同步队列整体代码:

#pragma once
#include <iostream>
#include <list>
#include <mutex>
using namespace std;
template<typename T>
class SyncQueue
{
public:
  SyncQueue(int maxSize) :m_maxSize(maxSize), m_needStop(false)
  {
  }
  void Put(const T &x)
  {
    Add(x);
  }
  void Put(T &&x)
  {
    Add(std::forward<T>(x));
  }
  //可一次性获取多个任务,放在list中,减少互斥锁阻塞时间
  void Take(std::list<T>& list)
  {
    std::unique_lock<std::mutex> locker(m_mutex);
    m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });
    if (m_needStop)
    {
      return;
    }
    list = std::move(m_queue);
    m_notFull.notify_one();
  }
  //获取单个任务
  void Take(T& t)
  {
    std::unique_lock<std::mutex> locker(m_mutex);
    m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });
    if (m_needStop)
    {
      return;
    }
    t = m_queue.front();
    m_queue.pop_front();
    m_notFull.notify_one();
  }
  void Stop()
  {
    {
      std::lock_guard<std::mutex> locker(m_mutex);
      m_needStop = true;
    }
    m_notFull.notify_all();
    m_notEmpty.notify_all();
  }
  bool Empty()
  {
    std::lock_guard<std::mutex> locker(m_mutex);
    return m_queue.empty();
  }
  bool Full()
  {
    std::lock_guard<std::mutex> locker(m_mutex);
    return m_queue.size() == m_maxSize;
  }
  //可以获取任务数量
  int Count()
  {
    return m_queue.size();
  }
private:
  bool NotFull() const
  {
    bool full = m_queue.size() >= m_maxSize;
    if (full)
    {
      cout << "缓冲区满了,需要等待。。。" << endl;
    }
    return !full;
  }
  bool NotEmpty() const
  {
    bool empty = m_queue.empty();
    if (empty)
    {
      cout << "缓冲区空了,需要等待。。。,异步层的线程ID:" << this_thread::get_id() << endl;
    }
    return !empty;
  }
  template<typename F>
  void Add(F &&x)
  {
    std::unique_lock<std::mutex> locker(m_mutex);
    m_notFull.wait(locker, [this] {return m_needStop || NotFull(); });
    if (m_needStop)
      return;
    m_queue.emplace_back(std::forward<F>(x));
    m_notEmpty.notify_one();
  }
private:
  std::list<T> m_queue; //缓冲区
  std::mutex m_mutex; //互斥量
  std::condition_variable m_notEmpty; //不为空的条件变量
  std::condition_variable m_notFull; //没有满的条件变量
  int m_maxSize; //同步队列最大的size
  bool m_needStop; //停止的标志
};

线程池

“ThreadPool.h”

线程池ThreadPool有3个成员变量,一个是线程组,这个线程组中的线程是预先创建的,应该创建多少个线程由外面传人,一般建议创建 CPU 核数的线程以达到最优的效率,线程组循环从同步队列中取出任务并执行,如果线程池为空,线程组将处于等待状态,等待任务的到来。

另一个成员变量是同步队列,它不仅用来做线程同步,还用来限制同步队列的上限,这个上限也是由使用者设置的。

第三个成员变量是用来停止线程池的,为了保证线程安全,我们用到了原子变量 atomic bool。下一节中将展示使用这个半同步半异步的线程池的实例。

#include<list>
#include<thread>
#include<functional>
#include<memory>
#include<atomic>
#include "SyncQueue.h"
const int MaxTaskCount = 100;
class ThreadPool
{
public:
  using Task = std::function<void()>;
  ThreadPool(int numThreads = std::thread::hardware_concurrency()) : m_queue(MaxTaskCount)
  {
    Start(numThreads);
  }
  ~ThreadPool(void)
  {
    Stop();
  }
  void Stop()
  {
    //保证多线程情况下只调用一次 StopThreadGroup
    std::call_once(m_flag, [this] {StopThreadGroup(); });
  }
  //可输入右值,例如lambda表达式
  void AddTask(Task&& task)
  {
    m_queue.Put(std::forward<Task>(task));
  }
  void AddTask(const Task& task)
  {
    m_queue.Put(task);
  }
  void Start(int numThreads)
  {
    m_running = true;
    //创建线程组
    for (int i = 0; i < numThreads; ++i)
    {
      m_threadgroup.emplace_back(std::make_shared<std::thread>(&ThreadPool::RunInThread, this));
    }
  }
private:
  void RunInThread()
  {
    while (m_running)
    {
      //取任务分别执行
      std::list<Task> list;
      m_queue.Take(list);
      for (auto& task : list)
      {
        if (!m_running)
          return;
        task();
      }
    }
  }
  void StopThreadGroup()
  {
    m_queue.Stop(); //让同步队列中的线程停止
    m_running = false; //置为false,让内部线程跳出循环并退出
    for (auto thread : m_threadgroup)
    {
      if (thread)
        thread->join();
    }
    m_threadgroup.clear();
  }
  std::list<std::shared_ptr<std::thread>> m_threadgroup; //处理任务的线程组
  SyncQueue<Task> m_queue; //同步队列
  atomic_bool m_running; //是否停止的标志
  std::once_flag m_flag;
};

主函数测试

#include <iostream>
#include "ThreadPool.h"
using namespace std;
void TestThdPool()
{
  ThreadPool pool(2);//创建一个2个线程的线程池
  //创建一个线程来添加10个任务1
  std::thread thd1([&pool] {
    for (int i = 0; i < 10; i++)
    {
      auto thdId = this_thread::get_id();
      pool.AddTask([thdId] {//添加任务可以使用lambda表达式,代码中实现了右值作为参数输入
        cout << "同步线程1的线程ID:" << thdId << endl;
      });
    }
  });
  //创建一个线程来添加20个任务2
  std::thread thd2([&pool] {
    for (int i = 0; i < 20; i++)
    {
      auto thdId = this_thread::get_id();
      pool.AddTask([thdId] {
        cout << "同步线程2的线程ID:" << thdId << endl;
      });
    }
  });
  this_thread::sleep_for(std::chrono::seconds(2));
  getchar();
  pool.Stop();
  thd1.join();
  thd2.join();
}
int main()
{
  TestThdPool();
  return 0;
}

运行结果:

目录
相关文章
|
15天前
|
缓存 安全 C++
C++无锁队列:解锁多线程编程新境界
【10月更文挑战第27天】
30 7
|
15天前
|
消息中间件 存储 安全
|
1月前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
41 1
C++ 多线程之初识多线程
|
21天前
|
存储 并行计算 安全
C++多线程应用
【10月更文挑战第29天】C++ 中的多线程应用广泛,常见场景包括并行计算、网络编程中的并发服务器和图形用户界面(GUI)应用。通过多线程可以显著提升计算速度和响应能力。示例代码展示了如何使用 `pthread` 库创建和管理线程。注意事项包括数据同步与互斥、线程间通信和线程安全的类设计,以确保程序的正确性和稳定性。
|
1月前
|
存储 前端开发 C++
C++ 多线程之带返回值的线程处理函数
这篇文章介绍了在C++中使用`async`函数、`packaged_task`和`promise`三种方法来创建带返回值的线程处理函数。
45 6
|
1月前
|
缓存 负载均衡 Java
c++写高性能的任务流线程池(万字详解!)
本文介绍了一种高性能的任务流线程池设计,涵盖多种优化机制。首先介绍了Work Steal机制,通过任务偷窃提高资源利用率。接着讨论了优先级任务,使不同优先级的任务得到合理调度。然后提出了缓存机制,通过环形缓存队列提升程序负载能力。Local Thread机制则通过预先创建线程减少创建和销毁线程的开销。Lock Free机制进一步减少了锁的竞争。容量动态调整机制根据任务负载动态调整线程数量。批量处理机制提高了任务处理效率。此外,还介绍了负载均衡、避免等待、预测优化、减少复制等策略。最后,任务组的设计便于管理和复用多任务。整体设计旨在提升线程池的性能和稳定性。
74 5
|
1月前
|
C++
C++ 多线程之线程管理函数
这篇文章介绍了C++中多线程编程的几个关键函数,包括获取线程ID的`get_id()`,延时函数`sleep_for()`,线程让步函数`yield()`,以及阻塞线程直到指定时间的`sleep_until()`。
23 0
C++ 多线程之线程管理函数
|
1月前
|
资源调度 Linux 调度
Linux C/C++之线程基础
这篇文章详细介绍了Linux下C/C++线程的基本概念、创建和管理线程的方法,以及线程同步的各种机制,并通过实例代码展示了线程同步技术的应用。
29 0
Linux C/C++之线程基础
|
3月前
|
Java 调度
基于C++11的线程池
基于C++11的线程池
|
4月前
|
算法 编译器 C++
开发与运维线程问题之在C++的原子操作中memory_order如何解决
开发与运维线程问题之在C++的原子操作中memory_order如何解决
43 2