C语言线程解池解读和实现01

简介: 在说到并发,池式组件的时候,最先想到的肯定是线程池。那线程池的原理是什么呢,又是如何工作的呢?这篇文章告诉你答案。

C语言线程解池解读和实现01

在说到并发,池式组件的时候,最先想到的肯定是线程池。那线程池的原理是什么呢,又是如何工作的呢?这篇文章告诉你答案。


知识梳理

1.什么是线程池

线程池就是维护和管理一定数量线程的池式组件。有提高CPU工 作效率的作用

2.为什么需要线程池

通俗来说,如果我们有一个IO十分耗时但是我们又是单线程的那么我们的线程将会阻塞,等待这个IO执行     完之后才会继续执行。这样就会十分耗时。

3.线程池有哪些组件

  • 任务队列
  • 一定数量的线程
  • 锁(保证线程安全)

4.线程池是如歌管理线程的

  • 有任务:执行任务
  • 没有任务:CPU休眠


头文件解读

我们先来看一下头文件吧:

#ifndef THRDPOOL_H_
#define THRDPOOL_H_
typedef struct thrdpool_s thrdpool_t;
typedef void (*handler_pt)(void*);
#ifdef __cplusplus
extern "C"
{
#endif
thrdpool_t *thrdpool_create(int thrd_count);
void thrdpool_treminater(thrdpool_t *pool);
int thrdpool_post(thrdpool_t *pool, handler_pt func, void *arg);
void thrdpool_waitdone(thrdpool_t *pool);
#ifdef __cplusplus
}
#endif
#endif

代码解读:

作为一个组件,或者说一个库,我们并不希望别人可以看到我们内部是如何实现的,所以我们告诉用户如何使用即可。thrdpool_create用来初始化一个线程池,参数是线程数量。thrdpool_terminater用来停止线程池。thrdpool_post用来抛出任务,即在哪个线程池,执行哪个函数,参数是什么。thrdpool_waitdone检测线程是不是都执行完。

注意,虽然我们不希望用户看到我们的实现,但是要告诉用户我们库的使用规范,就是最前面的两个typedef。他告诉用户我们线程池的对象类型和传入任务的规范。

由于我们支持C++使用我们库,所以我们加上#ifdef __cplusplus extern “C”。就是说如果是C++,那我们就采用C规则编译这个文件。


数据结构解读


队列

typedef struct task_s
{
    void *next;
    handler_pt func;//对应函数
    void *arg;//参数
} task_t;
typedef struct task_queue_s
{
    void *head;//头指针
    void **tail;//尾指针
    int block;//标志
    spinlock_t lock;//回旋锁
    pthread_mutex_t mutex;//互斥锁
    pthread_cond_t cond;//条件
} task_queue_t;


代码解读:

这里我们先看一下我们队列结构的示意图:

我们使用一个链式结构。将所有task连接在一起,然后有一个manager来管理这些task。看了这张图结合上面的代码就可以很清楚的了解到我们队列的结构。

typedef struct thrdpool_s
{
    task_queue_t *task_queue;//任务队列
    atomic_int quit;//标志
    uint32_t thrd_count;//池内线程数量
    pthread_t *threads;//线程数组
} thrdpool_t;

代码解读:

这里没有什么好说的。其中标志的意思就是:如果为0,正常运行,如果为1阻塞。它是一个原子变量,所以具有线程安全。由于就这一个变量,没有设计复杂的操作,所以没有使用锁的必要,我们用原子变量即可解决。

代码实现

初始化队列

static task_queue_t* __taskqueue_create()
{
    int ret;
    task_queue_t *queue = (task_queue_t*)malloc(sizeof(*queue));
    if(queue)
    {
        ret = pthread_mutex_init(&queue->mutex);
        if(ret == 0)
        {
            ret = pthread_cond_init(&queue->cond);
            if(ret == 0)
            {
                spinlock_init(&queue->lock);
                queue->head = NULL;
                queue->tail = &queue->head;
                queue->block = 1;
                return queue;
            }
            pthread_mutex_destroy(&queue->mutex);
        }
        free(queue);
    }
    return NULL;
}

代码解读:

没有什么特别好说的,这里就是申请一块内存,然后对其中的结构体成员进行初始化。如果全部初始化成功就返回指针。主要注意的是,我们的block要初始化为0,就是默认在阻塞状态,因为刚刚初始化,队列里面没有任何任务。

移除阻塞

static int __nonblock(task_queue_t *queue)
{
    pthread_mutex_lock(&queue->mutex);
    queue->block = 0;
    pthread_mutex_unlock(&queue->mutex);
    pthread_cond_broadcast(&queue->cond);
}

代码解读:

在对task_queue内部变量操作的时候,必须使用锁来保证线程安全。将标志为阻塞的block置0,解除阻塞。然后广播,唤醒其他线程。

插入任务

static inline __add_task(task_queue_t *queue, void *task)
{
    void **link = (void**)task;
    *link = NULL;
    spinlock_lock(&queue->lock);
    *queue->tail = link;
    queue->tail = link;
    spinlock_unlock(&queue->lock);
    pthread_cond_signal(&queue->cond);
}

代码解读:

首先,我们让task->next指向NULL。然后,在加锁的情况下,对队列进行尾插。最后唤醒一个线程获取任务。这里需要解释一下我们的写法,为什么可以这样写。事实上,在内核中,队列都是这样的一级指针头+二级指针尾的写法,原理请看图:

0798bec9cb6b4abbbe3ed9d634b33764.png

从指正管理长度的角度来看void*就是管理了整个node,但是**void就是管理指向内存的后面8个字节(64位操作系统)即*next

所以*queue->tail 就是 queue->tail->next;同理*link 就是 *next

删除任务

static task_t* __pop_task(task_queue_t *queue)
{
    spinlock_lock(&queue->lock);
    if(queue->head == NULL)
        return NULL;
    task_t *task;
    void **link = (void**)queue->head;
    queue->head = *link;
    if(queue->head == NULL)
        queue->tail = &queue->head;
    spinlock_unlock(&queue->lock);
}

代码讲解:

我们将最前面的task取出,然后head指针指向后面一个。

获取任务

static void* __get_task(task_queue_t *queue)
{
    task_t *task;
    pthread_mutex_lock(&queue->mutex);
    while(task = __pop_task(queue) == NULL)
    {
        if(queue->block == 0)
        {
            pthread_mutex_unlock(&queue->mutex);
            return NULL;
        }
        pthread_cond_wait(&queue->cond, &queue->mutex);
        pthread_mutex_unlock(&queue->mutex);
    }
    return task;
}

代码解读:

我们调用上面写的__pop_task来获取队列最前面的task。如果队列是非阻塞的,那就直接返回。如果队列是阻塞的(刚刚初始化完),那我们就要进入休眠,等待__add_task里面的唤醒。然后返回获取到的task。

队列的销毁

static void __destroy_task_queue(task_queue_t *queue)
{
    task_t *task;
    while(task = __pop_task(queue))
    {
        free(task);
    }
    pthread_mutex_destroy(&queue->mutex);
    pthread_cond_destroy(&queue->cond);
    spinlock_destroy(&queue->lock);
    free(queue);
}

代码解读:

释放全部资源销毁即可。

线程池工作

static void* __thrdpool_work(void *arg)
{
    thrdpool_t *thrdpool = (thrdpool_t*)arg;
    task_t *task;
    void *cxt;
    if(atomic_load(&thrdpool->quit) == 0)
    {
        task = (task_t*)__get_task(thrdpool->task_queue);
        handler_pt func = task->func;
        cxt = task->arg;
        free(task);
        func(cxt);
    }
    return NULL;
}

代码解读:

线程停止

static void __thrdpool_terminater(thrdpool_t *pool)
{
    atomic_store(&pool->quit, 1);
    __nonblock(pool->task_queue);
    int i = 0;
    for(i; i < pool->thrd_count; i++)
    {
        pthread_join(pool->threads[i], NULL);
    }
}

代码解读:

首先让线程池阻塞,然后执行完当前所有线程的任务。

创建线程池

static int __thrdpool_create(thrdpool_t *pool, int thrd_num)
{
    int ret;
    pthread_attr_t attr;
    ret = pthread_attr_init(&attr);
    if(ret == 0)
    {
        pool->threads = (pthread_t*)malloc(sizeof(pthread_t) * thrd_num);
        if(pool->threads)
        {
            int i = 0;
            for(i; i < thrd_num; i++)
            {
                if(pthread_create(&pool->threads[i], &attr, __thrdpool_work, pool) != 0)
                {
                    break;
                }
            }
            pool->thrd_count = i;
            if(thrd_num == i)
                return 0;
            __thrdpool_terminater(pool);
            free(pool->threads);
        }
        ret = -1;
    }
    return ret;
}

代码解读:

主要就是在堆上开辟空间,然后使用循环批量创建线程。就是要注意一点一点来,如果资源创建失败需要及时销毁资源并且返回。

接口

void thrdpool_treminater(thrdpool_t *pool)
{
    atomic_store(&pool->quit, 1);
    __nonblock(pool->task_queue);
}
thrdpool_t *thrdpool_create(int thrd_count)
{
    thrdpool_t *pool = (thrdpool_t*)malloc(sizeof(thrdpool_t));
    if(pool)
    {
        task_queue_t *task = __taskqueue_create();
        if(task)
        {
            pool->task_queue = task;
            int ret = __thrdpool_create(pool, thrd_count);
            if(ret == 0)
            {
                return pool;
            }
            __destroy_task_queue(pool->task_queue);
        }
        free(pool);
    }
    return NULL;
}
int thrdpool_post(thrdpool_t *pool, handler_pt func, void *arg)
{
    task_t *task = (task_t*)malloc(sizeof(task_t));
    if(atomic_load(&pool->quit) == 1)
    {
        return -1;
    }
    task->arg = arg;
    task->func = func;
    __add_task(pool->task_queue, task);
    return 0;
}
void thrdpool_waitdone(thrdpool_t *pool)
{
    int i = 0;
    for(i; i < pool->thrd_count; i++)
    {
        pthread_join(pool->threads[i], NULL);
    }
    __destroy_task_queue(pool->task_queue);
    free(pool->threads);
    free(pool);
}

代码解读:

这里的接口是提供给客户使用的。这里需要注意waitdoneterminater的区别。一个是完全停止线程池(带有销毁功能),一个是不销毁,只是暂停。


本文通过<零声教育>的课程引发思考记录:C/C++后台服务器教学

相关文章
|
12天前
|
调度 C语言
深入浅出:C语言线程以及线程锁
线程锁的基本思想是,只有一个线程能持有锁,其他试图获取锁的线程将被阻塞,直到锁被释放。这样,锁就确保了在任何时刻,只有一个线程能够访问临界区(即需要保护的代码段或数据),从而保证了数据的完整性和一致性。 线程是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位。一个进程可以包含一个或多个线程,而每个线程都有自己的指令指针和寄存器状态,它们共享进程的资源,如内存空间、文件句柄和网络连接等。 线程锁的概念
|
23天前
|
存储 Linux C语言
c++进阶篇——初窥多线程(二) 基于C语言实现的多线程编写
本文介绍了C++中使用C语言的pthread库实现多线程编程。`pthread_create`用于创建新线程,`pthread_self`返回当前线程ID。示例展示了如何创建线程并打印线程ID,强调了线程同步的重要性,如使用`sleep`防止主线程提前结束导致子线程未执行完。`pthread_exit`用于线程退出,`pthread_join`用来等待并回收子线程,`pthread_detach`则分离线程。文中还提到了线程取消功能,通过`pthread_cancel`实现。这些基本操作是理解和使用C/C++多线程的关键。
|
2月前
|
安全 Linux 编译器
从C语言到C++_40(多线程相关)C++线程接口+线程安全问题加锁(shared_ptr+STL+单例)(下)
从C语言到C++_40(多线程相关)C++线程接口+线程安全问题加锁(shared_ptr+STL+单例)
26 0
|
2月前
|
安全 C语言 C++
从C语言到C++_40(多线程相关)C++线程接口+线程安全问题加锁(shared_ptr+STL+单例)(中)
从C语言到C++_40(多线程相关)C++线程接口+线程安全问题加锁(shared_ptr+STL+单例)
33 0
|
2月前
|
Linux 调度 C语言
从C语言到C++_40(多线程相关)C++线程接口+线程安全问题加锁(shared_ptr+STL+单例)(上)
从C语言到C++_40(多线程相关)C++线程接口+线程安全问题加锁(shared_ptr+STL+单例)
38 0
|
2月前
|
算法 安全 C语言
【C 言专栏】C 语言中的多线程编程
【5月更文挑战第5天】本文探讨了C语言中的多线程编程,包括多线程概念、通过POSIX线程库和Windows线程库的实现方式、基本步骤(创建、执行、同步、销毁线程)、线程同步机制(互斥锁、条件变量、信号量)以及优点(提高性能、增强并发处理、改善用户体验)。同时,文章指出多线程编程面临的挑战如线程安全、死锁和资源竞争,并提及内存管理问题。通过案例分析和展望未来趋势,强调了掌握多线程编程在提升程序效率和应对复杂任务中的重要性。
【C 言专栏】C 语言中的多线程编程
|
2月前
|
C语言
c语言 使用多线程
c语言 使用多线程
22 0
|
2月前
|
消息中间件 Java Linux
C语言-线程池代码
github 地址:常用的C工具代码,这里的工具包含了C语言实现的线程池,hashtable,list,md5,字符串操作,消息队列等很多常用的工具,我这里就不一一说明了,感兴趣的朋友可以自行下载研究,工作中肯定用的上。
51 0
|
8月前
|
Java C语言
基于C语言 -- 线程池实现
基于C语言 -- 线程池实现
44 0
|
算法 C语言
C语言实现多线程
C语言实现多线程
203 0