Linux驱动开发(锁和信号量的概念及实现原理)

简介: Linux驱动开发(锁和信号量的概念及实现原理)

前言

本篇文章我们来讲解锁的概念和实现原理。

一、锁的概念

在Linux中,锁(Lock)是一种同步机制,用于保护共享资源或临界区免受并发访问的影响。它可以确保在任何给定时间只有一个线程可以访问共享资源,从而防止竞争条件(Race Condition)和数据不一致的问题。

锁的主要目的是用于协调并发执行的线程,以确保资源的正确访问顺序和数据的一致性。当一个线程需要访问共享资源时,它必须先获得锁,执行完对资源的操作后释放锁,以便其他线程可以获取锁并进行访问。

在Linux中,常见的锁有以下几种:

1.互斥锁(Mutex):也称为互斥量,是最基本的锁机制。它提供了互斥访问共享资源的能力,当一个线程获得互斥锁后,其他线程将被阻塞,直到该线程释放锁为止。

2.读写锁(ReadWrite Lock):在多线程环境中,在共享资源被频繁读取而较少修改的情况下,使用读写锁可以提高并发性能。读写锁允许多个线程同时获取读锁,但只允许一个线程获取写锁。

3.自旋锁(Spinlock):自旋锁是一种基于忙等待的同步机制。当一个线程尝试获取自旋锁时,如果锁已经被其他线程持有,则该线程将自旋(忙等待)直到锁被释放。自旋锁适用于保护临界区代码较短且短暂占用共享资源的情况。

4.条件变量(Condition Variable):条件变量用于在多线程环境中实现线程间的通信和协调。它允许线程等待特定条件的发生,当条件满足时,线程会被唤醒继续执行。

这些锁机制都是通过系统调用或特定的库函数在用户空间或内核空间实现的。在使用锁的过程中,需要注意避免死锁(Deadlock)和饥饿(Starvation)等并发编程中常见的问题。

二、内核抢占

内核抢占(Kernel Preemption)是一种操作系统的特性,指在内核运行过程中可能会被更高优先级的任务中断,以便让其他任务能够获得执行机会。它允许操作系统内核在需要时主动放弃 CPU 控制权,使得更高优先级的任务能够及时响应。

在内核抢占被启用的系统中,内核代码的执行可以被其他任务或中断处理函数打断,然后切换到更高优先级的任务上执行。这种切换可能发生在内核的任何位置,而不仅仅是在显式的任务切换点。

内核抢占的主要目的是提高系统的响应性和实时性。它可以防止优先级较低的任务长时间占用 CPU,导致系统对于更高优先级的任务或紧急事件的响应延迟。通过允许内核抢占,操作系统可以更好地实现任务调度和响应机制,以满足实时性要求或确保对外部事件的及时响应。

三、自旋锁

自旋锁(Spinlock)是一种在多线程编程中常用的同步机制。它为了保护临界区或共享资源而设计,避免多个线程同时访问或修改导致数据不一致的问题。

自旋锁的特点是在获取锁之前,线程会一直"忙等待",不会进入睡眠状态,通过反复检查锁的状态来等待其他线程释放锁。当自旋锁处于被占用的状态时,即有其他线程持有锁时,请求获取锁的线程会循环忙等待,直到锁被释放。

自旋锁适用于以下情况:

临界区的执行时间很短,不值得将线程切换到睡眠状态。

获取锁的竞争性较低,即锁的占用时间短暂,不容易出现长时间的竞争。

在对称多处理器(SMP)系统中,自旋锁的效果更好,因为线程可以在其他处理器上运行,不会导致整个系统的停滞。

自旋锁的实现可以基于硬件原子指令或内核提供的原子操作函数来保证操作的原子性。它在Linux内核中广泛使用,在多线程编程和内核开发中起到了重要的作用。

需要注意的是,由于自旋锁采用忙等待的方式,如果等待时间过长,会造成CPU资源的浪费。因此,在使用自旋锁时需要注意选择合适的场景和上下文,并避免死锁和饥饿等问题。

四、互斥锁mutex

mutex(互斥锁)是一种常见的同步机制,用于控制对共享资源的互斥访问。它提供了一种排他性的机制,确保在任何给定时间只有一个线程可以持有锁并访问共享资源。

mutex的全称是"mutual exclusion",意为互斥。当一个线程获得了互斥锁之后,其他线程在尝试获得同一个锁时将被阻塞,直到当前持有锁的线程释放锁。

五、信号量

信号量(Semaphore)是一种用于控制并发访问的同步原语。它提供了一种机制,允许多个线程在互斥或共享资源访问中进行协调。信号量可以用来解决线程同步、互斥访问和资源管理等问题。

信号量的基本思想是通过一个计数器和一组相关的操作来实现资源的控制和同步。计数器表示可用的资源数量,每次访问共享资源时,线程需要先申请信号量,如果信号量的计数器大于零,则线程可以访问共享资源并将计数器减一;如果计数器为零,则线程需要等待,直到其他线程释放资源,使计数器变为非零。

六、自旋锁实现原理

1.单核CPU系统

在内核中锁的实现函数如下:

static __always_inline void spin_lock(spinlock_t *lock)
{
  raw_spin_lock(&lock->rlock);
}

展开函数后得到:

在单核的CPU中实现原理很简单,直接使用preempt_disable函数来禁止CPU的调度就可以了。

#define __LOCK(lock) \
  do { preempt_disable(); ___LOCK(lock); } while (0)

但是在实际中除了程序的调度外还有中断的发生也会影响对同一个资源的访问。

那么这个时候就会使用到下面的函数:

static __always_inline void spin_lock_irq(spinlock_t *lock)
{
  raw_spin_lock_irq(&lock->rlock);
}

函数展开后:

会使用到local_irq_disable()来禁止中断,和使用__LOCK(lock)来禁止CPU的调度,这样的话中断和CPU都无法打断当前临界资源的访问了。

#define __LOCK_IRQ(lock) \
  do { local_irq_disable(); __LOCK(lock); } while (0)

2.SMP系统

在单核CPU上自旋锁的实现是非常简单的,但是在SMP系统中有多个CPU这个时候就要 既防家贼,又防外贼了。

在SMP系统中下面结构体中的next和owner是实现自旋锁的核心。

typedef struct {
  union {
    u32 slock;
    struct __raw_tickets {
#ifdef __ARMEB__
      u16 next;
      u16 owner;
#else
      u16 owner;
      u16 next;
#endif
    } tickets;
  };
} arch_spinlock_t;

函数实现:

在SMP系统中使用了preempt_disable()函数来禁止调度也就是防止家贼,那么在SMP系统中是怎么样防止其他的CPU来打断当前临界资源的读取呢?

void __lockfunc _raw_spin_lock_nested(raw_spinlock_t *lock, int subclass)
{
  preempt_disable();
  spin_acquire(&lock->dep_map, subclass, 0, _RET_IP_);
  LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}

最终实现原理:

static inline void arch_spin_lock(arch_spinlock_t *lock)
{
  unsigned long tmp;
  u32 newval;
  arch_spinlock_t lockval;
  prefetchw(&lock->slock);
  __asm__ __volatile__(
"1: ldrex %0, [%3]\n"
" add %1, %0, %4\n"
" strex %2, %1, [%3]\n"
" teq %2, #0\n"
" bne 1b"
  : "=&r" (lockval), "=&r" (newval), "=&r" (tmp)
  : "r" (&lock->slock), "I" (1 << TICKET_SHIFT)
  : "cc");
  while (lockval.tickets.next != lockval.tickets.owner) {
    wfe();
    lockval.tickets.owner = ACCESS_ONCE(lock->tickets.owner);
  }
  smp_mb();
}

原理讲解:

1.首先,函数使用 prefetchw(&lock->slock) 提前加载自旋锁数据到 CPU 缓存中,以减少后续访问的延迟。

2.通过内联汇编(__asm__ __volatile__)执行以下指令序列来获取自旋锁:

使用 ldrex 指令加载自旋锁的值到寄存器 %0 中,保持之前的值(lockval)。

将 %0 中的值与一个偏移值(1 << TICKET_SHIFT)相加,得到新的值,存储在寄存器 %1 中。

使用 strex 指令将 %1 中的值存回自旋锁的地址(&lock->slock),将 strex 操作的结果存储在寄存器 %2 中。

使用 teq 指令将 %2 和 0 进行比较,判断存储是否成功。

如果存储失败,即 %2 不为 0(bne 1b),则跳回 1: 处重新加载自旋锁的值,并再次尝试获取锁。

这个指令序列采用了原子操作指令,确保对自旋锁的操作是原子的,即在多核环境下保证获取锁的原子性和互斥性。

3.在获取到自旋锁之后,进入一个 while 循环,检查是否存在等待链表中的其他线程,以确保自旋锁的互斥性。

循环中的条件判断是 lockval.tickets.next != lockval.tickets.owner,如果等待链表中下一个线程的票号(next)不等于当前线程的票号(owner),则继续循环。

在循环中,使用 wfe() 指令(等待事件)来暂停当前线程,以避免忙等待。

在每次循环迭代开始之前,使用 ACCESS_ONCE() 宏从内存中获取 lock->tickets.owner 的值,确保每次都获得最新的值。

4.循环结束后,使用 smp_mb() 进行内存屏障操作,确保自旋锁的所有操作都完成,并且所有对共享数据的修改都对其他线程可见。

七、信号量的实现原理

信号量的实现如下:

struct semaphore {
  raw_spinlock_t    lock;
  unsigned int    count;
  struct list_head  wait_list;
};

下面来讲解一下这个结构体当中变量的作用

raw_spinlock_t lock 这是一个自旋锁变量,信号量的实现需要借助于自旋锁。

unsigned int count count是表示当前信号量的计数器值,信号量的计数器表示可用资源的数量。当计数器大于零时,表示有可用资源,可以继续访问共享资源;当计数器等于零时,表示资源已经被占用,需要等待其他进程或线程释放资源。

struct list_head wait_list: 这是一个链表的头结构,用于维护等待该信号量的进程或线程的队列。当一个进程或线程等待信号量时,它会将自己的控制数据结构(如进程控制块或线程控制块)添加到 wait_list 链表中。当信号量的计数器为零时,正在等待的进程或线程会被放置在该链表中,直到有其他进程或线程释放资源。

信号量的获取使用down函数:

void down(struct semaphore *sem);

实现原理:

这里可以看到使用到了自旋锁来访问count变量,当成功获取到信号量时count值减1,没有获取到时调用__down(sem)函数。

void down(struct semaphore *sem)
{
  unsigned long flags;
  raw_spin_lock_irqsave(&sem->lock, flags);
  if (likely(sem->count > 0))
    sem->count--;
  else
    __down(sem);
  raw_spin_unlock_irqrestore(&sem->lock, flags);
}
EXPORT_SYMBOL(down);

__down(sem)函数展开后:

static inline int __sched __down_common(struct semaphore *sem, long state,
                long timeout)
{
  struct task_struct *task = current;
  struct semaphore_waiter waiter;
  list_add_tail(&waiter.list, &sem->wait_list);
  waiter.task = task;
  waiter.up = false;
  for (;;) {
    if (signal_pending_state(state, task))
      goto interrupted;
    if (unlikely(timeout <= 0))
      goto timed_out;
    __set_task_state(task, state);
    raw_spin_unlock_irq(&sem->lock);
    timeout = schedule_timeout(timeout);
    raw_spin_lock_irq(&sem->lock);
    if (waiter.up)
      return 0;
  }
 timed_out:
  list_del(&waiter.list);
  return -ETIME;
 interrupted:
  list_del(&waiter.list);
  return -EINTR;
}

下面详细讲解实现逻辑:

分割线

struct task_struct *task = current;: 获取当前正在执行的任务(进程或线程)的任务结构体指针。

struct semaphore_waiter waiter;: 创建一个名为 waiter 的信号量等待者结构体。

list_add_tail(&waiter.list, &sem->wait_list);: 将 waiter 结构体添加到信号量的等待队列末尾。这将把当前任务添加到等待队列中,表示它正在等待获取信号量。

waiter.task = task;: 将 task 关联到 waiter 结构体,表示该等待者对应于当前任务。

waiter.up = false;: 将 waiter 结构体的 up 字段设置为 false,表示该等待者尚未获得信号量。

for ( ; ; ) { … }: 开始一个无限循环,用于等待获得信号量。

if (signal_pending_state(state, task)): 检查当前任务是否有待处理的信号(如中断信号或终止信号)。如果有,则跳转到标签 interrupted,表示获取操作被中断。

if (unlikely(timeout <= 0)): 检查等待超时时间是否小于等于零。如果是,则跳转到标签 timed_out,表示等待超时。

__set_task_state(task, state);: 将当前任务的状态设置为 state,通常是一个阻塞状态(例如,TASK_INTERRUPTIBLE 或 TASK_UNINTERRUPTIBLE),表示任务正在等待。

raw_spin_unlock_irq(&sem->lock);: 解锁信号量的自旋锁,允许其他任务访问信号量。

timeout = schedule_timeout(timeout);: 使当前任务进入睡眠状态,并等待指定的超时时间或直到被唤醒。

raw_spin_lock_irq(&sem->lock);: 当被唤醒后,重新获取信号量的自旋锁。

if (waiter.up) return 0;: 检查等待者的 up 字段是否被设置为 true。如果是,表示该等待者已经成功获取信号量(通过其他任务释放资源),函数返回 0,表示获取成功。

list_del(&waiter.list);: 从信号量的等待队列中删除 waiter 结构体。

return -ETIME;: 返回一个代表等待超时的错误代码 -ETIME,表示等待超时。

return -EINTR;: 返回一个代表等待被中断的错误代码 -EINTR,表示获取过程被中断。

分割线
void up(struct semaphore *sem);

如果没有其他进程或者线程在等待信号量那么就直接将count进行++,如果有那么调用__up(sem)函数进行处理。

void up(struct semaphore *sem)
{
  unsigned long flags;
  raw_spin_lock_irqsave(&sem->lock, flags);
  if (likely(list_empty(&sem->wait_list)))
    sem->count++;
  else
    __up(sem);
  raw_spin_unlock_irqrestore(&sem->lock, flags);
}

这个函数的实现机理是不是太难下面进行讲解:

1.从等待链表中取出第一个在等待的进程

2.将他从等待链表中删除

3.标识已经获取到了信号量

4.唤醒得到信号量的进程

static noinline void __sched __up(struct semaphore *sem)
{
  struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list,
            struct semaphore_waiter, list);
  list_del(&waiter->list);
  waiter->up = true;
  wake_up_process(waiter->task);
}

八、互斥锁的实现原理

下面是mutex结构体中的成员:

struct mutex {
  /* 1: unlocked, 0: locked, negative: locked, possible waiters */
  atomic_t    count;
  spinlock_t    wait_lock;
  struct list_head  wait_list;
#if defined(CONFIG_DEBUG_MUTEXES) || defined(CONFIG_MUTEX_SPIN_ON_OWNER)
  struct task_struct  *owner;
#endif
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
  struct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
#ifdef CONFIG_DEBUG_MUTEXES
  void      *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
  struct lockdep_map  dep_map;
#endif
};

这里讲解几个比较重要的成员:

1.atomic_t count; 原子变量

2.spinlock_t wait_lock; mutex的实现需要借助自旋锁

3.struct list_head wait_list; 等待mutex的进程会放在这里

4.struct task_struct *owner; 性能优化,调试使用变量

使用mutex时会用到下面两个函数:

mutex_lock函数:

void __sched mutex_lock(struct mutex *lock)
{
  might_sleep();
  /*
   * The locking fastpath is the 1->0 transition from
   * 'unlocked' into 'locked' state.
   */
  __mutex_fastpath_lock(&lock->count, __mutex_lock_slowpath);
  mutex_set_owner(lock);
}

mutex_unlock函数:

void __sched mutex_unlock(struct mutex *lock)
{
  /*
   * The unlocking fastpath is the 0->1 transition from 'locked'
   * into 'unlocked' state:
   */
#ifndef CONFIG_DEBUG_MUTEXES
  /*
   * When debugging is enabled we must not clear the owner before time,
   * the slow path will always be taken, and that clears the owner field
   * after verifying that it was indeed current.
   */
  mutex_clear_owner(lock);
#endif
  __mutex_fastpath_unlock(&lock->count, __mutex_unlock_slowpath);
}

这里我们可以看到mutex的获取和释放都存在fastpath和slowpath两个解决方法。

这两个函数都是先尝试使用fastpath函数再使用slowpath函数。

那么下面我们来看一下这两种方法有什么区别:

首先看到mutex_lock函数的实现:

fastpath实现:

一开始count值为1,减1后变成0,不会执行if中的语句,直接获得锁,当有进程占用了锁时,则会执行到if语句中的fail_fn(count)函数。

执行fail_fn(count)函数其实就是执行slowpath函数。

static inline void
__mutex_fastpath_lock(atomic_t *count, void (*fail_fn)(atomic_t *))
{
  if (unlikely(atomic_dec_return_acquire(count) < 0))
    fail_fn(count);
}

slowpath函数:

__mutex_lock_slowpath(atomic_t *lock_count)
{
  struct mutex *lock = container_of(lock_count, struct mutex, count);
  __mutex_lock_common(lock, TASK_UNINTERRUPTIBLE, 0,
          NULL, _RET_IP_, NULL, 0);
}
mutex_unlock的实现:

fastpath实现:

首先这里对count值进行加1操作,假如当前没有进程在等待锁那么count为1不执行fail_fn函数,假如有进程在等待锁,count值不为1,执行fail_fn函数。

static inline void
__mutex_fastpath_unlock(atomic_t *count, void (*fail_fn)(atomic_t *))
{
  if (unlikely(atomic_inc_return_release(count) <= 0))
    fail_fn(count);
}

slowpath实现:

这里先使用atomic_set函数将count值设置为1,然后遍历等待的队列,将第一个等待的进程取出并唤醒。

static inline void
__mutex_unlock_common_slowpath(struct mutex *lock, int nested)
{
  unsigned long flags;
  WAKE_Q(wake_q);
  /*
   * As a performance measurement, release the lock before doing other
   * wakeup related duties to follow. This allows other tasks to acquire
   * the lock sooner, while still handling cleanups in past unlock calls.
   * This can be done as we do not enforce strict equivalence between the
   * mutex counter and wait_list.
   *
   *
   * Some architectures leave the lock unlocked in the fastpath failure
   * case, others need to leave it locked. In the later case we have to
   * unlock it here - as the lock counter is currently 0 or negative.
   */
  if (__mutex_slowpath_needs_to_unlock())
    atomic_set(&lock->count, 1);
  spin_lock_mutex(&lock->wait_lock, flags);
  mutex_release(&lock->dep_map, nested, _RET_IP_);
  debug_mutex_unlock(lock);
  if (!list_empty(&lock->wait_list)) {
    /* get the first entry from the wait-list: */
    struct mutex_waiter *waiter =
        list_entry(lock->wait_list.next,
             struct mutex_waiter, list);
    debug_mutex_wake_waiter(lock, waiter);
    wake_q_add(&wake_q, waiter->task);
  }
  spin_unlock_mutex(&lock->wait_lock, flags);
  wake_up_q(&wake_q);
}

总结

本篇文章就讲解到这里,知识量比较大,需要大家看完后好好总结复习。


相关文章
|
11月前
|
Ubuntu 搜索推荐 Linux
详解Ubuntu的strings与grep命令:Linux开发的实用工具。
这就是Ubuntu中的strings和grep命令,透明且强大。我希望你喜欢这个神奇的世界,并能在你的Linux开发旅程上,通过它们找到你的方向。记住,你的电脑是你的舞台,在上面你可以做任何你想做的事,只要你敢于尝试。
488 32
|
11月前
|
NoSQL Linux 编译器
GDB符号表概念和在Linux下获取符号表的方法
通过掌握这些关于GDB符号表的知识,你可以更好地管理和理解你的程序,希望这些知识可以帮助你更有效地进行调试工作。
471 16
|
11月前
|
Unix Linux
对于Linux的进程概念以及进程状态的理解和解析
现在,我们已经了解了Linux进程的基础知识和进程状态的理解了。这就像我们理解了城市中行人的行走和行为模式!希望这个形象的例子能帮助我们更好地理解这个重要的概念,并在实际应用中发挥作用。
215 20
|
Linux C语言
Linux读写锁源码分析
本文分析了读写锁的实现原理与应用场景,基于glibc 2.17源码。读写锁通过读引用计数、写线程ID、条件变量等实现,支持读优先(默认)和写优先模式。读优先时,写锁可能饥饿;写优先时,读线程需等待写锁释放。详细解析了`pthread_rwlock_t`数据结构及加解锁流程,并通过实验验证:2000个读线程与1个写线程测试下,读优先导致写锁饥饿,写优先则正常抢占锁。
369 19
|
10月前
|
存储 Linux Shell
Linux进程概念-详细版(二)
在Linux进程概念-详细版(一)中我们解释了什么是进程,以及进程的各种状态,已经对进程有了一定的认识,那么这篇文章将会继续补全上篇文章剩余没有说到的,进程优先级,环境变量,程序地址空间,进程地址空间,以及调度队列。
184 0
|
10月前
|
Linux 调度 C语言
Linux进程概念-详细版(一)
子进程与父进程代码共享,其子进程直接用父进程的代码,其自己本身无代码,所以子进程无法改动代码,平时所说的修改是修改的数据。为什么要创建子进程:为了让其父子进程执行不同的代码块。子进程的数据相对于父进程是会进行写时拷贝(COW)。
243 0
|
存储 Linux 调度
【Linux】进程概念和进程状态
本文详细介绍了Linux系统中进程的核心概念与管理机制。从进程的定义出发,阐述了其作为操作系统资源管理的基本单位的重要性,并深入解析了task_struct结构体的内容及其在进程管理中的作用。同时,文章讲解了进程的基本操作(如获取PID、查看进程信息等)、父进程与子进程的关系(重点分析fork函数)、以及进程的三种主要状态(运行、阻塞、挂起)。此外,还探讨了Linux特有的进程状态表示和孤儿进程的处理方式。通过学习这些内容,读者可以更好地理解Linux进程的运行原理并优化系统性能。
477 4
|
存储 IDE Unix
用了这么久 Linux ,才知道这些概念。。。
我们大家应该知道,Linux 和 UNIX 中的文件系统是一个以 / 为根的树状式文件结构,/ 是 Linux 和 UNIX 中的根目录,同样它也是文件系统的起点。所有的文件和目录都位于 / 路径下,包括我们经常听到的 /usr、/etc、/bin、/home 等。在早期的 UNIX 系统中,各个厂家都定义了自己文件系统的命名构成,比较混乱,而且难以区分。
用了这么久 Linux ,才知道这些概念。。。
|
7月前
|
Linux 应用服务中间件 Shell
二、Linux文本处理与文件操作核心命令
熟悉了Linux的基本“行走”后,就该拿起真正的“工具”干活了。用grep这个“放大镜”在文件里搜索内容,用find这个“探测器”在系统中寻找文件,再用tar把东西打包带走。最关键的是要学会使用管道符|,它像一条流水线,能把这些命令串联起来,让简单工具组合出强大的功能,比如 ps -ef | grep 'nginx' 就能快速找出nginx进程。
811 1
二、Linux文本处理与文件操作核心命令
|
7月前
|
Linux
linux命令—stat
`stat` 是 Linux 系统中用于查看文件或文件系统详细状态信息的命令。相比 `ls -l`,它提供更全面的信息,包括文件大小、权限、所有者、时间戳(最后访问、修改、状态变更时间)、inode 号、设备信息等。其常用选项包括 `-f` 查看文件系统状态、`-t` 以简洁格式输出、`-L` 跟踪符号链接,以及 `-c` 或 `--format` 自定义输出格式。通过这些选项,用户可以灵活获取所需信息,适用于系统调试、权限检查、磁盘管理等场景。
470 137
下一篇
开通oss服务