nginx线程池原理

简介: nginx线程池原理

1 数据结构

任务队列的描述:

工作队列的描述:

采用一个int整数来描述:

ngx_uint_t                threads;           // 线程数 

管理组件描述:

2 单个线程池api接口

创建,销毁,抛入任务

创建:1 初始化任务列表 2创建互斥锁 3 创建条件变量 4 创建若干工作线程

具体源码如下:

// 每个工作线程的线程函数
static void *
ngx_thread_pool_cycle(void *data)
{
    ngx_thread_pool_t *tp = data;
    int                 err;
    sigset_t            set;
    ngx_thread_task_t  *task;
#if 0
    ngx_time_update();
#endif
    ngx_log_debug1(NGX_LOG_DEBUG_CORE, tp->log, 0,
                   "thread in pool \"%V\" started", &tp->name);
    sigfillset(&set);
    sigdelset(&set, SIGILL);
    sigdelset(&set, SIGFPE);
    sigdelset(&set, SIGSEGV);
    sigdelset(&set, SIGBUS);
    err = pthread_sigmask(SIG_BLOCK, &set, NULL);
    if (err) {
        ngx_log_error(NGX_LOG_ALERT, tp->log, err, "pthread_sigmask() failed");
        return NULL;
    }
    // 线程函数循环
    for ( ;; ) {
        // 1 去任务队列取任务,若为空则跳出线程函数
        if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {
            return NULL;
        }
        // 2 任务队列有任务,将空闲的计数器-1
        /* the number may become negative */
        tp->waiting--;
        // 3 从任务队列中拿出一个任务,并且唤醒一个线程取消休眠
        while (tp->queue.first == NULL) {
            if (ngx_thread_cond_wait(&tp->cond, &tp->mtx, tp->log)
                != NGX_OK)
            {
                (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
                return NULL;
            }
        }
        task = tp->queue.first;
        tp->queue.first = task->next;
        if (tp->queue.first == NULL) {
            tp->queue.last = &tp->queue.first;
        }
        if (ngx_thread_mutex_unlock(&tp->mtx, tp->log) != NGX_OK) {
            return NULL;
        }
#if 0
        ngx_time_update();
#endif
        ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
                       "run task #%ui in thread pool \"%V\"",
                       task->id, &tp->name);
        task->handler(task->ctx, tp->log);
        ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
                       "complete task #%ui in thread pool \"%V\"",
                       task->id, &tp->name);
        task->next = NULL;
        ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048);
        *ngx_thread_pool_done.last = task;
        ngx_thread_pool_done.last = &task->next;
        ngx_memory_barrier();
        ngx_unlock(&ngx_thread_pool_done_lock);
        // 4 处理任务对应的回调函数,真正执行的位置
        (void) ngx_notify(ngx_thread_pool_handler);
    }
}
static ngx_int_t
ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, ngx_pool_t *pool)
{
    int             err;
    pthread_t       tid;
    ngx_uint_t      n;
    pthread_attr_t  attr;
    if (ngx_notify == NULL) {
        ngx_log_error(NGX_LOG_ALERT, log, 0,
               "the configured event method cannot be used with thread pools");
        return NGX_ERROR;
    }
    // 1 初始化任务队列
    ngx_thread_pool_queue_init(&tp->queue);
    // 2 创建互斥锁 对于工作线程队列取任务队列需要枷锁
    if (ngx_thread_mutex_create(&tp->mtx, log) != NGX_OK) {
        return NGX_ERROR;
    }
    // 3 创建条件变量  用于当任务队列为空时,让线程休眠进入条件变量空闲等待状态,当条件满足时,就唤醒线程
    if (ngx_thread_cond_create(&tp->cond, log) != NGX_OK) {
        (void) ngx_thread_mutex_destroy(&tp->mtx, log);
        return NGX_ERROR;
    }
    tp->log = log;
    err = pthread_attr_init(&attr);
    if (err) {
        ngx_log_error(NGX_LOG_ALERT, log, err,
                      "pthread_attr_init() failed");
        return NGX_ERROR;
    }
    err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    if (err) {
        ngx_log_error(NGX_LOG_ALERT, log, err,
                      "pthread_attr_setdetachstate() failed");
        return NGX_ERROR;
    }
#if 0
    err = pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN);
    if (err) {
        ngx_log_error(NGX_LOG_ALERT, log, err,
                      "pthread_attr_setstacksize() failed");
        return NGX_ERROR;
    }
#endif
    // 4 创建工作线程
    for (n = 0; n < tp->threads; n++) {
        err = pthread_create(&tid, &attr, ngx_thread_pool_cycle, tp);
        if (err) {
            ngx_log_error(NGX_LOG_ALERT, log, err,
                          "pthread_create() failed");
            return NGX_ERROR;
        }
    }
    (void) pthread_attr_destroy(&attr);
    return NGX_OK;
}

销毁线程池

static void
ngx_thread_pool_destroy(ngx_thread_pool_t *tp)
{
    ngx_uint_t           n;
    ngx_thread_task_t    task;
    volatile ngx_uint_t  lock;
    ngx_memzero(&task, sizeof(ngx_thread_task_t));
    task.handler = ngx_thread_pool_exit_handler;
    task.ctx = (void *) &lock;
    for (n = 0; n < tp->threads; n++) {
        lock = 1;
        if (ngx_thread_task_post(tp, &task) != NGX_OK) {
            return;
        }
        while (lock) {
            ngx_sched_yield();
        }
        task.event.active = 0;
    }
    (void) ngx_thread_cond_destroy(&tp->cond, tp->log);
    (void) ngx_thread_mutex_destroy(&tp->mtx, tp->log);
}

抛入任务接口

源码如下:

ngx_thread_task_t *
ngx_thread_task_alloc(ngx_pool_t *pool, size_t size)
{
    ngx_thread_task_t  *task;
    task = ngx_pcalloc(pool, sizeof(ngx_thread_task_t) + size);
    if (task == NULL) {
        return NULL;
    }
    task->ctx = task + 1;
    return task;
}
ngx_int_t
ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task)
{
    if (task->event.active) {
        ngx_log_error(NGX_LOG_ALERT, tp->log, 0,
                      "task #%ui already active", task->id);
        return NGX_ERROR;
    }
    if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {
        return NGX_ERROR;
    }
    if (tp->waiting >= tp->max_queue) {
        (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
        ngx_log_error(NGX_LOG_ERR, tp->log, 0,
                      "thread pool \"%V\" queue overflow: %i tasks waiting",
                      &tp->name, tp->waiting);
        return NGX_ERROR;
    }
    task->event.active = 1;
    task->id = ngx_thread_pool_task_id++;
    task->next = NULL;
    if (ngx_thread_cond_signal(&tp->cond, tp->log) != NGX_OK) {
        (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
        return NGX_ERROR;
    }
    *tp->queue.last = task;
    tp->queue.last = &task->next;
    tp->waiting++;
    (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
    ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
                   "task #%ui added to thread pool \"%V\"",
                   task->id, &tp->name);
    return NGX_OK;
}

3 nginx线程池

nginx中是做法是一个进程对应一个线程池。简单的理解为一帧网络数据到达服务器后,nginx服务器开启了多个进程,每个进程都是recv–>parser–>send 将接受和解析全部打成任务抛入到自己对应的线程池中处理。

目录
相关文章
|
2月前
|
安全 Java 数据库
一天十道Java面试题----第四天(线程池复用的原理------>spring事务的实现方式原理以及隔离级别)
这篇文章是关于Java面试题的笔记,涵盖了线程池复用原理、Spring框架基础、AOP和IOC概念、Bean生命周期和作用域、单例Bean的线程安全性、Spring中使用的设计模式、以及Spring事务的实现方式和隔离级别等知识点。
|
2月前
|
编解码 网络协议 API
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
|
2月前
|
负载均衡 网络协议 关系型数据库
一口把LVS、Nginx及HAProxy工作原理讲清楚了。(附图)
一口把LVS、Nginx及HAProxy工作原理讲清楚了。(附图)
|
27天前
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
什么是线程池?从底层源码入手,深度解析线程池的工作原理
|
23小时前
|
Java 编译器 程序员
【多线程】synchronized原理
【多线程】synchronized原理
10 0
|
17天前
|
存储 缓存 Java
JAVA并发编程系列(11)线程池底层原理架构剖析
本文详细解析了Java线程池的核心参数及其意义,包括核心线程数量(corePoolSize)、最大线程数量(maximumPoolSize)、线程空闲时间(keepAliveTime)、任务存储队列(workQueue)、线程工厂(threadFactory)及拒绝策略(handler)。此外,还介绍了四种常见的线程池:可缓存线程池(newCachedThreadPool)、定时调度线程池(newScheduledThreadPool)、单线程池(newSingleThreadExecutor)及固定长度线程池(newFixedThreadPool)。
|
2月前
|
存储 NoSQL Java
线程池的原理与C语言实现
【8月更文挑战第22天】线程池是一种多线程处理框架,通过复用预创建的线程来高效地处理大量短暂或临时任务,提升程序性能。它主要包括三部分:线程管理器、工作队列和线程。线程管理器负责创建与管理线程;工作队列存储待处理任务;线程则执行任务。当提交新任务时,线程管理器将其加入队列,并由空闲线程处理。使用线程池能减少线程创建与销毁的开销,提高响应速度,并能有效控制并发线程数量,避免资源竞争。这里还提供了一个简单的 C 语言实现示例。
|
2月前
|
存储 Java
线程池的底层工作原理是什么?
【8月更文挑战第8天】线程池的底层工作原理是什么?
92 8
|
1月前
|
安全 Java API
Java线程池原理与锁机制分析
综上所述,Java线程池和锁机制是并发编程中极其重要的两个部分。线程池主要用于管理线程的生命周期和执行并发任务,而锁机制则用于保障线程安全和防止数据的并发错误。它们深入地结合在一起,成为Java高效并发编程实践中的关键要素。
19 0
|
3月前
|
存储 SQL Java
(七)全面剖析Java并发编程之线程变量副本ThreadLocal原理分析
在之前的文章:彻底理解Java并发编程之Synchronized关键字实现原理剖析中我们曾初次谈到线程安全问题引发的"三要素":多线程、共享资源/临界资源、非原子性操作,简而言之:在同一时刻,多条线程同时对临界资源进行非原子性操作则有可能产生线程安全问题。