线程池设计, 从简单的我们平常设计线程池图解,到生活中的类似线程池的处理现实场景, 到简单的C++模拟nginx写的单链表组织工作队列的简单线程池实现 + nginx 部分源码刨析

简介: 线程池设计, 从简单的我们平常设计线程池图解,到生活中的类似线程池的处理现实场景, 到简单的C++模拟nginx写的单链表组织工作队列的简单线程池实现 + nginx 部分源码刨析

活实例 整体 抽象 线程池, 其实线程池, 给我的感觉 核心 其实是 在于任务队列的设计上, 任务队列 + 互斥锁 + 条件变量 保证 任务队列的 中任务的有条不紊的 生产任务和 处理任务.........


这个池子: 其实 就是提前开启了 多个 死循环的处理任务的工作线程: 这些多个线程好比是现实生活中的办事窗口,任务队列就好比是我们去办事的人.....


看一张图: 人们排着队去 银行办事, 或者 医院挂号 办事:


其实这些  窗口就是我们的工作线程: 他们其实是一开始就知道自己的办事流程的,,,,   一直死循环的等待工作,,,,,     是提前 开启的,,,   是 时刻准备着的,  所以 是任务到来之前, 一创建线程池, 初始化线程池的时候, 就必须马上的启动这些工作线程......       (InitTreadPool就启动消费者线程)  

工作线程: 一般是 一直的 死循环的进行 等待 着 任务的到来 随时准备着 处理任务的........


任务队列:  生产者, 不停的 生产存储着需要解决的任务......    (存在 异步解耦合性质?????).....


下面 先理解好啥叫作异步解耦?????    


队列。。。  其实永远都是一个作用, 缓冲, 解除耦合性,,,,    然后一提到 解耦合 永远都有一个叫做   异步解耦的说法, 但是  这个概念总是非常的模糊的过去了。。。。。。。


异步解耦合: 就是两个事件之间是相互解除等待关系的,,,,,  (这个是我的简单的理解,如果有不对,欢迎各位给出改正, 我会非常表示感谢的,,,,,)


有了任务队列:  其实 在所有线程都在繁忙的时候, 我们提供任务的线程需要不需要等待 线程空闲下来,我们把任务给到线程手上,我们才返回,,,,去收集其他任务提交信息呢????


答案当然是不需要,  我们只是需要将任务给到任务队列中我们就不需要管了, 就可以直接返回了......  这样我们就不受工作线程繁忙的影响了, 提供任务的线程将任务放到任务队列中,然后就可以去继续获取其他任务信息放入到任务队列中.......  


还可以用生活中的 快递驿站的例子理解


比如说常见的菜鸟驿站, 快递员们将 快递包裹(任务) 放入到菜鸟驿站就OK了......   然后快递员就可以离开去干其他事情了......


同样我们人去拿取快递, 也不需要和快递员直接交互, 我们也不是说快递员将快递放入到了菜鸟驿站,我们就需要立马去拿, 不需要, 我们可以先将手头上正在处理的事情处理好,然后 抽空去拿就OK了.....


如上就是我对于异步解耦的理解, 就是 生产者来说 我 不需要等着你可以消费了,我才生产, 我生产好了可以屯在队列中, 消费者 不是说你生产了我立马必须消费, 可以等到我自己可以消费了再消费,,, 异步性我感觉有一定的自由性....


线程池说了这么多理论, 相信大家都快烦了, 如果大家是学生的话就像快点来撸代码了,,,, 我学这里的时候完全是一个想法, 哈哈哈, 奉上大致的两份代码如下:    C++ 版本


首先写代码的时候  需要了解一个点:    线程处理函数的第一个参数需要是  void* 类型


void *(*start_routine) (void *)   所以  这个  Routine 线程执行的函数我们必须要 在类中进行一个static  不然 第一个参数默认传入的是 this 指针, 这样会报错, 所以我们处理成static 函数,然后手动传入this指针实现


如下 首先是一个  thread_pool 的 代码:[tangyujie@VM-4-9-centos PthreadPool]$ cat pthread_pool.hpp

#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>
using std::endl;
using std::cin;
using std::cout;
using std::queue;
template<class T>
class PthreadPool {
public:
    PthreadPool(int num = 3) :_num(num) {
        pthread_mutex_init(&_lock, NULL);
        pthread_cond_init(&_cond, NULL);
    }
    ~PthreadPool() {
        pthread_mutex_destroy(&_lock);
        pthread_cond_destroy(&_cond);
    }
    void Lock() {
        pthread_mutex_lock(&_lock);             //上锁
    }
    void Unlock() {
        pthread_mutex_unlock(&_lock);           //解锁
    }
    void WakeUp() {                             //唤醒进程, 通知消费者线程有任务了
        pthread_cond_signal(&_cond);            //唤醒单个进程
    }
  void Wait() {                                 //没有任务, 消费者线程先阻塞起来等待任务 
        pthread_cond_wait(&_cond, &_lock);        
    }
    bool IsEmptyQueue() {
        return _taskqueue.empty();
    }
    static void* Routine(void* args) {          
        PthreadPool* self = (PthreadPool*)args;   
        while (1) {                      
            self->Lock();                           
            while (self->IsEmptyQueue()) {       //while  防止伪唤醒,
                //wait
                self->Wait();
            }
            //说明存在 task了/
            T t;
            self->PopTask(t);
            self->Unlock();                         
            t.Run();                            //解锁后处理任务.... 做啥???
        }
    } 
    void PushTask(const T& in) {                //传入参数, push task
        Lock();
        _taskqueue.push(in);                    //操作临界资源加锁
        Unlock();
        WakeUp();                                       
    }
    void PopTask(T& out) {                       //传出参数 拿取任务
        out = _taskqueue.front();                //拿取pop任务
        _taskqueue.pop();                        //任务队列pop 任务
    }
    void InitPthreadPool() {                       
        pthread_t tid;        
        for (int i = 0; i < _num; ++i) {
            pthread_create(&tid, NULL, Routine, this);
            pthread_detach(tid);                 //回收线程资源
        }
    }
private:
    int _num;                                   //工作线程的数目
    queue<T> _taskqueue;                        //任务队列
    pthread_mutex_t _lock;                      //锁保证临界资源的互斥访问
    pthread_cond_t _cond;                       //条件变量控制资源到来时候的唤醒工作
};

然后是  task.hpp的代码 生产任务:#[tangyujie@VM-4-9-centos PthreadPool]$ cat task.hpp

#pragma once
#include <iostream>
#include <pthread.h>
using std::endl;
using std::cout;
using std::cerr;
//typedef int (*handler_t ) (int, int, char);
class Task {
public:
    Task(int x = 1, int y = 1, char op = '+')                //默认构造 
        : _x(x)
        , _y(y)
        , _op(op)  
    {}
    ~Task(){}
    void Run() {
        int z = 0;
        switch(_op) {
            case '+':
                z = _x + _y;
                break;
            case '-':
                z = _x - _y;
                break;
            case '*':
                z = _x * _y;
                break;
            case '/':
                if (_y == 0) {cerr << "div zero!" << endl; break;}
                z = _x / _y;
                break;
            case '%':
                if (_y == 0) {cerr << "mod zero!" << endl; break;}
                z = _x % _y;
                break;
            default:
                cerr << "operator error!" << endl;
                break;
        } 
        cout << "thread: [" << pthread_self() << "]: "<< _x << _op << _y << "=" << z << endl; 
    }
private:
    int _x;
    int _y;
    char _op;
};

最后一个是 测试函数  main.cc的代码:[tangyujie@VM-4-9-centos PthreadPool]$ cat main.cc

#include "task.hpp"
#include "pthread_pool.hpp"
#include <cstdlib>
#include <ctime>
int main() {
    PthreadPool<Task> * tp = new PthreadPool<Task>();
    tp->InitPthreadPool();
    srand((unsigned long)time(nullptr));
    const char* ops = "+-*/%";
    while (1) {
        int x = rand() % 123 + 1;
        int y = rand() % 123 + 1;
        Task t(x, y, ops[rand() % 5]);
        tp->PushTask(t);
    sleep(1);
    }
    return 0;
}

至此: 基本对于线程池基本上是知道理论了:


然后如下 介绍一下  线程池  为啥好??????池化技术有啥好处????


首先:  线程池, 池子,  对于池子, 我的印象就是蓄水池了, 比如说在缺水的底带, 家里要大水如果需要到很远的地方去打水的化, 还有需要水的时候才接水.... 可以吗???  当然可以, 但是这样需要的时候立马打水  或者到远处去运输水  对于这个时间的消耗是不是很大, 要是 我们提前造一个池子, 需要的话直接去池子里面去拿取, 是不是效率会高很多.............


内存池, 线程池  其实都是常见的池化技术,   线程池, 避免了临时创建大量线程, 充分的实现了线程的复用, 我们可以反复利用我们最初线程池初始创建的时候就创建好的多线程     (工作线程)

自此:  算是暂时小结了, 写了太多了,,,, 其中都是鄙人的拙见, 如果觉得有帮助希望点个赞,欢迎评论, 最后 好了啦, 不卷了, 大家新年快乐, 学完收工.... 至于  nginx 部分源码刨析留着下次写吧

相关文章
|
10月前
|
安全 Java 调度
Netty源码—3.Reactor线程模型二
本文主要介绍了NioEventLoop的执行总体框架、Reactor线程执行一次事件轮询、Reactor线程处理产生IO事件的Channel、Reactor线程处理任务队列之添加任务、Reactor线程处理任务队列之执行任务、NioEventLoop总结。
|
10月前
|
安全 Java
Netty源码—2.Reactor线程模型一
本文主要介绍了关于NioEventLoop的问题整理、理解Reactor线程模型主要分三部分、NioEventLoop的创建和NioEventLoop的启动。
|
11月前
|
Java 中间件 调度
【源码】【Java并发】从InheritableThreadLocal和TTL源码的角度来看父子线程传递
本文涉及InheritableThreadLocal和TTL,从源码的角度,分别分析它们是怎么实现父子线程传递的。建议先了解ThreadLocal。
430 4
【源码】【Java并发】从InheritableThreadLocal和TTL源码的角度来看父子线程传递
|
Java 调度
【源码】【Java并发】【线程池】邀请您从0-1阅读ThreadPoolExecutor源码
当我们创建一个`ThreadPoolExecutor`的时候,你是否会好奇🤔,它到底发生了什么?比如:我传的拒绝策略、线程工厂是啥时候被使用的? 核心线程数是个啥?最大线程数和它又有什么关系?线程池,它是怎么调度,我们传入的线程?...不要着急,小手手点上关注、点赞、收藏。主播马上从源码的角度带你们探索神秘线程池的世界...
555 0
【源码】【Java并发】【线程池】邀请您从0-1阅读ThreadPoolExecutor源码
|
存储 监控 Java
JAVA线程池有哪些队列? 以及它们的适用场景案例
不同的线程池队列有着各自的特点和适用场景,在实际使用线程池时,需要根据具体的业务需求、系统资源状况以及对任务执行顺序、响应时间等方面的要求,合理选择相应的队列来构建线程池,以实现高效的任务处理。
832 12
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
2063 32
什么是线程池?从底层源码入手,深度解析线程池的工作原理
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
305 12
|
缓存 安全 C++
C++无锁队列:解锁多线程编程新境界
【10月更文挑战第27天】
1080 7
|
消息中间件 存储 安全
|
安全 Java 容器
【JaveEE】——多线程中使用顺序表,队列,哈希表
多线程环境下使用ArrayList(同步机制,写时拷贝),使用队列,哈希表(高频)ConcurrentHashMap(缩小锁粒度,CAS,扩容优化)