本文以营业厅为例子,实现简单的线程池
一、线程池介绍
现在的企业客户端数以百万,如果某一时刻同时向服务器发消息,那么服务器要处理这些消息是同时开百万个线程吗??当然不行!!
根据posix标准,一个线程大概占8M空间,那么对于16G的内存,最多也就是开2048个线程。因此此时需要线程池。
线程池是一种常见的并发编程模型,它可以有效地管理和复用线程资源,提高程序运行效率和响应速度。通常情况下,线程池由一个任务队列和若干个工作线程组成。
线程池的优点包括:
- 提高性能:线程池可以重复使用现有的线程,避免了频繁创建和销毁线程的开销,从而提高程序的性能。
- 提高响应速度:线程池中已经创建好的空闲线程可以立即响应任务请求,减少了任务等待时间,提高了程序的响应速度。
- 控制并发数量:线程池可以限制同时执行的任务数量,有效地控制了系统资源的消耗。
- 管理线程:线程池可以统一管理所有线程,并且可以设置线程池参数来满足业务需求。
- 提供更好的可扩展性:通过使用线程池,我们可以灵活地调整系统中任务队列长度和工作线程数目,从而实现更好的可扩展性。
二、实现一个简单的线程池
以银行营业厅进行类比:
#include<stdio.h> #include <string.h> #include <stdlib.h> #include<pthread.h> #include<errno.h> //'''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' //''''''''''''''''''''属性'''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' #define LIST_INSERT(item ,list) do{\ item->prev=NULL; \ item->next=list; \ if ((list) != NULL) list->prev=item; \ (list)=item; \ }while(0) #define LIST_REMOVE(item,list) do{\ if(item->prev != NULL) item->prev->next=item->next; \ if(item->next != NULL) item->next->prev=item->prev; \ if(list==item) list=item->next; \ item->prev=item->next=NULL; \ }while(0) //顾客 struct nTask { void (*task_fun)(struct nTask*); void *user_data; struct nTask *prev; struct nTask *next; }; //业务员 struct nWorker{ pthread_t threadid; int terminate; //判断线程是否终止 struct nManager *manager; //业务员中应当有管理权限,如查看任务队列是否为空 struct nWorker *prev; struct nWorker *next; }; //线程池 typedef struct nManager{ struct nTask *tasks; //任务队列 struct nWorker *workers; //执行队列 pthread_mutex_t mutex; pthread_cond_t cond; }ThreadPool; //'''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' //''''''''''''''''''''接口层'''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' //线程的回调 //判断任务队列是否内是否有任务,若有取出执行,若无等待任务 static void *nThreadPoolCallback(void *arg){ struct nWorker *worker=(struct nWorker *)arg; printf("nThreadPoolCallback\n"); while (1){ //加锁 pthread_mutex_lock(&worker->manager->mutex); //若任务队列为空,等待 while(worker->manager->tasks == NULL){ if (worker->terminate) break; //此时该线程上的任务队列为空,因此不解锁退出也不会影响其他线程,即不会造成死锁 pthread_cond_wait(&worker->manager->cond,&worker->manager->mutex); } if(worker->terminate){ //需要先解锁后退出,否则会死锁 pthread_mutex_unlock(&worker->manager->mutex); break; } //取出第一个任务,而后从任务队列删除该任务 struct nTask *task=worker->manager->tasks; //指向任务队列(链表)的第一个任务 LIST_REMOVE(task,worker->manager->tasks); //解锁 pthread_mutex_unlock(&worker->manager->mutex); //线程执行该任务 task->task_fun(task); } free(worker); } //创建线程池 int nThreadPoolCreat(ThreadPool *pool,int numWorkers){ if (pool == NULL) return -1; if (numWorkers < 1) numWorkers=1; memset(pool,0,sizeof(ThreadPool)); //初始化 pthread_mutex_init(&pool->mutex,NULL); pthread_cond_init(&pool->cond,NULL); //创建线程(业务员),加入到执行队列中 int i=0; for(i=0;i<numWorkers;i++){ //分配内存,并初始化(堆区创建的数据一定要初始化) struct nWorker *worker=(struct nWorker *)malloc(sizeof(struct nWorker )); if (worker == NULL){ perror("malloc"); return -2; } memset(worker,0,sizeof(struct nWorker)); worker->manager=pool; //创建线程(业务员) int ret = pthread_create(&worker->threadid, NULL, nThreadPoolCallback, worker); if (ret){ perror("pthread_creat"); free(worker); return -3; } //将线程(业务员)加入到任务队列中 printf("LIST_INSERT\n"); LIST_INSERT(worker,pool->workers); } //sucess return 0; } //销毁线程池 int nThreadPoolDestory(ThreadPool *pool){ //将线程池内的所有线程状态terminate设为真,表示退出 struct nWorker *worker=NULL; for (worker=pool->workers;worker!=NULL;worker=worker->next){ worker->terminate; } //获取与条件变量相关联的互斥锁-->向所有等待在该条件变量上的线程发送信号以唤醒它们-->释放线程池的互斥锁 //pthread_cond_broadcast配合pthread_cond_wait使用,获取的是同一把锁 pthread_mutex_lock(&pool->mutex); pthread_cond_broadcast(&pool->cond); pthread_mutex_unlock(&pool->mutex); //将线程池中的workers和tasks指针分别设置为NULL,释放内存空间 pool->tasks=NULL; pool->workers=NULL; return 0; } //往线程池加入任务 int nThreadPoolPushTask(ThreadPool *pool,struct nTask *task){ /*pthread_cond_signal通知等待在条件变量上的某个线程去获取锁并继续执行。如果此时没有加锁, 那么可能会出现竞态条件(Race Condition),也就是多个线程同时进入临界区域并修改共享数据,从而导致程序逻辑错误或者崩溃。 */ pthread_mutex_lock(&pool->mutex); LIST_INSERT(task,pool->tasks); pthread_cond_signal(&pool->cond); pthread_mutex_unlock(&pool->mutex); } //'''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' //''''''''''''''''''''调试接口'''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' #if 1 #define THREADPOOL_INIT_COUNT 2 #define TASK_INIT_COUNT 20 void task_entry(struct nTask *task){ int idx=*(int *)task->user_data; printf("idx: %d\n",idx); free(task->user_data); free(task); } int main(){ ThreadPool pool={0}; //创建线程 nThreadPoolCreat(&pool,THREADPOOL_INIT_COUNT); printf("nThreadPoolCreat -- finish\n"); //创建任务 int i=0; for (i=0;i<TASK_INIT_COUNT;i++){ struct nTask *task=(struct nTask *)malloc(sizeof(struct nTask)); if (task==NULL){ perror("malloc"); exit(1); } memset(task,0,sizeof(struct nTask)); task->task_fun=task_entry; task->user_data=malloc(sizeof(int)); *(int *)task->user_data=i; //加入任务队列 nThreadPoolPushTask(&pool,task); } getchar(); } #endif