【C++ 并发 线程池设计】深入理解C++线程池:设计、实现与应用

简介: 【C++ 并发 线程池设计】深入理解C++线程池:设计、实现与应用

1. 引言

并发编程中,线程池(Thread Pool)是一种常见的设计模式,它可以有效地管理和控制多线程的执行。线程池中预先创建了一定数量的线程,这些线程可以并发地执行多个任务。当新的任务到来时,线程池会选择一个空闲的线程来执行这个任务。当任务执行完毕,线程会返回到线程池中,等待下一个任务的到来。

线程池的主要优点是减少了线程创建和销毁的开销。线程的创建和销毁都是需要消耗系统资源的,如果频繁地创建和销毁线程,会导致系统性能下降。线程池通过复用已经创建的线程,避免了这种开销。此外,线程池还可以限制系统中线程的数量,防止系统资源被过多的线程耗尽。

在C++中,我们可以使用标准库中的线程(std::thread)和互斥锁(std::mutex)等工具来实现线程池。下面,我们将详细介绍如何设计和实现一个线程池。

2. 线程池类设计

线程池的设计主要包括以下几个部分:

2.1 线程池类的主要成员变量

线程池类主要包括以下几个成员变量:

  • 工作线程队列:这是一个存储所有工作线程的队列。每个工作线程都是一个独立的线程,可以并发地执行任务。
  • 任务队列:这是一个存储所有待执行任务的队列。当有新的任务到来时,任务会被添加到这个队列中。工作线程会从这个队列中取出任务并执行。
  • 互斥锁:这是一个用于保护任务队列的互斥锁。当工作线程需要从任务队列中取出任务时,需要先获取这个互斥锁。这可以防止多个线程同时修改任务队列,导致数据不一致。
  • 条件变量:这是一个用于线程同步的条件变量。当任务队列为空时,工作线程会等待这个条件变量。当有新的任务被添加到任务队列时,条件变量会被通知,然后工作线程会被唤醒,取出任务并执行。

线程池类的主要成员变量示意图如下:

2.2 线程池类的构造函数和析构函数

线程池类的构造函数主要负责初始化线程池,包括创建工作线程,初始化互斥锁和条件变量等。在创建工作线程时,每个线程都会运行一个无限循环,等待任务队列中的任务。当任务队列中有任务时,线程会取出任务并执行。

线程池类的析构函数主要负责清理线程池,包括停止所有的工作线程,并等待它们完成当前的任务。

3. 线程池任务的添加与执行

在这一章节中,我们将深入探讨如何在线程池中添加和执行任务。我们将通过一个综合的代码示例来介绍这个过程,并通过注释和解释来帮助你理解每个步骤的作用。

3.1 如何添加任务到线程池

在我们的线程池类中,添加任务到线程池是通过enqueue方法实现的。这个方法接收一个函数(或者其他可调用的目标,如lambda表达式)和这个函数的参数,然后创建一个任务并将这个任务添加到任务队列中。

这个过程的关键是std::packaged_taskstd::bindstd::packaged_task是一个模板类,它可以包装任何可以调用的目标,使它们可以作为异步任务被调用。std::bind则是用于绑定函数和参数,创建一个可以直接调用的函数对象。

下面是enqueue方法的代码示例和注释:

template<class F, class... Args>
auto enqueue(F&& f, Args&&... args) 
    -> std::future<typename std::result_of<F(Args...)>::type>
{
    using return_type = typename std::result_of<F(Args...)>::type;
    // 创建一个 std::packaged_task 对象,包装用户提供的函数和参数
    auto task = std::make_shared< std::packaged_task<return_type()> >(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
    // 获取这个任务的 std::future 对象,用于之后获取任务的结果
    std::future<return_type> res = task->get_future();
    // 将任务添加到任务队列中
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        // 如果线程池已经停止,那么抛出一个运行时错误
        if(stop) {
            throw std::runtime_error("enqueue on stopped ThreadPool");
        }
        tasks.emplace([task](){ (*task)(); });
    }
    // 通知一个等待的工作线程,有新的任务可以执行了
    condition.notify_one();
    // 返回 std::future 对象,用户可以用它来获取任务的结果
    return res;
}

3.2 如何在线程池中执行任务

在线程池中执行任务是通过工作线程的主循环实现的。每个工作线程都会运行一个无限循环,不断地从任务队列中取出任务并执行。

这个过程的关键是std::condition_variablestd::unique_lockstd::condition_variable是用于等待和通知的条件变量,std::unique_lock则是一个可以自动解锁的互斥锁,

它们被用于同步线程,确保任务的正确执行。

下面是工作线程的主循环的代码示例和注释:

workers.back()->setmain([this] {
    for(;;) {
        std::function<void()> task;
        // 锁定互斥锁,并等待任务
        {
            std::unique_lock<std::mutex> lock(this->queue_mutex);
            this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); });
            // 如果线程池已经停止,并且任务队列为空,那么退出循环,结束线程
            if(this->stop && this->tasks.empty()) {
                return;
            }
            // 从任务队列中取出一个任务
            task = std::move(this->tasks.front());
            this->tasks.pop();
        }
        // 增加正在工作的线程数量
        ++workingThreads;
        // 执行任务
        task();
        // 减少正在工作的线程数量
        --workingThreads;
    }
});

这个循环会一直运行,直到线程池被停止并且任务队列为空。在每次循环中,线程会锁定互斥锁,然后等待任务。当任务队列中有任务时,线程会取出任务并执行。执行任务时,正在工作的线程数量会增加;任务执行完毕后,正在工作的线程数量会减少。

下面是这个过程的流程图,可以帮助你更好地理解这个过程:

3.3 如何获取任务的返回值

在我们的线程池类中,获取任务的返回值是通过std::future实现的。当你添加一个任务到线程池时,enqueue方法会返回一个std::future对象。你可以通过这个std::future对象来获取任务的返回值。

std::future是一个模板类,它表示一个异步操作的结果。你可以调用std::future::get方法来获取结果。如果结果还没有准备好,那么get方法会阻塞,直到结果准备好为止。

下面是如何使用std::future获取任务返回值的代码示例:

// 创建一个线程池
ThreadPool pool(4);
// 添加一个任务到线程池
auto future = pool.enqueue([]{
    return 1 + 1;
});
// 获取任务的返回值
int result = future.get();
// 输出结果
std::cout << "The result is " << result << std::endl;

在这个示例中,我们首先创建了一个线程池,然后添加了一个任务到线程池。这个任务是一个lambda表达式,它的作用是计算1 + 1的结果。然后,我们通过std::future::get方法获取了任务的返回值,并将结果输出到控制台。

下面是这个过程的流程图,可以帮助你更好地理解这个过程:

通过这个章节,我们深入探讨了如何在线程池中添加和执行任务,以及如何获取任务的返回值。我们通过代码示例和注释,以及流程图,详细解释了每个步骤的作用和实现方式。希望这个章节能帮助你更好地理解线程池的工作原理,以及如何在自己的项目中使用线程池。

2.3 线程池类的主要成员函数

线程池类的主要成员函数包括添加任务的函数和获取正在工作的线程数量的函数。

添加任务的函数接收一个函数和该函数的参数,创建一个任务,并将任务添加到任务队列中。如果线程池已经停止,那么添加任务会抛出一个运行时错误。这个函数返回一个std::future,表示任务的返回值。

获取正在工作的线程数量的函数返回当前正在工作的线程数量。

4. 深入理解std::packaged_task和std::future

在C++的并发编程中,std::packaged_taskstd::future是两个非常重要的工具,它们可以帮助我们更好地管理和控制异步任务。在本章中,我们将深入探讨这两个工具的作用和用法。

std::packaged_task的作用和用法

std::packaged_task(标准包装任务)是一个模板类,它的主要作用是将任何可以调用的目标(函数、lambda表达式、bind表达式或其他函数对象)包装成一个任务,这个任务可以在另一个线程中执行,并且可以将结果存储在一个std::future对象中。

下面是一个std::packaged_task的基本用法示例:

std::packaged_task<int(int, int)> task([](int a, int b) {
    return a + b;
});
std::future<int> result = task.get_future();
std::thread(std::move(task), 2, 3).detach();
std::cout << "Result: " << result.get() << std::endl;

在这个示例中,我们首先定义了一个std::packaged_task对象,它接受一个lambda表达式作为参数。然后,我们通过get_future方法获取一个std::future对象,这个对象将在任务完成时获得结果。最后,我们创建一个新的线程来执行任务,并立即将其分离。

std::future的作用和用法

std::future(标准未来)是一个模板类,它的主要作用是存储异步任务的结果。当你创建一个异步任务(例如通过std::asyncstd::packaged_task)时,你会得到一个std::future对象。你可以使用这个对象来查询任务的状态,等待任务完成,或获取任务的结果。

下面是一个std::future的基本用法示例:

std::future<int> result = std::async(std::launch::async, [](int a, int b) {
    return a + b;
}, 2, 3);
std::cout << "Result: " << result.get() << std::endl;

在这个示例中,我们使用std::async函数创建了一个异步任务,并得到了一个std::future对象。然后,我们通过get方法获取任务的结果。

如何使用std::packaged_task和std::future获取异步任务的结果

在前面的章节中,我们已经介绍了如何使用std::packaged_taskstd::future来创建异步任务和获取任务的结果。现在,让我们看一个更复杂的示例,

在这个示例中,我们将使用线程池来并行处理一组任务,并使用std::packaged_taskstd::future来获取任务的结果。

// 创建一个线程池
ThreadPool pool(4);
// 创建一组任务
std::vector<std::future<int>> results;
for (int i = 0; i < 10; ++i) {
    results.push_back(
        pool.enqueue([i](int value) {
            std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟耗时任务
            return i + value;
        }, i)
    );
}
// 获取任务的结果
for (auto& result : results) {
    std::cout << "Result: " << result.get() << std::endl;
}

在这个示例中,我们首先创建了一个包含4个线程的线程池。然后,我们创建了一组任务,并将它们添加到线程池中。每个任务都是一个lambda表达式,它接受一个整数参数,并返回这个整数加上任务的索引。我们使用enqueue方法将任务添加到线程池中,并获取一个std::future对象来表示任务的结果。最后,我们遍历所有的std::future对象,使用get方法获取任务的结果。

这个示例展示了如何使用线程池、std::packaged_taskstd::future来并行处理一组任务,并获取任务的结果。这是一个非常强大的模式,它可以让你充分利用多核处理器的性能,同时简化并发编程的复杂性。

5. 线程池中的线程管理

在本章中,我们将深入探讨线程池中的线程管理,包括如何创建和启动线程,如何管理线程的生命周期,以及如何统计正在工作的线程数量。

5.1 如何创建和启动线程

在C++中,我们使用std::thread类来创建和管理线程。std::thread类的构造函数接受一个可调用的对象(如函数、lambda表达式等)和该对象的参数,然后在一个新的线程中执行该对象。

在我们的线程池类中,我们在构造函数中创建了指定数量的线程,并将它们添加到workers队列中。每个线程都运行一个无限循环,等待tasks队列中的任务。

以下是相关的代码示例:

for (size_t i = 0; i < maxThreads; ++i) {
    workers.emplace_back(new ThreadWrapper);
    workers.back()->setmain([this] {
        for(;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(this->queue_mutex);
                this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); });
                if(this->stop && this->tasks.empty()) {
                    return;
                }
                task = std::move(this->tasks.front());
                this->tasks.pop();
            }
            ++workingThreads;
            task();
            --workingThreads;
        }
    });
    workers.back()->start();
}

在这段代码中,我们首先创建了一个新的ThreadWrapper对象,并将它添加到workers队列的末尾。然后,我们使用setmain方法设置了线程的主函数,这个函数是一个无限循环,等待tasks队列中的任务。最后,我们使用start方法启动了线程。

5.2 如何管理线程的生命周期

在C++中,我们使用std::thread::join方法来等待线程完成,然后销毁线程对象。如果一个线程对象被销毁,但是线程还没有完成,那么程序会终止。

在我们的线程池类中,我们在析构函数中停止了所有的线程,并等待它们完成当前的任务。以下是相关的代码示例:

~ThreadPool(){
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for(auto &worker : workers) {
        worker->stop();
    }
}

在这段代码中,我们首先设置了stop标志,然后通知了所有等待的线程。然后,我们遍历workers队列,对每个线程调用stop方法。这个方法会等待线程完成当前的任务,然后停止线程。

5.3 如何统计正在工作的线程数量

在我们的线程池类中,我们使用了一个std::atomic<size_t>对象来跟踪正在工作的线程数量。std::atomic是一个模板类,它提供了一种在多线程环境中对数据进行安全访问的方式。

以下是相关的代码示例:

++workingThreads;
task();
--workingThreads;

在这段代码中,当一个线程开始执行任务时,我们使用++workingThreads;来增加正在工作的线程数量。当任务执行完毕时,我们使用--workingThreads;来减少正在工作的线程数量。这两行代码被放在任务执行前后,是为了确保正在工作的线程数量始终正确。

我们还提供了一个getWorkingThreadCount方法,用于获取当前正在工作的线程数量。以下是相关的代码示例:

size_t getWorkingThreadCount() {
    return workingThreads.load();
}

在这段代码中,我们使用std::atomic::load方法来获取workingThreads的值。这个方法会返回workingThreads当前的值,而不会改变它的值。

6. 线程同步与互斥

在多线程编程中,线程同步(Thread Synchronization)和互斥(Mutex)是两个非常重要的概念。它们用于解决多线程环境中的资源共享和访问冲突问题。

6.1 std::mutex的作用和用法

互斥量(Mutex)是一种同步机制,用于防止多个线程同时访问共享资源。在C++中,我们可以使用std::mutex类来创建互斥量。

std::mutex提供了两个主要的方法:lockunlock。当一个线程调用lock方法时,如果互斥量未被锁定,那么这个线程会锁定互斥量并继续执行。如果互斥量已经被另一个线程锁定,那么这个线程会被阻塞,直到互斥量被解锁。当线程完成对共享资源的访问后,它应该调用unlock方法来解锁互斥量,这样其他线程就可以锁定互斥量并访问共享资源了。

在实际使用中,我们通常会使用std::lock_guardstd::unique_lock来管理互斥量。这两个类在构造函数中锁定互斥量,在析构函数中解锁互斥量,这样可以保证在任何情况下,互斥量都会被正确地解锁。

以下是一个使用std::mutexstd::lock_guard的例子:

std::mutex mtx;
std::queue<int> queue;
void add_to_queue(int value) {
    std::lock_guard<std::mutex> lock(mtx);
    queue.push(value);
}

在这个例子中,add_to_queue函数使用std::lock_guard来锁定互斥量,保证在添加元素到队列时,不会有其他线程同时访问队列。

6.2 std::condition_variable的作用和用法

条件变量(Condition Variable)是一种同步机制,用于让线程在某个条件为真时才继续执行。在C++中,我们可以使用std::condition_variable类来创建条件变量。

std::condition_variable提供了三个主要的方法:waitnotify_onenotify_allwait方法用于让当前线程等待,直到条件变量被通知或者满足某个条件。notify_one方法用于唤醒一个等待的线程,notify_all方法用于唤醒所有等待的线程。

以下是一个使用std::condition_variable的例子:

std::mutex mtx;
std::condition_variable cv;
std::queue<int> queue;
void add_to_queue(int value) {
    std::lock_guard<std::mutex> lock(mtx);
    queue.push(value);
    cv.notify_one(); // 通知一个等待的线程
}
void process_queue() {
    std::unique_lock<std::mutex> lock(mtx);
    cv.wait(lock, []{ return !queue.empty(); }); // 等待队列非空
    int value = queue.front();
    queue.pop();
    // 处理value...
}

在这个例子中,add_to_queue函数在添加元素到队列后,调用cv.notify_one()来唤醒一个等待的线程。process_queue函数使用cv.wait()来等待队列非空,当队列非空时,它会取出队列的第一个元素并处理。

6.3 如何使用std::mutex和std::condition_variable进行线程同步

在多线程编程中,我们经常需要使用互斥量和条件变量来进行线程同步。以下是一个使用std::mutexstd::condition_variable进行线程同步的例子:

std::mutex mtx;
std::condition_variable cv;
std::queue<std::function<void()>> tasks;
bool stop = false;
void worker_thread() {
    while (true) {
        std::function<void()> task;
        {
            std::unique_lock<std::mutex> lock(mtx);
            cv.wait(lock, []{ return stop || !tasks.empty(); });
            if (stop && tasks.empty()) {
                return;
            }
            task = std::move(tasks.front());
            tasks.pop();
        }
        task();
    }
}
void add_task(std::function<void()> task) {
    {
        std::lock_guard<std::mutex> lock(mtx);
        tasks.push(std::move(task));
    }
    cv.notify_one();
}

在这个例子中,worker_thread函数是工作线程的主循环,它会一直等待并执行任务,直到线程被停止并且任务队列为空。add_task函数用于添加任务到任务队列,并唤醒一个工作线程。

线程同步与互斥的关键点

  • 使用std::mutex来防止多个线程同时访问共享资源。
  • 使用std::condition_variable来让线程在某个条件为真时才继续执行。
  • 使用std::lock_guardstd::unique_lock来管理互斥量,保证在任何情况下,互斥量都会被正确地解锁。
  • 使用std::condition_variable::wait方法来让当前线程等待,直到条件变量被通知或者满足某个条件。
  • 使用std::condition_variable::notify_onestd::condition_variable::notify_all方法来唤醒等待的线程。

以下是一个线程同步与互斥的流程图,它展示了两个线程如何使用互斥量和条件变量进行同步和互斥访问共享资源。

在这个流程图中,"Thread 1"和"Thread 2"代表两个线程,"Mutex"代表互斥量,"Condition Variable"代表条件变量,"Shared Resource"代表共享资源。线程1首先请求访问共享资源,互斥量授予访问权限,线程1访问共享资源,然后释放互斥量,并通知条件变量。线程2等待互斥量,得到互斥量后访问共享资源,然后释放互斥量。

这个流程图清晰地展示了如何使用互斥量和条件变量进行线程同步和互斥访问共享资源,这是多线程编程中的一个重要技术。

结语

在我们的编程学习之旅中,理解是我们迈向更高层次的重要一步。然而,掌握新技能、新理念,始终需要时间和坚持。从心理学的角度看,学习往往伴随着不断的试错和调整,这就像是我们的大脑在逐渐优化其解决问题的“算法”。

这就是为什么当我们遇到错误,我们应该将其视为学习和进步的机会,而不仅仅是困扰。通过理解和解决这些问题,我们不仅可以修复当前的代码,更可以提升我们的编程能力,防止在未来的项目中犯相同的错误。

我鼓励大家积极参与进来,不断提升自己的编程技术。无论你是初学者还是有经验的开发者,我希望我的博客能对你的学习之路有所帮助。如果你觉得这篇文章有用,不妨点击收藏,或者留下你的评论分享你的见解和经验,也欢迎你对我博客的内容提出建议和问题。每一次的点赞、评论、分享和关注都是对我的最大支持,也是对我持续分享和创作的动力。

目录
相关文章
|
7天前
|
编译器 数据安全/隐私保护 C++
【C++面向对象——继承与派生】派生类的应用(头歌实践教学平台习题)【合集】
本实验旨在学习类的继承关系、不同继承方式下的访问控制及利用虚基类解决二义性问题。主要内容包括: 1. **类的继承关系基础概念**:介绍继承的定义及声明派生类的语法。 2. **不同继承方式下对基类成员的访问控制**:详细说明`public`、`private`和`protected`继承方式对基类成员的访问权限影响。 3. **利用虚基类解决二义性问题**:解释多继承中可能出现的二义性及其解决方案——虚基类。 实验任务要求从`people`类派生出`student`、`teacher`、`graduate`和`TA`类,添加特定属性并测试这些类的功能。最终通过创建教师和助教实例,验证代码
26 5
|
20天前
|
存储 缓存 编译器
【硬核】C++11并发:内存模型和原子类型
本文从C++11并发编程中的关键概念——内存模型与原子类型入手,结合详尽的代码示例,抽丝剥茧地介绍了如何实现无锁化并发的性能优化。
|
2月前
|
缓存 Java 开发者
Java多线程并发编程:同步机制与实践应用
本文深入探讨Java多线程中的同步机制,分析了多线程并发带来的数据不一致等问题,详细介绍了`synchronized`关键字、`ReentrantLock`显式锁及`ReentrantReadWriteLock`读写锁的应用,结合代码示例展示了如何有效解决竞态条件,提升程序性能与稳定性。
219 6
|
1月前
|
监控 Java 数据库连接
Java线程管理:守护线程与用户线程的区分与应用
在Java多线程编程中,线程可以分为守护线程(Daemon Thread)和用户线程(User Thread)。这两种线程在行为和用途上有着明显的区别,了解它们的差异对于编写高效、稳定的并发程序至关重要。
45 2
|
2月前
|
数据采集 存储 数据处理
Python中的多线程编程及其在数据处理中的应用
本文深入探讨了Python中多线程编程的概念、原理和实现方法,并详细介绍了其在数据处理领域的应用。通过对比单线程与多线程的性能差异,展示了多线程编程在提升程序运行效率方面的显著优势。文章还提供了实际案例,帮助读者更好地理解和掌握多线程编程技术。
|
2月前
|
存储 监控 安全
深入理解ThreadLocal:线程局部变量的机制与应用
在Java的多线程编程中,`ThreadLocal`变量提供了一种线程安全的解决方案,允许每个线程拥有自己的变量副本,从而避免了线程间的数据竞争。本文将深入探讨`ThreadLocal`的工作原理、使用方法以及在实际开发中的应用场景。
93 2
|
2月前
|
安全 Java 开发者
Java 多线程并发控制:深入理解与实战应用
《Java多线程并发控制:深入理解与实战应用》一书详细解析了Java多线程编程的核心概念、并发控制技术及其实战技巧,适合Java开发者深入学习和实践参考。
82 6
|
2月前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
2月前
|
缓存 安全 C++
C++无锁队列:解锁多线程编程新境界
【10月更文挑战第27天】
113 7
|
2月前
|
消息中间件 存储 安全