【C++ 并发 线程池设计】深入理解C++线程池:设计、实现与应用

简介: 【C++ 并发 线程池设计】深入理解C++线程池:设计、实现与应用

1. 引言

并发编程中,线程池(Thread Pool)是一种常见的设计模式,它可以有效地管理和控制多线程的执行。线程池中预先创建了一定数量的线程,这些线程可以并发地执行多个任务。当新的任务到来时,线程池会选择一个空闲的线程来执行这个任务。当任务执行完毕,线程会返回到线程池中,等待下一个任务的到来。

线程池的主要优点是减少了线程创建和销毁的开销。线程的创建和销毁都是需要消耗系统资源的,如果频繁地创建和销毁线程,会导致系统性能下降。线程池通过复用已经创建的线程,避免了这种开销。此外,线程池还可以限制系统中线程的数量,防止系统资源被过多的线程耗尽。

在C++中,我们可以使用标准库中的线程(std::thread)和互斥锁(std::mutex)等工具来实现线程池。下面,我们将详细介绍如何设计和实现一个线程池。

2. 线程池类设计

线程池的设计主要包括以下几个部分:

2.1 线程池类的主要成员变量

线程池类主要包括以下几个成员变量:

  • 工作线程队列:这是一个存储所有工作线程的队列。每个工作线程都是一个独立的线程,可以并发地执行任务。
  • 任务队列:这是一个存储所有待执行任务的队列。当有新的任务到来时,任务会被添加到这个队列中。工作线程会从这个队列中取出任务并执行。
  • 互斥锁:这是一个用于保护任务队列的互斥锁。当工作线程需要从任务队列中取出任务时,需要先获取这个互斥锁。这可以防止多个线程同时修改任务队列,导致数据不一致。
  • 条件变量:这是一个用于线程同步的条件变量。当任务队列为空时,工作线程会等待这个条件变量。当有新的任务被添加到任务队列时,条件变量会被通知,然后工作线程会被唤醒,取出任务并执行。

线程池类的主要成员变量示意图如下:

2.2 线程池类的构造函数和析构函数

线程池类的构造函数主要负责初始化线程池,包括创建工作线程,初始化互斥锁和条件变量等。在创建工作线程时,每个线程都会运行一个无限循环,等待任务队列中的任务。当任务队列中有任务时,线程会取出任务并执行。

线程池类的析构函数主要负责清理线程池,包括停止所有的工作线程,并等待它们完成当前的任务。

3. 线程池任务的添加与执行

在这一章节中,我们将深入探讨如何在线程池中添加和执行任务。我们将通过一个综合的代码示例来介绍这个过程,并通过注释和解释来帮助你理解每个步骤的作用。

3.1 如何添加任务到线程池

在我们的线程池类中,添加任务到线程池是通过enqueue方法实现的。这个方法接收一个函数(或者其他可调用的目标,如lambda表达式)和这个函数的参数,然后创建一个任务并将这个任务添加到任务队列中。

这个过程的关键是std::packaged_taskstd::bindstd::packaged_task是一个模板类,它可以包装任何可以调用的目标,使它们可以作为异步任务被调用。std::bind则是用于绑定函数和参数,创建一个可以直接调用的函数对象。

下面是enqueue方法的代码示例和注释:

template<class F, class... Args>
auto enqueue(F&& f, Args&&... args) 
    -> std::future<typename std::result_of<F(Args...)>::type>
{
    using return_type = typename std::result_of<F(Args...)>::type;
    // 创建一个 std::packaged_task 对象,包装用户提供的函数和参数
    auto task = std::make_shared< std::packaged_task<return_type()> >(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
    // 获取这个任务的 std::future 对象,用于之后获取任务的结果
    std::future<return_type> res = task->get_future();
    // 将任务添加到任务队列中
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        // 如果线程池已经停止,那么抛出一个运行时错误
        if(stop) {
            throw std::runtime_error("enqueue on stopped ThreadPool");
        }
        tasks.emplace([task](){ (*task)(); });
    }
    // 通知一个等待的工作线程,有新的任务可以执行了
    condition.notify_one();
    // 返回 std::future 对象,用户可以用它来获取任务的结果
    return res;
}

3.2 如何在线程池中执行任务

在线程池中执行任务是通过工作线程的主循环实现的。每个工作线程都会运行一个无限循环,不断地从任务队列中取出任务并执行。

这个过程的关键是std::condition_variablestd::unique_lockstd::condition_variable是用于等待和通知的条件变量,std::unique_lock则是一个可以自动解锁的互斥锁,

它们被用于同步线程,确保任务的正确执行。

下面是工作线程的主循环的代码示例和注释:

workers.back()->setmain([this] {
    for(;;) {
        std::function<void()> task;
        // 锁定互斥锁,并等待任务
        {
            std::unique_lock<std::mutex> lock(this->queue_mutex);
            this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); });
            // 如果线程池已经停止,并且任务队列为空,那么退出循环,结束线程
            if(this->stop && this->tasks.empty()) {
                return;
            }
            // 从任务队列中取出一个任务
            task = std::move(this->tasks.front());
            this->tasks.pop();
        }
        // 增加正在工作的线程数量
        ++workingThreads;
        // 执行任务
        task();
        // 减少正在工作的线程数量
        --workingThreads;
    }
});

这个循环会一直运行,直到线程池被停止并且任务队列为空。在每次循环中,线程会锁定互斥锁,然后等待任务。当任务队列中有任务时,线程会取出任务并执行。执行任务时,正在工作的线程数量会增加;任务执行完毕后,正在工作的线程数量会减少。

下面是这个过程的流程图,可以帮助你更好地理解这个过程:

3.3 如何获取任务的返回值

在我们的线程池类中,获取任务的返回值是通过std::future实现的。当你添加一个任务到线程池时,enqueue方法会返回一个std::future对象。你可以通过这个std::future对象来获取任务的返回值。

std::future是一个模板类,它表示一个异步操作的结果。你可以调用std::future::get方法来获取结果。如果结果还没有准备好,那么get方法会阻塞,直到结果准备好为止。

下面是如何使用std::future获取任务返回值的代码示例:

// 创建一个线程池
ThreadPool pool(4);
// 添加一个任务到线程池
auto future = pool.enqueue([]{
    return 1 + 1;
});
// 获取任务的返回值
int result = future.get();
// 输出结果
std::cout << "The result is " << result << std::endl;

在这个示例中,我们首先创建了一个线程池,然后添加了一个任务到线程池。这个任务是一个lambda表达式,它的作用是计算1 + 1的结果。然后,我们通过std::future::get方法获取了任务的返回值,并将结果输出到控制台。

下面是这个过程的流程图,可以帮助你更好地理解这个过程:

通过这个章节,我们深入探讨了如何在线程池中添加和执行任务,以及如何获取任务的返回值。我们通过代码示例和注释,以及流程图,详细解释了每个步骤的作用和实现方式。希望这个章节能帮助你更好地理解线程池的工作原理,以及如何在自己的项目中使用线程池。

2.3 线程池类的主要成员函数

线程池类的主要成员函数包括添加任务的函数和获取正在工作的线程数量的函数。

添加任务的函数接收一个函数和该函数的参数,创建一个任务,并将任务添加到任务队列中。如果线程池已经停止,那么添加任务会抛出一个运行时错误。这个函数返回一个std::future,表示任务的返回值。

获取正在工作的线程数量的函数返回当前正在工作的线程数量。

4. 深入理解std::packaged_task和std::future

在C++的并发编程中,std::packaged_taskstd::future是两个非常重要的工具,它们可以帮助我们更好地管理和控制异步任务。在本章中,我们将深入探讨这两个工具的作用和用法。

std::packaged_task的作用和用法

std::packaged_task(标准包装任务)是一个模板类,它的主要作用是将任何可以调用的目标(函数、lambda表达式、bind表达式或其他函数对象)包装成一个任务,这个任务可以在另一个线程中执行,并且可以将结果存储在一个std::future对象中。

下面是一个std::packaged_task的基本用法示例:

std::packaged_task<int(int, int)> task([](int a, int b) {
    return a + b;
});
std::future<int> result = task.get_future();
std::thread(std::move(task), 2, 3).detach();
std::cout << "Result: " << result.get() << std::endl;

在这个示例中,我们首先定义了一个std::packaged_task对象,它接受一个lambda表达式作为参数。然后,我们通过get_future方法获取一个std::future对象,这个对象将在任务完成时获得结果。最后,我们创建一个新的线程来执行任务,并立即将其分离。

std::future的作用和用法

std::future(标准未来)是一个模板类,它的主要作用是存储异步任务的结果。当你创建一个异步任务(例如通过std::asyncstd::packaged_task)时,你会得到一个std::future对象。你可以使用这个对象来查询任务的状态,等待任务完成,或获取任务的结果。

下面是一个std::future的基本用法示例:

std::future<int> result = std::async(std::launch::async, [](int a, int b) {
    return a + b;
}, 2, 3);
std::cout << "Result: " << result.get() << std::endl;

在这个示例中,我们使用std::async函数创建了一个异步任务,并得到了一个std::future对象。然后,我们通过get方法获取任务的结果。

如何使用std::packaged_task和std::future获取异步任务的结果

在前面的章节中,我们已经介绍了如何使用std::packaged_taskstd::future来创建异步任务和获取任务的结果。现在,让我们看一个更复杂的示例,

在这个示例中,我们将使用线程池来并行处理一组任务,并使用std::packaged_taskstd::future来获取任务的结果。

// 创建一个线程池
ThreadPool pool(4);
// 创建一组任务
std::vector<std::future<int>> results;
for (int i = 0; i < 10; ++i) {
    results.push_back(
        pool.enqueue([i](int value) {
            std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟耗时任务
            return i + value;
        }, i)
    );
}
// 获取任务的结果
for (auto& result : results) {
    std::cout << "Result: " << result.get() << std::endl;
}

在这个示例中,我们首先创建了一个包含4个线程的线程池。然后,我们创建了一组任务,并将它们添加到线程池中。每个任务都是一个lambda表达式,它接受一个整数参数,并返回这个整数加上任务的索引。我们使用enqueue方法将任务添加到线程池中,并获取一个std::future对象来表示任务的结果。最后,我们遍历所有的std::future对象,使用get方法获取任务的结果。

这个示例展示了如何使用线程池、std::packaged_taskstd::future来并行处理一组任务,并获取任务的结果。这是一个非常强大的模式,它可以让你充分利用多核处理器的性能,同时简化并发编程的复杂性。

5. 线程池中的线程管理

在本章中,我们将深入探讨线程池中的线程管理,包括如何创建和启动线程,如何管理线程的生命周期,以及如何统计正在工作的线程数量。

5.1 如何创建和启动线程

在C++中,我们使用std::thread类来创建和管理线程。std::thread类的构造函数接受一个可调用的对象(如函数、lambda表达式等)和该对象的参数,然后在一个新的线程中执行该对象。

在我们的线程池类中,我们在构造函数中创建了指定数量的线程,并将它们添加到workers队列中。每个线程都运行一个无限循环,等待tasks队列中的任务。

以下是相关的代码示例:

for (size_t i = 0; i < maxThreads; ++i) {
    workers.emplace_back(new ThreadWrapper);
    workers.back()->setmain([this] {
        for(;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(this->queue_mutex);
                this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); });
                if(this->stop && this->tasks.empty()) {
                    return;
                }
                task = std::move(this->tasks.front());
                this->tasks.pop();
            }
            ++workingThreads;
            task();
            --workingThreads;
        }
    });
    workers.back()->start();
}

在这段代码中,我们首先创建了一个新的ThreadWrapper对象,并将它添加到workers队列的末尾。然后,我们使用setmain方法设置了线程的主函数,这个函数是一个无限循环,等待tasks队列中的任务。最后,我们使用start方法启动了线程。

5.2 如何管理线程的生命周期

在C++中,我们使用std::thread::join方法来等待线程完成,然后销毁线程对象。如果一个线程对象被销毁,但是线程还没有完成,那么程序会终止。

在我们的线程池类中,我们在析构函数中停止了所有的线程,并等待它们完成当前的任务。以下是相关的代码示例:

~ThreadPool(){
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for(auto &worker : workers) {
        worker->stop();
    }
}

在这段代码中,我们首先设置了stop标志,然后通知了所有等待的线程。然后,我们遍历workers队列,对每个线程调用stop方法。这个方法会等待线程完成当前的任务,然后停止线程。

5.3 如何统计正在工作的线程数量

在我们的线程池类中,我们使用了一个std::atomic<size_t>对象来跟踪正在工作的线程数量。std::atomic是一个模板类,它提供了一种在多线程环境中对数据进行安全访问的方式。

以下是相关的代码示例:

++workingThreads;
task();
--workingThreads;

在这段代码中,当一个线程开始执行任务时,我们使用++workingThreads;来增加正在工作的线程数量。当任务执行完毕时,我们使用--workingThreads;来减少正在工作的线程数量。这两行代码被放在任务执行前后,是为了确保正在工作的线程数量始终正确。

我们还提供了一个getWorkingThreadCount方法,用于获取当前正在工作的线程数量。以下是相关的代码示例:

size_t getWorkingThreadCount() {
    return workingThreads.load();
}

在这段代码中,我们使用std::atomic::load方法来获取workingThreads的值。这个方法会返回workingThreads当前的值,而不会改变它的值。

6. 线程同步与互斥

在多线程编程中,线程同步(Thread Synchronization)和互斥(Mutex)是两个非常重要的概念。它们用于解决多线程环境中的资源共享和访问冲突问题。

6.1 std::mutex的作用和用法

互斥量(Mutex)是一种同步机制,用于防止多个线程同时访问共享资源。在C++中,我们可以使用std::mutex类来创建互斥量。

std::mutex提供了两个主要的方法:lockunlock。当一个线程调用lock方法时,如果互斥量未被锁定,那么这个线程会锁定互斥量并继续执行。如果互斥量已经被另一个线程锁定,那么这个线程会被阻塞,直到互斥量被解锁。当线程完成对共享资源的访问后,它应该调用unlock方法来解锁互斥量,这样其他线程就可以锁定互斥量并访问共享资源了。

在实际使用中,我们通常会使用std::lock_guardstd::unique_lock来管理互斥量。这两个类在构造函数中锁定互斥量,在析构函数中解锁互斥量,这样可以保证在任何情况下,互斥量都会被正确地解锁。

以下是一个使用std::mutexstd::lock_guard的例子:

std::mutex mtx;
std::queue<int> queue;
void add_to_queue(int value) {
    std::lock_guard<std::mutex> lock(mtx);
    queue.push(value);
}

在这个例子中,add_to_queue函数使用std::lock_guard来锁定互斥量,保证在添加元素到队列时,不会有其他线程同时访问队列。

6.2 std::condition_variable的作用和用法

条件变量(Condition Variable)是一种同步机制,用于让线程在某个条件为真时才继续执行。在C++中,我们可以使用std::condition_variable类来创建条件变量。

std::condition_variable提供了三个主要的方法:waitnotify_onenotify_allwait方法用于让当前线程等待,直到条件变量被通知或者满足某个条件。notify_one方法用于唤醒一个等待的线程,notify_all方法用于唤醒所有等待的线程。

以下是一个使用std::condition_variable的例子:

std::mutex mtx;
std::condition_variable cv;
std::queue<int> queue;
void add_to_queue(int value) {
    std::lock_guard<std::mutex> lock(mtx);
    queue.push(value);
    cv.notify_one(); // 通知一个等待的线程
}
void process_queue() {
    std::unique_lock<std::mutex> lock(mtx);
    cv.wait(lock, []{ return !queue.empty(); }); // 等待队列非空
    int value = queue.front();
    queue.pop();
    // 处理value...
}

在这个例子中,add_to_queue函数在添加元素到队列后,调用cv.notify_one()来唤醒一个等待的线程。process_queue函数使用cv.wait()来等待队列非空,当队列非空时,它会取出队列的第一个元素并处理。

6.3 如何使用std::mutex和std::condition_variable进行线程同步

在多线程编程中,我们经常需要使用互斥量和条件变量来进行线程同步。以下是一个使用std::mutexstd::condition_variable进行线程同步的例子:

std::mutex mtx;
std::condition_variable cv;
std::queue<std::function<void()>> tasks;
bool stop = false;
void worker_thread() {
    while (true) {
        std::function<void()> task;
        {
            std::unique_lock<std::mutex> lock(mtx);
            cv.wait(lock, []{ return stop || !tasks.empty(); });
            if (stop && tasks.empty()) {
                return;
            }
            task = std::move(tasks.front());
            tasks.pop();
        }
        task();
    }
}
void add_task(std::function<void()> task) {
    {
        std::lock_guard<std::mutex> lock(mtx);
        tasks.push(std::move(task));
    }
    cv.notify_one();
}

在这个例子中,worker_thread函数是工作线程的主循环,它会一直等待并执行任务,直到线程被停止并且任务队列为空。add_task函数用于添加任务到任务队列,并唤醒一个工作线程。

线程同步与互斥的关键点

  • 使用std::mutex来防止多个线程同时访问共享资源。
  • 使用std::condition_variable来让线程在某个条件为真时才继续执行。
  • 使用std::lock_guardstd::unique_lock来管理互斥量,保证在任何情况下,互斥量都会被正确地解锁。
  • 使用std::condition_variable::wait方法来让当前线程等待,直到条件变量被通知或者满足某个条件。
  • 使用std::condition_variable::notify_onestd::condition_variable::notify_all方法来唤醒等待的线程。

以下是一个线程同步与互斥的流程图,它展示了两个线程如何使用互斥量和条件变量进行同步和互斥访问共享资源。

在这个流程图中,"Thread 1"和"Thread 2"代表两个线程,"Mutex"代表互斥量,"Condition Variable"代表条件变量,"Shared Resource"代表共享资源。线程1首先请求访问共享资源,互斥量授予访问权限,线程1访问共享资源,然后释放互斥量,并通知条件变量。线程2等待互斥量,得到互斥量后访问共享资源,然后释放互斥量。

这个流程图清晰地展示了如何使用互斥量和条件变量进行线程同步和互斥访问共享资源,这是多线程编程中的一个重要技术。

结语

在我们的编程学习之旅中,理解是我们迈向更高层次的重要一步。然而,掌握新技能、新理念,始终需要时间和坚持。从心理学的角度看,学习往往伴随着不断的试错和调整,这就像是我们的大脑在逐渐优化其解决问题的“算法”。

这就是为什么当我们遇到错误,我们应该将其视为学习和进步的机会,而不仅仅是困扰。通过理解和解决这些问题,我们不仅可以修复当前的代码,更可以提升我们的编程能力,防止在未来的项目中犯相同的错误。

我鼓励大家积极参与进来,不断提升自己的编程技术。无论你是初学者还是有经验的开发者,我希望我的博客能对你的学习之路有所帮助。如果你觉得这篇文章有用,不妨点击收藏,或者留下你的评论分享你的见解和经验,也欢迎你对我博客的内容提出建议和问题。每一次的点赞、评论、分享和关注都是对我的最大支持,也是对我持续分享和创作的动力。

目录
相关文章
|
6月前
|
存储 负载均衡 算法
基于 C++ 语言的迪杰斯特拉算法在局域网计算机管理中的应用剖析
在局域网计算机管理中,迪杰斯特拉算法用于优化网络路径、分配资源和定位故障节点,确保高效稳定的网络环境。该算法通过计算最短路径,提升数据传输速率与稳定性,实现负载均衡并快速排除故障。C++代码示例展示了其在网络模拟中的应用,为企业信息化建设提供有力支持。
156 15
|
7月前
|
算法 Serverless 数据处理
从集思录可转债数据探秘:Python与C++实现的移动平均算法应用
本文探讨了如何利用移动平均算法分析集思录提供的可转债数据,帮助投资者把握价格趋势。通过Python和C++两种编程语言实现简单移动平均(SMA),展示了数据处理的具体方法。Python代码借助`pandas`库轻松计算5日SMA,而C++代码则通过高效的数据处理展示了SMA的计算过程。集思录平台提供了详尽且及时的可转债数据,助力投资者结合算法与社区讨论,做出更明智的投资决策。掌握这些工具和技术,有助于在复杂多变的金融市场中挖掘更多价值。
221 12
|
8月前
|
编译器 数据安全/隐私保护 C++
【C++面向对象——继承与派生】派生类的应用(头歌实践教学平台习题)【合集】
本实验旨在学习类的继承关系、不同继承方式下的访问控制及利用虚基类解决二义性问题。主要内容包括: 1. **类的继承关系基础概念**:介绍继承的定义及声明派生类的语法。 2. **不同继承方式下对基类成员的访问控制**:详细说明`public`、`private`和`protected`继承方式对基类成员的访问权限影响。 3. **利用虚基类解决二义性问题**:解释多继承中可能出现的二义性及其解决方案——虚基类。 实验任务要求从`people`类派生出`student`、`teacher`、`graduate`和`TA`类,添加特定属性并测试这些类的功能。最终通过创建教师和助教实例,验证代码
171 5
|
10月前
|
缓存 安全 C++
C++无锁队列:解锁多线程编程新境界
【10月更文挑战第27天】
562 7
|
10月前
|
消息中间件 存储 安全
|
11月前
|
存储 并行计算 安全
C++多线程应用
【10月更文挑战第29天】C++ 中的多线程应用广泛,常见场景包括并行计算、网络编程中的并发服务器和图形用户界面(GUI)应用。通过多线程可以显著提升计算速度和响应能力。示例代码展示了如何使用 `pthread` 库创建和管理线程。注意事项包括数据同步与互斥、线程间通信和线程安全的类设计,以确保程序的正确性和稳定性。
198 5
|
4月前
|
机器学习/深度学习 消息中间件 存储
【高薪程序员必看】万字长文拆解Java并发编程!(9-2):并发工具-线程池
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发编程中的强力并发工具-线程池,废话不多说让我们直接开始。
168 0
|
7月前
|
Linux
Linux编程: 在业务线程中注册和处理Linux信号
通过本文,您可以了解如何在业务线程中注册和处理Linux信号。正确处理信号可以提高程序的健壮性和稳定性。希望这些内容能帮助您更好地理解和应用Linux信号处理机制。
118 26
|
7月前
|
Linux
Linux编程: 在业务线程中注册和处理Linux信号
本文详细介绍了如何在Linux中通过在业务线程中注册和处理信号。我们讨论了信号的基本概念,并通过完整的代码示例展示了在业务线程中注册和处理信号的方法。通过正确地使用信号处理机制,可以提高程序的健壮性和响应能力。希望本文能帮助您更好地理解和应用Linux信号处理,提高开发效率和代码质量。
126 17
|
9月前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
575 2