基于C语言 -- 线程池实现

简介: 基于C语言 -- 线程池实现

前言

线程池 -- 纯C版


使用步骤

//------------------------------------------//
// 作者: 干饭小白
// 时间: 2023-09-04
//------------------------------------------//
#pragma once
#ifdef __cplusplus
extern "C"
{
#endif __cplusplus
#define DEFAULT_TIME       5
#define MIN_WAIT_TASK_NUM  5
#define DEFAULT_THREAD_NUM 2
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <errno.h>
// 任务函数结构
typedef struct {
    void *(*function)(void*);
    void *arg;
}Task;
// 线程池管理者
struct Threadpool {
    /* 开关 */
    bool shutdown;
    /* 任务队列 */
    int queue_front;                  // 队头
    int queue_rear;                   // 队尾
    int queue_size;                   // 队列大小
    int queue_max_size;               // 最大的任务数
    Task *task_queue;                 // 任务队列
    /* 工作线程状态 */
    int min_thr_num;                  // 最小的线程数
    int max_thr_num;                  // 最大线线程数
    int live_thr_num;                 // 当前的活跃线程数
    int busy_thr_num;                 // 当前忙碌的线程数
    int wait_exit_thr_num;            // 当前正在等待退出的线程数
    pthread_t *threads;               // 线程集合
    /* 线程集合的管理 */
    pthread_t admin_tid;              // 线程管理者 -- 保持线程池中线程的相对平衡
    pthread_mutex_t lock;             // 互斥锁
    pthread_mutex_t thread_counter;   // 保证线程间竞争关系互斥 -- 正在忙碌的线程变量控制
    pthread_cond_t queue_not_full;    // 唤醒任务可入队线程 -- 队列不为满
    pthread_cond_t queue_not_empty;   // 唤醒工作线程取任务 -- 队列不为空
};
/*-----------------------------------------------
函数名:     
    threadpool_creat
函数说明:   
    创建并初始化一个线程池
输入参数:   
    min_thr_num(int)     最小的线程数量 
    max_thr_num(int)     最大的线程数量 
    queue_max_size(int)  任务队列大小
输出参数:
返回值:
    返回线程池句柄,失败返回 NULL
-----------------------------------------------*/
Threadpool * threadpool_creat(int min_thr_num, int max_thr_num, int queue_max_size);
/*-----------------------------------------------
函数名:     
    threadpool_thread
函数说明:   
    任务处理线程函数
输入参数:   
    threadpool(Threadpool) 线程池句柄
输出参数:
返回值:
-----------------------------------------------*/
void * threadpool_thread(void *threadpool);
/*-----------------------------------------------
函数名:     
    threadpool_thread
函数说明:   
    任务处理线程函数
输入参数:   
    threadpool(Threadpool*) 线程池句柄
输出参数:
返回值:
-----------------------------------------------*/
void * admin_thread(void *threadpool);
/*-----------------------------------------------
函数名:     
    threadpool_free
函数说明:   
    释放资源
输入参数:   
    pool(Threadpool*) 线程池句柄
输出参数:
返回值:
    0   正常
    -1  pool == NULL
-----------------------------------------------*/
int threadpool_free(Threadpool * pool);
/*-----------------------------------------------
函数名:     
    threadpool_destory
函数说明:   
    销毁线程池
输入参数:   
    pool(Threadpool*) 线程池句柄
输出参数:
返回值:
    0   正常
    -1  pool == NULL
-----------------------------------------------*/
int threadpool_destory(Threadpool * pool);
/*-----------------------------------------------
函数名:     
    threadpool_add_task
函数说明:   
    向任务队列中添加事件
输入参数:   
    pool(Threadpool*) 线程池句柄
    void *(*function)(void *arg) 任务函数
    arg(void *) 任务函数携带的参数
输出参数:
返回值:
-----------------------------------------------*/
int threadpool_add_task(Threadpool * pool, void *(*function)(void *arg), void *arg);
/*-----------------------------------------------
函数名:     
    is_thread_alive
函数说明:   
    判断一个线程是否存活
输入参数:   
    tid(pthread_t) 线程pid
输出参数:
返回值:
    存活 true
    消亡 false
-----------------------------------------------*/
bool is_thread_alive(pthread_t tid);
#ifdef __cplusplus
}
#endif
#include "ThreadPool.h"
Threadpool * threadpool_creat(int min_thr_num, int max_thr_num, int queue_max_size)
{
    Threadpool *pool = NULL;
    /* 使用do{}while(0) --> 实现goto机制,错误时即使跳出,并统一处理 */
    do
    {
        /** 开启线程池空间 **/
        pool = (Threadpool *)malloc(sizeof(Threadpool));
        if(NULL == pool)
        {
            break;
        }
        /** 初始化信息 **/
        pool->min_thr_num = min_thr_num;
        pool->max_thr_num = max_thr_num;
        pool->live_thr_num = min_thr_num;
        pool->busy_thr_num = 0;
        pool->wait_exit_thr_num = 0;
        pool->queue_front = 0;
        pool->queue_rear = 0;
        pool->queue_size = 0;
        pool->queue_max_size = queue_max_size;
        pool->shutdown = false;
        /** 分配工作线程空间 **/
        pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * max_thr_num);
        if(NULL == pool->threads)
        {
            break;
        }
        memset(pool->threads, 0, sizeof(pthread_t) * max_thr_num);
        /** 队列空间 **/
        pool->task_queue = (Task *)malloc(sizeof(Task) * queue_max_size);
        if(NULL == pool->task_queue)
        {
            break;
        }
        /** 初始化互斥锁和条件变量 **/
        if(pthread_mutex_init(&(pool->lock), NULL) != 0 ||
           pthread_mutex_init(&(pool->thread_counter), NULL) != 0 ||
           pthread_cond_init(&(pool->queue_not_empty), NULL) ||
           pthread_cond_init(&(pool->queue_not_full), NULL) != 0)
        {
            break;
        } 
        /** 启动 min_thr_num 个工作线程 **/
        for(int i = 0; i < min_thr_num; ++i)
        {
            pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
        }
        /** 启动管理者线程 **/
        pthread_create(&(pool->admin_tid), NULL, admin_thread, (void*)pool);
        return pool;
    }while(0);
    /* 释放pool的空间 */
    threadpool_free(pool);
    return NULL;
}
int threadpool_destory(Threadpool * pool)
{
    /* 容错性判断 */
    if(NULL == pool)
    {
        return -1;
    }
    /* 线程标志位:true */
    pool->shutdown = true;
    /* 销毁管理线程 */
    pthread_join(pool->admin_tid, NULL);
    /* 通知存活的线程结束自己 */
    for(int i = 0; i < pool->live_thr_num; ++i)
    {
        /** 唤醒所有被阻塞的线程 **/
        pthread_cond_broadcast(&(pool->queue_not_empty));
    }
    /* 等待线程结束,进行回收 */
    for(int i = 0; i < pool->live_thr_num; ++i)
    {
        pthread_join(pool->threads[i], NULL);
    }
    /* 释放资源 */
    threadpool_free(pool);
    return 0;
}
int threadpool_free(Threadpool * pool)
{
    /* 容错性判断 */
    if(NULL == pool)
    {
        return -1;
    }
    /* 销毁任务队列 */
    if(pool->task_queue)
    {
        free(pool->task_queue);
        pool->task_queue = NULL;
    }
    /* 销毁工作线程集合 */
    if(pool->threads)
    {
        free(pool->threads);
        pool->threads = NULL;
    }
    /* 销毁池管理空间 */
    free(pool);
    pool = NULL;
    return 0;
}
void * admin_thread(void *threadpool)
{
    /* 容错处理 */
    if(NULL == threadpool)
    {
        return NULL;
    }
    /* 维护线程池的平衡 */
    Threadpool *pool = (Threadpool *)threadpool;
    while(!pool->shutdown)
    {
        /** 每隔DEFAULT_TIME进行一次 **/
        sleep(DEFAULT_TIME);
        /** 获取当前状态下活跃数和任务数 -- 多线程访问同一个变量(互斥锁) **/
        pthread_mutex_lock(&(pool->lock));
        int queue_size = pool->queue_size;
        int live_thr_num = pool->live_thr_num;
        pthread_mutex_unlock(&(pool->lock));
        /** 获取忙碌数 **/
        pthread_mutex_lock(&(pool->thread_counter));
        int busy_thr_num = pool->busy_thr_num;
        pthread_mutex_unlock(&(pool->thread_counter));
        /** 创建新线程:正在等待的任务 >= 最小允许等待的任务数 && 存活线程数 < 最大线程数 **/
        if(queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num)
        {
            pthread_mutex_lock(&(pool->lock));
            int add = 0;
            for(int i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_NUM && pool->live_thr_num < pool->max_thr_num; ++i)
            {
                if(pool->threads[i] == 0 || !is_thread_alive(pool->threads[i]))
                {
                    pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void*)pool);
                    add++;
                    pool->live_thr_num++;
                }
            }
            pthread_mutex_unlock(&(pool->lock));
        }
        /** 销毁空闲的线程: 忙碌数*2 < 存活数(一半以上的空闲) && 存活数 > 最小线程数 **/
        if((busy_thr_num * 2) < live_thr_num && live_thr_num > pool->min_thr_num)
        {
            pthread_mutex_lock(&(pool->lock));
            pool->wait_exit_thr_num = DEFAULT_THREAD_NUM;
            pthread_mutex_unlock(&(pool->lock));
        }
        for(int i = 0; i < DEFAULT_THREAD_NUM; ++i)
        {
            /*** 通知处于空闲的线程自杀 ***/
            pthread_cond_signal(&(pool->queue_not_empty));
        }
    }
    return NULL;
}
bool is_thread_alive(pthread_t tid)
{
    int kill_rc = pthread_kill(tid, 0);
    if(ESRCH == kill_rc)
    {
        return false;
    }
    return true;
}
void * threadpool_thread(void *threadpool)
{
    /* 容错处理 */
    if(NULL == threadpool)
    {
        return NULL;
    }
    Threadpool *pool = (Threadpool *)threadpool;
    Task task;
    while(true)
    {
        pthread_mutex_lock(&(pool->lock));
        /* 无任务:阻塞等待 */
        while((pool->queue_size == 0 && !pool->shutdown))
        {
            /** 阻塞等待 **/
            pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));
            /** 是否是自杀任务 **/
            if(pool->wait_exit_thr_num > 0)
            {
                pool->wait_exit_thr_num--;
                /** 判断线程池中的线程数量是否大于最小线程数:是则结束当前线程 **/
                if(pool->live_thr_num > pool->min_thr_num)
                {
                    pool->live_thr_num--;
                    pthread_mutex_unlock(&(pool->lock));
                    pthread_exit(NULL);
                }
            }
        }
        /* 是否销毁池 */
        if(pool->shutdown)
        {
            pthread_mutex_unlock(&(pool->lock));
            pthread_exit(NULL);
        }
        /* 有任务:执行任务 */
        task.function = pool->task_queue[pool->queue_front].function;
        task.arg = pool->task_queue[pool->queue_front].arg;
        /* 维护循环队列平衡 */
        pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size;
        pool->queue_size--; 
        /* 通知队列不未满 */
        pthread_cond_broadcast(&(pool->queue_not_full));
        pthread_mutex_unlock(&(pool->lock));
        /* 执行开始:忙碌线程+1 */
        pthread_mutex_lock(&(pool->thread_counter));
        pool->busy_thr_num++;
        pthread_mutex_unlock(&(pool->thread_counter));
        /* 执行任务 */
        (*(task.function))(task.arg);
        /* 执行结束:忙碌线程-1 */
        pthread_mutex_lock(&(pool->thread_counter));
        pool->busy_thr_num--;
        pthread_mutex_unlock(&(pool->thread_counter));
    }
    pthread_exit(NULL);
}
int threadpool_add_task(Threadpool * pool, void *(*function)(void *arg), void *arg)
{
    if(NULL == pool)
    {
        return -1;
    }
    pthread_mutex_lock(&(pool->lock));
    /* 任务队列满时需要等到 */
    while((pool->queue_size == pool->queue_max_size) && (!pool->shutdown))
    {
        pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));
    }
    /* 线程池处于关闭状态 */
    if(pool->shutdown)
    {
        pthread_mutex_unlock(&(pool->lock));
    }
    /* 清空残留 */
    if(pool->task_queue[pool->queue_rear].arg != NULL)
    {
        free(pool->task_queue[pool->queue_rear].arg);
        pool->task_queue[pool->queue_rear].arg = NULL;
    }
    /* 添加任务 */
    pthread_cond_signal(&(pool->queue_not_empty));
    pthread_mutex_unlock(&(pool->lock));
    return 0;
}
相关文章
|
1月前
|
消息中间件 Unix Linux
【C语言】进程和线程详解
在现代操作系统中,进程和线程是实现并发执行的两种主要方式。理解它们的区别和各自的应用场景对于编写高效的并发程序至关重要。
57 6
|
1月前
|
消息中间件 存储 负载均衡
C 语言多线程编程:并行处理的利剑
C语言多线程编程是实现并行处理的强大工具,通过创建和管理多个线程,可以显著提升程序执行效率,尤其在处理大量数据或复杂计算时效果显著。
|
5月前
|
安全 Java C语言
C语言线程解池解读和实现01
C语言线程解池解读和实现01
|
4月前
|
网络协议 C语言
C语言 网络编程(十四)并发的TCP服务端-以线程完成功能
这段代码实现了一个基于TCP协议的多线程服务器和客户端程序,服务器端通过为每个客户端创建独立的线程来处理并发请求,解决了粘包问题并支持不定长数据传输。服务器监听在IP地址`172.17.140.183`的`8080`端口上,接收客户端发来的数据,并将接收到的消息添加“-回传”后返回给客户端。客户端则可以循环输入并发送数据,同时接收服务器回传的信息。当输入“exit”时,客户端会结束与服务器的通信并关闭连接。
|
4月前
|
存储 Ubuntu Linux
C语言 多线程编程(1) 初识线程和条件变量
本文档详细介绍了多线程的概念、相关命令及线程的操作方法。首先解释了线程的定义及其与进程的关系,接着对比了线程与进程的区别。随后介绍了如何在 Linux 系统中使用 `pidstat`、`top` 和 `ps` 命令查看线程信息。文档还探讨了多进程和多线程模式各自的优缺点及适用场景,并详细讲解了如何使用 POSIX 线程库创建、退出、等待和取消线程。此外,还介绍了线程分离的概念和方法,并提供了多个示例代码帮助理解。最后,深入探讨了线程间的通讯机制、互斥锁和条件变量的使用,通过具体示例展示了如何实现生产者与消费者的同步模型。
|
4月前
|
C语言
C语言 网络编程(九)并发的UDP服务端 以线程完成功能
这是一个基于UDP协议的客户端和服务端程序,其中服务端采用多线程并发处理客户端请求。客户端通过UDP向服务端发送登录请求,并根据登录结果与服务端的新子线程进行后续交互。服务端在主线程中接收客户端请求并创建新线程处理登录验证及后续通信,子线程创建新的套接字并与客户端进行数据交换。该程序展示了如何利用线程和UDP实现简单的并发服务器架构。
|
5月前
|
C语言
【C语言】线程同步
【C语言】线程同步
50 3
|
5月前
|
程序员 C语言
【C语言】多线程
【C语言】多线程
38 0
|
6月前
|
调度 C语言
深入浅出:C语言线程以及线程锁
线程锁的基本思想是,只有一个线程能持有锁,其他试图获取锁的线程将被阻塞,直到锁被释放。这样,锁就确保了在任何时刻,只有一个线程能够访问临界区(即需要保护的代码段或数据),从而保证了数据的完整性和一致性。 线程是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位。一个进程可以包含一个或多个线程,而每个线程都有自己的指令指针和寄存器状态,它们共享进程的资源,如内存空间、文件句柄和网络连接等。 线程锁的概念
255 1
|
7月前
|
存储 Linux C语言
c++进阶篇——初窥多线程(二) 基于C语言实现的多线程编写
本文介绍了C++中使用C语言的pthread库实现多线程编程。`pthread_create`用于创建新线程,`pthread_self`返回当前线程ID。示例展示了如何创建线程并打印线程ID,强调了线程同步的重要性,如使用`sleep`防止主线程提前结束导致子线程未执行完。`pthread_exit`用于线程退出,`pthread_join`用来等待并回收子线程,`pthread_detach`则分离线程。文中还提到了线程取消功能,通过`pthread_cancel`实现。这些基本操作是理解和使用C/C++多线程的关键。