(1)什么是内核同步
所谓的内核同步就是对共享资源进行保护,防止并发访问。
如果有多个执行线程(指任何正在执行的代码实例,比如一个在内核执行的进程,
一个中断处理程序,或一个内核线程)同时访问和操作共享的数据,
就有可能造成进程之间互相覆盖共享数据,造成被访问数据处于不一致的情况。
这种错误很难跟踪和调试,但非常重要。
要做到对共享资源的恰当保护是很困难的。
a.linux2.0以前的时代
在多年前,linux还没有支持对称多处理器SMP的时候,避免并发数据访问相对简单。
在单处理器的时候,只有在中断发生的时候,或是在内核代码显式地请求重新调度(调用schedule())时,数据才可能被并发访问。
b.linux2.0以后的时代
从2.0开始,linux开始支持SMP.
此时如果不加保护,运行在两个不同处理器上的内核代码完全可能在同一时刻并发访问共享数据。
到2.6时,linux已经发展成抢占式内核,
在不加保护的时候,调度程序可以在任何时刻抢占正在运行的内核代码,重新调度其他的进程运行。
随着内核的发展,共享数据的保护已经成为驱动开发中最困难的问题之一,而这些问题在用户应用程序中并不常见。
(2)临界区和竞争条件
a.临界区(critical region)
所谓临界区就是访问和操作共享数据的代码段。
因为多个执行线程并发访问同一个资源通常是不安全的,所以程序员必须保证这些代码段原子地执行,
也就是说,代码在执行前不可被打断,就如同整个临界区是一个不可分割的指令一样。
b.竞争条件(race condition)
如果发生了两个执行线程处于同一个临界区的情况,我们称这就是一个竞争条件。
这是程序包含的一个bug。竞争引起的错误很难重现,所以非常难调试。
c.同步(synchronization)
避免并发和防止竞争条件就被称为同步。
d.单个变量形成的临界区
一个非常简单的共享资源:
int i; /* 共享资源,全局整型变量 */ ... i++; /* 临界区,对共享资源的操作 */
这个临界区可以转化成下面的机器指令:
*得到当前变量i的值并且拷贝到一个寄存器 *将寄存器中的值加1 *把i的新值写回到内存中
如果有两个执行线程同时进入临界区,
假设i的初始值为7,我们期望的结果如下:
线程1 线程2 获得i(7) - 增加i(7->8) - 写回i(8) - 获得i(8) 增加i(8->9) 写回i(9)
实际的执行结果却可能是:
线程1 线程2 获得i(7) - - 获得i(7) 增加i(7->8) - - 增加i(7->8) 写回i(8) - - 写回i(8)
e.如何处理上面的临界区
这种是最简单的竞争条件,只要把操作指令变成原子的就可以了。
多数处理器都提供了指令来原子地读变量,增加变量然后再写回变量。
如果使用原子变量,唯一可能的结果就是:
线程1 线程2 增加i(7->8) - - 增加i(8->9) 或者是: 线程1 线程2 - 增加i(7->8) 增加i(8->9) -
两个原子操作交错执行更本就不可能发生,
因为处理器会从物理上确保这种不可能。
(3)加锁
当涉及到对数据结构的操作时,比如对链表的处理时,
就不可能仅通过原子指令来保证同步,此时,需要一种锁机制。
程序中的锁机制就像日常生活中的门锁,门后的房间就是临界区。
房间中同一时间只能有一个线程。
任何要访问队列的代码首先要占住相应的锁,这样该锁就能阻止来自其他执行线程的并发访问:
线程1 线程2 试图锁定队列 试图锁定队列 成功:获得锁 失败:等待... 访问队列... 等待... 为队列解除锁 等待... ... 成功:获得锁 访问队列... 为队列解除锁
锁的使用是自愿的,非强制的,它完全属于一种编程者自选的编程手段。
锁有多种形式,而且加锁的粒度范围也各不相同。
linux实现了几种不同的锁机制,
各种锁机制之间的区别主要在于当锁被争用时的行为:
一些锁被争用时会简单地进行忙等待(spinlock)
一些锁会使当前任务睡眠直到锁可用为止(semaphore)
(4)是什么造成了并发执行
a.用户空间的并发原因
用户空间之所以需要同步,是因为用户程序会被调度程序抢占和重新调度。
在单cpu上,并发操作并不是真的同时发生,
而是交错执行,称为伪并发。
如果是SMP系统,两个进程就可以真正在临界区中同时执行了。这称为真并发。
用户空间可能产生并发的地方有:
*共享内存
同一个进程的两个可执行线程,访问共享的内存时可能因为被调度程序抢占后发生重新调度而并发
*信号
信号处理是异步的,如果信号处理程序和进程的其他部分共享数据,则有可能并发
b.内核空间的并发原因
*中断
中断几乎可以在任何时刻异步发生,也就可能随时打断当前正在执行的代码
*软中断和tasklet
内核能在任何时刻唤醒或调度软中断和tasklet,打断当前正在执行的代码
*内核抢占
因为内核具有抢占性,所以内核中的任务可能被另一个任务抢占
*睡眠及与用户空间的同步
睡眠可能导致用户进程切换
*对称多处理SMP
多个处理器可以同时执行代码
内核开发者必须理解这些并发执行的原因,并为其做好准备。
注意!辨认出真正需要共享的数据和相应的临界区,才是真正有挑战性的地方。
在编写代码的开始阶段就要设计出恰当的锁,而不是事后才想到。
(5)要保护些什么
找出哪些数据需要保护是关键所在。
基本上除了执行线程的内部数据不需要锁,其他所有的数据都可能需要锁保护。
加锁经验:
如果有其他执行线程可以访问某些数据,那么就给这些数据加上某种形式的锁。
记住!要给数据而不是代码加锁。
内核的配置选项
CONFIG_SMP
可以选择不支持smp。许多加锁问题在单处理器上是不存在的。
如果在内核配置时选择了CONFIG_SMP,则不必要的代码就不会被编入针对单处理器的内核映像
CONFIG_PREEMPT
配置内核是否支持抢占。
在你编写内核代码时,要问自己下面这些问题:
a.这个数据是不是全局的?
除了当前的线程外,其他线程能不能访问它?
b.这个数据会不会在进程上下文和中断上下文中共享?
它是不是要在两个不同的中断处理程序中共享?
c.进程在访问数据时可不可能被抢占?
被调度的新程序会不会访问同一数据?
d.当前进程是不是会睡眠(阻塞)在某些资源上?
如果是,它会让共享数据处于何种状态?
e.如果这个函数又在另一个处理器上被调度将会发生什么?
f.你要对这些代码做什么?
简言之,几乎访问所有的内核全局变量和共享数据都需要某种形式的同步方法。
(6)死锁
死锁的产生需要一定的条件:
需要一个或多个执行线程和一个或多个资源,每个线程都在等待某个已经被占用的资源。
例如交通路口的拥堵。
a.自死锁
如果代码已经获得了某个锁,又再次去获得它,就会造成自死锁。
如:
线程1 获得锁 再次试图获得锁 等待锁重新可用
b.ABBA死锁
线程1 线程2 获得A锁 获得B锁 试图获得B锁 试图获得A锁 等待B锁 等待A锁
c.避免死锁的简单规则
*加锁的顺序是关键 使用嵌套的锁时必须保证以相同的顺序获得锁。 比如上面的ABBA死锁,如果所有进程都按照先获得A锁再获得B锁的顺序,就不会死锁了 *不要重复请求同一个锁 *越复杂的加锁方案越有可能造成死锁--设计应力求简单 *尽管释放锁的顺序和死锁是无关的,但最好还是以获得锁的相反顺序来释放锁。 /********************* * 内核同步方法 ********************/
前面讨论了竞争如何产生以及怎么去解决。
下面将介绍linux为解决竞争问题而提供的同步方法
(1)原子整数操作
原子操作可以保证指令以原子的方式运行--执行过程不能被打断。
原子操作把读取和改变变量的行为包含在一个单步中执行,
从而避免了竞争,如:
线程1 线程2 atomic i(7->8) - - atomic i(8->9)
内核提供了针对整数和单独的位进行的原子操作。
针对整数的原子操作只能对atomic_t类型的数据进行访问。
定义在<asm/atomic.h> typedef struct { volatile int counter; } atomic_t;
a.定义
atomic_t t; atomic_t u = ATOMIC_INIT(0); /* 定义u并初始化为0 */
b.操作
都定义在中。
原子操作通常是内联函数,往往通过内嵌汇编指令来实现。
int atomic_read(atomic_t v);
原子地读取变量v
void atomic_set(atomic_t *v, int i); // v = i; void atomic_add(int i, atomic_t *v); // v += i; void atomic_sub(int i, atomic_t *v); // v -= i; void atomic_inc(atomic_t *v); // v += 1; void atomic_dec(atomic_t *v); // v -= 1;
原子地加减1
int atomic_sub_and_test(int i, atomic_t *v)
原子地从v减去i,如果结果等于0返回真,否则返回假
int atomic_add_negative(int i, atomic_t *v)
结果为负数返回真,否则返回假
int atomic_dec_and_test(atomic_t *v); // i -= 1; i == 0; int atomic_inc_and_test(atomic_t *v); // i += 1; i == 0;
结果等于0返回真,否则返回假
(2)原子位操作
定义在。这些函数针对某个普通的内存地址。
可用的位操作:
void set_bit(nr, void *addr);
设置addr指向的数据项的第nr位
*addr |= (0x1 << nr); void clear_bit(nr, void *addr);
清除addr指向的数据项的第nr位
*addr &= ~(0x1 << nr); test_bit(nr, void *addr);
返回指定位的当前值
*addr & (0x1 << nr); int test_and_set_bit(nr, void *addr);
设置nr位的同时返回原来的值
例子:
unsigned long word = 0; set_bit(0, &word); /* 原子地设定第0位 */ set_bit(1, &word); /* 原子地设定第1位 */ clear_bit(1, &word); /* 原子地清空第1位 */ change_bit(0, &word); /* 原子地翻转第0位 */ /* 原子地设置第0位并返回设置前的值(0) */ if(test_and_set_bit(0, &word){ /* 不为真 */ } word = 7; /* 合法 */
内核还提供了一组非原子位操作函数,名字前面多了两个下划线。如__test_bit()
如果不需要原子性操作,比如已经用锁保护了数据,
用这些非原子的位操作可能更快。
(3)原子性与顺序性的比较
原子性确保指令执行期间不被打断,要么全部执行完,要么不执行。
顺序性确保指令的执行顺序不改变。通过屏障指令(barrier)来保证。
能使用原子操作的时候,就尽量不要使用复杂的加锁机制。
(4)spinlock自旋锁
原子操作只能针对整数和位,面对更复杂的临界区,
比如从一个数据结构中移出数据,处理完后再加入到另一个数据结构中。
这种情况就需要更复杂的同步方法--锁来提供保护。
linux内核中最常见的锁是自旋锁(spin lock)。
自旋锁最多只能被一个可执行线程持有,等待锁的进程采用忙循环等待(只针对smp)。
因为忙循环很消耗处理器时间,所以自旋锁不能被长时间持有。
持有自旋锁的时间最好小于完成两次上下文切换的时间,
如果时间太长,就需要采用另外的加锁方式。
自旋锁定义在和
自旋锁可以在中断处理程序中使用。
在这种情况下,其他代码要获得锁时必须首先关闭中断。
在单处理器上,加锁操作只是简单地把内核抢占关闭。
等待自旋锁时程序不会睡眠,同样,一旦拥有了spinlock,代码也不能睡眠。
此时,必须注意每个调用的系统函数,因为有些函数是可能导致睡眠的(如kmallspin_unlock_irqstoreoc)。
可以选定CONFIG_DEBUG_SPINLOCK选项,打开spinlock的调试功能
a.spinlock的初始化
静态
spinlock_t my_lock = SPIN_LOCK_UNLOCKED;
动态
void spin_lock_init(spinlock_t *lock);
b.加锁函数
void spin_lock(spinlock_t *lock); void spin_lock_irqsave(spinlock_t *lock, unsigned long flags); void spin_lock_irq(spinlock_t *lock); void spin_lock_bh(spinlock_t *lock);
如果我们的一个自旋锁可以被中断处理程序或者是中断的下半部获得,则必须使用禁止中断的spin_lock形式。
c.解锁函数
void spin_unlock(spinlock_t *lock); void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags); void spin_unlock_irq(spinlock_t *lock); void spin_unlock_bh(spinlock_t *lock);
d.非阻塞的自旋锁操作
int spin_trylock(spinlock_t *lock); int spin_trylock_bh(spinlock_t *lock);
成功(即获得锁)时返回非零,否则返回0
(5)读取者/写入者自旋锁(reader/writer)
有时候,锁的用途可以明确分为读取和写入。比如对链表的更新和检索。
写操作必须要求并发保护,但多个并发的读操作是安全的。
这样可以提高锁的使用效率。定义在
a.初始化
静态
rwlock_t my_rwlock = RW_LOCK_UNLOCKED;
动态
rwlock_t my_rwlock; rwlock_init(&my_rwlock);
b.加锁
read/write_lock() read/write_lock_irq() read/write_lock_irqsave() read/write_lock_bh()
c.解锁
read/write_unlock() read/write_unlock_irq() read/write_unlock_irqsave() read/write_unlock_bh()
d.其他
write_trylock() rw_is_locked()
e.使用方式
rwlock_t my_rwlock = RW_LOCK_UNLOCKED;
读者
read_lock(&my_rwlock); /* 临界区(只读) */ read_unlock(&my_rwlock);
写者
write_lock(&my_rwlock); /* 临界区(读写) */ write_unlock(&my_rwlock);
通常,读锁和写锁位于完全分开的代码中。不能把一个读锁升级为写锁。如:
read_lock(&my_rwlock); ... write_lock(&my_rwlock);
这会造成死锁。
如果代码不能清晰分成读和写两部分,那么就用普通的spinlock就可以了。
注意!读/写者锁机制要更照顾读者。
当读锁被持有时,写者必须等待,直到所有的读者释放锁,而其他的读者却可以继续获得锁。
这样,大量读者必定会使刮起的写者处于饥饿状态。
如果加锁时间不长并且代码不会睡眠(比如中断处理程序),利用自旋锁是最佳方案;
如果加锁时间很长或者代码在持有锁时有可能睡眠,则最好用信号量来完成加锁。
(6)信号量(semaphore)和互斥体
信号量(semaphore)和信号(signal)是完全不同的概念。
linux的信号量是一种睡眠锁。
如果一个进程a试图获得一个已经被占用的信号量时,该进程将被送入一个等待队列,然后睡眠。
持有信号量的进程b将信号量释放后,进程a将被唤醒并获得信号量。
信号量比自旋锁能提供更好的处理器利用率,但信号量的开销更大。
信号量的特性
a.信号量适用于锁会被长期持有时,因为其开销比较大
b.只有在进程上下文中才能获得信号量,因为获取信号量时可能导致睡眠,不适用于中断上下文
c.可以在持有信号量时去睡眠,因此可以在持有信号量的时候和用户空间同步
d.不能在持有信号量的同时持有自旋锁
e.信号量可以允许任意数量的锁持有者,而自旋锁在一个时刻只能有一个持有者。
只拥有一个持有者的信号量称为互斥信号量
(7)信号量的实现
信号量的实现与体系结构相关,具体实现定义在
a.信号量的声明
静态
static DECLARE_SEMAPHORE_GENERIC(name, count);
或声明一个互斥信号量
static DECLARE_MUTEX(name);
动态
struct semaphore sem; /*信号量常作为一个大的结构体中的一部分*/ sema_init(&sem, count); init_MUTEX(&sem); /* 初始化互斥信号量 */ init_MUTEX_LOCKED(&sem); /*初始化后信号量就被锁定 */
b.获得信号量
void down(struct semaphore *sem);
减小信号量的值。函数返回后获得信号量
int down_interruptible(struct semaphore *sem);;
在等待信号量时,可以被用户空间程序通过信号中断。
这是我们常用的版本。
如果操作被中断,则返回非零值,调用者无法获得信号量。
注意!调用时始终要检查返回值。
int down_trylock(struct semaphore *sem);
不会休眠,如果不能获得信号量,立即返回一个非零值
c.释放信号量
viod up(struct semaphore *sem);
linux也提供了读取者/写入者信号量。
#include struct rw_semaphore;
(9)completion机制
在内核编程中常见的一种模式是:
在当前线程之外初始化某个活动,然后等待该活动的结束。
这个活动可能是创建一个新的内核线程或者新的用户空间进程,
对一个已有进程的某个请求,或者某种类型的硬件活动等等。
可以利用信号量对两个任务进行同步。例如:
在任务1中
struct semaphore sem; init_MUTEX_LOCKED(&sem); start_external_task(&sem); down(&sem);
在任务2中
首先down(&sem),这样任务1就会在sem上等待;
任务2完成后,调用up(&sem),任务1恢复执行
用信号量来完成这一工作并不太有效,因此内核提供了completion机制(出现在2.4.7)来完成这一工作。
a.创建completion
#include
静态
DECLARE_COMPLETION(my_completion);
动态
struct completion my_completion; init_completion(&my_completion);
b.等待completion
void wait_for_completion(struct completion *c);
执行一个非中断的等待,用户空间无法通过signal来中断这个进程。
c.完成completion
void complete(struct completion *);
发信号唤醒任何等待任务
(2)如何禁止内核抢占
由于内核是抢占性的,内核中的进程在任何时刻都可能停下来被更高优先级的进程抢占。
有时候需要把这一特性关闭。
用户驱动直接关闭内核抢占的情况不多,但一些内核机制的内部需要这一功能。
比如单cpu系统上的spinlock锁,
spin_lock的内部并没有忙等待,而只是关闭了内核抢占。
a.抢占的相关函数
*preempt_disable()
增加抢占计数值,从而禁止内核抢占。
如果抢占计数为0则内核可以进行抢占,如果为1或更大的数值,则禁止抢占
*preempt_enable()
减少抢占计数,并当该值降为0时检查和执行被挂起的需调度的任务
*preempt_enable_no_resched()
激活内核抢占,但不检查被挂起的需调度的任务
*preempt_count()preempt_count()
返回抢占计数
b.每个cpu上的数据访问
int cpu; /*禁止内核抢占,并将cpu设置为当前处理器 */ cpu=get_cpu();
对每个处理器的数据进行操作
/*启动内核抢占 */ put_cpu();