线程池是一种池式组件,通过创建和维护一定数量的线程,实现这些线程的重复使用,避免了频繁创建和销毁线程的开销,从而提升了性能
线程池的作用:
1.复用线程资源;
2.减少线程创建和销毁的开销;
3.可异步处理生产者线程的任务;
4.减少了多个任务(不是一个任务)的执行时间;
代码实现
一、结构体定义
1.任务结构体:
typedef struct task_s { void *next; // 指向下一个task handler_pt func; // 该task执行的函数 void *arg; // 函数的参数 }task_t;
2.队列结构体:
typedef struct task_queue_s { // task队列 void *head; // 一级指针,指向队列的第一个task结构体 void **tail; // 二级指针,指向队列的最后一个task结构体的第一个成员void *next指针 int block; // 阻塞标志 spinlock_t lock; // 自旋锁变量 pthread_mutex_t mutex; // 互斥锁变量 pthread_cond_t cond; // 条件变量 } task_queue_t;
3.线程池结构体:
struct threadpool_s { task_queue_t *task_queue; // task队列指针 atomic_int quit; // 原子变量 int thread_count; // 线程池中的线程数量 pthread_t *threads; // 线程句柄数组 };
二、资源创建和销毁
1.创建任务队列:
static task_queue_t *__taskqueue_create() { // 创建一个任务队列 int ret; task_queue_t *queue = (task_queue_t *)malloc(sizeof(task_queue_t)); // 回滚式编程,创建资源 if (queue) { ret = pthread_mutex_init(&queue->mutex, NULL); if (ret == 0) { ret = pthread_cond_init(&queue->cond, NULL); if (ret == 0) { // 全部资源创建成功 spinlock_init(&queue->lock); queue->head = NULL; // 队列为空 queue->tail = &queue->head; // tail是二级指针,大小是一个指针八字节,队列为空时指向head指针首地址 queue->block = 1; // 设置为阻塞 return queue; } pthread_mutex_destroy(&queue->mutex); // queue和init成功,但此{}内其它资源至少有一个创建失败 } free(queue); // queue创建成功,但此{}内其它资源没有全部创建成功,至少有一个失败 } return NULL; // queue创建失败 }
2.销毁任务队列:
static void __taskqueue_destroy(task_queue_t *queue) { // 销毁一个任务队列 task_t *task; // 先将任务队列中的任务全部销毁 while ((task = __pop_task(queue))) { free(task); } // 销毁任务队列的成员 spinlock_destroy(&queue->lock); pthread_cond_destroy(&queue->cond); pthread_mutex_destroy(&queue->mutex); free(queue); // 释放队列结构体空间 }
3.创建线程池:
static int __threads_create(threadpool_t *pool, size_t thread_count) { // 为线程池创建若干线程 pthread_attr_t attr; // 用于设置线程属性的变量,作为pthread_create的第二个参数 int ret; ret = pthread_attr_init(&attr); // 初始化线程属性变量 // 资源创建-->回滚式编程 if (ret == 0) { // 为线程句柄数组分配空间 pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * thread_count); if (pool->threads) { // 创建若干线程开始执行__threadpool_worker函数 int i = 0; for (; i < thread_count; i++) { if (pthread_create(&pool->threads[i], &attr, __threadpool_worker, pool) != 0) { break; // 出现创建失败的情况 } } pool->thread_count = i; // 更新线程数量,因为可能中途创建失败,数量小于预期值 pthread_attr_destroy(&attr); // 销毁线程属性变量 if (i == thread_count) return 0; // 资源全部创建成功 __threads_terminate(pool); free(pool->threads); } ret = -1; // 为线程句柄数组分配空间失败 } return ret; // 初始化线程属性变量失败 } threadpool_t *threadpool_create(int thread_count) { // 创建一个线程池,参数是线程数量 threadpool_t *pool; pool = (threadpool_t *)malloc(sizeof(*pool)); // 为线程池分配内存 if (pool) { task_queue_t *queue = __taskqueue_create(); // 创建线程池的工作队列 if (queue) { pool->task_queue = queue; atomic_init(&pool->quit, 0); if (__threads_create(pool, thread_count) == 0) // 为线程池创建若干线程 return pool; __taskqueue_destroy(queue); // 线程创建失败则逐层释放已经创建的资源 } free(pool); } return NULL; }
4.回收线程池:
static void __threads_terminate(threadpool_t *pool) { // 回收线程池的所有线程 atomic_store(&pool->quit, 1); // 设置quit状态,使得不再有新的线程开始工作 __nonblock(pool->task_queue); // 设置队列为非阻塞,使得get_task的线程不会进入条件等待,直接退出 int i; for (i = 0; i < pool->thread_count; i++) { pthread_join(pool->threads[i], NULL); // 等待所有线程结束 } }
三、任务的添加、删除和执行
1.向任务队列中添加一个任务:
static inline void __add_task(task_queue_t *queue, void *task) { // 向任务队列中添加一个任务 // void *,不限定任务类型,只要该任务的结构体起始内存是一个用于链接下一个节点的指针 void **link = (void **)task; //转换task指向的类型从结构体变为指针:所指范围是8字节,也就是task_t结构体对象的前8个字节,此时task相当于指向其结构体的第一个成员next,与tail用法相同 *link = NULL; // (*link等价于next指针)将该task指向的next指针成员指向NULL // 修改共享变量queue时加锁 spinlock_lock(&queue->lock); *queue->tail = link; // 等价于 queue->tail->next = link, 因为二级指针tail实际上指向一个task_t结构体的第一个成员:next指针,link指向这个task的第一个成员next指针 queue->tail = link; // 实际上第一行的next指针和第二行的tail指针都保存着link也就是task的首地址 spinlock_unlock(&queue->lock); pthread_cond_signal(&queue->cond); // 添加一个任务后,广播唤醒处于等待状态的任务 } int threadpool_post(threadpool_t *pool, handler_pt func, void *arg) { // 向线程池添加一个任务 if (atomic_load(&pool->quit) == 1) // 如果线程池的状态是quit:已终止,则退出 return -1; task_t *task = (task_t *)malloc(sizeof(task_t)); if (!task) return -1; task->func = func; task->arg = arg; __add_task(pool->task_queue, task); return 0; }
2.从任务队列中取出一个任务:
static inline void *__pop_task(task_queue_t *queue) { // 从队列中取出一个task // 上锁 spinlock_lock(&queue->lock); if (queue->head == NULL) { // 队列为空 spinlock_unlock(&queue->lock); return NULL; } task_t *task; task = queue->head; // 取出第一个 void **link = (void **)task; // link指向task即head的下一个task queue->head = *link; // *link即link指向的task指针 if (queue->head == NULL) { // 判断取出一个节点后队列是否为空,很必要! queue->tail = &queue->head; } spinlock_unlock(&queue->lock); return task; } static inline void *__get_task(task_queue_t *queue) { // 从队列中获取一个task,调用了pop_task task_t *task; // 使用while, 一定不能用if,如果被虚假唤醒,队列仍为空,返回的task == NULL while ((task = __pop_task(queue)) == NULL) { // 队列为空 // 上锁 ,一定要在while内此处上锁,如果在while外,则其它没有抢到锁的线程会直接获得一个未初始化的野指针task pthread_mutex_lock(&queue->mutex); if (queue->block == 0) { // 队列为非阻塞状态 pthread_mutex_unlock(&queue->mutex); return NULL; // 队列为空就直接返回 } // 队列为阻塞状态 pthread_cond_wait(&queue->cond, &queue->mutex); // pthread_cond_wait函数执行的内容: // 1.先进行unlock(&queue->mutex) // 2.再使当前线程休眠 // --- 接收到broadcast广播时唤醒线程 // 3.唤醒当前线程 // 4.加上lock(&queue->mutex) // 5.函数返回 // 6.pthread_cond_wait的执行并不是原子性的,所以需要使用锁 pthread_mutex_unlock(&queue->mutex); } return task; }
3.执行任务(每一个消费者线程所执行的函数)
static void *__threadpool_worker(void *arg) { // 线程池中的全部线程所执行的函数 threadpool_t *pool = (threadpool_t *)arg; // 传入一个线程池的指针 task_t *task; void *ctx; while (atomic_load(&pool->quit) == 0) { // 线程池没有停止运转(被销毁前停止、被终止前停止) task = (task_t *)__get_task(pool->task_queue); // 从线程池的任务队列中获取一个任务 if (!task) break; // 线程池被标记终止"quit",get_task返回NULL handler_pt func = task->func; // 获取该任务所执行的函数 ctx = task->arg; // 任务执行函数所需的参数 free(task); // 任务被执行后销毁 func(ctx); // 执行任务函数 } return NULL; }