java 可重入读写锁 ReentrantReadWriteLock 详解

简介:

读写锁 ReadWriteLock读写锁维护了一对相关的锁,一个用于只读操作,一个用于写入操作。只要没有writer,读取锁可以由多个reader线程同时保持。写入锁是独占的。
互斥锁一次只允许一个线程访问共享数据,哪怕进行的是只读操作;读写锁允许对共享数据进行更高级别的并发访问:对于写操作,一次只有一个线程(write线程)可以修改共享数据,对于读操作,允许任意数量的线程同时进行读取。
与互斥锁相比,使用读写锁能否提升性能则取决于读写操作期间读取数据相对于修改数据的频率,以及数据的争用——即在同一时间试图对该数据执行读取或写入操作的线程数。
读写锁适用于读多写少的情况。
可重入读写锁 ReentrantReadWriteLock
属性ReentrantReadWriteLock 也是基于 AbstractQueuedSynchronizer 实现的,它具有下面这些属性(来自Java doc文档):

     * 获取顺序:此类不会将读取者优先或写入者优先强加给锁访问的排序。

          * 非公平模式(默认):连续竞争的非公平锁可能无限期地推迟一个或多个reader或writer线程,但吞吐量通常要高于公平锁。
          * 公平模式:线程利用一个近似到达顺序的策略来争夺进入。当释放当前保持的锁时,可以为等待时间最长的单个writer线程分配写入锁,如果有一组等待时间大于所有正在等待的writer线程的reader,将为该组分配读者锁。
          * 试图获得公平写入锁的非重入的线程将会阻塞,除非读取锁和写入锁都自由(这意味着没有等待线程)。

     * 重入:此锁允许reader和writer按照 ReentrantLock 的样式重新获取读取锁或写入锁。在写入线程保持的所有写入锁都已经释放后,才允许重入reader使用读取锁。
writer可以获取读取锁,但reader不能获取写入锁。
     * 锁降级:重入还允许从写入锁降级为读取锁,实现方式是:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不可能的。
     * 锁获取的中断:读取锁和写入锁都支持锁获取期间的中断。
     * Condition 支持:写入锁提供了一个 Condition 实现,对于写入锁来说,该实现的行为与 ReentrantLock.newCondition() 提供的Condition 实现对 ReentrantLock 所做的行为相同。当然,此 Condition 只能用于写入锁。
读取锁不支持 Condition,readLock().newCondition() 会抛出 UnsupportedOperationException。
     * 监测:此类支持一些确定是读取锁还是写入锁的方法。这些方法设计用于监视系统状态,而不是同步控制。

实现AQS 回顾在之前的文章已经提到,AQS以单个 int 类型的原子变量来表示其状态,定义了4个抽象方法( tryAcquire(int)、tryRelease(int)、tryAcquireShared(int)、tryReleaseShared(int),前两个方法用于独占/排他模式,后两个用于共享模式 )留给子类实现,用于自定义同步器的行为以实现特定的功能。
对于 ReentrantLock,它是可重入的独占锁,内部的 Sync 类实现了 tryAcquire(int)、tryRelease(int) 方法,并用状态的值来表示重入次数,加锁或重入锁时状态加 1,释放锁时状态减 1,状态值等于 0 表示锁空闲。
对于 CountDownLatch,它是一个关卡,在条件满足前阻塞所有等待线程,条件满足后允许所有线程通过。内部类 Sync 把状态初始化为大于 0 的某个值,当状态大于 0 时所有wait线程阻塞,每调用一次 countDown 方法就把状态值减 1,减为 0 时允许所有线程通过。利用了AQS的共享模式。
现在,要用AQS来实现 ReentrantReadWriteLock。
一点思考问题
     * AQS只有一个状态,那么如何表示 多个读锁 与 单个写锁 呢?
     * ReentrantLock 里,状态值表示重入计数,现在如何在AQS里表示每个读锁、写锁的重入次数呢?
     * 如何实现读锁、写锁的公平性呢?

一点提示
     * 一个状态是没法既表示读锁,又表示写锁的,不够用啊,那就辦成两份用了,客家话说一个饭粒咬成两半吃,状态的高位部分表示读锁,低位表示写锁,由于写锁只有一个,所以写锁的重入计数也解决了,这也会导致写锁可重入的次数减小。
     * 由于读锁可以同时有多个,肯定不能再用辦成两份用的方法来处理了,但我们有 ThreadLocal,可以把线程重入读锁的次数作为值存在 ThreadLocal 里。
     * 对于公平性的实现,可以通过AQS的等待队列和它的抽象方法来控制,在状态值的另一半里存储当前持有读锁的线程数。如果读线程申请读锁,当前写锁重入次数不为 0 时,则等待,否则可以马上分配;如果是写线程申请写锁,当前状态为 0 则可以马上分配,否则等待。

源码分析现在来看看具体的实现源码。
辦成两份AQS 的状态是32位(int 类型)的,辦成两份,读锁用高16位,表示持有读锁的线程数(sharedCount),写锁低16位,表示写锁的重入次数 (exclusiveCount)。状态值为 0 表示锁空闲,sharedCount不为 0 表示分配了读锁,exclusiveCount 不为 0 表示分配了写锁,sharedCount和exclusiveCount 肯定不会同时不为 0。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
abstract  static  class  Sync  extends  AbstractQueuedSynchronizer {
  
        static  final  int  SHARED_SHIFT   =  16 ;
        // 由于读锁用高位部分,所以读锁个数加1,其实是状态值加 2^16
        static  final  int  SHARED_UNIT    = ( 1  << SHARED_SHIFT);
        // 写锁的可重入的最大次数、读锁允许的最大数量
        static  final  int  MAX_COUNT      = ( 1  << SHARED_SHIFT) -  1 ;
        // 写锁的掩码,用于状态的低16位有效值
        static  final  int  EXCLUSIVE_MASK = ( 1  << SHARED_SHIFT) -  1 ;
        // 读锁计数,当前持有读锁的线程数
     static  int  sharedCount( int  c)    {  return  c >>> SHARED_SHIFT; }
     // 写锁的计数,也就是它的重入次数
     static  int  exclusiveCount( int  c) {  return  c & EXCLUSIVE_MASK; }
}



读锁重入计数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
abstract  static  class  Sync  extends  AbstractQueuedSynchronizer {
     /**
      * 每个线程特定的 read 持有计数。存放在ThreadLocal,不需要是线程安全的。
      */
     static  final  class  HoldCounter {
         int  count =  0 ;
         // 使用id而不是引用是为了避免保留垃圾。注意这是个常量。
         final  long  tid = Thread.currentThread().getId();
     }
     /**
      * 采用继承是为了重写 initialValue 方法,这样就不用进行这样的处理:
      * 如果ThreadLocal没有当前线程的计数,则new一个,再放进ThreadLocal里。
      * 可以直接调用 get。
      * */
     static  final  class  ThreadLocalHoldCounter
         extends  ThreadLocal<HoldCounter> {
         public  HoldCounter initialValue() {
             return  new  HoldCounter();
         }
     }
     /**
      * 保存当前线程重入读锁的次数的容器。在读锁重入次数为 0 时移除。
      */
     private  transient  ThreadLocalHoldCounter readHolds;
     /**
      * 最近一个成功获取读锁的线程的计数。这省却了ThreadLocal查找,
      * 通常情况下,下一个释放线程是最后一个获取线程。这不是 volatile 的,
      * 因为它仅用于试探的,线程进行缓存也是可以的
      * (因为判断是否是当前线程是通过线程id来比较的)。
      */
     private  transient  HoldCounter cachedHoldCounter;
     /**
      * firstReader是这样一个特殊线程:它是最后一个把 共享计数 从 0 改为 1 的
      * (在锁空闲的时候),而且从那之后还没有释放读锁的。如果不存在则为null。
      * firstReaderHoldCount 是 firstReader 的重入计数。
      *
      * firstReader 不能导致保留垃圾,因此在 tryReleaseShared 里设置为null,
      * 除非线程异常终止,没有释放读锁。
      *
      * 作用是在跟踪无竞争的读锁计数时非常便宜。
      *
      * firstReader及其计数firstReaderHoldCount是不会放入 readHolds 的。
      */
     private  transient  Thread firstReader =  null ;
     private  transient  int  firstReaderHoldCount;
     Sync() {
         readHolds =  new  ThreadLocalHoldCounter();
         setState(getState());  // 确保 readHolds 的内存可见性,利用 volatile 写的内存语义。
     }
}



写锁获取与释放写锁的获取与释放通过 tryAcquire 和 tryRelease 方法实现,源码文件里有这么一段说明:tryRelease 和 tryAcquire 可能被Conditions 调用。因此可能出现参数里包含在条件等待和用 tryAcquire 重新获取到锁的期间内已经释放的 读和写 计数。
这说明看起来像是在 tryAcquire 里设置状态时要考虑方法参数(acquires)的高位部分,其实是不需要的。由于写锁是独占的,acquires 表示的只能是写锁的计数,如果当前线程成功获取写锁,只需要简单地把当前状态加上 acquires 的值即可,tryRelease 里直接减去其参数值即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
protected  final  boolean  tryAcquire( int  acquires) {
     Thread current = Thread.currentThread();
     int  c = getState();
     int  w = exclusiveCount(c);
     if  (c !=  0 ) {  // 状态不为0,表示锁被分配出去了。
         // (Note: if c != 0 and w == 0 then shared count != 0)
       // c != 0 and w == 0 表示分配了读锁
       // w != 0 && current != getExclusiveOwnerThread() 表示其他线程获取了写锁。
         if  (w ==  0  || current != getExclusiveOwnerThread())
             return  false  ;
         // 写锁重入
         // 检测是否超过最大重入次数。
         if  (w + exclusiveCount(acquires) > MAX_COUNT)
             throw  new  Error( "Maximum lock count exceeded" );
         // 更新写锁重入次数,写锁在低位,直接加上 acquire 即可。
         // Reentrant acquire
         setState(c + acquires);
         return  true  ;
     }
     // writerShouldBlock 留给子类实现,用于实现公平性策略。
     // 如果允许获取写锁,则用 CAS 更新状态。
     if  (writerShouldBlock() ||
         !compareAndSetState(c, c + acquires))
         return  false  // 不允许获取锁 或 CAS 失败。
     // 获取写锁超过,设置独占线程。
     setExclusiveOwnerThread(current);
     return  true ;
}
protected  final  boolean  tryRelease( int  releases) {
     if  (!isHeldExclusively())  // 是否是当前线程持有写锁
         throw  new  IllegalMonitorStateException();
     // 这里不考虑高16位是因为高16位肯定是 0。
     int  nextc = getState() - releases;
     boolean  free = exclusiveCount(nextc) ==  0 ;
     if  (free)
         setExclusiveOwnerThread(  null );  // 写锁完全释放,设置独占线程为null。
     setState(nextc);
     return  free;
}



读锁获取与释放// 参数变为 unused 是因为读锁的重入计数是内部维护的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
protected  final  int  tryAcquireShared( int  unused) {
     Thread current = Thread.currentThread();
     int  c = getState();
     // 这个if语句是说:持有写锁的线程可以获取读锁。
     if  (exclusiveCount(c) !=  0  &&  // 已分配了写锁
         getExclusiveOwnerThread() != current)  // 且当前线程不是持有写锁的线程
         return  - 1 ;
     int  r = sharedCount(c);  // 取读锁计数
     if  (!readerShouldBlock() &&  // 由子类根据其公平策略决定是否允许获取读锁
         r < MAX_COUNT &&            // 读锁数量还没达到最大值
         // 尝试获取读锁。注意读线程计数的单位是  2^16
         compareAndSetState(c, c + SHARED_UNIT)) {
          // 成功获取读锁
      // 注意下面对firstReader的处理:firstReader是不会放到readHolds里的
      // 这样,在读锁只有一个的情况下,就避免了查找readHolds。
         if  (r ==  0 ) {  // 是 firstReader,计数不会放入  readHolds。
             firstReader = current;
             firstReaderHoldCount =  1 ;
         else  if  (firstReader == current) {  // firstReader 重入
             firstReaderHoldCount++;
         else  {
              // 非 firstReader 读锁重入计数更新
             HoldCounter rh = cachedHoldCounter;  // 首先访问缓存
             if  (rh ==  null  || rh.tid != current.getId())
                 cachedHoldCounter = rh = readHolds.get();
             else  if  (rh.count ==  0 )
                 readHolds.set(rh);
             rh.count++;
         }
         return  1 ;
     }
     // 获取读锁失败,放到循环里重试。
     return  fullTryAcquireShared(current);
}
final  int  fullTryAcquireShared(Thread current) {
     HoldCounter rh =  null ;
     for  (;;) {
         int  c = getState();
         if  (exclusiveCount(c) !=  0 ) {
             if  (getExclusiveOwnerThread() != current)
            // 写锁被分配,非写锁线程获取读锁,失败
                 return  - 1 ;
             // 否则,当前线程持有写锁,在这里阻塞将导致死锁。
         else  if  (readerShouldBlock()) {
             // 写锁空闲  且  公平策略决定 线程应当被阻塞
             // 下面的处理是说,如果是已获取读锁的线程重入读锁时,
             // 即使公平策略指示应当阻塞也不会阻塞。
             // 否则,这也会导致死锁的。
             if  (firstReader == current) {
                 // assert firstReaderHoldCount > 0;
             else  {
                 if  (rh ==  null ) {
                     rh = cachedHoldCounter;
                     if  (rh ==  null  || rh.tid != current.getId()) {
                         rh = readHolds.get();
                         if  (rh.count ==  0 )
                             readHolds.remove();
                     }
                 }
                 // 需要阻塞且是非重入(还未获取读锁的),获取失败。
                 if  (rh.count ==  0 )
                     return  - 1 ;
             }
         }
         // 写锁空闲  且  公平策略决定线程可以获取读锁
         if  (sharedCount(c) == MAX_COUNT)  // 读锁数量达到最多
             throw  new  Error(  "Maximum lock count exceeded" );
         if  (compareAndSetState(c, c + SHARED_UNIT)) {
             // 申请读锁成功,下面的处理跟tryAcquireShared是类似的。
             if  (sharedCount(c) ==  0 ) {
                 firstReader = current;
                 firstReaderHoldCount =  1 ;
             else  if  (firstReader == current) {
                 firstReaderHoldCount++;
             else  {
            // 设定最后一次获取读锁的缓存
                 if  (rh ==  null )
                     rh = cachedHoldCounter;
                 if  (rh ==  null  || rh.tid != current.getId())
                     rh = readHolds.get();
                 else  if  (rh.count ==  0 )
                     readHolds.set(rh);
                 rh.count++;
                 cachedHoldCounter = rh;  // 缓存起来用于释放
             }
             return  1 ;
         }
     }
}
protected  final  boolean  tryReleaseShared( int  unused) {
     Thread current = Thread.currentThread();
     // 清理firstReader缓存 或 readHolds里的重入计数
     if  (firstReader == current) {
         // assert firstReaderHoldCount > 0;
         if  (firstReaderHoldCount ==  1 )
             firstReader =  null ;
         else
             firstReaderHoldCount--;
     else  {
         HoldCounter rh = cachedHoldCounter;
         if  (rh ==  null  || rh.tid != current.getId())
             rh = readHolds.get();
         int  count = rh.count;
         if  (count <=  1 ) {
             // 完全释放读锁
             readHolds.remove();
             if  (count <=  0 )
                 throw  unmatchedUnlockException();
         }
         --rh.count;  // 主要用于重入退出
     }
     // 循环在CAS更新状态值,主要是把读锁数量减 1
     for  (;;) {
         int  c = getState();
         int  nextc = c - SHARED_UNIT;
         if  (compareAndSetState(c, nextc))
             // 释放读锁对其他读线程没有任何影响,
             // 但可以允许等待的写线程继续,如果读锁、写锁都空闲。
             return  nextc ==  0 ;
     }
}
公平性策略公平与非公平策略是由 Sync 的子类 FairSync 和 NonfairSync 实现的。
/**
* 这个非公平策略的同步器是写锁优先的,申请写锁时总是不阻塞。
*/
static  final  class  NonfairSync  extends  Sync {
     private  static  final  long  serialVersionUID = -8159625535654395037L;
     final  boolean  writerShouldBlock() {
         return  false // 写线程总是可以突入
     }
     final  boolean  readerShouldBlock() {
         /* 作为一个启发用于避免写线程饥饿,如果线程临时出现在等待队列的头部则阻塞,
          * 如果存在这样的,则是写线程。
          */
         return apparentlyFirstQueuedIsExclusive();
     }
}
/**
* 公平的 Sync,它的策略是:如果线程准备获取锁时,
* 同步队列里有等待线程,则阻塞获取锁,不管是否是重入
* 这也就需要tryAcqire、tryAcquireShared方法进行处理。
*/
static  final  class  FairSync  extends  Sync {
     private  static  final  long  serialVersionUID = -2274990926593161451L;
     final  boolean  writerShouldBlock() {
         return  hasQueuedPredecessors();
     }
     final  boolean  readerShouldBlock() {
         return  hasQueuedPredecessors();
     }
}

现在用奇数表示申请读锁的读线程,偶数表示申请写锁的写线程,每个数都表示一个不同的线程,存在下面这样的申请队列,假设开始时锁空闲:
1  3  5  0  7  9  2  4

读线程1申请读锁时,锁是空闲的,马上分配,读线程3、5申请时,由于已分配读锁,它们也可以马上获取读锁。
假设此时有线程11申请读锁,由于它不是读锁重入,只能等待。而线程1再次申请读锁是可以的,因为它的重入。
写线程0申请写锁时,由于分配了读锁,只能等待,当读线程1、3、5都释放读锁后,线程0可以获取写锁。
线程0释放后,线程7、9获取读锁,它们释放后,线程2获取写锁,此时线程4必须等待线程2释放。
线程4在线程2释放写锁后获取写锁,它释放写锁后,锁恢复空闲。

特别说明:尊重作者的劳动成果,转载请注明出处哦~~~http://blog.yemou.net/article/query/info/tytfjhfascvhzxcyt206
相关文章
|
8天前
|
安全 Java 调度
Java编程时多线程操作单核服务器可以不加锁吗?
Java编程时多线程操作单核服务器可以不加锁吗?
23 2
|
23天前
|
存储 缓存 安全
【Java面试题汇总】多线程、JUC、锁篇(2023版)
线程和进程的区别、CAS的ABA问题、AQS、哪些地方使用了CAS、怎么保证线程安全、线程同步方式、synchronized的用法及原理、Lock、volatile、线程的六个状态、ThreadLocal、线程通信方式、创建方式、两种创建线程池的方法、线程池设置合适的线程数、线程安全的集合?ConcurrentHashMap、JUC
【Java面试题汇总】多线程、JUC、锁篇(2023版)
|
11天前
|
算法 Java 关系型数据库
Java中到底有哪些锁
【9月更文挑战第24天】在Java中,锁主要分为乐观锁与悲观锁、自旋锁与自适应自旋锁、公平锁与非公平锁、可重入锁以及独享锁与共享锁。乐观锁适用于读多写少场景,通过版本号或CAS算法实现;悲观锁适用于写多读少场景,通过加锁保证数据一致性。自旋锁与自适应自旋锁通过循环等待减少线程挂起和恢复的开销,适用于锁持有时间短的场景。公平锁按请求顺序获取锁,适合等待敏感场景;非公平锁性能更高,适合频繁加解锁场景。可重入锁支持同一线程多次获取,避免死锁;独享锁与共享锁分别用于独占和并发读场景。
|
19天前
|
Java 数据库
JAVA并发编程-一文看懂全部锁机制
曾几何时,面试官问:java都有哪些锁?小白,一脸无辜:用过的有synchronized,其他不清楚。面试官:回去等通知! 今天我们庖丁解牛说说,各种锁有什么区别、什么场景可以用,通俗直白的分析,让小白再也不怕面试官八股文拷打。
|
19天前
|
安全 Java 开发者
Java并发编程中的锁机制解析
本文深入探讨了Java中用于管理多线程同步的关键工具——锁机制。通过分析synchronized关键字和ReentrantLock类等核心概念,揭示了它们在构建线程安全应用中的重要性。同时,文章还讨论了锁机制的高级特性,如公平性、类锁和对象锁的区别,以及锁的优化技术如锁粗化和锁消除。此外,指出了在高并发环境下锁竞争可能导致的问题,并提出了减少锁持有时间和使用无锁编程等策略来优化性能的建议。最后,强调了理解和正确使用Java锁机制对于开发高效、可靠并发应用程序的重要性。
18 3
|
2月前
|
存储 Java
Java锁是什么?简单了解
在高并发环境下,锁是Java中至关重要的概念。锁或互斥是一种同步机制,用于限制多线程环境下的资源访问,确保排他性和并发控制。例如,超市储物柜仅能存放一个物品,若三人同时使用,则需通过锁机制确保每次只有一个线程访问。Java中可以通过`synchronized`关键字实现加锁,确保关键代码段的原子性,避免数据不一致问题。正确使用锁可有效提升程序的稳定性和安全性。
Java锁是什么?简单了解
|
1月前
|
Oracle Java 关系型数据库
【颠覆性升级】JDK 22:超级构造器与区域锁,重塑Java编程的两大基石!
【9月更文挑战第6天】JDK 22的发布标志着Java编程语言在性能和灵活性方面迈出了重要的一步。超级构造器和区域锁这两大基石的引入,不仅简化了代码设计,提高了开发效率,还优化了垃圾收集器的性能,降低了应用延迟。这些改进不仅展示了Oracle在Java生态系统中的持续改进和创新精神,也为广大Java开发者提供了更多的可能性和便利。我们有理由相信,在未来的Java编程中,这些新特性将发挥越来越重要的作用,推动Java技术不断向前发展。
|
1月前
|
安全 Java API
Java线程池原理与锁机制分析
综上所述,Java线程池和锁机制是并发编程中极其重要的两个部分。线程池主要用于管理线程的生命周期和执行并发任务,而锁机制则用于保障线程安全和防止数据的并发错误。它们深入地结合在一起,成为Java高效并发编程实践中的关键要素。
19 0
|
2月前
|
开发者 C# Windows
WPF与游戏开发:当桌面应用遇见游戏梦想——利用Windows Presentation Foundation打造属于你的2D游戏世界,从环境搭建到代码实践全面解析新兴开发路径
【8月更文挑战第31天】随着游戏开发技术的进步,WPF作为.NET Framework的一部分,凭借其图形渲染能力和灵活的UI设计,成为桌面游戏开发的新选择。本文通过技术综述和示例代码,介绍如何利用WPF进行游戏开发。首先确保安装最新版Visual Studio并创建WPF项目。接着,通过XAML设计游戏界面,并在C#中实现游戏逻辑,如玩家控制和障碍物碰撞检测。示例展示了创建基本2D游戏的过程,包括角色移动和碰撞处理。通过本文,WPF开发者可更好地理解并应用游戏开发技术,创造吸引人的桌面游戏。
99 0
|
2月前
|
开发者 C# 存储
WPF开发者必读:资源字典应用秘籍,轻松实现样式与模板共享,让你的WPF应用更上一层楼!
【8月更文挑战第31天】在WPF开发中,资源字典是一种强大的工具,用于共享样式、模板、图像等资源,提高了应用的可维护性和可扩展性。本文介绍了资源字典的基础知识、创建方法及最佳实践,并通过示例展示了如何在项目中有效利用资源字典,实现资源的重用和动态绑定。
45 0
下一篇
无影云桌面