在工作中有时会有这种需求,在延时中执行一些任务,等待任务超时或者任务返回结果再往下执行。如果不做封装,可能会怎么做?每次都进入while true?
...... auto start = std::chrono::system_clock::now(); auto timeout = 500; while (true){ auto now = std::chrono::system_clock::now(); auto _duration = std::chrono::duration_cast<std::chrono::milliseconds>(now- start ).count(); if (_duration > timeout){ LOGGING_ERROR("duration is %d", timeout); break; } //do something std::this_thread::sleep_for(std::chrono::milliseconds(10)); }
或许也能完成目的,但是写法很不优雅。且程序执行到此处进入while(true)只能等在这里了,其他啥活都干不了。
可以怎么优化?可以把这部分任务放入线程中异步执行。
// 耗时操作 auto fetchDataFromDB = [](std::string recvdData,std::function<int(std::string &)> cback) { // Make sure that function takes 5 seconds to complete std::this_thread::sleep_for(seconds(5)); //Do stuff like creating DB Connection and fetching Data if(cback != nullptr){ std::string out = "this is from callback "; cback(out); } return "DB_" + recvdData; }; //把fetchDataFromDB这一IO耗时任务放到线程里异步执行 // std::future<std::string> resultFromDB = std::async(std::launch::async, fetchDataFromDB, "Data0", [&](std::string &result){ std::cout << "callback result from thread:" << result << std::endl; return 0; }); // do otherthings 可以做些其他工作 ...... // 等待异步的结果 std::string result = resultFromDB.get(); // waits for fetchDataFromDB to return
然而每次都要创建线程损耗也不小,可以基于线程池进一步改造。
me::ThreadPool pool(4); ...... //把fetchDataFromDB这一IO耗时操作放到pool中 pool.commit(fetchDataFromDB,"Data1",[&](std::string &result){ std::cout << "callback result from pool thread:" << result << std::endl; return 0; });
然而,这种还是不够灵活。比如不能灵活控制耗时任务的超时时间和检测频率。
想要的是这种效果:
//! //! \brief 任务运行函数 //! //! \param f 嘀嗒时期调用的函数,如果使用 lambda 表达式则会自动匹配该函数 //! \param duration 超时时间 //! \param timer_duration 嘀嗒周期 //! \return //! taskResult_t taskRunner(taskFunction &&f, const std::chrono::steady_clock::duration &duration, int timer_duration)
则使用起来就简单多了。
...... auto f = [&] { return checkIO(errorCode); }; // f的执行频率为30毫秒执行一次,超时时间5秒钟 auto future = misc::TaskRunner::getInstance()->taskRunner(f,std::chrono::seconds(5),30); misc::TaskRunner::getInstance()->waitForResult(future);
以下为TaskRunner的封装实现:
#define KeepRunning (0U) #define StopRunning (1U) #include <ThreadPool.h> namespace misc { //! //! \brief 任务运行器 //! class TaskRunner { public: //! //! \brief 单例模型 //! \return //! static TaskRunner *getInstance() { static TaskRunner w; return &w; } ~TaskRunner() { } public: using taskFunction = std::function<int()>; using taskResult_t = std::shared_future<int>; //! //! \brief 任务运行函数 //! //! 如果运行超时,则返回失败代码(未启用超时功能) //! //! \param f 嘀嗒时期调用的函数,如果使用 std::bind 函数则会自动匹配该函数 //! \param duration 超时时间 //! \return //! taskResult_t taskRunner(taskFunction &f, const std::chrono::steady_clock::duration &duration) { return start(f, duration); } //! //! \brief 任务运行函数 //! //! 如果运行超时,则返回失败代码(未启用超时功能) //! //! \param f 嘀嗒时期调用的函数,如果使用 lambda 表达式则会自动匹配该函数 //! \param duration 超时时间 //! \return //! taskResult_t taskRunner(taskFunction &&f, const std::chrono::steady_clock::duration &duration) { return start(f, duration); } //! //! \brief 任务运行函数 //! //! 如果运行超时,则返回失败代码(未启用超时功能) //! //! \param f 嘀嗒时期调用的函数,如果使用 std::bind 函数则会自动匹配该函数 //! \param duration 超时时间 //! \param timer_duration 嘀嗒周期,因为默认的嘀嗒时期为 //! 20um,如果想要自主决定,则使用该函数 \return //! taskResult_t taskRunner(taskFunction &f, const std::chrono::steady_clock::duration &duration, int timer_duration) { return start(f, duration, timer_duration); } //! //! \brief 任务运行函数 //! //! 如果运行超时,则返回失败代码(未启用超时功能) //! //! \param f 嘀嗒时期调用的函数,如果使用 lambda 表达式则会自动匹配该函数 //! \param duration 超时时间 //! \param timer_duration 嘀嗒周期,因为默认的嘀嗒时期为 //! 20um,如果想要自主决定,则使用该函数 \return //! taskResult_t taskRunner(taskFunction &&f, const std::chrono::steady_clock::duration &duration, int timer_duration) { return start(f, duration, timer_duration); } int waitForResult(const taskResult_t &result) { if (result.valid()) { result.wait(); } return 1; } private: #define NOW std::chrono::system_clock::now() TaskRunner() : pool_(2) {} std::shared_future<int> start(const taskFunction &f, const std::chrono::steady_clock::duration &duration, uint64_t interval = 1) { auto taskResultPtr = std::make_shared<resultMap_t>(); taskResultPtr->isUsed_ = false; taskResultPtr->id_ = idFactory_++; auto id = taskResultPtr->id_; taskResultPtr->taskResult_ = pool_.enqueue([id, f, duration, interval, this] { auto func = f; return taskRunner(id, func, NOW, duration, interval); }); return taskResultPtr->taskResult_; } int taskRunner(uint32_t id, const taskFunction &f, const std::chrono::system_clock::time_point &now, const std::chrono::steady_clock::duration &timeout /* millisecond */, uint32_t interval /* millisecond */) { while (true) { auto _duration = std::chrono::duration_cast<std::chrono::milliseconds>(NOW - now); if (_duration > timeout) { LOGGING_ERROR("duration is %d", timeout.count()); return 1; } if (f() == StopRunning) { LOGGING_WARN("function is running out."); break; } std::this_thread::sleep_for(std::chrono::milliseconds(interval)); } return 0; } void waitAll() { pool_.Wait(); } private: ThreadPool pool_; struct resultMap_t { unsigned int id_; bool isUsed_; taskResult_t taskResult_; }; using resultMapPtr_t = std::shared_ptr<resultMap_t>; std::atomic_uint idFactory_ = ATOMIC_VAR_INIT(1); }; } // namespace misc