读写锁由读引用计数、写线程id、读/写条件变量、等待读/写队列长度组合实现。当写优先锁被加写锁后,等待读队列的线程(新的读锁请求)阻塞等待,正在读的线程全部释放读锁后,写锁加锁。当前写锁被释放且写等待队列为空,被阻塞的加读锁线程才会被唤醒。
读优先模式(默认):加读锁优先级一定高于加写锁,写锁必须阻塞等待读锁释放
写优先模式:加写锁的优先级一定高于加读锁,读锁必须阻塞等待写锁释放
多读少写:使用读优先会导致写饥饿,应该使用写优先
少写多读:使用写优先会导致读饥饿,应该使用读优先
读写几率相等:会导致读写锁频繁切换,读写锁设计较复杂性能未必高于互斥锁,
以下分析机遇glibc.2.17源码,与开发机版本相同
man pthread_rwlockattr_setkind_np DESCRIPTION The pthread_rwlockattr_setkind_np() function sets the "lock kind" attribute of the read-write lock attribute object referred to by attr to the value specified in pref. The argument pref may be set to one of the following: 读优先,默认,读锁是一个递归锁 PTHREAD_RWLOCK_PREFER_READER_NP This is the default. A thread may hold multiple read locks; that is, read locks are recursive. According to The Single Unix Specification, the behavior is un‐ specified when a reader tries to place a lock, and there is no write lock but writers are waiting. Giving preference to the reader, as is set by PTHREAD_RWLOCK_PREFER_READER_NP, implies that the reader will receive the requested lock, even if a writer is waiting. As long as there are readers, the writer will be starved. 写优先,glibc忽略此实现,posix要求读锁是递归锁,如果写也是递归锁会死锁,使用此选项也会设置成读优先 PTHREAD_RWLOCK_PREFER_WRITER_NP This is intended as the write lock analog of PTHREAD_RWLOCK_PREFER_READER_NP. This is ignored by glibc because the POSIX requirement to support recursive read locks would cause this option to create trivial deadlocks; instead use PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP which ensures the application developer will not take recursive read locks thus avoiding deadlocks. 写优先,非递归实现,glibc写优先使用此选项 PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP Setting the lock kind to this avoids writer starvation as long as any read locking is not done in a recursive fashion.
底层函数
LIBC_PROBE(添加探针)
/* * 在调用出添加探针,用于监控内核和用户空间的执行 * * probe_name 探针名称 * number_of_arguments 参数列表数量 * argument1, argument2, ... 参数列表 */ LIBC_PROBE (probe_name, number_of_arguments, argument1, argument2, ...);
lll_lock (底层加锁操作)
/* * glibc用于封装高级锁的一种基础函数 * * futex CAS锁 * private 锁类型(单进程独占/多进程共享) */ #define lll_lock(futex, private) __lll_lock (&(futex), private) static inline void __attribute__ ((always_inline)) __lll_lock (int *futex, int private) { if (atomic_compare_and_exchange_bool_acq (futex, 1, 0) != 0) // 如果CAS锁被其他线程锁住 { // 阻塞等待加锁成功 if (__builtin_constant_p (private) && private == LLL_PRIVATE) __lll_lock_wait_private (futex); else __lll_lock_wait (futex, private); } }
lll_futex_wait (线程同步底层函数)
/* * 线程等待机制 * * futex 条件变量指针 * val 预期值(futex地址上的值 == val进入等待状态,不相等时被唤醒) * private 属性:单进程独占/多进程共享 */ #define lll_futex_wait(futexp, val, private) \ lll_futex_timed_wait(futexp, val, NULL, private)
读写锁数据结构
pthread_rwlock_t
typedef union { # ifdef __x86_64__ struct { int __lock; // CAS锁,保证多线程下读写锁数据结构线程安全 unsigned int __nr_readers; // 持有读锁的线程数量 (是否被加读锁的判断依据) unsigned int __readers_wakeup; // 同步原语,用于唤醒读线程 unsigned int __writer_wakeup; // 同步原语,用于唤醒写线程 unsigned int __nr_readers_queued; // 等待获取读锁线程数量 unsigned int __nr_writers_queued; // 等待获取写锁线程数量 int __writer; // 写锁持有线程ID (是否被加写锁的判断依据) int __shared; // 0:单进程独有该锁 / 1:多进程共享该锁 unsigned long int __pad1; // 填充字段,保证struct __data二进制兼容 unsigned long int __pad2; // 填充字段,保证struct __data二进制兼容 unsigned int __flags; // 锁属性,读/写优先? # define __PTHREAD_RWLOCK_INT_FLAGS_SHARED 1 } __data; # else /* ... */ # endif char __size[__SIZEOF_PTHREAD_RWLOCK_T]; // 确保各平台之间的实现sizeof(pthread_rwlock_t)大小相同 long int __align; // 内存对齐 } pthread_rwlock_t;
pthread_rwlock_attr_t
#define __SIZEOF_PTHREAD_RWLOCKATTR_T 32 /* 类型声明,必须以8字节大小实现pthread_rwlockattr */ typedef union { char __size[__SIZEOF_PTHREAD_RWLOCKATTR_T]; // 内存对齐,例如 pthread_rwlockattr_t 占32字节,long int 占用 8字节, 所以以8字节对齐,提高性能 long int __align; } pthread_rwlockattr_t; /* 读写锁属性实际定义 */ struct pthread_rwlockattr { int lockkind; // 锁属性(读/写优先) 赋值给pthread_rwlock_t.__flags int pshared; // 单进程独占 / 多进程共享 赋值pthread_rwlock_t.__shared }; /* pthread_rwlockattr.lockkind 属性枚举 */ #if defined __USE_UNIX98 || defined __USE_XOPEN2K enum { PTHREAD_RWLOCK_PREFER_READER_NP, // 读优先 PTHREAD_RWLOCK_PREFER_WRITER_NP, // 写优先 glibc忽略此选项,会设置成读锁 PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP, // 写优先(同一线程不可递归加锁) PTHREAD_RWLOCK_DEFAULT_NP = PTHREAD_RWLOCK_PREFER_READER_NP // 默认读优先 }; /* pthread_rwlockattr.pshared */ #ifndef __PTHREAD_RWLOCK_INT_FLAGS_SHARED # if __WORDSIZE == 64 # define __PTHREAD_RWLOCK_INT_FLAGS_SHARED 1 // 1: 多进程共享 0: 单进程独占 # endif #endif
读写锁加解锁具体实现
读锁
// K&R C风格函数声明方式 等同于 ANSI C风格:int __pthread_rwlock_rdlock (pthread_rwlock_t *rwlock); int __pthread_rwlock_rdlock (rwlock) pthread_rwlock_t *rwlock; { int result = 0; LIBC_PROBE (rdlock_entry, 1, rwlock); // 添加探针,用于调试、性能分析 /* CAS锁,保证只有一个线程可以修改rwlock */ lll_lock (rwlock->__data.__lock, rwlock->__data.__shared); while (1) { /* 加读锁 */ // 没有加写锁 if (rwlock->__data.__writer == 0 // 并且(没有写锁等待加锁 或者 是读优先锁) && (!rwlock->__data.__nr_writers_queued || PTHREAD_RWLOCK_PREFER_READER_P (rwlock))) { /* __nr_readers 为持有读锁线程数量,uint类型,自增后不可能溢出变成0,所以使用 __builtin_expect优化 */ if (__builtin_expect (++rwlock->__data.__nr_readers == 0, 0)) { --rwlock->__data.__nr_readers; result = EAGAIN; } else { LIBC_PROBE (rdlock_acquire_read, 1, rwlock); } break; } /* 阻塞等待加锁 */ // THREAD_GETMEM 获取当前线程id /* 一个线程加了写锁一定不会运行到这里,__builtin_expect优化 */ if (__builtin_expect (rwlock->__data.__writer == THREAD_GETMEM (THREAD_SELF, tid), 0)) { result = EDEADLK; break; } /* 等待加读锁引用计数自增后不可能溢出,__builtin_expect优化*/ /* 64位操作系统寻址空间 0-2^64, 需要2^64个读线程才能满足条件,一个线程栈空间8MB,2^64 * 8MB 远大于寻址空间*/ /* 并且也不允许创建这么多线程,详见ulimit -u */ if (__builtin_expect (++rwlock->__data.__nr_readers_queued == 0, 0)) { --rwlock->__data.__nr_readers_queued; result = EAGAIN; break; } /* 存放当前线程被唤醒的信号量的临时变量 */ int waitval = rwlock->__data.__readers_wakeup; /* 解锁,让其他尝试加锁线程修改rwlock */ lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared); /* __readers_wakeup == waitval进入等待机制,等待被内核唤醒 */ lll_futex_wait (&rwlock->__data.__readers_wakeup, waitval, rwlock->__data.__shared); /* 被内核唤醒,读等待引用计数-1,重新争夺rwlock */ lll_lock (rwlock->__data.__lock, rwlock->__data.__shared); --rwlock->__data.__nr_readers_queued; } /* 加读锁成功或者出现异常__builtin_expect中判断条件 */ lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared); return result; // 0成功 其余异常 }
写锁
// K&R C风格函数声明方式 等同于 ANSI C风格:int __pthread_rwlock_wrlock (pthread_rwlock_t *rwlock); int __pthread_rwlock_wrlock (rwlock) pthread_rwlock_t *rwlock; { int result = 0; /* 添加探针*/ LIBC_PROBE (wrlock_entry, 1, rwlock); /* 保证只有一个线程访问rwlock */ lll_lock (rwlock->__data.__lock, rwlock->__data.__shared); while (1) { /* 加写锁 */ if (rwlock->__data.__writer == 0 && rwlock->__data.__nr_readers == 0) // 没有其余线程读写 { /* 设置当前读线程id为本线程id */ rwlock->__data.__writer = THREAD_GETMEM (THREAD_SELF, tid); /* 探针 */ LIBC_PROBE (wrlock_acquire_write, 1, rwlock); /* 加锁成功,跳出while */ break; } /* 一个线程加了写锁一定不会运行到这里,__builtin_expect优化*/ if (__builtin_expect (rwlock->__data.__writer == THREAD_GETMEM (THREAD_SELF, tid), 0)) { result = EDEADLK; break; } /* 等待读锁队列引用计数+1 */ if (++rwlock->__data.__nr_writers_queued == 0) { /* Overflow on number of queued writers. */ --rwlock->__data.__nr_writers_queued; result = EAGAIN; break; } /* 期望被内核唤醒的值 */ int waitval = rwlock->__data.__writer_wakeup; lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared); /* rwlock->__data.__writer_wakeup == waitval 进入等待状态, 等待被内核唤醒 */ lll_futex_wait (&rwlock->__data.__writer_wakeup, waitval, rwlock->__data.__shared); /* 被内核唤醒,读等待引用计数-1,重新争夺rwlock*/ lll_lock (rwlock->__data.__lock, rwlock->__data.__shared); --rwlock->__data.__nr_writers_queued; } /* 加写锁成功或者出现异常__builtin_expect中判断条件 */ lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared); return result; // 0成功 其余异常 }
解锁
int __pthread_rwlock_unlock (pthread_rwlock_t *rwlock) { /* 探针 */ LIBC_PROBE (rwlock_unlock, 1, rwlock); /* 保证只有一个线程访问rwlock */ lll_lock (rwlock->__data.__lock, rwlock->__data.__shared); if (rwlock->__data.__writer) // 如果是写锁 rwlock->__data.__writer = 0; // 占用线程id归0 else // 如果是读锁 --rwlock->__data.__nr_readers; // 读引用计数-1 /* 解锁一次后,读引用计数=0,rwlock一定为无锁状态*/ if (rwlock->__data.__nr_readers == 0) { /* 如果有线程等待加写锁 */ if (rwlock->__data.__nr_writers_queued) { /* 更新写的同步原语 (下一个写线程进入等待状态的值) */ ++rwlock->__data.__writer_wakeup; lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared); /* 唤醒1个期望加写线程 */ lll_futex_wake (&rwlock->__data.__writer_wakeup, 1, rwlock->__data.__shared); return 0; } /* 如果有线程等待加读锁 */ else if (rwlock->__data.__nr_readers_queued) { /* 更新读同步原语 (下一个读线程进入等待状态的值)*/ ++rwlock->__data.__readers_wakeup; lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared); /* 唤醒INT_MAX个期望加读锁线程 */ lll_futex_wake (&rwlock->__data.__readers_wakeup, INT_MAX, rwlock->__data.__shared); return 0; } } lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared); return 0; }
测试
测试代码
2000个读线程与1个写线程共同操作一个变量,读任务不sleep,持续抢占锁,期望情况:
默认读优先锁:2000个线程一直占用读锁,写锁无法抢占读写锁
写优先读写锁:在不间断的读任务中抢占到写锁,成功修改共享变量
#include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <unistd.h> #define READERS_CNT 2000 #define WRITERS_CNT 1 pthread_rwlock_t rwlock; int shared_data = 0; void* reader(void* arg) { int id = *((int*)arg); // 循环中不sleep,持续抢占锁 while (1) { pthread_rwlock_rdlock(&rwlock); printf("Reader %d: Read shared data = %d\n", id, shared_data); pthread_rwlock_unlock(&rwlock); } return NULL; } void* writer(void* arg) { int id = *((int*)arg); while (1) { pthread_rwlock_wrlock(&rwlock); usleep(100 * 1000); printf("Writer %d: Update shared data to %d\n", id, ++shared_data); pthread_rwlock_unlock(&rwlock); } return NULL; } int main() { pthread_t readers[READERS_CNT], writers[WRITERS_CNT]; int reader_ids[READERS_CNT], writer_ids[WRITERS_CNT]; pthread_rwlockattr_t attr; pthread_rwlockattr_init(&attr); pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP); // 注释为读优先 pthread_rwlock_init(&rwlock, &attr); pthread_rwlockattr_destroy(&attr); for (int i = 0; i < READERS_CNT; i++) { reader_ids[i] = i + 1; pthread_create(&readers[i], NULL, reader, &reader_ids[i]); } for (int i = 0; i < WRITERS_CNT; i++) { writer_ids[i] = i + 1; pthread_create(&writers[i], NULL, writer, &writer_ids[i]); } for (int i = 0; i < READERS_CNT; i++) { pthread_join(readers[i], NULL); } for (int i = 0; i < WRITERS_CNT; i++) { pthread_join(writers[i], NULL); } pthread_rwlock_destroy(&rwlock); return 0; }
测试结果
读优先
写锁饥饿,永远无法执行
...... Reader 1718: Read shared data = 0 Reader 1718: Read shared data = 0 Reader 1718: Read shared data = 0 Reader 1718: Read shared data = 0 Reader 1718: Read shared data = 0 Reader 1718: Read shared data = 0 Reader 1718: Read shared data = 0 Reader 1718: Read shared data = 0 Reader 1718: Read shared data = 0 Reader 1718: Read shared data = 0 Reader 1718: Read shared data = 0 Reader 1718: Read shared data = 0 Reader 1718: Read shared data = 0 Reader 1718: Read shared data = 0 Reader 1718: Read shared data = 0 Reader 1718: Read shared data = 0 Reader 1718: Read shared data = 0 Reader 1718: Read shared data = 0 Reader 1718: Read shared data = 0 Reader 1718: Read shared data = 0 Reader 1718: Read shared data = 0 Reader 1718: Read shared data = 0 ^C
写优先
没有发生写饥饿,成功抢占到锁,读锁需要阻塞等待写锁
...... Reader 1488: Read shared data = 44 Reader 883: Read shared data = 44 Reader 1275: Read shared data = 44 Reader 649: Read shared data = 44 Reader 1230: Read shared data = 44 Reader 796: Read shared data = 44 Reader 842: Read shared data = 44 Reader 13: Read shared data = 44 Reader 870: Read shared data = 44 Reader 1529: Read shared data = 44 Reader 1685: Read shared data = 44 Reader 1702: Read shared data = 44 Reader 1503: Read shared data = 44 Reader 1395: Read shared data = 44 Reader 631: Read shared data = 44 Reader 245: Read shared data = 44 Reader 1686: Read shared data = 44 Reader 663: Read shared data = 44 Reader 763: Read shared data = 44 Reader 1367: Read shared data = 44 Reader 38: Read shared data = 44 Reader 1909: Read shared data = 44 Reader 1560: Read shared data = 44 Reader 1447: Read sha^C