C语言线程解池解读和实现01

简介: C语言线程解池解读和实现01

C语言线程解池解读和实现01

在说到并发,池式组件的时候,最先想到的肯定是线程池。那线程池的原理是什么呢,又是如何工作的呢?这篇文章告诉你答案。

知识梳理

  1. 什么是线程池
    线程池就是维护和管理一定数量线程的池式组件。有提高CPU工 作效率的作用
  2. 为什么需要线程池
    通俗来说,如果我们有一个IO十分耗时但是我们又是单线程的那么我们的线程将会阻塞,等待这个IO执行完之后才会继续执行。这样就会十分耗时。
  3. 线程池有哪些组件
  • 任务队列
  • 一定数量的线程
  • 锁(保证线程安全)
  1. 线程池是如歌管理线程的
  • 有任务:执行任务
  • 没有任务:CPU休眠

头文件解读

我们先来看一下头文件吧:

#ifndef THRDPOOL_H_
#define THRDPOOL_H_
typedef struct thrdpool_s thrdpool_t;
typedef void (*handler_pt)(void*);
#ifdef __cplusplus
extern "C"
{
#endif
thrdpool_t *thrdpool_create(int thrd_count);
void thrdpool_treminater(thrdpool_t *pool);
int thrdpool_post(thrdpool_t *pool, handler_pt func, void *arg);
void thrdpool_waitdone(thrdpool_t *pool);
#ifdef __cplusplus
}
#endif
#endif

代码解读:

作为一个组件,或者说一个库,我们并不希望别人可以看到我们内部是如何实现的,所以我们告诉用户如何使用即可。thrdpool_create用来初始化一个线程池,参数是线程数量。thrdpool_terminater用来停止线程池。thrdpool_post用来抛出任务,即在哪个线程池,执行哪个函数,参数是什么。thrdpool_waitdone检测线程是不是都执行完。

注意,虽然我们不希望用户看到我们的实现,但是要告诉用户我们库的使用规范,就是最前面的两个typedef。他告诉用户我们线程池的对象类型和传入任务的规范。

由于我们支持C++使用我们库,所以我们加上#ifdef __cplusplus extern “C”。就是说如果是C++,那我们就采用C规则编译这个文件。

数据结构解读

队列

typedef struct task_s
{
    void *next;
    handler_pt func;//对应函数
    void *arg;//参数
} task_t;
typedef struct task_queue_s
{
    void *head;//头指针
    void **tail;//尾指针
    int block;//标志
    spinlock_t lock;//回旋锁
    pthread_mutex_t mutex;//互斥锁
    pthread_cond_t cond;//条件
} task_queue_t;

代码解读:

这里我们先看一下我们队列结构的示意图:

我们使用一个链式结构。将所有task连接在一起,然后有一个manager来管理这些task。看了这张图结合上面的代码就可以很清楚的了解到我们队列的结构。

typedef struct thrdpool_s
{
    task_queue_t *task_queue;//任务队列
    atomic_int quit;//标志
    uint32_t thrd_count;//池内线程数量
    pthread_t *threads;//线程数组
} thrdpool_t;

代码解读:

这里没有什么好说的。其中标志的意思就是:如果为0,正常运行,如果为1阻塞。它是一个原子变量,所以具有线程安全。由于就这一个变量,没有设计复杂的操作,所以没有使用锁的必要,我们用原子变量即可解决。

代码实现

初始化队列

static task_queue_t* __taskqueue_create()
{
    int ret;
    task_queue_t *queue = (task_queue_t*)malloc(sizeof(*queue));
    if(queue)
    {
        ret = pthread_mutex_init(&queue->mutex);
        if(ret == 0)
        {
            ret = pthread_cond_init(&queue->cond);
            if(ret == 0)
            {
                spinlock_init(&queue->lock);
                queue->head = NULL;
                queue->tail = &queue->head;
                queue->block = 1;
                return queue;
            }
            pthread_mutex_destroy(&queue->mutex);
        }
        free(queue);
    }
    return NULL;
}

代码解读:

没有什么特别好说的,这里就是申请一块内存,然后对其中的结构体成员进行初始化。如果全部初始化成功就返回指针。主要注意的是,我们的block要初始化为0,就是默认在阻塞状态,因为刚刚初始化,队列里面没有任何任务。

移除阻塞

static int __nonblock(task_queue_t *queue)
{
    pthread_mutex_lock(&queue->mutex);
    queue->block = 0;
    pthread_mutex_unlock(&queue->mutex);
    pthread_cond_broadcast(&queue->cond);
}

代码解读:

在对task_queue内部变量操作的时候,必须使用锁来保证线程安全。将标志为阻塞的block置0,解除阻塞。然后广播,唤醒其他线程。

插入任务

static inline __add_task(task_queue_t *queue, void *task)
{
    void **link = (void**)task;
    *link = NULL;
    spinlock_lock(&queue->lock);
    *queue->tail = link;
    queue->tail = link;
    spinlock_unlock(&queue->lock);
    pthread_cond_signal(&queue->cond);
}

代码解读:

首先,我们让task->next指向NULL。然后,在加锁的情况下,对队列进行尾插。最后唤醒一个线程获取任务。这里需要解释一下我们的写法,为什么可以这样写。事实上,在内核中,队列都是这样的一级指针头+二级指针尾的写法,原理请看图:

从指正管理长度的角度来看void*就是管理了整个node,但是**void就是管理指向内存的后面8个字节(64位操作系统)即*next;

所以*queue->tail 就是 queue->tail->next;同理*link 就是 *next。

删除任务

static task_t* __pop_task(task_queue_t *queue)
{
    spinlock_lock(&queue->lock);
    if(queue->head == NULL)
        return NULL;
    task_t *task;
   
    void **link = (void**)queue->head;
    queue->head = *link;
    if(queue->head == NULL)
        queue->tail = &queue->head;
    spinlock_unlock(&queue->lock);
}

代码讲解:

我们将最前面的task取出,然后head指针指向后面一个。

获取任务

static void* __get_task(task_queue_t *queue)
{
    task_t *task;
    pthread_mutex_lock(&queue->mutex);
    while(task = __pop_task(queue) == NULL)
    {
        if(queue->block == 0)
        {
            pthread_mutex_unlock(&queue->mutex);
            return NULL;
        }
        pthread_cond_wait(&queue->cond, &queue->mutex);
        pthread_mutex_unlock(&queue->mutex);
    }
    return task;
}

代码解读:

我们调用上面写的__pop_task来获取队列最前面的task。如果队列是非阻塞的,那就直接返回。如果队列是阻塞的(刚刚初始化完),那我们就要进入休眠,等待__add_task里面的唤醒。然后返回获取到的task。

队列的销毁

static void __destroy_task_queue(task_queue_t *queue)
{
    task_t *task;
    while(task = __pop_task(queue))
    {
        free(task);
    }
    pthread_mutex_destroy(&queue->mutex);
    pthread_cond_destroy(&queue->cond);
    spinlock_destroy(&queue->lock);
    free(queue);
}

代码解读:

释放全部资源销毁即可。

线程池工作

static void* __thrdpool_work(void *arg)
{
    thrdpool_t *thrdpool = (thrdpool_t*)arg;
    task_t *task;
    void *cxt;
    if(atomic_load(&thrdpool->quit) == 0)
    {
        task = (task_t*)__get_task(thrdpool->task_queue);
        handler_pt func = task->func;
        cxt = task->arg;
        free(task);
        func(cxt);
    }
    return NULL;
}

代码解读:

线程停止

static void __thrdpool_terminater(thrdpool_t *pool)
{
    atomic_store(&pool->quit, 1);
    __nonblock(pool->task_queue);
    int i = 0;
    for(i; i < pool->thrd_count; i++)
    {
        pthread_join(pool->threads[i], NULL);
    }
}

代码解读:

首先让线程池阻塞,然后执行完当前所有线程的任务。

创建线程池

static int __thrdpool_create(thrdpool_t *pool, int thrd_num)
{
    int ret;
    pthread_attr_t attr;
    ret = pthread_attr_init(&attr);
    if(ret == 0)
    {
        pool->threads = (pthread_t*)malloc(sizeof(pthread_t) * thrd_num);
        if(pool->threads)
        {
            int i = 0;
            for(i; i < thrd_num; i++)
            {
                if(pthread_create(&pool->threads[i], &attr, __thrdpool_work, pool) != 0)
                {
                    break;
                }
                
            }
            pool->thrd_count = i;
            if(thrd_num == i)
                return 0;
            __thrdpool_terminater(pool);
            free(pool->threads);
        }
        ret = -1;
    }
    return ret;
}

代码解读:

主要就是在堆上开辟空间,然后使用循环批量创建线程。就是要注意一点一点来,如果资源创建失败需要及时销毁资源并且返回。

接口

void thrdpool_treminater(thrdpool_t *pool)
{
    atomic_store(&pool->quit, 1);
    __nonblock(pool->task_queue);
}
thrdpool_t *thrdpool_create(int thrd_count)
{
    thrdpool_t *pool = (thrdpool_t*)malloc(sizeof(thrdpool_t));
    if(pool)
    {
        task_queue_t *task = __taskqueue_create();
        if(task)
        {
            pool->task_queue = task;
            int ret = __thrdpool_create(pool, thrd_count);
            if(ret == 0)
            {
                return pool;
            }
            __destroy_task_queue(pool->task_queue);
        }
        free(pool);
    }
    return NULL;
}
int thrdpool_post(thrdpool_t *pool, handler_pt func, void *arg)
{
    task_t *task = (task_t*)malloc(sizeof(task_t));
    if(atomic_load(&pool->quit) == 1)
    {
        return -1;
    }
    task->arg = arg;
    task->func = func;
    __add_task(pool->task_queue, task);
    
    return 0;
}
void thrdpool_waitdone(thrdpool_t *pool)
{
    int i = 0;
    for(i; i < pool->thrd_count; i++)
    {
        pthread_join(pool->threads[i], NULL);
    }
    __destroy_task_queue(pool->task_queue);
    free(pool->threads);
    free(pool);
}

代码解读:

这里的接口是提供给客户使用的。这里需要注意waitdone和terminater的区别。一个是完全停止线程池(带有销毁功能),一个是不销毁,只是暂停。

相关文章
|
2月前
|
网络协议 C语言
C语言 网络编程(十四)并发的TCP服务端-以线程完成功能
这段代码实现了一个基于TCP协议的多线程服务器和客户端程序,服务器端通过为每个客户端创建独立的线程来处理并发请求,解决了粘包问题并支持不定长数据传输。服务器监听在IP地址`172.17.140.183`的`8080`端口上,接收客户端发来的数据,并将接收到的消息添加“-回传”后返回给客户端。客户端则可以循环输入并发送数据,同时接收服务器回传的信息。当输入“exit”时,客户端会结束与服务器的通信并关闭连接。
|
2月前
|
存储 Ubuntu Linux
C语言 多线程编程(1) 初识线程和条件变量
本文档详细介绍了多线程的概念、相关命令及线程的操作方法。首先解释了线程的定义及其与进程的关系,接着对比了线程与进程的区别。随后介绍了如何在 Linux 系统中使用 `pidstat`、`top` 和 `ps` 命令查看线程信息。文档还探讨了多进程和多线程模式各自的优缺点及适用场景,并详细讲解了如何使用 POSIX 线程库创建、退出、等待和取消线程。此外,还介绍了线程分离的概念和方法,并提供了多个示例代码帮助理解。最后,深入探讨了线程间的通讯机制、互斥锁和条件变量的使用,通过具体示例展示了如何实现生产者与消费者的同步模型。
|
2月前
|
C语言
C语言 网络编程(九)并发的UDP服务端 以线程完成功能
这是一个基于UDP协议的客户端和服务端程序,其中服务端采用多线程并发处理客户端请求。客户端通过UDP向服务端发送登录请求,并根据登录结果与服务端的新子线程进行后续交互。服务端在主线程中接收客户端请求并创建新线程处理登录验证及后续通信,子线程创建新的套接字并与客户端进行数据交换。该程序展示了如何利用线程和UDP实现简单的并发服务器架构。
|
3月前
|
C语言
【C语言】线程同步
【C语言】线程同步
39 3
|
3月前
|
C语言
【C语言】多线程服务器
【C语言】多线程服务器
30 0
|
3月前
|
程序员 C语言
【C语言】多线程
【C语言】多线程
27 0
|
4月前
|
调度 C语言
深入浅出:C语言线程以及线程锁
线程锁的基本思想是,只有一个线程能持有锁,其他试图获取锁的线程将被阻塞,直到锁被释放。这样,锁就确保了在任何时刻,只有一个线程能够访问临界区(即需要保护的代码段或数据),从而保证了数据的完整性和一致性。 线程是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位。一个进程可以包含一个或多个线程,而每个线程都有自己的指令指针和寄存器状态,它们共享进程的资源,如内存空间、文件句柄和网络连接等。 线程锁的概念
170 1
|
5月前
|
存储 Linux C语言
c++进阶篇——初窥多线程(二) 基于C语言实现的多线程编写
本文介绍了C++中使用C语言的pthread库实现多线程编程。`pthread_create`用于创建新线程,`pthread_self`返回当前线程ID。示例展示了如何创建线程并打印线程ID,强调了线程同步的重要性,如使用`sleep`防止主线程提前结束导致子线程未执行完。`pthread_exit`用于线程退出,`pthread_join`用来等待并回收子线程,`pthread_detach`则分离线程。文中还提到了线程取消功能,通过`pthread_cancel`实现。这些基本操作是理解和使用C/C++多线程的关键。
|
6月前
|
算法 安全 C语言
【C 言专栏】C 语言中的多线程编程
【5月更文挑战第5天】本文探讨了C语言中的多线程编程,包括多线程概念、通过POSIX线程库和Windows线程库的实现方式、基本步骤(创建、执行、同步、销毁线程)、线程同步机制(互斥锁、条件变量、信号量)以及优点(提高性能、增强并发处理、改善用户体验)。同时,文章指出多线程编程面临的挑战如线程安全、死锁和资源竞争,并提及内存管理问题。通过案例分析和展望未来趋势,强调了掌握多线程编程在提升程序效率和应对复杂任务中的重要性。
256 0
【C 言专栏】C 语言中的多线程编程
|
6月前
|
安全 Linux 编译器
从C语言到C++_40(多线程相关)C++线程接口+线程安全问题加锁(shared_ptr+STL+单例)(下)
从C语言到C++_40(多线程相关)C++线程接口+线程安全问题加锁(shared_ptr+STL+单例)
46 0