掌握并行处理:理解并构建自己的线程池

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 本文深入探讨了线程池的原理和实现,并提供了一个详细的、分步指南,帮助读者掌握并行处理的核心概念。文章开始介绍了并行处理的重要性,以及为什么线程池是一种有效管理并发任务的技术。接着,文章详细解释了线程池的基本原理,包括线程池的组成结构、线程的生命周期和任务队列的管理方法。随后,文章展示了如何使用C++编程语言实现一个简单的线程池,并介绍了线程安全性和任务调度的关键考虑因素。

一、线程池存在原因

(1)线程使用场景:某类任务特别耗时,会严重影响该线程处理其他任务,因此需要在其他线程异步执行该任务。

(2)线程开销:随着这类任务越来越多,需要异步执行任务而开启的线程也越来越多,但是每个CPU的核心数和线程数是固定,过多的线程并不能提高效率。因此,线程资源的开销与CPU核心之间要平衡选择。

(3)原因:

  • 线程资源的开销与CPU核心之间要平衡选择,自然就产生线程池,即需要固定线程的数量。
  • 从前面的执行流程可以看出,每次耗时任务到来都需要不断的创建线程,执行完成后销毁线程。不断的创建和销毁线程会浪费系统资源。
  • 因此需要有线程池,提前分配固定线程数量,不会将它们关闭,当任务到来时复用这些线程。

(3)线程池的作用:

  1. 复用线程资源。
  2. 充分利用系统资源。
  3. 异步执行耗时任务。
  4. 减少了多个任务(不是一个任务)的执行时间。

二、线程池原理

线程池是一个生产–消费模型。发布任务的线程是生产者,其他线程是消费者。

线程池的构成:

(1)生产者线程:发布任务。

(2)队列:亦称任务队列,存放具体的任务。因为任务是异步执行的,任务的内容就包括了任务的上下文以及任务的执行函数。

(3)线程池:即消费者,是固定数量的线程集合;主要完成取出任务、执行任务、任务调度。

2.1、线程调度

由于任务的密疏程度是未知的,即任务是间歇性的,有时候任务很多,有时候任务很少。当任务很少时,需要将不执行任务的线程休眠,不能让其浪费系统资源。这就需要线程调度。

线程的调度主要通过mutex和condition实现。 即互斥锁和条件变量。

线程有两种状态:从无任务到有任务(从无到有)以及从有任务到无任务(从有到无)。

利用条件变量,从无到有时,唤醒线程;从有到无时,休眠线程。那么如何确定条件呢?就是依据任务队列的状态,如果任务队列中有任务,将线程唤醒;如果任务队列为空,将线程休眠。

仔细点说,就是:

  • 当生产者线程发布任务时,任务队列就有任务,进入 从无到有状态;通知某一个线程唤醒,取出任务、执行任务。
  • 线程判断任务队列是否有任务,如果任务队列为空,则进入 从有到无 状态,condition通知线程休眠。

2.2、平衡选择

线程资源的开销与CPU核心之间做平衡选择;平衡选择依据耗时任务而定。耗时任务分为IO密集型和CPU密集型。

  • IO密集型:IO的操作是同步的,系统调用会阻塞的将内核资源拷贝到用户态或者用户态资源阻塞的将资源拷贝到内核中;线程会阻塞等待系统调用完成。
  • CPU密集型:长时间占用CPU,使线程无法处理其他任务。

根据这两个类型,可以确定线程池的线程数量。一般,CPU密集型的线程池数量等于CPU核心数;IO密集型的线程池线程数量等于2倍核心数+2。

有这样一个公式: (IO等待时间+CPU运算时间)核心数/cpu运算时间。根据公式对线程池数量做优化调整,使其符合特定业务逻辑。

三、实现一个线程池

3.1、接口设计

(1)创建线程池的接口。确定线程池的线程数量以及任务队列的长度。

(2)销毁线程池的接口。线程判断线程池销毁标志,如果标记了线程池销毁,线程退出;并且通知所有线程。

(3)生产者线程抛出任务的接口。目的是构造一个任务,并把任务放到任务队列中,通知线程唤醒。

3.2、代码示例

thread_pool.h

#ifndef _THREAD_POOL_H#define _THREAD_POOL_Htypedefstructthread_pool_tthread_pool_t;
typedefvoid(*handler_pt)(void*);
enumTHREAD_POOL_ERROR_CODE{
THREAD_POOL_NULL=-1,
THREAD_MUTEX_FAIL=-2,
THREAD_POOL_CLOSED=-3,
THREAD_COND_FAIL=-4,
THREAD_POOL_TASK_QUE_FULL=-5,
THREAD_COND_SIGNAL_FAIL=-6,
THREAD_POOL_SUCCESS=0};
thread_pool_t*thread_pool_create(intthread_count,intqueue_size);
intthread_pool_destory(thread_pool_t*pool);
intthread_pool_post(thread_pool_t*pool,handler_ptfunc,void*arg);
intwait_pool_done(thread_pool_t*pool);
#endif

thread_pool.c

#include <pthread.h>#include <stdint.h>#include <stddef.h>#include <stdlib.h>#include "thread_pool.h"typedefstructtask_t{
handler_ptfunc;
void*arg;
} task_t;
typedefstructtask_queue_t{
uint32_thead;// 队列头索引uint32_ttail;// 队列尾索引uint32_tcount;//任务数量task_t*queue;//队列数组} task_queue_t;
structthread_pool_t{
pthread_mutex_tmutex;
pthread_cond_tcondition;
pthread_t*threads;
task_queue_ttask_queue;
intclosed;//销毁线程池标记intstarted;//当前运行的线程数intthread_count;
intqueue_size;
};
staticvoidthread_pool_free(thread_pool_t*pool)
{
if (pool==NULL||pool->started>0)
return;
if (pool->threads)
    {
free(pool->threads);
pool->threads=NULL;
pthread_mutex_lock(&(pool->mutex));
pthread_mutex_destroy(&(pool->mutex));
pthread_cond_destroy(&(pool->condition));
    }
if (pool->task_queue.queue)
    {
free(pool->task_queue.queue);
pool->task_queue.queue=NULL;
    }
free(pool);
}
staticvoid*thread_worker(void*thread_pool)
{
thread_pool_t*pool=thread_pool;
task_queue_t*que;
task_ttask;
for (;;)
    {
pthread_mutex_lock(&(pool->mutex));//加锁que=&(pool->task_queue);
// 判断虚假唤醒while (que->count==0&&pool->closed==0)
        {
// pthread_mutex_unlock(&(pool->mutex))// 阻塞在 condition// 唤醒信号===================================// 解除阻塞// pthread_mutex_lock(&(pool->mutex));pthread_cond_wait(&(pool->condition),&(pool->mutex));
        }
if (pool->closed==1)
break;
task=que->queue[que->head];//取出任务que->head= (que->head+1) %pool->queue_size;
que->count--;
pthread_mutex_unlock(&(pool->mutex));
        (*(task.func))(task.arg);
    }
pool->started--;
pthread_mutex_unlock(&(pool->mutex));
pthread_exit(NULL);
returnNULL;
}
thread_pool_t*thread_pool_create(intthread_count,intqueue_size)
{
if(thread_count<=0||queue_size<=0)
returnNULL;
thread_pool_t*pool;
pool=(thread_pool_t*)malloc(sizeof(*pool));
if(pool==NULL)
returnNULL;
pool->thread_count=0;//从0开始计数pool->queue_size=queue_size;
pool->task_queue.head=0;
pool->task_queue.tail=0;
pool->task_queue.count=0;
pool->started=pool->closed=0;
pool->threads=NULL;
pool->task_queue.queue=NULL;
pool->task_queue.queue= (task_t*)malloc(sizeof(task_t)*queue_size);
if (pool->task_queue.queue==NULL)
    {
//free poolthread_pool_free(pool);
returnNULL;
    }
pool->threads= (pthread_t*)malloc(sizeof(pthread_t)*thread_count);
if (pool->threads==NULL)
    {
//free poolthread_pool_free(pool);
returnNULL;
    }
inti=0;
for (i=0; i<thread_count; i++)
    {
if (pthread_create(&(pool->threads[i]), NULL,thread_worker, (void*)pool) !=0)
        {
//free poolthread_pool_free(pool);
returnNULL;
        }
pool->thread_count++;
pool->started++;
    }
returnpool;
}
intthread_pool_post(thread_pool_t*pool, handler_ptfunc, void*arg)
{
if (pool==NULL||func==NULL)
returnTHREAD_POOL_NULL;
task_queue_t*task_queue=&(pool->task_queue);//取出队列if (pthread_mutex_lock(&(pool->mutex)) !=0)
returnTHREAD_MUTEX_FAIL;
if (pool->closed)
    {
pthread_mutex_unlock(&(pool->mutex));
returnTHREAD_POOL_CLOSED;
    }
if (task_queue->count==pool->queue_size)
    {
pthread_mutex_unlock(&(pool->mutex));
returnTHREAD_POOL_TASK_QUE_FULL;
    }
task_queue->queue[task_queue->tail].func=func;
task_queue->queue[task_queue->tail].arg=arg;
task_queue->tail= (task_queue->tail+1) %pool->queue_size;
task_queue->count++;
if (pthread_cond_signal(&(pool->condition)) !=0)
    {
pthread_mutex_unlock(&(pool->mutex));
returnTHREAD_COND_SIGNAL_FAIL;
    }
pthread_mutex_unlock(&(pool->mutex));
return0;
}
intwait_pool_done(thread_pool_t*pool)
{
inti, ret=0;
for (i=0; i<pool->thread_count; i++)
    {
if (pthread_join(pool->threads[i], NULL) !=0)
ret=1;
    }
returnret;
}
intthread_pool_destory(thread_pool_t*pool)
{
if (pool==NULL)
returnTHREAD_POOL_NULL;
if (pthread_mutex_unlock(&(pool->mutex)) !=0)
returnTHREAD_MUTEX_FAIL;
if (pool->closed)
    {
thread_pool_free(pool);
returnTHREAD_POOL_CLOSED;
    }
pool->closed=1;
if(pthread_cond_broadcast(&(pool->condition))!=0||pthread_mutex_unlock(&(pool->mutex)) !=0)
    {
thread_pool_free(pool);
returnTHREAD_COND_FAIL;
    }
wait_pool_done(pool);
thread_pool_free(pool);
return0;
}

main.c,使用线程池:

#include <stdio.h>#include <stdlib.h>#include <pthread.h>#include <unistd.h>#include "thread_pool.h"intnum=0;
intdone=0;
pthread_mutex_tlock;
voiddo_task(void*arg)
{
usleep(3000);
pthread_mutex_lock(&lock);
done++;
printf("doing %d task\n", done);
pthread_mutex_unlock(&lock);
}
intmain(intargc,char**argv)
{
intthreads=4;
intqueue_size=256;
if (argc==2)
    {
queue_size=threads=atoi(argv[1]);
if (threads<=0||queue_size<=0)
        {
printf("threads number or queue size error: %d,%d\n", threads, queue_size);
return1;
        }
    }
thread_pool_t*pool=thread_pool_create(threads, queue_size);
if (pool==NULL)
returnTHREAD_POOL_NULL;
while (thread_pool_post(pool, &do_task, NULL) ==0)
    {
pthread_mutex_lock(&lock);
num++;
pthread_mutex_unlock(&lock);
    }
printf("add %d tasks\n", num);
wait_pool_done(pool);
printf("did %d tasks\n", done);
thread_pool_destory(pool);
return0;
}

四、哪些开源项目使用了线程池

4.1、nginx中的线程池

niginx github开源地址:https://github.com/nginx/nginx

nginx中线程池的作用是处理文件缓冲。 nginx线程池默认关闭,configure 时,需要 --with-threads

来指定。

线程池作用阶段:

(1)使用线程池的情况:nginx可以应用于静态web服务器,主要是处理文件缓冲,文件读写比较耗时。nginx推荐使用sendfile、directio、aio来处理耗时的任务,线程池不是重点推荐。nginx的耗时主要集中在文件操作,即compute阶段,这时可以使用线程池解决。

(2)使用线程池原因:磁盘IO读写比较耗时。nginx推荐使用sendfile、directio、aio来处理耗时的任务,线程池不是重点推荐。

(3)使用线程池:nginx线程池会有两个队列,任务消息队列和完成消息队列;任务消息队列存放发布的任务,将任务pull到线程池;线程池处理完会将结果push到完成消息队列,通知主线程获取结果。

static ngx_int_t
ngx_http_cache_thread_handler(ngx_thread_task_t *task, ngx_file_t *file)
{
  // 其他代码...
  task->event.data = r;
    task->event.handler = ngx_http_cache_thread_event_handler;
  if (ngx_thread_task_post(tp, task) != NGX_OK) {
        return NGX_ERROR;
    }
    // 其他代码...
}

调用逻辑:

4.2、redis中的线程池

redis是作为一个数据库,需要读写大量的数据、解析协议,这样一来读IO和写IO压力都非常大。因此,redis中线程池的作用是读写IO处理和数据包解析、压缩。

线程池作用阶段(recv、decode、encode、send):

redis使用线程池的条件:有多个客户端并发请求,并且有读写IO的问题(写日志业务、读大量数据等)。

redis线程池运行原理:

主线程收集所有的读事件,并放到一个队列中;线程池为每个线程都准备一个自己线程的队列;然后主线程将收集的事件分发到线程池IO线程的队列中,线程池的线程从自己的队列中取出任务、执行任务;主线程既是生产者也是消费者,主线程处理Compute阶段的业务逻辑。

每个线程都有自己的队列原因:避免加锁。

总结

(1)线程池,就是固定线程数量,复用线程不销毁。

(2)线程池是一种生产者–消费者模型,某类任务特别耗时,会严重影响该线程处理其他任务,因此需要线程池。

(3)线程池是面向生产者的,生产者使用线程。线程池最好至少设计两个队列,任务队列和完成队列。

(4)通常,线程池的线程调度使用互斥锁(mutex)和条件变量(condition)。但也不一定非得使用condition(条件变量),可以只有互斥锁,比如redis,主线程获取mutex,其他线程会一直阻塞着,巧妙的利用互斥锁使其他线程休眠。

(5)线程池的线程数量选择,依据业务是IO密集型还是CPU密集型;假设CPU核心数为N,IO密集型一般设置为2*N+2,CPU密集型设为N。可以参考这样一个公式:(IO处理时间+CPU运行时间)*核心数/CPU运行时间。

(6)线程池的作用:复用线程资源,充分利用系统资源,异步执行耗时任务。

关注公众号《Lion 莱恩呀》随时随地学习技术。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
8月前
|
Java 数据库 Android开发
【专栏】Kotlin在Android开发中的多线程优化,包括线程池、协程的使用,任务分解、避免阻塞操作以及资源管理
【4月更文挑战第27天】本文探讨了Kotlin在Android开发中的多线程优化,包括线程池、协程的使用,任务分解、避免阻塞操作以及资源管理。通过案例分析展示了网络请求、图像处理和数据库操作的优化实践。同时,文章指出并发编程的挑战,如性能评估、调试及兼容性问题,并强调了多线程优化对提升应用性能的重要性。开发者应持续学习和探索新的优化策略,以适应移动应用市场的竞争需求。
216 5
|
8月前
|
Java 调度 Android开发
构建高效Android应用:探究Kotlin多线程编程
【2月更文挑战第17天】 在现代移动开发领域,性能优化一直是开发者关注的焦点。特别是在Android平台上,合理利用多线程技术可以显著提升应用程序的响应性和用户体验。本文将深入探讨使用Kotlin进行Android多线程编程的策略与实践,旨在为开发者提供系统化的解决方案和性能提升技巧。我们将从基础概念入手,逐步介绍高级特性,并通过实际案例分析如何有效利用Kotlin协程、线程池以及异步任务处理机制来构建一个更加高效的Android应用。
|
8月前
|
缓存 安全 Java
保障线程安全性:构建可靠的多线程应用
保障线程安全性:构建可靠的多线程应用
|
存储 Linux 调度
确保并发执行的安全性:探索多线程和锁机制以构建可靠的程序
在当今计算机系统中,多线程编程已成为常见的需求,然而,同时也带来了并发执行的挑战。为了避免数据竞争和其他并发问题,正确使用适当的锁机制是至关重要的。通过阅读本文,读者将了解到多线程和锁机制在并发编程中的重要性,以及如何避免常见的并发问题,确保程序的安全性和可靠性。通过实际案例和代码示例来说明如何正确地使用多线程和锁机制来构建可靠的程序。
72 1
|
3月前
|
调度 Android开发 开发者
构建高效Android应用:探究Kotlin多线程优化策略
【10月更文挑战第11天】本文探讨了如何在Kotlin中实现高效的多线程方案,特别是在Android应用开发中。通过介绍Kotlin协程的基础知识、异步数据加载的实际案例,以及合理使用不同调度器的方法,帮助开发者提升应用性能和用户体验。
76 4
|
7月前
|
Java 测试技术 Maven
Maven打包使用多线程加速构建过程
Maven打包使用多线程加速构建过程
1064 0
|
Rust 监控 并行计算
用Rust构建电脑网络监控软件:内存安全性和多线程编程
在当今数字化世界中,网络安全一直是至关重要的问题。电脑网络监控软件是确保网络系统安全和高效运行的关键工具。然而,编写电脑网络监控软件需要处理复杂的多线程编程和内存安全性问题。Rust编程语言提供了一种强大的方式来构建安全的电脑网络监控软件,同时避免了许多常见的编程错误。
361 0
|
8月前
|
安全 Java 开发者
构建高效微服务架构:后端开发的新范式Java中的多线程并发编程实践
【4月更文挑战第29天】在数字化转型的浪潮中,微服务架构已成为软件开发的一大趋势。它通过解耦复杂系统、提升可伸缩性和促进敏捷开发来满足现代企业不断变化的业务需求。本文将深入探讨微服务的核心概念、设计原则以及如何利用最新的后端技术栈构建和部署高效的微服务架构。我们将分析微服务带来的挑战,包括服务治理、数据一致性和网络延迟问题,并讨论相应的解决方案。通过实际案例分析和最佳实践的分享,旨在为后端开发者提供一套实施微服务的全面指导。 【4月更文挑战第29天】在现代软件开发中,多线程技术是提高程序性能和响应能力的重要手段。本文通过介绍Java语言的多线程机制,探讨了如何有效地实现线程同步和通信,以及如
|
8月前
|
API 数据库 Android开发
构建高效Android应用:探究Kotlin多线程优化策略
随着移动设备性能的日益强大,用户对应用程序的响应速度和流畅性要求越来越高。在Android开发中,合理利用多线程技术是提升应用性能的关键手段之一。Kotlin作为一种现代的编程语言,其协程特性为开发者提供了更为简洁高效的多线程处理方式。本文将深入探讨使用Kotlin进行Android多线程编程的最佳实践,包括协程的基本概念、优势以及在实际项目中的应用场景和性能优化技巧,旨在帮助开发者构建更加高效稳定的Android应用。
|
8月前
|
Java Android开发 开发者
构建高效Android应用:探究Kotlin多线程优化策略
【2月更文挑战第17天】 随着移动设备性能的不断提升,用户对应用的响应速度和稳定性要求越来越高。在Android开发中,Kotlin语言以其简洁、安全的特点受到开发者青睐。然而,面对复杂的多线程任务,如何有效利用Kotlin进行优化,以提升应用性能,是本文探讨的重点。通过分析Kotlin并发工具的使用场景与限制,结合实例演示其在Android开发中的实践,旨在为开发者提供实用的多线程处理指南。