前言
本篇文章我们来讲解锁的概念和实现原理。
一、锁的概念
在Linux中,锁(Lock)是一种同步机制,用于保护共享资源或临界区免受并发访问的影响。它可以确保在任何给定时间只有一个线程可以访问共享资源,从而防止竞争条件(Race Condition)和数据不一致的问题。
锁的主要目的是用于协调并发执行的线程,以确保资源的正确访问顺序和数据的一致性。当一个线程需要访问共享资源时,它必须先获得锁,执行完对资源的操作后释放锁,以便其他线程可以获取锁并进行访问。
在Linux中,常见的锁有以下几种:
1.互斥锁(Mutex):也称为互斥量,是最基本的锁机制。它提供了互斥访问共享资源的能力,当一个线程获得互斥锁后,其他线程将被阻塞,直到该线程释放锁为止。
2.读写锁(ReadWrite Lock):在多线程环境中,在共享资源被频繁读取而较少修改的情况下,使用读写锁可以提高并发性能。读写锁允许多个线程同时获取读锁,但只允许一个线程获取写锁。
3.自旋锁(Spinlock):自旋锁是一种基于忙等待的同步机制。当一个线程尝试获取自旋锁时,如果锁已经被其他线程持有,则该线程将自旋(忙等待)直到锁被释放。自旋锁适用于保护临界区代码较短且短暂占用共享资源的情况。
4.条件变量(Condition Variable):条件变量用于在多线程环境中实现线程间的通信和协调。它允许线程等待特定条件的发生,当条件满足时,线程会被唤醒继续执行。
这些锁机制都是通过系统调用或特定的库函数在用户空间或内核空间实现的。在使用锁的过程中,需要注意避免死锁(Deadlock)和饥饿(Starvation)等并发编程中常见的问题。
二、内核抢占
内核抢占(Kernel Preemption)是一种操作系统的特性,指在内核运行过程中可能会被更高优先级的任务中断,以便让其他任务能够获得执行机会。它允许操作系统内核在需要时主动放弃 CPU 控制权,使得更高优先级的任务能够及时响应。
在内核抢占被启用的系统中,内核代码的执行可以被其他任务或中断处理函数打断,然后切换到更高优先级的任务上执行。这种切换可能发生在内核的任何位置,而不仅仅是在显式的任务切换点。
内核抢占的主要目的是提高系统的响应性和实时性。它可以防止优先级较低的任务长时间占用 CPU,导致系统对于更高优先级的任务或紧急事件的响应延迟。通过允许内核抢占,操作系统可以更好地实现任务调度和响应机制,以满足实时性要求或确保对外部事件的及时响应。
三、自旋锁
自旋锁(Spinlock)是一种在多线程编程中常用的同步机制。它为了保护临界区或共享资源而设计,避免多个线程同时访问或修改导致数据不一致的问题。
自旋锁的特点是在获取锁之前,线程会一直"忙等待",不会进入睡眠状态,通过反复检查锁的状态来等待其他线程释放锁。当自旋锁处于被占用的状态时,即有其他线程持有锁时,请求获取锁的线程会循环忙等待,直到锁被释放。
自旋锁适用于以下情况:
临界区的执行时间很短,不值得将线程切换到睡眠状态。
获取锁的竞争性较低,即锁的占用时间短暂,不容易出现长时间的竞争。
在对称多处理器(SMP)系统中,自旋锁的效果更好,因为线程可以在其他处理器上运行,不会导致整个系统的停滞。
自旋锁的实现可以基于硬件原子指令或内核提供的原子操作函数来保证操作的原子性。它在Linux内核中广泛使用,在多线程编程和内核开发中起到了重要的作用。
需要注意的是,由于自旋锁采用忙等待的方式,如果等待时间过长,会造成CPU资源的浪费。因此,在使用自旋锁时需要注意选择合适的场景和上下文,并避免死锁和饥饿等问题。
四、互斥锁mutex
mutex(互斥锁)是一种常见的同步机制,用于控制对共享资源的互斥访问。它提供了一种排他性的机制,确保在任何给定时间只有一个线程可以持有锁并访问共享资源。
mutex的全称是"mutual exclusion",意为互斥。当一个线程获得了互斥锁之后,其他线程在尝试获得同一个锁时将被阻塞,直到当前持有锁的线程释放锁。
五、信号量
信号量(Semaphore)是一种用于控制并发访问的同步原语。它提供了一种机制,允许多个线程在互斥或共享资源访问中进行协调。信号量可以用来解决线程同步、互斥访问和资源管理等问题。
信号量的基本思想是通过一个计数器和一组相关的操作来实现资源的控制和同步。计数器表示可用的资源数量,每次访问共享资源时,线程需要先申请信号量,如果信号量的计数器大于零,则线程可以访问共享资源并将计数器减一;如果计数器为零,则线程需要等待,直到其他线程释放资源,使计数器变为非零。
六、自旋锁实现原理
1.单核CPU系统
在内核中锁的实现函数如下:
static __always_inline void spin_lock(spinlock_t *lock) { raw_spin_lock(&lock->rlock); }
展开函数后得到:
在单核的CPU中实现原理很简单,直接使用preempt_disable函数来禁止CPU的调度就可以了。
#define __LOCK(lock) \ do { preempt_disable(); ___LOCK(lock); } while (0)
但是在实际中除了程序的调度外还有中断的发生也会影响对同一个资源的访问。
那么这个时候就会使用到下面的函数:
static __always_inline void spin_lock_irq(spinlock_t *lock) { raw_spin_lock_irq(&lock->rlock); }
函数展开后:
会使用到local_irq_disable()来禁止中断,和使用__LOCK(lock)来禁止CPU的调度,这样的话中断和CPU都无法打断当前临界资源的访问了。
#define __LOCK_IRQ(lock) \ do { local_irq_disable(); __LOCK(lock); } while (0)
2.SMP系统
在单核CPU上自旋锁的实现是非常简单的,但是在SMP系统中有多个CPU这个时候就要 既防家贼,又防外贼了。
在SMP系统中下面结构体中的next和owner是实现自旋锁的核心。
typedef struct { union { u32 slock; struct __raw_tickets { #ifdef __ARMEB__ u16 next; u16 owner; #else u16 owner; u16 next; #endif } tickets; }; } arch_spinlock_t;
函数实现:
在SMP系统中使用了preempt_disable()函数来禁止调度也就是防止家贼
,那么在SMP系统中是怎么样防止其他的CPU来打断当前临界资源的读取呢?
void __lockfunc _raw_spin_lock_nested(raw_spinlock_t *lock, int subclass) { preempt_disable(); spin_acquire(&lock->dep_map, subclass, 0, _RET_IP_); LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock); }
最终实现原理:
static inline void arch_spin_lock(arch_spinlock_t *lock) { unsigned long tmp; u32 newval; arch_spinlock_t lockval; prefetchw(&lock->slock); __asm__ __volatile__( "1: ldrex %0, [%3]\n" " add %1, %0, %4\n" " strex %2, %1, [%3]\n" " teq %2, #0\n" " bne 1b" : "=&r" (lockval), "=&r" (newval), "=&r" (tmp) : "r" (&lock->slock), "I" (1 << TICKET_SHIFT) : "cc"); while (lockval.tickets.next != lockval.tickets.owner) { wfe(); lockval.tickets.owner = ACCESS_ONCE(lock->tickets.owner); } smp_mb(); }
原理讲解:
1.首先,函数使用 prefetchw(&lock->slock) 提前加载自旋锁数据到 CPU 缓存中,以减少后续访问的延迟。
2.通过内联汇编(__asm__ __volatile__)执行以下指令序列来获取自旋锁:
使用 ldrex 指令加载自旋锁的值到寄存器 %0 中,保持之前的值(lockval)。
将 %0 中的值与一个偏移值(1 << TICKET_SHIFT)相加,得到新的值,存储在寄存器 %1 中。
使用 strex 指令将 %1 中的值存回自旋锁的地址(&lock->slock),将 strex 操作的结果存储在寄存器 %2 中。
使用 teq 指令将 %2 和 0 进行比较,判断存储是否成功。
如果存储失败,即 %2 不为 0(bne 1b),则跳回 1: 处重新加载自旋锁的值,并再次尝试获取锁。
这个指令序列采用了原子操作指令,确保对自旋锁的操作是原子的,即在多核环境下保证获取锁的原子性和互斥性。
3.在获取到自旋锁之后,进入一个 while 循环,检查是否存在等待链表中的其他线程,以确保自旋锁的互斥性。
循环中的条件判断是 lockval.tickets.next != lockval.tickets.owner,如果等待链表中下一个线程的票号(next)不等于当前线程的票号(owner),则继续循环。
在循环中,使用 wfe() 指令(等待事件)来暂停当前线程,以避免忙等待。
在每次循环迭代开始之前,使用 ACCESS_ONCE() 宏从内存中获取 lock->tickets.owner 的值,确保每次都获得最新的值。
4.循环结束后,使用 smp_mb() 进行内存屏障操作,确保自旋锁的所有操作都完成,并且所有对共享数据的修改都对其他线程可见。
七、信号量的实现原理
信号量的实现如下:
struct semaphore { raw_spinlock_t lock; unsigned int count; struct list_head wait_list; };
下面来讲解一下这个结构体当中变量的作用
raw_spinlock_t lock 这是一个自旋锁变量,信号量的实现需要借助于自旋锁。
unsigned int count count是表示当前信号量的计数器值,信号量的计数器表示可用资源的数量。当计数器大于零时,表示有可用资源,可以继续访问共享资源;当计数器等于零时,表示资源已经被占用,需要等待其他进程或线程释放资源。
struct list_head wait_list: 这是一个链表的头结构,用于维护等待该信号量的进程或线程的队列。当一个进程或线程等待信号量时,它会将自己的控制数据结构(如进程控制块或线程控制块)添加到 wait_list 链表中。当信号量的计数器为零时,正在等待的进程或线程会被放置在该链表中,直到有其他进程或线程释放资源。
信号量的获取使用down函数:
void down(struct semaphore *sem);
实现原理:
这里可以看到使用到了自旋锁来访问count变量,当成功获取到信号量时count值减1,没有获取到时调用__down(sem)函数。
void down(struct semaphore *sem) { unsigned long flags; raw_spin_lock_irqsave(&sem->lock, flags); if (likely(sem->count > 0)) sem->count--; else __down(sem); raw_spin_unlock_irqrestore(&sem->lock, flags); } EXPORT_SYMBOL(down);
__down(sem)函数展开后:
static inline int __sched __down_common(struct semaphore *sem, long state, long timeout) { struct task_struct *task = current; struct semaphore_waiter waiter; list_add_tail(&waiter.list, &sem->wait_list); waiter.task = task; waiter.up = false; for (;;) { if (signal_pending_state(state, task)) goto interrupted; if (unlikely(timeout <= 0)) goto timed_out; __set_task_state(task, state); raw_spin_unlock_irq(&sem->lock); timeout = schedule_timeout(timeout); raw_spin_lock_irq(&sem->lock); if (waiter.up) return 0; } timed_out: list_del(&waiter.list); return -ETIME; interrupted: list_del(&waiter.list); return -EINTR; }
下面详细讲解实现逻辑:
分割线
struct task_struct *task = current;: 获取当前正在执行的任务(进程或线程)的任务结构体指针。
struct semaphore_waiter waiter;: 创建一个名为 waiter 的信号量等待者结构体。
list_add_tail(&waiter.list, &sem->wait_list);: 将 waiter 结构体添加到信号量的等待队列末尾。这将把当前任务添加到等待队列中,表示它正在等待获取信号量。
waiter.task = task;: 将 task 关联到 waiter 结构体,表示该等待者对应于当前任务。
waiter.up = false;: 将 waiter 结构体的 up 字段设置为 false,表示该等待者尚未获得信号量。
for ( ; ; ) { … }: 开始一个无限循环,用于等待获得信号量。
if (signal_pending_state(state, task)): 检查当前任务是否有待处理的信号(如中断信号或终止信号)。如果有,则跳转到标签 interrupted,表示获取操作被中断。
if (unlikely(timeout <= 0)): 检查等待超时时间是否小于等于零。如果是,则跳转到标签 timed_out,表示等待超时。
__set_task_state(task, state);: 将当前任务的状态设置为 state,通常是一个阻塞状态(例如,TASK_INTERRUPTIBLE 或 TASK_UNINTERRUPTIBLE),表示任务正在等待。
raw_spin_unlock_irq(&sem->lock);: 解锁信号量的自旋锁,允许其他任务访问信号量。
timeout = schedule_timeout(timeout);: 使当前任务进入睡眠状态,并等待指定的超时时间或直到被唤醒。
raw_spin_lock_irq(&sem->lock);: 当被唤醒后,重新获取信号量的自旋锁。
if (waiter.up) return 0;: 检查等待者的 up 字段是否被设置为 true。如果是,表示该等待者已经成功获取信号量(通过其他任务释放资源),函数返回 0,表示获取成功。
list_del(&waiter.list);: 从信号量的等待队列中删除 waiter 结构体。
return -ETIME;: 返回一个代表等待超时的错误代码 -ETIME,表示等待超时。
return -EINTR;: 返回一个代表等待被中断的错误代码 -EINTR,表示获取过程被中断。
分割线
void up(struct semaphore *sem);
如果没有其他进程或者线程在等待信号量那么就直接将count进行++,如果有那么调用__up(sem)函数进行处理。
void up(struct semaphore *sem) { unsigned long flags; raw_spin_lock_irqsave(&sem->lock, flags); if (likely(list_empty(&sem->wait_list))) sem->count++; else __up(sem); raw_spin_unlock_irqrestore(&sem->lock, flags); }
这个函数的实现机理是不是太难下面进行讲解:
1.从等待链表中取出第一个在等待的进程
2.将他从等待链表中删除
3.标识已经获取到了信号量
4.唤醒得到信号量的进程
static noinline void __sched __up(struct semaphore *sem) { struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list, struct semaphore_waiter, list); list_del(&waiter->list); waiter->up = true; wake_up_process(waiter->task); }
八、互斥锁的实现原理
下面是mutex结构体中的成员:
struct mutex { /* 1: unlocked, 0: locked, negative: locked, possible waiters */ atomic_t count; spinlock_t wait_lock; struct list_head wait_list; #if defined(CONFIG_DEBUG_MUTEXES) || defined(CONFIG_MUTEX_SPIN_ON_OWNER) struct task_struct *owner; #endif #ifdef CONFIG_MUTEX_SPIN_ON_OWNER struct optimistic_spin_queue osq; /* Spinner MCS lock */ #endif #ifdef CONFIG_DEBUG_MUTEXES void *magic; #endif #ifdef CONFIG_DEBUG_LOCK_ALLOC struct lockdep_map dep_map; #endif };
这里讲解几个比较重要的成员:
1.atomic_t count; 原子变量
2.spinlock_t wait_lock; mutex的实现需要借助自旋锁
3.struct list_head wait_list; 等待mutex的进程会放在这里
4.struct task_struct *owner; 性能优化,调试使用变量
使用mutex时会用到下面两个函数:
mutex_lock函数:
void __sched mutex_lock(struct mutex *lock) { might_sleep(); /* * The locking fastpath is the 1->0 transition from * 'unlocked' into 'locked' state. */ __mutex_fastpath_lock(&lock->count, __mutex_lock_slowpath); mutex_set_owner(lock); }
mutex_unlock函数:
void __sched mutex_unlock(struct mutex *lock) { /* * The unlocking fastpath is the 0->1 transition from 'locked' * into 'unlocked' state: */ #ifndef CONFIG_DEBUG_MUTEXES /* * When debugging is enabled we must not clear the owner before time, * the slow path will always be taken, and that clears the owner field * after verifying that it was indeed current. */ mutex_clear_owner(lock); #endif __mutex_fastpath_unlock(&lock->count, __mutex_unlock_slowpath); }
这里我们可以看到mutex的获取和释放都存在fastpath和slowpath两个解决方法。
这两个函数都是先尝试使用fastpath函数再使用slowpath函数。
那么下面我们来看一下这两种方法有什么区别:
首先看到mutex_lock函数的实现:
fastpath实现:
一开始count值为1,减1后变成0,不会执行if中的语句,直接获得锁,当有进程占用了锁时,则会执行到if语句中的fail_fn(count)函数。
执行fail_fn(count)函数其实就是执行slowpath函数。
static inline void __mutex_fastpath_lock(atomic_t *count, void (*fail_fn)(atomic_t *)) { if (unlikely(atomic_dec_return_acquire(count) < 0)) fail_fn(count); }
slowpath函数:
__mutex_lock_slowpath(atomic_t *lock_count) { struct mutex *lock = container_of(lock_count, struct mutex, count); __mutex_lock_common(lock, TASK_UNINTERRUPTIBLE, 0, NULL, _RET_IP_, NULL, 0); }
mutex_unlock的实现:
fastpath实现:
首先这里对count值进行加1操作,假如当前没有进程在等待锁那么count为1不执行fail_fn函数,假如有进程在等待锁,count值不为1,执行fail_fn函数。
static inline void __mutex_fastpath_unlock(atomic_t *count, void (*fail_fn)(atomic_t *)) { if (unlikely(atomic_inc_return_release(count) <= 0)) fail_fn(count); }
slowpath实现:
这里先使用atomic_set函数将count值设置为1,然后遍历等待的队列,将第一个等待的进程取出并唤醒。
static inline void __mutex_unlock_common_slowpath(struct mutex *lock, int nested) { unsigned long flags; WAKE_Q(wake_q); /* * As a performance measurement, release the lock before doing other * wakeup related duties to follow. This allows other tasks to acquire * the lock sooner, while still handling cleanups in past unlock calls. * This can be done as we do not enforce strict equivalence between the * mutex counter and wait_list. * * * Some architectures leave the lock unlocked in the fastpath failure * case, others need to leave it locked. In the later case we have to * unlock it here - as the lock counter is currently 0 or negative. */ if (__mutex_slowpath_needs_to_unlock()) atomic_set(&lock->count, 1); spin_lock_mutex(&lock->wait_lock, flags); mutex_release(&lock->dep_map, nested, _RET_IP_); debug_mutex_unlock(lock); if (!list_empty(&lock->wait_list)) { /* get the first entry from the wait-list: */ struct mutex_waiter *waiter = list_entry(lock->wait_list.next, struct mutex_waiter, list); debug_mutex_wake_waiter(lock, waiter); wake_q_add(&wake_q, waiter->task); } spin_unlock_mutex(&lock->wait_lock, flags); wake_up_q(&wake_q); }
总结
本篇文章就讲解到这里,知识量比较大,需要大家看完后好好总结复习。