五、自旋锁
信号量对互斥来讲是非常有用的工具,但它并不是内核提供的唯一的这类工具。相反,大多数锁定通过称为“自旋锁(spinlock)”的机制实现。和信号量不同,自旋锁可在不能休眠的代码中使用,比如中断处理例程。在正确使用的情况下,自旋锁通常可以提供比信号量更高的性能。
一个自旋锁是一个互斥设备,它只能有两个值:“锁定”和“解锁”。它通常实现为某个整数值中的单个位。希望获得某特定锁的代码测试相关的位。如果锁可用,则“锁定”位被设置,而代码继续进入临界区;相反,如果锁被其他人获得,则代码进入忙循环并重复检查这个锁,直到该锁可用为止。这个循环就是自旋锁的“自旋”部分。
“测试并设置”的操作必须以原子方式完成,这样,即使有多个线程在给定时间自旋,也只有一个线程可获得该锁。在超线程处理器上,还必须仔细处理以避免死锁。这里的超线程处理器可实现多个虚拟的 CPU,它们共享单个处理器核心及缓存。当存在自旋锁时,等待执行忙循环的处理器做不了任何有用的工作。
只要考虑到并发问题,单处理器工作站在运行可抢占内核时其行为就类似于 SMP(对称多处理)。如果非抢占式的单处理器系统进入某个锁上的自旋状态,则会永远自旋下去;也就是说,没有任何其他线程能够获得 CPU 来释放这个锁。
1、自旋锁 API 介绍
自旋锁原语所需要包含的文件是 <linux/spinlock.h>。实际的锁具有 spinlock_t 类型和其他任何数据结构类似,一个自旋锁必须被初始化。对自旋锁的初始化可在编译时过下面的代码完成:
spinlock_t my_lock = SPIN_LOCK_UNLOCKED; // 静态
或者在运行时,调用下面的函数:
void spin_lock_init(spinlock_t *lock);
在进入临界区之前,我们的代码必须调用下面的函数获得需要的锁:
void spin_lock(spinlock_t *lock);
注意,所有的自旋锁等待在本质上都是不可中断的。一旦调用了 spin_lock,在获得锁之前将一直处于自旋状态。
要释放已经获取的锁,可将锁传递给下面的函数:
void spin_unlock(spinlock_t *lock);
2、自旋锁和原子上下文
适用于自旋锁的核心规则是: 任何拥有自旋锁的代码都必须是原子的。它不能休眠,事实上,它不能因为任何原因放弃处理器,除了服务中断以外(某些情况下此时也不能放弃处理器)。
只要内核代码拥有自旋锁,在相关处理器上的抢占就会被禁止。甚至在单处理器系统上,也必须以同样的方式禁止抢占以避免竞态。
在用户空间和内核空间之间复制数据就是个明显的例子:在复制继续前,必需的用户空间页也许需要从磁盘上交换进入,而这个操作明显需要休眠。需要分配内存的任何操作也会休眠,比如 kmalloc,如果没有明确告知,它会在等待可用内存时放弃处理器进入休眠。休眠可发生在许多无法预期的地方:当我们编写需要在自旋锁下执行的代码时,必须注意每一个所调用的函数。
在中断处理例程中拥有锁是合法的,这也是为什么自旋锁操作不能休眠的一个原因。但是,当中断例程在最初拥有锁的代码所在的处理器上运行时,会发生什么情况呢?在中断例程自旋时,非中断代码将没有任何机会来释放这个锁。处理器将永远自旋下去。
为了避免这种陷阱,我们需要在拥有自旋锁时禁止中断(仅在本地CPU 上)。
自旋锁使用上的最后一个重要规则是,自旋锁必须在可能的最短时间内拥有,拥有锁的时间越短越好。
3、自旋锁函数
锁定一个自旋锁的函数实际有四个:
void spin_lock(spinlock_t *lock); void spin_lock_irqsave(spinlock_t *lock, unsigned long flags); void spin_lock_irq(spinlock_t *lock); void spin_lock_bh(spinlock_t *lock);
spin_lock_irqsave 会在获得自旋锁之前禁止中断(只在本地处理器上),而先前的中断状态保存在 flags 中。如果我们能够确保在释放自旋锁时应该启用中断,则可以使用spin_lock_irq,而无需跟踪标志。最后,spin_lock_bh 在获得锁之前禁止软件中断,但是会让硬件中断保持打开。如果我们有一个自旋锁,它可以被运行在(硬件或软件)中断上下文中的代码获得,则必须使用某个禁止中断的 spin_lock 形式,因为使用其他的锁定函数迟早会导致系统死锁。如果我们不会在硬件中断处理例程中访问自旋锁,但可能在软件中断中访问,则应该使用 spin_lock_bh,以便在安全地避免死锁的同时还能服务硬件中断。
释放自旋锁的方法也有四种,严格对应于获取自旋锁的那些函数:
void spin_unlock(spinlock_t *lock); void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags); void spin_unlock_irq(spinlock_t *lock); void spin_unlock_bh(spinlock_t *lock);
每个 spin_unlock 的变种都会撤销对应的 spin_lock 函数所做的工作。传递到 spin_unlock_irqrestore 的 flags 参数必须是传递给 spin_lock_irqsave 的同一个变量。
我们还必须在同一个函数中调用 spin_lock_irqsave 和 spin_unlock_irqrestore,否则代码可能在某些架构上出现问题。
还有如下非阻塞的自旋锁操作:
int spin_trylock(spinlock_t *lock); int spin_trylock_bh(spinlock_t *lock);
这两个函数在成功(即获得自旋锁)时返回非零值,否则返回零。
4、读取者/写入者自旋锁
这种锁允许任意数量的读取者同时进入临界区,但写入者必须互斥访问。读取者/写入者锁具有 rwlock_t 类型,在 <linux/spinlock.h> 中定义。我们可以用下面的两种方式声明和初始化它们:
rwlock_t my_rwlock = RW_LOCK_UNLOCKED; /* Static way */ rwlock_t my_rwlock; rwlock_init(&my_rwlock); /* Dynamic way */
对读取者来讲,可使用如下函数:
void read_lock(rwlock_t *lock); void read_lock_irqsave(rwlock_t *lock, unsigned long flags); void read_lock_irq(rwlock_t *lock); void read_lock_bh(rwlock_t *lock); void read_unlock(rwlock_t *lock); void read_unlock_irqrestore(rwlock_t *lock, unsigned long flags); void read_unlock_irq(rwlock_t *lock); void read_unlock_bh(rwlock_t *lock);
这里并没有 read_trylock 函数可用。
用于写入者的函数类似于读取者:
void write_lock(rwlock_t *lock); void write_lock_irqsave(rwlock_t *lock, unsigned long flags); void write_lock_irq(rwlock_t *lock); void write_lock_bh(rwlock_t *lock); int write_trylock(rwlock_t *lock); void write_unlock(rwlock_t *lock); void write_unlock_irqrestore(rwlock_t *lock, unsigned long flags); void write_unlock_irq(rwlock_t *lock); void write_unlock_bh(rwlock_t *lock);
和 rwsem 类似,读取者/写入者锁可能造成读取者饥饿。这种情况几乎不成问题,但是如果对锁的竞争导致饥饿,性能会变得很低。
六、锁陷阱
并发的管理本来就非常棘手,而许多使用方法都可能导致错误。
1、不明确的规则
当我们创建了一个可被并行访问的对象时,应该同时定义用来控制访问的锁。
如果某个获得锁的函数要调用其他同样试图获取这个锁的函数,我们的代码就会死锁。不论是信号量还是自旋锁,都不允许锁拥有者第二次获得这个锁;如果试图这么做,系统将挂起。
2、锁的顺序规则
如果我们有两个锁,分别是 Lockl 和 Lock2,而代码需要同时拥有这两个锁,这时就有可能进入潜在的死锁。对于这个问题的解决办法通常比较简单:在必须获取多个锁时,应该始终以相同的顺序获得。只要遵守这个约定,如上所述的那种死锁就可以避免。
如果我们必须获得一个局部锁(比如一个设备锁),以及一个属于内核更中心位置的锁,则应该首先获取自己的局部锁。如果我们拥有信号量和自旋锁的组合,则必须首先获得信号量;在拥有自旋锁时调用 down(可导致休眠)是个严重的错误的。
3、细粒度锁和粗粒度锁的对比
现代的内核可包含数千个锁,每个锁保护一个小的资源。这种类型的细粒度锁具有良好的伸缩性;它允许每个处理器在执行特定任务时无需和其他处理器正在使用的锁竞争。
细粒度锁将带来某种程度的复杂性。
如果我们的确怀疑锁竞争导致性能下降,则可以使用 lockmeter 工具。这个补丁(可在
http://oss.sgi.com/projects/lockmeter/找到)可度量内核花费在锁上的时间。通过查看它的输出报告,我们可以很快确定锁竞争是否是问题所在。
七、除了锁之外的办法
1、免锁算法
大量的读取者/写入者情况——如果只有一个写入者——就可以用这种方法来设计我们的算法。如果写入者看到的数据结构和读取者看到的始终一致,就有可能构造一种免锁的数据结构。
经常用于免锁的生产者/消费者任务的数据结构之一是循环缓冲区(circular buffer)。在这个算法中,一个生产者将数据放入数组的结尾,而消费者从数组的另一端移走数据。在达到数组尾部的时候,生产者绕回到数组的头部。
生产者是唯一允许修改写入索引以及该索引指向的数组位置的线程。只要写入者在更新写人索引之前将新的值保存到缓冲区,则读取者将始终看到一致的数据结构。同时,读取者是唯一可访问读取索引以及该索引指向位置的数据的线程。
当读取和写入指针相等时,表明缓冲区是空的,而只要写入指针马上要跑到读取指针的后面时(需谨慎处理交换!),就表明缓冲区已满。
循环缓冲区的使用在设备驱动程序中相当普遍。特别是网络适配器,经常使用循环缓冲区和处理器交换数据(数据包)。
2、原子变量
有时,共享的资源可能恰好是一个简单的整数值。假定我们的驱动程序维护着一个共享变量 n_op,该变量的值表明有多少个设备操作正在并发地执行。通常,即使下面的简单操作也需要锁定:
n_op++;
完整的锁机制对一个简单的整数来讲却显得有些浪费。针对这种情况,内核提供了一种原子的整数类型,称为atomic_t,定义在 <asm/atomic.h> 中
一个 atomic_t 变量在所有内核支持的架构上保存一个 int 值,atomic_t 变量中不能记录大于 24 位的整数。
下面针对这种类型的操作在 SMP 计算机的所有处理器上都确保是原子的,这种操作的速度非常快,因为只要可能,它们就会被编译成单个机器指令。
void atomic_set(atomic_t *v, int i); atomic_t v = ATOMIC_INIT(0);
将原子变量 v 的值设置为整数值 。也可以在编译时利用 ATOMIC_INIT 宏来初始化原子变量的值。
int atomic_read(atomic_t *v);
返回 v 的当前值。
void atomic_add(int i, atomic_t *v);
将 i 累加到 v 指向的原子变量。返回值是 void,这是因为返回新的值将带来额外的成本,而大多数情况下没有必要知道累加后的值。
void atomic_sub(int i, atomic_t *v);
从 *v 中减去 i。
void atomic_inc(atomic_t *v); void atomic_dec(atomic_t *v);
增加或缩减一个原子变量。
int atomic_inc_and_test(atomic_t *v); int atomic_dec_and_test(atomic_t *v); int atomic_sub_and_test(int i, atomic_t *v);
执行特定的操作并测试结果;如果在操作结束后,原子值为 0,则返回值为 true;否则返回值为 false。注意,不存在 atomic_add_and_test 函数。
int atomic_add_negative(int i, atomic_t *v);
将整数变量 i 累加到 v。返回值在结果为负时为 true,否则为 false。
int atomic_add_return(int i, atomic_t *v); int atomic_sub_return(int i, atomic_t *v); int atomic_inc_return(atomic_t *v); int atomic_dec_return(atomic_t *v);
类似于 atomic_add 及其变种,例外之处在于这些函数会将新的值返回给调用者。
atomic_t 数据项必须只能通过上述函数来访问。如果读者将原子变量传递给了需要整型参数的函数,则会遇到编译错误。
只有原子变量的数目是原子的,atomic_t 变量才能工作。需要多个 atomic_t 变量的操作,仍然需要某种类型的锁。
3、位操作
为了实现位操作,内核提供了一组可原子地修改和测试单个位的函数。因为整个操作发生在单个步骤中,因此,不会受到中断(或者其他处理器)的干扰。
原子位操作非常快,只要底层硬件允许,这种操作就可以使用单个机器指令来执行,并且不需要禁止中断。这些函数依赖于具体的架构,因此在 <asm/bitops.h> 中声明。
nr 参数(用来描述要操作的位)通常被定义为 int,但在少数架构上被定义为 unsigned long。要修改的地址通常是指向 unsigned long 的指针,但在某些架构上却使用 void *来代替。
可用的位操作如下:
void set_bit(nr, void *addr);
设置 addr 指向的数据项的第 nr 位
void clear_bit(nr, void *addr);
清除 addr 指向的数据项的第 nr 位,其原语和 set_bit 相反
void change_bit(nr, void *addr);
切换指定的位
test_bit(nr, void *addr);
该函数是唯一一个不必以原子方式实现的位操作函数,它仅仅返回指定位的当前值。
int test_and_set_bit(nr, void *addr); int test_and_clear_bit(nr, void *addr); int test_and_change_bit(nr, void *addr);
像前面列出的函数一样具有原子化的行为,例外之处是它同时返回这个位的先前值。
要以原子方式获得锁并访问某个共享数据项的代码,可使用 test_and_set_bit 或者 test_and_clear_bit。
常见的实现方法如下所列,该方法假定锁就是 addr 地址上的第 nr 位。它还假定当锁在零时空闲,而在非零时忙。
/* try to set lock */ while (test_and_set_bit(nr, addr) != 0) wait_for_a_while(); /* do your work */ /* release lock, and check... */ if (test_and_clear_bit(nr, addr) == 0) something_went_wrong(); /* already released: error */
然而,新代码应该使用自旋锁,因为自旋锁已被很好调试,并且能够处理类似中断和内核抢占这样的问题。
4、seqlock
2.6 内核包含有两个新的机制,可提供对共享资源的快速、免锁访问。当要保护的资源很小,很简单、会频繁被访问而且写入访问很少发生且必须快速时,就可以使用 seqlock。从本质上讲,seqlock 会允许读取者对资源的自由访问,但需要读取者检查是否和写入者发生冲突,当这种冲突发生时,就需要重试对资源的访问。
seqlock 在<linux/seqlock.h> 中定义。通常用于初始化 seqlock(具有 seqlock_t类型)的方法有如下两种:
seqlock_t lock1 = SEQLOCK_UNLOCKED; seqlock_t lock2; seqlock_init(&lock2);
读取访问通过获得一个(无符号的)整数顺序值而进入临界区。在退出时,该顺序值会和当前值比较;如果不相等,则必须重试读取访问。其结果是,读取者代码会如下编写:
unsigned int seq; do { seq = read_seqbegin(&the_lock); /* Do what you need to do */ } while read_seqretry(&the_lock, seq);
如果在中断处理例程中使用 seqlock,则应该使用 IRQ 安全的版本:
unsigned int read_seqbegin_irqsave(seqlock_t *lock, unsigned long flags); int read_seqretry_irqrestore(seqlock_t *lock, unsigned int seq, unsigned long flags);
写入者必须在进人由 seqlock 保护的临界区时获得一个互斥锁。为此,需调用下面的函数:
void write_seqlock(seqlock_t *lock);
写入锁使用自旋锁实现,因此自旋锁的常见限制也适用于写入锁。做如下调用可释放该锁:
void write_sequnlock(seqlock_t *lock);
因为自旋锁用来控制写入访问,因此自旋锁的常见变种都可以使用,它们是:
void write_seqlock_irqsave(seqlock_t *lock, unsigned long flags); void write_seqlock_irq(seqlock_t *lock); void write_seqlock_bh(seqlock_t *lock); void write_sequnlock_irqrestore(seqlock_t *lock, unsigned long flags); void write_sequnlock_irq(seqlock_t *lock); void write_sequnlock_bh(seqlock_t *lock);
如果 write_tryseqlock 可以获得自旋锁,它也会返回非零值。
5、读取-复制-更新
读取-复制-更新(read-copy-update,RCU)也是一种高级的互斥机制,在正确的条件下,也可获得高的性能。它很少在驱动程序中使用,但很知名。
在需要修改该数据结构时,写入线程首先复制,然后修改副本,之后用新的版本替代相关指针,这也是该算法名称的由来。
作为 RCU 的实际使用示例,可考虑网络路由表。每个外出数据包都需要检查路由表,以便确定应该使用哪个接口。
使用 RCU 的代码应包含 <linux/rcupdate.h>。
在读取端,代码使用受 RCU 保护的数据结构时,必须将引用数据结构的代码包括在 rcu_read_lock 和rcu_read_unlock 调用之间。这样,RCU代码可能如下所示:
struct my_stuff *stuff; rcu_read_lock(); stuff = find_the_stuff(args...); do_something_with(stuff); rcu_read_unlock();
rcu_read_lock 调用非常快,它会禁止内核抢占,但不会等待任何东西。用来检验读取“锁”的代码必须是原子的。在调用 rcu_read_unlock 之后,就不应该存在对受保护结构的任何引用。
用来修改受保护结构的代码必须在一个步骤中完成。第一步很简单,只需分配一个新的结构,如果必要则从老的结构中复制数据,然后将读取代码能看到的指针替换掉。这时,读取端会假定修改已经完成,任何进入临界区的代码将看到数据的新版本。剩下的工作就是释放老的数据结构。
RCU 所做的就是,设置一个回调函数并等待所有的处理器被调度,之后由回调函数执行清除工作。
修改受 RCU 保护的数据结构的代码必须通过分配一个 struct rcu_head 数据结构来获得清除用的回调函数,但并不需要用什么方式来初始化这个结构。在修改完资源之后,应该做如下调用:void
void call_rcu(struct rcu_head *head, void (*func)(void *arg), void *arg);
在可安全释放该资源时,给定的 func 会被调用,传递到 call_rcu 的相同参数也会传递给这个函数。通常,func 要做的唯一工作就是调用 kfree。