C语言线程解池解读和实现01
在说到并发,池式组件的时候,最先想到的肯定是线程池。那线程池的原理是什么呢,又是如何工作的呢?这篇文章告诉你答案。
知识梳理
- 什么是线程池
线程池就是维护和管理一定数量线程的池式组件。有提高CPU工 作效率的作用 - 为什么需要线程池
通俗来说,如果我们有一个IO十分耗时但是我们又是单线程的那么我们的线程将会阻塞,等待这个IO执行完之后才会继续执行。这样就会十分耗时。 - 线程池有哪些组件
- 任务队列
- 一定数量的线程
- 锁(保证线程安全)
- 线程池是如歌管理线程的
- 有任务:执行任务
- 没有任务:CPU休眠
头文件解读
我们先来看一下头文件吧:
#ifndef THRDPOOL_H_ #define THRDPOOL_H_ typedef struct thrdpool_s thrdpool_t; typedef void (*handler_pt)(void*); #ifdef __cplusplus extern "C" { #endif thrdpool_t *thrdpool_create(int thrd_count); void thrdpool_treminater(thrdpool_t *pool); int thrdpool_post(thrdpool_t *pool, handler_pt func, void *arg); void thrdpool_waitdone(thrdpool_t *pool); #ifdef __cplusplus } #endif #endif
代码解读:
作为一个组件,或者说一个库,我们并不希望别人可以看到我们内部是如何实现的,所以我们告诉用户如何使用即可。thrdpool_create用来初始化一个线程池,参数是线程数量。thrdpool_terminater用来停止线程池。thrdpool_post用来抛出任务,即在哪个线程池,执行哪个函数,参数是什么。thrdpool_waitdone检测线程是不是都执行完。
注意,虽然我们不希望用户看到我们的实现,但是要告诉用户我们库的使用规范,就是最前面的两个typedef。他告诉用户我们线程池的对象类型和传入任务的规范。
由于我们支持C++使用我们库,所以我们加上#ifdef __cplusplus extern “C”。就是说如果是C++,那我们就采用C规则编译这个文件。
数据结构解读
队列
typedef struct task_s { void *next; handler_pt func;//对应函数 void *arg;//参数 } task_t; typedef struct task_queue_s { void *head;//头指针 void **tail;//尾指针 int block;//标志 spinlock_t lock;//回旋锁 pthread_mutex_t mutex;//互斥锁 pthread_cond_t cond;//条件 } task_queue_t;
代码解读:
这里我们先看一下我们队列结构的示意图:
我们使用一个链式结构。将所有task连接在一起,然后有一个manager来管理这些task。看了这张图结合上面的代码就可以很清楚的了解到我们队列的结构。
池
typedef struct thrdpool_s { task_queue_t *task_queue;//任务队列 atomic_int quit;//标志 uint32_t thrd_count;//池内线程数量 pthread_t *threads;//线程数组 } thrdpool_t;
代码解读:
这里没有什么好说的。其中标志的意思就是:如果为0,正常运行,如果为1阻塞。它是一个原子变量,所以具有线程安全。由于就这一个变量,没有设计复杂的操作,所以没有使用锁的必要,我们用原子变量即可解决。
代码实现
初始化队列
static task_queue_t* __taskqueue_create() { int ret; task_queue_t *queue = (task_queue_t*)malloc(sizeof(*queue)); if(queue) { ret = pthread_mutex_init(&queue->mutex); if(ret == 0) { ret = pthread_cond_init(&queue->cond); if(ret == 0) { spinlock_init(&queue->lock); queue->head = NULL; queue->tail = &queue->head; queue->block = 1; return queue; } pthread_mutex_destroy(&queue->mutex); } free(queue); } return NULL; }
代码解读:
没有什么特别好说的,这里就是申请一块内存,然后对其中的结构体成员进行初始化。如果全部初始化成功就返回指针。主要注意的是,我们的block要初始化为0,就是默认在阻塞状态,因为刚刚初始化,队列里面没有任何任务。
移除阻塞
static int __nonblock(task_queue_t *queue) { pthread_mutex_lock(&queue->mutex); queue->block = 0; pthread_mutex_unlock(&queue->mutex); pthread_cond_broadcast(&queue->cond); }
代码解读:
在对task_queue内部变量操作的时候,必须使用锁来保证线程安全。将标志为阻塞的block置0,解除阻塞。然后广播,唤醒其他线程。
插入任务
static inline __add_task(task_queue_t *queue, void *task) { void **link = (void**)task; *link = NULL; spinlock_lock(&queue->lock); *queue->tail = link; queue->tail = link; spinlock_unlock(&queue->lock); pthread_cond_signal(&queue->cond); }
代码解读:
首先,我们让task->next指向NULL。然后,在加锁的情况下,对队列进行尾插。最后唤醒一个线程获取任务。这里需要解释一下我们的写法,为什么可以这样写。事实上,在内核中,队列都是这样的一级指针头+二级指针尾的写法,原理请看图:
从指正管理长度的角度来看void*就是管理了整个node,但是**void就是管理指向内存的后面8个字节(64位操作系统)即*next;
所以*queue->tail 就是 queue->tail->next;同理*link 就是 *next。
删除任务
static task_t* __pop_task(task_queue_t *queue) { spinlock_lock(&queue->lock); if(queue->head == NULL) return NULL; task_t *task; void **link = (void**)queue->head; queue->head = *link; if(queue->head == NULL) queue->tail = &queue->head; spinlock_unlock(&queue->lock); }
代码讲解:
我们将最前面的task取出,然后head指针指向后面一个。
获取任务
static void* __get_task(task_queue_t *queue) { task_t *task; pthread_mutex_lock(&queue->mutex); while(task = __pop_task(queue) == NULL) { if(queue->block == 0) { pthread_mutex_unlock(&queue->mutex); return NULL; } pthread_cond_wait(&queue->cond, &queue->mutex); pthread_mutex_unlock(&queue->mutex); } return task; }
代码解读:
我们调用上面写的__pop_task来获取队列最前面的task。如果队列是非阻塞的,那就直接返回。如果队列是阻塞的(刚刚初始化完),那我们就要进入休眠,等待__add_task里面的唤醒。然后返回获取到的task。
队列的销毁
static void __destroy_task_queue(task_queue_t *queue) { task_t *task; while(task = __pop_task(queue)) { free(task); } pthread_mutex_destroy(&queue->mutex); pthread_cond_destroy(&queue->cond); spinlock_destroy(&queue->lock); free(queue); }
代码解读:
释放全部资源销毁即可。
线程池工作
static void* __thrdpool_work(void *arg) { thrdpool_t *thrdpool = (thrdpool_t*)arg; task_t *task; void *cxt; if(atomic_load(&thrdpool->quit) == 0) { task = (task_t*)__get_task(thrdpool->task_queue); handler_pt func = task->func; cxt = task->arg; free(task); func(cxt); } return NULL; }
代码解读:
线程停止
static void __thrdpool_terminater(thrdpool_t *pool) { atomic_store(&pool->quit, 1); __nonblock(pool->task_queue); int i = 0; for(i; i < pool->thrd_count; i++) { pthread_join(pool->threads[i], NULL); } }
代码解读:
首先让线程池阻塞,然后执行完当前所有线程的任务。
创建线程池
static int __thrdpool_create(thrdpool_t *pool, int thrd_num) { int ret; pthread_attr_t attr; ret = pthread_attr_init(&attr); if(ret == 0) { pool->threads = (pthread_t*)malloc(sizeof(pthread_t) * thrd_num); if(pool->threads) { int i = 0; for(i; i < thrd_num; i++) { if(pthread_create(&pool->threads[i], &attr, __thrdpool_work, pool) != 0) { break; } } pool->thrd_count = i; if(thrd_num == i) return 0; __thrdpool_terminater(pool); free(pool->threads); } ret = -1; } return ret; }
代码解读:
主要就是在堆上开辟空间,然后使用循环批量创建线程。就是要注意一点一点来,如果资源创建失败需要及时销毁资源并且返回。
接口
void thrdpool_treminater(thrdpool_t *pool) { atomic_store(&pool->quit, 1); __nonblock(pool->task_queue); } thrdpool_t *thrdpool_create(int thrd_count) { thrdpool_t *pool = (thrdpool_t*)malloc(sizeof(thrdpool_t)); if(pool) { task_queue_t *task = __taskqueue_create(); if(task) { pool->task_queue = task; int ret = __thrdpool_create(pool, thrd_count); if(ret == 0) { return pool; } __destroy_task_queue(pool->task_queue); } free(pool); } return NULL; } int thrdpool_post(thrdpool_t *pool, handler_pt func, void *arg) { task_t *task = (task_t*)malloc(sizeof(task_t)); if(atomic_load(&pool->quit) == 1) { return -1; } task->arg = arg; task->func = func; __add_task(pool->task_queue, task); return 0; } void thrdpool_waitdone(thrdpool_t *pool) { int i = 0; for(i; i < pool->thrd_count; i++) { pthread_join(pool->threads[i], NULL); } __destroy_task_queue(pool->task_queue); free(pool->threads); free(pool); }
代码解读:
这里的接口是提供给客户使用的。这里需要注意waitdone和terminater的区别。一个是完全停止线程池(带有销毁功能),一个是不销毁,只是暂停。