前言
线程池 -- 纯C版
使用步骤
//------------------------------------------// // 作者: 干饭小白 // 时间: 2023-09-04 //------------------------------------------// #pragma once #ifdef __cplusplus extern "C" { #endif __cplusplus #define DEFAULT_TIME 5 #define MIN_WAIT_TASK_NUM 5 #define DEFAULT_THREAD_NUM 2 #include <pthread.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <signal.h> #include <errno.h> // 任务函数结构 typedef struct { void *(*function)(void*); void *arg; }Task; // 线程池管理者 struct Threadpool { /* 开关 */ bool shutdown; /* 任务队列 */ int queue_front; // 队头 int queue_rear; // 队尾 int queue_size; // 队列大小 int queue_max_size; // 最大的任务数 Task *task_queue; // 任务队列 /* 工作线程状态 */ int min_thr_num; // 最小的线程数 int max_thr_num; // 最大线线程数 int live_thr_num; // 当前的活跃线程数 int busy_thr_num; // 当前忙碌的线程数 int wait_exit_thr_num; // 当前正在等待退出的线程数 pthread_t *threads; // 线程集合 /* 线程集合的管理 */ pthread_t admin_tid; // 线程管理者 -- 保持线程池中线程的相对平衡 pthread_mutex_t lock; // 互斥锁 pthread_mutex_t thread_counter; // 保证线程间竞争关系互斥 -- 正在忙碌的线程变量控制 pthread_cond_t queue_not_full; // 唤醒任务可入队线程 -- 队列不为满 pthread_cond_t queue_not_empty; // 唤醒工作线程取任务 -- 队列不为空 }; /*----------------------------------------------- 函数名: threadpool_creat 函数说明: 创建并初始化一个线程池 输入参数: min_thr_num(int) 最小的线程数量 max_thr_num(int) 最大的线程数量 queue_max_size(int) 任务队列大小 输出参数: 无 返回值: 返回线程池句柄,失败返回 NULL -----------------------------------------------*/ Threadpool * threadpool_creat(int min_thr_num, int max_thr_num, int queue_max_size); /*----------------------------------------------- 函数名: threadpool_thread 函数说明: 任务处理线程函数 输入参数: threadpool(Threadpool) 线程池句柄 输出参数: 无 返回值: 无 -----------------------------------------------*/ void * threadpool_thread(void *threadpool); /*----------------------------------------------- 函数名: threadpool_thread 函数说明: 任务处理线程函数 输入参数: threadpool(Threadpool*) 线程池句柄 输出参数: 无 返回值: 无 -----------------------------------------------*/ void * admin_thread(void *threadpool); /*----------------------------------------------- 函数名: threadpool_free 函数说明: 释放资源 输入参数: pool(Threadpool*) 线程池句柄 输出参数: 无 返回值: 0 正常 -1 pool == NULL -----------------------------------------------*/ int threadpool_free(Threadpool * pool); /*----------------------------------------------- 函数名: threadpool_destory 函数说明: 销毁线程池 输入参数: pool(Threadpool*) 线程池句柄 输出参数: 无 返回值: 0 正常 -1 pool == NULL -----------------------------------------------*/ int threadpool_destory(Threadpool * pool); /*----------------------------------------------- 函数名: threadpool_add_task 函数说明: 向任务队列中添加事件 输入参数: pool(Threadpool*) 线程池句柄 void *(*function)(void *arg) 任务函数 arg(void *) 任务函数携带的参数 输出参数: 无 返回值: 无 -----------------------------------------------*/ int threadpool_add_task(Threadpool * pool, void *(*function)(void *arg), void *arg); /*----------------------------------------------- 函数名: is_thread_alive 函数说明: 判断一个线程是否存活 输入参数: tid(pthread_t) 线程pid 输出参数: 无 返回值: 存活 true 消亡 false -----------------------------------------------*/ bool is_thread_alive(pthread_t tid); #ifdef __cplusplus } #endif
#include "ThreadPool.h" Threadpool * threadpool_creat(int min_thr_num, int max_thr_num, int queue_max_size) { Threadpool *pool = NULL; /* 使用do{}while(0) --> 实现goto机制,错误时即使跳出,并统一处理 */ do { /** 开启线程池空间 **/ pool = (Threadpool *)malloc(sizeof(Threadpool)); if(NULL == pool) { break; } /** 初始化信息 **/ pool->min_thr_num = min_thr_num; pool->max_thr_num = max_thr_num; pool->live_thr_num = min_thr_num; pool->busy_thr_num = 0; pool->wait_exit_thr_num = 0; pool->queue_front = 0; pool->queue_rear = 0; pool->queue_size = 0; pool->queue_max_size = queue_max_size; pool->shutdown = false; /** 分配工作线程空间 **/ pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * max_thr_num); if(NULL == pool->threads) { break; } memset(pool->threads, 0, sizeof(pthread_t) * max_thr_num); /** 队列空间 **/ pool->task_queue = (Task *)malloc(sizeof(Task) * queue_max_size); if(NULL == pool->task_queue) { break; } /** 初始化互斥锁和条件变量 **/ if(pthread_mutex_init(&(pool->lock), NULL) != 0 || pthread_mutex_init(&(pool->thread_counter), NULL) != 0 || pthread_cond_init(&(pool->queue_not_empty), NULL) || pthread_cond_init(&(pool->queue_not_full), NULL) != 0) { break; } /** 启动 min_thr_num 个工作线程 **/ for(int i = 0; i < min_thr_num; ++i) { pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool); } /** 启动管理者线程 **/ pthread_create(&(pool->admin_tid), NULL, admin_thread, (void*)pool); return pool; }while(0); /* 释放pool的空间 */ threadpool_free(pool); return NULL; } int threadpool_destory(Threadpool * pool) { /* 容错性判断 */ if(NULL == pool) { return -1; } /* 线程标志位:true */ pool->shutdown = true; /* 销毁管理线程 */ pthread_join(pool->admin_tid, NULL); /* 通知存活的线程结束自己 */ for(int i = 0; i < pool->live_thr_num; ++i) { /** 唤醒所有被阻塞的线程 **/ pthread_cond_broadcast(&(pool->queue_not_empty)); } /* 等待线程结束,进行回收 */ for(int i = 0; i < pool->live_thr_num; ++i) { pthread_join(pool->threads[i], NULL); } /* 释放资源 */ threadpool_free(pool); return 0; } int threadpool_free(Threadpool * pool) { /* 容错性判断 */ if(NULL == pool) { return -1; } /* 销毁任务队列 */ if(pool->task_queue) { free(pool->task_queue); pool->task_queue = NULL; } /* 销毁工作线程集合 */ if(pool->threads) { free(pool->threads); pool->threads = NULL; } /* 销毁池管理空间 */ free(pool); pool = NULL; return 0; } void * admin_thread(void *threadpool) { /* 容错处理 */ if(NULL == threadpool) { return NULL; } /* 维护线程池的平衡 */ Threadpool *pool = (Threadpool *)threadpool; while(!pool->shutdown) { /** 每隔DEFAULT_TIME进行一次 **/ sleep(DEFAULT_TIME); /** 获取当前状态下活跃数和任务数 -- 多线程访问同一个变量(互斥锁) **/ pthread_mutex_lock(&(pool->lock)); int queue_size = pool->queue_size; int live_thr_num = pool->live_thr_num; pthread_mutex_unlock(&(pool->lock)); /** 获取忙碌数 **/ pthread_mutex_lock(&(pool->thread_counter)); int busy_thr_num = pool->busy_thr_num; pthread_mutex_unlock(&(pool->thread_counter)); /** 创建新线程:正在等待的任务 >= 最小允许等待的任务数 && 存活线程数 < 最大线程数 **/ if(queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num) { pthread_mutex_lock(&(pool->lock)); int add = 0; for(int i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_NUM && pool->live_thr_num < pool->max_thr_num; ++i) { if(pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])) { pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void*)pool); add++; pool->live_thr_num++; } } pthread_mutex_unlock(&(pool->lock)); } /** 销毁空闲的线程: 忙碌数*2 < 存活数(一半以上的空闲) && 存活数 > 最小线程数 **/ if((busy_thr_num * 2) < live_thr_num && live_thr_num > pool->min_thr_num) { pthread_mutex_lock(&(pool->lock)); pool->wait_exit_thr_num = DEFAULT_THREAD_NUM; pthread_mutex_unlock(&(pool->lock)); } for(int i = 0; i < DEFAULT_THREAD_NUM; ++i) { /*** 通知处于空闲的线程自杀 ***/ pthread_cond_signal(&(pool->queue_not_empty)); } } return NULL; } bool is_thread_alive(pthread_t tid) { int kill_rc = pthread_kill(tid, 0); if(ESRCH == kill_rc) { return false; } return true; } void * threadpool_thread(void *threadpool) { /* 容错处理 */ if(NULL == threadpool) { return NULL; } Threadpool *pool = (Threadpool *)threadpool; Task task; while(true) { pthread_mutex_lock(&(pool->lock)); /* 无任务:阻塞等待 */ while((pool->queue_size == 0 && !pool->shutdown)) { /** 阻塞等待 **/ pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock)); /** 是否是自杀任务 **/ if(pool->wait_exit_thr_num > 0) { pool->wait_exit_thr_num--; /** 判断线程池中的线程数量是否大于最小线程数:是则结束当前线程 **/ if(pool->live_thr_num > pool->min_thr_num) { pool->live_thr_num--; pthread_mutex_unlock(&(pool->lock)); pthread_exit(NULL); } } } /* 是否销毁池 */ if(pool->shutdown) { pthread_mutex_unlock(&(pool->lock)); pthread_exit(NULL); } /* 有任务:执行任务 */ task.function = pool->task_queue[pool->queue_front].function; task.arg = pool->task_queue[pool->queue_front].arg; /* 维护循环队列平衡 */ pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size; pool->queue_size--; /* 通知队列不未满 */ pthread_cond_broadcast(&(pool->queue_not_full)); pthread_mutex_unlock(&(pool->lock)); /* 执行开始:忙碌线程+1 */ pthread_mutex_lock(&(pool->thread_counter)); pool->busy_thr_num++; pthread_mutex_unlock(&(pool->thread_counter)); /* 执行任务 */ (*(task.function))(task.arg); /* 执行结束:忙碌线程-1 */ pthread_mutex_lock(&(pool->thread_counter)); pool->busy_thr_num--; pthread_mutex_unlock(&(pool->thread_counter)); } pthread_exit(NULL); } int threadpool_add_task(Threadpool * pool, void *(*function)(void *arg), void *arg) { if(NULL == pool) { return -1; } pthread_mutex_lock(&(pool->lock)); /* 任务队列满时需要等到 */ while((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) { pthread_cond_wait(&(pool->queue_not_full), &(pool->lock)); } /* 线程池处于关闭状态 */ if(pool->shutdown) { pthread_mutex_unlock(&(pool->lock)); } /* 清空残留 */ if(pool->task_queue[pool->queue_rear].arg != NULL) { free(pool->task_queue[pool->queue_rear].arg); pool->task_queue[pool->queue_rear].arg = NULL; } /* 添加任务 */ pthread_cond_signal(&(pool->queue_not_empty)); pthread_mutex_unlock(&(pool->lock)); return 0; }