Linux创建一个线程池

简介: Linux创建一个线程池

线程池

线程池一种线程的使用模式。本质上创建一个新线程是需要成本的并且线程过多会带来调度开销,进而影响缓存局部性和整体性能,所以线程池的思想就是提前创建好一批线程,要用的时候直接就可以用,不需要等到需要的时候再创建。

线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。不仅能够保证内核的充分利用,还能防止过分调度。

可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量

应用场景

  1. 需要大量的线程来完成任务,且完成任务的时间比较短,例如WEB服务器完成网页请求这样的任务。
  2. 对性能要求苛刻的应用,例如要求服务器迅速响应客户请求
  3. 接受突发性的大量请求,但不至于是服务器因此产生大量线程的应用

基于线程池实现生产者消费者模型

要求

  1. 创建固定数量的线程池,循环从任务队列中获取任务对象。任务队列没有任务时所有线程阻塞等待
  2. 获取到任务对象后,执行任务对象中的任务接口
  3. 主线程负责往任务队列中插入任务,线程池中的线程等待获取
  4. 对线程库进行封装,使用类对象实现线程对应操作

Task.hpp (任务对象)

#include <iostream>
#include <string>
#include <functional>
#include <cstdio>
// 任务类
class CPTask
{
    // 调用的计算方法,根据传入的字符参数决定
    typedef std::function<int(int, int, char)> func_t;
public:
    CPTask()
    {
    }
    CPTask(int x, int y, char op, func_t func)
        : _x(x), _y(y), _op(op), _func(func)
    {
    }
    // 实现传入的函数调用
    std::string operator()()
    {
        int count = _func(_x, _y, _op);
        // 将结果以自定义的字符串形式返回
        char res[2048];
        snprintf(res, sizeof res, "%d %c %d = %d", _x, _op, _y, count);
        return res;
    }
    // 显示出当前传入的参数
    std::string tostring()
    {
        char res[1024];
        snprintf(res, sizeof res, "%d %c %d = ", _x, _op, _y);
        return res;
    }
private:
    int _x;
    int _y;
    char _op;
    func_t _func;
};
// 负责计算的任务函数
int Math(int x, int y, char c)
{
    int count;
    switch (c)
    {
    case '+':
        count = x + y;
        break;
    case '-':
        count = x - y;
        break;
    case '*':
        count = x * y;
        break;
    case '/':
    {
        if (y == 0)
        {
            std::cout << "div zero" << std::endl;
            count = -1;
        }
        else
            count = x / y;
        break;
    }
    default:
        break;
    }
    return count;
}
class SCTask
{
    // 获取保存数据的方法
    typedef std::function<void(std::string)> func_t;
public:
    SCTask()
    {
    }
    SCTask(const std::string &str, func_t func)
        : _str(str), _func(func)
    {
    }
    void operator()()
    {
        _func(_str);
    }
private:
    std::string _str;
    func_t _func;
};

MyThread.hpp (封装线程库)

#pragma once
#include <iostream>
#include <pthread.h>
#include <string>
#include <cassert>
#include <functional>
class Thread
{
private:
    // 线程调用方法
    // 静态类内成员函数不含this指针
    // 因此参数直接传入this指针
    static void *start_routine(void *args)
    {
        Thread *_this = (Thread *)args;
        return _this->run();
    }
public:
    typedef std::function<void *(void *)> func_;
    // 构造函数
    Thread()
    {
        // 保存线程名
        _name = "thread-No.";
        _name += std::to_string(_num++);
    }
    // 线程启动(创建线程并调用回调函数)
    void start(func_ func, void *args = nullptr)
    {
        _func = func;
        _args = args;
        // 创建线程成功后直接回调线程的函数
        int n = pthread_create(&_tid, nullptr, start_routine, this);
        assert(n == 0);
    }
    // 传入回调函数的函数参数
    void *run()
    {
        return _func(_args);
    }
    // 线程等待
    void join()
    {
        int n = pthread_join(_tid, nullptr);
        assert(n == 0);
    }
    // 返回线程名
    std::string ThreadName()
    {
        return _name;
    }
    ~Thread()
    {
    }
private:
    std::string _name; // 线程名
    pthread_t _tid;    // 线程id
    func_ _func;       // 线程调用方法
    void *_args;       // 线程调用方法参数
    static int _num;   // 线程计数,用于实现线程名
};
// 初始化线程数为1
int Thread::_num = 1;

ThreadPool.hpp (线程池)

#pragma once
#include "MyThread.hpp"
#include "Task.hpp"
#include <unistd.h>
#include <vector>
#include <queue>
// 线程池类定义位于下面,因此属性类想要获取到
// 就必须在前面声明
template <class T>
class Pool;
// 线程的属性类
template <class T>
class ThreadData
{
public:
    Pool<T> *_pool;    // 线程所在的线程池,获取到线程的this指针
    std::string _name; // 线程的名字
    ThreadData(Pool<T> *pool, const std::string &name) : _pool(pool), _name(name)
    {
    }
};
// 线程池的类
template <class T>
class Pool
{
private:
    static void *TaskPlay(void *argc)
    {
        // 获取到当前线程的线程属性类的对象
        // 通过线程属性类里面存储的this指针可以访问当前的非静态成员变量
        ThreadData<T> *_this = (ThreadData<T> *)argc;
        // 定义任务对象
        T t;
        while (1)
        {
            // 加锁和条件变量判断
            pthread_mutex_lock(&_this->_pool->_mutex);
            while (_this->_pool->_Taskqueue.empty())
                pthread_cond_wait(&_this->_pool->_cond, &_this->_pool->_mutex);
            // 线程获取任务
            t = _this->_pool->_Taskqueue.front();
            _this->_pool->_Taskqueue.pop();
            // 解锁
            pthread_mutex_unlock(&_this->_pool->_mutex);
            // 线程将任务拿到后并不需要在临界资源里执行,出来后再执行为其他线程进入临界资源腾出时间
            std::cout << _this->_name << " 处理完成: " << t() << std::endl;
        }
        return nullptr;
    }
public:
    // 将指定数量的线程创建
    // 初始化锁和条件变量
    // 线程数量缺省值为5,默认线程数为5
    Pool(const int num = 5) : _num(num)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
        for (int i = 0; i < _num; ++i)
        {
            _threads.push_back(new Thread());
        }
    }
    // 线程池启动
    void start()
    {
        // 每个线程都创建一个线程属性类对象
        // 将当前的this指针和创建线程后的线程名字传入
        // 调用线程的启动函数时将线程的调用函数和线程属性类对象传入
        // 根据线程属性类里的成员可以获取到线程的名字和在静态成员函数中获取到当前this指针
        for (auto &t : _threads)
        {
            ThreadData<T> *td = new ThreadData<T>(this, t->ThreadName());
            t->start(TaskPlay, (void *)td);
            std::cout << "start....." << std::endl;
        }
    }
    // 往线程池中插入任务到任务队列
    // 为了保护任务队列需要加锁操作
    // 往队列中插入任务完成后就可以唤醒线程执行任务了
    void push(const T &in)
    {
        pthread_mutex_lock(&_mutex);
        _Taskqueue.push(in);
        pthread_cond_signal(&_cond);
        pthread_mutex_unlock(&_mutex);
    }
    ~Pool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
        for (auto &e : _threads)
            delete e;
    }
private:
    int _num;                       // 线程池里的线程数
    std::vector<Thread *> _threads; // 存放所有线程的地址的数组
    std::queue<T> _Taskqueue;       // 存放线程的任务的队列,线程从此队列拿任务
    pthread_mutex_t _mutex;         // 定义一把锁,为了保护临界资源
    pthread_cond_t _cond;           // 定义条件变量
};

main.cc

#include "ThreadPool.hpp"
int main()
{
    srand(time(nullptr));
    std::string s("+-*/");
    // 创建线程池,传参为10,则创建10个线程
    Pool<CPTask> *p = new Pool<CPTask>(10);
    // 调用线程池启动方法
    p->start();
    while(1)
    {
        // 随机生成任务类的参数
        int x = rand() % 100;
        int y = rand() % 100;
        int sindex = rand() % s.size();
        // Math方法在任务类头文件中
        CPTask t(x, y, s[sindex], Math);
        std::cout << "录入任务:" << t.tostring() << std::endl;
        // 将任务类插入到线程池的任务队列中
        p->push(t);
        sleep(1);
    }
    return 0;
}

实现效果


17632d74fe634651800678f8ba1b3085.png

最终成功实现主线程录入任务,线程池中的线程获取任务后执行。

相关实践学习
CentOS 7迁移Anolis OS 7
龙蜥操作系统Anolis OS的体验。Anolis OS 7生态上和依赖管理上保持跟CentOS 7.x兼容,一键式迁移脚本centos2anolis.py。本文为您介绍如何通过AOMS迁移工具实现CentOS 7.x到Anolis OS 7的迁移。
目录
相关文章
|
3月前
|
消息中间件 存储 缓存
【嵌入式软件工程师面经】Linux系统编程(线程进程)
【嵌入式软件工程师面经】Linux系统编程(线程进程)
100 1
|
1月前
|
算法 Unix Linux
linux线程调度策略
linux线程调度策略
45 0
|
1月前
|
存储 设计模式 NoSQL
Linux线程详解
Linux线程详解
|
1月前
|
缓存 Linux C语言
Linux线程是如何创建的
【8月更文挑战第5天】线程不是一个完全由内核实现的机制,它是由内核态和用户态合作完成的。
|
1月前
|
负载均衡 Linux 调度
在Linux中,进程和线程有何作用?
在Linux中,进程和线程有何作用?
|
1月前
|
缓存 Linux C语言
Linux中线程是如何创建的
【8月更文挑战第15天】线程并非纯内核机制,由内核态与用户态共同实现。
|
3月前
|
API
linux---线程互斥锁总结及代码实现
linux---线程互斥锁总结及代码实现
|
3月前
|
Linux API
Linux线程总结---线程的创建、退出、取消、回收、分离属性
Linux线程总结---线程的创建、退出、取消、回收、分离属性
|
2月前
|
安全 算法 Linux
【Linux】线程安全——补充|互斥、锁|同步、条件变量(下)
【Linux】线程安全——补充|互斥、锁|同步、条件变量(下)
39 0
|
2月前
|
存储 安全 Linux
【Linux】线程安全——补充|互斥、锁|同步、条件变量(上)
【Linux】线程安全——补充|互斥、锁|同步、条件变量(上)
50 0