线程池的实现

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 线程池的实现

线程池的重点

  • 为什么使用线程池
  • 线程池的构成,其中最重要的是队列的设计(结合线程池网络编程的实例)。

1、线程池的概念

1.1、为什么要使用线程池

对于某些特别耗时的任务,严重影响了该线程处理其他任务,需要将这些任务交给其他线程异步处理,但是频繁创建和销毁线程也会给系统带来大量开销。线程池的出现可以很好解决上述问题,线程池中的线程数量需要我们在线程资源的开销和 cpu 核心间进行平衡选择,要做到既能减少线程创建和销毁的开销,又能最大限度的提高 cpu 利用率。

1.2、线程池的作用

  • 复用线程资源,减少线程创建和销毁的开销
  • 可异步处理生产者线程的任务。

1.3、线程池的构成

线程池其实是一个生产者和消费者模型,生产者线程发布任务,将任务放入队列中。线程池中的空闲消费者线程从队列中取出任务,并执行任务。

1704877592665.jpg

线程池的工作原理


如图,线程池有三个核心组件

  • 生产者线程:发布任务
  • 队列:存储任务结点,其中包括任务的上下文、任务的执行函数等
  • 线程池(消费者):取出任务,执行任务。此过程涉及到线程调度的问题,即从无到有唤醒,从有到无休眠。

1.4、线程池线程数量的平衡选择

线程池线程数量的经验公式:(io等待时间 + cpu运算时间)* 核心数 / cpu运算时间

  • io 密集型:2 * cpu 核心数 + 有效磁盘数(2)
  • cpu 密集型:与 cpu 核心数一致

2、线程池的实现

2.1、线程池的组件

  • 生产者线程:主线程
  • 任务队列
// 任务结点
 typedef struct task_s {
  handler_pt func; // 任务的执行函数
  void * arg;      // 任务的上下文
 } task_t;
 // 任务队列
 typedef struct task_queue_s {
  uint32_t head;  // 队列的头指针
  uint32_t tail;  // 队列的尾指针
  uint32_t count; // 队列中的任务结点数量
  task_t *queue;  // 队列数组
 } task_queue_t;
  • 线程池
struct thread_pool_t {
  pthread_mutex_t mutex;
  pthread_cond_t condition;
  pthread_t *threads;
  task_queue_t task_queue;
  int closed;     // 退出标记
  int started;    // 当前运行的线程数
  int thrd_count; // 线程的数量
  int queue_size; // 队列的长度,设置数组,一次性分配内存,不使用内存池
 };

2.2、线程池的接口

2.2.1、创建线程池

  • 线程池的线程数量:根据经验公式计算得出
  • 队列的长度:线程的栈空间固定大小(默认8M),由线程数量和线程栈空间大小共同决定
thread_pool_t* thread_pool_create(int thrd_count, int queue_size) {
     thread_pool_t *pool;
     if (thrd_count <= 0 || queue_size <= 0) {
         return NULL;
     }
     // 线程池分配内存
     pool = (thread_pool_t*) malloc(sizeof(*pool));
     if (pool == NULL) {
         return NULL;
     }
     // 初始化线程池
     // 为什么不直接用thrd_count赋值?而是选择从0开始计数
     // 每当成功创建1个线程,计数+1,避免线程创建失败造成计数混乱
     pool->thrd_count = 0;  
     pool->queue_size = queue_size;
     pool->task_queue.head = 0;
     pool->task_queue.tail = 0;
     pool->task_queue.count = 0;
     pool->started = pool->closed = 0;
     // 创建任务队列
     pool->task_queue.queue = (task_t*)malloc(sizeof(task_t)*queue_size);
     if (pool->task_queue.queue == NULL) {
         // TODO: free pool
         return NULL;
     }
     // 创建线程
     pool->threads = (pthread_t*) malloc(sizeof(pthread_t) * thrd_count);
     if (pool->threads == NULL) {
         // TODO: free pool
         return NULL;
     }
     // 依次创建好线程
     for (int i = 0; i < thrd_count; ++i) {
         if (pthread_create(&(pool->threads[i]), NULL, thread_worker, (void*)pool) != 0) {
             // TODO: free pool
             return NULL;
         }
         pool->thrd_count++;
         pool->started++;
     }
     return pool;
 }
 // 线程池中的线程(消费者)该干的事儿
 static void* thread_worker(void *thrd_pool) {
     thread_pool_t *pool = (thread_pool_t*)thrd_pool;
     task_queue_t *que;
     task_t task;
     for (;;) {
         pthread_mutex_lock(&(pool->mutex));
         que = &pool->task_queue;
         // 虚假唤醒问题
         // while 判断:没有任务而且线程池没有关闭
         while (que->count == 0 && pool->closed == 0) {
             // pthread_mutex_unlock(&(pool->mutex))
             // 阻塞在 condition
             // ===================================
             // 解除阻塞
             // pthread_mutex_lock(&(pool->mutex));
             pthread_cond_wait(&(pool->condition), &(pool->mutex));
         }
         // 线程池关闭
         if (pool->closed == 1) break;
         // 获取任务
         task = que->queue[que->head];
         que->head = (que->head + 1) % pool->queue_size;
         que->count--;
         pthread_mutex_unlock(&(pool->mutex));
         // 执行任务
         (*(task.func))(task.arg);
     }
     // 销毁该线程
     pool->started--;
     pthread_mutex_unlock(&(pool->mutex));
     pthread_exit(NULL);
     return NULL;
 }

虚假唤醒

上述代码注释中提到了虚假唤醒,那么,什么是虚假唤醒?

虚假唤醒指的是在多线程环境下,多个线程等待在同一个条件上。当条件满足时,可能唤醒多个线程。如果这个资源只能由一个线程获得,剩余线程无法获得该资源。对于无法获得资源的线程来说,这种唤醒是无意义的,这种现象称为虚假唤醒。

虚假唤醒产生的原因

  • 信号中断导致的问题(Linux2.6以后已解决)
  • pthread_cond_signal,至少唤醒1个线程,即可能同时唤醒多个线程。

为避免虚假唤醒的发生,每个被唤醒的线程都需要再检查一次条件是否满足。如果不满足,应该继续睡眠;只有满足了才能往下执行。因此,需要把条件变量的判断从 if 判断换成 while 判断,此时,一次只能唤醒一个线程。


2.2.2、销毁线程池

  • 标记线程池退出
  • 通知所有线程
int thread_pool_destroy(thread_pool_t *pool) {
     if (pool == NULL) {
         return -1;
     }
     // 阻止产生新的任务
     if (pthread_mutex_lock(&(pool->mutex)) != 0) {
         return -2;
     }
     // 判断是否已经退出,防止重复释放空间
     if (pool->closed) {
         thread_pool_free(pool);
         return -3;
     }
     // 标记线程池退出
     pool->closed = 1;
     // 唤醒所有阻塞在cond上的线程,并释放互斥锁
     if (pthread_cond_broadcast(&(pool->condition)) != 0 || 
             pthread_mutex_unlock(&(pool->mutex)) != 0) {
         thread_pool_free(pool);
         return -4;
     }
     // 等待所有线程退出
     wait_all_done(pool);
     // 释放线程池空间
     thread_pool_free(pool);
     return 0;
 }

2.2.3、生产者抛出任务

  • 构造任务
  • 放入队列
  • 唤醒线程
int thread_pool_post(thread_pool_t *pool, handler_pt func, void *arg) {
     if (pool == NULL || func == NULL) {
         return -1;
     }
     task_queue_t *task_queue = &(pool->task_queue);
     if (pthread_mutex_lock(&(pool->mutex)) != 0) {
         return -2;
     }
     // 判断线程池是否关闭
     if (pool->closed) {
         pthread_mutex_unlock(&(pool->mutex));
         return -3;
     }
     // 判断任务队列是否已满
     if (task_queue->count == pool->queue_size) {
         pthread_mutex_unlock(&(pool->mutex));
         return -4;
     }
     // 1、主线程(生产者线程)构造任务,放入任务队列
     // 队列的操作,使用自旋锁
     task_queue->queue[task_queue->tail].func = func;
     task_queue->queue[task_queue->tail].arg = arg;
     task_queue->tail = (task_queue->tail + 1) % pool->queue_size;
     task_queue->count++;
     // 2、唤醒线程池中的线程
     if (pthread_cond_signal(&(pool->condition)) != 0) {
         pthread_mutex_unlock(&(pool->mutex));
         return -5;
     }
     pthread_mutex_unlock(&(pool->mutex));
     return 0;
 }

2.3、实例

// threadpool.h
 #ifndef _THREAD_POOL_H 
 #define _THREAD_POOL_H
 typedef struct thread_pool_t thread_pool_t;
 typedef void (*handler_pt) (void *);
 // 创建线程池
 thread_pool_t *thread_pool_create(int thrd_count, int queue_size);
 // 抛出任务
 int thread_pool_post(thread_pool_t *pool, handler_pt func, void *arg);
 // 销毁线程池
 int thread_pool_destroy(thread_pool_t *pool);
 // 等待所有线程的退出
 int wait_all_done(thread_pool_t *pool);
 #endif
 // threadpool.c
 #include <pthread.h>
 #include <stdint.h>
 #include <stddef.h>
 #include <stdlib.h>
 #include "thrd_pool.h"
 // 任务结点
 typedef struct task_s {
     handler_pt func;    // 任务的执行函数
     void * arg;         // 任务的上下文
 } task_t;
 // 任务队列
 typedef struct task_queue_s {
     uint32_t head;  // 队列的头指针
     uint32_t tail;  // 队列的尾指针
     uint32_t count; // 队列中的任务结点数量
     task_t *queue;  // 队列数组
 } task_queue_t;
 // 线程池
 struct thread_pool_t {
     pthread_mutex_t mutex;
     pthread_cond_t condition;
     pthread_t *threads;
     task_queue_t task_queue;
     int closed;     // 退出标记
     int started;    // 当前运行的线程数
     int thrd_count; // 线程的数量
     int queue_size; // 队列的长度,设置数组,一次性分配内存,不使用内存池
 };
 static void * thread_worker(void *thrd_pool);
 static void thread_pool_free(thread_pool_t *pool);
 // 创建线程池
 thread_pool_t* thread_pool_create(int thrd_count, int queue_size) {
     thread_pool_t *pool;
     if (thrd_count <= 0 || queue_size <= 0) {
         return NULL;
     }
     // 线程池分配内存
     pool = (thread_pool_t*) malloc(sizeof(*pool));
     if (pool == NULL) {
         return NULL;
     }
     // 初始化线程池
     // 为什么不直接用thrd_count赋值?而是选择从0开始计数
     // 每当成功创建1个线程,计数+1,避免线程创建失败造成计数混乱
     pool->thrd_count = 0;  
     pool->queue_size = queue_size;
     pool->task_queue.head = 0;
     pool->task_queue.tail = 0;
     pool->task_queue.count = 0;
     pool->started = pool->closed = 0;
     // 创建任务队列
     pool->task_queue.queue = (task_t*)malloc(sizeof(task_t)*queue_size);
     if (pool->task_queue.queue == NULL) {
         // TODO: free pool
         return NULL;
     }
     // 创建线程
     pool->threads = (pthread_t*) malloc(sizeof(pthread_t) * thrd_count);
     if (pool->threads == NULL) {
         // TODO: free pool
         return NULL;
     }
     // 依次创建好线程
     int i = 0;
     for (; i < thrd_count; ++i) {
         if (pthread_create(&(pool->threads[i]), NULL, thread_worker, (void*)pool) != 0) {
             // TODO: free pool
             return NULL;
         }
         pool->thrd_count++;
         pool->started++;
     }
     return pool;
 }
 // 生产者抛出任务
 int thread_pool_post(thread_pool_t *pool, handler_pt func, void *arg) {
     if (pool == NULL || func == NULL) {
         return -1;
     }
     task_queue_t *task_queue = &(pool->task_queue);
     if (pthread_mutex_lock(&(pool->mutex)) != 0) {
         return -2;
     }
     // 判断线程池是否关闭
     if (pool->closed) {
         pthread_mutex_unlock(&(pool->mutex));
         return -3;
     }
     // 判断任务队列是否已满
     if (task_queue->count == pool->queue_size) {
         pthread_mutex_unlock(&(pool->mutex));
         return -4;
     }
     // 1、主线程(生产者线程)构造任务,放入任务队列
     // 队列的操作,使用自旋锁
     task_queue->queue[task_queue->tail].func = func;
     task_queue->queue[task_queue->tail].arg = arg;
     task_queue->tail = (task_queue->tail + 1) % pool->queue_size;
     task_queue->count++;
     // 2、唤醒线程池中的线程
     if (pthread_cond_signal(&(pool->condition)) != 0) {
         pthread_mutex_unlock(&(pool->mutex));
         return -5;
     }
     pthread_mutex_unlock(&(pool->mutex));
     return 0;
 }
 // 释放线程池空间
 static void thread_pool_free(thread_pool_t *pool) {
     if (pool == NULL || pool->started > 0) {
         return;
     }
     if (pool->threads) {
         free(pool->threads);
         pool->threads = NULL;
         pthread_mutex_lock(&(pool->mutex));
         pthread_mutex_destroy(&pool->mutex);
         pthread_cond_destroy(&pool->condition);
     }
     if (pool->task_queue.queue) {
         free(pool->task_queue.queue);
         pool->task_queue.queue = NULL;
     }
     free(pool);
 }
 // 等待所有线程的退出
 int wait_all_done(thread_pool_t *pool) {
     int i, ret = 0;
     for (i = 0; i < pool->thrd_count; i++) {
         if (pthread_join(pool->threads[i], NULL) != 0) {
             ret = 1;
         }
     }
     return ret;
 }
 // 3、销毁线程池
 int thread_pool_destroy(thread_pool_t *pool) {
     if (pool == NULL) {
         return -1;
     }
     // 阻止产生新的任务
     if (pthread_mutex_lock(&(pool->mutex)) != 0) {
         return -2;
     }
     // 判断是否已经退出,防止重复释放空间
     if (pool->closed) {
         thread_pool_free(pool);
         return -3;
     }
     // 标记线程池退出
     pool->closed = 1;
     // 让所有阻塞在cond上的线程唤醒,并释放互斥锁
     if (pthread_cond_broadcast(&(pool->condition)) != 0 || 
             pthread_mutex_unlock(&(pool->mutex)) != 0) {
         thread_pool_free(pool);
         return -4;
     }
     // 等待所有线程退出
     wait_all_done(pool);
     thread_pool_free(pool);
     return 0;
 }
 // 线程池中的线程(消费者)该干的事儿
 static void* thread_worker(void *thrd_pool) {
     thread_pool_t *pool = (thread_pool_t*)thrd_pool;
     task_queue_t *que;
     task_t task;
     for (;;) {
         pthread_mutex_lock(&(pool->mutex));
         que = &pool->task_queue;
         // 虚假唤醒问题
         // while 判断:没有任务而且线程池没有关闭
         while (que->count == 0 && pool->closed == 0) {
             // pthread_mutex_unlock(&(pool->mutex))
             // 阻塞在 condition
             // ===================================
             // 解除阻塞
             // pthread_mutex_lock(&(pool->mutex));
             pthread_cond_wait(&(pool->condition), &(pool->mutex));
         }
         // 线程池关闭
         if (pool->closed == 1) break;
         // 获取任务
         task = que->queue[que->head];
         que->head = (que->head + 1) % pool->queue_size;
         que->count--;
         pthread_mutex_unlock(&(pool->mutex));
         // 执行任务
         (*(task.func))(task.arg);
     }
     // 销毁该线程
     pool->started--;
     pthread_mutex_unlock(&(pool->mutex));
     pthread_exit(NULL);
     return NULL;
 }
 // main.c
 #include <stdio.h>
 #include <stdlib.h>
 #include <pthread.h>
 #include <unistd.h>
 #include "thrd_pool.h"
 int nums = 0;
 int done = 0;
 pthread_mutex_t lock;
 void do_task(void *arg) {
     usleep(10000);
     pthread_mutex_lock(&lock);
     done++;
     printf("doing %d task\n", done);
     pthread_mutex_unlock(&lock);
 }
 int main(int argc, char **argv) {
     int threads = 8;
     int queue_size = 256;
     if (argc == 2) {
         threads = atoi(argv[1]);
         if (threads <= 0) {
             printf("threads number error: %d\n", threads);
             return 1;
         }
     } else if (argc > 2) {
         threads = atoi(argv[1]);
         queue_size = atoi(argv[1]);
         if (threads <= 0 || queue_size <= 0) {
             printf("threads number or queue size error: %d,%d\n", threads, queue_size);
             return 1;
         }
     }
     thread_pool_t *pool = thread_pool_create(threads, queue_size);
     if (pool == NULL) {
         printf("thread pool create error!\n");
         return 1;
     }
     while (thread_pool_post(pool, &do_task, NULL) == 0) {
         pthread_mutex_lock(&lock);
         nums++;
         pthread_mutex_unlock(&lock);
     }
     printf("add %d tasks\n", nums);
     wait_all_done(pool);
     printf("did %d tasks\n", done);
     thread_pool_destroy(pool);
     return 0;
 }


3、线程池网络编程

3.1、reactor 的问题


1704877617551.jpg

reactor

在一个事件循环中,可以处理多个就绪事件。这些就绪事件在 reactor 模型中是串行执行的,一个事件处理若耗时较长,会延迟其他同时触发的事件的处理。

void eventloop_once(reactor_t * r, int timeout) {
     int n = epoll_wait(r->epfd, r->fire, MAX_EVENT_NUM, timeout);
     for (int i = 0; i < n; ++i) {
         struct epoll_event *e = &r->fire[i];
         int mask = e->events;
         // 用 io 函数捕获具体的错误信息
         if (e->events & EPOLLERR) mask |= EPOLLIN | EPOLLOUT;
         // 用 io 函数捕获断开的具体信息
         if (e->events & EPOLLHUP) mask |= EPOLLIN | EPOLLOUT;
         event_t *et = (event_t*) e->data.ptr;
         // 处理读事件
         if (mask & EPOLLIN) {
             if (et->read_fn) {
                 et->read_fn(et->fd, EPOLLIN, et);
             }    
         }
         // 处理写事件
         if (mask & EPOLLOUT) {
             if (et->write_fn) {
                 et->write_fn(et->fd, EPOLLOUT, et);
             }       
             else {
                 uint8_t * buf = buffer_write_atmost(&et->out);
                 event_buffer_write(et, buf, buffer_len(&et->out));
             }
         }
     }
 }

接下来,介绍 nginx, redis, skynet 线程池实现形式,都是对 reactor 模型的优化。

3.2、nginx 线程池

为什么使用线程池?

nginx 是磁盘 io 密集型。业务逻辑是处理文件缓冲,需要磁盘 io 操作,耗费大量的时间。

什么情况下使用线程池?

nginx 线程池作用 compute 阶段,将业务逻辑交给线程池处理。


怎样使用线程池?

nginx 的线程池有两个消息队列,当线程池处理完任务后,将处理的结果放入完成消息队列,以事件的方式通知主线程,主线程从完成消息队列中取出结果发送给客户端。


nginx线程池设计原理

值得注意的是,nginx 不推荐线程池的方式,可以采用替代的操作 sendfile, directio解决磁盘 io 的问题。

3.3、redis 线程池

为什么使用线程池?

redis 是网络 io 密集型,需要同时处理多条并发请求,存在读写 io 的问题(请求大量数据,写日志业务等)。

什么情况下使用线程池?

redis 线程池作用读写 io 阶段,即 read, decode 和 encode, send 阶段。主线程处理业务逻辑,之所以用单线程处理业务,是因为 redis 采用高效的数据结构,其业务逻辑处理较快。

redis 线程池作用阶段

怎样使用线程池?

主线程拥有两个全局队列clients_pending_readclients_pending_write,每个 io 线程(主线程同时也是 io 线程)拥有一个专属队列 io_threads_list[id]。主线程既作为生产者,产生任务;又作为消费者,获取任务执行。

首先,主线程将一次循环的所有就绪的读事件收集到自己的全局任务队列clients_pending_read中,再把每个事件负载均衡地分配到每个 io 线程的专属任务队列中。一次事件循环中不会出现同名 fd,不同的 fd 分配到每个 io 线程各自的队列中,避免了多个 io 线程同时从全局队列中取数据,因此,不需要加锁操作。

接下来,io 线程从自己的专属队列中取出任务,(除主线程外)并发执行 read 和 decode 操作。主线程将解析后的任务做 compute 操作。最后,io 线程(包括主线程)并发执行 encode 和 send 操作。

1704877659260.jpg

redis 线程池设计原理

源码实现细节,见我的另一篇文章,redis 源码解析:IO 线程池

3.4、skynet 线程池

为什么使用线程池?

skynet 是 cpu 密集型。处理读写 io 、数据包解压缩、业务逻辑处理等。特别地,当同一个 io 在多个线程处理时,将写 io 转由网络线程处理。

什么情况下使用线程池?

skynet 线程池作用于除 send 外的所有阶段。


1704877659260.jpg

怎样使用线程池?

skynet 的网络线程收集事件,将事件插入到对应的 actor 的专属消息队列中。全局消息队列存储有消息的 actor,即描述活跃的 actor,线程池负责检查该全局队列。

1704877765157.jpg

skynet线程池设计原理

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
3月前
|
监控 Java API
如何快速地实现一个线程池
如何快速地实现一个线程池
27 0
|
4月前
|
算法 Java
线程池
【8月更文挑战第22天】
44 4
|
4月前
|
Java 调度
基于C++11的线程池
基于C++11的线程池
|
6月前
|
Java
线程池的实现
线程池的实现
38 0
|
7月前
|
存储 Java 调度
浅谈线程池
浅谈线程池
43 1
|
缓存 Java
线程池简单总结
线程池简单总结
|
Java
6. 实现简单的线程池
6. 实现简单的线程池
58 0
|
存储 Java 测试技术
13.一文彻底了解线程池
大家好,我是王有志。线程池是Java面试中必问的八股文,涉及到非常多的问题,今天我们就通过一篇文章,来彻底搞懂Java面试中关于线程池的问题。
408 2
13.一文彻底了解线程池
|
监控 Java
线程池的讲解和实现
线程池的讲解和实现
|
消息中间件 监控 搜索推荐
线程池:我是谁?我在哪儿?
大家好,这篇文章跟大家探讨下日常使用线程池的各种姿势,重点介绍怎么在 Spring 环境中正确使用线程池。
314 1
线程池:我是谁?我在哪儿?
下一篇
DataWorks