首发公众号:Rand_cs
锁LOCK锁
锁,大家应该很熟悉了,用来避免竞争,实现同步。本文以 $xv6$ 为例来讲解锁本身是怎么实现的,废话不多说先来看一些需要了解的概念:
一些概念
公共资源:顾名思义就是被多个任务共享的资源,可以是公共内存,也可以是公共文件等等
临界区: 要访问使用公共资源,肯定得通过一些代码指令去访问,这些代码指令就是临界区
并发:单个 CPU 上交替处理多个任务,宏观上看就像是同时进行的一样,但微观上看任意时刻还是只有一个任务在进行。
并行:多个处理器同时处理多个任务,能够做到真正意义上的多个任务同时进行。
互斥:也称为排他,任何时候公共资源只允许最多一个任务独享,不允许多个任务同时执行临界区的代码访问公共资源。
竞争条件:竞争条件指的是多个任务以竞争的形式并行访问公共资源,公共资源的最终状态取决于这些任务的临界区代码的精确执行时序。
显然竞争条件并不是我们想要的,虽然一些竞争条件出现的概率很小,但根据墨菲定律,会出错的总会出错,加之计算机的运行频率,就算出错的概率再小,在某天某时某刻那也是有可能发生的。
所以对于进入临界区访问公共资源我们要避免竞争条件,保证公共资源的互斥排他性,一般有两种大的解决方案来实现互斥:
- 忙等待:没进入临界区时一直循环,占用 CPU 资源
- 休眠等待:没进入临界区时一直休眠,不占用 CPU,CPU 利用率较高,但有进程上下文切换的开销
那如何知道临界区能不能进,公共资源能不能访问,总得有个测试的东西,好让进程知晓现在是否进入临界区访问公共资源,这个用来测试的东西就是锁。根据上面两种大的解决方案,xv6 实现了两种锁,自旋锁和休眠锁,下面来仔细看看:
自旋锁
结构定义
struct spinlock {
uint locked; // Is the lock held? 该锁是否被锁上了
// For debugging:
char *name; // Name of lock. 名字
struct cpu *cpu; // The cpu holding the lock. 哪个CPU取了锁
uint pcs[10]; // The call stack (an array of program counters)
// that locked the lock. 调用栈信息
};
有关自旋锁的结构定义如上,最重要的就是 $locked$ 元素,用来表示该锁是否已被某 $CPU$ 取得,1 表示该锁已某 $CPU$ 取走,0 表示该锁空闲。其他三个元素用作为 $debug$ 调试信息,后面用到具体再看。
相关函数
初始化
void initlock(struct spinlock *lk, char *name) //初始化锁 lk
{
lk->name = name; //初始化该锁的名字
lk->locked = 0; //初始化该锁空闲
lk->cpu = 0; //初始化持有该锁的CPU为空
}
这部分就是初始化锁的名字,空闲,没有 CPU 持有该锁
"开关"中断
void pushcli(void)
{
int eflags;
eflags = readeflags(); //cli之前读取eflags寄存器
cli(); //关中断
if(mycpu()->ncli == 0) //第一次pushcli()
mycpu()->intena = eflags & FL_IF; //记录cli之前的中断状态
mycpu()->ncli += 1; //关中断次数(深度)加1
}
void popcli(void)
{
if(readeflags()&FL_IF) //如果eflags寄存器IF位为1
panic("popcli - interruptible");
if(--mycpu()->ncli < 0) //如果计数小于0
panic("popcli");
if(mycpu()->ncli == 0 && mycpu()->intena) //关中断次数为0时即开中断
sti();
}
pushcli 和 popcli 为 cli 和 sti 的封装函数,只是增加了计数,每个 popcli 与 pushcli 匹配,有多少次 pushcli 就要有多少次 popcli。
每次 pushcli 都要先读取 eflags 寄存器,然后 cli 关中断,这是因为如果是第一次 pushcli 的话需要记录在 cli 之前的中断状态到 cpu 结构体 intena 字段。
除了第一次 pushcli 和最后一次 popcli,pushcli 就是将深度计数 ncli 加 1,popcli 就是将 ncli 减 1。
每次 pushcli 都调用了 cli 来关中断,但其实只有第一次 pushcli 有实际关中断的效果,其实也不然,如果第一次 pushcli 之前本来就处于关中断,那么第一次的 pushcli 的 cli 也无甚作用。
但是 popcli 开中断的时机必须是 最后一个 popcli 也就是计数为 0,以及第一次 pushcli 之前的中断状态为允许中断,只有两者都满足时才能开中断。
为什么使用 pushcli() 和 popcli() 而不是使用 cli() sti() 后面详细说明。
取锁解锁
int holding(struct spinlock *lock)
{
int r;
pushcli();
r = lock->locked && lock->cpu == mycpu(); //检验锁lock是否被某CPU锁持有且上锁
popcli();
return r;
}
关中断下检查锁是否被某 CPU 取走,仔细看检查是否持有锁的条件为两个:一是锁是否被取走,二锁是否由当前 CPU 取走。
void acquire(struct spinlock *lk)
{
pushcli(); // disable interrupts to avoid deadlock.
if(holding(lk)) // 如果已经取了锁
panic("acquire");
while(xchg(&lk->locked, 1) != 0) //原子赋值
;
__sync_synchronize(); //发出一个full barrier
// 调试信息
lk->cpu = mycpu(); //记录当前取锁的CPU
getcallerpcs(&lk, lk->pcs); //获取调用栈信息
}
关中断下进行取锁的操作,以避免死锁,原因见后面 FAQ
检查当前 CPU 是否已经持有锁,如果已持有,则 panic(),也就是说 xv6 不允许同一个 CPU 对同一个锁重复上锁。
上锁的语句为 $while(xchg(\&lk \rightarrow locked, 1) != 0);$ xchg 函数可以看作一个原子赋值函数(本是交换,与 1 交换就相当于赋值,详见 内联汇编@@@@@),将 $\&lk \rightarrow locked$ 赋值为 1,返回 $\&lk \rightarrow locked$ 的旧值。也就是说如果该锁空闲没有 CPU 持有,那么当前 CPU 将其赋值为 1 表示取得该锁,xchg 返回旧值 0,跳出 while 循环。如果该锁已经被某 CPU 持有,那么 xchg 对其赋值为 1,但返回值也是 1,不满足循环跳出条件,所以一直循环等待某 CPU 释放该锁。因取锁可能需要一直循环等待,所以名为自旋锁。
__sync_synchronize() 是发出一个 full barrier,简单来说就是不允许将这条语句之前的内存读写指令放在这条之后,也不允许将这条语句之后的内存读写指令放在这条指令之前。为啥要放个屏障在这?按照正常的逻辑思维应该是该 CPU 获取到了锁才对该锁的一些 debug 信息做记录的对吧,如果不加屏障,顺序就可能颠倒。
memory barrier有几种类型:
- acquire barrier : 不允许将barrier之后的内存读取指令移到barrier之前(linux kernel中的wmb())。
- release barrier : 不允许将barrier之前的内存读取指令移到barrier之后 (linux kernel中的rmb())。
- full barrier : 以上两种barrier的合集(linux kernel中的mb())。
void release(struct spinlock *lk)
{
if(!holding(lk))
panic("release");
lk->pcs[0] = 0; //清除调试信息
lk->cpu = 0;
__sync_synchronize(); //发出一个full barrier
asm volatile("movl $0, %0" : "+m" (lk->locked) : ); //lk->locked=0
popcli();
}
解锁锁,基本就是上锁锁的逆操作了,释放锁和清除调试信息,有关内联汇编的同样还是看前文,本文不赘述。
调试信息
其他的像记录 cpu 都好说,主要是来看这个 getcallerpcs() 函数,记录调用栈的信息:
void getcallerpcs(void *v, uint pcs[])
{
uint *ebp;
int i;
ebp = (uint*)v - 2; //getcallerpcs的调用者的调用者的ebp地址
for(i = 0; i < 10; i++){
if(ebp == 0 || ebp < (uint*)KERNBASE || ebp == (uint*)0xffffffff) //停止条件
break;
pcs[i] = ebp[1]; // saved %eip
ebp = (uint*)ebp[0]; // saved %ebp
}
for(; i < 10; i++)
pcs[i] = 0;
}
pc[] 到底是个啥玩意儿?每次调用函数使用 call 指令的时候都会把 call 指令的下一条指令压栈,pc[] 就是存放的是这个返回地址,来看看是怎么实现的。
首先要知道函数调用的几个规则:
- call 指令调用函数之前要先将参数压栈,方向为从右至左,先压最后一个参数,最后压第一个参数
- call x,将 x 赋给 eip, 将下一条指令的地址压入栈中
- 进入函数时先 push %ebp,再 movl %esp, %ebp 形成新的栈帧。
所以栈中的情况大致应该是这样的:
每个被调用者形成的栈帧底部都是保存的调用者栈帧的 ebp,而被调用者的 ebp 指向它,所以其实各个栈帧就像是用 ebp 给串起来的,各个栈帧好比形成了一条链,每个栈帧就是一个结点,指针就是 ebp。
要理清 getcallerpcs() 函数,不能只看一层调用,至少要看两层调用,举个例子:函数 A 调用 acquire(plk),acquire(plk) 调用 getcallerpcs(&plk, plk->pcs),这里我用 plk 来表示锁,只是想说明它是个指针是个地址,所以 getcallerpcs() 中的 &plk,是个二级指针,这也是这个函数能够得以实现的关键点之一,来看看栈帧什么样子的:
所以 $(uint)v-2$ 实际上就是 $\&plk-2$ 也就是 acquire() 栈帧中保存的 A 函数的 $ebp$ 的地址,有点绕,看看图应该能明白。$ebp+4$ 是返回地址,填进 pc[] 中去,$ebp=(uint)ebp[0]$ 移到上上一个栈帧中去,跳过了 acquire 的栈帧。
另外有三个停止条件,第一个 $ebp==0$,ebp = 0 就表示后面没有调用栈帧了,但是关于这个条件我在 xv6 里面没有找到明确将 ebp 赋值 为 0 的语句,而在 jos 的 entry.S 里面是有明确的 ebp = 0 这条语句,而且还附有注释,就像我上述说的为了标识后面为空栈了。对此可能是 xv6 的一个小 bug 吧,补上就行。第二个条件 ebp 值不能在内核之下即处于用户态,getcallerpcs 的调用者,调用者的调用者都是运行在内核,所以应不会处于用户态的低地址。第三个条件 ebp==0xffffffff,最初我以为是 exec 函数调用时塞给用户栈的那个无效返回地址,但转念一想不对,getcallerpcs 运行在内核,不会与用户栈那个无效返回地址挂上钩,检测到用户态的地址时肯定会先从第二个条件跳出去。所以对此我认为,越是外层的调用栈帧处于更高的地址,0xffffffff 就是单纯地表示极限顶点,但是实际是不可能跑那么高的地址去的。
FAQ
基本函数说完,来聊聊一些遗留问题:
Ⅰ xv6 的竞争条件有哪些?
xv6 是个支持多处理器的系统,各个 CPU 之间可以并行执行,所以可能会出现同时访问公共资源的情况。
在单个 CPU 上,中断也可能导致并发,在允许中断时,内核代码可能在任何时候停下来,然后执行中断处理程序,内核代码和中断处理程序交叉访问公共资源也可能导致错误。所以在取锁检验锁都要在关中断下进行。
另外 xv6 不支持线程,而各个进程之间内存是不共享的,加之内核进入临界区访问公共资源的时候是关了中断的,关了中断除了自己休眠是不会让出 CPU 的,所以运行在单个处理器上的各个进程之间的并发其实并不会产生竞争条件
Ⅱ 为什么要使用 xchg()?
试想不使用 xchg() 来实现应该怎么实现?下面代码由 xv6 文档给出:
for(;;) {
if(!lk->locked) {
//如果没上锁
lk->locked = 1; //上锁
break; //跳出
}
}
这种实现方式,访问 $lk \rightarrow locked$ 和修改分为了两步,可能会出现这样的状况:CPU1 和 CPU2 接连执行到上述的 if 语句,两者都发现 $lk \rightarrow locked = 0$,没来得及修改 $lk \rightarrow locked$ 来组织对方访问,那么两者都执行 $lk \rightarrow locked = 1$ 拿到了锁,这就违反了互斥的原则。
归根揭底的原因就是因为访问和修改是分开的两步动作,而 xchg 将两者合为一个原子操作,将 $lk \rightarrow locked$ 与 1 原子交换并返回 $lk \rightarrow locked$ 的旧值则避免了上述问题。
Ⅲ acquire() 函数为什么要关中断,或者说先关中断再上锁?
对于为什么要关中断前面竞争条件简单说过,这里从死锁的角度来看,假如两者交换先上锁再关中断或者直接不关中断,若有 A 调用 acquire() 想要获得锁,当它拿到锁时刚好发生中断,中断处理程序也想要获得该锁,但 A 已经换下 CPU 了,肯定释放不了啊,那么就产生死锁。所以要先关中断再上锁。
release() 函数先原子赋值释放锁再开中断,也就同理了,如果两者交换先开中断,那么在释放锁之前可能发生中断,而中断处理程序刚好需要该锁,那么发生死锁。
Ⅲ 关中断开中断为什么要使用 pushcli() 和 popcli() 而不直接使用 cli() 和 sti()?
前面我们已经知道如果在 CPU 持有锁的阶段发生中断,中断服务程序可能也要取锁,那么就会死锁,所以 xv6 直接决定在取锁的时候就关中断,CPU 持有锁的整个阶段都处于关中断,只有释放锁的时候才可能开中断,注意这里是可能,因为用的是 popcli()。
那么正题就来了,为什么要使用 pushcli() 和 popcli(),其实也简单,那是因为某个函数中可能要取多个锁,比如先后取了锁 1 锁 2,那么释放锁 2 之后能开中断吗?显然不能,必须等锁 1 也释放的时候才能开中断。所以使用增加了计数功能的 pushcli() 和 popcli() 来实现最后一个锁释放的时候才开中断。
Ⅳ 指令乱序问题
现今的指令的执行都有流水线的技术,其实还有乱序执行。乱序执行指的是在 CPU 运行中的指令不按照代码既定的顺序执行,而是按照一定的策略打乱后顺序执行,以此来提高性能。
不是所有的指令序列都可以打乱,没有关系的指令之间才可以打乱。这种乱序执行的确在一定程度上可以提高性能,但也会暴露一些问题,比如前面涉及开关中断的指令,对于我们人来说,这种指令与它相邻的一些指令之间肯定是有逻辑联系的,但 CPU 不知道啊,CPU 判断指令之间是否有关系只能凭借简单的逻辑,比如 B 指令需要 A 指令的结果。但是像上面那种复杂的逻辑关系它是不能判断的就可能将指令顺序错误的打乱,为避免这种情况,我们设置了屏障,禁止这个屏障前后的指令顺序打乱。
休眠锁
xv6 里面还提供了另一种锁,休眠锁,它在自旋锁的基础之上实现,定义如下:
struct sleeplock {
uint locked; // Is the lock held? 已锁?
struct spinlock lk; // spinlock protecting this sleep lock 自旋锁
// For debugging:
char *name; // Name of lock. 名字
int pid; // Process holding lock 哪个进程持有?
};
休眠锁配了一把自旋锁来保护,原因见后。休眠锁的意思是某个进程为了取这个休眠锁不得而休眠,所以有个 pid 来记录进程号
相关函数
休眠锁的初始化,检验是否持有锁等都类似,就不赘述了,再这里主要看看取锁和解锁的区别:
void acquiresleep(struct sleeplock *lk)
{
acquire(&lk->lk); //优先级:->高于&
while (lk->locked) {
//当锁已被其他进程取走
sleep(lk, &lk->lk); //休眠
}
lk->locked = 1; //上锁
lk->pid = myproc()->pid; //取锁进程的pid
release(&lk->lk);
}
void releasesleep(struct sleeplock *lk)
{
acquire(&lk->lk); //取自旋锁
lk->locked = 0;
lk->pid = 0;
wakeup(lk); //唤醒
release(&lk->lk);
}
休眠锁要使用自旋锁来保护有两点原因,首先休眠锁的自旋锁可以使得对休眠锁的本身属性字段的操作为一个原子操作,是为保护。
另外一个原因涉及到进程的休眠唤醒,在这里先提前说说。sleep(&obj, (&obj)->lock);,这是 sleep 函数的原型,如果一个进程想要获取一个对象 obj 而不得,那么这个进程就会休眠在对象 obj 上面,有休眠就有唤醒,而这个 obj.lock 主要就是为了避免错过唤醒。这部分我会在进程那一块儿详细讲述,这里过过眼,灌灌耳音。
当前进程想要获取休眠锁,这个休眠锁就是对象,如果被别的进程取走的话,那么当前进程就取而不得,休眠在休眠锁这个对象上。如果取到了该休眠锁,就将 locked 置为 1,记录取得该锁的进程号。
解锁操作基本上就是上锁的逆操作,注意一点,可能有其他进程休眠在休眠锁上,所以当前进程解锁后需要唤醒休眠在休眠锁上的进程。
好了本节就这样吧,有什么问题还请批评指正,也欢迎大家来同我讨论交流学习进步。
首发公众号:Rand_cs