最近自己,很烦所以超级久没学习了,今天趁着抗战七十周年放三天假,赶紧看下书。
废话不多说。
今天,介绍一个简单的线程池。
首先说明什么是线程池,线程池:是包含若干个线程,来处理多个任务的线程集合。
它的目的是用来处理,大量的相对短暂的任务。
这里我们先来解释下两个概念,什么叫大量呢?对于线程来说,需要线程数小于任务数,第二,短暂的任务是指,任务需要相对短暂,如果线程和主进程同周期,则不适合用线程池。
然后来说下CPU数和线程数的关系,如果你的任务主要是计算密集型任务
则:线程个数 = CPU个数,比较好
如果你的任务是I/O密集型任务
则:线程个数>CPU个数,比较好
为什么计算密集型任务需要线程个数 = CPU个数呢?
计算密集型,顾名思义就是应用需要非常多的CPU计算资源,在多核CPU时代,我们要让每一个CPU核心都参与计算,将CPU的性能充分利用起来,这样才算是没有浪费服务器配置,如果在非常好的服务器配置上还运行着单线程程序那将是多么重大的浪费。对于计算密集型的应用,完全是靠CPU的核数来工作,所以为了让它的优势完全发挥出来,避免过多的线程上下文切换,所以比较理想的安排是线程个数 = CPU个数。
那么I/O密集型任务同理。
对于IO密集型的应用,就很好理解了,我们现在做的开发大部分都是WEB应用,涉及到大量的网络传输,不仅如此,与数据库,与缓存间的交互也涉及到IO,一旦发生IO,线程就会处于等待状态,当IO结束,数据准备好后,线程才会继续执行。因此从这里可以发现,对于IO密集型的应用,我们可以多设置一些线程池中线程的数量(超过CPU数量),这样就能让在等待IO的这段时间内,线程可以去做其它事,提高并发处理效率。当然也不是越多越好。
好了说了这么多,我来贴代码了。
condiition.h
#ifndef _CONDITION_H_
#define _CONDITION_H_
#include <pthread.h>
typedef struct condition
{
pthread_mutex_t pmutex;
pthread_cond_t pcond;
} condition_t;
int condition_init(condition_t *cond);
int condition_lock(condition_t *cond);
int condition_unlock(condition_t *cond);
int condition_wait(condition_t *cond);
int condition_timedwait(condition_t *cond, const struct timespec *abstime);
int condition_signal(condition_t *cond);
int condition_broadcast(condition_t *cond);
int condition_destroy(condition_t *cond);
#endif /* _CONDITION_H_ */
condition.c
#include "condition.h"
int condition_init(condition_t *cond)
{
int status;
if ((status = pthread_mutex_init(&cond->pmutex, NULL)))
return status;
if ((status = pthread_cond_init(&cond->pcond, NULL)))
return status;
return 0;
}
int condition_lock(condition_t *cond)
{
return pthread_mutex_lock(&cond->pmutex);
}
int condition_unlock(condition_t *cond)
{
return pthread_mutex_unlock(&cond->pmutex);
}
int condition_wait(condition_t *cond)
{
return pthread_cond_wait(&cond->pcond, &cond->pmutex);
}
int condition_timedwait(condition_t *cond, const struct timespec *abstime)
{
return pthread_cond_timedwait(&cond->pcond, &cond->pmutex, abstime);
}
int condition_signal(condition_t *cond)
{
return pthread_cond_signal(&cond->pcond);
}
int condition_broadcast(condition_t* cond)
{
return pthread_cond_broadcast(&cond->pcond);
}
int condition_destroy(condition_t* cond)
{
int status;
if ((status = pthread_mutex_destroy(&cond->pmutex)))
return status;
if ((status = pthread_cond_destroy(&cond->pcond)))
return status;
return 0;
}
threadpool.h
#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_
#include "condition.h"
// 任务结构体,将任务放入队列由线程池中的线程来执行
typedef struct task
{
void *(*run)(void *arg); // 任务回调函数
void *arg; // 回调函数参数
struct task *next;
} task_t;
// 线程池结构体
typedef struct threadpool
{
condition_t ready; //任务准备就绪或者线程池销毁通知
task_t *first; //任务队列头指针
task_t *last; //任务队列尾指针
int counter; //线程池中当前线程数
int idle; //线程池中当前正在等待任务的线程数
int max_threads; //线程池中最大允许的线程数
int quit; //销毁线程池的时候置1
} threadpool_t;
// 初始化线程池
void threadpool_init(threadpool_t *pool, int threads);
// 往线程池中添加任务
void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg);
// 销毁线程池
void threadpool_destroy(threadpool_t *pool);
#endif /* _THREAD_POOL_H_ */
threadpool.c
#include "threadpool.h"
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <time.h>
void *thread_routine(void *arg)
{
struct timespec abstime;
int timeout;
printf("thread 0x%x is starting\n", (int)pthread_self());
threadpool_t *pool = (threadpool_t *)arg;
while (1)
{
timeout = 0;
condition_lock(&pool->ready);
pool->idle++;
// 等待队列有任务到来或者线程池销毁通知
//当任务队列中没有任务,并且没有收到销毁通知时,线程陷入等待。
while (pool->first == NULL && !pool->quit)
{
printf("thread 0x%x is waiting\n", (int)pthread_self());
//condition_wait(&pool->ready);
clock_gettime(CLOCK_REALTIME, &abstime);
abstime.tv_sec += 2;
int status = condition_timedwait(&pool->ready, &abstime);
if (status == ETIMEDOUT)
{
printf("thread 0x%x is wait timed out\n", (int)pthread_self());
timeout = 1;
break;
}
}
// 等待到条件,处于工作状态
pool->idle--;
// 等待到任务
//如果队列首部不为空,则队列中有任务
if (pool->first != NULL)
{
// 从队头取出任务
task_t *t = pool->first;
pool->first = t->next;
// 执行任务需要一定的时间,所以要先解锁,以便生产者进程
// 能够往队列中添加任务,其它消费者线程能够进入等待任务
condition_unlock(&pool->ready);
t->run(t->arg);
free(t);
condition_lock(&pool->ready);
}
// 如果等待到线程池销毁通知, 且任务都执行完毕
//当队列为空,并且线程的到销毁通知
if (pool->quit && pool->first == NULL)
{
pool->counter--;
if (pool->counter == 0)
condition_signal(&pool->ready);
condition_unlock(&pool->ready);
// 跳出循环之前要记得解锁
break;
}
if (timeout && pool->first == NULL)
{
pool->counter--;
condition_unlock(&pool->ready);
// 跳出循环之前要记得解锁
break;
}
condition_unlock(&pool->ready);
}
printf("thread 0x%x is exting\n", (int)pthread_self());
return NULL;
}
// 初始化线程池
void threadpool_init(threadpool_t *pool, int threads)
{
// 对线程池中的各个字段初始化
condition_init(&pool->ready);
pool->first = NULL;
pool->last = NULL;
pool->counter = 0;
pool->idle = 0;
pool->max_threads = threads;
pool->quit = 0;
}
// 往线程池中添加任务
void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg)
{
// 生成新任务
task_t *newtask = (task_t *)malloc(sizeof(task_t));
newtask->run = run;
newtask->arg = arg;
newtask->next = NULL;
//先将线程锁住
condition_lock(&pool->ready);
// 将任务添加到队列
//如果队列为空,将它添加到队头,否则添加到队尾
if (pool->first == NULL)
pool->first = newtask;
else
pool->last->next = newtask;
pool->last = newtask;
// 如果有等待线程,则唤醒其中一个
if (pool->idle > 0)
condition_signal(&pool->ready);
else if (pool->counter < pool->max_threads)
{
// 没有等待线程,并且当前线程数不超过最大线程数,则创建一个新线程
pthread_t tid;
pthread_create(&tid, NULL, thread_routine, pool);
pool->counter++;
}
condition_unlock(&pool->ready);
}
// 销毁线程池
void threadpool_destroy(threadpool_t *pool)
{
if (pool->quit)
{
return;
}
condition_lock(&pool->ready);
pool->quit = 1;
if (pool->counter > 0)
{
if (pool->idle > 0)
condition_broadcast(&pool->ready);
// 处于执行任务状态中的线程,不会收到广播
// 线程池需要等待执行任务状态中的线程全部退出
while (pool->counter > 0)
condition_wait(&pool->ready);
}
condition_unlock(&pool->ready);
condition_destroy(&pool->ready);
}
main.c
#include "threadpool.h"
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
void* mytask(void *arg)
{
printf("thread 0x%x is working on task %d\n", (int)pthread_self(), *(int*)arg);
sleep(1);
free(arg);
return NULL;
}
int main(void)
{
threadpool_t pool;
threadpool_init(&pool, 3);
int i;
for (i=0; i<10; i++)
{
int *arg = (int *)malloc(sizeof(int));
*arg = i;
threadpool_add_task(&pool, mytask, arg);
}
//sleep(15);
threadpool_destroy(&pool);
return 0;
}
看完代码我们继续看下,线程池和我们创建的线程有什么好处呢?
总的来说有两点
1.当任务增加的时候能够动态的增加线程池中线程的数量直到达到一个阈值。
2.当任务执行完毕的时候,能够动态的销毁线程池中的线程
然后我们再来分析,线程池是如何达到这些条件的。
首先,我们看在main函数中我们初始化线程池,规定了线程池中有3个线程,然后我们创建了10个任务。
然后在再添加任务的过程中,我们把看是否有等待的进程,如果有就将他唤醒执行任务
如果没有,那么创建线程。在创建的线程中,进行判断。
如果任务队列不为空,那么执行任务。
如果,任务队列中没有任务,并且,没有得到销毁通知,那么线程陷入等待,如果等待到任务,那么执行任务,如果在一定时间没等到,那么设置超时标志。
如果,是超时了,并且得到了销毁通知,那么当前线程数减一,并且将当前线程退出。
如果,得到了销毁通知,并且当前任务队列中没有任务,则将当前线程数减一,如果当前线程数为零,则通知销毁线程函数,并且退出。
而销毁线程函数,执行的是如果销毁标志不为零,则返回。如果为零,那么首先广播所有的等待的(空闲)线程,需要他们退出。如果是正在执行任务的线程那么将收不到他的信息,将等待线程的通知信号。然后销毁所有的信号量。
之后,综上,线程池能实现动态增加线程,和动态销毁线程。
恩,这个线程池,还是不那么好理解,我的叙述也不是很到位,各位如果有什么为题,可以多到评论区评论,我会定期解答的。