仅供个人学习整理,很多理解来自网络。
1:什么是锁,为什么需要锁?
当多个进程/线程需要共同操作一块共有资源时,如果不对这块资源加以保护,就会出现问题。
我的理解是,对共有资源加以保护,控制多个使用者对这块资源的访问机制,叫做锁。
2:临界资源的访问控制手段。
1:如过临界资源使用简单,可以相关原子操作函数。
2:加锁的方式: 自旋锁,互斥锁(条件变量 控制流程)
3:其他:读写锁,分布式锁等
3:初步了解nginx锁
1:nginx中的自旋锁 ngx_spinlock.c
void ngx_spinlock(ngx_atomic_t *lock, ngx_atomic_int_t value, ngx_uint_t spin) { #if (NGX_HAVE_ATOMIC_OPS) ngx_uint_t i, n; for ( ;; ) { //直接加锁成功 if (*lock == 0 && ngx_atomic_cmp_set(lock, 0, value)) { return; } if (ngx_ncpu > 1) { for (n = 1; n < spin; n <<= 1) { //加个遍历 控制cpu探测锁释放的时间 for (i = 0; i < n; i++) { ngx_cpu_pause(); } //已经成功获得该锁,给加锁 if (*lock == 0 && ngx_atomic_cmp_set(lock, 0, value)) { return; } } } ngx_sched_yield(); } #else #if (NGX_THREADS) #error ngx_spinlock() or ngx_atomic_cmp_set() are not defined ! #endif #endif } void ngx_spinlock(ngx_atomic_t *lock, ngx_atomic_int_t value, ngx_uint_t spin); #define ngx_trylock(lock) (*(lock) == 0 && ngx_atomic_cmp_set(lock, 0, 1)) #define ngx_unlock(lock) *(lock) = 0 //spinlock的适用 加锁,然后操作,最后解锁 ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048); *ngx_thread_pool_done.last = task; ngx_thread_pool_done.last = &task->next; ngx_memory_barrier(); ngx_unlock(&ngx_thread_pool_done_lock);
2:nginx中通过原子变量,信号量,文件fd加锁的方式实现一套互斥锁机制
nginx中互斥锁的文件:ngx_shmtx.h ngx_shmtx.c
1:锁的结构定义:
typedef struct { ngx_atomic_t lock; #if (NGX_HAVE_POSIX_SEM) ngx_atomic_t wait; #endif } ngx_shmtx_sh_t; //相关锁的定义 //信号量和原子变量实现互斥锁 typedef struct { #if (NGX_HAVE_ATOMIC_OPS) ngx_atomic_t *lock; #if (NGX_HAVE_POSIX_SEM) ngx_atomic_t *wait; ngx_uint_t semaphore; sem_t sem; #endif #else ngx_fd_t fd; u_char *name; #endif ngx_uint_t spin; } ngx_shmtx_t;
其实经过拆分细化,可以看出,这里想要的定义是这样的:
//原子锁 +信号 { ngx_atomic_t *lock; ngx_atomic_t *wait; ngx_uint_t semaphore; sem_t sem; ngx_uint_t spin; } //原子锁 { ngx_atomic_t *lock; ngx_uint_t spin; //标识信号量还是原子的方式而已 } //文件锁 { ngx_fd_t fd; u_char *name; ngx_uint_t spin; }
2: 分析nginx互斥锁的实现逻辑
1: 原子变量的操作,实现自旋锁,
{ ngx_atomic_t *lock; ngx_uint_t spin; //自旋锁访问cpu控制 } //如果满足原子操作不满足信号量的环境,这就是一个自旋锁 ngx_int_t ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name) { mtx->lock = &addr->lock; //没明白这个mtx->spin 为-1的场景,返回成功貌似没啥意义, if (mtx->spin == (ngx_uint_t) -1) { return NGX_OK; } mtx->spin = 2048; ... } //加锁函数 void ngx_shmtx_lock(ngx_shmtx_t *mtx) { ngx_uint_t i, n; ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx lock"); for ( ;; ) { //其实就是给这个原子变量赋值 参考spinlock if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) { return; } //通过定义的spin大小对cpu进行轮询,探测是否能得到锁 if (ngx_ncpu > 1) { for (n = 1; n < mtx->spin; n <<= 1) { for (i = 0; i < n; i++) { ngx_cpu_pause(); } if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) { return; } } } //强制cpu让出 ngx_sched_yield(); } } //解锁函数: void ngx_shmtx_unlock(ngx_shmtx_t *mtx) { if (mtx->spin != (ngx_uint_t) -1) { ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx unlock"); } //这里的解锁 只需要给原子变量赋值为0 就可以 if (ngx_atomic_cmp_set(mtx->lock, ngx_pid, 0)) { ngx_shmtx_wakeup(mtx); } }
2:信号量配合原子变量实现的互斥锁。
1: 相关信号量的接口:
#include<semaphore.h> int sem_init(sem_t *sem, int shared, unsigned int value); int sem_destroy(sem_t *sem); int sem_wait(sem_t *sem); int sem_post(sem_t *sem); //sem_init用于初始化,sem参数指向应用程序分配的sem_t变量,shared如果为0那么初始化的信号量是在同一个进程的各个线程间共享的,否则是在进程共享的,value是分配给信号量的初始值。 //sem_destroy则用于销毁信号量。 //sem_wait用于给信号量减1操作,当信号量小于等于0的时候阻塞,直到信号量大于0。 //sem_post则是用于给信号量做加1操作。
2: 相关结构及操作:
数据结构和构造与析构
//原子锁 +信号 typedef struct { ngx_atomic_t lock; ngx_atomic_t wait; } ngx_shmtx_sh_t; //初始化时用 { ngx_atomic_t *lock; //原子变量标识锁的获取和释放 ngx_atomic_t *wait; //表示有几个线程在公用这个锁 ngx_uint_t semaphore; //标记信号量启动成功 sem_t sem; //信号量 ngx_uint_t spin; } //初始化锁结构以及相关的信号量 ngx_int_t ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name) { mtx->lock = &addr->lock; if (mtx->spin == (ngx_uint_t) -1) { return NGX_OK; } //cpu轮询遍历设置 mtx->spin = 2048; //#if (NGX_HAVE_POSIX_SEM) mtx->wait = &addr->wait; //标记已经共享的线程个数 if (sem_init(&mtx->sem, 1, 0) == -1) { //初始化信号量,标识在进程间共享 ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno,"sem_init() failed"); } else { mtx->semaphore = 1; //信号量创建成功的标志 } return NGX_OK; } void ngx_shmtx_destroy(ngx_shmtx_t *mtx) { //资源的释放,信号量的销毁 if (mtx->semaphore) { if (sem_destroy(&mtx->sem) == -1) { ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno, "sem_destroy() failed"); } } }
加锁和解锁操作:
//互斥锁加锁操作 要考虑多线程,加锁成功的场景,不成功的时候,等待信号触发 void ngx_shmtx_lock(ngx_shmtx_t *mtx) { ngx_uint_t i, n; ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx lock"); for ( ;; ) { //先检查是否已经加锁,如果为0,表示没有加锁,进行加锁,第一个进入就会直接返回,加锁成功 //如果不为0,说明这是一个已经加锁的锁,由下文控制等待释放,获取锁,等待加锁成功 if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) { return; } if (ngx_ncpu > 1) { //这是一个自旋锁的逻辑 //先用自旋的逻辑判断,如果在一定时间内不满足,则用信号的方式,如果成功,直接返回 for (n = 1; n < mtx->spin; n <<= 1) { //设置cpu等待 for (i = 0; i < n; i++) { ngx_cpu_pause(); } //锁已经释放并且成功加锁,就返回了 if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) { return; } } } #if (NGX_HAVE_POSIX_SEM) //真正互斥锁信号处理逻辑 if (mtx->semaphore) { (void) ngx_atomic_fetch_add(mtx->wait, 1); //等待锁的个数+1 与下文的ngx_shmtx_wakeup配合使用 //检查是否可以获得锁,可以获得,则加锁后返回 if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) { (void) ngx_atomic_fetch_add(mtx->wait, -1); return; } ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx wait %uA", *mtx->wait); //等待信号的唤醒,,如果有唤醒,继续执行,唤醒多个的话,由上文的原子操作保证流程 //相当于P操作 信号量等于0,则阻塞 while (sem_wait(&mtx->sem) == -1) { ngx_err_t err; err = ngx_errno; if (err != NGX_EINTR) { ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, err, "sem_wait() failed while waiting on shmtx"); break; } } ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx awoke"); continue; } #endif ngx_sched_yield(); //强制让出cpu } } //释放锁的逻辑: 给锁的标识原子变量lock赋值0 //信号的操作 void ngx_shmtx_unlock(ngx_shmtx_t *mtx) { if (mtx->spin != (ngx_uint_t) -1) { ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx unlock"); } if (ngx_atomic_cmp_set(mtx->lock, ngx_pid, 0)) { ngx_shmtx_wakeup(mtx); } } //判断是否还有等待锁的进程,如果有,还需要发送信号 static void ngx_shmtx_wakeup(ngx_shmtx_t *mtx) { #if (NGX_HAVE_POSIX_SEM) ngx_atomic_uint_t wait; if (!mtx->semaphore) { return; } for ( ;; ) { wait = *mtx->wait; //如果没有等待的线程,说明不用唤醒了,直接运行结束 if ((ngx_atomic_int_t) wait <= 0) { return; } //等待信号的线程数减少一个, if (ngx_atomic_cmp_set(mtx->wait, wait, wait - 1)) { break; } } ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx wake %uA", wait); if (sem_post(&mtx->sem) == -1) { //释放信号量 给信号量加1,相当于V操作 ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno,"sem_post() failed while wake shmtx"); } #endif }
3:当不满足原子变量的环境场景下,使用文件锁实现互斥锁。
//文件锁 { ngx_fd_t fd; //文件fd的标识 u_char *name; //文件名的标识 ngx_uint_t spin; } //文件锁的创建 ngx_int_t ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name) { //对入参作相关的校验,并重新初始化 if (mtx->name) { if (ngx_strcmp(name, mtx->name) == 0) { mtx->name = name; return NGX_OK; } ngx_shmtx_destroy(mtx); } //调用底层open 可读可写可创建的方式打开该文件 mtx->fd = ngx_open_file(name, NGX_FILE_RDWR, NGX_FILE_CREATE_OR_OPEN, NGX_FILE_DEFAULT_ACCESS); //如果文件打开失败 if (mtx->fd == NGX_INVALID_FILE) { ngx_log_error(NGX_LOG_EMERG, ngx_cycle->log, ngx_errno, ngx_open_file_n " \"%s\" failed", name); return NGX_ERROR; } //unlink文件,还没有调用close,所以这个文件还可以用 ==》 //unlink函数删除文件,并且减少一个链接数。如果链接数达到0并且没有任何进程打开该文件,该文件内容才被真正删除 //如果在unlilnk之前没有close,那么依旧可以访问文件内容。所以只是unlink了文件,文件的链接数为0,但是进程与文件还有访问关系,所以文件并没有被删除 //在调用close时,内核会检查打开该文件的进程数,如果此数为0,进一步检查文件的链接数,如果这个数也为0,那么就删除文件内容。 if (ngx_delete_file(name) == NGX_FILE_ERROR) { ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno, ngx_delete_file_n " \"%s\" failed", name); } mtx->name = name; return NGX_OK; } //这里才会调用文件的close,配合上面的unlink,检测真正删除文件 void ngx_shmtx_destroy(ngx_shmtx_t *mtx) { if (ngx_close_file(mtx->fd) == NGX_FILE_ERROR) { ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno, ngx_close_file_n " \"%s\" failed", mtx->name); } }
文件锁的加锁,以及解锁操作:struct flock 结构和 fcntl()
这里涉及到struct flock 结构和 fcntl()函数
struct flock { short l_type;/*F_RDLCK, F_WRLCK, or F_UNLCK */ off_t l_start;/*offset in bytes, relative to l_whence */ short l_whence;/*SEEK_SET, SEEK_CUR, or SEEK_END */ off_t l_len;/*length, in bytes; 0 means lock to EOF */ pid_t l_pid;/*returned with F_GETLK */ }; //第一个成员是加锁的类型:只读锁,读写锁,或是解锁 //l_start和l_whence用来指明加锁部分的开始位置, //l_len是加锁的长度, //l_pid是加锁进程的进程id //使用 fcntl 操作文件描述词的一些特性 #include <unistd.h> #include <fcntl.h> int fcntl(int fd, int cmd); int fcntl(int fd, int cmd, long arg); int fcntl(int fd, int cmd, struct flock * lock); //cmd有如下: //F_DUPFD 用来查找大于或等于参数arg 的最小且仍未使用的文件描述词, 并且复制参数fd 的文件描述词. 执行成功则返回新复制的文件描述词. 请参考dup2(). //F_GETFD 取得close-on-exec 旗标. 若此旗标的FD_CLOEXEC 位为0, 代表在调用exec()相关函数时文件将不会关闭. //F_SETFD 设置close-on-exec 旗标. 该旗标以参数arg 的FD_CLOEXEC 位决定. //F_GETFL 取得文件描述词状态旗标, 此旗标为open()的参数flags. //F_SETFL 设置文件描述词状态旗标, 参数arg 为新旗标, 但只允许O_APPEND、O_NONBLOCK 和O_ASYNC 位的改变, 其他位的改变将不受影响. //F_GETLK 取得文件锁定的状态. //F_SETLK 设置文件锁定的状态. 此时flcok 结构的l_type 值必须是F_RDLCK、F_WRLCK 或F_UNLCK. 如果无法建立锁定, 则返回-1, 错误代码为EACCES 或EAGAIN. //F_SETLKW 同F_SETLK 作用相同, 但是无法建立锁定时, 此调用会一直等到锁定动作成功为止. 若在等待锁定的过程中被信号中断时, 会立即返回-1, 错误代码为EINTR.
//通过fcntl实现文件锁的相关封装,立即返回 ngx_err_t ngx_trylock_fd(ngx_fd_t fd) { struct flock fl; ngx_memzero(&fl, sizeof(struct flock)); fl.l_type = F_WRLCK; fl.l_whence = SEEK_SET; //SEEK_SET 文件的开头 SEEK_CUR: 当前位置 SEEK_END: 文件结尾 //默认文件描述符是阻塞的,不成功就会一直等待, //F_SETLK 设置文件锁定的状态. 如果无法建立锁定, 则返回-1, 错误代码为EACCES 或EAGAIN. //此时flcok 结构的l_type 值必须是F_RDLCK、F_WRLCK 或F_UNLCK. if (fcntl(fd, F_SETLK, &fl) == -1) { return ngx_errno; } return 0; } //给文件fd加锁,直到成功 ngx_err_t ngx_lock_fd(ngx_fd_t fd) { struct flock fl; ngx_memzero(&fl, sizeof(struct flock)); fl.l_type = F_WRLCK; fl.l_whence = SEEK_SET; //F_SETLKW 同F_SETLK 作用相同, 但是无法建立锁定时, 此调用会一直等到锁定动作成功为止. 若在等待锁定的过程中被信号中断时, 会立即返回-1, 错误代码为EINTR. if (fcntl(fd, F_SETLKW, &fl) == -1) { return ngx_errno; } return 0; } ngx_err_t ngx_unlock_fd(ngx_fd_t fd) { struct flock fl; ngx_memzero(&fl, sizeof(struct flock)); fl.l_type = F_UNLCK; fl.l_whence = SEEK_SET; //设置解锁 if (fcntl(fd, F_SETLK, &fl) == -1) { return ngx_errno; } return 0; }
4:nginx accept锁
nginx是一个多进程服务器,当多个进程同时监听一个端口时,如果有一个外部连接进来,就会触发多个进程共同唤醒,但是实际处理只能有一个进程正常处理accept事件,这就是所谓的惊群。
其实在Linux2.6版本以后,内核内核已经解决了accept()函数的“惊群”问题,大概的处理方式就是,当内核接收到一个客户连接后,只会唤醒等待队列上的第一个进程或线程。所以,如果服务器采用accept阻塞调用方式,在最新的Linux系统上,已经没有“惊群”的问题了。
nginx处理惊群问题:
我们先大概梳理一下 Nginx 的网络架构,几个关键步骤为:
- Nginx 主进程解析配置文件,根据 listen 指令,将监听套接字初始化到全局变量 ngx_cycle 的 listening 数组之中。此时,监听套接字的创建、绑定工作早已完成。
- Nginx 主进程 fork 出多个子进程。
- 每个子进程在 ngx_worker_process_init 方法里依次调用各个 Nginx 模块的 init_process 钩子,其中当然也包括 NGX_EVENT_MODULE 类型的 ngx_event_core_module 模块,其 init_process 钩子为 ngx_event_process_init。
- ngx_event_process_init 函数会初始化 Nginx 内部的连接池,并把 ngx_cycle 里的监听套接字数组通过连接池来获得相应的表示连接的 ngx_connection_t 数据结构,这里关于 Nginx 的连接池先略过。我们主要看 ngx_event_process_init 函数所做的另一个工作:如果在配置文件里没有开启accept_mutex锁,就通过 ngx_add_event 将所有的监听套接字添加到 epoll 中。
- 每一个 Nginx 子进程在执行完 ngx_worker_process_init 后,会在一个死循环中执行 ngx_process_events_and_timers,这就进入到时间处理的核心逻辑了。
- 在 ngx_process_events_and_timers 中,如果在配置文件里开启了 accept_mutex 锁,子进程就会去获取 accept_mutex 锁。如果获取成功,则通过 ngx_enable_accept_events 将监听套接字添加到 epoll 中,否则,不会将监听套接字添加到 epoll 中,甚至有可能会调用 ngx_disable_accept_events 将监听套接字从 epoll 中删除(如果在之前的连接中,本worker子进程已经获得过accept_mutex锁)。
- ngx_process_events_and_timers 继续调用 ngx_process_events,在这个函数里面阻塞调用 epoll_wait。
如果配置文件中没有开启 accept_mutex,则所有的监听套接字不管三七二十一,都加入到 epoll中,这样当一个新的连接来到时,所有的 worker 子进程都会惊醒。
如果配置文件中开启了 accept_mutex,则只有一个子进程会将监听套接字添加到 epoll 中,这样当一个新的连接来到时,当然就只有一个 worker 子进程会被唤醒了。
源码分析:
网络事件入口,只有在accept的时候加入,开始监听这一个。
//每个worker进程都会死循环执行的时间处理循环函数 void ngx_process_events_and_timers(ngx_cycle_t *cycle) { .... //如果定义了accept锁 if (ngx_use_accept_mutex) { //ngx_accept_disabled = nginx单进程的所有连接总数 / 8 -空闲连接数量,当ngx_accept_disabled大于0时,不会去尝试获取accept_mutex锁,ngx_accept_disable越大,于是让出的机会就越多,这样其它进程获取锁的机会也就越大。不 //ngx_accept_disabled 为正数时,触发负载均衡,不再获取accept锁,但是会-1,说明运行时间负载降低 if (ngx_accept_disabled > 0) { ngx_accept_disabled--; } else { //尝试获取accept锁 这个是重点 //没有获取到 直接返回,负责加入一个NGX_READ_EVENT事件 if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) { return; } //获取成功 设置flag if (ngx_accept_mutex_held) { // 如果进程获得了锁,将添加一个 NGX_POST_EVENTS 标志。 // 这个标志的作用是将所有产生的事件放入一个队列中,等释放后,在慢慢来处理事件。 // 因为,处理时间可能会很耗时,如果不先施放锁再处理的话,该进程就长时间霸占了锁, // 导致其他进程无法获取锁,这样accept的效率就低了。 flags |= NGX_POST_EVENTS; } else {//设置失败的场景 if (timer == NGX_TIMER_INFINITE || timer > ngx_accept_mutex_delay) { //没有获得所得进程,当然不需要NGX_POST_EVENTS标志。 //但需要设置延时多长时间,再去争抢锁。 timer = ngx_accept_mutex_delay; } } } } delta = ngx_current_msec; //所有事件处理的入口 :epoll要开始wait事件, //ngx_process_events的具体实现是对应到epoll模块中的ngx_epoll_process_events函数 //只是post到队列中 (void) ngx_process_events(cycle, timer, flags); //统计本次wait事件的耗时 delta = ngx_current_msec - delta; ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,"timer delta: %M", delta); // ngx_posted_accept_events是一个事件队列,暂存epoll从监听套接口wait到的accept事件。 //前文提到的NGX_POST_EVENTS标志被使用后,会将所有的accept事件暂存到这个队列 ngx_event_process_posted(cycle, &ngx_posted_accept_events); //accept 延迟事件队列 处理对应的handler 其实就是ngx_event_accept函数 //所有accept事件处理完之后,如果持有锁的话,就释放掉 if (ngx_accept_mutex_held) { ngx_shmtx_unlock(&ngx_accept_mutex); } if (delta) { ngx_event_expire_timers(); } //处理普通事件(连接上获得的读写事件), 因为每个事件都有自己的handler方法 ngx_event_process_posted(cycle, &ngx_posted_events);//普通延迟事件队列 处理对应的handler } //尝试获取accept锁 ngx_int_t ngx_trylock_accept_mutex(ngx_cycle_t *cycle) { //尝试加锁 if (ngx_shmtx_trylock(&ngx_accept_mutex)) { ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,"accept mutex locked"); if (ngx_accept_mutex_held && ngx_accept_events == 0) { return NGX_OK; } //加入事件中 if (ngx_enable_accept_events(cycle) == NGX_ERROR) { ngx_shmtx_unlock(&ngx_accept_mutex); return NGX_ERROR; } ngx_accept_events = 0; ngx_accept_mutex_held = 1; return NGX_OK; } ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "accept mutex lock failed: %ui", ngx_accept_mutex_held); //获取锁失败 将当前子进程中已经处理结束的相关事件移除 if (ngx_accept_mutex_held) { if (ngx_disable_accept_events(cycle, 0) == NGX_ERROR) { return NGX_ERROR; } ngx_accept_mutex_held = 0; } return NGX_OK; }
了解到一些nginx锁相关的知识,简单做一下汇总,遗留的部分不清晰的源码问题,后续更新吧