c++的ThreadPool,OpenHarmony源码实现版赏析和使用

简介: c++的ThreadPool,OpenHarmony源码实现版赏析和使用

前言


c++11虽然加入了线程库thread,然而 c++ 对于多线程的支持还是比较低级,稍微高级一点的用法都需要自己去实现。比如备受期待的网络库至今标准库里还没有支持,常用acl或asio替代。鸿蒙OpenHarmony源码中的网络栈模块部分,也是十分漂亮的实现,值得学习研究。


c++的ThreadPool实现,网上有很多个版本,文章的末尾就有两种不同的实现。然而经过对比发现,还是OpenHarmony源码的实现最优雅。代码简练,且直观易懂。写的真漂亮!只是使用起来稍麻烦些,比如不支持lambda的写法。后续可基于此改造,使其支持lambda函数的调用。


关于线程池


简单来说就是有一堆已经创建好的线程(最大数目一定),初始时他们都处于空闲状态。当有新的任务进来,从线程池中取出一个空闲的线程处理任务然后当任务处理完成之后,该线程被重新放回到线程池中,供其他的任务使用。当线程池中的线程都在处理任务时,就没有空闲线程供使用,此时,若有新的任务产生,只能等待线程池中有线程结束任务空闲才能执行。


线程池优点


线程本来就是可重用的资源,不需要每次使用时都进行初始化。因此可以采用有限的线程个数处理无限的任务。既可以提高速度和效率,又降低线程频繁创建的开销。比如要异步干的活,就没必要等待。丢到线程池里处理,结果在回调中处理。频繁执行的异步任务,若每次都创建线程势必造成不小的开销。


源码位置


OpenHarmony,智能终端设备操作系统的框架和平台


该网络模块的github地址:communication_netstack: 网络协议栈


harmonyos\communication_netstack-master\utils\common_utils\include\thread_pool.h


网络协议栈模块作为电话子系统可裁剪部件,主要分为HTTP和socket模块。


网络协议栈模块的源码结构:


/foundation/communication/netstack
├─figures                            # 架构图
├─frameworks                         # API实现
│  └─js                              # JS API实现
│      ├─builtin                     # 小型系统JS API实现
│      └─napi                        # 标准系统JS API实现
│          ├─http                    # http API
│          ├─socket                  # socket API
│          └─websocket               # websocket API
├─interfaces                         # JS 接口定义
├─test                               # 测试
└─utils                              # 工具


socket接口架构图



ThreadPool源码


/*
 * Copyright (c) 2022 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef NETSTACK_THREAD_POOL
#define NETSTACK_THREAD_POOL
#include <atomic>
#include <condition_variable>
#include <queue>
#include <thread>
#include <vector>
namespace OHOS::NetStack {
template <typename Task, const size_t DEFAULT_THREAD_NUM, const size_t MAX_THREAD_NUM> class ThreadPool {
public:
    /**
     * disallow default constructor
     */
    ThreadPool() = delete;
    /**
     * disallow copy and move
     */
    ThreadPool(const ThreadPool &) = delete;
    /**
     * disallow copy and move
     */
    ThreadPool &operator=(const ThreadPool &) = delete;
    /**
     * disallow copy and move
     */
    ThreadPool(ThreadPool &&) = delete;
    /**
     * disallow copy and move
     */
    ThreadPool &operator=(ThreadPool &&) = delete;
    /**
     * make DEFAULT_THREAD_NUM threads
     * @param timeout if timeout and runningThreadNum_ < DEFAULT_THREAD_NUM, the running thread should be terminated
     */
    explicit ThreadPool(uint32_t timeout) : timeout_(timeout), idleThreadNum_(0), needRun_(true)
    {
        for (int i = 0; i < DEFAULT_THREAD_NUM; ++i) {
            std::thread([this] { RunTask(); }).detach();
        }
    }
    /**
     * if ~ThreadPool, terminate all thread
     */
    ~ThreadPool()
    {
        // set needRun_ = false, and notify all the thread to wake and terminate
        needRun_ = false;
        while (runningNum_ > 0) {
            needRunCondition_.notify_all();
        }
    }
    /**
     * push it to taskQueue_ and notify a thread to run it
     * @param task new task to Execute
     */
    void Push(const Task &task)
    {
        PushTask(task);
        if (runningNum_ < MAX_THREAD_NUM && idleThreadNum_ == 0) {
            std::thread([this] { RunTask(); }).detach();
        }
        needRunCondition_.notify_all();
    }
private:
    bool IsQueueEmpty()
    {
        std::lock_guard<std::mutex> guard(mutex_);
        return taskQueue_.empty();
    }
    bool GetTask(Task &task)
    {
        std::lock_guard<std::mutex> guard(mutex_);
        // if taskQueue_ is empty, means timeout
        if (taskQueue_.empty()) {
            return false;
        }
        // if run to this line, means that taskQueue_ is not empty
        task = taskQueue_.top();
        taskQueue_.pop();
        return true;
    }
    void PushTask(const Task &task)
    {
        std::lock_guard<std::mutex> guard(mutex_);
        taskQueue_.push(task);
    }
    class NumWrapper {
    public:
        NumWrapper() = delete;
        explicit NumWrapper(std::atomic<uint32_t> &num) : num_(num)
        {
            ++num_;
        }
        ~NumWrapper()
        {
            --num_;
        }
    private:
        std::atomic<uint32_t> &num_;
    };
    void Sleep()
    {
        std::mutex needRunMutex;
        std::unique_lock<std::mutex> lock(needRunMutex);
        /**
         * if the thread is waiting, it is idle
         * if wake up, this thread is not idle:
         *     1 this thread should return
         *     2 this thread should run task
         *     3 this thread should go to next loop
         */
        NumWrapper idleWrapper(idleThreadNum_);
        (void)idleWrapper;
        needRunCondition_.wait_for(lock, std::chrono::seconds(timeout_),
                                   [this] { return !needRun_ || !IsQueueEmpty(); });
    }
    void RunTask()
    {
        NumWrapper runningWrapper(runningNum_);
        (void)runningWrapper;
        while (needRun_) {
            Task task;
            if (GetTask(task)) {
                task.Execute();
                continue;
            }
            Sleep();
            if (!needRun_) {
                return;
            }
            if (GetTask(task)) {
                task.Execute();
                continue;
            }
            if (runningNum_ > DEFAULT_THREAD_NUM) {
                return;
            }
        }
    }
private:
    /**
     * other thread put a task to the taskQueue_
     */
    std::mutex mutex_;
    std::priority_queue<Task> taskQueue_;
    /**
     * 1 terminate the thread if it is idle for timeout_ seconds
     * 2 wait for the thread started util timeout_
     * 3 wait for the thread notified util timeout_
     * 4 wait for the thread terminated util timeout_
     */
    uint32_t timeout_;
    /**
     * if idleThreadNum_ is zero, make a new thread
     */
    std::atomic<uint32_t> idleThreadNum_;
    /**
     * when ThreadPool object is deleted, wait until runningNum_ is zero.
     */
    std::atomic<uint32_t> runningNum_;
    /**
     * when ThreadPool object is deleted, set needRun_ to false, mean that all thread should be terminated
     */
    std::atomic_bool needRun_;
    std::condition_variable needRunCondition_;
};
} // namespace OHOS::NetStack
#endif /* NETSTACK_THREAD_POOL */


源码赏析


从这份源码里,可以看到queue是如何安全的被使用的。之前博主有篇文章,记录了多线程下使用queue造成的崩溃问题。链接在这里:c++的queue在多线程下崩溃原因分析_特立独行的猫a的博客-CSDN博客_c++ queue 多线程


通过华为鸿蒙源码的学习研究,可以发现queue的安全使用方式top和pop以及empty的判断都是使用了 std::lock_guard互斥量原子操作的保护。也证实了博主上篇文章分析中提到的,类似队列这种操作,要确保在一个原子操作内完成,不可被打断。试想一个线程刚好pop,另外一个线程却刚要执行top会怎样?那样逻辑就错了。


这份源码的实现,没有使用一些较难理解的语法,基本上就是使用线程+优先级队列实现的。提前创建指定数目的线程,每次取一个任务并执行。任务队列负责存放线程需要处理的任务,工作线程负责从任务队列中取出和运行任务,可以看成是一个生产者和多个消费者的模型。


源码中的另一种实现:


源码位置:code-v3.0-LTS\OpenHarmony\utils\native\base\src\thread_pool.cpp


/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "thread_pool.h"
#include "errors.h"
#include <memory>
namespace OHOS {
ThreadPool::ThreadPool(const std::string& name)
    : myName_(name), maxTaskNum_(0), running_(false)
{
}
ThreadPool::~ThreadPool()
{
    if (running_) {
        Stop();
    }
}
uint32_t ThreadPool::Start(int numThreads)
{
    if (!threads_.empty()) {
        return ERR_INVALID_OPERATION;
    }
    if (numThreads <= 0) {
        return ERR_INVALID_VALUE;
    }
    running_ = true;
    threads_.reserve(numThreads);
    for (int i = 0; i < numThreads; ++i) {
        threads_.push_back(std::thread(&ThreadPool::WorkInThread,this));
    }
    return ERR_OK;
}
void ThreadPool::Stop()
{
    {
        std::unique_lock<std::mutex>  lock(mutex_);
        running_ = false;
        hasTaskToDo_.notify_all();
    }
    for (auto& e : threads_) {
        e.join();
    }
}
void ThreadPool::AddTask(const Task &f)
{
    if (threads_.empty()) {
        f();
    } else {
        std::unique_lock<std::mutex> lock(mutex_);
        while (Overloaded()) {
            acceptNewTask_.wait(lock);
        }
        tasks_.push_back(f);
        hasTaskToDo_.notify_one();
    }
}
size_t ThreadPool::GetCurTaskNum()
{
    std::unique_lock<std::mutex> lock(mutex_);
    return tasks_.size();
}
ThreadPool::Task ThreadPool::ScheduleTask()
{
    std::unique_lock<std::mutex> lock(mutex_);
    while (tasks_.empty() && running_) {
        hasTaskToDo_.wait(lock);
    }
    Task task;
    if (!tasks_.empty()) {
        task = tasks_.front();
        tasks_.pop_front();
        if (maxTaskNum_ > 0) {
            acceptNewTask_.notify_one();
        }
    }
    return task;
}
bool ThreadPool::Overloaded() const
{
    return (maxTaskNum_ > 0) && (tasks_.size() >= maxTaskNum_);
}
void ThreadPool::WorkInThread()
{
    while (running_) {
        Task task = ScheduleTask();
        if (task) {
            task();
        }
    }
}
} // namespace OHOS


/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include "nocopyable.h"
#include <thread>
#include <mutex>
#include <functional>
#include <string>
#include <condition_variable>
#include <deque>
#include <vector>
namespace OHOS {
const int INVALID_SEMA_VALUE = -1;
class ThreadPool : public NoCopyable {
public:
    typedef std::function<void()> Task;
    explicit ThreadPool(const std::string &name = std::string());
    ~ThreadPool();
    uint32_t Start(int threadsNum);
    void Stop();
    void AddTask(const Task& f);
    void SetMaxTaskNum(int maxSize) { maxTaskNum_ = maxSize; }
    // for testability
    size_t GetMaxTaskNum() const { return maxTaskNum_; }
    size_t GetCurTaskNum();
    size_t GetThreadsNum() const { return threads_.size(); }
    std::string GetName() const { return myName_; }
private:
    // tasks in the queue reach the maximum set by maxQueueSize, means thread pool is full load.
    bool Overloaded() const;
    void WorkInThread(); // main        function in each thread.
    Task ScheduleTask(); // fetch a task from the queue and execute
private:
    std::string myName_;
    std::mutex mutex_;
    std::condition_variable hasTaskToDo_;
    std::condition_variable acceptNewTask_;
    std::vector<std::thread> threads_;
    std::deque<Task> tasks_;
    size_t maxTaskNum_;
    bool running_;
};
} // namespace OHOS
#endif


ThreadPool使用


以下是该版本thread_pool的简单使用示例,可以看到使用稍微麻烦了些。必须定义格式如下的task类,必须实现operator<和Execute()方法。


需要注意的是,若有多个同一个实现的task实例放入thread_pool,Execute()方法内的逻辑可是在多线程环境下的,需注意多线程下变量访问的保护。如同以下示例,同一个task类的多个实例放入了thread_pool,不加std::lock_guard打印出的显示是乱的。


#include "doctest.h"
DOCTEST_MAKE_STD_HEADERS_CLEAN_FROM_WARNINGS_ON_WALL_BEGIN
#include <stdexcept>
DOCTEST_MAKE_STD_HEADERS_CLEAN_FROM_WARNINGS_ON_WALL_END
//#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN
//#define DOCTEST_CONFIG_DISABLE
#include <string>
#include <iostream>
#include "thread_pool.h"
//
// Created by Administrator on 2022/8/10.
//
class Task {
public:
    Task() = default;
    explicit Task(std::string context){
        mContext = context;
    }
    bool operator<(const Task &e) const{
        return priority_ < e.priority_;
    }
    void Execute(){
        std::lock_guard<std::mutex> guard(mutex_);
        std::cout <<  "task is execute,name is:"<<mContext<<std::endl;
    }
public:
    uint32_t priority_;
private:
    std::string mContext;
    static std::mutex mutex_;
};
#define DEFAULT_THREAD_NUM 3
#define MAX_THREAD_NUM 6
#define TIME_OUT 500
std::mutex Task::mutex_;
static int myTest(){
    static OHOS_NetStack::ThreadPool<Task, DEFAULT_THREAD_NUM, MAX_THREAD_NUM> threadPool_(TIME_OUT);
    Task task1("name_1");
    Task task2("name_2");
    Task task3("name_3");
    Task task4("name_4");
    threadPool_.Push(task1);
    threadPool_.Push(task2);
    threadPool_.Push(task3);
    threadPool_.Push(task4);
    //system("pause");
    return 0;
}
TEST_CASE("threadPool simple use example, test by doctest unit tool") {
    myTest();
}


结果输出:



引用


c++11线程池的实现原理及回调函数的使用_特立独行的猫a的博客-CSDN博客_c++多线程回调

相关文章
|
7天前
|
编译器 C语言 C++
【c++丨STL】list模拟实现(附源码)
本文介绍了如何模拟实现C++中的`list`容器。`list`底层采用双向带头循环链表结构,相较于`vector`和`string`更为复杂。文章首先回顾了`list`的基本结构和常用接口,然后详细讲解了节点、迭代器及容器的实现过程。 最终,通过这些步骤,我们成功模拟实现了`list`容器的功能。文章最后提供了完整的代码实现,并简要总结了实现过程中的关键点。 如果你对双向链表或`list`的底层实现感兴趣,建议先掌握相关基础知识后再阅读本文,以便更好地理解内容。
15 1
|
2月前
|
C语言 C++ 容器
【c++丨STL】string模拟实现(附源码)
本文详细介绍了如何模拟实现C++ STL中的`string`类,包括其构造函数、拷贝构造、赋值重载、析构函数等基本功能,以及字符串的插入、删除、查找、比较等操作。文章还展示了如何实现输入输出流操作符,使自定义的`string`类能够方便地与`cin`和`cout`配合使用。通过这些实现,读者不仅能加深对`string`类的理解,还能提升对C++编程技巧的掌握。
81 5
|
3月前
|
存储 编译器 C++
【C++篇】揭开 C++ STL list 容器的神秘面纱:从底层设计到高效应用的全景解析(附源码)
【C++篇】揭开 C++ STL list 容器的神秘面纱:从底层设计到高效应用的全景解析(附源码)
85 2
|
5月前
|
存储 C++
【C++】C++ 基于QT实现散列表学生管理系统(源码+数据+课程论文)【独一无二】
【C++】C++ 基于QT实现散列表学生管理系统(源码+数据+课程论文)【独一无二】
120 1
【C++】C++ 基于QT实现散列表学生管理系统(源码+数据+课程论文)【独一无二】
|
5月前
|
存储 算法 C++
【C++】C++ QT实现Huffman编码器与解码器(源码+课程论文+文件)【独一无二】
【C++】C++ QT实现Huffman编码器与解码器(源码+课程论文+文件)【独一无二】
141 4
|
5月前
|
存储 算法 数据可视化
【C++】C++旅游管理系统(源码+论文)【独一无二】
【C++】C++旅游管理系统(源码+论文)【独一无二】
|
5月前
|
存储 数据可视化 C++
【C++】C++ 职工信息管理系统(源码)【独一无二】
【C++】C++ 职工信息管理系统(源码)【独一无二】
119 3
|
5月前
|
搜索推荐 数据处理 文件存储
【C++】C++ 培训报名系统 (源码+论文)【独一无二】
【C++】C++ 培训报名系统 (源码+论文)【独一无二】
|
5月前
|
存储 C++
【C++】C++公司人事管理系统(源码)【独一无二】
【C++】C++公司人事管理系统(源码)【独一无二】
155 2
|
5月前
|
存储 数据可视化 C++
【C++】C++-机房收费管理系统(源码+注释)【独一无二】
【C++】C++-机房收费管理系统(源码+注释)【独一无二】