线程池的重点
- 为什么使用线程池
- 线程池的构成,其中最重要的是队列的设计(结合线程池网络编程的实例)。
1、线程池的概念
1.1、为什么要使用线程池
对于某些特别耗时的任务,严重影响了该线程处理其他任务,需要将这些任务交给其他线程异步处理,但是频繁创建和销毁线程也会给系统带来大量开销。线程池的出现可以很好解决上述问题,线程池中的线程数量需要我们在线程资源的开销和 cpu 核心间进行平衡选择,要做到既能减少线程创建和销毁的开销,又能最大限度的提高 cpu 利用率。
1.2、线程池的作用
- 复用线程资源,减少线程创建和销毁的开销
- 可异步处理生产者线程的任务。
1.3、线程池的构成
线程池其实是一个生产者和消费者模型,生产者线程发布任务,将任务放入队列中。线程池中的空闲消费者线程从队列中取出任务,并执行任务。
线程池的工作原理
如图,线程池有三个核心组件
- 生产者线程:发布任务
- 队列:存储任务结点,其中包括任务的上下文、任务的执行函数等
- 线程池(消费者):取出任务,执行任务。此过程涉及到线程调度的问题,即从无到有唤醒,从有到无休眠。
1.4、线程池线程数量的平衡选择
线程池线程数量的经验公式:(io等待时间 + cpu运算时间)* 核心数 / cpu运算时间
- io 密集型:2 * cpu 核心数 + 有效磁盘数(2)
- cpu 密集型:与 cpu 核心数一致
2、线程池的实现
2.1、线程池的组件
- 生产者线程:主线程
- 任务队列
// 任务结点 typedef struct task_s { handler_pt func; // 任务的执行函数 void * arg; // 任务的上下文 } task_t; // 任务队列 typedef struct task_queue_s { uint32_t head; // 队列的头指针 uint32_t tail; // 队列的尾指针 uint32_t count; // 队列中的任务结点数量 task_t *queue; // 队列数组 } task_queue_t;
- 线程池
struct thread_pool_t { pthread_mutex_t mutex; pthread_cond_t condition; pthread_t *threads; task_queue_t task_queue; int closed; // 退出标记 int started; // 当前运行的线程数 int thrd_count; // 线程的数量 int queue_size; // 队列的长度,设置数组,一次性分配内存,不使用内存池 };
2.2、线程池的接口
2.2.1、创建线程池
- 线程池的线程数量:根据经验公式计算得出
- 队列的长度:线程的栈空间固定大小(默认8M),由线程数量和线程栈空间大小共同决定
thread_pool_t* thread_pool_create(int thrd_count, int queue_size) { thread_pool_t *pool; if (thrd_count <= 0 || queue_size <= 0) { return NULL; } // 线程池分配内存 pool = (thread_pool_t*) malloc(sizeof(*pool)); if (pool == NULL) { return NULL; } // 初始化线程池 // 为什么不直接用thrd_count赋值?而是选择从0开始计数 // 每当成功创建1个线程,计数+1,避免线程创建失败造成计数混乱 pool->thrd_count = 0; pool->queue_size = queue_size; pool->task_queue.head = 0; pool->task_queue.tail = 0; pool->task_queue.count = 0; pool->started = pool->closed = 0; // 创建任务队列 pool->task_queue.queue = (task_t*)malloc(sizeof(task_t)*queue_size); if (pool->task_queue.queue == NULL) { // TODO: free pool return NULL; } // 创建线程 pool->threads = (pthread_t*) malloc(sizeof(pthread_t) * thrd_count); if (pool->threads == NULL) { // TODO: free pool return NULL; } // 依次创建好线程 for (int i = 0; i < thrd_count; ++i) { if (pthread_create(&(pool->threads[i]), NULL, thread_worker, (void*)pool) != 0) { // TODO: free pool return NULL; } pool->thrd_count++; pool->started++; } return pool; } // 线程池中的线程(消费者)该干的事儿 static void* thread_worker(void *thrd_pool) { thread_pool_t *pool = (thread_pool_t*)thrd_pool; task_queue_t *que; task_t task; for (;;) { pthread_mutex_lock(&(pool->mutex)); que = &pool->task_queue; // 虚假唤醒问题 // while 判断:没有任务而且线程池没有关闭 while (que->count == 0 && pool->closed == 0) { // pthread_mutex_unlock(&(pool->mutex)) // 阻塞在 condition // =================================== // 解除阻塞 // pthread_mutex_lock(&(pool->mutex)); pthread_cond_wait(&(pool->condition), &(pool->mutex)); } // 线程池关闭 if (pool->closed == 1) break; // 获取任务 task = que->queue[que->head]; que->head = (que->head + 1) % pool->queue_size; que->count--; pthread_mutex_unlock(&(pool->mutex)); // 执行任务 (*(task.func))(task.arg); } // 销毁该线程 pool->started--; pthread_mutex_unlock(&(pool->mutex)); pthread_exit(NULL); return NULL; }
虚假唤醒
上述代码注释中提到了虚假唤醒,那么,什么是虚假唤醒?
虚假唤醒指的是在多线程环境下,多个线程等待在同一个条件上。当条件满足时,可能唤醒多个线程。如果这个资源只能由一个线程获得,剩余线程无法获得该资源。对于无法获得资源的线程来说,这种唤醒是无意义的,这种现象称为虚假唤醒。
虚假唤醒产生的原因
- 信号中断导致的问题(Linux2.6以后已解决)
pthread_cond_signal
,至少唤醒1个线程,即可能同时唤醒多个线程。
为避免虚假唤醒的发生,每个被唤醒的线程都需要再检查一次条件是否满足。如果不满足,应该继续睡眠;只有满足了才能往下执行。因此,需要把条件变量的判断从 if 判断换成 while 判断,此时,一次只能唤醒一个线程。
2.2.2、销毁线程池
- 标记线程池退出
- 通知所有线程
int thread_pool_destroy(thread_pool_t *pool) { if (pool == NULL) { return -1; } // 阻止产生新的任务 if (pthread_mutex_lock(&(pool->mutex)) != 0) { return -2; } // 判断是否已经退出,防止重复释放空间 if (pool->closed) { thread_pool_free(pool); return -3; } // 标记线程池退出 pool->closed = 1; // 唤醒所有阻塞在cond上的线程,并释放互斥锁 if (pthread_cond_broadcast(&(pool->condition)) != 0 || pthread_mutex_unlock(&(pool->mutex)) != 0) { thread_pool_free(pool); return -4; } // 等待所有线程退出 wait_all_done(pool); // 释放线程池空间 thread_pool_free(pool); return 0; }
2.2.3、生产者抛出任务
- 构造任务
- 放入队列
- 唤醒线程
int thread_pool_post(thread_pool_t *pool, handler_pt func, void *arg) { if (pool == NULL || func == NULL) { return -1; } task_queue_t *task_queue = &(pool->task_queue); if (pthread_mutex_lock(&(pool->mutex)) != 0) { return -2; } // 判断线程池是否关闭 if (pool->closed) { pthread_mutex_unlock(&(pool->mutex)); return -3; } // 判断任务队列是否已满 if (task_queue->count == pool->queue_size) { pthread_mutex_unlock(&(pool->mutex)); return -4; } // 1、主线程(生产者线程)构造任务,放入任务队列 // 队列的操作,使用自旋锁 task_queue->queue[task_queue->tail].func = func; task_queue->queue[task_queue->tail].arg = arg; task_queue->tail = (task_queue->tail + 1) % pool->queue_size; task_queue->count++; // 2、唤醒线程池中的线程 if (pthread_cond_signal(&(pool->condition)) != 0) { pthread_mutex_unlock(&(pool->mutex)); return -5; } pthread_mutex_unlock(&(pool->mutex)); return 0; }
2.3、实例
// threadpool.h #ifndef _THREAD_POOL_H #define _THREAD_POOL_H typedef struct thread_pool_t thread_pool_t; typedef void (*handler_pt) (void *); // 创建线程池 thread_pool_t *thread_pool_create(int thrd_count, int queue_size); // 抛出任务 int thread_pool_post(thread_pool_t *pool, handler_pt func, void *arg); // 销毁线程池 int thread_pool_destroy(thread_pool_t *pool); // 等待所有线程的退出 int wait_all_done(thread_pool_t *pool); #endif // threadpool.c #include <pthread.h> #include <stdint.h> #include <stddef.h> #include <stdlib.h> #include "thrd_pool.h" // 任务结点 typedef struct task_s { handler_pt func; // 任务的执行函数 void * arg; // 任务的上下文 } task_t; // 任务队列 typedef struct task_queue_s { uint32_t head; // 队列的头指针 uint32_t tail; // 队列的尾指针 uint32_t count; // 队列中的任务结点数量 task_t *queue; // 队列数组 } task_queue_t; // 线程池 struct thread_pool_t { pthread_mutex_t mutex; pthread_cond_t condition; pthread_t *threads; task_queue_t task_queue; int closed; // 退出标记 int started; // 当前运行的线程数 int thrd_count; // 线程的数量 int queue_size; // 队列的长度,设置数组,一次性分配内存,不使用内存池 }; static void * thread_worker(void *thrd_pool); static void thread_pool_free(thread_pool_t *pool); // 创建线程池 thread_pool_t* thread_pool_create(int thrd_count, int queue_size) { thread_pool_t *pool; if (thrd_count <= 0 || queue_size <= 0) { return NULL; } // 线程池分配内存 pool = (thread_pool_t*) malloc(sizeof(*pool)); if (pool == NULL) { return NULL; } // 初始化线程池 // 为什么不直接用thrd_count赋值?而是选择从0开始计数 // 每当成功创建1个线程,计数+1,避免线程创建失败造成计数混乱 pool->thrd_count = 0; pool->queue_size = queue_size; pool->task_queue.head = 0; pool->task_queue.tail = 0; pool->task_queue.count = 0; pool->started = pool->closed = 0; // 创建任务队列 pool->task_queue.queue = (task_t*)malloc(sizeof(task_t)*queue_size); if (pool->task_queue.queue == NULL) { // TODO: free pool return NULL; } // 创建线程 pool->threads = (pthread_t*) malloc(sizeof(pthread_t) * thrd_count); if (pool->threads == NULL) { // TODO: free pool return NULL; } // 依次创建好线程 int i = 0; for (; i < thrd_count; ++i) { if (pthread_create(&(pool->threads[i]), NULL, thread_worker, (void*)pool) != 0) { // TODO: free pool return NULL; } pool->thrd_count++; pool->started++; } return pool; } // 生产者抛出任务 int thread_pool_post(thread_pool_t *pool, handler_pt func, void *arg) { if (pool == NULL || func == NULL) { return -1; } task_queue_t *task_queue = &(pool->task_queue); if (pthread_mutex_lock(&(pool->mutex)) != 0) { return -2; } // 判断线程池是否关闭 if (pool->closed) { pthread_mutex_unlock(&(pool->mutex)); return -3; } // 判断任务队列是否已满 if (task_queue->count == pool->queue_size) { pthread_mutex_unlock(&(pool->mutex)); return -4; } // 1、主线程(生产者线程)构造任务,放入任务队列 // 队列的操作,使用自旋锁 task_queue->queue[task_queue->tail].func = func; task_queue->queue[task_queue->tail].arg = arg; task_queue->tail = (task_queue->tail + 1) % pool->queue_size; task_queue->count++; // 2、唤醒线程池中的线程 if (pthread_cond_signal(&(pool->condition)) != 0) { pthread_mutex_unlock(&(pool->mutex)); return -5; } pthread_mutex_unlock(&(pool->mutex)); return 0; } // 释放线程池空间 static void thread_pool_free(thread_pool_t *pool) { if (pool == NULL || pool->started > 0) { return; } if (pool->threads) { free(pool->threads); pool->threads = NULL; pthread_mutex_lock(&(pool->mutex)); pthread_mutex_destroy(&pool->mutex); pthread_cond_destroy(&pool->condition); } if (pool->task_queue.queue) { free(pool->task_queue.queue); pool->task_queue.queue = NULL; } free(pool); } // 等待所有线程的退出 int wait_all_done(thread_pool_t *pool) { int i, ret = 0; for (i = 0; i < pool->thrd_count; i++) { if (pthread_join(pool->threads[i], NULL) != 0) { ret = 1; } } return ret; } // 3、销毁线程池 int thread_pool_destroy(thread_pool_t *pool) { if (pool == NULL) { return -1; } // 阻止产生新的任务 if (pthread_mutex_lock(&(pool->mutex)) != 0) { return -2; } // 判断是否已经退出,防止重复释放空间 if (pool->closed) { thread_pool_free(pool); return -3; } // 标记线程池退出 pool->closed = 1; // 让所有阻塞在cond上的线程唤醒,并释放互斥锁 if (pthread_cond_broadcast(&(pool->condition)) != 0 || pthread_mutex_unlock(&(pool->mutex)) != 0) { thread_pool_free(pool); return -4; } // 等待所有线程退出 wait_all_done(pool); thread_pool_free(pool); return 0; } // 线程池中的线程(消费者)该干的事儿 static void* thread_worker(void *thrd_pool) { thread_pool_t *pool = (thread_pool_t*)thrd_pool; task_queue_t *que; task_t task; for (;;) { pthread_mutex_lock(&(pool->mutex)); que = &pool->task_queue; // 虚假唤醒问题 // while 判断:没有任务而且线程池没有关闭 while (que->count == 0 && pool->closed == 0) { // pthread_mutex_unlock(&(pool->mutex)) // 阻塞在 condition // =================================== // 解除阻塞 // pthread_mutex_lock(&(pool->mutex)); pthread_cond_wait(&(pool->condition), &(pool->mutex)); } // 线程池关闭 if (pool->closed == 1) break; // 获取任务 task = que->queue[que->head]; que->head = (que->head + 1) % pool->queue_size; que->count--; pthread_mutex_unlock(&(pool->mutex)); // 执行任务 (*(task.func))(task.arg); } // 销毁该线程 pool->started--; pthread_mutex_unlock(&(pool->mutex)); pthread_exit(NULL); return NULL; } // main.c #include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <unistd.h> #include "thrd_pool.h" int nums = 0; int done = 0; pthread_mutex_t lock; void do_task(void *arg) { usleep(10000); pthread_mutex_lock(&lock); done++; printf("doing %d task\n", done); pthread_mutex_unlock(&lock); } int main(int argc, char **argv) { int threads = 8; int queue_size = 256; if (argc == 2) { threads = atoi(argv[1]); if (threads <= 0) { printf("threads number error: %d\n", threads); return 1; } } else if (argc > 2) { threads = atoi(argv[1]); queue_size = atoi(argv[1]); if (threads <= 0 || queue_size <= 0) { printf("threads number or queue size error: %d,%d\n", threads, queue_size); return 1; } } thread_pool_t *pool = thread_pool_create(threads, queue_size); if (pool == NULL) { printf("thread pool create error!\n"); return 1; } while (thread_pool_post(pool, &do_task, NULL) == 0) { pthread_mutex_lock(&lock); nums++; pthread_mutex_unlock(&lock); } printf("add %d tasks\n", nums); wait_all_done(pool); printf("did %d tasks\n", done); thread_pool_destroy(pool); return 0; }
3、线程池网络编程
3.1、reactor 的问题
reactor
在一个事件循环中,可以处理多个就绪事件。这些就绪事件在 reactor 模型中是串行执行的,一个事件处理若耗时较长,会延迟其他同时触发的事件的处理。
void eventloop_once(reactor_t * r, int timeout) { int n = epoll_wait(r->epfd, r->fire, MAX_EVENT_NUM, timeout); for (int i = 0; i < n; ++i) { struct epoll_event *e = &r->fire[i]; int mask = e->events; // 用 io 函数捕获具体的错误信息 if (e->events & EPOLLERR) mask |= EPOLLIN | EPOLLOUT; // 用 io 函数捕获断开的具体信息 if (e->events & EPOLLHUP) mask |= EPOLLIN | EPOLLOUT; event_t *et = (event_t*) e->data.ptr; // 处理读事件 if (mask & EPOLLIN) { if (et->read_fn) { et->read_fn(et->fd, EPOLLIN, et); } } // 处理写事件 if (mask & EPOLLOUT) { if (et->write_fn) { et->write_fn(et->fd, EPOLLOUT, et); } else { uint8_t * buf = buffer_write_atmost(&et->out); event_buffer_write(et, buf, buffer_len(&et->out)); } } } }
接下来,介绍 nginx, redis, skynet 线程池实现形式,都是对 reactor 模型的优化。
3.2、nginx 线程池
为什么使用线程池?
nginx 是磁盘 io 密集型。业务逻辑是处理文件缓冲,需要磁盘 io 操作,耗费大量的时间。
什么情况下使用线程池?
nginx 线程池作用 compute 阶段,将业务逻辑交给线程池处理。
怎样使用线程池?
nginx 的线程池有两个消息队列,当线程池处理完任务后,将处理的结果放入完成消息队列,以事件的方式通知主线程,主线程从完成消息队列中取出结果发送给客户端。
nginx线程池设计原理
值得注意的是,nginx 不推荐线程池的方式,可以采用替代的操作 sendfile
, directio
解决磁盘 io 的问题。
3.3、redis 线程池
为什么使用线程池?
redis 是网络 io 密集型,需要同时处理多条并发请求,存在读写 io 的问题(请求大量数据,写日志业务等)。
什么情况下使用线程池?
redis 线程池作用读写 io 阶段,即 read, decode 和 encode, send 阶段。主线程处理业务逻辑,之所以用单线程处理业务,是因为 redis 采用高效的数据结构,其业务逻辑处理较快。
redis 线程池作用阶段
怎样使用线程池?
主线程拥有两个全局队列clients_pending_read
和clients_pending_write
,每个 io 线程(主线程同时也是 io 线程)拥有一个专属队列 io_threads_list[id]
。主线程既作为生产者,产生任务;又作为消费者,获取任务执行。
首先,主线程将一次循环的所有就绪的读事件收集到自己的全局任务队列clients_pending_read
中,再把每个事件负载均衡地分配到每个 io 线程的专属任务队列中。一次事件循环中不会出现同名 fd,不同的 fd 分配到每个 io 线程各自的队列中,避免了多个 io 线程同时从全局队列中取数据,因此,不需要加锁操作。
接下来,io 线程从自己的专属队列中取出任务,(除主线程外)并发执行 read 和 decode 操作。主线程将解析后的任务做 compute 操作。最后,io 线程(包括主线程)并发执行 encode 和 send 操作。
redis 线程池设计原理
源码实现细节,见我的另一篇文章,redis 源码解析:IO 线程池。
3.4、skynet 线程池
为什么使用线程池?
skynet 是 cpu 密集型。处理读写 io 、数据包解压缩、业务逻辑处理等。特别地,当同一个 io 在多个线程处理时,将写 io 转由网络线程处理。
什么情况下使用线程池?
skynet 线程池作用于除 send 外的所有阶段。
怎样使用线程池?
skynet 的网络线程收集事件,将事件插入到对应的 actor 的专属消息队列中。全局消息队列存储有消息的 actor,即描述活跃的 actor,线程池负责检查该全局队列。
skynet线程池设计原理