【linux线程(四)】初识线程池&手撕线程池

简介: 【linux线程(四)】初识线程池&手撕线程池

1. 前言

线程池在校招面试阶段经常被要求手撕,可见它的重要性如何.

本章重点:

本篇文章会先介绍什么是池化技术,然后详细讲解什么是线程池,以及如何手撕线程池,并且会给大家拓展如何将线程池设计为单例模式,以及读写锁的使用方法,最后会讲解如何在校招中遇见手撕线程池时,快速的写出代码


2. 什么是池化技术?

大家可能听说过线程池,进程池,对象池,甚至是内存池等概念,那么到底什么是池?它们有什么共同特质?

池化技术:

池化技术指的是提前准备一些资源,在需要时可以重复使用这些预先准备的资源 .

说白了,就是线程池就是在程序启动时就创建多个线程来备用,同理对象池和内存池也就是创建多个对象/空间备用

池化技术的优点:

  • 提高性能。通过重用资源,减少了创建和销毁资源的时间,从而提高了资源的使用效率
  • 降低系统开销, 避免了频繁地向操作系统申请和释放资源的开销
  • 简化代码。通过封装资源管理逻辑,使得应用程序代码更简洁易懂

3. 线程池详解

什么是线程池:

一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度

线程池的运用场景:

  1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。
  2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
  3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没
    有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误.

线程池的使用方法:

  1. 创建固定数量的线程,循环的去任务队列中拿任务
  2. 获取到任务后,不同线程执行不同任务的代码
  3. 任务结束,此线程继续循环的去任务队列拿任务

其次,由于线程池是会在多线程下跑的

所以将它设计为单例模型最好

除此之外,根据上一节学习到的内容,可以窥探到,线程池的本质其实就是一个生产者消费者模型,所以也会涉及到加解锁的问题,所以在类中我们需要两把锁,一把是用于单例模式的互斥锁,还有一把是用于生产者消费者之间的互斥锁


4. 手撕线程池

在写线程池的代码之前,需要先写一个关于单个线程的类,并且在线程池中,用数组存储所有的线程类

thread.hpp文件:

typedef void *(*fun_t)(void *);//线程要执行的函数是参数和返回值都为void*
class ThreadData
{
public:
    void *args_;//线程拥有的数据
    std::string name_;//线程的名字
};
class Thread
{
public:
    Thread(int num, fun_t callback, void *args) : func_(callback)
    {
        char nameBuffer[64];
        snprintf(nameBuffer, sizeof nameBuffer, "Thread-%d", num);
        name_ = nameBuffer;
        tdata_.args_ = args;
        tdata_.name_ = name_;
    }
    void start()
    {
        pthread_create(&tid_, nullptr, func_, (void*)&tdata_);//将线程名和参数都传给线程函数
    }
    void join()
    {
        pthread_join(tid_, nullptr);
    }
    std::string name()
    {
        return name_;
    }
    ~Thread()
    {}
private:
    std::string name_;//线程名字
    fun_t func_;//线程要执行的函数
    ThreadData tdata_;//线程的名字和数据
    pthread_t tid_;//线程ID
};

除此之外,还需要写一个锁相关的类

利用对象生命周期管理资源:

lockguard.hpp文件:

class Mutex
{
public:
    Mutex(pthread_mutex_t *mtx):pmtx_(mtx)
    {}
    void lock() 
    {
        // std::cout << "要进行加锁" << std::endl;
        pthread_mutex_lock(pmtx_);
    }
    void unlock()
    {
        // std::cout << "要进行解锁" << std::endl;
        pthread_mutex_unlock(pmtx_);
    }
    ~Mutex()
    {}
private:
    pthread_mutex_t *pmtx_;
};
// RAII风格的加锁方式
class lockGuard
{
public:
    lockGuard(pthread_mutex_t *mtx):mtx_(mtx)
    {
        mtx_.lock();
    }
    ~lockGuard()
    {
        mtx_.unlock();
    }
private:
    Mutex mtx_;
};

最后再看看看主要的函数:

在ThreadPool.hpp文件中:

const int g_thread_num = 3;//创建的线程数量
// 本质是: 生产消费模型
template <class T>
class ThreadPool
{
public:
    pthread_mutex_t *getMutex()
    {
        return &lock;
    }
    bool isEmpty()//判断队列是否为空
    {
        return task_queue_.empty();
    }
    void waitCond()
    {
        pthread_cond_wait(&cond, &lock);
    }
    T getTask()//拿到队列中的任务
    {
        T t = task_queue_.front();
        task_queue_.pop();
        return t;
    }
private:
    ThreadPool(int thread_num = g_thread_num) : num_(thread_num)
    {
        pthread_mutex_init(&lock, nullptr);
        pthread_cond_init(&cond, nullptr);
        for (int i = 1; i <= num_; i++)
        {
            threads_.push_back(new Thread(i, routine, this));//将创建出来的线程用数组管理
        }
    }
    ThreadPool(const ThreadPool<T> &other) = delete; //禁用拷贝构造函数
    const ThreadPool<T> &operator=(const ThreadPool<T> &other) = delete; //禁用运算符重载=
public:
    // 考虑一下多线程使用单例的过程
    static ThreadPool<T> *getThreadPool(int num = g_thread_num)
    {
        // 可以有效减少未来必定要进行加锁检测的问题
        // 拦截大量的在已经创建好单例的时候,剩余线程请求单例的而直接访问锁的行为
        if (nullptr == thread_ptr) 
        {
            lockGuard lockguard(&mutex);
            // 但是,未来任何一个线程想获取单例,都必须调用getThreadPool接口
            // 但是,一定会存在大量的申请和释放锁的行为,这个是无用且浪费资源的
            if (nullptr == thread_ptr)
                thread_ptr = new ThreadPool<T>(num);
        }
        return thread_ptr;
    }
    // 1. run()
    void run()//让线程池跑起来,也就是创建出多个线程
    {
        for (auto &iter : threads_)
        {
            iter->start();
      std::cout << iter->name() << " 启动成功" << std::endl;
        }
    }
    // 线程池本质也是一个生产消费模型
    // void *routine(void *args)
    // 消费过程
    static void *routine(void *args)//线程拿到任务后要执行的函数
    {
        ThreadData *td = (ThreadData *)args;
        ThreadPool<T> *tp = (ThreadPool<T> *)td->args_;
        while (true)
        {
            T task;
            {
                lockGuard lockguard(tp->getMutex());//去队列中拿任务前需要先加锁
                while (tp->isEmpty())//若队列为空,则使用条件变量进行等待
                    tp->waitCond();
                // 读取任务
                task = tp->getTask(); // 任务队列是共享的-> 将任务从共享,拿到自己的私有空间
            }
            //上来加上一对花括号的原因是让锁对象出了作用域自动销毁
            //此处来处理任务
        }
    }
    void pushTask(const T &task)
    {
        lockGuard lockguard(&lock);//插入任务时,也要加锁,队列是临界资源
        task_queue_.push(task);
        pthread_cond_signal(&cond);//插入成功后,直接使用条件变量唤醒线程来拿任务
    }
    ~ThreadPool()
    {
        for (auto &iter : threads_)
        {
            iter->join();
            delete iter;
        }
        pthread_mutex_destroy(&lock);
        pthread_cond_destroy(&cond);
    }
private:
    std::vector<Thread *> threads_;
    int num_;
    std::queue<T> task_queue_;
    static ThreadPool<T> *thread_ptr;
    static pthread_mutex_t mutex;
    pthread_mutex_t lock;
    pthread_cond_t cond;
};
template <typename T>
ThreadPool<T> *ThreadPool<T>::thread_ptr = nullptr;
template <typename T>
pthread_mutex_t ThreadPool<T>::mutex = PTHREAD_MUTEX_INITIALIZER;

5. 初识读写锁

其实出了互斥锁外,还有其他种类的锁:

而将要介绍的锁是读写锁

在编写多线程的时候,有一种情况是十分常见的。那就是,有些公共数据修改的机会比较少。相比较改写,它们读的机会反而高的多。通常而言,在读的过程中,往往伴随着查找的操作,中间耗时很长。给这种代码段加锁,会极大地降低我们程序的效率。那么有没有一种方法,可以专门处理这种多读少写的情况呢? 有,那就是读写锁。

说白了就是一把锁,可以让只读的线程无限制的进入,而只对要对数据做修改的线程加锁

读写锁的操作方法:

  1. 初始化锁

  2. 销毁锁

  3. 加解锁


6. 如何快速实现简易的线程池?

可以发现,上面的代码量是巨大的,所以在实际面试中遇见了,多半是记不住这么多代码的,所以这里我给大家打个样,写一个简易版的线程池,用来应对校招中需要手撕的场景:

首先需要简化的是,直接用一个整数代表要创建线程的数量,而不使用数组存储.并且简化线程要执行的函数,情况如下:

#include<iostream>
#include<pthread.h>
#include<mutex>
#include<queue>
#include<functional>
using namespace std;
pthread_mutex_t MTX = PTHREAD_MUTEX_INITIALIZER;//用于初始化单例模式中要使用的锁
typedef function<void(int)> func_t;
void myprintf(int x)//模拟线程要执行的函数
{
    cout<<x<<" "<<endl;
}
class Task   //模拟线程要执行的任务
{
public:
    Task(func_t func = myprintf):_func(func)
    {}
public:
    func_t _func;
};
class ThreadPool
{
public:
    static ThreadPool* GetInstance()
    {
        if(_singleton == nullptr)
        {
            pthread_mutex_lock(&MTX);
            if(_singleton == nullptr)
            {
                _singleton = new ThreadPool();
                _singleton->InitThreadPool();
            }
            pthread_mutex_unlock(&MTX);
        }
        return _singleton;
    }
    void InitThreadPool()//启动线程池
    {
        for(int i=0;i<_num;i++)
        {
            pthread_t tid;
            if(pthread_create(&tid,nullptr,Routine,this)!=0)
                cout<<"线程启动失败"<<endl;
        }
        cout<<"线程池启动成功"<<endl;
    }
    static void* Routine(void* args)
    {
        ThreadPool* td = (ThreadPool*)args;
        while(1)
        {
            //拿到任务区执行
            Task t;
            pthread_mutex_lock(&td->_mtx);//加锁
            while(td->TaskQueueIsEmpty())//若资源不就绪就等待
                pthread_cond_wait(&td->_cond,&td->_mtx);
            cout<<"开始执行任务"<<endl;
            td->Pop(t);//任务队列的任务减一
            pthread_mutex_unlock(&td->_mtx);
            t._func(10);
            cout<<"一次任务执行完毕"<<endl;
        }
    }
    void Push(Task t)
    {
        pthread_mutex_lock(&_mtx);
        _q.push(t);
        pthread_mutex_unlock(&_mtx);
        pthread_cond_signal(&_cond);
        cout<<"任务push成功"<<endl;
    }
    void Pop(Task& t)
    {
        t = _q.front();
        _q.pop();
    }
    bool TaskQueueIsEmpty()
    {
        return _q.size()==0?true:false;
    }
private:
    ThreadPool(int num = 10):_num(num)
    {
        pthread_mutex_init(&_mtx,nullptr);
        pthread_cond_init(&_cond,nullptr);
    }
    ThreadPool(const ThreadPool& td) = delete;
public:
    static ThreadPool* _singleton;
    queue<Task> _q;
    int _num;//创建线程的数量
    pthread_mutex_t _mtx;
    pthread_cond_t _cond;
};
ThreadPool* ThreadPool::_singleton = nullptr;

上面是线程池的简易版,本人也是硬背的这段代码来应对手撕


7. 总结以及拓展

大家可能现在理解了线程的重要性,像12305铁路系统这种软件,它一秒钟可能会有百万个人同时上线,如果没有像线程池或者其他技术支持,那么一旦这么多人登陆12305软件,服务器肯定会直接崩溃,当然这里只是举一个线程池实际运用的例子,实际生活中的例子肯定不会像直接使用一个线程池这么简单,所以,respect!


🔎 下期预告:Linux网络基础 🔍


相关文章
|
3月前
|
存储 监控 Java
Java多线程优化:提高线程池性能的技巧与实践
Java多线程优化:提高线程池性能的技巧与实践
115 1
|
23天前
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
101 38
|
21天前
|
Java
线程池内部机制:线程的保活与回收策略
【10月更文挑战第24天】 线程池是现代并发编程中管理线程资源的一种高效机制。它不仅能够复用线程,减少创建和销毁线程的开销,还能有效控制并发线程的数量,提高系统资源的利用率。本文将深入探讨线程池中线程的保活和回收机制,帮助你更好地理解和使用线程池。
46 2
|
23天前
|
Prometheus 监控 Cloud Native
JAVA线程池监控以及动态调整线程池
【10月更文挑战第22天】在 Java 中,线程池的监控和动态调整是非常重要的,它可以帮助我们更好地管理系统资源,提高应用的性能和稳定性。
60 4
|
23天前
|
Prometheus 监控 Cloud Native
在 Java 中,如何使用线程池监控以及动态调整线程池?
【10月更文挑战第22天】线程池的监控和动态调整是一项重要的任务,需要我们结合具体的应用场景和需求,选择合适的方法和策略,以确保线程池始终处于最优状态,提高系统的性能和稳定性。
94 2
|
2月前
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
139 29
什么是线程池?从底层源码入手,深度解析线程池的工作原理
|
1月前
|
资源调度 Linux 调度
Linux C/C++之线程基础
这篇文章详细介绍了Linux下C/C++线程的基本概念、创建和管理线程的方法,以及线程同步的各种机制,并通过实例代码展示了线程同步技术的应用。
29 0
Linux C/C++之线程基础
|
1月前
|
Dubbo Java 应用服务中间件
剖析Tomcat线程池与JDK线程池的区别和联系!
剖析Tomcat线程池与JDK线程池的区别和联系!
107 0
剖析Tomcat线程池与JDK线程池的区别和联系!
|
2月前
|
Java
直接拿来用:进程&进程池&线程&线程池
直接拿来用:进程&进程池&线程&线程池
|
1月前
|
设计模式 Java 物联网
【多线程-从零开始-玖】内核态,用户态,线程池的参数、使用方法详解
【多线程-从零开始-玖】内核态,用户态,线程池的参数、使用方法详解
58 0