nginx线程池原理

简介: nginx线程池原理

1 数据结构

任务队列的描述:

工作队列的描述:

采用一个int整数来描述:

ngx_uint_t                threads;           // 线程数 

管理组件描述:

2 单个线程池api接口

创建,销毁,抛入任务

创建:1 初始化任务列表 2创建互斥锁 3 创建条件变量 4 创建若干工作线程

具体源码如下:

// 每个工作线程的线程函数
static void *
ngx_thread_pool_cycle(void *data)
{
    ngx_thread_pool_t *tp = data;
    int                 err;
    sigset_t            set;
    ngx_thread_task_t  *task;
#if 0
    ngx_time_update();
#endif
    ngx_log_debug1(NGX_LOG_DEBUG_CORE, tp->log, 0,
                   "thread in pool \"%V\" started", &tp->name);
    sigfillset(&set);
    sigdelset(&set, SIGILL);
    sigdelset(&set, SIGFPE);
    sigdelset(&set, SIGSEGV);
    sigdelset(&set, SIGBUS);
    err = pthread_sigmask(SIG_BLOCK, &set, NULL);
    if (err) {
        ngx_log_error(NGX_LOG_ALERT, tp->log, err, "pthread_sigmask() failed");
        return NULL;
    }
    // 线程函数循环
    for ( ;; ) {
        // 1 去任务队列取任务,若为空则跳出线程函数
        if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {
            return NULL;
        }
        // 2 任务队列有任务,将空闲的计数器-1
        /* the number may become negative */
        tp->waiting--;
        // 3 从任务队列中拿出一个任务,并且唤醒一个线程取消休眠
        while (tp->queue.first == NULL) {
            if (ngx_thread_cond_wait(&tp->cond, &tp->mtx, tp->log)
                != NGX_OK)
            {
                (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
                return NULL;
            }
        }
        task = tp->queue.first;
        tp->queue.first = task->next;
        if (tp->queue.first == NULL) {
            tp->queue.last = &tp->queue.first;
        }
        if (ngx_thread_mutex_unlock(&tp->mtx, tp->log) != NGX_OK) {
            return NULL;
        }
#if 0
        ngx_time_update();
#endif
        ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
                       "run task #%ui in thread pool \"%V\"",
                       task->id, &tp->name);
        task->handler(task->ctx, tp->log);
        ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
                       "complete task #%ui in thread pool \"%V\"",
                       task->id, &tp->name);
        task->next = NULL;
        ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048);
        *ngx_thread_pool_done.last = task;
        ngx_thread_pool_done.last = &task->next;
        ngx_memory_barrier();
        ngx_unlock(&ngx_thread_pool_done_lock);
        // 4 处理任务对应的回调函数,真正执行的位置
        (void) ngx_notify(ngx_thread_pool_handler);
    }
}
static ngx_int_t
ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, ngx_pool_t *pool)
{
    int             err;
    pthread_t       tid;
    ngx_uint_t      n;
    pthread_attr_t  attr;
    if (ngx_notify == NULL) {
        ngx_log_error(NGX_LOG_ALERT, log, 0,
               "the configured event method cannot be used with thread pools");
        return NGX_ERROR;
    }
    // 1 初始化任务队列
    ngx_thread_pool_queue_init(&tp->queue);
    // 2 创建互斥锁 对于工作线程队列取任务队列需要枷锁
    if (ngx_thread_mutex_create(&tp->mtx, log) != NGX_OK) {
        return NGX_ERROR;
    }
    // 3 创建条件变量  用于当任务队列为空时,让线程休眠进入条件变量空闲等待状态,当条件满足时,就唤醒线程
    if (ngx_thread_cond_create(&tp->cond, log) != NGX_OK) {
        (void) ngx_thread_mutex_destroy(&tp->mtx, log);
        return NGX_ERROR;
    }
    tp->log = log;
    err = pthread_attr_init(&attr);
    if (err) {
        ngx_log_error(NGX_LOG_ALERT, log, err,
                      "pthread_attr_init() failed");
        return NGX_ERROR;
    }
    err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    if (err) {
        ngx_log_error(NGX_LOG_ALERT, log, err,
                      "pthread_attr_setdetachstate() failed");
        return NGX_ERROR;
    }
#if 0
    err = pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN);
    if (err) {
        ngx_log_error(NGX_LOG_ALERT, log, err,
                      "pthread_attr_setstacksize() failed");
        return NGX_ERROR;
    }
#endif
    // 4 创建工作线程
    for (n = 0; n < tp->threads; n++) {
        err = pthread_create(&tid, &attr, ngx_thread_pool_cycle, tp);
        if (err) {
            ngx_log_error(NGX_LOG_ALERT, log, err,
                          "pthread_create() failed");
            return NGX_ERROR;
        }
    }
    (void) pthread_attr_destroy(&attr);
    return NGX_OK;
}

销毁线程池

static void
ngx_thread_pool_destroy(ngx_thread_pool_t *tp)
{
    ngx_uint_t           n;
    ngx_thread_task_t    task;
    volatile ngx_uint_t  lock;
    ngx_memzero(&task, sizeof(ngx_thread_task_t));
    task.handler = ngx_thread_pool_exit_handler;
    task.ctx = (void *) &lock;
    for (n = 0; n < tp->threads; n++) {
        lock = 1;
        if (ngx_thread_task_post(tp, &task) != NGX_OK) {
            return;
        }
        while (lock) {
            ngx_sched_yield();
        }
        task.event.active = 0;
    }
    (void) ngx_thread_cond_destroy(&tp->cond, tp->log);
    (void) ngx_thread_mutex_destroy(&tp->mtx, tp->log);
}

抛入任务接口

源码如下:

ngx_thread_task_t *
ngx_thread_task_alloc(ngx_pool_t *pool, size_t size)
{
    ngx_thread_task_t  *task;
    task = ngx_pcalloc(pool, sizeof(ngx_thread_task_t) + size);
    if (task == NULL) {
        return NULL;
    }
    task->ctx = task + 1;
    return task;
}
ngx_int_t
ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task)
{
    if (task->event.active) {
        ngx_log_error(NGX_LOG_ALERT, tp->log, 0,
                      "task #%ui already active", task->id);
        return NGX_ERROR;
    }
    if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {
        return NGX_ERROR;
    }
    if (tp->waiting >= tp->max_queue) {
        (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
        ngx_log_error(NGX_LOG_ERR, tp->log, 0,
                      "thread pool \"%V\" queue overflow: %i tasks waiting",
                      &tp->name, tp->waiting);
        return NGX_ERROR;
    }
    task->event.active = 1;
    task->id = ngx_thread_pool_task_id++;
    task->next = NULL;
    if (ngx_thread_cond_signal(&tp->cond, tp->log) != NGX_OK) {
        (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
        return NGX_ERROR;
    }
    *tp->queue.last = task;
    tp->queue.last = &task->next;
    tp->waiting++;
    (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
    ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
                   "task #%ui added to thread pool \"%V\"",
                   task->id, &tp->name);
    return NGX_OK;
}

3 nginx线程池

nginx中是做法是一个进程对应一个线程池。简单的理解为一帧网络数据到达服务器后,nginx服务器开启了多个进程,每个进程都是recv–>parser–>send 将接受和解析全部打成任务抛入到自己对应的线程池中处理。

目录
相关文章
|
4月前
|
安全 Java 数据库
一天十道Java面试题----第四天(线程池复用的原理------>spring事务的实现方式原理以及隔离级别)
这篇文章是关于Java面试题的笔记,涵盖了线程池复用原理、Spring框架基础、AOP和IOC概念、Bean生命周期和作用域、单例Bean的线程安全性、Spring中使用的设计模式、以及Spring事务的实现方式和隔离级别等知识点。
|
4月前
|
编解码 网络协议 API
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
|
4月前
|
负载均衡 网络协议 关系型数据库
一口把LVS、Nginx及HAProxy工作原理讲清楚了。(附图)
一口把LVS、Nginx及HAProxy工作原理讲清楚了。(附图)
|
3月前
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
147 29
什么是线程池?从底层源码入手,深度解析线程池的工作原理
|
2月前
|
中间件 应用服务中间件 nginx
Nginx+uWSGI+Django原理
Nginx+uWSGI+Django原理
|
2月前
|
Java 编译器 程序员
【多线程】synchronized原理
【多线程】synchronized原理
62 0
|
3月前
|
存储 缓存 Java
JAVA并发编程系列(11)线程池底层原理架构剖析
本文详细解析了Java线程池的核心参数及其意义,包括核心线程数量(corePoolSize)、最大线程数量(maximumPoolSize)、线程空闲时间(keepAliveTime)、任务存储队列(workQueue)、线程工厂(threadFactory)及拒绝策略(handler)。此外,还介绍了四种常见的线程池:可缓存线程池(newCachedThreadPool)、定时调度线程池(newScheduledThreadPool)、单线程池(newSingleThreadExecutor)及固定长度线程池(newFixedThreadPool)。
|
4月前
|
存储 NoSQL Java
线程池的原理与C语言实现
【8月更文挑战第22天】线程池是一种多线程处理框架,通过复用预创建的线程来高效地处理大量短暂或临时任务,提升程序性能。它主要包括三部分:线程管理器、工作队列和线程。线程管理器负责创建与管理线程;工作队列存储待处理任务;线程则执行任务。当提交新任务时,线程管理器将其加入队列,并由空闲线程处理。使用线程池能减少线程创建与销毁的开销,提高响应速度,并能有效控制并发线程数量,避免资源竞争。这里还提供了一个简单的 C 语言实现示例。
|
4月前
|
存储 Java
线程池的底层工作原理是什么?
【8月更文挑战第8天】线程池的底层工作原理是什么?
119 8
|
3月前
|
安全 Java API
Java线程池原理与锁机制分析
综上所述,Java线程池和锁机制是并发编程中极其重要的两个部分。线程池主要用于管理线程的生命周期和执行并发任务,而锁机制则用于保障线程安全和防止数据的并发错误。它们深入地结合在一起,成为Java高效并发编程实践中的关键要素。
33 0