前言
c++11虽然加入了线程库thread,然而 c++ 对于多线程的支持还是比较低级,稍微高级一点的用法都需要自己去实现。比如备受期待的网络库至今标准库里还没有支持,常用acl或asio替代。鸿蒙OpenHarmony源码中的网络栈模块部分,也是十分漂亮的实现,值得学习研究。
c++的ThreadPool实现,网上有很多个版本,文章的末尾就有两种不同的实现。然而经过对比发现,还是OpenHarmony源码的实现最优雅。代码简练,且直观易懂。写的真漂亮!只是使用起来稍麻烦些,比如不支持lambda的写法。后续可基于此改造,使其支持lambda函数的调用。
关于线程池
简单来说就是有一堆已经创建好的线程(最大数目一定),初始时他们都处于空闲状态。当有新的任务进来,从线程池中取出一个空闲的线程处理任务然后当任务处理完成之后,该线程被重新放回到线程池中,供其他的任务使用。当线程池中的线程都在处理任务时,就没有空闲线程供使用,此时,若有新的任务产生,只能等待线程池中有线程结束任务空闲才能执行。
线程池优点
线程本来就是可重用的资源,不需要每次使用时都进行初始化。因此可以采用有限的线程个数处理无限的任务。既可以提高速度和效率,又降低线程频繁创建的开销。比如要异步干的活,就没必要等待。丢到线程池里处理,结果在回调中处理。频繁执行的异步任务,若每次都创建线程势必造成不小的开销。
源码位置
该网络模块的github地址:communication_netstack: 网络协议栈
harmonyos\communication_netstack-master\utils\common_utils\include\thread_pool.h
网络协议栈模块作为电话子系统可裁剪部件,主要分为HTTP和socket模块。
网络协议栈模块的源码结构:
/foundation/communication/netstack ├─figures # 架构图 ├─frameworks # API实现 │ └─js # JS API实现 │ ├─builtin # 小型系统JS API实现 │ └─napi # 标准系统JS API实现 │ ├─http # http API │ ├─socket # socket API │ └─websocket # websocket API ├─interfaces # JS 接口定义 ├─test # 测试 └─utils # 工具
图 socket接口架构图
ThreadPool源码
/* * Copyright (c) 2022 Huawei Device Co., Ltd. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef NETSTACK_THREAD_POOL #define NETSTACK_THREAD_POOL #include <atomic> #include <condition_variable> #include <queue> #include <thread> #include <vector> namespace OHOS::NetStack { template <typename Task, const size_t DEFAULT_THREAD_NUM, const size_t MAX_THREAD_NUM> class ThreadPool { public: /** * disallow default constructor */ ThreadPool() = delete; /** * disallow copy and move */ ThreadPool(const ThreadPool &) = delete; /** * disallow copy and move */ ThreadPool &operator=(const ThreadPool &) = delete; /** * disallow copy and move */ ThreadPool(ThreadPool &&) = delete; /** * disallow copy and move */ ThreadPool &operator=(ThreadPool &&) = delete; /** * make DEFAULT_THREAD_NUM threads * @param timeout if timeout and runningThreadNum_ < DEFAULT_THREAD_NUM, the running thread should be terminated */ explicit ThreadPool(uint32_t timeout) : timeout_(timeout), idleThreadNum_(0), needRun_(true) { for (int i = 0; i < DEFAULT_THREAD_NUM; ++i) { std::thread([this] { RunTask(); }).detach(); } } /** * if ~ThreadPool, terminate all thread */ ~ThreadPool() { // set needRun_ = false, and notify all the thread to wake and terminate needRun_ = false; while (runningNum_ > 0) { needRunCondition_.notify_all(); } } /** * push it to taskQueue_ and notify a thread to run it * @param task new task to Execute */ void Push(const Task &task) { PushTask(task); if (runningNum_ < MAX_THREAD_NUM && idleThreadNum_ == 0) { std::thread([this] { RunTask(); }).detach(); } needRunCondition_.notify_all(); } private: bool IsQueueEmpty() { std::lock_guard<std::mutex> guard(mutex_); return taskQueue_.empty(); } bool GetTask(Task &task) { std::lock_guard<std::mutex> guard(mutex_); // if taskQueue_ is empty, means timeout if (taskQueue_.empty()) { return false; } // if run to this line, means that taskQueue_ is not empty task = taskQueue_.top(); taskQueue_.pop(); return true; } void PushTask(const Task &task) { std::lock_guard<std::mutex> guard(mutex_); taskQueue_.push(task); } class NumWrapper { public: NumWrapper() = delete; explicit NumWrapper(std::atomic<uint32_t> &num) : num_(num) { ++num_; } ~NumWrapper() { --num_; } private: std::atomic<uint32_t> &num_; }; void Sleep() { std::mutex needRunMutex; std::unique_lock<std::mutex> lock(needRunMutex); /** * if the thread is waiting, it is idle * if wake up, this thread is not idle: * 1 this thread should return * 2 this thread should run task * 3 this thread should go to next loop */ NumWrapper idleWrapper(idleThreadNum_); (void)idleWrapper; needRunCondition_.wait_for(lock, std::chrono::seconds(timeout_), [this] { return !needRun_ || !IsQueueEmpty(); }); } void RunTask() { NumWrapper runningWrapper(runningNum_); (void)runningWrapper; while (needRun_) { Task task; if (GetTask(task)) { task.Execute(); continue; } Sleep(); if (!needRun_) { return; } if (GetTask(task)) { task.Execute(); continue; } if (runningNum_ > DEFAULT_THREAD_NUM) { return; } } } private: /** * other thread put a task to the taskQueue_ */ std::mutex mutex_; std::priority_queue<Task> taskQueue_; /** * 1 terminate the thread if it is idle for timeout_ seconds * 2 wait for the thread started util timeout_ * 3 wait for the thread notified util timeout_ * 4 wait for the thread terminated util timeout_ */ uint32_t timeout_; /** * if idleThreadNum_ is zero, make a new thread */ std::atomic<uint32_t> idleThreadNum_; /** * when ThreadPool object is deleted, wait until runningNum_ is zero. */ std::atomic<uint32_t> runningNum_; /** * when ThreadPool object is deleted, set needRun_ to false, mean that all thread should be terminated */ std::atomic_bool needRun_; std::condition_variable needRunCondition_; }; } // namespace OHOS::NetStack #endif /* NETSTACK_THREAD_POOL */
源码赏析
从这份源码里,可以看到queue是如何安全的被使用的。之前博主有篇文章,记录了多线程下使用queue造成的崩溃问题。链接在这里:c++的queue在多线程下崩溃原因分析_特立独行的猫a的博客-CSDN博客_c++ queue 多线程
通过华为鸿蒙源码的学习研究,可以发现queue的安全使用方式top和pop以及empty的判断都是使用了 std::lock_guard互斥量原子操作的保护。也证实了博主上篇文章分析中提到的,类似队列这种操作,要确保在一个原子操作内完成,不可被打断。试想一个线程刚好pop,另外一个线程却刚要执行top会怎样?那样逻辑就错了。
这份源码的实现,没有使用一些较难理解的语法,基本上就是使用线程+优先级队列实现的。提前创建指定数目的线程,每次取一个任务并执行。任务队列负责存放线程需要处理的任务,工作线程负责从任务队列中取出和运行任务,可以看成是一个生产者和多个消费者的模型。
源码中的另一种实现:
源码位置:code-v3.0-LTS\OpenHarmony\utils\native\base\src\thread_pool.cpp
/* * Copyright (c) 2021 Huawei Device Co., Ltd. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "thread_pool.h" #include "errors.h" #include <memory> namespace OHOS { ThreadPool::ThreadPool(const std::string& name) : myName_(name), maxTaskNum_(0), running_(false) { } ThreadPool::~ThreadPool() { if (running_) { Stop(); } } uint32_t ThreadPool::Start(int numThreads) { if (!threads_.empty()) { return ERR_INVALID_OPERATION; } if (numThreads <= 0) { return ERR_INVALID_VALUE; } running_ = true; threads_.reserve(numThreads); for (int i = 0; i < numThreads; ++i) { threads_.push_back(std::thread(&ThreadPool::WorkInThread,this)); } return ERR_OK; } void ThreadPool::Stop() { { std::unique_lock<std::mutex> lock(mutex_); running_ = false; hasTaskToDo_.notify_all(); } for (auto& e : threads_) { e.join(); } } void ThreadPool::AddTask(const Task &f) { if (threads_.empty()) { f(); } else { std::unique_lock<std::mutex> lock(mutex_); while (Overloaded()) { acceptNewTask_.wait(lock); } tasks_.push_back(f); hasTaskToDo_.notify_one(); } } size_t ThreadPool::GetCurTaskNum() { std::unique_lock<std::mutex> lock(mutex_); return tasks_.size(); } ThreadPool::Task ThreadPool::ScheduleTask() { std::unique_lock<std::mutex> lock(mutex_); while (tasks_.empty() && running_) { hasTaskToDo_.wait(lock); } Task task; if (!tasks_.empty()) { task = tasks_.front(); tasks_.pop_front(); if (maxTaskNum_ > 0) { acceptNewTask_.notify_one(); } } return task; } bool ThreadPool::Overloaded() const { return (maxTaskNum_ > 0) && (tasks_.size() >= maxTaskNum_); } void ThreadPool::WorkInThread() { while (running_) { Task task = ScheduleTask(); if (task) { task(); } } } } // namespace OHOS
/* * Copyright (c) 2021 Huawei Device Co., Ltd. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef THREAD_POOL_H #define THREAD_POOL_H #include "nocopyable.h" #include <thread> #include <mutex> #include <functional> #include <string> #include <condition_variable> #include <deque> #include <vector> namespace OHOS { const int INVALID_SEMA_VALUE = -1; class ThreadPool : public NoCopyable { public: typedef std::function<void()> Task; explicit ThreadPool(const std::string &name = std::string()); ~ThreadPool(); uint32_t Start(int threadsNum); void Stop(); void AddTask(const Task& f); void SetMaxTaskNum(int maxSize) { maxTaskNum_ = maxSize; } // for testability size_t GetMaxTaskNum() const { return maxTaskNum_; } size_t GetCurTaskNum(); size_t GetThreadsNum() const { return threads_.size(); } std::string GetName() const { return myName_; } private: // tasks in the queue reach the maximum set by maxQueueSize, means thread pool is full load. bool Overloaded() const; void WorkInThread(); // main function in each thread. Task ScheduleTask(); // fetch a task from the queue and execute private: std::string myName_; std::mutex mutex_; std::condition_variable hasTaskToDo_; std::condition_variable acceptNewTask_; std::vector<std::thread> threads_; std::deque<Task> tasks_; size_t maxTaskNum_; bool running_; }; } // namespace OHOS #endif
ThreadPool使用
以下是该版本thread_pool的简单使用示例,可以看到使用稍微麻烦了些。必须定义格式如下的task类,必须实现operator<和Execute()方法。
需要注意的是,若有多个同一个实现的task实例放入thread_pool,Execute()方法内的逻辑可是在多线程环境下的,需注意多线程下变量访问的保护。如同以下示例,同一个task类的多个实例放入了thread_pool,不加std::lock_guard打印出的显示是乱的。
#include "doctest.h" DOCTEST_MAKE_STD_HEADERS_CLEAN_FROM_WARNINGS_ON_WALL_BEGIN #include <stdexcept> DOCTEST_MAKE_STD_HEADERS_CLEAN_FROM_WARNINGS_ON_WALL_END //#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN //#define DOCTEST_CONFIG_DISABLE #include <string> #include <iostream> #include "thread_pool.h" // // Created by Administrator on 2022/8/10. // class Task { public: Task() = default; explicit Task(std::string context){ mContext = context; } bool operator<(const Task &e) const{ return priority_ < e.priority_; } void Execute(){ std::lock_guard<std::mutex> guard(mutex_); std::cout << "task is execute,name is:"<<mContext<<std::endl; } public: uint32_t priority_; private: std::string mContext; static std::mutex mutex_; }; #define DEFAULT_THREAD_NUM 3 #define MAX_THREAD_NUM 6 #define TIME_OUT 500 std::mutex Task::mutex_; static int myTest(){ static OHOS_NetStack::ThreadPool<Task, DEFAULT_THREAD_NUM, MAX_THREAD_NUM> threadPool_(TIME_OUT); Task task1("name_1"); Task task2("name_2"); Task task3("name_3"); Task task4("name_4"); threadPool_.Push(task1); threadPool_.Push(task2); threadPool_.Push(task3); threadPool_.Push(task4); //system("pause"); return 0; } TEST_CASE("threadPool simple use example, test by doctest unit tool") { myTest(); }
结果输出: