线程池的实现

简介: 线程池的实现

线程池是一种池式组件,通过创建和维护一定数量的线程,实现这些线程的重复使用避免了频繁创建和销毁线程的开销,从而提升了性能

线程池的作用:

1.复用线程资源;

2.减少线程创建和销毁的开销;

3.可异步处理生产者线程的任务;

4.减少了多个任务(不是一个任务)的执行时间;

代码实现

一、结构体定义

1.任务结构体:

typedef struct task_s { 
    void *next;  // 指向下一个task
    handler_pt func; // 该task执行的函数
    void *arg;  // 函数的参数
}task_t;

2.队列结构体:

typedef struct task_queue_s { // task队列
    void *head; // 一级指针,指向队列的第一个task结构体
    void **tail; // 二级指针,指向队列的最后一个task结构体的第一个成员void *next指针
    int block; // 阻塞标志
    spinlock_t lock; // 自旋锁变量
    pthread_mutex_t mutex; // 互斥锁变量
    pthread_cond_t cond; // 条件变量
} task_queue_t;

3.线程池结构体:

struct threadpool_s {
    task_queue_t *task_queue; // task队列指针
    atomic_int quit; // 原子变量
    int thread_count; // 线程池中的线程数量
    pthread_t *threads; // 线程句柄数组
};
二、资源创建和销毁

1.创建任务队列:

static task_queue_t *__taskqueue_create() { // 创建一个任务队列
    int ret;
    task_queue_t *queue = (task_queue_t *)malloc(sizeof(task_queue_t));
    // 回滚式编程,创建资源
    if (queue) {
        ret = pthread_mutex_init(&queue->mutex, NULL);
        if (ret == 0) {
            ret = pthread_cond_init(&queue->cond, NULL);
            if (ret == 0) { // 全部资源创建成功
                spinlock_init(&queue->lock);
                queue->head = NULL; // 队列为空
                queue->tail = &queue->head; // tail是二级指针,大小是一个指针八字节,队列为空时指向head指针首地址
                queue->block = 1; // 设置为阻塞
                return queue;
            }
            pthread_mutex_destroy(&queue->mutex); // queue和init成功,但此{}内其它资源至少有一个创建失败
        }
        free(queue); // queue创建成功,但此{}内其它资源没有全部创建成功,至少有一个失败
    }
    return NULL; // queue创建失败
}

2.销毁任务队列:

static void __taskqueue_destroy(task_queue_t *queue) { // 销毁一个任务队列
    task_t *task;
    // 先将任务队列中的任务全部销毁
    while ((task = __pop_task(queue))) {
        free(task);
    }
    // 销毁任务队列的成员
    spinlock_destroy(&queue->lock);
    pthread_cond_destroy(&queue->cond);
    pthread_mutex_destroy(&queue->mutex);
    free(queue); // 释放队列结构体空间
}

3.创建线程池

static int __threads_create(threadpool_t *pool, size_t thread_count) { // 为线程池创建若干线程
    pthread_attr_t attr; // 用于设置线程属性的变量,作为pthread_create的第二个参数
    int ret;
    ret = pthread_attr_init(&attr); // 初始化线程属性变量
    // 资源创建-->回滚式编程
    if (ret == 0) {
        // 为线程句柄数组分配空间
        pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * thread_count);
        if (pool->threads) {
            // 创建若干线程开始执行__threadpool_worker函数
            int i = 0;
            for (; i < thread_count; i++) {
                if (pthread_create(&pool->threads[i], &attr, __threadpool_worker, pool) != 0) {
                    break; // 出现创建失败的情况
                }
            }
            pool->thread_count = i; // 更新线程数量,因为可能中途创建失败,数量小于预期值
            pthread_attr_destroy(&attr); // 销毁线程属性变量
            if (i == thread_count)
                return 0; // 资源全部创建成功
            __threads_terminate(pool);
            free(pool->threads);
        }
        ret = -1; // 为线程句柄数组分配空间失败
    }
    return ret; // 初始化线程属性变量失败
}
threadpool_t *threadpool_create(int thread_count) { // 创建一个线程池,参数是线程数量
    threadpool_t *pool;
    pool = (threadpool_t *)malloc(sizeof(*pool)); // 为线程池分配内存
    if (pool) {
        task_queue_t *queue = __taskqueue_create(); // 创建线程池的工作队列
        if (queue) {
            pool->task_queue = queue;
            atomic_init(&pool->quit, 0);
            if (__threads_create(pool, thread_count) == 0) // 为线程池创建若干线程
                return pool;
            __taskqueue_destroy(queue); // 线程创建失败则逐层释放已经创建的资源
        }
        free(pool);
    }
    return NULL;
}

4.回收线程池:

static void __threads_terminate(threadpool_t *pool) { // 回收线程池的所有线程
    atomic_store(&pool->quit, 1); // 设置quit状态,使得不再有新的线程开始工作
    __nonblock(pool->task_queue); // 设置队列为非阻塞,使得get_task的线程不会进入条件等待,直接退出
    int i;
    for (i = 0; i < pool->thread_count; i++) {
        pthread_join(pool->threads[i], NULL); // 等待所有线程结束
    }
}

三、任务的添加、删除和执行

1.向任务队列中添加一个任务:

static inline void __add_task(task_queue_t *queue, void *task) { // 向任务队列中添加一个任务 
    // void *,不限定任务类型,只要该任务的结构体起始内存是一个用于链接下一个节点的指针
    void **link = (void **)task; //转换task指向的类型从结构体变为指针:所指范围是8字节,也就是task_t结构体对象的前8个字节,此时task相当于指向其结构体的第一个成员next,与tail用法相同
    *link = NULL; // (*link等价于next指针)将该task指向的next指针成员指向NULL
    
    // 修改共享变量queue时加锁
    spinlock_lock(&queue->lock);
    *queue->tail = link; // 等价于 queue->tail->next = link, 因为二级指针tail实际上指向一个task_t结构体的第一个成员:next指针,link指向这个task的第一个成员next指针
    queue->tail = link; // 实际上第一行的next指针和第二行的tail指针都保存着link也就是task的首地址
    spinlock_unlock(&queue->lock);
    pthread_cond_signal(&queue->cond); // 添加一个任务后,广播唤醒处于等待状态的任务
}
int threadpool_post(threadpool_t *pool, handler_pt func, void *arg) { // 向线程池添加一个任务
    if (atomic_load(&pool->quit) == 1) // 如果线程池的状态是quit:已终止,则退出
        return -1;
    task_t *task = (task_t *)malloc(sizeof(task_t));
    if (!task) return -1;
    task->func = func;
    task->arg = arg;
    __add_task(pool->task_queue, task);
    return 0;
}

2.从任务队列中取出一个任务:

static inline void *__pop_task(task_queue_t *queue) { // 从队列中取出一个task
    // 上锁
    spinlock_lock(&queue->lock);
    if (queue->head == NULL) { // 队列为空
        spinlock_unlock(&queue->lock);
        return NULL;
    }
    task_t *task;
    task = queue->head; // 取出第一个
    void **link = (void **)task; // link指向task即head的下一个task
    queue->head = *link; // *link即link指向的task指针
    if (queue->head == NULL) { // 判断取出一个节点后队列是否为空,很必要!
        queue->tail = &queue->head;
    }
    spinlock_unlock(&queue->lock);
    return task;
}
static inline void *__get_task(task_queue_t *queue) { // 从队列中获取一个task,调用了pop_task
    task_t *task;
    // 使用while, 一定不能用if,如果被虚假唤醒,队列仍为空,返回的task == NULL
    while ((task = __pop_task(queue)) == NULL) { // 队列为空
    // 上锁 ,一定要在while内此处上锁,如果在while外,则其它没有抢到锁的线程会直接获得一个未初始化的野指针task
        pthread_mutex_lock(&queue->mutex);
        if (queue->block == 0) { // 队列为非阻塞状态
            pthread_mutex_unlock(&queue->mutex);
            return NULL; // 队列为空就直接返回
        }
        // 队列为阻塞状态
        pthread_cond_wait(&queue->cond, &queue->mutex);
        // pthread_cond_wait函数执行的内容:
        // 1.先进行unlock(&queue->mutex)
        // 2.再使当前线程休眠
        // --- 接收到broadcast广播时唤醒线程
        // 3.唤醒当前线程
        // 4.加上lock(&queue->mutex)
        // 5.函数返回
        // 6.pthread_cond_wait的执行并不是原子性的,所以需要使用锁
        pthread_mutex_unlock(&queue->mutex);
    }
    return task;
}

3.执行任务(每一个消费者线程所执行的函数)

static void *__threadpool_worker(void *arg) { // 线程池中的全部线程所执行的函数
    threadpool_t *pool = (threadpool_t *)arg; // 传入一个线程池的指针
    task_t *task;  
    void *ctx; 
    while (atomic_load(&pool->quit) == 0) { // 线程池没有停止运转(被销毁前停止、被终止前停止)
        task = (task_t *)__get_task(pool->task_queue); // 从线程池的任务队列中获取一个任务
        if (!task) break; // 线程池被标记终止"quit",get_task返回NULL
        handler_pt func = task->func; // 获取该任务所执行的函数
        ctx = task->arg; // 任务执行函数所需的参数
        free(task); // 任务被执行后销毁
        func(ctx); // 执行任务函数
    }
    return NULL;
}

推荐学习 https://xxetb.xetslk.com/s/p5Ibb

目录
相关文章
|
6月前
|
NoSQL Java 应用服务中间件
|
缓存 Java 应用服务中间件
线程池的10个坑你都遇到过吗
日常开发中,为了更好管理线程资源,减少创建线程和销毁线程的资源损耗,我们会使用线程池来执行一些异步任务。但是线程池使用不当,就可能会引发生产事故。大家看完肯定会有帮助的~
227 0
|
2月前
|
监控 Java API
如何快速地实现一个线程池
如何快速地实现一个线程池
23 0
|
3月前
|
Java 调度
基于C++11的线程池
基于C++11的线程池
|
5月前
|
存储 缓存 安全
线程池相关详解
线程池相关详解
|
6月前
|
缓存 Java API
厉害了,线程池就该这么玩
厉害了,线程池就该这么玩
58 0
|
存储 Java 测试技术
13.一文彻底了解线程池
大家好,我是王有志。线程池是Java面试中必问的八股文,涉及到非常多的问题,今天我们就通过一篇文章,来彻底搞懂Java面试中关于线程池的问题。
401 2
13.一文彻底了解线程池
|
前端开发 Java 调度
你了解线程池吗
你了解线程池吗
81 0
|
存储 缓存 Java
理解与实现线程池
理解与实现线程池
129 0
KeyAffinityExecutor 线程池
KeyAffinityExecutor 线程池