一、Nginx中使用nginx_shmtx的场景
1、初始化事件模块ngx_event_module_init
1.创建了一块共享内存
2.在共享内存空间创建一个信号量互斥锁
ngx_event_module_init(ngx_cycle_t *cycle) { ... shm.size = size; ngx_str_set(&shm.name, "nginx_shared_zone"); shm.log = cycle->log; if (ngx_shm_alloc(&shm) != NGX_OK) { return NGX_ERROR; } shared = shm.addr; ngx_accept_mutex_ptr = (ngx_atomic_t *) shared; ngx_accept_mutex.spin = (ngx_uint_t) -1; if (ngx_shmtx_create(&ngx_accept_mutex, (ngx_shmtx_sh_t *) shared, cycle->lock_file.data) != NGX_OK) { return NGX_ERROR; } ... }
进入ngx_shm_alloc
,发现共享内存是通过mmap来实现的
ngx_int_t ngx_shm_alloc(ngx_shm_t *shm) { shm->addr = (u_char *) mmap(NULL, shm->size, PROT_READ|PROT_WRITE, MAP_ANON|MAP_SHARED, -1, 0); if (shm->addr == MAP_FAILED) { ngx_log_error(NGX_LOG_ALERT, shm->log, ngx_errno, "mmap(MAP_ANON|MAP_SHARED, %uz) failed", shm->size); return NGX_ERROR; } return NGX_OK; }
使得不同进程之间都能通过共享内存访问到这把锁
二、内存共享的信号量互斥锁的结构体
1、ngx_shmtx_sh_t
通过专门的一个结构体来存放互斥锁的共享变量
typedef struct { ngx_atomic_t lock; #if (NGX_HAVE_POSIX_SEM) ngx_atomic_t wait; #endif } ngx_shmtx_sh_t;
lock
:用于标志是否被占用,0表示未被占用。如果被进程占有;该值是对应进程的pid值。
wait
:阻塞等待的进程数量
2、ngx_shmtx_t
这个结构体,lock,wait指向进程共享的变量,其他都是进程私有的。(这也就是为什么ngx_shmtx_sh_t
中lock和wait创建在堆上,而这里是指针。ngx_shmtx_t
其他变量也是创建在堆上)
typedef struct { #if (NGX_HAVE_ATOMIC_OPS) /*原子锁*/ ngx_atomic_t *lock; #if (NGX_HAVE_POSIX_SEM) /*信号量互斥锁*/ ngx_atomic_t *wait; ngx_uint_t semaphore;// sem_t sem;//信号量 #endif #else /*文件锁*/ ngx_fd_t fd; u_char *name; #endif /*自旋锁(多处理器时才会使用,即ngx_ncpu>1)*/ ngx_uint_t spin; } ngx_shmtx_t;
lock
:用于标志是否被占用,0表示未被占用。如果被进程占有;该值是对应进程的pid值。
[但是这里是指针,用于指向ngx_shmtx_sh_t
中lock
的共享内存]
wait
:阻塞的进程数量[同样是指向ngx_shmtx_sh_t
中wait
的共享内存]
semaphore
:信号量创建成功的标志,1成功,0失败
sem
:信号量
spin
:自旋锁的持续次数为(while(spin>>=1)
)(多处理器时才会使用,即ngx_ncpu>1
)
暂时忽略文件锁,因为由于#else #endif
中nginx没有把它编译进去
三、shmtx
以下是shmtx.c中对共享互斥锁的一些操作,如锁的创建、销毁、加锁、解锁等。
ngx_int_t ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name); void ngx_shmtx_destroy(ngx_shmtx_t *mtx); ngx_uint_t ngx_shmtx_trylock(ngx_shmtx_t *mtx); void ngx_shmtx_lock(ngx_shmtx_t *mtx); void ngx_shmtx_unlock(ngx_shmtx_t *mtx); ngx_uint_t ngx_shmtx_force_unlock(ngx_shmtx_t *mtx, ngx_pid_t pid);
1、ngx_shmtx_create
//在ngx_event_module_init中用于创建ngx_accept_mutex //在ngx_init_zone_pool中用于创建(slab pool)sp->mutex,用于处理内存缓存 //下面代码中使用(mtx->spin == (ngx_uint_t) -1)来判断是ngx_accept_mutex,否则就是sp->mutex ngx_int_t ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name) { mtx->lock = &addr->lock;//(指向共享内存) //在ngx_event_module_init有初始化mtx->spin = (ngx_uint_t) -1,对于ngx_accept_mutex来说,这边就应该要返回了。 if (mtx->spin == (ngx_uint_t) -1) { return NGX_OK; } //只有ngx_init_zone_pool中在执行ngx_shmtx_create,才会执行下面部分 mtx->spin = 2048; #if (NGX_HAVE_POSIX_SEM) mtx->wait = &addr->wait;//(指向共享内存) if (sem_init(&mtx->sem, 1, 0) == -1) {//初始化信号量(__pshared=1表示进程之间共享,__value=0为初始值) ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno, "sem_init() failed"); } else { mtx->semaphore = 1;//信号量创建成功的标志 } #endif return NGX_OK; }
2、ngx_shmtx_destroy
//销毁锁(删除信号量) void ngx_shmtx_destroy(ngx_shmtx_t *mtx) { #if (NGX_HAVE_POSIX_SEM) if (mtx->semaphore) {//如果存在信号量 if (sem_destroy(&mtx->sem) == -1) {//删除信号量 ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno, "sem_destroy() failed"); } } #endif }
3、ngx_shmtx_trylock
//尝试加锁,(加锁成功返回1,失败返回0) ngx_uint_t ngx_shmtx_trylock(ngx_shmtx_t *mtx) { //如果没有进程占有锁(mtx->lock=0),那么设置mtx->lock为当前进程的pid(ngx_pid),表示该进程持有锁 return (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)); }
4、ngx_shmtx_lock
//加锁 void ngx_shmtx_lock(ngx_shmtx_t *mtx) { ngx_uint_t i, n; ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx lock"); for ( ;; ) { //成功获得锁,直接返回,让当前进程继续执行,而不是阻塞 if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) { return; } //如果cpu核数大于1,就会执行自旋锁 if (ngx_ncpu > 1) { //自旋期间不断去访问mtx->lock //如果加锁成功就返回,如果超过一定次数,就会停止自旋 for (n = 1; n < mtx->spin; n <<= 1) { for (i = 0; i < n; i++) { ngx_cpu_pause(); } if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) { return; } } } #if (NGX_HAVE_POSIX_SEM) //信号量还存在 但 还未加锁成功 if (mtx->semaphore) { (void) ngx_atomic_fetch_add(mtx->wait, 1);//阻塞的进程数量+1 //再次尝试去获取锁 if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) { (void) ngx_atomic_fetch_add(mtx->wait, -1);//如果成功获取锁,阻塞的进程数量-1 return; } ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx wait %uA", *mtx->wait); //如果信号量值小于等于0,原地等待阻塞,直到信号量mtx->sem的值比0大为止。如果大于0,那么-1,并且往下运行 while (sem_wait(&mtx->sem) == -1) { ngx_err_t err; err = ngx_errno; if (err != NGX_EINTR) {//如果发生异常,就会break ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, err, "sem_wait() failed while waiting on shmtx"); break; } } ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx awoke"); continue; } #endif //信号量不存在,直接yield让出 ngx_sched_yield(); } }
5、ngx_shmtx_unlock
//解锁 void ngx_shmtx_unlock(ngx_shmtx_t *mtx) { if (mtx->spin != (ngx_uint_t) -1) {//如果是sp->mutex ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx unlock"); } if (ngx_atomic_cmp_set(mtx->lock, ngx_pid, 0)) {//解锁(通过CAS方式,重新将mtx->lock置为0) ngx_shmtx_wakeup(mtx); } }
6、ngx_shmtx_force_unlock
//强制解锁,和上面的是一样的,只是日志不同 ngx_uint_t ngx_shmtx_force_unlock(ngx_shmtx_t *mtx, ngx_pid_t pid) { ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx forced unlock"); if (ngx_atomic_cmp_set(mtx->lock, pid, 0)) { ngx_shmtx_wakeup(mtx); return 1; } return 0; }
7、ngx_shmtx_wakeup
//唤醒进程 static void ngx_shmtx_wakeup(ngx_shmtx_t *mtx) { #if (NGX_HAVE_POSIX_SEM) ngx_atomic_uint_t wait; if (!mtx->semaphore) {//如果没有创建信号量,直接返回 return; } for ( ;; ) { wait = *mtx->wait; if ((ngx_atomic_int_t) wait <= 0) {//如果没有阻塞等待的进程,返回 return; } if (ngx_atomic_cmp_set(mtx->wait, wait, wait - 1)) {//如果有阻塞的进程,并且wait-1 break; } } ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx wake %uA", wait); if (sem_post(&mtx->sem) == -1) {//给信号量+1,用于唤醒进程 ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno, "sem_post() failed while wake shmtx"); } #endif }