Linux多线程【线程池】

简介: Linux多线程【线程池】

🌇前言


线程池是一种管理线程的机制,它可以在需要时自动创建和销毁线程,以及分配和回收线程资源。线程池的主要优点是减少了频繁创建和销毁线程所带来的开销,提高了系统的稳定性和可扩展性。此外,线程池还可以有效地控制线程的数量,避免过多线程导致的资源竞争和系统过载


🏙️正文

1.线程池的概念

1.1.池化技术

所谓的 线程池 就是 提前创建一批线程,当任务来临时,线程直接从任务队列中获取任务执行,可以提高整体效率;同时一批线程会被合理维护,避免调度时造成额外开销

像这种把未来会高频使用到,并且创建较为麻烦的资源提前申请好的技术称为 池化技术池化技术 可以极大地提高性能,最典型的就是 线程池,常用于各种涉及网络连接相关的服务中,比如 MySQL 连接池、HTTP 连接池、Redis 连接池

除了线程池外还有内存池,比如 STL 中的容器在进行空间申请时,都是直接从 空间配置器 allocator 中获取的,并非直接使用系统调用来申请空间

池化技术 的本质:空间换时间

池化技术 就好比你把钱从银行提前取出一部分放在支付宝中,可以随时使用,十分方便和高效,总不至于需要用钱时还得跑到银行排队取钱

1.2.线程池的优点

线程池 的优点在于 高效、方便

  1. 线程在使用前就已经创建好了,使用时直接将任务交给线程完成
  2. 线程会被合理调度,确保 任务与线程 间能做到负载均衡

线程池 中的线程数量不是越多越好,因为线程增多会导致调度变复杂,具体创建多少线程取决于具体业务场景,比如 处理器内核、剩余内存、网络中的 socket 数量等

线程池 还可以配合 「生产者消费者模型」 一起使用,做到 解耦与提高效率

  • 可以把 任务队列 换成 「生产者消费者模型」

1.3.线程池的应用场景

线程池 有以下几种应用场景:

  1. 存在大量且短小的任务请求,比如 Web 服务器中的网页请求,使用 线程池 就非常合适,因为网页点击量众多,并且大多都没有长时间连接访问
  2. 对性能要求苛刻,力求快速响应需求,比如游戏服务器,要求对玩家的操作做出快速响应
  3. 突发大量请求,但不至于使服务器产生过多的线程,短时间内,在服务器创建大量线程会使得内存达到极限,造成出错,可以使用 线程池 规避问题

2.线程池的实现

2.1.线程池_V1(朴素版)

「朴素版」:实现最基本的线程池功能,直接使用系统提供的接口

所谓朴素版就是不加任何优化设计,只实现 线程池 最基础的功能,便于理解 线程池

创建 ThreadPool_v1.hpp 头文件

线程池 实现为一个类,提供接口供外部调用

首先要明白 线程池 的两大核心:一批线程任务队列,客户端发出请求,新增任务,线程获取任务,执行任务,因此 ThreadPool_v1.hpp 的大体框架如下

  • 一批线程,通过容器管理
  • 任务队列,存储就绪的任务
  • 互斥锁
  • 条件变量

互斥锁 的作用是 保证多个线程并访问任务队列时的线程安全,而 条件变量 可以在 任务队列 为空时,让一批线程进入等待状态,也就是线程同步

注:为了方便实现,直接使用系统调用接口及容器,比如 pthread_tvectorqueue

#pragma once
#include <vector>
#include <string>
#include <queue>
#include <memory>
#include <unistd.h>
#include <pthread.h>
namespace Yohifo
{
#define THREAD_NUM 10
    template<class T>
    class ThreadPool
    {
    public:
        ThreadPool(int num = THREAD_NUM)
            :_threads(num), _num(num)
        {
            // 初始化互斥锁和条件变量
            pthread_mutex_init(&_mtx, nullptr);
            pthread_cond_init(&_cond, nullptr);
        }
        ~ThreadPool()
        {
            // 互斥锁、条件变量
            pthread_mutex_destroy(&_mtx);
            pthread_cond_destroy(&_cond);
        }
        void init()
        {
            // 其他信息初始化(当前不需要)
        }
        void start()
        {
            // 启动线程池
            // ...
        }
        // 提供给线程的回调函数
        static void *threadRoutine(void *args)
        {
            // 业务处理
            // ...
        }
    private:
        std::vector<pthread_t> _threads;
        int _num; // 线程数量
        std::queue<T> _tasks; // 利用 STL 自动扩容的特性,无需担心容量
        pthread_mutex_t _mtx;
        pthread_cond_t _cond;
    };
}

注意:

  • 需要提前给 vector 扩容,避免后面使用时发生越界访问
  • 提供给线程的回调函数需要设置为静态,否则线程调不动(参数不匹配)

填补函数体

初始化线程池 init() — 位于 ThreadPool

当前场景只需要初始化 互斥锁条件变量,在 构造函数 中完成就行了,所以这里的 init() 函数不需要补充

启动线程池 start() — 位于 ThreadPool

启动 线程池 需要先创建出一批线程,这里直接循环创建即可

void start()
{
    // 创建一批线程并启动
    for(int i = 0; i < _num; i++)
        pthread_create(&_threads[i], nullptr, threadRoutine, nullptr); // (存疑)
}

线程的回调函数 threadRoutine() — 位于 ThreadPool


这里进行简单测试,打印当前线程的线程 ID 就行了,并且直接 detach,主线程无需等待次线程运行结束

// 提供给线程的回调函数
static void *threadRoutine(void *args)
{
    // 避免等待线程,直接剥离
    pthread_detach(pthread_self());
    while (true)
    {
        std::cout << "Thread Running... " << pthread_self() << std::endl;
        sleep(1);
    }
}


创建 main.cc 源文件,测试线程池的代码

#include "ThreadPool_V1.hpp"
#include <memory>
int main()
{
    std::unique_ptr<Yohifo::ThreadPool<int>> ptr(new Yohifo::ThreadPool<int>());
    ptr->init();
    ptr->start();
    // 还有后续动作
    return 0;
}

编译并运行代码,可以看到 确实创建了一批线程,当主线程退出后,其他次线程也就跟着终止了


线程池 还需要提供一个重要的接口 pushTask(),将用户需要执行的业务装载至 任务队列 中,等待线程执行

装载任务 pushTask() — 位于 ThreadPool

// 装载任务
void pushTask(const T& task)
{
    // 本质上就是在生产商品,需要加锁保护
    pthread_mutex_lock(&_mtx);
    _tasks.push(task);
    // 唤醒消费者进行消费
    pthread_cond_signal(&_cond);
    pthread_mutex_unlock(&_mtx);
}


装载任务的本质就是在生产任务,相当于用户充当生产者,通过这个接口将任务生产至任务队列中,而线程充当消费者,从任务队列中获取任务并消费

所以线程的回调函数需要从 任务队列 中获取任务,进行消费

  1. 检测是否有任务
  2. 有 -> 消费
  3. 没有 -> 等待


线程回调函数 threadRoutine() — 位于 ThreadPool

// 提供给线程的回调函数
static void *threadRoutine(void *args)
{
    // 避免等待线程,直接剥离
    pthread_detach(pthread_self());
    while (true)
    {
        // 任务队列是临界资源,需要保护
        pthread_mutex_lock(&_mtx);
        // 等待条件满足
        while(_tasks.empty())
            pthread_cond_wait(&_cond, &_mtx);
        T task = _tasks.front();
        _tasks.pop();
        // task(); // 进行消费(存疑)
        pthread_mutex_unlock(&_mtx);
    }
}


注意:判断任务队列是否为空需要使用 while,确保在多线程环境中不会出现问题

因为 任务队列、互斥锁、条件变量 是类内成员,而这里的 threadRoutine() 函数是一个静态函数,并没有 this 指针以访问类内成员,可以采取传递 this 指针的方式解决问题

启动线程池 start() — 位于 ThreadPool

void start()
{
    // 创建一批线程并启动
    for(int i = 0; i < _num; i++)
        pthread_create(&_threads[i], nullptr, threadRoutine, this); // 传递 this 指针
}


threadRoutine() 函数需要将参数 void* 转化为所在类对象的指针,并通过该指针访问类内成员

线程回调函数 threadRoutine() — 位于 ThreadPool

// 提供给线程的回调函数
static void *threadRoutine(void *args)
{
    // 避免等待线程,直接剥离
    pthread_detach(pthread_self());
    auto ptr = static_cast<ThreadPool<T>*>(args);
    while (true)
    {
        // 任务队列是临界资源,需要保护
        pthread_mutex_lock(&ptr->_mtx);
        // 等待条件满足
        while(ptr->_tasks.empty())
            pthread_cond_wait(&ptr->_cond, &ptr->_mtx);
        T task = ptr->_tasks.front();
        ptr->_tasks.pop();
        //task(); // 进行消费(存疑)
        pthread_mutex_unlock(&ptr->_mtx);
    }
}

为了使得提高代码的可阅读性及可拓展性,这里将会封装一批接口,供函数调用

加锁、解锁 — 位于 ThreadPool

void lockQueue()
{
    pthread_mutex_lock(&_mtx);
}
void unlockQueue()
{
    pthread_mutex_unlock(&_mtx);
}


等待、唤醒 — 位于 ThreadPool

void threadWait()
{
    pthread_cond_wait(&_cond, &_mtx);
}
void threadWakeUp()
{
    pthread_cond_signal(&_cond);
}


判空、获取任务 — 位于 ThreadPool

bool isEmpty()
{
    return _tasks.empty();
}
T popTask()
{
    T task = _tasks.front();
    _tasks.pop();
    return task;
}


接口封装完毕后,可以顺便修改之前的代码,比如 装载任务pushTask()

装载任务 pushTask() — 位于 ThreadPool

// 装载任务
void pushTask(const T& task)
{
    // 本质上就是在生产商品,需要加锁保护
    lockQueue();
    _tasks.push(task);
    // 唤醒消费者进行消费
    threadWakeUp();
    unlockQueue();
}


以及 消费者threadRountine()

线程回调函数 threadRoutine() — 位于 ThreadPool

// 提供给线程的回调函数
static void *threadRoutine(void *args)
{
    // 避免等待线程,直接剥离
    pthread_detach(pthread_self());
    auto ptr = static_cast<ThreadPool<T>*>(args);
    while (true)
    {
        // 任务队列是临界资源,需要保护
        ptr->lockQueue();
        // 等待条件满足
        while(ptr->isEmpty())
            ptr->threadWait();
        T task = ptr->popTask();
        pthread_mutex_unlock(&ptr->_mtx);
    // 消费行为可以不用加锁(一个商品只会被一个线程消费)
    task(); 
    }
}

细节:轮到线程执行任务时,不需要加锁,这就好比你买桶泡面回家,是不必担心别人会和你争抢,可以慢慢消费;同样的,你也不应该占用锁资源,主动让出锁资源以提高整体效率

task() 表示执行任务,这里实际是一个 operator()() 的重载,详见 Linux多线程【生产者消费者模型】 中关于 Task.hpp 的设计,因为我们这里也需要使用任务,所以可以直接把之前写的代码拷贝过来

任务类 Task.hpp

#pragma once
#include <string>
namespace Yohifo
{
    // 支持泛型
    template<class T>
    class Task
    {
    public:
        Task(T x = 0, T y = 0, char op = '+')
            :_x(x), _y(y), _op(op), _res(0), _err(0)
        {}
        // 重载运算操作
        void operator()()
        {
            // 简单计算
            switch(_op)
            {
                case '+':
                    _res = _x + _y;
                break;
                case '-':
                    _res = _x - _y;
                break;
                case '*':
                    _res = _x * _y;
                break;
                case '/':
                    if(_y == 0)
                        _err = -1;
                    else
                        _res = _x / _y;    
                break;
                case '%':
                    if(_y == 0)
                        _err = -2;
                    else
                        _res = _x % _y;    
                break;
                default:
                    _err = -3;
                break;
            }
        }
        // 获取计算结果
        std::string getResult()
        {
            // 根据错误标识,返回计算结果
            std::string ret = std::to_string(_x) + " " + _op + " " + std::to_string(_y);
            if(_err)
            {
                ret += " error";
                // 判读是 / 错误还是 % 错误
                if(_err == -1)
                    ret += " [-1] \t / 0 引发了错误";
                else if(_err == -2)
                    ret += " [-2] \t % 0 引发了错误";
                else
                    ret += " [-3] \t 不合法的操作符,只能为 [+-*/%]";
            }
            else
            {
                ret += " = " + std::to_string(_res);
            }
            return ret;
        }
    private:
        T _x;
        T _y;
        char _op; // 运算符
        T _res; // 结果
        int _err; // 错误标识
    };
}


轮到 main.cc 进行操作了,逻辑很简单:创建线程池对象,初始化线程池,启动线程池,装载任务,等待运行结果

补充 main.cc

#include "ThreadPool_V1.hpp"
#include <memory>
typedef Yohifo::Task<int> type;
int main()
{
    std::unique_ptr<Yohifo::ThreadPool<type>> ptr(new Yohifo::ThreadPool<type>());
    ptr->init();
    ptr->start();
    // 还有后续动作
    while(true)
    {
        // 输入 操作数 操作数 操作符
        int x = 0, y = 0;
        char op = '+';
        std::cout << "输入 x: ";
        std::cin >> x;
        std::cout << "输入 y: ";
        std::cin >> y;
        std::cout << "输入 op: ";
        std::cin >> op;
        // 构建任务对象
        type task(x, y, op);
        // 装载任务
        ptr->pushTask(task);
    }
    return 0;
}


现在还有最后一个问题:如何获取计算结果?可以在 线程 执行完任务后,直接显示计算结果,也可以通过传入回调函数的方式,获取计算结果,前者非常简单,只需要在 threadRoutine() 中加入这行代码即可

线程回调函数 threadRoutine() — 位于 ThreadPool

void *threadRoutine(void *args)
{
  // ...
    // 显示计算结果
    std::cout << task.getResult() << std::endl;
}


除此之外,我们也可以通过 回调函数 的方式获取计算结果

目标:给线程传入一个回调函数,线程执行完任务后,将任务传给回调函数,回调函数结合业务逻辑,灵活处理结果

单纯打印的话,很容易就可以写出这个回调函数

回调函数 callBack() — 位于 main.cc 源文件

// 回调函数
void callBack(type& task)
{
    // 获取计算结果后打印
    std::string ret = task.getResult();
    std::cout << "计算结果为: " << ret;
}


为了能让 线程 在执行任务后能回调,需要将这个函数对象作为参数,传递给 ThreadPool 对象

源文件 main.cc

// ...
int main()
{
    std::unique_ptr<Yohifo::ThreadPool<type>> ptr(new Yohifo::ThreadPool<type>(callBack));
  // ...
}


当然,这边传递了一个对象,那边就得接收此对象,为了存储该函数对象,ThreadPool 新增一个类成员:_func,函数对象类型为 void (T&)

修改 ThreadPool.hpp 头文件

// ...
#include <functional>
namespace Yohifo
{
#define THREAD_NUM 10
    template<class T>
    class ThreadPool
    {
        using func_t = std::function<void(T&)>; // 包装器
    public:
        ThreadPool(func_t func, int num = THREAD_NUM)
            :_threads(num), _num(num), _func(func)
        {
            // 初始化互斥锁和条件变量
            pthread_mutex_init(&_mtx, nullptr);
            pthread_cond_init(&_cond, nullptr);
        }
    // ...
    private:
        // ...
            func_t _func;
    };
}


修改完成后,创建 ThreadPool 对象时,支持传入一个类型为 void(T&) 的函数对象

获取函数对象后,需要让 线程 在执行完任务后进行回调,但又因为这玩意是一个类内成员,同样需要借助外部传入的 this 指针进行访问,这里直接封装成一个接口,顺便进行调用

回调函数对象 callBack() — 位于 ThreadPool

func_t callBack(T &task)
{
    _func(task);
}


线程回调函数 threadRoutine() — 位于 ThreadPool

// 提供给线程的回调函数
static void *threadRoutine(void *args)
{
    // ...
        task(); // 执行任务
        ptr->callBack(task); // 回调函数
    }
}

做完上述准备工作后,可以进行测试

程序结果正常,不必在意打印问题,因为屏幕也是被多线程并发访问的资源,没加锁保护,导致出现问题


2.2.线程池_V2(封装版)

「封装版」:引入自己封装实现的线程库 Thread.hpp,支持对线程做出更多操作

之前写的线程池代码不够优雅,所能展现的线程相关信息太少了,为此可以选择引入之前封装实现的 Thread.hpp

自己封装的 Thread.hpp 头文件

#pragma once
#include <iostream>
#include <string>
#include <pthread.h>
enum class Status
{
    NEW = 0, // 新建
    RUNNING, // 运行中
    EXIT // 已退出
};
// 参数、返回值为 void 的函数类型
typedef void (*func_t)(void*);
class Thread
{
public:
    Thread(int num = 0, func_t func = nullptr, void* args = nullptr)
        :_tid(0), _status(Status::NEW), _func(func), _args(args)
    {
        // 根据编号写入名字
        char name[128];
        snprintf(name, sizeof name, "thread-%d", num);
        _name = name;
    }
    ~Thread()
    {}
    // 获取 ID
    pthread_t getTID() const
    {
        return _tid;
    }
    // 获取线程名
    std::string getName() const
    {
        return _name;
    }
    // 获取状态
    Status getStatus() const
    {
        return _status;
    }
    // 回调方法
    static void* runHelper(void* args)
    {
        Thread* myThis = static_cast<Thread*>(args);
        // 很简单,回调用户传进来的 func 函数即可
        myThis->_func(myThis->_args);
    }
    // 启动线程
    void run()
    {
        int ret = pthread_create(&_tid, nullptr, runHelper, this);
        if(ret != 0)
        {
            std::cerr << "create thread fail!" << std::endl;
            exit(1); // 创建线程失败,直接退出
        }
        _status =  Status::RUNNING; // 更改状态为 运行中
    }
    // 线程等待
    void join()
    {
        int ret = pthread_join(_tid, nullptr);
        if(ret != 0)
        {
            std::cerr << "thread join fail!" << std::endl;
            exit(1); // 等待失败,直接退出
        }
        _status = Status::EXIT; // 更改状态为 退出
    }
private:
    pthread_t _tid; // 线程 ID
    std::string _name; // 线程名
    Status _status; // 线程状态
    func_t _func; // 线程回调函数
    void* _args; // 传递给回调函数的参数
};


不再直接使用原生线程库,转而使用自己封装的线程库

创建 ThreadPool_V2.hpp 头文件

拷贝 ThreadPool_V1.hpp,对其中的部分内容进行修改即可

#pragma once
#include <vector>
#include <string>
#include <queue>
#include <memory>
#include <functional>
#include <unistd.h>
#include <pthread.h>
#include "Task.hpp"
#include "Thread.hpp"
namespace Yohifo
{
#define THREAD_NUM 10
    template<class T>
    class ThreadPool
    {
        using func_t = std::function<void(T&)>; // 包装器
    public:
        ThreadPool(func_t func, int num = THREAD_NUM)
            :_num(num), _func(func)
        {
            // 初始化互斥锁和条件变量
            pthread_mutex_init(&_mtx, nullptr);
            pthread_cond_init(&_cond, nullptr);
        }
        ~ThreadPool()
        {
            // 等待线程退出
            for(auto &t : _threads)
                t.join();
            // 互斥锁、条件变量
            pthread_mutex_destroy(&_mtx);
            pthread_cond_destroy(&_cond);
        }
        void init()
        {
            // 创建一批线程
            for(int i = 0; i < _num; i++)
                _threads.push_back(Thread(i, threadRoutine, this));
        }
        void start()
        {
            // 启动线程
            for(auto &t : _threads)
                t.run();
        }
        // 提供给线程的回调函数(已修改返回类型为 void)
        static void threadRoutine(void *args)
        {
            // 避免等待线程,直接剥离
            pthread_detach(pthread_self());
            auto ptr = static_cast<ThreadPool<T>*>(args);
            while (true)
            {
                // 任务队列是临界资源,需要保护
                ptr->lockQueue();
                // 等待条件满足
                while(ptr->isEmpty())
                    ptr->threadWait();
                T task = ptr->popTask();
                ptr->unlockQueue();
                task();
                ptr->callBack(task); // 回调函数
            }
        }
        // 装载任务
        void pushTask(const T& task)
        {
            // 本质上就是在生产商品,需要加锁保护
            lockQueue();
            _tasks.push(task);
            // 唤醒消费者进行消费
            threadWakeUp();
            unlockQueue();
        }
    protected:
        void lockQueue()
        {
            pthread_mutex_lock(&_mtx);
        }
        void unlockQueue()
        {
            pthread_mutex_unlock(&_mtx);
        }
        void threadWait()
        {
            pthread_cond_wait(&_cond, &_mtx);
        }
        void threadWakeUp()
        {
            pthread_cond_signal(&_cond);
        }
        bool isEmpty()
        {
            return _tasks.empty();
        }
        T popTask()
        {
            T task = _tasks.front();
            _tasks.pop();
            return task;
        }
        func_t callBack(T &task)
        {
            _func(task);
        }
    private:
        std::vector<Thread> _threads;
        std::queue<T> _tasks; // 利用 STL 自动扩容的特性,无需担心容量
        pthread_mutex_t _mtx;
        pthread_cond_t _cond;
        func_t _func;
    };
}


涉及修改的内容:

  • _threads 类型由 vector 变为 vector
  • init() 函数用于创建线程,注册线程信息
  • start() 函数用于启动线程
  • ~ThreadPool() 中新增等待线程退出
  • 线程回调函数 threadRoutinue() 返回值改为 void
  • 新增函数对象 _func


测试结果如下


2.3.线程池_V3(优化版)

「优化版」:从任务队列入手,引入 「生产者消费者模型」,同时引入 RAII 风格的锁,实现自动化加锁与解锁

当前的 线程池 设计已经完成的差不多了,接下来重点在于完善其他地方,比如 任务队列及锁的优化

线程池 专注于 任务处理,至于如何确保任务装载及获取时的线程安全问题,交给 「生产者消费者模型」(基于阻塞队列) 就行了,线程池_V3 版的代码可以优化成下面这个样子

线程池 ThreadPool_V3.hpp

#pragma once
#include <vector>
#include <string>
#include <memory>
#include <functional>
#include <unistd.h>
#include <pthread.h>
#include "Task.hpp"
#include "Thread.hpp"
#include "BlockingQueue.hpp" // CP模型
namespace Yohifo
{
#define THREAD_NUM 10
    template<class T>
    class ThreadPool
    {
        using func_t = std::function<void(T&)>; // 包装器
    public:
        ThreadPool(func_t func, int num = THREAD_NUM)
            :_num(num), _func(func)
        {
        }
        ~ThreadPool()
        {
            // 等待线程退出
            for(auto &t : _threads)
                t.join();
        }
        void init()
        {
            // 创建一批线程
            for(int i = 0; i < _num; i++)
                _threads.push_back(Thread(i, threadRoutine, this));
        }
        void start()
        {
            // 启动线程
            for(auto &t : _threads)
                t.run();
        }
        // 提供给线程的回调函数(已修改返回类型为 void)
        static void threadRoutine(void *args)
        {
            // 避免等待线程,直接剥离
            pthread_detach(pthread_self());
            auto ptr = static_cast<ThreadPool<T>*>(args);
            while (true)
            {
                // 从CP模型中获取任务
                T task = ptr->popTask();
                task();
                ptr->callBack(task); // 回调函数
            }
        }
        // 装载任务
        void pushTask(const T& task)
        {
            _blockqueue.Push(task);
        }
    protected:
        func_t callBack(T &task)
        {
            _func(task);
        }
        T popTask()
        {
            T task;
            _blockqueue.Pop(&task);
            return task;
        }
    private:
        std::vector<Thread> _threads;
        int _num; // 线程数量
        BlockQueue<T> _blockqueue; // 阻塞队列
        func_t _func;
    };
}


之前的 互斥锁、条件变量 相关操作交给 「生产者消费者模型」 处理,线程池 不必关心,关于 「生产者消费者模型」 的实现详见 Linux多线程【生产者消费者模型】

手动 加锁、解锁 显得不够专业,并且容易出问题,比如忘记释放锁资源而造成死锁,因此我们可以设计一个小组件 LockGuard,实现 RAII 风格的锁:初始化创建,析构时销毁

小组件 LockGuard.hpp

#pragma once
#include <pthread.h>
class LockGuard
{
public:
    LockGuard(pthread_mutex_t *pmtx)
        :_pmtx(pmtx)
    {
        // 加锁
        pthread_mutex_lock(_pmtx);
    }
    ~LockGuard()
    {
        // 解锁
        pthread_mutex_unlock(_pmtx);
    }
private:
    pthread_mutex_t *_pmtx;
};


将这个小组件加入 BlockingQueue.hpp 中,可以得到以下代码

生产者消费者模型 BlockingQueue.hpp

#pragma once
#include <queue>
#include <mutex>
#include <pthread.h>
#include "LockGuard.hpp"
// 命名空间,避免冲突
namespace Yohifo
{
#define DEF_SIZE 10
    template<class T>
    class BlockQueue
    {
    public:
        BlockQueue(size_t cap = DEF_SIZE)
            :_cap(cap)
        {
            // 初始化锁与条件变量
            pthread_mutex_init(&_mtx, nullptr);
            pthread_cond_init(&_pro_cond, nullptr);
            pthread_cond_init(&_con_cond, nullptr);
        }
        ~BlockQueue()
        {
            // 销毁锁与条件变量
            pthread_mutex_destroy(&_mtx);
            pthread_cond_destroy(&_pro_cond);
            pthread_cond_destroy(&_con_cond);
        }
        // 生产数据(入队)
        void Push(const T& inData)
        {
            // 加锁(RAII风格)
            LockGuard lock(&_mtx);
            // 循环判断条件是否满足
            while(IsFull())
            {
                pthread_cond_wait(&_pro_cond, &_mtx);
            }
            _queue.push(inData);
            // 可以加策略唤醒,比如生产一半才唤醒消费者
            pthread_cond_signal(&_con_cond);
            // 自动解锁
        }
        // 消费数据(出队)
        void Pop(T* outData)
        {
            // 加锁(RAII 风格)
            LockGuard lock(&_mtx);
            // 循环判读条件是否满足
            while(IsEmpty())
            {
                pthread_cond_wait(&_con_cond, &_mtx);
            }
            *outData = _queue.front();
            _queue.pop();
            // 可以加策略唤醒,比如消费完后才唤醒生产者
            pthread_cond_signal(&_pro_cond);
            // 自动解锁
        }
    private:
        // 判断是否为满
        bool IsFull()
        {
            return _queue.size() == _cap;
        }
        // 判断是否为空
        bool IsEmpty()
        {
            return _queue.empty();
        }
    private:
        std::queue<T> _queue;
        size_t _cap; // 阻塞队列的容量
        pthread_mutex_t _mtx; // 互斥锁
        pthread_cond_t _pro_cond; // 生产者条件变量
        pthread_cond_t _con_cond; // 消费者条件变量
    };
}


最后引入 main.cc,并编译运行程序,查看结果是否正确

源文件 main.cc

#include "ThreadPool_V3.hpp"
#include <memory>
typedef Yohifo::Task<int> type;
// 回调函数
void callBack(type& task)
{
    // 获取计算结果后打印
    std::string ret = task.getResult();
    std::cout << "计算结果为: " << ret << std::endl;
}
int main()
{
    std::unique_ptr<Yohifo::ThreadPool<type>> ptr(new Yohifo::ThreadPool<type>(callBack));
    ptr->init();
    ptr->start();
    // 还有后续动作
    while(true)
    {
        // 输入 操作数 操作数 操作符
        int x = 0, y = 0;
        char op = '+';
        std::cout << "输入 x: ";
        std::cin >> x;
        std::cout << "输入 y: ";
        std::cin >> y;
        std::cout << "输入 op: ";
        std::cin >> op;
        // 构建任务对象
        type task(x, y, op);
        // 装载任务
        ptr->pushTask(task);
    }
    return 0;
}


运行结果如下


如何证明现在有一批线程在运行呢?

通过指令查看,当程序运行后,再新开一个终端,并输入以下命令

ps -aL | grep threadPool


注:threadPool 为当前程序编译后生成的可执行文件名

可以看到:除了主线程 5847 外,其他次线程都在等待任务就绪,从生产者消费者模型中获取任务并执行;当大量并发任务来临时,线程池是能大大提高效率的


3.单例模式

3.1.什么是单例模式

代码构建类,类实例化出对象,这个实例化出的对象也可以称为 实例,比如常见的 STL 容器,在使用时,都是先根据库中的类,形成一个 实例 以供使用;正常情况下,一个类可以实例化出很多很多个对象,但对于某些场景来说,是不适合创建出多个对象的

比如本文中提到的 线程池,当程序运行后,仅需一个 线程池对象 来进行高效任务计算,因为多个 线程池对象 无疑会大大增加调度成本,因此需要对 线程池类 进行特殊设计,使其只能创建一个 对象,换句话说就是不能让别人再创建对象

正如 一山不容二虎 一样,线程池 对象在一个程序中是不推荐出现多个的

在一个程序中只允许实例化出一个对象,可以通过 单例模式 来实现,单例模式 是非常 经典、常用、常考 的设计模式

什么是设计模式?

设计模式就是计算机大佬们在长时间项目实战中总结出来的解决方案,是帮助菜鸡编写高质量代码的利器,常见的设计模式有 单例模式、建造者模式、工厂模式、代理模式等


3.2.单例模式的特点

单例模式 最大的特点就是 只允许存在一个对象(实例),这就好比现在的 一夫一妻制 一样,要是在古代,单例模式 肯定不被推崇

在很多服务器开发场景中, 经常需要让服务器加载很多的数据 (上百 GB) 到内存中,此时往往要用一个 单例 的类来管理这些数据;在我们今天的场景中,也需要一个 单例线程池 来协同生产者与消费者

3.3.单例模式的简单实现

单例模式 有两种实现方向:饿汉懒汉,它们避免类被再次创建出对象的手段是一样的:构造函数私有化、删除拷贝构造

只要外部无法访问 构造函数,那么也就无法构建对象了,比如下面这个类 Signal

单例类 Signal

#pragma once
#include <iostream>
namespace Yohifo
{
    class Signal
    {
    private:
        // 构造函数私有化
        Signal()
        {}
        // 删除拷贝构造
        Signal(const Signal&) = delete;
    };
}


当外界试图创建对象时


当然这只实现了一半,还有另一半是 创建一个单例对象,既然外部受权限约束无法创建对象,那么类内是肯定可以创建对象的,只需要创建一个指向该类对象的 静态指针 或者一个 静态对象,再初始化就好了;因为外部无法访问该指针,所以还需要提供一个静态函数 getInstance() 以获取单例对象句柄,至于具体怎么实现,需要分不同方向(饿汉 or 懒汉)

#pragma once
#include <iostream>
namespace Yohifo
{
    class Signal
    {
    private:
        // 构造函数私有化
        Signal()
        {}
        // 删除拷贝构造
        Signal(const Signal&) = delete;
     public:
      // 获取单例对象的句柄
        static Signal *getInstance()
        {
            return _sigptr;
        }
        void print()
        {
            std::cout << "Hello Signal!" << std::endl;
        }
    private:
        // 指向单例对象的静态指针
        static Signal *_sigptr;
    };
}


注意:构造函数不能只声明,需要实现,即使什么都不写

为什么要删除拷贝构造?

如果不删除拷贝构造,那么外部可以借助拷贝构造函数,拷贝构造出一个与 单例对象 一致的 “对象”,此时就出现两个对象,这是不符合 单例模式 特点的

为什么要创建一个静态函数?

单例对象也需要被初始化,并且要能被外部使用

调用链逻辑:通过静态函数获取句柄(静态单例对象地址)-> 通过地址调用该对象的其他函数

3.3.1.饿汉模式

张三总是很饿,尽管饭菜还没准备好,他就已经早早的把碗洗好了,等到开饭时,直接开干

饿汉模式 也是如此,在程序加载到内存时,就已经早早的把 单例对象 创建好了(此时程序服务还没有完全启动),也就是在外部直接通过 new 实例化一个对象,具体实现如下

#pragma once
#include <iostream>
namespace Yohifo
{
    // 饿汉模式
    class Signal
    {
    private:
        // 构造函数私有化
        Signal()
        {}
        // 删除拷贝构造
        Signal(const Signal&) = delete;
    public:
        static Signal *getInstance()
        {
            return _sigptr;
        }
        void print()
        {
            std::cout << "Hello Signal!" << std::endl;
        }
    private:
        // 指向单例对象的静态指针
        static Signal *_sigptr;
    };
    Signal* Signal::_sigptr = new Signal();
}


注:在程序加载时,该对象会被创建

这里的 单例对象 本质就有点像 全局变量,在程序加载时就已经创建好了

外部可以直接通过 getInstance() 获取 单例对象 的操作句柄,来调用类中的其他函数

main.cc

#include <iostream>
#include "Signal.hpp"
int main()
{
    Yohifo::Signal::getInstance()->print();
    return 0;
}


运行结果为


这就实现了一个简单的 饿汉版单例类,除了创建 static Signal*静态单例对象指针 外,也可以直接定义一个 静态单例对象,生命周期随进程,不过要注意的是:getInstance() 需要返回的也是该静态单例对象的地址,不能返回值,因为拷贝构造被删除了;并且需要在类的外部初始化该静态单例对象

#pragma once
#include <iostream>
namespace Yohifo
{
    // 饿汉模式
    class Signal
    {
    private:
        // 构造函数私有化
        Signal()
        {}
        // 删除拷贝构造
        Signal(const Signal&) = delete;
    public:
        static Signal *getInstance()
        {
            return &_sig;
        }
        void print()
        {
            std::cout << "Hello Signal!" << std::endl;
        }
    private:
        // 静态单例对象
        static Signal _sig;
    };
    // 初始化
    Signal Signal::_sig;
}


饿汉模式 是一个相对简单的单例实现方向,只需要在类中声明,在类外初始化就行了,但它也会带来一定的弊端:延缓服务启动速度

完全启动服务是需要时间的,创建 单例对象 也是需要时间的,饿汉模式 在服务正式启动前会先创建对象,但凡这个单例类很大,服务启动时间势必会受到影响,大型项目启动,时间就是金钱

并且由于 饿汉模式 每次都会先创建 单例对象,再启动服务,如果后续使用 单例对象 还好说,但如果后续没有使用 单例对象,那么这个对象就是白创建了,在延缓服务启动的同时造成了一定的资源浪费

综上所述,饿汉模式 不是很推荐使用,除非图实现简单,并且服务规模较小;既然 饿汉模式 有缺点,就需要改进,于是就出现了 懒汉模式

3.3.2.懒汉模式

李四也是个很饿的人,他也有一个自己的碗,吃完饭后碗会脏,但他不像张三那样极端,李四比较懒,只有等他吃饭的时候,他才会去洗碗,李四这种做法让他感到无比轻松



懒汉模式 中,单例对象 并不会在程序加载时创建,而是在第一次调用时创建,第一次调用创建后,后续无需再创建,直接使用即可

#pragma once
#include <iostream>
namespace Yohifo
{
    // 懒汉模式
    class Signal
    {
    private:
        // 构造函数私有化
        Signal()
        {}
        // 删除拷贝构造
        Signal(const Signal&) = delete;
    public:
        static Signal *getInstance()
        {
            // 第一次调用才创建
            if(_sigptr == nullptr)
            {
                _sigptr = new Signal();
            }
            return _sigptr;
        }
        void print()
        {
            std::cout << "Hello Signal!" << std::endl;
        }
    private:
        // 静态指针
        static Signal *_sigptr;
    };
    // 初始化静态指针
    Signal* Signal::_sigptr = nullptr;
}


注意:此时的静态指针需要初始化为 nullptr,方便第一次判断

饿汉模式 中出现的问题这里全都避免了

  • 创建耗时 -> 只在第一次使用时创建
  • 占用资源 -> 如果不使用,就不会被创建

懒汉模式 的核心在于 延时加载,可以优化服务器的速度及资源占用

延时加载这种机制就有点像 「写时拷贝」,就du你不会使用,从而节省资源开销,类似的还有 动态库、进程地址空间 等

当然,懒汉模式 下也是可以正常使用 单例对象

这样看来,懒汉模式 确实优秀,实现起来也不麻烦,为什么会说 饿汉模式 更简单呢?

这是因为当前只是单线程场景,程序暂时没啥问题,如果当前是多线程场景,问题就大了,如果一批线程同时调用 getInstance(),同时认定 _sigptr 为空,就会创建多个 单例对象,这是不合理的

也就是说当前实现的 懒汉模式 存在严重的线程安全问题

如何证明?

简单改一下代码,每创建一个单例对象,就打印一条语句,将代码放入多线程环境中测试

获取单例对象句柄 getInstance() — 位于 Signal

static Signal *getInstance()
{
    // 第一次调用才创建
    if(_sigptr == nullptr)
    {
        std::cout << "创建了一个单例对象" << std::endl;
        _sigptr = new Signal();
    }
    return _sigptr;
}


源文件 main.cc

其中使用了 lambda 表达式来作为线程的回调函数,重点在于查看现象

#include <iostream>
#include <pthread.h>
#include "Signal.hpp"
int main()
{
    // 创建一批线程
    pthread_t arr[10];
    for(int i = 0; i < 10; i++)
    {
        pthread_create(arr + i, nullptr, [](void*)->void*
            {
                // 获取句柄
                auto ptr = Yohifo::Signal::getInstance();
                ptr->print();
                return nullptr;
            }, nullptr);
    }
    for(int i = 0; i < 10; i++)
        pthread_join(arr[i], nullptr);
    return 0;
}


运行结果如下:


当前代码在多线程环境中,同时创建了多个 单例对象,因此是存在线程安全问题的

饿汉模式没有线程安全问题吗?

没有,因为饿汉模式下,单例对象一开始就被创建了,即便是多线程场景中,也不会创建多个对象,它们也做不到


3.3.3.懒汉模式(线程安全版)

有问题就解决,解决多线程并发访问的利器是 互斥锁,那就创建 互斥锁 保护单例对象的创建

#pragma once
#include <iostream>
#include <mutex>
namespace Yohifo
{
    // 懒汉模式
    class Signal
    {
    private:
        // 构造函数私有化
        Signal()
        {}
        // 删除拷贝构造
        Signal(const Signal&) = delete;
    public:
        static Signal *getInstance()
        {
            // 加锁保护
            pthread_mutex_lock(&_mtx);
            if(_sigptr == nullptr)
            {
                std::cout << "创建了一个单例对象" << std::endl;
                _sigptr = new Signal();
            }
            pthread_mutex_unlock(&_mtx);
            return _sigptr;
        }
        void print()
        {
            std::cout << "Hello Signal!" << std::endl;
        }
    private:
        // 静态指针
        static Signal *_sigptr;
        static pthread_mutex_t _mtx;
    };
    // 初始化静态指针
    Signal* Signal::_sigptr = nullptr;
    // 初始化互斥锁
    pthread_mutex_t Signal::_mtx = PTHREAD_MUTEX_INITIALIZER;
}


注意:getInstance() 是静态函数,互斥锁也要定义为静态的,可以初始化为全局静态锁

依旧是借助之前的多线程场景,测试一下改进后的 懒汉模式 代码有没有问题


结果是没有问题,单例对象 也只会创建一个

现在还面临最后一个问题:效率问题

当前代码确实能保证只会创建一个 单例对象,但即使后续不会创建 单例对象,也需要进行 加锁、判断、解锁 这个流程,要知道 加锁 也是有资源消耗的,所以这种写法不妥

解决方案是:DoubleCheck 双检查加锁

加锁 前再增加一层判断,如此一来,N 个线程,顶多只会进行 N加锁与解锁,这是非常优雅的解决方案

获取静态对象句柄 getInstance() — 位于 Signal

static Signal *getInstance()
{
    // 双检查
    if(_sigptr == nullptr)
    {
        // 加锁保护
        pthread_mutex_lock(&_mtx);
        if(_sigptr == nullptr)
        {
            std::cout << "创建了一个单例对象" << std::endl;
            _sigptr = new Signal();
        }
        pthread_mutex_unlock(&_mtx);
    }
    return _sigptr;
}


单纯的 if 判断并不会消耗很多资源,但 加锁 行为会消耗资源,延缓程序运行速度,双检查加锁 可以有效避免这个问题

这是个精妙绝伦的代码设计,值得学习


所以 懒汉模式 麻烦吗?

相比于 饿汉模式,确实挺麻烦的,不仅要判断后创建 单例对象,还需要考虑线程安全问题

值得一提的是,懒汉模式 还有一种非常简单的写法:调用 getInstance() 时创建一个静态单例对象并返回,因为静态单例对象只会初始化一次,所以是可行的,并且在 C++11 之后,可以保证静态变量初始化时的线程安全问题,也就不需要 双检查加锁 了,实现起来非常简单

#pragma once
#include <iostream>
#include <mutex>
namespace Yohifo
{
    // 懒汉模式
    class Signal
    {
    private:
        // 构造函数私有化
        Signal()
        {}
        // 删除拷贝构造
        Signal(const Signal&) = delete;
    public:
        static Signal *getInstance()
        {
            // 静态单例对象,只会初始化一次,并且生命周期随进程
            static Signal _sig;
            return &_sig;
        }
        void print()
        {
            std::cout << "Hello Signal!" << std::endl;
        }
    };
}


结果也是正常的


所以如果当前的生产环境所支持的 C++ 版本为 C++11 及以后,在实现 懒汉模式 时可以选择这种简便的方式,是非常不错的;如果为了兼容性,也可以选择传统写法

注意:静态变量创建时的线程安全问题,在 C++11 之前是不被保障的

关于 单例模式 的其他问题

new 出来的单例对象不需要销毁吗?

这个单例对象生成周期随进程,进程结束了,资源也就都被销毁了,如果想手动销毁,可以设计一个垃圾回收内部类 GC,主动去销毁单例对象


3.4.线程池_V4(最终版)

有了 单例模式 的相关知识后,就可以开始编写最终版线程池了

「最终版」:将线程池改为 单例模式,只允许存在一个线程池对象

这里选择 懒汉模式,因为比较优秀,并且为了确保兼容性,选择 经典写法

首先是修改 ThreadPool 为单例模式

头文件 ThreadPool_V4.hpp

#pragma once
#include <vector>
#include <string>
#include <memory>
#include <functional>
#include <unistd.h>
#include <pthread.h>
#include "Task.hpp"
#include "Thread.hpp"
#include "BlockingQueue.hpp" // CP模型
namespace Yohifo
{
#define THREAD_NUM 10
    template<class T>
    class ThreadPool
    {
        using func_t = std::function<void(T&)>; // 包装器
  // 私有化
    private:
        ThreadPool(func_t func, int num = THREAD_NUM)
            :_num(num), _func(func)
        {
        }
        ~ThreadPool()
        {
            // 等待线程退出
            for(auto &t : _threads)
                t.join();
        }
        // 删除拷贝构造
        ThreadPool(const ThreadPool<T> &) = delete;
    // ...
    private:
      // ...
        // 创建静态单例对象指针及互斥锁
        static ThreadPool<T> *_inst;
        static pthread_mutex_t _mtx;
    };
    // 初始化指针
    template<class T>
    ThreadPool<T>* ThreadPool<T>::_inst = nullptr;
    // 初始化互斥锁
    template<class T>
    pthread_mutex_t ThreadPool<T>::_mtx = PTHREAD_MUTEX_INITIALIZER;
}


然后提供一个获取 单例对象 句柄的函数,如果是第一次创建 单例对象,就需要在创建完对象后,顺便进行 init()start()

获取句柄 getInstance() — 位于 ThreadPool

static ThreadPool<T>* getInstance()
{
    // 双检查
    if(_inst == nullptr)
    {
        // 加锁
        LockGuard lock(&_mtx);
        if(_inst == nullptr)
        {
            // 创建对象
            _inst = new ThreadPool<T>();
      // 初始化及启动服务
      _inst->init();
      _inst->start();
        }
    }
    return _inst;
}


单例模式 改完了,但现在面临一个尴尬的问题:main.cc 无法直接将回调函数 callBack() 进行传递,因为它根本无法创建对象

可以试试曲线救国:将函数对象传递给 getInstance() 函数,如果用户不传,那就使用缺省参数,也就是直接打印结果

总之,修修改改后的线程池长这样

头文件 ThreadPool_V4.hpp

#pragma once
#include <vector>
#include <string>
#include <memory>
#include <functional>
#include <unistd.h>
#include <pthread.h>
#include "Task.hpp"
#include "Thread.hpp"
#include "BlockingQueue.hpp" // CP模型
namespace Yohifo
{
#define THREAD_NUM 10
    template<class T>
    class ThreadPool
    {
        using func_t = std::function<void(T&)>; // 包装器
    private:
        ThreadPool(int num = THREAD_NUM)
            :_num(num)
        {
        }
        ~ThreadPool()
        {
            // 等待线程退出
            for(auto &t : _threads)
                t.join();
        }
        // 删除拷贝构造
        ThreadPool(const ThreadPool<T> &) = delete;
    public:
        static ThreadPool<T>* getInstance(const func_t &func = [](T& task){ std::cout << task.getResult() << std::endl; })
        {
            // 双检查
            if(_inst == nullptr)
            {
                // 加锁
                LockGuard lock(&_mtx);
                if(_inst == nullptr)
                {
                    // 创建对象
                    _inst = new ThreadPool<T>();
                    // 初始化及启动服务
                    _inst->init();
                    _inst->start();
                }
            }
            // 支持随时更改 main.cc 传过来的回调函数
            _inst->_func = func;
            return _inst;
        }
    public:
        void init()
        {
            // 创建一批线程
            for(int i = 0; i < _num; i++)
                _threads.push_back(Thread(i, threadRoutine, this));
        }
        void start()
        {
            // 启动线程
            for(auto &t : _threads)
                t.run();
        }
        // 提供给线程的回调函数(已修改返回类型为 void)
        static void threadRoutine(void *args)
        {
            // 避免等待线程,直接剥离
            pthread_detach(pthread_self());
            auto ptr = static_cast<ThreadPool<T>*>(args);
            while (true)
            {
                // 从CP模型中获取任务
                T task = ptr->popTask();
                task();
                ptr->callBack(task); // 回调函数
            }
        }
        // 装载任务
        void pushTask(const T& task)
        {
            _blockqueue.Push(task);
        }
    protected:
        func_t callBack(T &task)
        {
            _func(task);
        }
        T popTask()
        {
            T task;
            _blockqueue.Pop(&task);
            return task;
        }
    private:
        std::vector<Thread> _threads;
        int _num; // 线程数量
        BlockQueue<T> _blockqueue; // 阻塞队列
        func_t _func;
        // 创建静态单例对象指针及互斥锁
        static ThreadPool<T> *_inst;
        static pthread_mutex_t _mtx;
    };
    // 初始化指针
    template<class T>
    ThreadPool<T>* ThreadPool<T>::_inst = nullptr;
    // 初始化互斥锁
    template<class T>
    pthread_mutex_t ThreadPool<T>::_mtx = PTHREAD_MUTEX_INITIALIZER;
}


此时 main.cc 想要使用线程池对象时,就得通过 getInstance() 获取句柄,然后才能进行操作

源文件 main.cc

#include "ThreadPool_V4.hpp"
#include <memory>
typedef Yohifo::Task<int> type;
// 回调函数
void callBack(type& task)
{
    // 获取计算结果后打印
    std::string ret = task.getResult();
    std::cout << "计算结果为: " << ret << std::endl;
}
int main()
{
    // 还有后续动作
    while(true)
    {
        // 输入 操作数 操作数 操作符
        int x = 0, y = 0;
        char op = '+';
        std::cout << "输入 x: ";
        std::cin >> x;
        std::cout << "输入 y: ";
        std::cin >> y;
        std::cout << "输入 op: ";
        std::cin >> op;
        // 构建任务对象
        type task(x, y, op);
        // 装载任务
        Yohifo::ThreadPool<type>::getInstance(callBack)->pushTask(task);
    }
    return 0;
}


此时是可以获取结果的,也可以看到一批线程正在候等任务到达

如何证明当前的 单例模式 生效了?

在调用 getInstance() 之前查看正在运行中的线程数量,调用完后再次查看,如果线程数量从 1 个变成多个,就证明 单例模式 是生效的(延迟加载)


还可以通过其他方式证明,比如多行打印 单例对象句柄,查看地址是否为同一个,就可以知道 单例模式 是否生效了


至此我们的 线程池_V4 最终版 代码算是完善了,以下是一些注意事项及建议

  1. 注意加锁解锁的位置,尽可能提高效率
  2. 使用双检查加锁,避免不必要的竞争
  3. 可以使用 volatile 修饰静态单例对象指针,避免被编译器优化覆盖

4.周边问题补充

4.1.STL线程安全问题

STL 库中的容器是否是 线程安全 的?

答案是 不是

因为 STL 设计的初衷就是打造出极致性能容器,而加锁、解锁操作势必会影响效率,因此 STL 中的容器并未考虑线程安全,在之前编写的 生产者消费者模型线程池 中,使用了部分 STL 容器,如 vectorqueuestring 等,这些都是需要我们自己去加锁、解锁,以确保多线程并发访问时的线程安全问题

从另一方面来说,STL 容器种类繁多,容器间实现方式各不相同,无法以统一的方式进行加锁、解锁操作,比如哈希表中就有 锁表锁桶 两种方式

所以在多线程场景中使用 STL 库时,需要自己确保线程安全

4.2.智能指针线程安全问题

C++ 标准提供的智能指针有三种:unique_ptrshared_ptrweak_ptr

首先来说 unique_ptr,这是个功能单纯的智能指针,只具备基本的 RAII 风格,不支持拷贝,因此无法作为参数传递,也就不涉及线程安全问题

其次是 shared_ptr,得益于 引用计数,这个智能指针支持拷贝,可能被多线程并发访问,但标准库在设计时考虑到了这个问题,索性将 shared_ptr 对于引用计数的操作设计成了 原子操作 CAS,这就确保了它的 线程安全,至于 weak_ptr,这个就是 shared_ptr 的小弟,名为弱引用智能指针,具体实现与 shared_ptr 一脉相承,因此它也是线程安全的

4.3.其他常见锁概念

悲观锁:总是认为数据会被其他线程修改,于是在自己访问数据前,会先加锁,其他线程想访问时只能等待,之前使用的锁都属于悲观锁

乐观锁:并不认为其他线程会来修改数据,因此在访问数据前,并不会加锁,但是在更新数据前,会判断其他数据在更新前有没有被修改过,主要通过 版本号机制 和 CAS操作实现

CAS 操作:当需要更新数据时,会先判断内存中的值与之前获取的值是否相等,如果相等就用新值覆盖旧值,失败就不断重试

自旋锁:申请锁失败时,线程不会被挂起,而且不断尝试申请锁

自旋 本质上就是一个不断 轮询 的过程,即不断尝试申请锁,这种操作是十分消耗 CPU 时间的,因此推荐临界区中的操作时间较短时,使用 自旋锁 以提高效率;操作时间较长时,自旋锁 会严重占用 CPU 时间

自旋锁 的优点:可以减少线程切换的消耗

自旋锁相关接口

#include <pthread.h>
pthread_spinlock_t lock; // 自旋锁类型
int pthread_spin_init(pthread_spinlock_t *lock, int pshared); // 初始化自旋锁
int pthread_spin_destroy(pthread_spinlock_t *lock); // 销毁自旋锁
// 自旋锁加锁
int pthread_spin_lock(pthread_spinlock_t *lock); // 失败就不断重试(阻塞式)
int pthread_spin_trylock(pthread_spinlock_t *lock); // 失败就继续向后运行(非阻塞式)
// 自旋锁解锁
int pthread_spin_unlock(pthread_spinlock_t *lock);


就这接口风格,跟 mutex互斥锁 是一脉相承,可以轻易上手,将 线程池 中的 互斥锁 轻易改为 自旋锁

公平锁:一种用于同步多线程或多进程之间访问共享资源的机制,它通过使用互斥锁和相关的调度策略来确保资源的公平分配,以提高系统的性能和稳定性

非公平锁:通常使用信号量(Semaphore)或自旋锁(Spinlock)等机制。这些锁机制没有严格的按照请求的顺序来分配锁,而是以更高的性能为目标,允许一些线程或进程在较短时间内多次获取锁资源,从而减少了竞争开销

4.4.读者写者问题

除了 生产者消费者模型 外,还有一个 读者写者模型,用来解决 读者写者 问题,核心思想是 读者共享,写者互斥

这就好比博客发布了,允许很多人同时读,但如果作者想要进行修改,那么其他人自然也就无法查看了,这就是一个很典型的 读者写者 问题

读者写者模型 也遵循 321 原则

3 种关系:

  • 读者<->读者 无关系
  • 写者<->写者 互斥
  • 读者<->写者 互斥、同步

2 种角色:读者写者

1 个交易场所:阻塞队列或其他缓冲区

为什么读者与读者间甚至不存在互斥关系?

因为读者读取数据时,并不会对数据做出修改,因此不需要维持互斥关系

pthread 库中提供了 读写锁 相关接口

#include <pthread.h>
pthread_rwlock_t; // 读写锁类型
// 初始化读写锁
int pthread_rwlock_init(pthread_rwlock_t *__restrict__ __rwlock, const pthread_rwlockattr_t *__restrict__ __attr); 
// 销毁读写锁
int pthread_rwlock_destroy(pthread_rwlock_t *__rwlock) 
 // 读者,加锁
int pthread_rwlock_rdlock(pthread_rwlock_t *__rwlock); // 阻塞式
int pthread_rwlock_tryrdlock(pthread_rwlock_t *__rwlock); // 非阻塞式
// 写者,加锁
int pthread_rwlock_wrlock(pthread_rwlock_t *__rwlock); // 阻塞式 
int pthread_rwlock_trywrlock(pthread_rwlock_t *__rwlock); // 非阻塞式
// 解锁(读者锁、写者锁都可以解)
int pthread_rwlock_unlock(pthread_rwlock_t *__rwlock); 


注意:读者和写者使用的加锁接口并不是同一个。

关于 读者写者模型 的实现

读者读数据时,允许其他读者一起读取数据,但不允许写者修改数据

写者写数据时,不允许读者进入

读者读取完数据后,通知写者进行写入

写者写完数据后,通知读者进行读取

以下是伪代码

int reader_cnt = 0; // 统计读者的数量
pthread_mutex_t lock; // 互斥锁
sem_t w(1); // 二元信号量
读者
  // 加锁
  {
    pthread_mutex_lock(&lock);
    if(reader_cnt == 0)
      P(w); // 第一个读者进入,申请信号量
    reader_cnt++; // 进入了一个读者
    pthread_mutex_unlock(&lock);  
  }
  // 读取数据
  // 解锁
  {
    pthread_mutex_lock(&lock);
    reader_cnt--; // 走了一个读者
    if(reader_cnt == 0)
      V(w); // 最后一个读者走了,归还信号量
    pthread_mmutex_unlock();
  }
写者
  // 加锁
  {
    P(w); // 申请信号量
    if(reader_cnt > 0)
    {
      V(w); // 归还信号量
      // 挂起等待
    }
  }
  // 写入数据
  // 解锁
  {
    V(w); // 归还信号量
  }


因为现实中,读者数量大多数情况下都是多于写者的,所以势必会存在很多很多读者不断读取,导致写者根本申请不到信号量,写者陷入 死锁 状态

这是读者写者模型的特性,也是 读者优先 策略的体现,如果想要避免死锁,可以选择 写者优先 策略,优先让写者先写,读者先等一等


🌆总结

以上就是关于 Linux多线程【线程池】的全部内容了,作为多线程篇章的收官之作,首先学习了池化技术,了解了线程池的特性,然后又分别实现了四个版本的线程池,循序渐进,最终得到了单例版的线程池,得益于模板,此线程池可以轻松应用于其他场景中,最后还学习了多线程的一些周边知识,比如线程安全、锁概念、读者写者问题。总之多线程算是正式结束了,下一篇将会打开网络的大门


目录
相关文章
|
21天前
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
100 38
|
1月前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
43 1
C++ 多线程之初识多线程
|
19天前
|
Java
线程池内部机制:线程的保活与回收策略
【10月更文挑战第24天】 线程池是现代并发编程中管理线程资源的一种高效机制。它不仅能够复用线程,减少创建和销毁线程的开销,还能有效控制并发线程的数量,提高系统资源的利用率。本文将深入探讨线程池中线程的保活和回收机制,帮助你更好地理解和使用线程池。
44 2
|
21天前
|
Prometheus 监控 Cloud Native
JAVA线程池监控以及动态调整线程池
【10月更文挑战第22天】在 Java 中,线程池的监控和动态调整是非常重要的,它可以帮助我们更好地管理系统资源,提高应用的性能和稳定性。
58 4
|
21天前
|
Prometheus 监控 Cloud Native
在 Java 中,如何使用线程池监控以及动态调整线程池?
【10月更文挑战第22天】线程池的监控和动态调整是一项重要的任务,需要我们结合具体的应用场景和需求,选择合适的方法和策略,以确保线程池始终处于最优状态,提高系统的性能和稳定性。
89 2
|
23天前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
17 3
|
23天前
|
Java 开发者
在Java多线程编程中,选择合适的线程创建方法至关重要
【10月更文挑战第20天】在Java多线程编程中,选择合适的线程创建方法至关重要。本文通过案例分析,探讨了继承Thread类和实现Runnable接口两种方法的优缺点及适用场景,帮助开发者做出明智的选择。
16 2
|
23天前
|
Java
Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口
【10月更文挑战第20天】《JAVA多线程深度解析:线程的创建之路》介绍了Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口。文章详细讲解了每种方式的实现方法、优缺点及适用场景,帮助读者更好地理解和掌握多线程编程技术,为复杂任务的高效处理奠定基础。
28 2
|
23天前
|
Java 开发者
Java多线程初学者指南:介绍通过继承Thread类与实现Runnable接口两种方式创建线程的方法及其优缺点
【10月更文挑战第20天】Java多线程初学者指南:介绍通过继承Thread类与实现Runnable接口两种方式创建线程的方法及其优缺点,重点解析为何实现Runnable接口更具灵活性、资源共享及易于管理的优势。
28 1
|
23天前
|
安全 Java 开发者
Java多线程中的`wait()`、`notify()`和`notifyAll()`方法,探讨了它们在实现线程间通信和同步中的关键作用
本文深入解析了Java多线程中的`wait()`、`notify()`和`notifyAll()`方法,探讨了它们在实现线程间通信和同步中的关键作用。通过示例代码展示了如何正确使用这些方法,并分享了最佳实践,帮助开发者避免常见陷阱,提高多线程程序的稳定性和效率。
33 1