Linux读写锁源码分析

简介: 本文分析了读写锁的实现原理与应用场景,基于glibc 2.17源码。读写锁通过读引用计数、写线程ID、条件变量等实现,支持读优先(默认)和写优先模式。读优先时,写锁可能饥饿;写优先时,读线程需等待写锁释放。详细解析了`pthread_rwlock_t`数据结构及加解锁流程,并通过实验验证:2000个读线程与1个写线程测试下,读优先导致写锁饥饿,写优先则正常抢占锁。

读写锁由读引用计数、写线程id、读/写条件变量、等待读/写队列长度组合实现。当写优先锁被加写锁后,等待读队列的线程(新的读锁请求)阻塞等待,正在读的线程全部释放读锁后,写锁加锁。当前写锁被释放且写等待队列为空,被阻塞的加读锁线程才会被唤醒。

读优先模式(默认):加读锁优先级一定高于加写锁,写锁必须阻塞等待读锁释放
写优先模式:加写锁的优先级一定高于加读锁,读锁必须阻塞等待写锁释放

多读少写:使用读优先会导致写饥饿,应该使用写优先
少写多读:使用写优先会导致读饥饿,应该使用读优先
读写几率相等:会导致读写锁频繁切换,读写锁设计较复杂性能未必高于互斥锁,

以下分析机遇glibc.2.17源码,与开发机版本相同

man pthread_rwlockattr_setkind_np  
DESCRIPTION
       The  pthread_rwlockattr_setkind_np()  function  sets the "lock kind" attribute of the read-write lock attribute object referred to by attr to the value specified in pref.
       The argument pref may be set to one of the following:
             读优先,默认,读锁是一个递归锁
       PTHREAD_RWLOCK_PREFER_READER_NP
              This is the default.  A thread may hold multiple read locks; that is, read locks are recursive.  According to The Single Unix Specification, the  behavior  is  un‐
              specified  when  a  reader  tries  to  place  a  lock,  and  there  is  no  write  lock  but  writers  are  waiting.  Giving preference to the reader, as is set by
              PTHREAD_RWLOCK_PREFER_READER_NP, implies that the reader will receive the requested lock, even if a writer is waiting.  As long as there are  readers,  the  writer
              will be starved.
       写优先,glibc忽略此实现,posix要求读锁是递归锁,如果写也是递归锁会死锁,使用此选项也会设置成读优先
       PTHREAD_RWLOCK_PREFER_WRITER_NP 
              This  is  intended  as  the write lock analog of PTHREAD_RWLOCK_PREFER_READER_NP.  This is ignored by glibc because the POSIX requirement to support recursive read
              locks would cause this option to create trivial deadlocks; instead use PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP which ensures the  application  developer  will
              not take recursive read locks thus avoiding deadlocks.
             写优先,非递归实现,glibc写优先使用此选项
       PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP
              Setting the lock kind to this avoids writer starvation as long as any read locking is not done in a recursive fashion.

底层函数

LIBC_PROBE(添加探针)

/* 
 * 在调用出添加探针,用于监控内核和用户空间的执行
 *
 * probe_name                 探针名称
 * number_of_arguments        参数列表数量
 * argument1, argument2, ...  参数列表
 */
LIBC_PROBE (probe_name, number_of_arguments, argument1, argument2, ...);

lll_lock (底层加锁操作)

/* 
 * glibc用于封装高级锁的一种基础函数
 * 
 * futex                      CAS锁
 * private                    锁类型(单进程独占/多进程共享)
 */
#define lll_lock(futex, private) __lll_lock (&(futex), private)
static inline void __attribute__ ((always_inline))
__lll_lock (int *futex, int private)
{
  if (atomic_compare_and_exchange_bool_acq (futex, 1, 0) != 0)       // 如果CAS锁被其他线程锁住
    {
      // 阻塞等待加锁成功
      if (__builtin_constant_p (private) && private == LLL_PRIVATE)
    __lll_lock_wait_private (futex);
      else
    __lll_lock_wait (futex, private);
    }
}

lll_futex_wait (线程同步底层函数)

/* 
 * 线程等待机制
 * 
 * futex                      条件变量指针
 * val                        预期值(futex地址上的值 == val进入等待状态,不相等时被唤醒)
 * private                    属性:单进程独占/多进程共享
 */
#define lll_futex_wait(futexp, val, private) \
  lll_futex_timed_wait(futexp, val, NULL, private)

读写锁数据结构

pthread_rwlock_t

typedef union
{
# ifdef __x86_64__
  struct
  {
    int __lock;                                      // CAS锁,保证多线程下读写锁数据结构线程安全
    unsigned int __nr_readers;                       // 持有读锁的线程数量 (是否被加读锁的判断依据)
    unsigned int __readers_wakeup;                   // 同步原语,用于唤醒读线程
    unsigned int __writer_wakeup;                    // 同步原语,用于唤醒写线程
    unsigned int __nr_readers_queued;                // 等待获取读锁线程数量
    unsigned int __nr_writers_queued;                // 等待获取写锁线程数量
    int __writer;                                    // 写锁持有线程ID     (是否被加写锁的判断依据)
    int __shared;                                    // 0:单进程独有该锁 / 1:多进程共享该锁
    unsigned long int __pad1;                        // 填充字段,保证struct __data二进制兼容
    unsigned long int __pad2;                        // 填充字段,保证struct __data二进制兼容
    unsigned int __flags;                            // 锁属性,读/写优先?
# define __PTHREAD_RWLOCK_INT_FLAGS_SHARED    1
  } __data;
# else
  /* ... */
# endif
  char __size[__SIZEOF_PTHREAD_RWLOCK_T];            // 确保各平台之间的实现sizeof(pthread_rwlock_t)大小相同 
  long int __align;                                  // 内存对齐
} pthread_rwlock_t;

pthread_rwlock_attr_t

#define __SIZEOF_PTHREAD_RWLOCKATTR_T 32
/* 类型声明,必须以8字节大小实现pthread_rwlockattr */
typedef union
{
  char __size[__SIZEOF_PTHREAD_RWLOCKATTR_T];
  // 内存对齐,例如 pthread_rwlockattr_t 占32字节,long int 占用 8字节, 所以以8字节对齐,提高性能
  long int __align;                             
} pthread_rwlockattr_t;
/* 读写锁属性实际定义 */
struct pthread_rwlockattr                            
{
  int lockkind;                                      // 锁属性(读/写优先) 赋值给pthread_rwlock_t.__flags
  int pshared;                                       // 单进程独占 / 多进程共享 赋值pthread_rwlock_t.__shared
};
/* pthread_rwlockattr.lockkind 属性枚举 */
#if defined __USE_UNIX98 || defined __USE_XOPEN2K
enum
{
  PTHREAD_RWLOCK_PREFER_READER_NP,                             // 读优先
  PTHREAD_RWLOCK_PREFER_WRITER_NP,                             // 写优先  glibc忽略此选项,会设置成读锁
  PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP,                // 写优先(同一线程不可递归加锁)
  PTHREAD_RWLOCK_DEFAULT_NP = PTHREAD_RWLOCK_PREFER_READER_NP  // 默认读优先
};
/* pthread_rwlockattr.pshared */
#ifndef __PTHREAD_RWLOCK_INT_FLAGS_SHARED
# if __WORDSIZE == 64
#  define __PTHREAD_RWLOCK_INT_FLAGS_SHARED 1                 // 1: 多进程共享 0: 单进程独占
# endif
#endif

读写锁加解锁具体实现

读锁

// K&R C风格函数声明方式 等同于 ANSI C风格:int __pthread_rwlock_rdlock (pthread_rwlock_t *rwlock);     
int __pthread_rwlock_rdlock (rwlock)
     pthread_rwlock_t *rwlock;                                 
{
  int result = 0;
  LIBC_PROBE (rdlock_entry, 1, rwlock);                        // 添加探针,用于调试、性能分析
  /* CAS锁,保证只有一个线程可以修改rwlock  */
  lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);
  while (1)
  {
    /* 加读锁 */
    
    // 没有加写锁
    if (rwlock->__data.__writer == 0
    // 并且(没有写锁等待加锁 或者 是读优先锁)
        && (!rwlock->__data.__nr_writers_queued || PTHREAD_RWLOCK_PREFER_READER_P (rwlock)))
        {
        /* __nr_readers 为持有读锁线程数量,uint类型,自增后不可能溢出变成0,所以使用 __builtin_expect优化 */
          if (__builtin_expect (++rwlock->__data.__nr_readers == 0, 0))
          {
            --rwlock->__data.__nr_readers;
            result = EAGAIN;
          }
          else
        {
            LIBC_PROBE (rdlock_acquire_read, 1, rwlock);
        }
          break;
        }
      /* 阻塞等待加锁 */
    
        // THREAD_GETMEM 获取当前线程id
      /* 一个线程加了写锁一定不会运行到这里,__builtin_expect优化 */
      if (__builtin_expect (rwlock->__data.__writer == THREAD_GETMEM (THREAD_SELF, tid), 0))
        {
          result = EDEADLK;
          break;
        }
      /* 等待加读锁引用计数自增后不可能溢出,__builtin_expect优化*/
      /* 64位操作系统寻址空间 0-2^64, 需要2^64个读线程才能满足条件,一个线程栈空间8MB,2^64 * 8MB 远大于寻址空间*/
      /* 并且也不允许创建这么多线程,详见ulimit -u */
      if (__builtin_expect (++rwlock->__data.__nr_readers_queued == 0, 0))
        {
          --rwlock->__data.__nr_readers_queued;
          result = EAGAIN;
          break;
        }
        /* 存放当前线程被唤醒的信号量的临时变量 */
      int waitval = rwlock->__data.__readers_wakeup;
    
      /* 解锁,让其他尝试加锁线程修改rwlock */
      lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);
      /* __readers_wakeup == waitval进入等待机制,等待被内核唤醒 */
      lll_futex_wait (&rwlock->__data.__readers_wakeup, waitval, rwlock->__data.__shared);
      /* 被内核唤醒,读等待引用计数-1,重新争夺rwlock */
      lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);
      --rwlock->__data.__nr_readers_queued;
    }
  /* 加读锁成功或者出现异常__builtin_expect中判断条件 */
  lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);
  return result;  // 0成功 其余异常
}

写锁

// K&R C风格函数声明方式 等同于 ANSI C风格:int __pthread_rwlock_wrlock (pthread_rwlock_t *rwlock);     
int __pthread_rwlock_wrlock (rwlock)
     pthread_rwlock_t *rwlock;
{
  int result = 0;
  /* 添加探针*/
  LIBC_PROBE (wrlock_entry, 1, rwlock);
  /* 保证只有一个线程访问rwlock */
  lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);
  while (1)
    {
      /* 加写锁 */
      if (rwlock->__data.__writer == 0 && rwlock->__data.__nr_readers == 0) // 没有其余线程读写
        {
          /* 设置当前读线程id为本线程id */
          rwlock->__data.__writer = THREAD_GETMEM (THREAD_SELF, tid);
        /* 探针 */
          LIBC_PROBE (wrlock_acquire_write, 1, rwlock);
          /* 加锁成功,跳出while */
        break;
        }
      /* 一个线程加了写锁一定不会运行到这里,__builtin_expect优化*/
      if (__builtin_expect (rwlock->__data.__writer == THREAD_GETMEM (THREAD_SELF, tid), 0))
        {
          result = EDEADLK;
          break;
        }
      /* 等待读锁队列引用计数+1 */
      if (++rwlock->__data.__nr_writers_queued == 0)
        {
          /* Overflow on number of queued writers.  */
          --rwlock->__data.__nr_writers_queued;
          result = EAGAIN;
          break;
        }
      
      /* 期望被内核唤醒的值 */
      int waitval = rwlock->__data.__writer_wakeup;
      lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);
      /* rwlock->__data.__writer_wakeup == waitval 进入等待状态, 等待被内核唤醒 */
      lll_futex_wait (&rwlock->__data.__writer_wakeup, waitval, rwlock->__data.__shared);
      /* 被内核唤醒,读等待引用计数-1,重新争夺rwlock*/
      lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);
      --rwlock->__data.__nr_writers_queued;
    }
  /* 加写锁成功或者出现异常__builtin_expect中判断条件 */
  lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);
  return result; // 0成功 其余异常
}

解锁

int
__pthread_rwlock_unlock (pthread_rwlock_t *rwlock)
{
    /* 探针 */
    LIBC_PROBE (rwlock_unlock, 1, rwlock);
    /* 保证只有一个线程访问rwlock */
    lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);
    if (rwlock->__data.__writer)                 // 如果是写锁
        rwlock->__data.__writer = 0;               // 占用线程id归0
    else                                         // 如果是读锁
        --rwlock->__data.__nr_readers;             // 读引用计数-1
    /* 解锁一次后,读引用计数=0,rwlock一定为无锁状态*/
    if (rwlock->__data.__nr_readers == 0)
    {
        /* 如果有线程等待加写锁 */
        if (rwlock->__data.__nr_writers_queued)
        {
            /* 更新写的同步原语 (下一个写线程进入等待状态的值) */
            ++rwlock->__data.__writer_wakeup;
            lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);
            /* 唤醒1个期望加写线程 */
            lll_futex_wake (&rwlock->__data.__writer_wakeup, 1, rwlock->__data.__shared);
            return 0;
        }
        /* 如果有线程等待加读锁 */
        else if (rwlock->__data.__nr_readers_queued)
        {
            /* 更新读同步原语 (下一个读线程进入等待状态的值)*/
            ++rwlock->__data.__readers_wakeup;
            lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);
            /* 唤醒INT_MAX个期望加读锁线程 */
            lll_futex_wake (&rwlock->__data.__readers_wakeup, INT_MAX, rwlock->__data.__shared);
            return 0;
        }
    }
    lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);
    return 0;
}

测试

测试代码

2000个读线程与1个写线程共同操作一个变量,读任务不sleep,持续抢占锁,期望情况:

   默认读优先锁:2000个线程一直占用读锁,写锁无法抢占读写锁
   写优先读写锁:在不间断的读任务中抢占到写锁,成功修改共享变量

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#define READERS_CNT 2000
#define WRITERS_CNT 1
pthread_rwlock_t rwlock;
int shared_data = 0;
void* reader(void* arg) {
    int id = *((int*)arg);
    // 循环中不sleep,持续抢占锁 
    while (1) {
        pthread_rwlock_rdlock(&rwlock);
        printf("Reader %d: Read shared data = %d\n", id, shared_data);
        pthread_rwlock_unlock(&rwlock);
    }
    return NULL;
}
void* writer(void* arg) {
    int id = *((int*)arg);
    while (1) {
        pthread_rwlock_wrlock(&rwlock);
        usleep(100 * 1000);
        printf("Writer %d: Update shared data to %d\n", id, ++shared_data);
        pthread_rwlock_unlock(&rwlock);
    }
    return NULL;
}
int main() {
    pthread_t readers[READERS_CNT], writers[WRITERS_CNT];
    int reader_ids[READERS_CNT], writer_ids[WRITERS_CNT];
    pthread_rwlockattr_t attr;
    pthread_rwlockattr_init(&attr);
    pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP); // 注释为读优先
    pthread_rwlock_init(&rwlock, &attr);
    pthread_rwlockattr_destroy(&attr);
    for (int i = 0; i < READERS_CNT; i++) {
        reader_ids[i] = i + 1;
        pthread_create(&readers[i], NULL, reader, &reader_ids[i]);
    }
    for (int i = 0; i < WRITERS_CNT; i++) {
        writer_ids[i] = i + 1;
        pthread_create(&writers[i], NULL, writer, &writer_ids[i]);
    }
    for (int i = 0; i < READERS_CNT; i++) {
        pthread_join(readers[i], NULL);
    }
    for (int i = 0; i < WRITERS_CNT; i++) {
        pthread_join(writers[i], NULL);
    }
    pthread_rwlock_destroy(&rwlock);
    return 0;
}

测试结果

读优先

写锁饥饿,永远无法执行

......
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
^C

写优先

   没有发生写饥饿,成功抢占到锁,读锁需要阻塞等待写锁

......
Reader 1488: Read shared data = 44
Reader 883: Read shared data = 44
Reader 1275: Read shared data = 44
Reader 649: Read shared data = 44
Reader 1230: Read shared data = 44
Reader 796: Read shared data = 44
Reader 842: Read shared data = 44
Reader 13: Read shared data = 44
Reader 870: Read shared data = 44
Reader 1529: Read shared data = 44
Reader 1685: Read shared data = 44
Reader 1702: Read shared data = 44
Reader 1503: Read shared data = 44
Reader 1395: Read shared data = 44
Reader 631: Read shared data = 44
Reader 245: Read shared data = 44
Reader 1686: Read shared data = 44
Reader 663: Read shared data = 44
Reader 763: Read shared data = 44
Reader 1367: Read shared data = 44
Reader 38: Read shared data = 44
Reader 1909: Read shared data = 44
Reader 1560: Read shared data = 44
Reader 1447: Read sha^C
相关文章
|
8月前
|
自然语言处理 监控 Linux
Linux 内核源码分析---proc 文件系统
`proc`文件系统是Linux内核中一个灵活而强大的工具,提供了一个与内核数据结构交互的接口。通过本文的分析,我们深入探讨了 `proc`文件系统的实现原理,包括其初始化、文件的创建与操作、动态内容生成等方面。通过对这些内容的理解,开发者可以更好地利用 `proc`文件系统来监控和调试内核,同时也为系统管理提供了便利的工具。
392 16
|
11月前
|
监控 算法 Linux
Linux内核锁机制深度剖析与实践优化####
本文作为一篇技术性文章,深入探讨了Linux操作系统内核中锁机制的工作原理、类型及其在并发控制中的应用,旨在为开发者提供关于如何有效利用这些工具来提升系统性能和稳定性的见解。不同于常规摘要的概述性质,本文将直接通过具体案例分析,展示在不同场景下选择合适的锁策略对于解决竞争条件、死锁问题的重要性,以及如何根据实际需求调整锁的粒度以达到最佳效果,为读者呈现一份实用性强的实践指南。 ####
|
12月前
|
算法 Linux 开发者
Linux内核中的锁机制:保障并发控制的艺术####
本文深入探讨了Linux操作系统内核中实现的多种锁机制,包括自旋锁、互斥锁、读写锁等,旨在揭示这些同步原语如何高效地解决资源竞争问题,保证系统的稳定性和性能。通过分析不同锁机制的工作原理及应用场景,本文为开发者提供了在高并发环境下进行有效并发控制的实用指南。 ####
|
Linux 数据库
Linux内核中的锁机制:保障并发操作的数据一致性####
【10月更文挑战第29天】 在多线程编程中,确保数据一致性和防止竞争条件是至关重要的。本文将深入探讨Linux操作系统中实现的几种关键锁机制,包括自旋锁、互斥锁和读写锁等。通过分析这些锁的设计原理和使用场景,帮助读者理解如何在实际应用中选择合适的锁机制以优化系统性能和稳定性。 ####
286 6
|
监控 关系型数据库 MySQL
在Linux中,mysql的innodb如何定位锁问题?
在Linux中,mysql的innodb如何定位锁问题?
|
安全 Linux
Linux线程(十一)线程互斥锁-条件变量详解
Linux线程(十一)线程互斥锁-条件变量详解
linux内核 —— 读写信号量实验
linux内核 —— 读写信号量实验
|
监控 Linux 数据处理
lslocks:Linux系统中的锁信息查看利器
`lslocks`是Linux工具,用于查看系统上的文件锁信息,帮助诊断进程同步问题。它显示持有锁的进程、锁类型(如POSIX、flock)和状态。通过简洁的输出,用户能识别死锁和资源争用,优化性能。结合其他命令如`grep`和`awk`可增强分析能力。需适当权限运行,定期监控以预防并发访问问题,处理死锁时要谨慎。
|
运维 监控 Linux
Linux系统读写硬盘慢,如何排查?四步教你定位问题!
【8月更文挑战第24天】在Linux系统运维中,硬盘读写速度慢是一个常见且影响重大的问题。它不仅会导致服务器性能下降,还会影响用户体验。本文将详细介绍四个步骤,帮助你快速定位并解决Linux系统硬盘读写慢的问题。
2060 0