【C++ 并发 线程池设计】深入理解C++线程池:设计、实现与应用

简介: 【C++ 并发 线程池设计】深入理解C++线程池:设计、实现与应用

1. 引言

并发编程中,线程池(Thread Pool)是一种常见的设计模式,它可以有效地管理和控制多线程的执行。线程池中预先创建了一定数量的线程,这些线程可以并发地执行多个任务。当新的任务到来时,线程池会选择一个空闲的线程来执行这个任务。当任务执行完毕,线程会返回到线程池中,等待下一个任务的到来。

线程池的主要优点是减少了线程创建和销毁的开销。线程的创建和销毁都是需要消耗系统资源的,如果频繁地创建和销毁线程,会导致系统性能下降。线程池通过复用已经创建的线程,避免了这种开销。此外,线程池还可以限制系统中线程的数量,防止系统资源被过多的线程耗尽。

在C++中,我们可以使用标准库中的线程(std::thread)和互斥锁(std::mutex)等工具来实现线程池。下面,我们将详细介绍如何设计和实现一个线程池。

2. 线程池类设计

线程池的设计主要包括以下几个部分:

2.1 线程池类的主要成员变量

线程池类主要包括以下几个成员变量:

  • 工作线程队列:这是一个存储所有工作线程的队列。每个工作线程都是一个独立的线程,可以并发地执行任务。
  • 任务队列:这是一个存储所有待执行任务的队列。当有新的任务到来时,任务会被添加到这个队列中。工作线程会从这个队列中取出任务并执行。
  • 互斥锁:这是一个用于保护任务队列的互斥锁。当工作线程需要从任务队列中取出任务时,需要先获取这个互斥锁。这可以防止多个线程同时修改任务队列,导致数据不一致。
  • 条件变量:这是一个用于线程同步的条件变量。当任务队列为空时,工作线程会等待这个条件变量。当有新的任务被添加到任务队列时,条件变量会被通知,然后工作线程会被唤醒,取出任务并执行。

线程池类的主要成员变量示意图如下:

2.2 线程池类的构造函数和析构函数

线程池类的构造函数主要负责初始化线程池,包括创建工作线程,初始化互斥锁和条件变量等。在创建工作线程时,每个线程都会运行一个无限循环,等待任务队列中的任务。当任务队列中有任务时,线程会取出任务并执行。

线程池类的析构函数主要负责清理线程池,包括停止所有的工作线程,并等待它们完成当前的任务。

3. 线程池任务的添加与执行

在这一章节中,我们将深入探讨如何在线程池中添加和执行任务。我们将通过一个综合的代码示例来介绍这个过程,并通过注释和解释来帮助你理解每个步骤的作用。

3.1 如何添加任务到线程池

在我们的线程池类中,添加任务到线程池是通过enqueue方法实现的。这个方法接收一个函数(或者其他可调用的目标,如lambda表达式)和这个函数的参数,然后创建一个任务并将这个任务添加到任务队列中。

这个过程的关键是std::packaged_taskstd::bindstd::packaged_task是一个模板类,它可以包装任何可以调用的目标,使它们可以作为异步任务被调用。std::bind则是用于绑定函数和参数,创建一个可以直接调用的函数对象。

下面是enqueue方法的代码示例和注释:

template<class F, class... Args>
auto enqueue(F&& f, Args&&... args) 
    -> std::future<typename std::result_of<F(Args...)>::type>
{
    using return_type = typename std::result_of<F(Args...)>::type;
    // 创建一个 std::packaged_task 对象,包装用户提供的函数和参数
    auto task = std::make_shared< std::packaged_task<return_type()> >(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
    // 获取这个任务的 std::future 对象,用于之后获取任务的结果
    std::future<return_type> res = task->get_future();
    // 将任务添加到任务队列中
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        // 如果线程池已经停止,那么抛出一个运行时错误
        if(stop) {
            throw std::runtime_error("enqueue on stopped ThreadPool");
        }
        tasks.emplace([task](){ (*task)(); });
    }
    // 通知一个等待的工作线程,有新的任务可以执行了
    condition.notify_one();
    // 返回 std::future 对象,用户可以用它来获取任务的结果
    return res;
}

3.2 如何在线程池中执行任务

在线程池中执行任务是通过工作线程的主循环实现的。每个工作线程都会运行一个无限循环,不断地从任务队列中取出任务并执行。

这个过程的关键是std::condition_variablestd::unique_lockstd::condition_variable是用于等待和通知的条件变量,std::unique_lock则是一个可以自动解锁的互斥锁,

它们被用于同步线程,确保任务的正确执行。

下面是工作线程的主循环的代码示例和注释:

workers.back()->setmain([this] {
    for(;;) {
        std::function<void()> task;
        // 锁定互斥锁,并等待任务
        {
            std::unique_lock<std::mutex> lock(this->queue_mutex);
            this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); });
            // 如果线程池已经停止,并且任务队列为空,那么退出循环,结束线程
            if(this->stop && this->tasks.empty()) {
                return;
            }
            // 从任务队列中取出一个任务
            task = std::move(this->tasks.front());
            this->tasks.pop();
        }
        // 增加正在工作的线程数量
        ++workingThreads;
        // 执行任务
        task();
        // 减少正在工作的线程数量
        --workingThreads;
    }
});

这个循环会一直运行,直到线程池被停止并且任务队列为空。在每次循环中,线程会锁定互斥锁,然后等待任务。当任务队列中有任务时,线程会取出任务并执行。执行任务时,正在工作的线程数量会增加;任务执行完毕后,正在工作的线程数量会减少。

下面是这个过程的流程图,可以帮助你更好地理解这个过程:

3.3 如何获取任务的返回值

在我们的线程池类中,获取任务的返回值是通过std::future实现的。当你添加一个任务到线程池时,enqueue方法会返回一个std::future对象。你可以通过这个std::future对象来获取任务的返回值。

std::future是一个模板类,它表示一个异步操作的结果。你可以调用std::future::get方法来获取结果。如果结果还没有准备好,那么get方法会阻塞,直到结果准备好为止。

下面是如何使用std::future获取任务返回值的代码示例:

// 创建一个线程池
ThreadPool pool(4);
// 添加一个任务到线程池
auto future = pool.enqueue([]{
    return 1 + 1;
});
// 获取任务的返回值
int result = future.get();
// 输出结果
std::cout << "The result is " << result << std::endl;

在这个示例中,我们首先创建了一个线程池,然后添加了一个任务到线程池。这个任务是一个lambda表达式,它的作用是计算1 + 1的结果。然后,我们通过std::future::get方法获取了任务的返回值,并将结果输出到控制台。

下面是这个过程的流程图,可以帮助你更好地理解这个过程:

通过这个章节,我们深入探讨了如何在线程池中添加和执行任务,以及如何获取任务的返回值。我们通过代码示例和注释,以及流程图,详细解释了每个步骤的作用和实现方式。希望这个章节能帮助你更好地理解线程池的工作原理,以及如何在自己的项目中使用线程池。

2.3 线程池类的主要成员函数

线程池类的主要成员函数包括添加任务的函数和获取正在工作的线程数量的函数。

添加任务的函数接收一个函数和该函数的参数,创建一个任务,并将任务添加到任务队列中。如果线程池已经停止,那么添加任务会抛出一个运行时错误。这个函数返回一个std::future,表示任务的返回值。

获取正在工作的线程数量的函数返回当前正在工作的线程数量。

4. 深入理解std::packaged_task和std::future

在C++的并发编程中,std::packaged_taskstd::future是两个非常重要的工具,它们可以帮助我们更好地管理和控制异步任务。在本章中,我们将深入探讨这两个工具的作用和用法。

std::packaged_task的作用和用法

std::packaged_task(标准包装任务)是一个模板类,它的主要作用是将任何可以调用的目标(函数、lambda表达式、bind表达式或其他函数对象)包装成一个任务,这个任务可以在另一个线程中执行,并且可以将结果存储在一个std::future对象中。

下面是一个std::packaged_task的基本用法示例:

std::packaged_task<int(int, int)> task([](int a, int b) {
    return a + b;
});
std::future<int> result = task.get_future();
std::thread(std::move(task), 2, 3).detach();
std::cout << "Result: " << result.get() << std::endl;

在这个示例中,我们首先定义了一个std::packaged_task对象,它接受一个lambda表达式作为参数。然后,我们通过get_future方法获取一个std::future对象,这个对象将在任务完成时获得结果。最后,我们创建一个新的线程来执行任务,并立即将其分离。

std::future的作用和用法

std::future(标准未来)是一个模板类,它的主要作用是存储异步任务的结果。当你创建一个异步任务(例如通过std::asyncstd::packaged_task)时,你会得到一个std::future对象。你可以使用这个对象来查询任务的状态,等待任务完成,或获取任务的结果。

下面是一个std::future的基本用法示例:

std::future<int> result = std::async(std::launch::async, [](int a, int b) {
    return a + b;
}, 2, 3);
std::cout << "Result: " << result.get() << std::endl;

在这个示例中,我们使用std::async函数创建了一个异步任务,并得到了一个std::future对象。然后,我们通过get方法获取任务的结果。

如何使用std::packaged_task和std::future获取异步任务的结果

在前面的章节中,我们已经介绍了如何使用std::packaged_taskstd::future来创建异步任务和获取任务的结果。现在,让我们看一个更复杂的示例,

在这个示例中,我们将使用线程池来并行处理一组任务,并使用std::packaged_taskstd::future来获取任务的结果。

// 创建一个线程池
ThreadPool pool(4);
// 创建一组任务
std::vector<std::future<int>> results;
for (int i = 0; i < 10; ++i) {
    results.push_back(
        pool.enqueue([i](int value) {
            std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟耗时任务
            return i + value;
        }, i)
    );
}
// 获取任务的结果
for (auto& result : results) {
    std::cout << "Result: " << result.get() << std::endl;
}

在这个示例中,我们首先创建了一个包含4个线程的线程池。然后,我们创建了一组任务,并将它们添加到线程池中。每个任务都是一个lambda表达式,它接受一个整数参数,并返回这个整数加上任务的索引。我们使用enqueue方法将任务添加到线程池中,并获取一个std::future对象来表示任务的结果。最后,我们遍历所有的std::future对象,使用get方法获取任务的结果。

这个示例展示了如何使用线程池、std::packaged_taskstd::future来并行处理一组任务,并获取任务的结果。这是一个非常强大的模式,它可以让你充分利用多核处理器的性能,同时简化并发编程的复杂性。

5. 线程池中的线程管理

在本章中,我们将深入探讨线程池中的线程管理,包括如何创建和启动线程,如何管理线程的生命周期,以及如何统计正在工作的线程数量。

5.1 如何创建和启动线程

在C++中,我们使用std::thread类来创建和管理线程。std::thread类的构造函数接受一个可调用的对象(如函数、lambda表达式等)和该对象的参数,然后在一个新的线程中执行该对象。

在我们的线程池类中,我们在构造函数中创建了指定数量的线程,并将它们添加到workers队列中。每个线程都运行一个无限循环,等待tasks队列中的任务。

以下是相关的代码示例:

for (size_t i = 0; i < maxThreads; ++i) {
    workers.emplace_back(new ThreadWrapper);
    workers.back()->setmain([this] {
        for(;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(this->queue_mutex);
                this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); });
                if(this->stop && this->tasks.empty()) {
                    return;
                }
                task = std::move(this->tasks.front());
                this->tasks.pop();
            }
            ++workingThreads;
            task();
            --workingThreads;
        }
    });
    workers.back()->start();
}

在这段代码中,我们首先创建了一个新的ThreadWrapper对象,并将它添加到workers队列的末尾。然后,我们使用setmain方法设置了线程的主函数,这个函数是一个无限循环,等待tasks队列中的任务。最后,我们使用start方法启动了线程。

5.2 如何管理线程的生命周期

在C++中,我们使用std::thread::join方法来等待线程完成,然后销毁线程对象。如果一个线程对象被销毁,但是线程还没有完成,那么程序会终止。

在我们的线程池类中,我们在析构函数中停止了所有的线程,并等待它们完成当前的任务。以下是相关的代码示例:

~ThreadPool(){
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for(auto &worker : workers) {
        worker->stop();
    }
}

在这段代码中,我们首先设置了stop标志,然后通知了所有等待的线程。然后,我们遍历workers队列,对每个线程调用stop方法。这个方法会等待线程完成当前的任务,然后停止线程。

5.3 如何统计正在工作的线程数量

在我们的线程池类中,我们使用了一个std::atomic<size_t>对象来跟踪正在工作的线程数量。std::atomic是一个模板类,它提供了一种在多线程环境中对数据进行安全访问的方式。

以下是相关的代码示例:

++workingThreads;
task();
--workingThreads;

在这段代码中,当一个线程开始执行任务时,我们使用++workingThreads;来增加正在工作的线程数量。当任务执行完毕时,我们使用--workingThreads;来减少正在工作的线程数量。这两行代码被放在任务执行前后,是为了确保正在工作的线程数量始终正确。

我们还提供了一个getWorkingThreadCount方法,用于获取当前正在工作的线程数量。以下是相关的代码示例:

size_t getWorkingThreadCount() {
    return workingThreads.load();
}

在这段代码中,我们使用std::atomic::load方法来获取workingThreads的值。这个方法会返回workingThreads当前的值,而不会改变它的值。

6. 线程同步与互斥

在多线程编程中,线程同步(Thread Synchronization)和互斥(Mutex)是两个非常重要的概念。它们用于解决多线程环境中的资源共享和访问冲突问题。

6.1 std::mutex的作用和用法

互斥量(Mutex)是一种同步机制,用于防止多个线程同时访问共享资源。在C++中,我们可以使用std::mutex类来创建互斥量。

std::mutex提供了两个主要的方法:lockunlock。当一个线程调用lock方法时,如果互斥量未被锁定,那么这个线程会锁定互斥量并继续执行。如果互斥量已经被另一个线程锁定,那么这个线程会被阻塞,直到互斥量被解锁。当线程完成对共享资源的访问后,它应该调用unlock方法来解锁互斥量,这样其他线程就可以锁定互斥量并访问共享资源了。

在实际使用中,我们通常会使用std::lock_guardstd::unique_lock来管理互斥量。这两个类在构造函数中锁定互斥量,在析构函数中解锁互斥量,这样可以保证在任何情况下,互斥量都会被正确地解锁。

以下是一个使用std::mutexstd::lock_guard的例子:

std::mutex mtx;
std::queue<int> queue;
void add_to_queue(int value) {
    std::lock_guard<std::mutex> lock(mtx);
    queue.push(value);
}

在这个例子中,add_to_queue函数使用std::lock_guard来锁定互斥量,保证在添加元素到队列时,不会有其他线程同时访问队列。

6.2 std::condition_variable的作用和用法

条件变量(Condition Variable)是一种同步机制,用于让线程在某个条件为真时才继续执行。在C++中,我们可以使用std::condition_variable类来创建条件变量。

std::condition_variable提供了三个主要的方法:waitnotify_onenotify_allwait方法用于让当前线程等待,直到条件变量被通知或者满足某个条件。notify_one方法用于唤醒一个等待的线程,notify_all方法用于唤醒所有等待的线程。

以下是一个使用std::condition_variable的例子:

std::mutex mtx;
std::condition_variable cv;
std::queue<int> queue;
void add_to_queue(int value) {
    std::lock_guard<std::mutex> lock(mtx);
    queue.push(value);
    cv.notify_one(); // 通知一个等待的线程
}
void process_queue() {
    std::unique_lock<std::mutex> lock(mtx);
    cv.wait(lock, []{ return !queue.empty(); }); // 等待队列非空
    int value = queue.front();
    queue.pop();
    // 处理value...
}

在这个例子中,add_to_queue函数在添加元素到队列后,调用cv.notify_one()来唤醒一个等待的线程。process_queue函数使用cv.wait()来等待队列非空,当队列非空时,它会取出队列的第一个元素并处理。

6.3 如何使用std::mutex和std::condition_variable进行线程同步

在多线程编程中,我们经常需要使用互斥量和条件变量来进行线程同步。以下是一个使用std::mutexstd::condition_variable进行线程同步的例子:

std::mutex mtx;
std::condition_variable cv;
std::queue<std::function<void()>> tasks;
bool stop = false;
void worker_thread() {
    while (true) {
        std::function<void()> task;
        {
            std::unique_lock<std::mutex> lock(mtx);
            cv.wait(lock, []{ return stop || !tasks.empty(); });
            if (stop && tasks.empty()) {
                return;
            }
            task = std::move(tasks.front());
            tasks.pop();
        }
        task();
    }
}
void add_task(std::function<void()> task) {
    {
        std::lock_guard<std::mutex> lock(mtx);
        tasks.push(std::move(task));
    }
    cv.notify_one();
}

在这个例子中,worker_thread函数是工作线程的主循环,它会一直等待并执行任务,直到线程被停止并且任务队列为空。add_task函数用于添加任务到任务队列,并唤醒一个工作线程。

线程同步与互斥的关键点

  • 使用std::mutex来防止多个线程同时访问共享资源。
  • 使用std::condition_variable来让线程在某个条件为真时才继续执行。
  • 使用std::lock_guardstd::unique_lock来管理互斥量,保证在任何情况下,互斥量都会被正确地解锁。
  • 使用std::condition_variable::wait方法来让当前线程等待,直到条件变量被通知或者满足某个条件。
  • 使用std::condition_variable::notify_onestd::condition_variable::notify_all方法来唤醒等待的线程。

以下是一个线程同步与互斥的流程图,它展示了两个线程如何使用互斥量和条件变量进行同步和互斥访问共享资源。

在这个流程图中,"Thread 1"和"Thread 2"代表两个线程,"Mutex"代表互斥量,"Condition Variable"代表条件变量,"Shared Resource"代表共享资源。线程1首先请求访问共享资源,互斥量授予访问权限,线程1访问共享资源,然后释放互斥量,并通知条件变量。线程2等待互斥量,得到互斥量后访问共享资源,然后释放互斥量。

这个流程图清晰地展示了如何使用互斥量和条件变量进行线程同步和互斥访问共享资源,这是多线程编程中的一个重要技术。

结语

在我们的编程学习之旅中,理解是我们迈向更高层次的重要一步。然而,掌握新技能、新理念,始终需要时间和坚持。从心理学的角度看,学习往往伴随着不断的试错和调整,这就像是我们的大脑在逐渐优化其解决问题的“算法”。

这就是为什么当我们遇到错误,我们应该将其视为学习和进步的机会,而不仅仅是困扰。通过理解和解决这些问题,我们不仅可以修复当前的代码,更可以提升我们的编程能力,防止在未来的项目中犯相同的错误。

我鼓励大家积极参与进来,不断提升自己的编程技术。无论你是初学者还是有经验的开发者,我希望我的博客能对你的学习之路有所帮助。如果你觉得这篇文章有用,不妨点击收藏,或者留下你的评论分享你的见解和经验,也欢迎你对我博客的内容提出建议和问题。每一次的点赞、评论、分享和关注都是对我的最大支持,也是对我持续分享和创作的动力。

目录
相关文章
|
16天前
|
存储 监控 Java
Java多线程优化:提高线程池性能的技巧与实践
Java多线程优化:提高线程池性能的技巧与实践
43 1
|
24天前
|
缓存 Java
异步&线程池 线程池的七大参数 初始化线程的4种方式 【上篇】
这篇文章详细介绍了Java中线程的四种初始化方式,包括继承Thread类、实现Runnable接口、实现Callable接口与FutureTask结合使用,以及使用线程池。同时,还深入探讨了线程池的七大参数及其作用,解释了线程池的运行流程,并列举了四种常见的线程池类型。最后,阐述了在开发中使用线程池的原因,如降低资源消耗、提高响应速度和增强线程的可管理性。
异步&线程池 线程池的七大参数 初始化线程的4种方式 【上篇】
|
9天前
|
监控 Java
线程池中线程异常后:销毁还是复用?技术深度剖析
在并发编程中,线程池作为一种高效利用系统资源的工具,被广泛用于处理大量并发任务。然而,当线程池中的线程在执行任务时遇到异常,如何妥善处理这些异常线程成为了一个值得深入探讨的话题。本文将围绕“线程池中线程异常后:销毁还是复用?”这一主题,分享一些实践经验和理论思考。
20 3
|
19天前
|
缓存 Java 调度
【Java 并发秘籍】线程池大作战:揭秘 JDK 中的线程池家族!
【8月更文挑战第24天】Java的并发库提供多种线程池以应对不同的多线程编程需求。本文通过实例介绍了四种主要线程池:固定大小线程池、可缓存线程池、单一线程线程池及定时任务线程池。固定大小线程池通过预设线程数管理任务队列;可缓存线程池能根据需要动态调整线程数量;单一线程线程池确保任务顺序执行;定时任务线程池支持周期性或延时任务调度。了解并正确选用这些线程池有助于提高程序效率和资源利用率。
30 2
|
21天前
|
存储 算法 C++
C++ STL应用宝典:高效处理数据的艺术与实战技巧大揭秘!
【8月更文挑战第22天】C++ STL(标准模板库)是一组高效的数据结构与算法集合,极大提升编程效率与代码可读性。它包括容器、迭代器、算法等组件。例如,统计文本中单词频率可用`std::map`和`std::ifstream`实现;对数据排序及找极值则可通过`std::vector`结合`std::sort`、`std::min/max_element`完成;而快速查找字符串则适合使用`std::set`配合其内置的`find`方法。这些示例展示了STL的强大功能,有助于编写简洁高效的代码。
31 2
|
15天前
|
数据采集 Java Python
python 递归锁、信号量、事件、线程队列、进程池和线程池、回调函数、定时器
python 递归锁、信号量、事件、线程队列、进程池和线程池、回调函数、定时器
|
16天前
|
Java
线程池中线程抛了异常,该如何处理?
【8月更文挑战第27天】在Java多线程编程中,线程池(ThreadPool)是一种常用的并发处理工具,它能够有效地管理线程的生命周期,提高资源利用率,并简化并发编程的复杂性。然而,当线程池中的线程在执行任务时抛出异常,如果不妥善处理,这些异常可能会导致程序出现未预料的行为,甚至崩溃。因此,了解并掌握线程池异常处理机制至关重要。
88 0
|
22天前
|
存储 编译器 C++
C++多态实现的原理:深入探索与实战应用
【8月更文挑战第21天】在C++的浩瀚宇宙中,多态性(Polymorphism)无疑是一颗璀璨的星辰,它赋予了程序高度的灵活性和可扩展性。多态允许我们通过基类指针或引用来调用派生类的成员函数,而具体调用哪个函数则取决于指针或引用所指向的对象的实际类型。本文将深入探讨C++多态实现的原理,并结合工作学习中的实际案例,分享其技术干货。
32 0
|
25天前
|
JSON Android开发 C++
Android c++ core guideline checker 应用
Android c++ core guideline checker 应用
|
27天前
|
Dart 编译器 API
Dart ffi 使用问题之在C++线程中无法直接调用Dart函数的问题如何解决
Dart ffi 使用问题之在C++线程中无法直接调用Dart函数的问题如何解决