理解与实现线程池

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 理解与实现线程池

一、线程池的概念

线程池是一种线程使用模式


线程过多会带来调度开销,进而影响缓存局部性和整体性能


而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务


二、线程池的优点

线程池避免了在处理短时间任务时创建与销毁线程的代价

线程池不仅能够保证内核充分利用,还能防止过分调度

注意: 线程池中可用线程的数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量


三、线程池的应用场景

需要大量的线程来完成任务,且完成任务的时间比较短。 如WEB服务器完成网页请求,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大。 但对于长时间的任务,如一个Telnet连接请求,线程池的优点就不明显了,因为Telnet会话时间比线程的创建时间大多了

对性能要求苛刻的应用,如要求服务器迅速响应客户请求

接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误

四、实现线程池

线程池本质上就是一个生产者消费者模型,其中包括了一个任务队列与若干线程

1644ecb85dc0443ab4efa90ae3f7b527.png



线程池中的多个线程负责从任务队列中取任务,并将取到的任务进行处理

线程池提供一个PushTask()接口,用于让外部线程(主线程)将任务Push到任务队列中

日志模块


完整的日志功能至少有日志等级、时间。最好是支持用户自定义(日志内容, 文件行,文件名等)

#pragma once
#include <iostream>
#include <string>
#include <cstdio>
#include <cstdarg>
#include <ctime>
//日志级别
#define DEBUG   0
#define NORMAL  1
#define WARNING 2
#define ERROR   3
#define FATAL   4
const char *gLevelMap[] = {
    "DEBUG",
    "NORMAL",
    "WARNING",
    "ERROR",
    "FATAL"
};
void LogMessage(int level, const char *format, ...)
{
#ifndef DEBUG_SHOW
    if(level== DEBUG) return;
#endif
    //标准部分
    char stdBuffer[1024];
    const time_t timestamp = time(nullptr);
    struct tm* local_time = localtime(&timestamp);
    snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%d-%d-%d-%d-%d-%d] ", gLevelMap[level], 
        local_time->tm_year + 1900, local_time->tm_mon + 1, local_time->tm_mday, local_time->tm_hour, local_time->tm_min, local_time->tm_sec);
    //自定义部分
    char logBuffer[1024]; 
    va_list args;
    va_start(args, format);
    vsnprintf(logBuffer, sizeof logBuffer, format, args);
    va_end(args);
    printf("%s%s\n", stdBuffer, logBuffer);
}


任务模块


线程池中存储的是一个个任务,下面将任务进行封装


无论该任务是什么类型的,在该任务类中都必须包含仿函数,当处理该类型的任务时只需调用operator()即可

#pragma once
#include <iostream>
#include <string>
#include <functional>
#include "Log.hpp"
typedef std::function<int(int, int)> fun_t;
class Task
{
public:
    Task(){}
    Task(int x, int y, fun_t func):_x(x), _y(y), _func(func) {}
    void operator ()(const std::string &name) {
        LogMessage(NORMAL, "%s处理完成: %d+%d=%d", name.c_str(), _x, _y, _func(_x, _y));
    }
public:
    int _x;
    int _y;
    fun_t _func;
};


线程模块


由于系统调用接口过于复杂,线程模块完成的便是线程的封装,降低接口的调用复杂度,提高代码的阅读性并提高代码的复用性

#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <cstdio>
class ThreadDate
{
public:
    void* _args;
    std::string _name;
};
typedef void*(*func_t)(void*);
class Thread
{
public:
    Thread(size_t num,func_t callback,void* args): _function(callback)
    {
        char nameBuffer[64];
        snprintf(nameBuffer, sizeof nameBuffer, "Thread-%d", num);
        _threadDate._name = nameBuffer;
        _threadDate._args = args;
    }
    ~Thread() {}
    void Start() { pthread_create(&_tid, nullptr, _function, (void*)&_threadDate); }
    void Join() { pthread_join(_tid, nullptr); }
    std::string Name() { return _name; }
private:
    std::string _name;
    func_t _function;
    ThreadDate _threadDate;
    pthread_t _tid;
};


为什么线程池中需要有互斥锁和条件变量?


线程池中的任务队列是会被多个执行流同时访问的临界资源,因此需要引入互斥锁对任务队列进行保护

线程池当中的线程要从任务队列中取任务,前提条件是任务队列中有任务,因此线程池中的线程在拿任务之前,需先判断任务队列中是否有任务。若此时任务队列为空,那么该线程应该进行等待,直到任务队列中有任务时再将其唤醒,因此需要引入条件变量

当外部线程向任务队列中Push一个任务后,此时可能有线程正处于等待状态,因此在新增任务后需要唤醒在条件变量下等待的线程

注意:


当某线程被唤醒时,其可能是异常或是伪唤醒,或者是一些广播类的唤醒线程操作而导致所有线程被唤醒,使得在被唤醒的若干线程中,只有个别线程能拿到任务。此时应该让被唤醒的线程再次判断是否满足被唤醒条件,所以在判断任务队列是否为空时,应该使用while进行判断,而不是if

pthread_cond_broadcast()函数的作用是唤醒条件变量下的所有线程,而外部可能只Push了一个任务,却把全部在等待的线程都唤醒了,此时这些线程就都会去任务队列获取任务,但最终只有一个线程能得到任务。一瞬间唤醒大量的线程可能会导致系统震荡,被称为惊群效应。因此在唤醒线程时最好使用pthread_cond_signal()函数唤醒正在等待的一个线程即可

当线程从任务队列中拿到任务后,该任务就属于当前线程了,与其他线程已经没有关系了,因此应该在解锁之后再进行处理任务,而不是在解锁之前进行。因为处理任务的过程可能会耗费一定的时间,所以不要将该行为其放到临界区当中

为什么线程池中的线程执行例程需要设置为静态方法?


使用pthread_create()函数创建线程时,需要为创建的线程传入一个Routine(执行例程),该Routine只有一个参数类型为void*的参数,以及返回类型为void*的返回值


此时Routine作为类的成员函数,该函数的第一个参数是隐藏的this指针,因此这里的Routine函数,貌似只有一个参数,而实际上有两个参数。此时直接将该Routine函数作为创建线程时的执行例程是不行的,无法通过编译


静态成员函数属于类,而不属于某个对象,也就是说静态成员函数是没有隐藏的this指针的,因此需要将Routine设置为静态方法,此时Routine函数才真正只有一个参数类型为void*的参数


但在静态成员函数内部无法使用非静态成员变量,因此需要在创建线程时,向Routine函数传入的当前对象的this指针,此时就能够通过该this指针在Routine函数内部调用非静态成员变量了


懒汉单例模式


整个工程中线程池应该只存在一个实例,可以使用单例模式进行实现。这里采用懒汉单例模式,其最核心的思想是"延时加载",从而能够优化服务器的启动速度

//示意代码
template <typename T>
class Singleton 
{
    static T* inst;
public:
    static T* GetInstance() {
        if (inst == NULL) {
            inst = new T();
        }     
        return inst;
    }
};

只有调用GetInstance()后,才会实例化出唯一的线程池实例。但存在一个严重的问题, 线程不安全。第一次调用GetInstance()时, 若多个线程同时调用, 可能会创建出多份 T 对象的实例

template <class T>
class Singleton 
{
    static T* inst;
    static std::mutex lock;
public:
    static T* GetInstance() {
        if (inst == NULL) { // 双重判定空指针, 降低锁冲突的概率, 提高性能.
            lock.lock(); // 使用互斥锁, 保证多线程情况下也只调用一次 new.
            if (inst == NULL) {
                inst = new T();
            } 
            lock.unlock();
        } 
        return inst;
    }
};


可能会有些疑惑,为什么需要判定两次是否为空指针?为什么不写成下面这样呢?

template <class T>
class Singleton 
{
    static T* inst;
    static std::mutex lock;
public:
    static T* GetInstance() {
        lock.lock();
        if (inst == NULL) {
            inst = new T();
        } 
        lock.unlock();
        return inst;
    }
};

因为第一个线程创建出唯一线程池实例后,后续可能依然会有该函数的调用(不是为了创建出唯一实例,而是为了获得唯一实例的地址),此时多线程就会涉及到竞争锁资源以及不断的加锁与解锁,造成时间与资源的浪费。使用双重if判断则可以避免某些情况下的锁竞争(已经存在唯一实例),从而提高性能


锁防护装置模块


RAII风格,可避免在加锁区域抛异常而导致未解锁,出作用域自动解锁

#pragma once
#include <iostream>
#include <pthread.h>
class Mutex
{
public:
    Mutex(pthread_mutex_t *mtx):_pmtx(mtx) {}
    void Lock() { pthread_mutex_lock(_pmtx); }
    void UnLock() { pthread_mutex_unlock(_pmtx); }
    ~Mutex() {}
private:
    pthread_mutex_t *_pmtx;
};
// RAII风格的加锁方式
class LockGuard
{
public:
    LockGuard(pthread_mutex_t *mtx):_mutex(mtx) { _mutex.Lock(); }
    ~LockGuard() { _mutex.UnLock(); }
private:
    Mutex _mutex;
};


线程池实现

#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <unistd.h>
#include "Thread.hpp"
#include "LockGuard.hpp"
#include "Log.hpp"
//懒汉模式
const int g_threadNum = 3;
template<class T>
class ThreadPool
{
public://为routine()静态函数提供
    pthread_mutex_t *GetMutex() { return &_mutex; }
    bool isEmpty() { return _taskQueue.empty(); }
    void WaitCond() { pthread_cond_wait(&_cond, &_mutex); }
    T GetTask() {
        T task = _taskQueue.front();
        _taskQueue.pop();
        return task;
    }
public:
    //需考虑多线程申请单例的情况
    static ThreadPool<T>* GetThreadPool(int num = g_threadNum)
    {
        if(nullptr == pool_ptr) {
            {
                LockGuard lockguard(&_init_mutex);
                if(nullptr == pool_ptr) {
                    pool_ptr = new ThreadPool<T>(num);
                }
            }
        }
        return pool_ptr;
    }
    static void* Routine(void* args) {
        ThreadDate* thread_date = (ThreadDate*)args;
        ThreadPool<T>* thread_pool = (ThreadPool<T>*)thread_date->_args;
        while(true) {
            T task;
            {
                LockGuard lockguard(thread_pool->GetMutex());
                while(thread_pool->isEmpty()) thread_pool->WaitCond();
                task = thread_pool->GetTask();
            }
            task(thread_date->_name);//仿函数
        }
    }
    void PushTask(const T& task)
    {
        LockGuard lockguard(&_mutex);
        _taskQueue.push(task);
        pthread_cond_signal(&_cond);
    }
    void Run()
    {
        for(auto& iter : _threads) {
            iter->Start(); 
            LogMessage(DEBUG, "%s %s", iter->Name().c_str(), "启动成功");
        }
    }
    ~ThreadPool()
    {
        for (auto &iter : _threads) {
            iter->Join();
            delete iter;
        }
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }
private:
    ThreadPool(int threadNum):_num(threadNum) {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
        for (int i = 1; i <= _num; i++) {
            _threads.push_back(new Thread(i, Routine, this));
        }
    }
    ThreadPool(const ThreadPool<T>& others) = delete;
    ThreadPool<T>& operator= (const ThreadPool<T>& others) = delete;
private:
    std::vector<Thread*> _threads;
    size_t _num;
    std::queue<T> _taskQueue;
private:
    pthread_mutex_t _mutex;
    pthread_cond_t _cond;
private:
    static ThreadPool<T>* pool_ptr;//避免编译器自动优化
    static pthread_mutex_t _init_mutex;
};
template<typename T>
ThreadPool<T>* ThreadPool<T>::pool_ptr = nullptr;
template<typename T>
pthread_mutex_t ThreadPool<T>::_init_mutex = PTHREAD_MUTEX_INITIALIZER;


五、线程池演示

主线程逻辑


主线程就负责不断向任务队列当中Push任务即可,此后线程池中的线程会从任务队列中获取任务并进行处理

#include "ThreadPool.hpp"
#include "Task.hpp"
int main()
{
    ThreadPool<Task>* threadPool = ThreadPool<Task>::GetThreadPool(5);
    threadPool->Run();
    while(true)
    {
        //生产的过程,制作任务的时候,要花时间
        int x = rand()%100 + 1;
        usleep(7721);
        int y = rand()%30 + 1;
        Task task(x, y, [](int x, int y)->int{
            return x + y;
        });
        LogMessage(NORMAL, "制作任务完成: %d+%d=?", x, y);
        //推送任务到线程池中
        threadPool->PushTask(task);
        sleep(1);
    }
    return 0;
}


运行代码后一瞬间就有六个线程,其中一个为主线程,另外五个是线程池内处理任务的线程


c84ec66555a94465a42261ea41f97528.png


五个线程在处理时会呈现出一定的顺序性,因为主线程是每秒Push一个任务,五个线程中只会有一个线程获取到该任务,其他线程都会在等待队列中进行等待,当该线程处理完任务后就会因为任务队列为空而排到等待队列的最后,当主线程再次Push一个任务后会唤醒等待队列首部的一个线程,这个线程处理完任务后又会排到等待队列的最后,因此这五个线程在处理任务时会呈现出一定的顺序性(4-1-3-5-2)


注意:此后若想让线程池处理其他不同的任务请求时,只需要提供一个任务类,在该任务类当中提供对应的operator()方法即可


相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
缓存 Java 应用服务中间件
线程池的10个坑你都遇到过吗
日常开发中,为了更好管理线程资源,减少创建线程和销毁线程的资源损耗,我们会使用线程池来执行一些异步任务。但是线程池使用不当,就可能会引发生产事故。大家看完肯定会有帮助的~
227 0
|
6月前
|
缓存 Java
|
4月前
|
缓存 Java
线程池使用小结
线程池使用小结
26 0
|
6月前
|
存储 Java 调度
浅谈线程池
浅谈线程池
38 1
|
Java
线程池总结
线程池总结
62 0
|
存储 Java 测试技术
13.一文彻底了解线程池
大家好,我是王有志。线程池是Java面试中必问的八股文,涉及到非常多的问题,今天我们就通过一篇文章,来彻底搞懂Java面试中关于线程池的问题。
401 2
13.一文彻底了解线程池
|
前端开发 Java 调度
你了解线程池吗
你了解线程池吗
81 0
|
存储 Java 调度
线程池使用
线程池使用
|
缓存 NoSQL Java
【线程池】
【线程池】
148 0