线程池底层实现学习

简介: 线程池底层实现学习

什么是线程池❔

线程池是一个维持固定数量线程的池式结构

  • 问:为什么要是固定数量
    **答:**线程是一个紧缺的系统资源,随着线程不断的创建,达到一定数量后,操作系统的压力会越来越大
  • 问: 如何来决定线程池中线程的数量
    **答:**区分当前场景下是cpu密集型还是io密集型
    1. cpu密集型(需要利用cpu进行复杂耗时的计算):proc
    2. io密集型(大量的网络io):2*proc
  • 问:为什么需要使用线程池
    **答:**某类任务特别耗时,会影响当前线程处理其他任务,我们就会将它抛到其他线程中去行,这些任务会被异步的执行,但是线程的创建与销毁的开销很大,于是就用一个池式结构来管理固定数量的线程,我们叫做线程池,主要作用为:复用线程资源充分利用线程资源
  • 问:线程池属于什么类型模型
    答:属于生产者消费者模型

线程池基本大概运行框架🖼

首先线程池中维护一堆运行的线程(是通过pthread_create线程,并运行线程的专属入口函数为work,它会线程中一直运行),会一直获取任务队列中的任务节点(当任务队列中没有节点则线程会休眠),而线程池暴露了一个post接口给用户,用户可以将要执行的函数以及其上下文(参数),以任务节点的形式放进任务队列,此时会唤醒线程池中的线程来执行该任务,当任务执行完毕后线程会继续运行work函数获取任务队列的节点

线程池的实现🍻

线程池中的基本组成结构🍺

任务节点链表 任务队列 线程池
任务实体节点用链表组织 用阻塞队列的方式管理任务实体 包含任务队列,包含线程id数组
任务节点链表📑
typedef struct task_s {
    void *next;
    handler_pt func;
    void *arg;
} task_t;
  • 第一个字段必须为next指针,方便后续任务队列二级指针的操作
  • 后面的字段分别是函数指针和参数列表的参数
任务队列📑

这个队列是在多线程环境下,用于管理大量任务

typedef struct task_queue_s {
    void *head;
    void **tail; 
    int block;
    spinlock_t lock;
    pthread_mutex_t mutex;
    pthread_cond_t cond;
} task_queue_t;
  • 指向队列头的指针
  • 指向队列尾部指针的指针(可以很方便的对队列进行增加删除操作)
  • 阻塞信息,当为1时表示使用阻塞队列,当任务队列为空时会阻塞休眠等待任务队列中有任务节点
  • 用于任务队列增删的时候的锁,使用自旋锁是因为队列增删节点的时间很快
  • mutexcond用于当前线程访问任务队列,任务队列为空时可以进行休眠,等待队列有节点时的信号
线程池结构📑
struct thrdpool_s {
    task_queue_t *task_queue;
    atomic_int quit;
    int thrd_count;
    pthread_t *threads;
};
  • 任务队列
  • 线程池中线程的状态,1代表不会启动,0代表运行中
  • 线程池中线程的数量
  • 线程id数组

线程池接口设计⁉️

c中,可以在.c文件中对函数使用static来向其他文件屏蔽该函数,使该函数只能在本文件中访问,比如:

  • 有一个a.c文件里面有两个函数,一个是static的一个不是static,在另一个b.c文件中调用a.c,文件中的函数则只能调用非static函数,如果想要调用static,可以使用非static函数内部间接的调用该函数
暴露给用户的
  • 线程池的创建
  • 往任务队列中增加节点的函数
  • 线程池状态的控制,可以控制线程池中的线程暂停
  • 回收线程池中的相关资源的api
线程池内部的基本必要函数(使用static屏蔽)

**小注:**为什么pthread_create调用成员函数时,该成员函数必须声明为静态函数,不然就会与cpthread_create函数的函数不匹配

  • pthread_create第三个参数是一个参数列表为void*,返回类型为void*的函数指针,非static的成员函数的参数列表会隐含的穿一个this指针,导致其参数列表,隐式的会有两个参数从而与pthread_create的参数列表不匹配
  • work工作函数
  • 对任务队列的增删函数
  • 实际线程的创建函数
  • 线程的创建函数(会被用户的线程池创建函数调用)
实际coding部分

回滚式写法:一般是涉及到资源的创建时会使用这种写法,没有创建成功则立即回滚处理,条件判断,只在内层条件判断成功后返回正确内容,其他条件分支统统是错误回滚处理

创建任务队列函数
static task_queue_t *
__taskqueue_create() {
    task_queue_t *queue = (task_queue_t *)malloc(sizeof(*queue));
    if (!queue) return NULL;
    int ret;
    ret = pthread_mutex_init(&queue->mutex, NULL);
    if (ret == 0) {
        ret = pthread_cond_init(&queue->cond, NULL);
        if (ret == 0) {
            spinlock_init(&queue->lock);  // 自己写的用 只是一个变量不需要释放
            queue->head = NULL;
            queue->tail = &queue->head;
            queue->block = 1;
            return queue;
        }
        pthread_cond_destroy(&queue->cond);
    }
    pthread_mutex_destroy(&queue->mutex);
    return NULL;
}
  • 疑惑操作:
queue->head = NULL;
queue->tail = &queue->head;
取消阻塞函数
static void
__nonblock(task_queue_t *queue) {
    pthread_mutex_lock(&queue->mutex);
    queue->block = 0;  // 可以使用原子操作来避免锁
    pthread_mutex_unlock(&queue->mutex);
    pthread_cond_broadcast(&queue->cond);
}
  • 采用条件变量广播唤醒所有睡眠等待的线程,让他们退出
获取任务队列节点函数
static inline void * 
__get_task(task_queue_t *queue) {
    task_t *task;
    // 虚假唤醒
    while ((task = __pop_task(queue)) == NULL) {
        pthread_mutex_lock(&queue->mutex);
        if (queue->block == 0) {
            // break;
            return NULL;
        }
        pthread_cond_wait(&queue->cond, &queue->mutex);
        pthread_mutex_unlock(&queue->mutex);
    }
    return task;
}
  • 这个函数被work函数调用之后会如果队列为空则会阻塞线程
向任务队列中添加节点
static inline void 
__add_task(task_queue_t *queue, void *task) {
    void **link = (void **)task; // malloc 
    *link = NULL; // task->next = NULL;
    spinlock_lock(&queue->lock);
    *queue->tail = link;  // 将尾部指向link节点
    queue->tail = link;
    spinlock_unlock(&queue->lock);
    pthread_cond_signal(&queue->cond);
}
  • linkvoid**类型的,*link则是一个void*类型的,具体指task->next的结构体,因此可以使用(task_t*)(*link)这样就可以(task_t*)(*link)->func来访问其成员,但是队列中用二维指针管理的基本都是尾元素,所以(*link)一般都指向NULL,故使用(task_t*)(*link)->func来访问成员都是未定义的
取任务队列的头结点
static inline void * 
__pop_task(task_queue_t *queue) {
    spinlock_lock(&queue->lock);
    if (queue->head == NULL) {  // 任务队列为空
        spinlock_unlock(&queue->lock);
        return NULL;
    }
    task_t *task;
    task = queue->head;
    queue->head = task->next;
    if (queue->head == NULL) {
        queue->tail = &queue->head;
    }
    spinlock_unlock(&queue->lock);
    return task;
}
线程工作函数
static void *
__thrdpool_worker(void *arg) {
    thrdpool_t *pool = (thrdpool_t*) arg;
    task_t *task;
    void *ctx;
    while (atomic_load(&pool->quit) == 0) {
        task = (task_t*)__get_task(pool->task_queue);  // 内含休眠
        if (!task) break;
        handler_pt func = task->func;
        ctx = task->arg;
        free(task);
        func(ctx);
    }
    return NULL;
}
  • free函数是释放task指向的内存,但是free 函数并不会强制操作系统立即回收该内存空间,而是将其加入内存池中,以备下一次分配使用。所以我们在free之前保存其堆中指向的内存,还可以使用。
线程池创建函数
thrdpool_t *
thrdpool_create(int thrd_count) {
    thrdpool_t *pool;
    pool = (thrdpool_t*) malloc(sizeof(*pool));
    if (!pool) return NULL;
    task_queue_t *queue = __taskqueue_create();
    if (queue) {
        pool->task_queue = queue;
        atomic_init(&pool->quit, 0);
        if (__threads_create(pool, thrd_count) == 0) {
            return pool;
        }
        __taskqueue_destroy(pool->task_queue);
    }
    free(pool);
    return NULL;
}
  • 包括了队列的创建与线程们的创建

线程池的使用

**用来干什么:**用来增加一个全局变量的值,增加到1000后使线程池terminate

**怎么用:**初始化线程池,不停的往任务队列中抛任务节点进去,每个任务仅对临界资源+1一次,用户不断的push任务,线程池中的线程不断地去竞争队列中的任务节点然后执行后返回然后继续竞争

任务函数

int done = 0;
void do_task(void *arg) {
    thrdpool_t *pool = (thrdpool_t*)arg;
    pthread_mutex_lock(&lock);
    done++;
    printf("doing %d task\n", done);
    pthread_mutex_unlock(&lock);
    if (done >= 1000) {
        thrdpool_terminate(pool);
    }
}
  • 对全局变量done进行+1操作

使用线程池

int threads = 8;
pthread_mutex_init(&lock, NULL);
thrdpool_t *pool = thrdpool_create(threads);
if (pool == NULL) {
    perror("thread pool create error!\n");
    exit(-1);
}
while (thrdpool_post(pool, &do_task, pool) == 0) {
}
thrdpool_waitdone(pool);
pthread_mutex_destroy(&lock);
  • 创建线程池thrdpool_create
  • 向线程池中发送数据

总结:

如果没有使用线程池,我们就是自己用户层的开启线程,来一个开一个,完全没有管理的概念,加入了线程池后,由线程池来管理线程们。

还有使用二级指针维护队列尾,使得队列的删除变得方便

相关文章
|
7月前
|
安全 编译器 C#
C#学习相关系列之多线程---lock线程锁的用法
C#学习相关系列之多线程---lock线程锁的用法
|
7月前
|
Java 调度 C#
C#学习系列相关之多线程(一)----常用多线程方法总结
C#学习系列相关之多线程(一)----常用多线程方法总结
|
7月前
|
C#
C#学习相关系列之多线程---ConfigureAwait的用法
C#学习相关系列之多线程---ConfigureAwait的用法
129 0
|
7月前
|
C#
C#学习相关系列之多线程---TaskCompletionSource用法(八)
C#学习相关系列之多线程---TaskCompletionSource用法(八)
202 0
|
3月前
|
监控 Java 调度
【Java学习】多线程&JUC万字超详解
本文详细介绍了多线程的概念和三种实现方式,还有一些常见的成员方法,CPU的调动方式,多线程的生命周期,还有线程安全问题,锁和死锁的概念,以及等待唤醒机制,阻塞队列,多线程的六种状态,线程池等
192 6
【Java学习】多线程&JUC万字超详解
|
6月前
|
NoSQL Redis
Redis系列学习文章分享---第五篇(Redis实战篇--优惠券秒杀,全局唯一id 添加优惠券 实现秒杀下单 库存超卖问题分析 乐观锁解决超卖 实现一人一单功能 集群下的线程并发安全问题)
Redis系列学习文章分享---第五篇(Redis实战篇--优惠券秒杀,全局唯一id 添加优惠券 实现秒杀下单 库存超卖问题分析 乐观锁解决超卖 实现一人一单功能 集群下的线程并发安全问题)
137 0
|
6月前
|
调度 Python
Python多线程学习优质方法分享
Python多线程学习优质方法分享
27 0
|
6月前
|
安全 API C++
逆向学习Windows篇:C++中多线程的使用和回调函数的实现
逆向学习Windows篇:C++中多线程的使用和回调函数的实现
212 0
|
7月前
|
Java 调度
【JAVA学习之路 | 提高篇】进程与线程(Thread)
【JAVA学习之路 | 提高篇】进程与线程(Thread)
|
7月前
|
安全 Java
java-多线程学习记录
java-多线程学习记录