c++基于ThreadPool实现灵活的异步任务

简介: c++基于ThreadPool实现灵活的异步任务

在工作中有时会有这种需求,在延时中执行一些任务,等待任务超时或者任务返回结果再往下执行。如果不做封装,可能会怎么做?每次都进入while true?


......
auto start = std::chrono::system_clock::now();
auto timeout = 500;
while (true){
      auto now  = std::chrono::system_clock::now();
      auto _duration = std::chrono::duration_cast<std::chrono::milliseconds>(now- start ).count();
      if (_duration > timeout){
        LOGGING_ERROR("duration is %d", timeout);
        break;
      }
      //do something
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
}


或许也能完成目的,但是写法很不优雅。且程序执行到此处进入while(true)只能等在这里了,其他啥活都干不了。


可以怎么优化?可以把这部分任务放入线程中异步执行。


// 耗时操作
auto fetchDataFromDB = [](std::string recvdData,std::function<int(std::string &)> cback) {
    // Make sure that function takes 5 seconds to complete
    std::this_thread::sleep_for(seconds(5));
    //Do stuff like creating DB Connection and fetching Data
    if(cback != nullptr){
      std::string out = "this is from callback ";
      cback(out);
    }
    return "DB_" + recvdData;
  };
//把fetchDataFromDB这一IO耗时任务放到线程里异步执行
//
std::future<std::string> resultFromDB = std::async(std::launch::async, fetchDataFromDB, "Data0",
      [&](std::string &result){
      std::cout << "callback result from thread:" << result << std::endl;
      return 0;
      }); 
// do otherthings 可以做些其他工作
......
// 等待异步的结果
std::string result = resultFromDB.get(); // waits for fetchDataFromDB to return


然而每次都要创建线程损耗也不小,可以基于线程池进一步改造。


me::ThreadPool pool(4);
......
//把fetchDataFromDB这一IO耗时操作放到pool中
pool.commit(fetchDataFromDB,"Data1",[&](std::string &result){
      std::cout << "callback result from pool thread:" << result << std::endl;
      return 0;
      });


然而,这种还是不够灵活。比如不能灵活控制耗时任务的超时时间和检测频率。


想要的是这种效果:


  //!
  //! \brief 任务运行函数
  //!
  //! \param f 嘀嗒时期调用的函数,如果使用 lambda 表达式则会自动匹配该函数
  //! \param duration 超时时间
  //! \param timer_duration 嘀嗒周期
  //! \return
  //!
  taskResult_t taskRunner(taskFunction &&f, const std::chrono::steady_clock::duration &duration, int timer_duration)


则使用起来就简单多了。


 ...... 
 auto f = [&]
  {
    return checkIO(errorCode);
  };
 // f的执行频率为30毫秒执行一次,超时时间5秒钟
 auto future = misc::TaskRunner::getInstance()->taskRunner(f,std::chrono::seconds(5),30);
 misc::TaskRunner::getInstance()->waitForResult(future);


以下为TaskRunner的封装实现:


#define KeepRunning (0U)
#define StopRunning (1U)
#include <ThreadPool.h>
namespace misc
{
//!
//! \brief 任务运行器
//!
class TaskRunner
{
public:
  //!
  //! \brief 单例模型
  //! \return
  //!
  static TaskRunner *getInstance()
  {
    static TaskRunner w;
    return &w;
  }
  ~TaskRunner() {  }
public:
  using taskFunction = std::function<int()>;
  using taskResult_t = std::shared_future<int>;
  //!
  //! \brief 任务运行函数
  //!
  //! 如果运行超时,则返回失败代码(未启用超时功能)
  //!
  //! \param f 嘀嗒时期调用的函数,如果使用 std::bind 函数则会自动匹配该函数
  //! \param duration 超时时间
  //! \return
  //!
  taskResult_t taskRunner(taskFunction &f, const std::chrono::steady_clock::duration &duration)
  {
    return start(f, duration);
  }
  //!
  //! \brief 任务运行函数
  //!
  //! 如果运行超时,则返回失败代码(未启用超时功能)
  //!
  //! \param f 嘀嗒时期调用的函数,如果使用 lambda 表达式则会自动匹配该函数
  //! \param duration 超时时间
  //! \return
  //!
  taskResult_t taskRunner(taskFunction &&f, const std::chrono::steady_clock::duration &duration)
  {
    return start(f, duration);
  }
  //!
  //! \brief 任务运行函数
  //!
  //! 如果运行超时,则返回失败代码(未启用超时功能)
  //!
  //! \param f 嘀嗒时期调用的函数,如果使用 std::bind 函数则会自动匹配该函数
  //! \param duration 超时时间
  //! \param timer_duration 嘀嗒周期,因为默认的嘀嗒时期为
  //! 20um,如果想要自主决定,则使用该函数 \return
  //!
  taskResult_t taskRunner(taskFunction &f, const std::chrono::steady_clock::duration &duration, int timer_duration)
  {
    return start(f, duration, timer_duration);
  }
  //!
  //! \brief 任务运行函数
  //!
  //! 如果运行超时,则返回失败代码(未启用超时功能)
  //!
  //! \param f 嘀嗒时期调用的函数,如果使用 lambda 表达式则会自动匹配该函数
  //! \param duration 超时时间
  //! \param timer_duration 嘀嗒周期,因为默认的嘀嗒时期为
  //! 20um,如果想要自主决定,则使用该函数 \return
  //!
  taskResult_t taskRunner(taskFunction &&f, const std::chrono::steady_clock::duration &duration, int timer_duration)
  {
    return start(f, duration, timer_duration);
  }
  int waitForResult(const taskResult_t &result)
  {
    if (result.valid())
    {
      result.wait();
    }
    return 1;
  }
private:
#define NOW std::chrono::system_clock::now()
  TaskRunner() : pool_(2) {}
  std::shared_future<int> start(const taskFunction &f, const std::chrono::steady_clock::duration &duration,
                                uint64_t interval = 1)
  {
    auto taskResultPtr = std::make_shared<resultMap_t>();
    taskResultPtr->isUsed_ = false;
    taskResultPtr->id_ = idFactory_++;
    auto id = taskResultPtr->id_;
    taskResultPtr->taskResult_ = pool_.enqueue([id, f, duration, interval, this] {
      auto func = f;
      return taskRunner(id, func, NOW, duration, interval);
    });
    return taskResultPtr->taskResult_;
  }
  int taskRunner(uint32_t id, const taskFunction &f, const std::chrono::system_clock::time_point &now,
                 const std::chrono::steady_clock::duration &timeout /* millisecond */,
                 uint32_t interval /* millisecond */)
  {
    while (true)
    {
      auto _duration = std::chrono::duration_cast<std::chrono::milliseconds>(NOW - now);
      if (_duration > timeout)
      {
        LOGGING_ERROR("duration is %d", timeout.count());
        return 1;
      }
      if (f() == StopRunning)
      {
        LOGGING_WARN("function is running out.");
        break;
      }
      std::this_thread::sleep_for(std::chrono::milliseconds(interval));
    }
    return 0;
  }
  void waitAll()
  {
      pool_.Wait();
  }
private:
  ThreadPool pool_;
  struct resultMap_t
  {
    unsigned int id_;
    bool isUsed_;
    taskResult_t taskResult_;
  };
  using resultMapPtr_t = std::shared_ptr<resultMap_t>;
  std::atomic_uint idFactory_ = ATOMIC_VAR_INIT(1);
};
}  // namespace misc


相关文章
|
7月前
|
存储 并行计算 前端开发
【C++ 函数 基础教程 第五篇】C++深度解析:函数包裹与异步计算的艺术(二)
【C++ 函数 基础教程 第五篇】C++深度解析:函数包裹与异步计算的艺术
85 1
|
7月前
|
存储 监控 算法
【C++ 软件设计思路】高效管理历史任务记录:内存与磁盘结合的策略解析
【C++ 软件设计思路】高效管理历史任务记录:内存与磁盘结合的策略解析
129 0
|
7月前
|
数据安全/隐私保护 C++ 容器
【C++ 函数 基础教程 第五篇】C++深度解析:函数包裹与异步计算的艺术(一)
【C++ 函数 基础教程 第五篇】C++深度解析:函数包裹与异步计算的艺术
120 0
|
7月前
|
设计模式 uml C++
C++中的装饰器模式:灵活地扩展功能
C++中的装饰器模式:灵活地扩展功能
105 0
|
7月前
|
消息中间件 NoSQL Linux
workFlow c++异步网络库编译教程与简介
搜狗公司C++服务器引擎,编程范式。支撑搜狗几乎所有后端C++在线服务,包括所有搜索服务,云输入法,在线广告等,每日处理数百亿请求。这是一个设计轻盈优雅的企业级程序引擎,可以满足大多数后端与嵌入式开发需求。 编程范式 结构化并发与任务隐藏回调与内存回收机制
137 0
|
7月前
|
设计模式 程序员 C++
【C++ 泛型编程 高级篇】C++模板元编程:使用模板特化 灵活提取嵌套类型与多容器兼容性
【C++ 泛型编程 高级篇】C++模板元编程:使用模板特化 灵活提取嵌套类型与多容器兼容性
934 2
|
2月前
|
缓存 负载均衡 Java
c++写高性能的任务流线程池(万字详解!)
本文介绍了一种高性能的任务流线程池设计,涵盖多种优化机制。首先介绍了Work Steal机制,通过任务偷窃提高资源利用率。接着讨论了优先级任务,使不同优先级的任务得到合理调度。然后提出了缓存机制,通过环形缓存队列提升程序负载能力。Local Thread机制则通过预先创建线程减少创建和销毁线程的开销。Lock Free机制进一步减少了锁的竞争。容量动态调整机制根据任务负载动态调整线程数量。批量处理机制提高了任务处理效率。此外,还介绍了负载均衡、避免等待、预测优化、减少复制等策略。最后,任务组的设计便于管理和复用多任务。整体设计旨在提升线程池的性能和稳定性。
87 5
|
7月前
|
存储 算法 Java
【C/C++ 线程池设计思路】 深入探索线程池设计:任务历史记录的高效管理策略
【C/C++ 线程池设计思路】 深入探索线程池设计:任务历史记录的高效管理策略
188 0
|
4月前
|
Dart API C语言
Dart ffi 使用问题之想在C/C++中创建异步线程来调用Dart方法,如何操作
Dart ffi 使用问题之想在C/C++中创建异步线程来调用Dart方法,如何操作
|
7月前
|
安全 Java 调度
【C/C++ 线程池设计思路 】设计与实现支持优先级任务的C++线程池 简要介绍
【C/C++ 线程池设计思路 】设计与实现支持优先级任务的C++线程池 简要介绍
223 2