Linux读写锁源码分析

简介: 本文分析了读写锁的实现原理与应用场景,基于glibc 2.17源码。读写锁通过读引用计数、写线程ID、条件变量等实现,支持读优先(默认)和写优先模式。读优先时,写锁可能饥饿;写优先时,读线程需等待写锁释放。详细解析了`pthread_rwlock_t`数据结构及加解锁流程,并通过实验验证:2000个读线程与1个写线程测试下,读优先导致写锁饥饿,写优先则正常抢占锁。

读写锁由读引用计数、写线程id、读/写条件变量、等待读/写队列长度组合实现。当写优先锁被加写锁后,等待读队列的线程(新的读锁请求)阻塞等待,正在读的线程全部释放读锁后,写锁加锁。当前写锁被释放且写等待队列为空,被阻塞的加读锁线程才会被唤醒。

读优先模式(默认):加读锁优先级一定高于加写锁,写锁必须阻塞等待读锁释放
写优先模式:加写锁的优先级一定高于加读锁,读锁必须阻塞等待写锁释放

多读少写:使用读优先会导致写饥饿,应该使用写优先
少写多读:使用写优先会导致读饥饿,应该使用读优先
读写几率相等:会导致读写锁频繁切换,读写锁设计较复杂性能未必高于互斥锁,

以下分析机遇glibc.2.17源码,与开发机版本相同

man pthread_rwlockattr_setkind_np  
DESCRIPTION
       The  pthread_rwlockattr_setkind_np()  function  sets the "lock kind" attribute of the read-write lock attribute object referred to by attr to the value specified in pref.
       The argument pref may be set to one of the following:
             读优先,默认,读锁是一个递归锁
       PTHREAD_RWLOCK_PREFER_READER_NP
              This is the default.  A thread may hold multiple read locks; that is, read locks are recursive.  According to The Single Unix Specification, the  behavior  is  un‐
              specified  when  a  reader  tries  to  place  a  lock,  and  there  is  no  write  lock  but  writers  are  waiting.  Giving preference to the reader, as is set by
              PTHREAD_RWLOCK_PREFER_READER_NP, implies that the reader will receive the requested lock, even if a writer is waiting.  As long as there are  readers,  the  writer
              will be starved.
       写优先,glibc忽略此实现,posix要求读锁是递归锁,如果写也是递归锁会死锁,使用此选项也会设置成读优先
       PTHREAD_RWLOCK_PREFER_WRITER_NP 
              This  is  intended  as  the write lock analog of PTHREAD_RWLOCK_PREFER_READER_NP.  This is ignored by glibc because the POSIX requirement to support recursive read
              locks would cause this option to create trivial deadlocks; instead use PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP which ensures the  application  developer  will
              not take recursive read locks thus avoiding deadlocks.
             写优先,非递归实现,glibc写优先使用此选项
       PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP
              Setting the lock kind to this avoids writer starvation as long as any read locking is not done in a recursive fashion.

底层函数

LIBC_PROBE(添加探针)

/* 
 * 在调用出添加探针,用于监控内核和用户空间的执行
 *
 * probe_name                 探针名称
 * number_of_arguments        参数列表数量
 * argument1, argument2, ...  参数列表
 */
LIBC_PROBE (probe_name, number_of_arguments, argument1, argument2, ...);

lll_lock (底层加锁操作)

/* 
 * glibc用于封装高级锁的一种基础函数
 * 
 * futex                      CAS锁
 * private                    锁类型(单进程独占/多进程共享)
 */
#define lll_lock(futex, private) __lll_lock (&(futex), private)
static inline void __attribute__ ((always_inline))
__lll_lock (int *futex, int private)
{
  if (atomic_compare_and_exchange_bool_acq (futex, 1, 0) != 0)       // 如果CAS锁被其他线程锁住
    {
      // 阻塞等待加锁成功
      if (__builtin_constant_p (private) && private == LLL_PRIVATE)
    __lll_lock_wait_private (futex);
      else
    __lll_lock_wait (futex, private);
    }
}

lll_futex_wait (线程同步底层函数)

/* 
 * 线程等待机制
 * 
 * futex                      条件变量指针
 * val                        预期值(futex地址上的值 == val进入等待状态,不相等时被唤醒)
 * private                    属性:单进程独占/多进程共享
 */
#define lll_futex_wait(futexp, val, private) \
  lll_futex_timed_wait(futexp, val, NULL, private)

读写锁数据结构

pthread_rwlock_t

typedef union
{
# ifdef __x86_64__
  struct
  {
    int __lock;                                      // CAS锁,保证多线程下读写锁数据结构线程安全
    unsigned int __nr_readers;                       // 持有读锁的线程数量 (是否被加读锁的判断依据)
    unsigned int __readers_wakeup;                   // 同步原语,用于唤醒读线程
    unsigned int __writer_wakeup;                    // 同步原语,用于唤醒写线程
    unsigned int __nr_readers_queued;                // 等待获取读锁线程数量
    unsigned int __nr_writers_queued;                // 等待获取写锁线程数量
    int __writer;                                    // 写锁持有线程ID     (是否被加写锁的判断依据)
    int __shared;                                    // 0:单进程独有该锁 / 1:多进程共享该锁
    unsigned long int __pad1;                        // 填充字段,保证struct __data二进制兼容
    unsigned long int __pad2;                        // 填充字段,保证struct __data二进制兼容
    unsigned int __flags;                            // 锁属性,读/写优先?
# define __PTHREAD_RWLOCK_INT_FLAGS_SHARED    1
  } __data;
# else
  /* ... */
# endif
  char __size[__SIZEOF_PTHREAD_RWLOCK_T];            // 确保各平台之间的实现sizeof(pthread_rwlock_t)大小相同 
  long int __align;                                  // 内存对齐
} pthread_rwlock_t;

pthread_rwlock_attr_t

#define __SIZEOF_PTHREAD_RWLOCKATTR_T 32
/* 类型声明,必须以8字节大小实现pthread_rwlockattr */
typedef union
{
  char __size[__SIZEOF_PTHREAD_RWLOCKATTR_T];
  // 内存对齐,例如 pthread_rwlockattr_t 占32字节,long int 占用 8字节, 所以以8字节对齐,提高性能
  long int __align;                             
} pthread_rwlockattr_t;
/* 读写锁属性实际定义 */
struct pthread_rwlockattr                            
{
  int lockkind;                                      // 锁属性(读/写优先) 赋值给pthread_rwlock_t.__flags
  int pshared;                                       // 单进程独占 / 多进程共享 赋值pthread_rwlock_t.__shared
};
/* pthread_rwlockattr.lockkind 属性枚举 */
#if defined __USE_UNIX98 || defined __USE_XOPEN2K
enum
{
  PTHREAD_RWLOCK_PREFER_READER_NP,                             // 读优先
  PTHREAD_RWLOCK_PREFER_WRITER_NP,                             // 写优先  glibc忽略此选项,会设置成读锁
  PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP,                // 写优先(同一线程不可递归加锁)
  PTHREAD_RWLOCK_DEFAULT_NP = PTHREAD_RWLOCK_PREFER_READER_NP  // 默认读优先
};
/* pthread_rwlockattr.pshared */
#ifndef __PTHREAD_RWLOCK_INT_FLAGS_SHARED
# if __WORDSIZE == 64
#  define __PTHREAD_RWLOCK_INT_FLAGS_SHARED 1                 // 1: 多进程共享 0: 单进程独占
# endif
#endif

读写锁加解锁具体实现

读锁

// K&R C风格函数声明方式 等同于 ANSI C风格:int __pthread_rwlock_rdlock (pthread_rwlock_t *rwlock);     
int __pthread_rwlock_rdlock (rwlock)
     pthread_rwlock_t *rwlock;                                 
{
  int result = 0;
  LIBC_PROBE (rdlock_entry, 1, rwlock);                        // 添加探针,用于调试、性能分析
  /* CAS锁,保证只有一个线程可以修改rwlock  */
  lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);
  while (1)
  {
    /* 加读锁 */
    
    // 没有加写锁
    if (rwlock->__data.__writer == 0
    // 并且(没有写锁等待加锁 或者 是读优先锁)
        && (!rwlock->__data.__nr_writers_queued || PTHREAD_RWLOCK_PREFER_READER_P (rwlock)))
        {
        /* __nr_readers 为持有读锁线程数量,uint类型,自增后不可能溢出变成0,所以使用 __builtin_expect优化 */
          if (__builtin_expect (++rwlock->__data.__nr_readers == 0, 0))
          {
            --rwlock->__data.__nr_readers;
            result = EAGAIN;
          }
          else
        {
            LIBC_PROBE (rdlock_acquire_read, 1, rwlock);
        }
          break;
        }
      /* 阻塞等待加锁 */
    
        // THREAD_GETMEM 获取当前线程id
      /* 一个线程加了写锁一定不会运行到这里,__builtin_expect优化 */
      if (__builtin_expect (rwlock->__data.__writer == THREAD_GETMEM (THREAD_SELF, tid), 0))
        {
          result = EDEADLK;
          break;
        }
      /* 等待加读锁引用计数自增后不可能溢出,__builtin_expect优化*/
      /* 64位操作系统寻址空间 0-2^64, 需要2^64个读线程才能满足条件,一个线程栈空间8MB,2^64 * 8MB 远大于寻址空间*/
      /* 并且也不允许创建这么多线程,详见ulimit -u */
      if (__builtin_expect (++rwlock->__data.__nr_readers_queued == 0, 0))
        {
          --rwlock->__data.__nr_readers_queued;
          result = EAGAIN;
          break;
        }
        /* 存放当前线程被唤醒的信号量的临时变量 */
      int waitval = rwlock->__data.__readers_wakeup;
    
      /* 解锁,让其他尝试加锁线程修改rwlock */
      lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);
      /* __readers_wakeup == waitval进入等待机制,等待被内核唤醒 */
      lll_futex_wait (&rwlock->__data.__readers_wakeup, waitval, rwlock->__data.__shared);
      /* 被内核唤醒,读等待引用计数-1,重新争夺rwlock */
      lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);
      --rwlock->__data.__nr_readers_queued;
    }
  /* 加读锁成功或者出现异常__builtin_expect中判断条件 */
  lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);
  return result;  // 0成功 其余异常
}

写锁

// K&R C风格函数声明方式 等同于 ANSI C风格:int __pthread_rwlock_wrlock (pthread_rwlock_t *rwlock);     
int __pthread_rwlock_wrlock (rwlock)
     pthread_rwlock_t *rwlock;
{
  int result = 0;
  /* 添加探针*/
  LIBC_PROBE (wrlock_entry, 1, rwlock);
  /* 保证只有一个线程访问rwlock */
  lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);
  while (1)
    {
      /* 加写锁 */
      if (rwlock->__data.__writer == 0 && rwlock->__data.__nr_readers == 0) // 没有其余线程读写
        {
          /* 设置当前读线程id为本线程id */
          rwlock->__data.__writer = THREAD_GETMEM (THREAD_SELF, tid);
        /* 探针 */
          LIBC_PROBE (wrlock_acquire_write, 1, rwlock);
          /* 加锁成功,跳出while */
        break;
        }
      /* 一个线程加了写锁一定不会运行到这里,__builtin_expect优化*/
      if (__builtin_expect (rwlock->__data.__writer == THREAD_GETMEM (THREAD_SELF, tid), 0))
        {
          result = EDEADLK;
          break;
        }
      /* 等待读锁队列引用计数+1 */
      if (++rwlock->__data.__nr_writers_queued == 0)
        {
          /* Overflow on number of queued writers.  */
          --rwlock->__data.__nr_writers_queued;
          result = EAGAIN;
          break;
        }
      
      /* 期望被内核唤醒的值 */
      int waitval = rwlock->__data.__writer_wakeup;
      lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);
      /* rwlock->__data.__writer_wakeup == waitval 进入等待状态, 等待被内核唤醒 */
      lll_futex_wait (&rwlock->__data.__writer_wakeup, waitval, rwlock->__data.__shared);
      /* 被内核唤醒,读等待引用计数-1,重新争夺rwlock*/
      lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);
      --rwlock->__data.__nr_writers_queued;
    }
  /* 加写锁成功或者出现异常__builtin_expect中判断条件 */
  lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);
  return result; // 0成功 其余异常
}

解锁

int
__pthread_rwlock_unlock (pthread_rwlock_t *rwlock)
{
    /* 探针 */
    LIBC_PROBE (rwlock_unlock, 1, rwlock);
    /* 保证只有一个线程访问rwlock */
    lll_lock (rwlock->__data.__lock, rwlock->__data.__shared);
    if (rwlock->__data.__writer)                 // 如果是写锁
        rwlock->__data.__writer = 0;               // 占用线程id归0
    else                                         // 如果是读锁
        --rwlock->__data.__nr_readers;             // 读引用计数-1
    /* 解锁一次后,读引用计数=0,rwlock一定为无锁状态*/
    if (rwlock->__data.__nr_readers == 0)
    {
        /* 如果有线程等待加写锁 */
        if (rwlock->__data.__nr_writers_queued)
        {
            /* 更新写的同步原语 (下一个写线程进入等待状态的值) */
            ++rwlock->__data.__writer_wakeup;
            lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);
            /* 唤醒1个期望加写线程 */
            lll_futex_wake (&rwlock->__data.__writer_wakeup, 1, rwlock->__data.__shared);
            return 0;
        }
        /* 如果有线程等待加读锁 */
        else if (rwlock->__data.__nr_readers_queued)
        {
            /* 更新读同步原语 (下一个读线程进入等待状态的值)*/
            ++rwlock->__data.__readers_wakeup;
            lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);
            /* 唤醒INT_MAX个期望加读锁线程 */
            lll_futex_wake (&rwlock->__data.__readers_wakeup, INT_MAX, rwlock->__data.__shared);
            return 0;
        }
    }
    lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared);
    return 0;
}

测试

测试代码

2000个读线程与1个写线程共同操作一个变量,读任务不sleep,持续抢占锁,期望情况:

   默认读优先锁:2000个线程一直占用读锁,写锁无法抢占读写锁
   写优先读写锁:在不间断的读任务中抢占到写锁,成功修改共享变量

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#define READERS_CNT 2000
#define WRITERS_CNT 1
pthread_rwlock_t rwlock;
int shared_data = 0;
void* reader(void* arg) {
    int id = *((int*)arg);
    // 循环中不sleep,持续抢占锁 
    while (1) {
        pthread_rwlock_rdlock(&rwlock);
        printf("Reader %d: Read shared data = %d\n", id, shared_data);
        pthread_rwlock_unlock(&rwlock);
    }
    return NULL;
}
void* writer(void* arg) {
    int id = *((int*)arg);
    while (1) {
        pthread_rwlock_wrlock(&rwlock);
        usleep(100 * 1000);
        printf("Writer %d: Update shared data to %d\n", id, ++shared_data);
        pthread_rwlock_unlock(&rwlock);
    }
    return NULL;
}
int main() {
    pthread_t readers[READERS_CNT], writers[WRITERS_CNT];
    int reader_ids[READERS_CNT], writer_ids[WRITERS_CNT];
    pthread_rwlockattr_t attr;
    pthread_rwlockattr_init(&attr);
    pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP); // 注释为读优先
    pthread_rwlock_init(&rwlock, &attr);
    pthread_rwlockattr_destroy(&attr);
    for (int i = 0; i < READERS_CNT; i++) {
        reader_ids[i] = i + 1;
        pthread_create(&readers[i], NULL, reader, &reader_ids[i]);
    }
    for (int i = 0; i < WRITERS_CNT; i++) {
        writer_ids[i] = i + 1;
        pthread_create(&writers[i], NULL, writer, &writer_ids[i]);
    }
    for (int i = 0; i < READERS_CNT; i++) {
        pthread_join(readers[i], NULL);
    }
    for (int i = 0; i < WRITERS_CNT; i++) {
        pthread_join(writers[i], NULL);
    }
    pthread_rwlock_destroy(&rwlock);
    return 0;
}

测试结果

读优先

写锁饥饿,永远无法执行

......
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
Reader 1718: Read shared data = 0
^C

写优先

   没有发生写饥饿,成功抢占到锁,读锁需要阻塞等待写锁

......
Reader 1488: Read shared data = 44
Reader 883: Read shared data = 44
Reader 1275: Read shared data = 44
Reader 649: Read shared data = 44
Reader 1230: Read shared data = 44
Reader 796: Read shared data = 44
Reader 842: Read shared data = 44
Reader 13: Read shared data = 44
Reader 870: Read shared data = 44
Reader 1529: Read shared data = 44
Reader 1685: Read shared data = 44
Reader 1702: Read shared data = 44
Reader 1503: Read shared data = 44
Reader 1395: Read shared data = 44
Reader 631: Read shared data = 44
Reader 245: Read shared data = 44
Reader 1686: Read shared data = 44
Reader 663: Read shared data = 44
Reader 763: Read shared data = 44
Reader 1367: Read shared data = 44
Reader 38: Read shared data = 44
Reader 1909: Read shared data = 44
Reader 1560: Read shared data = 44
Reader 1447: Read sha^C
目录
打赏
0
18
19
1
72
分享
相关文章
一、(LINUX 线程同步) 引入
原创水平有限有误请指出 线程相比进程有着先天的数据共享的优势,如下图,线程共享了进程除栈区以外的所有内存区域如下图所示: 但是这种共享有时候也会带来问题,简单的考虑如下C++代码: 点击(此处)折叠或打开 ...
943 0
linux下使用读写锁
    在多线程程序中,有一种读写者的问题,即对某些资源的访问,存在两种可能的情况,一种是访问必须排他的,称为写操作;另外一种访问是可共享的,称为读操作。     处理读写着问题的两种常见策略是:强读者同步和强写者同步。
729 0
Linux---线程读写锁详解及代码实现
Linux---线程读写锁详解及代码实现
Linux 多线程同步机制(上)
Linux 多线程同步机制(上)
137 0
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等