什么是线程池❔
线程池是一个维持固定数量线程的池式结构
- 问:为什么要是固定数量
**答:**线程是一个紧缺的系统资源,随着线程不断的创建,达到一定数量后,操作系统的压力会越来越大 - 问: 如何来决定线程池中线程的数量
**答:**区分当前场景下是cpu
密集型还是io
密集型1
.cpu
密集型(需要利用cpu
进行复杂耗时的计算):proc
个2
.io
密集型(大量的网络io
):2*proc
个 - 问:为什么需要使用线程池
**答:**某类任务特别耗时,会影响当前线程处理其他任务,我们就会将它抛到其他线程中去行,这些任务会被异步的执行,但是线程的创建与销毁的开销很大,于是就用一个池式结构来管理固定数量的线程,我们叫做线程池,主要作用为:复用线程资源、充分利用线程资源 - 问:线程池属于什么类型模型
答:属于生产者消费者模型
线程池基本大概运行框架🖼
首先线程池中维护一堆运行的线程(是通过pthread_create
线程,并运行线程的专属入口函数为work
,它会线程中一直运行),会一直获取任务队列中的任务节点(当任务队列中没有节点则线程会休眠),而线程池暴露了一个post
接口给用户,用户可以将要执行的函数以及其上下文(参数),以任务节点的形式放进任务队列,此时会唤醒线程池中的线程来执行该任务,当任务执行完毕后线程会继续运行work
函数获取任务队列的节点
线程池的实现🍻
线程池中的基本组成结构🍺
任务节点链表 | 任务队列 | 线程池 |
任务实体节点用链表组织 | 用阻塞队列的方式管理任务实体 | 包含任务队列,包含线程id 数组 |
任务节点链表📑
typedef struct task_s { void *next; handler_pt func; void *arg; } task_t;
- 第一个字段必须为
next
指针,方便后续任务队列二级指针的操作 - 后面的字段分别是函数指针和参数列表的参数
任务队列📑
这个队列是在多线程环境下,用于管理大量任务
typedef struct task_queue_s { void *head; void **tail; int block; spinlock_t lock; pthread_mutex_t mutex; pthread_cond_t cond; } task_queue_t;
- 指向队列头的指针
- 指向队列尾部指针的指针(可以很方便的对队列进行增加删除操作)
- 阻塞信息,当为
1
时表示使用阻塞队列,当任务队列为空时会阻塞休眠等待任务队列中有任务节点 - 用于任务队列增删的时候的锁,使用自旋锁是因为队列增删节点的时间很快
mutex
与cond
用于当前线程访问任务队列,任务队列为空时可以进行休眠,等待队列有节点时的信号
线程池结构📑
struct thrdpool_s { task_queue_t *task_queue; atomic_int quit; int thrd_count; pthread_t *threads; };
- 任务队列
- 线程池中线程的状态,
1
代表不会启动,0
代表运行中 - 线程池中线程的数量
- 线程
id
数组
线程池接口设计⁉️
在c
中,可以在.c
文件中对函数使用static
来向其他文件屏蔽该函数,使该函数只能在本文件中访问,比如:
- 有一个
a.c
文件里面有两个函数,一个是static
的一个不是static
,在另一个b.c
文件中调用a.c
,文件中的函数则只能调用非static
函数,如果想要调用static
,可以使用非static
函数内部间接的调用该函数
暴露给用户的
- 线程池的创建
- 往任务队列中增加节点的函数
- 线程池状态的控制,可以控制线程池中的线程暂停
- 回收线程池中的相关资源的
api
线程池内部的基本必要函数(使用static
屏蔽)
**小注:**为什么
pthread_create
调用成员函数时,该成员函数必须声明为静态函数,不然就会与c
的pthread_create
函数的函数不匹配
pthread_create
第三个参数是一个参数列表为void*
,返回类型为void*
的函数指针,非static
的成员函数的参数列表会隐含的穿一个this指针,导致其参数列表,隐式的会有两个参数从而与pthread_create
的参数列表不匹配
work
工作函数- 对任务队列的增删函数
- 实际线程的创建函数
- 线程的创建函数(会被用户的线程池创建函数调用)
实际coding
部分
回滚式写法:一般是涉及到资源的创建时会使用这种写法,没有创建成功则立即回滚处理,条件判断,只在内层条件判断成功后返回正确内容,其他条件分支统统是错误回滚处理
创建任务队列函数
static task_queue_t * __taskqueue_create() { task_queue_t *queue = (task_queue_t *)malloc(sizeof(*queue)); if (!queue) return NULL; int ret; ret = pthread_mutex_init(&queue->mutex, NULL); if (ret == 0) { ret = pthread_cond_init(&queue->cond, NULL); if (ret == 0) { spinlock_init(&queue->lock); // 自己写的用 只是一个变量不需要释放 queue->head = NULL; queue->tail = &queue->head; queue->block = 1; return queue; } pthread_cond_destroy(&queue->cond); } pthread_mutex_destroy(&queue->mutex); return NULL; }
- 疑惑操作:
queue->head = NULL; queue->tail = &queue->head;
取消阻塞函数
static void __nonblock(task_queue_t *queue) { pthread_mutex_lock(&queue->mutex); queue->block = 0; // 可以使用原子操作来避免锁 pthread_mutex_unlock(&queue->mutex); pthread_cond_broadcast(&queue->cond); }
- 采用条件变量广播唤醒所有睡眠等待的线程,让他们退出
获取任务队列节点函数
static inline void * __get_task(task_queue_t *queue) { task_t *task; // 虚假唤醒 while ((task = __pop_task(queue)) == NULL) { pthread_mutex_lock(&queue->mutex); if (queue->block == 0) { // break; return NULL; } pthread_cond_wait(&queue->cond, &queue->mutex); pthread_mutex_unlock(&queue->mutex); } return task; }
- 这个函数被
work
函数调用之后会如果队列为空则会阻塞线程
向任务队列中添加节点
static inline void __add_task(task_queue_t *queue, void *task) { void **link = (void **)task; // malloc *link = NULL; // task->next = NULL; spinlock_lock(&queue->lock); *queue->tail = link; // 将尾部指向link节点 queue->tail = link; spinlock_unlock(&queue->lock); pthread_cond_signal(&queue->cond); }
link
是void**
类型的,*link
则是一个void*
类型的,具体指task->next
的结构体,因此可以使用(task_t*)(*link)
这样就可以(task_t*)(*link)->func
来访问其成员,但是队列中用二维指针管理的基本都是尾元素,所以(*link)
一般都指向NULL
,故使用(task_t*)(*link)->func
来访问成员都是未定义的
取任务队列的头结点
static inline void * __pop_task(task_queue_t *queue) { spinlock_lock(&queue->lock); if (queue->head == NULL) { // 任务队列为空 spinlock_unlock(&queue->lock); return NULL; } task_t *task; task = queue->head; queue->head = task->next; if (queue->head == NULL) { queue->tail = &queue->head; } spinlock_unlock(&queue->lock); return task; }
线程工作函数
static void * __thrdpool_worker(void *arg) { thrdpool_t *pool = (thrdpool_t*) arg; task_t *task; void *ctx; while (atomic_load(&pool->quit) == 0) { task = (task_t*)__get_task(pool->task_queue); // 内含休眠 if (!task) break; handler_pt func = task->func; ctx = task->arg; free(task); func(ctx); } return NULL; }
free
函数是释放task
指向的内存,但是free
函数并不会强制操作系统立即回收该内存空间,而是将其加入内存池中,以备下一次分配使用。所以我们在free
之前保存其堆中指向的内存,还可以使用。
线程池创建函数
thrdpool_t * thrdpool_create(int thrd_count) { thrdpool_t *pool; pool = (thrdpool_t*) malloc(sizeof(*pool)); if (!pool) return NULL; task_queue_t *queue = __taskqueue_create(); if (queue) { pool->task_queue = queue; atomic_init(&pool->quit, 0); if (__threads_create(pool, thrd_count) == 0) { return pool; } __taskqueue_destroy(pool->task_queue); } free(pool); return NULL; }
- 包括了队列的创建与线程们的创建
线程池的使用
**用来干什么:**用来增加一个全局变量的值,增加到1000
后使线程池terminate
**怎么用:**初始化线程池,不停的往任务队列中抛任务节点进去,每个任务仅对临界资源+1
一次,用户不断的push
任务,线程池中的线程不断地去竞争队列中的任务节点然后执行后返回然后继续竞争
任务函数
int done = 0; void do_task(void *arg) { thrdpool_t *pool = (thrdpool_t*)arg; pthread_mutex_lock(&lock); done++; printf("doing %d task\n", done); pthread_mutex_unlock(&lock); if (done >= 1000) { thrdpool_terminate(pool); } }
- 对全局变量
done
进行+1
操作
使用线程池
int threads = 8; pthread_mutex_init(&lock, NULL); thrdpool_t *pool = thrdpool_create(threads); if (pool == NULL) { perror("thread pool create error!\n"); exit(-1); } while (thrdpool_post(pool, &do_task, pool) == 0) { } thrdpool_waitdone(pool); pthread_mutex_destroy(&lock);
- 创建线程池
thrdpool_create
- 向线程池中发送数据
总结:
如果没有使用线程池,我们就是自己用户层的开启线程,来一个开一个,完全没有管理的概念,加入了线程池后,由线程池来管理线程们。
还有使用二级指针维护队列尾,使得队列的删除变得方便