【C++ 并发 线程池设计】深入理解C++线程池:设计、实现与应用

简介: 【C++ 并发 线程池设计】深入理解C++线程池:设计、实现与应用

1. 引言

并发编程中,线程池(Thread Pool)是一种常见的设计模式,它可以有效地管理和控制多线程的执行。线程池中预先创建了一定数量的线程,这些线程可以并发地执行多个任务。当新的任务到来时,线程池会选择一个空闲的线程来执行这个任务。当任务执行完毕,线程会返回到线程池中,等待下一个任务的到来。

线程池的主要优点是减少了线程创建和销毁的开销。线程的创建和销毁都是需要消耗系统资源的,如果频繁地创建和销毁线程,会导致系统性能下降。线程池通过复用已经创建的线程,避免了这种开销。此外,线程池还可以限制系统中线程的数量,防止系统资源被过多的线程耗尽。

在C++中,我们可以使用标准库中的线程(std::thread)和互斥锁(std::mutex)等工具来实现线程池。下面,我们将详细介绍如何设计和实现一个线程池。

2. 线程池类设计

线程池的设计主要包括以下几个部分:

2.1 线程池类的主要成员变量

线程池类主要包括以下几个成员变量:

  • 工作线程队列:这是一个存储所有工作线程的队列。每个工作线程都是一个独立的线程,可以并发地执行任务。
  • 任务队列:这是一个存储所有待执行任务的队列。当有新的任务到来时,任务会被添加到这个队列中。工作线程会从这个队列中取出任务并执行。
  • 互斥锁:这是一个用于保护任务队列的互斥锁。当工作线程需要从任务队列中取出任务时,需要先获取这个互斥锁。这可以防止多个线程同时修改任务队列,导致数据不一致。
  • 条件变量:这是一个用于线程同步的条件变量。当任务队列为空时,工作线程会等待这个条件变量。当有新的任务被添加到任务队列时,条件变量会被通知,然后工作线程会被唤醒,取出任务并执行。

线程池类的主要成员变量示意图如下:

2.2 线程池类的构造函数和析构函数

线程池类的构造函数主要负责初始化线程池,包括创建工作线程,初始化互斥锁和条件变量等。在创建工作线程时,每个线程都会运行一个无限循环,等待任务队列中的任务。当任务队列中有任务时,线程会取出任务并执行。

线程池类的析构函数主要负责清理线程池,包括停止所有的工作线程,并等待它们完成当前的任务。

3. 线程池任务的添加与执行

在这一章节中,我们将深入探讨如何在线程池中添加和执行任务。我们将通过一个综合的代码示例来介绍这个过程,并通过注释和解释来帮助你理解每个步骤的作用。

3.1 如何添加任务到线程池

在我们的线程池类中,添加任务到线程池是通过enqueue方法实现的。这个方法接收一个函数(或者其他可调用的目标,如lambda表达式)和这个函数的参数,然后创建一个任务并将这个任务添加到任务队列中。

这个过程的关键是std::packaged_taskstd::bindstd::packaged_task是一个模板类,它可以包装任何可以调用的目标,使它们可以作为异步任务被调用。std::bind则是用于绑定函数和参数,创建一个可以直接调用的函数对象。

下面是enqueue方法的代码示例和注释:

template<class F, class... Args>
auto enqueue(F&& f, Args&&... args) 
    -> std::future<typename std::result_of<F(Args...)>::type>
{
    using return_type = typename std::result_of<F(Args...)>::type;
    // 创建一个 std::packaged_task 对象,包装用户提供的函数和参数
    auto task = std::make_shared< std::packaged_task<return_type()> >(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
    // 获取这个任务的 std::future 对象,用于之后获取任务的结果
    std::future<return_type> res = task->get_future();
    // 将任务添加到任务队列中
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        // 如果线程池已经停止,那么抛出一个运行时错误
        if(stop) {
            throw std::runtime_error("enqueue on stopped ThreadPool");
        }
        tasks.emplace([task](){ (*task)(); });
    }
    // 通知一个等待的工作线程,有新的任务可以执行了
    condition.notify_one();
    // 返回 std::future 对象,用户可以用它来获取任务的结果
    return res;
}

3.2 如何在线程池中执行任务

在线程池中执行任务是通过工作线程的主循环实现的。每个工作线程都会运行一个无限循环,不断地从任务队列中取出任务并执行。

这个过程的关键是std::condition_variablestd::unique_lockstd::condition_variable是用于等待和通知的条件变量,std::unique_lock则是一个可以自动解锁的互斥锁,

它们被用于同步线程,确保任务的正确执行。

下面是工作线程的主循环的代码示例和注释:

workers.back()->setmain([this] {
    for(;;) {
        std::function<void()> task;
        // 锁定互斥锁,并等待任务
        {
            std::unique_lock<std::mutex> lock(this->queue_mutex);
            this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); });
            // 如果线程池已经停止,并且任务队列为空,那么退出循环,结束线程
            if(this->stop && this->tasks.empty()) {
                return;
            }
            // 从任务队列中取出一个任务
            task = std::move(this->tasks.front());
            this->tasks.pop();
        }
        // 增加正在工作的线程数量
        ++workingThreads;
        // 执行任务
        task();
        // 减少正在工作的线程数量
        --workingThreads;
    }
});

这个循环会一直运行,直到线程池被停止并且任务队列为空。在每次循环中,线程会锁定互斥锁,然后等待任务。当任务队列中有任务时,线程会取出任务并执行。执行任务时,正在工作的线程数量会增加;任务执行完毕后,正在工作的线程数量会减少。

下面是这个过程的流程图,可以帮助你更好地理解这个过程:

3.3 如何获取任务的返回值

在我们的线程池类中,获取任务的返回值是通过std::future实现的。当你添加一个任务到线程池时,enqueue方法会返回一个std::future对象。你可以通过这个std::future对象来获取任务的返回值。

std::future是一个模板类,它表示一个异步操作的结果。你可以调用std::future::get方法来获取结果。如果结果还没有准备好,那么get方法会阻塞,直到结果准备好为止。

下面是如何使用std::future获取任务返回值的代码示例:

// 创建一个线程池
ThreadPool pool(4);
// 添加一个任务到线程池
auto future = pool.enqueue([]{
    return 1 + 1;
});
// 获取任务的返回值
int result = future.get();
// 输出结果
std::cout << "The result is " << result << std::endl;

在这个示例中,我们首先创建了一个线程池,然后添加了一个任务到线程池。这个任务是一个lambda表达式,它的作用是计算1 + 1的结果。然后,我们通过std::future::get方法获取了任务的返回值,并将结果输出到控制台。

下面是这个过程的流程图,可以帮助你更好地理解这个过程:

通过这个章节,我们深入探讨了如何在线程池中添加和执行任务,以及如何获取任务的返回值。我们通过代码示例和注释,以及流程图,详细解释了每个步骤的作用和实现方式。希望这个章节能帮助你更好地理解线程池的工作原理,以及如何在自己的项目中使用线程池。

2.3 线程池类的主要成员函数

线程池类的主要成员函数包括添加任务的函数和获取正在工作的线程数量的函数。

添加任务的函数接收一个函数和该函数的参数,创建一个任务,并将任务添加到任务队列中。如果线程池已经停止,那么添加任务会抛出一个运行时错误。这个函数返回一个std::future,表示任务的返回值。

获取正在工作的线程数量的函数返回当前正在工作的线程数量。

4. 深入理解std::packaged_task和std::future

在C++的并发编程中,std::packaged_taskstd::future是两个非常重要的工具,它们可以帮助我们更好地管理和控制异步任务。在本章中,我们将深入探讨这两个工具的作用和用法。

std::packaged_task的作用和用法

std::packaged_task(标准包装任务)是一个模板类,它的主要作用是将任何可以调用的目标(函数、lambda表达式、bind表达式或其他函数对象)包装成一个任务,这个任务可以在另一个线程中执行,并且可以将结果存储在一个std::future对象中。

下面是一个std::packaged_task的基本用法示例:

std::packaged_task<int(int, int)> task([](int a, int b) {
    return a + b;
});
std::future<int> result = task.get_future();
std::thread(std::move(task), 2, 3).detach();
std::cout << "Result: " << result.get() << std::endl;

在这个示例中,我们首先定义了一个std::packaged_task对象,它接受一个lambda表达式作为参数。然后,我们通过get_future方法获取一个std::future对象,这个对象将在任务完成时获得结果。最后,我们创建一个新的线程来执行任务,并立即将其分离。

std::future的作用和用法

std::future(标准未来)是一个模板类,它的主要作用是存储异步任务的结果。当你创建一个异步任务(例如通过std::asyncstd::packaged_task)时,你会得到一个std::future对象。你可以使用这个对象来查询任务的状态,等待任务完成,或获取任务的结果。

下面是一个std::future的基本用法示例:

std::future<int> result = std::async(std::launch::async, [](int a, int b) {
    return a + b;
}, 2, 3);
std::cout << "Result: " << result.get() << std::endl;

在这个示例中,我们使用std::async函数创建了一个异步任务,并得到了一个std::future对象。然后,我们通过get方法获取任务的结果。

如何使用std::packaged_task和std::future获取异步任务的结果

在前面的章节中,我们已经介绍了如何使用std::packaged_taskstd::future来创建异步任务和获取任务的结果。现在,让我们看一个更复杂的示例,

在这个示例中,我们将使用线程池来并行处理一组任务,并使用std::packaged_taskstd::future来获取任务的结果。

// 创建一个线程池
ThreadPool pool(4);
// 创建一组任务
std::vector<std::future<int>> results;
for (int i = 0; i < 10; ++i) {
    results.push_back(
        pool.enqueue([i](int value) {
            std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟耗时任务
            return i + value;
        }, i)
    );
}
// 获取任务的结果
for (auto& result : results) {
    std::cout << "Result: " << result.get() << std::endl;
}

在这个示例中,我们首先创建了一个包含4个线程的线程池。然后,我们创建了一组任务,并将它们添加到线程池中。每个任务都是一个lambda表达式,它接受一个整数参数,并返回这个整数加上任务的索引。我们使用enqueue方法将任务添加到线程池中,并获取一个std::future对象来表示任务的结果。最后,我们遍历所有的std::future对象,使用get方法获取任务的结果。

这个示例展示了如何使用线程池、std::packaged_taskstd::future来并行处理一组任务,并获取任务的结果。这是一个非常强大的模式,它可以让你充分利用多核处理器的性能,同时简化并发编程的复杂性。

5. 线程池中的线程管理

在本章中,我们将深入探讨线程池中的线程管理,包括如何创建和启动线程,如何管理线程的生命周期,以及如何统计正在工作的线程数量。

5.1 如何创建和启动线程

在C++中,我们使用std::thread类来创建和管理线程。std::thread类的构造函数接受一个可调用的对象(如函数、lambda表达式等)和该对象的参数,然后在一个新的线程中执行该对象。

在我们的线程池类中,我们在构造函数中创建了指定数量的线程,并将它们添加到workers队列中。每个线程都运行一个无限循环,等待tasks队列中的任务。

以下是相关的代码示例:

for (size_t i = 0; i < maxThreads; ++i) {
    workers.emplace_back(new ThreadWrapper);
    workers.back()->setmain([this] {
        for(;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(this->queue_mutex);
                this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); });
                if(this->stop && this->tasks.empty()) {
                    return;
                }
                task = std::move(this->tasks.front());
                this->tasks.pop();
            }
            ++workingThreads;
            task();
            --workingThreads;
        }
    });
    workers.back()->start();
}

在这段代码中,我们首先创建了一个新的ThreadWrapper对象,并将它添加到workers队列的末尾。然后,我们使用setmain方法设置了线程的主函数,这个函数是一个无限循环,等待tasks队列中的任务。最后,我们使用start方法启动了线程。

5.2 如何管理线程的生命周期

在C++中,我们使用std::thread::join方法来等待线程完成,然后销毁线程对象。如果一个线程对象被销毁,但是线程还没有完成,那么程序会终止。

在我们的线程池类中,我们在析构函数中停止了所有的线程,并等待它们完成当前的任务。以下是相关的代码示例:

~ThreadPool(){
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for(auto &worker : workers) {
        worker->stop();
    }
}

在这段代码中,我们首先设置了stop标志,然后通知了所有等待的线程。然后,我们遍历workers队列,对每个线程调用stop方法。这个方法会等待线程完成当前的任务,然后停止线程。

5.3 如何统计正在工作的线程数量

在我们的线程池类中,我们使用了一个std::atomic<size_t>对象来跟踪正在工作的线程数量。std::atomic是一个模板类,它提供了一种在多线程环境中对数据进行安全访问的方式。

以下是相关的代码示例:

++workingThreads;
task();
--workingThreads;

在这段代码中,当一个线程开始执行任务时,我们使用++workingThreads;来增加正在工作的线程数量。当任务执行完毕时,我们使用--workingThreads;来减少正在工作的线程数量。这两行代码被放在任务执行前后,是为了确保正在工作的线程数量始终正确。

我们还提供了一个getWorkingThreadCount方法,用于获取当前正在工作的线程数量。以下是相关的代码示例:

size_t getWorkingThreadCount() {
    return workingThreads.load();
}

在这段代码中,我们使用std::atomic::load方法来获取workingThreads的值。这个方法会返回workingThreads当前的值,而不会改变它的值。

6. 线程同步与互斥

在多线程编程中,线程同步(Thread Synchronization)和互斥(Mutex)是两个非常重要的概念。它们用于解决多线程环境中的资源共享和访问冲突问题。

6.1 std::mutex的作用和用法

互斥量(Mutex)是一种同步机制,用于防止多个线程同时访问共享资源。在C++中,我们可以使用std::mutex类来创建互斥量。

std::mutex提供了两个主要的方法:lockunlock。当一个线程调用lock方法时,如果互斥量未被锁定,那么这个线程会锁定互斥量并继续执行。如果互斥量已经被另一个线程锁定,那么这个线程会被阻塞,直到互斥量被解锁。当线程完成对共享资源的访问后,它应该调用unlock方法来解锁互斥量,这样其他线程就可以锁定互斥量并访问共享资源了。

在实际使用中,我们通常会使用std::lock_guardstd::unique_lock来管理互斥量。这两个类在构造函数中锁定互斥量,在析构函数中解锁互斥量,这样可以保证在任何情况下,互斥量都会被正确地解锁。

以下是一个使用std::mutexstd::lock_guard的例子:

std::mutex mtx;
std::queue<int> queue;
void add_to_queue(int value) {
    std::lock_guard<std::mutex> lock(mtx);
    queue.push(value);
}

在这个例子中,add_to_queue函数使用std::lock_guard来锁定互斥量,保证在添加元素到队列时,不会有其他线程同时访问队列。

6.2 std::condition_variable的作用和用法

条件变量(Condition Variable)是一种同步机制,用于让线程在某个条件为真时才继续执行。在C++中,我们可以使用std::condition_variable类来创建条件变量。

std::condition_variable提供了三个主要的方法:waitnotify_onenotify_allwait方法用于让当前线程等待,直到条件变量被通知或者满足某个条件。notify_one方法用于唤醒一个等待的线程,notify_all方法用于唤醒所有等待的线程。

以下是一个使用std::condition_variable的例子:

std::mutex mtx;
std::condition_variable cv;
std::queue<int> queue;
void add_to_queue(int value) {
    std::lock_guard<std::mutex> lock(mtx);
    queue.push(value);
    cv.notify_one(); // 通知一个等待的线程
}
void process_queue() {
    std::unique_lock<std::mutex> lock(mtx);
    cv.wait(lock, []{ return !queue.empty(); }); // 等待队列非空
    int value = queue.front();
    queue.pop();
    // 处理value...
}

在这个例子中,add_to_queue函数在添加元素到队列后,调用cv.notify_one()来唤醒一个等待的线程。process_queue函数使用cv.wait()来等待队列非空,当队列非空时,它会取出队列的第一个元素并处理。

6.3 如何使用std::mutex和std::condition_variable进行线程同步

在多线程编程中,我们经常需要使用互斥量和条件变量来进行线程同步。以下是一个使用std::mutexstd::condition_variable进行线程同步的例子:

std::mutex mtx;
std::condition_variable cv;
std::queue<std::function<void()>> tasks;
bool stop = false;
void worker_thread() {
    while (true) {
        std::function<void()> task;
        {
            std::unique_lock<std::mutex> lock(mtx);
            cv.wait(lock, []{ return stop || !tasks.empty(); });
            if (stop && tasks.empty()) {
                return;
            }
            task = std::move(tasks.front());
            tasks.pop();
        }
        task();
    }
}
void add_task(std::function<void()> task) {
    {
        std::lock_guard<std::mutex> lock(mtx);
        tasks.push(std::move(task));
    }
    cv.notify_one();
}

在这个例子中,worker_thread函数是工作线程的主循环,它会一直等待并执行任务,直到线程被停止并且任务队列为空。add_task函数用于添加任务到任务队列,并唤醒一个工作线程。

线程同步与互斥的关键点

  • 使用std::mutex来防止多个线程同时访问共享资源。
  • 使用std::condition_variable来让线程在某个条件为真时才继续执行。
  • 使用std::lock_guardstd::unique_lock来管理互斥量,保证在任何情况下,互斥量都会被正确地解锁。
  • 使用std::condition_variable::wait方法来让当前线程等待,直到条件变量被通知或者满足某个条件。
  • 使用std::condition_variable::notify_onestd::condition_variable::notify_all方法来唤醒等待的线程。

以下是一个线程同步与互斥的流程图,它展示了两个线程如何使用互斥量和条件变量进行同步和互斥访问共享资源。

在这个流程图中,"Thread 1"和"Thread 2"代表两个线程,"Mutex"代表互斥量,"Condition Variable"代表条件变量,"Shared Resource"代表共享资源。线程1首先请求访问共享资源,互斥量授予访问权限,线程1访问共享资源,然后释放互斥量,并通知条件变量。线程2等待互斥量,得到互斥量后访问共享资源,然后释放互斥量。

这个流程图清晰地展示了如何使用互斥量和条件变量进行线程同步和互斥访问共享资源,这是多线程编程中的一个重要技术。

结语

在我们的编程学习之旅中,理解是我们迈向更高层次的重要一步。然而,掌握新技能、新理念,始终需要时间和坚持。从心理学的角度看,学习往往伴随着不断的试错和调整,这就像是我们的大脑在逐渐优化其解决问题的“算法”。

这就是为什么当我们遇到错误,我们应该将其视为学习和进步的机会,而不仅仅是困扰。通过理解和解决这些问题,我们不仅可以修复当前的代码,更可以提升我们的编程能力,防止在未来的项目中犯相同的错误。

我鼓励大家积极参与进来,不断提升自己的编程技术。无论你是初学者还是有经验的开发者,我希望我的博客能对你的学习之路有所帮助。如果你觉得这篇文章有用,不妨点击收藏,或者留下你的评论分享你的见解和经验,也欢迎你对我博客的内容提出建议和问题。每一次的点赞、评论、分享和关注都是对我的最大支持,也是对我持续分享和创作的动力。

目录
相关文章
|
12天前
|
安全
List并发线程安全问题
【10月更文挑战第21天】`List` 并发线程安全问题是多线程编程中一个非常重要的问题,需要我们认真对待和处理。只有通过不断地学习和实践,我们才能更好地掌握多线程编程的技巧和方法,提高程序的性能和稳定性。
119 59
|
11天前
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
88 38
|
3天前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####
|
6天前
|
缓存 安全 C++
C++无锁队列:解锁多线程编程新境界
【10月更文挑战第27天】
23 7
|
6天前
|
消息中间件 存储 安全
|
9天前
|
Java
线程池内部机制:线程的保活与回收策略
【10月更文挑战第24天】 线程池是现代并发编程中管理线程资源的一种高效机制。它不仅能够复用线程,减少创建和销毁线程的开销,还能有效控制并发线程的数量,提高系统资源的利用率。本文将深入探讨线程池中线程的保活和回收机制,帮助你更好地理解和使用线程池。
33 2
|
11天前
|
Prometheus 监控 Cloud Native
JAVA线程池监控以及动态调整线程池
【10月更文挑战第22天】在 Java 中,线程池的监控和动态调整是非常重要的,它可以帮助我们更好地管理系统资源,提高应用的性能和稳定性。
42 4
|
11天前
|
Prometheus 监控 Cloud Native
在 Java 中,如何使用线程池监控以及动态调整线程池?
【10月更文挑战第22天】线程池的监控和动态调整是一项重要的任务,需要我们结合具体的应用场景和需求,选择合适的方法和策略,以确保线程池始终处于最优状态,提高系统的性能和稳定性。
68 2
|
29天前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
38 1
C++ 多线程之初识多线程
|
14天前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
13 3

相关实验场景

更多