线程池-手写线程池Linux C简单版本(生产者-消费者模型)

简介: 线程池-手写线程池Linux C简单版本(生产者-消费者模型)


简介

本线程池采用C语言实现

线程池的场景:

当某些任务特别耗时(例如大量的IO读写操作),严重影响线程其他的任务的执行,可以使用线程池

线程池的一般特点:

线程池通常是一个生产者-消费者模型

生产者线程用于发布任务,任务通常保存在任务队列中

线程池作为消费者,用于取出任务,执行任务

线程池中线程数量的选择:

有一个经验公式: 线程数量 =(io等待时间+cpu运算时间)*核心数/cpu运算时间

因此可以根据经验公式得出下面两种场景的线程数量:

  • cpu密集任务:线程数量=核心数(即上面的公式假设cpu运算时间>>io等待时间)
  • io密集任务:线程数量=2*n+2

手写线程池

线程池代码结构:

  • thread_pool_create:创建线程池所需要的资源,包含不限于任务队列,子线程的创建。
  • thread_pool_post:用于任务的发布,将执行任务存在任务队列中。
  • thread_pool_destroy:用于线程池的退出,以及资源的销毁。
  • wait_all_done:join线程池所有子线程,等待回收子线程。
  • thread_worker:用于任务执行。

主要的核心点集中在thread_pool_post和thread_worker两个函数中,这两个函数也构成了生产者-消费者模型。本文采用队列+互斥锁+条件变量实现。

线程池结构体分析

由于C语言不像C++可以用类封装函数,因此线程池会使用结构体来封装一些变量或者函数指针。

task_t

封装任务的入口指针以及参数。

typedef struct task_t {
    handler_pt func;
    void * arg;
} task_t;

task_queue_t

封装任务队列,为了不频繁移动队列中数据,此处采用头尾索引来标记任务。

typedef struct task_queue_t {
    uint32_t head;
    uint32_t tail;
    uint32_t count;
    task_t *queue;
} task_queue_t;

thread_pool_t

包含互斥锁,条件变量,任务队列等信息

struct thread_pool_t {
    pthread_mutex_t mutex;
    pthread_cond_t condition; //条件变量
    pthread_t *threads; //线程
    task_queue_t task_queue; //任务队列
    int closed; //是否关闭线程池执行的标志,为1表示关闭
    int started; // 当前正在运行的线程数
    int thrd_count; //线程数
    int queue_size; //任务队列大小
};

其中closed:表示是否关闭线程池执行的标志,为1表示关闭。在线程的运行函数中,用来判断是否继续循环等待执行任务队列中的任务。

started:表示当前正在运行的线程数。在thread_pool_destroy函数中销毁线程池时,需要等待所有线程停止才行,即started == 0

线程池函数分析

thread_pool_create

创建线程池,初始化一些线程池属性

通过循环pthread_create函数创建子线程。

thread_pool_t *thread_pool_create(int thrd_count, int queue_size) {
    thread_pool_t *pool;
    if (thrd_count <= 0 || queue_size <= 0) {
        return NULL;
    }
    pool = (thread_pool_t*) malloc(sizeof(*pool));
    if (pool == NULL) {
        return NULL;
    }
    pool->thrd_count = 0;
    pool->queue_size = queue_size;
    pool->task_queue.head = 0;
    pool->task_queue.tail = 0;
    pool->task_queue.count = 0;
    pool->started = pool->closed = 0;
    pool->task_queue.queue = (task_t*)malloc(sizeof(task_t)*queue_size);
    if (pool->task_queue.queue == NULL) {
        // TODO: free pool
        return NULL;
    }
    pool->threads = (pthread_t*) malloc(sizeof(pthread_t) * thrd_count);
    if (pool->threads == NULL) {
        // TODO: free pool
        return NULL;
    }
    int i = 0;
    for (; i < thrd_count; i++) {
        if (pthread_create(&(pool ->threads[i]), NULL, thread_worker, (void*)pool) != 0) {
            // TODO: free pool
            return NULL;
        }
        pool->thrd_count++;
        pool->started++;
    }
    return pool;
}

thread_pool_post

作为生产者,往任务队列里面添加任务

通过pthread_cond_signal通知子唤醒子线程的pthread_cond_wait

int thread_pool_post(thread_pool_t *pool, handler_pt func, void *arg) {
    if (pool == NULL || func == NULL) {
        return -1;
    }
    task_queue_t *task_queue = &(pool->task_queue);
//此处用自旋锁会更节省消耗,因为锁里面的逻辑比较简单
    if (pthread_mutex_lock(&(pool->mutex)) != 0) {
        return -2;
    }
    if (pool->closed) {
        pthread_mutex_unlock(&(pool->mutex));
        return -3;
    }
    if (task_queue->count == pool->queue_size) {
        pthread_mutex_unlock(&(pool->mutex));
        return -4;
    }
//避免queue数据的变化,采用头尾索引来标识
    task_queue->queue[task_queue->tail].func = func;
    task_queue->queue[task_queue->tail].arg = arg;
    task_queue->tail = (task_queue->tail + 1) % pool->queue_size;
    task_queue->count++;
//唤醒一个休眠的线程
    if (pthread_cond_signal(&(pool->condition)) != 0) {
        pthread_mutex_unlock(&(pool->mutex));
        return -5;
    }
    pthread_mutex_unlock(&(pool->mutex));
    return 0;
}

thread_worker

pthread_cond_wait等待任务的唤醒

作为消费者, (*(task.func))(task.arg);执行任务

static void *thread_worker(void *thrd_pool) {
    thread_pool_t *pool = (thread_pool_t*)thrd_pool;
    task_queue_t *que;
    task_t task;
    for (;;) {
        pthread_mutex_lock(&(pool->mutex));
        que = &pool->task_queue;
        while (que->count == 0 && pool->closed == 0) {
            // 阻塞在 condition,等待任务队列添加任务
            pthread_cond_wait(&(pool->condition), &(pool->mutex));
        }
        if (pool->closed == 1 && que->count == 0) break;//没有任务,并且关闭标志打开,即跳出循环
        task = que->queue[que->head];
        que->head = (que->head + 1) % pool->queue_size;
        que->count--;
        pthread_mutex_unlock(&(pool->mutex));
        (*(task.func))(task.arg);//执行对应任务函数
    }
    pool->started--;//跳出循环之后,运行线程数需要减1
    pthread_mutex_unlock(&(pool->mutex));
    pthread_exit(NULL);
    return NULL;
}

thread_pool_destroy

销毁释放线程池,置 pool->closed = 1;

通过pthread_cond_broadcast唤醒线程池所有线程,这个和thread_pool_post里的pthread_cond_signal一样,并且broadcast会通知到所有的线程

int thread_pool_destroy(thread_pool_t *pool) {
    if (pool == NULL) {
        return -1;
    }
    if (pthread_mutex_lock(&(pool->mutex)) != 0) {
        return -2;
    }
    if (pool->closed) {
        thread_pool_free(pool);
        return -3;
    }
    pool->closed = 1;
//广播形式,通知所有阻塞在condition的线程接触阻塞
    if (pthread_cond_broadcast(&(pool->condition)) != 0 || 
            pthread_mutex_unlock(&(pool->mutex)) != 0) {
        thread_pool_free(pool);
        return -4;
    }
    wait_all_done(pool);
    thread_pool_free(pool);
    return 0;
}

wait_all_done

将所有线程通过pthread_join回收,所有子线程任务执行完毕,回收线程

int wait_all_done(thread_pool_t *pool) {
    printf("wait_all_done start!pool->thrd_count:%d\n", pool->thrd_count);
    int i, ret=0;
    for (i=0; i < pool->thrd_count; i++) {
        printf("wait_all_done doing! i:%d\n", i);
        if (pthread_join(pool->threads[i], NULL) != 0) {
            ret=1;
        }
    }
    printf("wait_all_done end!\n");
    return ret;
}

thread_pool_free

释放线程池空间

static void thread_pool_free(thread_pool_t *pool) {
    if (pool == NULL || pool->started > 0) {
        return;
    }
    if (pool->threads) {
        free(pool->threads);
        pool->threads = NULL;
        pthread_mutex_lock(&(pool->mutex));
        pthread_mutex_destroy(&pool->mutex);
        pthread_cond_destroy(&pool->condition);
    }
    if (pool->task_queue.queue) {
        free(pool->task_queue.queue);
        pool->task_queue.queue = NULL;
    }
    free(pool);
}

主函数调用

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include "thrd_pool.h"
int nums = 0;
int done = 0;
int task_num = 100;
pthread_mutex_t lock;
void do_task(void *arg) {
    usleep(10000);
    pthread_mutex_lock(&lock);
    done++;
    printf("doing %d task\n", done);
    pthread_mutex_unlock(&lock);
}
int main(int argc, char **argv) {
    int threads = 8;
    int queue_size = 256;
    if (argc == 2) {
        threads = atoi(argv[1]);
        if (threads <= 0) {
            printf("threads number error: %d\n", threads);
            return 1;
        }
    } else if (argc > 2) {
        threads = atoi(argv[1]);
        queue_size = atoi(argv[1]);
        if (threads <= 0 || queue_size <= 0) {
            printf("threads number or queue size error: %d,%d\n", threads, queue_size);
            return 1;
        }
    }
    thread_pool_t *pool = thread_pool_create(threads, queue_size);
    if (pool == NULL) {
        printf("thread pool create error!\n");
        return 1;
    }
    while (thread_pool_post(pool, &do_task, NULL) == 0) {
        pthread_mutex_lock(&lock);
        nums++;
        pthread_mutex_unlock(&lock);
        if (nums > task_num) break;
    }
    printf("add %d tasks\n", nums);
    usleep(1000000);//延时等待所有的作业完成
    printf("did %d tasks\n", done);
    thread_pool_destroy(pool);
    return 0;
}

运行结果

使用指令编译文件:

gcc main.c thrd_pool.c -o main -lpthread

运行执行文件得到运行结果

完整代码下载线程池Linux C语言简单版本

目录
相关文章
|
5天前
|
Java 数据库
【Java多线程】对线程池的理解并模拟实现线程池
【Java多线程】对线程池的理解并模拟实现线程池
14 1
|
5天前
|
安全 API 数据库
【转】Android线程模型(AsyncTask的使用)
【转】Android线程模型(AsyncTask的使用)
|
5天前
|
消息中间件 存储 网络协议
Kafka 线程模型痛点攻克: 提升分区写入 2 倍性能
Apache Kafka的单分区写入性能在某些严格保序场景中至关重要,但其现有线程模型限制了性能发挥。本文分析了Kafka的串行处理模型,包括SocketServer、KafkaChannel、RequestChannel等组件,指出其通过KafkaChannel状态机确保请求顺序处理,导致处理效率低下。AutoMQ提出流水线处理模型,简化KafkaChannel状态机,实现网络解析、校验定序和持久化的阶段间并行化,提高处理效率。测试结果显示,AutoMQ的极限吞吐是Kafka的2倍,P99延迟降低至11ms。
14 3
Kafka 线程模型痛点攻克: 提升分区写入 2 倍性能
|
5天前
|
存储 NoSQL Redis
深入浅出Redis(二):Redis单线程模型与通信流程
深入浅出Redis(二):Redis单线程模型与通信流程
|
7天前
|
算法 Java Linux
【探索Linux】P.23(线程池 —— 简单模拟)
【探索Linux】P.23(线程池 —— 简单模拟)
10 0
|
7天前
|
消息中间件 监控 前端开发
面试官:核心线程数为0时,线程池如何执行?
线程池是 Java 中用于提升程序执行效率的主要手段,也是并发编程中的核心实现技术,并且它也被广泛的应用在日常项目的开发之中。那问题来了,如果把线程池中的核心线程数设置为 0 时,线程池是如何执行的? 要回答这个问题,我们首先要了解在正常情况下,线程池的执行流程,也就是说当有一个任务来了之后,线程池是如何运行的? ## 1.线程池的执行流程 正常情况下(核心线程数不为 0 的情况下)线程池的执行流程如下: 1. **判断核心线程数**:先判断当前工作线程数是否大于核心线程数,如果结果为 false,则新建线程并执行任务。 2. **判断任务队列**:如果大于核心线程数,则判断任务队列是否
17 1
面试官:核心线程数为0时,线程池如何执行?
|
8天前
|
关系型数据库 MySQL Linux
Linux CentOs7 安装Mysql(5.7和8.0版本)密码修改 超详细教程
Linux CentOs7 安装Mysql(5.7和8.0版本)密码修改 超详细教程
|
14天前
|
NoSQL Redis
Redis 线程模型
Redis 线程模型
|
15天前
|
监控 安全 Java
【多线程学习】深入探究阻塞队列与生产者消费者模型和线程池常见面试题
【多线程学习】深入探究阻塞队列与生产者消费者模型和线程池常见面试题
|
17天前
|
Java 数据库 Android开发
【专栏】Kotlin在Android开发中的多线程优化,包括线程池、协程的使用,任务分解、避免阻塞操作以及资源管理
【4月更文挑战第27天】本文探讨了Kotlin在Android开发中的多线程优化,包括线程池、协程的使用,任务分解、避免阻塞操作以及资源管理。通过案例分析展示了网络请求、图像处理和数据库操作的优化实践。同时,文章指出并发编程的挑战,如性能评估、调试及兼容性问题,并强调了多线程优化对提升应用性能的重要性。开发者应持续学习和探索新的优化策略,以适应移动应用市场的竞争需求。