Nginx源码阅读:nginx_shmtx共享互斥锁(进程锁)

简介: Nginx源码阅读:nginx_shmtx共享互斥锁(进程锁)

一、Nginx中使用nginx_shmtx的场景

1、初始化事件模块ngx_event_module_init

1.创建了一块共享内存

2.在共享内存空间创建一个信号量互斥锁

ngx_event_module_init(ngx_cycle_t *cycle)
{ 
  ...
    shm.size = size;
    ngx_str_set(&shm.name, "nginx_shared_zone");
    shm.log = cycle->log;
    if (ngx_shm_alloc(&shm) != NGX_OK) {
        return NGX_ERROR;
    }
    shared = shm.addr;
    ngx_accept_mutex_ptr = (ngx_atomic_t *) shared;
    ngx_accept_mutex.spin = (ngx_uint_t) -1;
    if (ngx_shmtx_create(&ngx_accept_mutex, (ngx_shmtx_sh_t *) shared,
                         cycle->lock_file.data)
        != NGX_OK)
    {
        return NGX_ERROR;
    }
    ...
}

进入ngx_shm_alloc,发现共享内存是通过mmap来实现的

ngx_int_t
ngx_shm_alloc(ngx_shm_t *shm)
{
    shm->addr = (u_char *) mmap(NULL, shm->size,
                                PROT_READ|PROT_WRITE,
                                MAP_ANON|MAP_SHARED, -1, 0);
    if (shm->addr == MAP_FAILED) {
        ngx_log_error(NGX_LOG_ALERT, shm->log, ngx_errno,
                      "mmap(MAP_ANON|MAP_SHARED, %uz) failed", shm->size);
        return NGX_ERROR;
    }
    return NGX_OK;
}

使得不同进程之间都能通过共享内存访问到这把锁

二、内存共享的信号量互斥锁的结构体

1、ngx_shmtx_sh_t

通过专门的一个结构体来存放互斥锁的共享变量

typedef struct {
    ngx_atomic_t   lock;
#if (NGX_HAVE_POSIX_SEM)
    ngx_atomic_t   wait;
#endif
} ngx_shmtx_sh_t;

lock:用于标志是否被占用,0表示未被占用。如果被进程占有;该值是对应进程的pid值。

wait:阻塞等待的进程数量

2、ngx_shmtx_t

这个结构体,lock,wait指向进程共享的变量,其他都是进程私有的。(这也就是为什么ngx_shmtx_sh_t中lock和wait创建在堆上,而这里是指针。ngx_shmtx_t其他变量也是创建在堆上)

typedef struct {
#if (NGX_HAVE_ATOMIC_OPS)       /*原子锁*/
    ngx_atomic_t  *lock;   
#if (NGX_HAVE_POSIX_SEM)        /*信号量互斥锁*/
    ngx_atomic_t  *wait;
    ngx_uint_t     semaphore;//
    sem_t          sem;//信号量
#endif
#else                           /*文件锁*/
    ngx_fd_t       fd;
    u_char        *name;
#endif                          /*自旋锁(多处理器时才会使用,即ngx_ncpu>1)*/
    ngx_uint_t     spin;
} ngx_shmtx_t;

lock:用于标志是否被占用,0表示未被占用。如果被进程占有;该值是对应进程的pid值。

[但是这里是指针,用于指向ngx_shmtx_sh_tlock的共享内存]

wait:阻塞的进程数量[同样是指向ngx_shmtx_sh_twait的共享内存]

semaphore:信号量创建成功的标志,1成功,0失败

sem:信号量

spin:自旋锁的持续次数为(while(spin>>=1))(多处理器时才会使用,即ngx_ncpu>1

暂时忽略文件锁,因为由于#else #endif中nginx没有把它编译进去

三、shmtx

以下是shmtx.c中对共享互斥锁的一些操作,如锁的创建、销毁、加锁、解锁等。

ngx_int_t ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr,
    u_char *name);
void ngx_shmtx_destroy(ngx_shmtx_t *mtx);
ngx_uint_t ngx_shmtx_trylock(ngx_shmtx_t *mtx);
void ngx_shmtx_lock(ngx_shmtx_t *mtx);
void ngx_shmtx_unlock(ngx_shmtx_t *mtx);
ngx_uint_t ngx_shmtx_force_unlock(ngx_shmtx_t *mtx, ngx_pid_t pid);

1、ngx_shmtx_create

//在ngx_event_module_init中用于创建ngx_accept_mutex
//在ngx_init_zone_pool中用于创建(slab pool)sp->mutex,用于处理内存缓存
//下面代码中使用(mtx->spin == (ngx_uint_t) -1)来判断是ngx_accept_mutex,否则就是sp->mutex
ngx_int_t
ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name)
{
    mtx->lock = &addr->lock;//(指向共享内存)
    //在ngx_event_module_init有初始化mtx->spin = (ngx_uint_t) -1,对于ngx_accept_mutex来说,这边就应该要返回了。
    if (mtx->spin == (ngx_uint_t) -1) {
        return NGX_OK;
    }
    //只有ngx_init_zone_pool中在执行ngx_shmtx_create,才会执行下面部分
    mtx->spin = 2048;
#if (NGX_HAVE_POSIX_SEM)
    mtx->wait = &addr->wait;//(指向共享内存)
    if (sem_init(&mtx->sem, 1, 0) == -1) {//初始化信号量(__pshared=1表示进程之间共享,__value=0为初始值)
        ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno,
                      "sem_init() failed");
    } else {
        mtx->semaphore = 1;//信号量创建成功的标志
    }
#endif
    return NGX_OK;
}

2、ngx_shmtx_destroy

//销毁锁(删除信号量)
void
ngx_shmtx_destroy(ngx_shmtx_t *mtx)
{
#if (NGX_HAVE_POSIX_SEM)
    if (mtx->semaphore) {//如果存在信号量
        if (sem_destroy(&mtx->sem) == -1) {//删除信号量
            ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno,
                          "sem_destroy() failed");
        }
    }
#endif
}

3、ngx_shmtx_trylock

//尝试加锁,(加锁成功返回1,失败返回0)
ngx_uint_t
ngx_shmtx_trylock(ngx_shmtx_t *mtx)
{
    //如果没有进程占有锁(mtx->lock=0),那么设置mtx->lock为当前进程的pid(ngx_pid),表示该进程持有锁
    return (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid));
}

4、ngx_shmtx_lock

//加锁
void
ngx_shmtx_lock(ngx_shmtx_t *mtx)
{
    ngx_uint_t         i, n;
    ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx lock");
    for ( ;; ) {
        //成功获得锁,直接返回,让当前进程继续执行,而不是阻塞
        if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) {
            return;
        }
        //如果cpu核数大于1,就会执行自旋锁
        if (ngx_ncpu > 1) {
            //自旋期间不断去访问mtx->lock
            //如果加锁成功就返回,如果超过一定次数,就会停止自旋
            for (n = 1; n < mtx->spin; n <<= 1) {
                for (i = 0; i < n; i++) {
                    ngx_cpu_pause();
                }
                if (*mtx->lock == 0
                    && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid))
                {
                    return;
                }
            }
        }
#if (NGX_HAVE_POSIX_SEM)
        //信号量还存在 但 还未加锁成功
        if (mtx->semaphore) {
            (void) ngx_atomic_fetch_add(mtx->wait, 1);//阻塞的进程数量+1
            //再次尝试去获取锁
            if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) {
                (void) ngx_atomic_fetch_add(mtx->wait, -1);//如果成功获取锁,阻塞的进程数量-1
                return;
            }
            ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0,
                           "shmtx wait %uA", *mtx->wait);
            //如果信号量值小于等于0,原地等待阻塞,直到信号量mtx->sem的值比0大为止。如果大于0,那么-1,并且往下运行
            while (sem_wait(&mtx->sem) == -1) {
                ngx_err_t  err;
                err = ngx_errno;
                if (err != NGX_EINTR) {//如果发生异常,就会break
                    ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, err,
                                  "sem_wait() failed while waiting on shmtx");
                    break;
                }
            }
            ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0,
                           "shmtx awoke");
            continue;
        }
#endif
        //信号量不存在,直接yield让出
        ngx_sched_yield();
    }
}

5、ngx_shmtx_unlock

//解锁
void
ngx_shmtx_unlock(ngx_shmtx_t *mtx)
{
    if (mtx->spin != (ngx_uint_t) -1) {//如果是sp->mutex
        ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx unlock");
    }
    if (ngx_atomic_cmp_set(mtx->lock, ngx_pid, 0)) {//解锁(通过CAS方式,重新将mtx->lock置为0)
        ngx_shmtx_wakeup(mtx);
    }
}

6、ngx_shmtx_force_unlock

//强制解锁,和上面的是一样的,只是日志不同
ngx_uint_t
ngx_shmtx_force_unlock(ngx_shmtx_t *mtx, ngx_pid_t pid)
{
    ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0,
                   "shmtx forced unlock");
    if (ngx_atomic_cmp_set(mtx->lock, pid, 0)) {
        ngx_shmtx_wakeup(mtx);
        return 1;
    }
    return 0;
}

7、ngx_shmtx_wakeup

//唤醒进程
static void
ngx_shmtx_wakeup(ngx_shmtx_t *mtx)
{
#if (NGX_HAVE_POSIX_SEM)
    ngx_atomic_uint_t  wait;
    if (!mtx->semaphore) {//如果没有创建信号量,直接返回
        return;
    }
    for ( ;; ) {
        wait = *mtx->wait;
        if ((ngx_atomic_int_t) wait <= 0) {//如果没有阻塞等待的进程,返回
            return;
        }
        if (ngx_atomic_cmp_set(mtx->wait, wait, wait - 1)) {//如果有阻塞的进程,并且wait-1
            break;
        }
    }
    ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0,
                   "shmtx wake %uA", wait);
    if (sem_post(&mtx->sem) == -1) {//给信号量+1,用于唤醒进程
        ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno,
                      "sem_post() failed while wake shmtx");
    }
#endif
}
相关文章
|
4月前
|
Linux
Linux源码阅读笔记10-进程NICE案例分析2
Linux源码阅读笔记10-进程NICE案例分析2
|
4月前
|
Linux
Linux源码阅读笔记09-进程NICE案例分析1
Linux源码阅读笔记09-进程NICE案例分析1
|
4月前
|
数据采集 存储 安全
如何确保Python Queue的线程和进程安全性:使用锁的技巧
本文探讨了在Python爬虫技术中使用锁来保障Queue(队列)的线程和进程安全性。通过分析`queue.Queue`及`multiprocessing.Queue`的基本线程与进程安全特性,文章指出在特定场景下使用锁的重要性。文中还提供了一个综合示例,该示例利用亿牛云爬虫代理服务、多线程技术和锁机制,实现了高效且安全的网页数据采集流程。示例涵盖了代理IP、User-Agent和Cookie的设置,以及如何使用BeautifulSoup解析HTML内容并将其保存为文档。通过这种方式,不仅提高了数据采集效率,还有效避免了并发环境下的数据竞争问题。
如何确保Python Queue的线程和进程安全性:使用锁的技巧
|
3月前
|
负载均衡 网络协议 应用服务中间件
web群集--rocky9.2源码部署nginx1.24的详细过程
Nginx 是一款由 Igor Sysoev 开发的开源高性能 HTTP 服务器和反向代理服务器,自 2004 年发布以来,以其高效、稳定和灵活的特点迅速成为许多网站和应用的首选。本文详细介绍了 Nginx 的核心概念、工作原理及常见使用场景,涵盖高并发处理、反向代理、负载均衡、低内存占用等特点,并提供了安装配置教程,适合开发者参考学习。
|
4月前
|
Linux
Linux源码阅读笔记13-进程通信组件中
Linux源码阅读笔记13-进程通信组件中
|
4月前
|
消息中间件 安全 Java
Linux源码阅读笔记13-进程通信组件上
Linux源码阅读笔记13-进程通信组件上
|
4月前
|
存储 Linux API
Linux源码阅读笔记08-进程调度API系统调用案例分析
Linux源码阅读笔记08-进程调度API系统调用案例分析
|
4月前
|
Linux API
Linux源码阅读笔记07-进程管理4大常用API函数
Linux源码阅读笔记07-进程管理4大常用API函数
|
4月前
|
数据采集 Java Python
python 递归锁、信号量、事件、线程队列、进程池和线程池、回调函数、定时器
python 递归锁、信号量、事件、线程队列、进程池和线程池、回调函数、定时器
|
4月前
|
消息中间件 存储 安全
python多进程并发编程之互斥锁与进程间的通信
python多进程并发编程之互斥锁与进程间的通信