前言
在Linux设备驱动中必须解决的一个问题是多个进程对共享资源的并发访问,并发的访问会导致竞态,即使是经验丰富的驱动工程师也会常常设计出包含并发问题bug的驱动程序。
Linux提供了多种解决竞态问题的方式,这些方式适合不同的应用场景。一起了解一下
- 并发和竞态的概念及发生场合。
- 编译乱序、执行乱序的问题,以及内存屏障。
- 中断屏蔽、原子操作、自旋锁、信号量和互斥体等并发控制机制。
- 并发控制后的globalmem的设备驱动。
一、并发与竞态
并发(Concurrency)指的是多个执行单元同时、并行被执行,而并发的执行单元对共享资源(硬件资源和软件上的全局变量、静态变量等)的访问则很容易导致竞态(Race Conditions)。
例如,对于globalmem设备,假设一个执行单元A对其写入3000个字符“a”,而另一个执行单元B对其写入4000个“b”,第三个执行单元C读取globalmem的所有字符。如果执行单元A、B的写操作按图7.1那样顺序发生,执行单元C的读操作当然不会有什么问题。但是,如果执行单元A、B按图7.2那样被执行,而执行单元C又“不合时宜”地读,则会读出3000个“b”。
(这就是为什么驱动其实对于并发和操作系统也需要进行了解,脑子里需要计划1000个人一起找我麻烦的时候应该怎么处理)
比图7.2更复杂、更混乱的并发大量存在于设备驱动中,只要并发的多个执行单元存在对共享资源的访问,竞态就可能发生。在Linux内核中,主要的竞态发生于如下几种情况。(这个共享资源就是驱动程序)
1.对称多处理器(SMP)的多个CPU
SMP是一种紧耦合、共享存储的系统模型,其体系结构如图7.3所示,它的特点是多个CPU使用共同的系统总线,因此可访问共同的外设和储存器。
在SMP的情况下,两个核(CPU0和CPU1)的竞态可能发生于CPU0的进程与CPU1的进程之间、CPU0的进程与CPU1的中断之间以及CPU0的中断与CPU1的中断之间,图7.4中任何一条线连接的两个实体都有核间并发可能性。
2.单CPU内进程与抢占它的进程
Linux 2.6以后的内核支持内核抢占调度,一个进程在内核执行的时候可能耗完了自己的时间片(timeslice),也可能被另一个高优先级进程打断,进程与抢占它的进程访问共享资源的情况类似于SMP的多个CPU。
3.中断(硬中断、软中断、Tasklet、底半部)与进程之间
中断可以打断正在执行的进程,如果中断服务程序访问进程正在访问的资源,则竞态也会发生。
此外,中断也有可能被新的更高优先级的中断打断,因此,多个中断之间本身也可能引起并发而导致竞态。但是Linux 2.6.35之后,就取消了中断的嵌套。老版本的内核可以在申请中断时,设置标记IRQF_DISABLED以避免中断嵌套,由于新内核直接就默认不嵌套中断,这个标记反而变得无用了。详情见https://lwn.net/Articles/380931/文档《Disabling IRQF_DISABLED》。
上述并发的发生除了SMP是真正的并行以外,其他的都是单核上的“宏观并行,微观串行”,但其引发的实质问题和SMP相似。图7.5再现了SMP情况下总的竞争状态可能性,既包含某一个核内的,也包括两个核间的竞态。
解决竞态问题的途径是保证对共享资源的互斥访问,所谓互斥访问是指一个执行单元在访问共享资源的时候,其他的执行单元被禁止访问。
访问共享资源的代码区域称为临界区(Critical Sections),临界区需要被以某种互斥机制加以保护。中断屏蔽、原子操作、自旋锁、信号量、互斥体等是Linux设备驱动中可采用的互斥途径。
二、编译乱序和执行乱序
理解Linux内核的锁机制,还需要理解编译器和处理器的特点。
比如下面一段代码,写端申请一个新的struct foo结构体并初始化其中的a、b、c,之后把结构体地址赋值给全局gp指针:
struct foo { int a; int b; int c; }; struct foo *gp = NULL; /* . . . */ p = kmalloc(sizeof(*p), GFP_KERNEL); p->a = 1; p->b = 2; p->c = 3; gp = p;
而读端如果简单做如下处理,则程序的运行可能是不符合预期的:
p = gp; if (p != NULL) { do_something_with(p->a, p->b, p->c); }
有两种可能的原因会造成程序出错,一种可能性是编译乱序,另外一种可能性是执行乱序。
关于编译方面,C语言顺序的“p->a=1;p->b=2;p->c=3;gp=p;”的编译结果的指令顺序可能是gp的赋值指令发生在a、b、c的赋值之前。
现代的高性能编译器在目标码优化上都具备对指令进行乱序优化的能力。编译器可以对访存的指令进行乱序,减少逻辑上不必要的访存,以及尽量提高Cache命中率和CPU的Load/Store单元的工作效率。因此在打开编译器优化以后,看到生成的汇编码并没有严格按照代码的逻辑顺序,这是正常的。
解决编译乱序问题,需要通过barrier()编译屏障进行。我们可以在代码中设置barrier()屏障,这个屏障可以阻挡编译器的优化。对于编译器来说,设置编译屏障可以保证屏障前的语句和屏障后的语句不乱“串门”。
比如,下面的一段代码在e=d[4095]与b=a、c=a之间没有编译屏障:
int main(int argc, char *argv[]) { int a = 0, b, c, d[4096], e; e = d[4095]; b = a; c = a; printf("a:%d b:%d c:%d e:%d\n", a, b, c, e); return 0; }
用“arm-linux-gnueabihf-gcc-O2”优化编译,反汇编结果是:
int main(int argc, char *argv[]) { 831c: b530 push {r4, r5, lr} 831e: f5ad 4d80 sub.w sp, sp, #16384 ; 0x4000 8322: b083 sub sp, #12 8324: 2100 movs r1, #0 8326: f50d 4580 add.w r5, sp, #16384 ; 0x4000 832a: f248 4018 movw r0, #33816 ; 0x8418 832e: 3504 adds r5, #4 8330: 460a mov r2, r1 -> b= a; 8332: 460b mov r3, r1 -> c= a; 8334: f2c0 0000 movt r0, #0 8338: 682c ldr r4, [r5, #0] 833a: 9400 str r4, [sp, #0] -> e = d[4095]; 833c: f7ff efd4 blx 82e8 <_init+0x20> }
显然,尽管源代码级别b=a、c=a发生在e=d[4095]之后,但是目标代码的b=a、c=a指令发生在e=d[4095]之前。
假设我们重新编写代码,在e=d[4095]与b=a、c=a之间加上编译屏障:
#define barrier() __asm__ __volatile__("": : :"memory") int main(int argc, char *argv[]) { int a = 0, b, c, d[4096], e; e = d[4095]; barrier(); b = a; c = a; printf("a:%d b:%d c:%d e:%d\n", a, b, c, e); return 0; }
再次用“arm-linux-gnueabihf-gcc-O2”优化编译,反汇编结果是:
int main(int argc, char *argv[]) { 831c: b510 push {r4, lr} 831e: f5ad 4d80 sub.w sp, sp, #16384 ; 0x4000 8322: b082 sub sp, #8 8324: f50d 4380 add.w r3, sp, #16384 ; 0x4000 8328: 3304 adds r3, #4 832a: 681c ldr r4, [r3, #0] 832c: 2100 movs r1, #0 832e: f248 4018 movw r0, #33816 ; 0x8418 8332: f2c0 0000 movt r0, #0 8336: 9400 str r4, [sp, #0] -> e = d[4095]; 8338: 460a mov r2, r1 -> b= a; 833a: 460b mov r3, r1 -> c= a; 833c: f7ff efd4 blx 82e8 <_init+0x20> }
因为“asm____volatile(“”:::“memory”)”这个编译屏障的存在,原来的3条指令的顺序“拨乱反正”了。
关于解决编译乱序的问题,C语言volatile关键字的作用较弱,它更多的只是避免内存访问行为的合并,对C编译器而言,volatile是暗示除了当前的执行线索以外,其他的执行线索也可能改变某内存,所以它的含义是“易变的”。
换句话说,就是如果线程A读取var这个内存中的变量两次而没有修改var,编译器可能觉得读一次就行了,第2次直接取第1次的结果。但是如果加了volatile关键字来形容var,则就是告诉编译器线程B、线程C或者其他执行实体可能把var改掉了,因此编译器就不会再把线程A代码的第2次内存读取优化掉了。
另外,volatile也不具备保护临界资源的作用。总之,Linux内核明显不太喜欢volatile,这可参考内核源代码下的文档Documentation/volatile-considered-harmful.txt。
编译乱序是编译器的行为,而执行乱序则是处理器运行时的行为。
执行乱序是指即便编译的二进制指令的顺序按照“p->a=1;p->b=2;p->c=3;gp=p;”排放,在处理器上执行时,后发射的指令还是可能先执行完,这是处理器的“乱序执行(Out-of-Order Execution)”策略。高级的CPU可以根据自己缓存的组织特性,将访存指令重新排序执行。连续地址的访问可能会先执行,因为这样缓存命中率高。有的还允许访存的非阻塞,即如果前面一条访存指令因为缓存不命中,造成长延时的存储访问时,后面的访存指令可以先执行,以便从缓存中取数。因此,即使是从汇编上看顺序正确的指令,其执行的顺序也是不可预知的。
举个例子,ARM v6/v7的处理器会对以下指令顺序进行优化。
LDR r0, [r1] ; STR r2, [r3] ;
假设第一条LDR指令导致缓存未命中,这样缓存就会填充行,并需要较多的时钟周期才能完成。
老的ARM处理器,比如ARM926EJ-S会等待这个动作完成,再执行下一条STR指令。
而ARM v6/v7处理器会识别出下一条指令(STR)且不需要等待第一条指令(LDR)完成(并不依赖于r0的值),即会先执行STR指令,而不是等待LDR指令完成。
对于大多数体系结构而言,尽管每个CPU都是乱序执行,但是这一乱序对于单核的程序执行是不可见的,因为单个CPU在碰到依赖点(后面的指令依赖于前面指令的执行结果)的时候会等待,所以程序员可能感觉不到这个乱序过程。但是这个依赖点等待的过程,在SMP处理器里面对于其他核是不可见的。比如若在CPU0上执行:
while (f == 0); print x;
CPU1上执行:
x = 42; f = 1;
我们不能武断地认为CPU0上打印的x一定等于42,因为CPU1上即便“f=1”编译在“x=42”后面,执行时仍然可能先于“x=42”完成,所以这个时候CPU0上打印的x不一定就是42。
处理器为了解决多核间一个核的内存行为对另外一个核可见的问题,引入了一些内存屏障的指令。譬如,ARM处理器的屏障指令包括:
- DMB(数据内存屏障):在DMB之后的显式内存访问执行前,保证所有在DMB指令之前的内存访问完成;
- DSB(数据同步屏障):等待所有在DSB指令之前的指令完成(位于此指令前的所有显式内存访问均完成,位于此指令前的所有缓存、跳转预测和TLB维护操作全部完成);
- ISB(指令同步屏障):Flush流水线,使得所有ISB之后执行的指令都是从缓存或内存中获得的。Linux内核的自旋锁、互斥体等互斥逻辑,需要用到上述指令:在请求获得锁时,调用屏障指令;在解锁时,也需要调用屏障指令。代码清单7.1的汇编代码描绘了一个简单的互斥逻辑,留意其中的第14行和22行。关于ldrex和strex指令的作用,会在7.3节详述。
1LOCKED EQU 1 2UNLOCKED EQU 0 3lock_mutex 4 ; 互斥量是否锁定? 5 LDREX r1, [r0] ; 检查是否锁定 6 CMP r1, #LOCKED ; 和"locked"比较 7 WFEEQ ; 互斥量已经锁定,进入休眠 8 BEQ lock_mutex ; 被唤醒,重新检查互斥量是否锁定 9 ; 尝试锁定互斥量 10 MOV r1, #LOCKED 11 STREX r2, r1, [r0] ; 尝试锁定 12 CMP r2, #0x0 ; 检查STR指令是否完成 13 BNE lock_mutex ; 如果失败,重试 14 DMB ; 进入被保护的资源前需要隔离,保证互斥量已经被更新 15 BX lr 16 17unlock_mutex 18 DMB ; 保证资源的访问已经结束 19 MOV r1, #UNLOCKED ; 向锁定域写"unlocked" 20 STR r1, [r0] 21 22 DSB ; 保证在CPU唤醒前完成互斥量状态更新 23 SEV ; 像其他CPU发送事件,唤醒任何等待事件的CPU 24 25 BX lr
前面提到每个CPU都是乱序执行,但是单个CPU在碰到依赖点的时候会等待,所以执行乱序对单核不一定可见。
但是,当程序在访问外设的寄存器时,这些寄存器的访问顺序在CPU的逻辑上构不成依赖关系,但是从外设的逻辑角度来讲,可能需要固定的寄存器读写顺序,这个时候,也需要使用CPU的内存屏障指令。
内核文档Documentation/memory-barriers.txt和Documentation/io_ordering.txt对此进行了描述。
在Linux内核中,定义了读写屏障mb()、读屏障rmb()、写屏障wmb()、以及作用于寄存器读写的__iormb()、__iowmb()这样的屏障API。读写寄存器的readl_relaxed()和readl()、writel_relaxed()和writel()API的区别就体现在有无屏障方面。
#define readb(c) ({ u8 __v = readb_relaxed(c); __iormb(); __v; }) #define readw(c) ({ u16 __v = readw_relaxed(c); __iormb(); __v; }) #define readl(c) ({ u32 __v = readl_relaxed(c); __iormb(); __v; }) #define writeb(v,c) ({ __iowmb(); writeb_relaxed(v,c); }) #define writew(v,c) ({ __iowmb(); writew_relaxed(v,c); }) #define writel(v,c) ({ __iowmb(); writel_relaxed(v,c); })
比如我们通过writel_relaxed()写完DMA的开始地址、结束地址、大小之后,我们一定要调用writel()来启动DMA。
writel_relaxed(DMA_SRC_REG, src_addr); writel_relaxed(DMA_DST_REG, dst_addr); writel_relaxed(DMA_SIZE_REG, size); writel (DMA_ENABLE, 1);
三、中断屏蔽
在单CPU范围内避免竞态的一种简单而有效的方法是在进入临界区之前屏蔽系统的中断,但是在驱动编程中不值得推荐,驱动通常需要考虑跨平台特点而不假定自己在单核上运行。CPU一般都具备屏蔽中断和打开中断的功能,这项功能可以保证正在执行的内核执行路径不被中断处理程序所抢占,防止某些竞态条件的发生。具体而言,中断屏蔽将使得中断与进程之间的并发不再发生,而且,由于Linux内核的进程调度等操作都依赖中断来实现,内核抢占进程之间的并发也得以避免了。
中断屏蔽的使用方法为:
local_irq_disable() /* 屏蔽中断 */ . . . critical section /* 临界区*/ . . . local_irq_enable() /* 开中断*/
其底层的实现原理是让CPU本身不响应中断,比如,对于ARM处理器而言,其底层的实现是屏蔽ARM CPSR的I位:
static inline void arch_local_irq_disable(void) { asm volatile( " cpsid i @ arch_local_irq_disable" : : : "memory", "cc"); }
由于Linux的异步I/O、进程调度等很多重要操作都依赖于中断,中断对于内核的运行非常重要,在屏蔽中断期间所有的中断都无法得到处理,因此长时间屏蔽中断是很危险的,这有可能造成数据丢失乃至系统崩溃等后果。这就要求在屏蔽了中断之后,当前的内核执行路径应当尽快地执行完临界区的代码。
local_irq_disable()和local_irq_enable()都只能禁止和使能本CPU内的中断,因此,并不能解决SMP多CPU引发的竞态。因此,单独使用中断屏蔽通常不是一种值得推荐的避免竞态的方法(换句话说,驱动中使用local_irq_disable/enable()通常意味着一个bug),它适合与下文将要介绍的自旋锁联合使用。
与local_irq_disable()不同的是,local_irq_save(flags)除了进行禁止中断的操作以外,还保存目前CPU的中断位信息,local_irq_restore(flags)进行的是与local_irq_save(flags)相反的操作。对于ARM处理器而言,其实就是保存和恢复CPSR。
如果只是想禁止中断的底半部,应使用local_bh_disable(),使能被local_bh_disable()禁止的底半部应该调用local_bh_enable()。
四、原子操作
原子操作可以保证对一个整型数据的修改是排他性的。Linux内核提供了一系列函数来实现内核中的原子操作,这些函数又分为两类,分别针对位和整型变量进行原子操作。
位和整型变量的原子操作都依赖于底层CPU的原子操作,因此所有这些函数都与CPU架构密切相关。
对于ARM处理器而言,底层使用LDREX和STREX指令,比如atomic_inc()底层的实现会调用到atomic_add(),其代码如下:
static inline void atomic_add(int i, atomic_t *v) { unsigned long tmp; int result; prefetchw(&v->counter); __asm__ __volatile__("@ atomic_add\n" "1: ldrex %0, [%3]\n" " add %0, %0, %4\n" " strex %1, %0, [%3]\n" " teq %1, #0\n" " bne 1b" : "=&r" (result), "=&r" (tmp), "+Qo" (v->counter) : "r" (&v->counter), "Ir" (i) : "cc"); }
ldrex指令跟strex配对使用,可以让总线监控ldrex到strex之间有无其他的实体存取该地址,如果有并发的访问,执行strex指令时,第一个寄存器的值被设置为1(Non-Exclusive Access)并且存储的行为也不成功;
如果没有并发的存取,strex在第一个寄存器里设置0(Exclusive Access)并且存储的行为也是成功的。
本例中,如果两个并发实体同时调用ldrex+strex,如图7.6所示,在T3时间点上,CPU0的strex会执行失败,在T4时间点上CPU1的strex会执行成功。
所以CPU0和CPU1之间只有CPU1执行成功了,执行strex失败的CPU0的“teq%1,#0”判断语句不会成立,于是失败的CPU0通过“bne 1b”再次进入ldrex。
ldrex和strex的这一过程不仅适用于多核之间的并发,也适用于同一个核内部并发的情况。
1 整型原子操作
- 1.设置原子变量的值
void atomic_set(atomic_t *v, int i); /* 设置原子变量的值为i */ atomic_t v = ATOMIC_INIT(0); /* 定义原子变量v并初始化为0 */
- 2.获取原子变量的值
atomic_read(atomic_t *v); /* 返回原子变量的值*/
- 3.原子变量加/减
void atomic_add(int i, atomic_t *v); /* 原子变量增加i */ void atomic_sub(int i, atomic_t *v); /* 原子变量减少i */
- 4.原子变量自增/自减
void atomic_inc(atomic_t *v); /* 原子变量增加1 */ void atomic_dec(atomic_t *v); /* 原子变量减少1 */
- 5.操作并测试
int atomic_inc_and_test(atomic_t *v); int atomic_dec_and_test(atomic_t *v); int atomic_sub_and_test(int i, atomic_t *v);
上述操作对原子变量执行自增、自减和减操作后(注意没有加),测试其是否为0,为0返回true,否则返回false。
- 6.操作并返回
int atomic_add_return(int i, atomic_t *v); int atomic_sub_return(int i, atomic_t *v); int atomic_inc_return(atomic_t *v); int atomic_dec_return(atomic_t *v);
上述操作对原子变量进行加/减和自增/自减操作,并返回新的值。
2 位原子操作
- 1.设置位
void set_bit(nr, void *addr);
上述操作设置addr地址的第nr位,所谓设置位即是将位写为1。
- 2.清除位
void clear_bit(nr, void *addr);
上述操作清除addr地址的第nr位,所谓清除位即是将位写为0。
- 3.改变位
void change_bit(nr, void *addr);
上述操作对addr地址的第nr位进行反置。
- 4.测试位
test_bit(nr, void *addr);
上述操作返回addr地址的第nr位。
- 5.测试并操作位
int test_and_set_bit(nr, void *addr); int test_and_clear_bit(nr, void *addr); int test_and_change_bit(nr, void *addr);
上述test_and_xxx_bit(nr,voidaddr)操作等同于执行test_bit(nr,voidaddr)后再执行xxx_bit(nr,void*addr)。
1static atomic_t xxx_available = ATOMIC_INIT(1); /* 定义原子变量*/ 2 3static int xxx_open(struct inode *inode, struct file *filp) 4{ 5 ... 6 if (!atomic_dec_and_test(&xxx_available)) { 7 atomic_inc(&xxx_available); 8 return - EBUSY; /* 已经打开*/ 9 } 10 ... 11 return 0; /* 成功 */ 12} 13 14static int xxx_release(struct inode *inode, struct file *filp) 15{ 16 atomic_inc(&xxx_available); /* 释放设备 */ 17 return 0; 18}
使用原子变量使设备只能被一个进程打开