这边文章的主要内容是基于“java并发编程艺术”这本书,中间加入了一些自己的理解。这篇文章包括并发编程涉及到的几乎所有基础知识。主要是帮助长期从事业务逻辑开发的java程序员梳理一下java并发开发基础。
CPU原理简介
术语 | 术语描述 |
---|---|
内存屏障(memory barriers) | 是一组处理器指令,用于实现对内存操作的顺序限制 |
内存缓冲行(cache line) | CPU高速缓存中可以分配的最小存储单元。处理器填写缓存行时会加载整个缓存行,现在CPU需要执行几百次CPU指令。在64位操作系统中,一个内存缓冲行是8个字节 |
原子操作(atomic operation) | 不可中断的一个或者一系列操作 |
缓存行填充(cache line fill) | 当处理器识别到从内存中读取操作数是可缓存的,处理器读取整个高速缓存行到适当的缓存(L1,L2,L3的或所有) |
缓存命中(cache hit) | 如果进行高速缓存行填充操作的内存位置仍然是下次处理器访问的地址时,处理器从缓存行读取操作数据,而不是内存读取 |
写命中(write hit) | 当处理器将操作数写回到一个内存缓存的区域时,它首先会检查这个缓存的内存地址是否在缓存行中,如果存在一个有效的缓存行,则处理器会将这个操作数写回到缓存,而不是写回到内存,这个称为写命中 |
写缺失(write misses the cache) | 一个有效的缓存行被写入到不存在的内存区域 |
比较和交换(Compare And Swap) | CAS操作需要输入两个数值,一个旧值(期望操作前的值)和一个新值,在操作期间比较久之有没有发生变化,如果没有发生变化,才交换成新值,否则不交换 |
CPU流水线(CPU pipeline) | 在CPU中由5-6个不同功能的电路单元组成一条指令处理流水线,一条流水线对应一个指令的执行 |
缓存的存储器层次结构
CPU缓存可以分为一级缓存,二级缓存,部分高端CPU还具有三级缓存,每一级缓存中所储存的全部数据都是下一级缓存的一部分,这三种缓存的技术难度和制造成本是相对递减的,所以其容量也是相对递增的。
基于缓存的存储器层次结构行之有效,是因为较慢的存储设备比较快的存储设备更便宜,还因为程序往往展示出局部性:
时间局部性:被引用过一次的存储器的位置很可能在不远的将来被再次引用。
空间局部性:如果一个存储器位置被引用了一次,那么程序很可能在不远的将来引用附近的一个存储器位置。
CPU如何实现原子操作
1)使用总线锁保证原子性,当一个处理器在总线上输出LOCK#信号,那么该处理器可以独占共享内存。
2)使用缓存锁保证原子性,同一时刻我们只需保证对某个内存地址的操作是原子性即可,总线锁定的开销比较大。通过缓存一致性保障当某个被多处理器缓存的变量被修改,所有的缓存处理器重新从共享内存加载新值。
处理器之间的缓存一致性协议:每个处理器通过嗅探在总线上传播的数据来检查自己缓存的值是不是过期了,当处理器发现自己缓存行对应内存地址被修改,就会将当前处理器的缓存行设置成无效状态,当处理器对这个数据进行修改操作的时候,会重新从系统内存中把数据读到操作器缓存里。
vilatile关键字原理
对vilatile关键字修饰的变量,当发生修改时,生成的汇编代码会多出一个lock前缀的指令。
1)lock前缀指令会引起处理器缓存回写到内存;
2)一个处理器的缓存回写到内存会导致其他处理器的缓存无效。(之后的所有操作,各个处理器看到的值是最新的,也就是valatile的可见性)
前面说过,处理器是通过嗅探在总线来检查混存的值是不是过期,根据处理器之间的缓存一致性协议,vilatile变量的修改是满足原子的。vilatile相对synchronized修饰一个变量,实现原子性会轻量很多。
可见性 != 线程安全
可见性指的是单个变量;线程安全指的是变量的一系列操作;两者是不同维度的概念,可以理解为可见性是线程安全的前提。
即便i变量是volatile关键字修饰的,但是i++不是线程安全的。
i++编译后是多条指令,指令如下:
0: aload_0
1: dup
2: getfield #2; //Field i:I
5: iconst_1
6: iadd
7: putfield #2; //Field i:I
8: return
可见性只保证了7这一步的原子性。如果另一个线程2也做i++,当线程1、2都执行到第7步,这个时候,并行变成串行,线程1执行第7步,线程2等待线程1执行完第7步才能往下执行。但是并发情况下线程2得到的值也是1,线程1、2各做了一次i++,应该得到2,但是输出是1。解决这个问题可以采用volatile配合CAS来保证多操作的一致性。
锁
同步的实现当然是采用锁了,java中使用锁的两个基本工具是 synchronized 和 Lock。Lock通过显示定义同步锁对象来实现同步,在这种机制下,同步锁由Lock对象充当。Lock提供了比synchronized方法和synchronized代码块更广泛的锁定操作,Lock允许实现更灵活的结构,可以具有差别很大的属性,并且支持多个相关的Condition对象。
Synchronized实现同步的基础:Java中的每一个对象都可以作为锁,Synchronized具体体现为以下四种形式:
1)对于普通对象,锁是当前实例对象;
2)对于普通同步方法,锁是当前实例对象;
3)对于静态同步方法,锁是当前类的class对象;
4)对于同步方法块,锁是Synchronized括号里配置的对象。
Synchronized在JVM里的实现原理:JVM基于进入和退出Monitor对象来实现对象同步和代码块同步,其中对象同步是通过获取对象的monitor(也就是对象头里面的锁信息)来决定哪个线程持可以操作这个对象;代码块的同步是使用monitorenter指令和monitorexit指令实现的,在代码编译后,在同步代码块开始处添加monitorenter指令,结束和异常处添加monitorexit指令,方法同步类似。
java对象头结构
如果对象是数组类型,则虚拟机用3个字宽(1个字宽,在32位虚拟机即4字节,64位虚拟机8字节),非数组,则用2个字宽。
内容 | 说明 |
---|---|
Mark Word | 存储对象的hashCode或锁信息 |
Class Metadata Address | 存储到对象类型数据的指针 |
其中Mark Word里默认存储对象的HashCode、分代年龄和锁标记位。32位的结构如下
Monitor Record
Monitor Record是线程私有的数据结构,每一个线程都有一个可用monitor record列表,同时还有一个全局的可用列表。每一个被锁住的对象都会和一个monitor record关联(对象头的MarkWord中的LockWord指向monitor record的起始地址),同时monitor record中有一个Owner字段存放拥有该锁的线程的唯一标识,表示该锁被这个线程占用。Monitor Record的内部结构字段如下:
Owner:初始时为NULL表示当前没有任何线程拥有该monitor record,当线程成功拥有该锁后保存线程唯一标识,当锁被释放时又设置为NULL;
EntryQ:关联一个系统互斥锁(semaphore),阻塞所有试图锁住monitor record失败的线程。
RcThis:表示blocked或waiting在该monitor record上的所有线程的个数。
Nest:用来实现重入锁的计数。
HashCode:保存从对象头拷贝过来的HashCode值(可能还包含GC age)。
Candidate:用来避免不必要的阻塞或等待线程唤醒,因为每一次只有一个线程能够成功拥有锁,如果每次前一个释放锁的线程唤醒所有正在阻塞或等待的线程,会引起不必要的上下文切换(从阻塞到就绪然后因为竞争锁失败又被阻塞)从而导致性能严重下降。Candidate只有两种可能的值。0表示没有需要唤醒的线程1表示要唤醒一个继任线程来竞争锁。
偏向锁
Java偏向锁(Biased Locking)是Java6引入的一项多线程优化。 偏向锁,顾名思义,它会偏向于第一个访问锁的线程,如果在运行过程中,同步锁只有一个线程访问,不存在多线程争用的情况,则线程是不需要触发同步的,这种情况下,就会给线程加一个偏向锁。 如果在运行过程中,遇到了其他线程抢占锁,则持有偏向锁的线程会被挂起,JVM会消除它身上的偏向锁,将锁恢复到标准的轻量级锁。偏向锁的撤销,需要等待全局安全点(在这个时间点上没有字节码正在执行),它会首先暂停拥有偏向锁的线程,判断锁对象是否处于被锁定状态,撤销偏向锁后恢复到未锁定(标志位为“01”)或轻量级锁(标志位为“00”)的状态。
轻量级锁
轻量级锁是由偏向所升级来的,偏向锁运行在一个线程进入同步块的情况下,当第二个线程加入锁争用的时候,偏向锁就会升级为轻量级锁; 轻量级锁的加锁过程:
(1)当对象处于无锁状态时(RecordWord值为HashCode,状态位为001),线程首先从自己的可用moniter record列表中取得一个空闲的moniter record,初始Nest和Owner值分别被预先设置为1和该线程自己的标识,一旦monitor record准备好然后我们通过CAS原子指令安装该monitor record的起始地址到对象头的LockWord字段,如果存在其他线程竞争锁的情况而调用CAS失败,则只需要简单的回到monitorenter重新开始获取锁的过程即可。
(2)对象已经被膨胀同时Owner中保存的线程标识为获取锁的线程自己,这就是重入(reentrant)锁的情况,只需要简单的将Nest加1即可。不需要任何原子操作,效率非常高。
(3)对象已膨胀但Owner的值为NULL,当一个锁上存在阻塞或等待的线程同时锁的前一个拥有者刚释放锁时会出现这种状态,此时多个线程通过CAS原子指令在多线程竞争状态下试图将Owner设置为自己的标识来获得锁,竞争失败的线程在则会进入到第四种情况(4)的执行路径。
(4)对象处于膨胀状态同时Owner不为NULL(被锁住),在调用操作系统的重量级的互斥锁之前先自旋一定的次数,当达到一定的次数时如果仍然没有成功获得锁,则开始准备进入阻塞状态,首先将rfThis的值原子性的加1,由于在加1的过程中可能会被其他线程破坏Object和monitor record之间的关联,所以在原子性加1后需要再进行一次比较以确保LockWord的值没有被改变,当发现被改变后则要重新monitorenter过程。同时再一次观察Owner是否为NULL,如果是则调用CAS参与竞争锁,锁竞争失败则进入到阻塞状态。
多线程下java如何实现原子操作
通过锁和CAS的方式实现多指令的“原子”操作,以计数为例:
private void safeCount(){
for(;;){
int count = atomicCount.get();
boolean suc = atomicCount.get.compareAndSet(count,++count);
}
}
使用CAS方式实现原子操作的三个问题
1)ABA问题,1A->2B->3A,JDK中提供AtomicStampedReference来解决,是通过首先检查前引用是否等于预期引用,再检查当前值是否等于预期值来实现的。
2)循环时间开销比较大
3)只能保障一个变量的原子操作,多个变量的原子操作通过锁来实现。
JVM实现锁的方式都使用了循环CAS。
java内存模型
线程之间的通信和同步
线程之间的通信可以分为隐式通信和显式通信。隐式通信是指在共享内存的并发模型里,通过写-读内存中的公共状态来实现隐式通信。显式通信是指在信息传递的并发模型里,线程之间通过socket发送消息进行通信。
同步是指程序中用于控制不同线程间操作发生的相对顺序的机制。在共享内存并发模型里,同步是显式进行的,程序员必须显式指定某个方法或某段代码需要在线程之间互斥执行。在消息传递的并发模型里,由于消息的发送必须在消息的接收之前,因此同步是隐式进行的。
如果编写多线程程序的java程序员不理解隐式进行的线程之间通信,很可能遇到各种奇怪的内存可见性问题。
从源代码到最终执行的指令序列经过以下几步的重排序。
编译器在不影响 程序执行的结果下,可能对代码执行的先后顺序进行调整。
在执行程序时,为了提供性能,处理器和编译器常常会对指令进行重排序,但是不能随意重排序,不是你想怎么排序就怎么排序,它需要满足以下两个条件:
1. 在单线程环境下不能改变程序运行的结果;
2. 存在数据依赖关系的不允许重排序
A、B、C三个操作存在如下关系:A、B不存在数据依赖关系,A和C、B和C存在数据依赖关系,因此在进行重排序的时候,A、B可以随意排序,但是必须位于C的前面,执行顺序可以是A –> B –> C或者B –> A –> C。
在多线程情况下,重排序可能导致处理器执行内存操作的顺序和内存实际的操作执行顺序不一致。比如下面写缓存的存在,导致处理器对执行的操作顺序进行了重排序。处理器与内存的交互图如下
并发场景下,处理器A执行
a=1;
x=b;
处理器B执行
b=2;
y=a;
可能得到a=b=0的结果
从内存操作的实际发生顺序来看,直到A3将写缓存区的值刷新到主内存,A1操作才算真正执行。处理器执行的顺序是A1->A2,但是内存操作实际发生的顺序是A2->A1,这个就是内存系统的重排序。
对于未同步或未正确同步的多线程程序,JMM只提供最小的安全性:线程执行时读取到的值,要么是之前某个线程写入的值,要么是默认值(0,Null,false),JMM保证线程操作的值不会是无中生有冒出来的。
为了保证并发系统的顺序一致性,java编译器在生成指令序列的适当位置会插入内存屏障指令来禁止特定类型的处理器排序,有以下4类内存屏障指令,我们熟悉各种关键字,如volatile、锁、final等 都是通过屏障指令来保证内存语义的:
屏障指令类型 | 指令示例 | 说明 |
---|---|---|
LoadLoad Barriers | Load1;LoadLoad;Load2 | 确保Load1数据的装载先于Load2及所有后续装载指令 |
StoreStore Barriers | Store1;StoreStore;Store2 | 确保Store1数据对其他处理器可见(刷新到内存)先于Store2及后续所有存储指令的存储 |
LoadStore Barriers | Load1;LoadLoad;Store2 | 确保Load1数据的装载先于Store2及所有后续存储指令刷新到内存 |
StoreLoad Barriers | Store1;StoreStore;Load2 | 确保Store1数据对其他处理器可见(刷新到内存)先于Load2及后续装载指令 |
volatile
在每一个volatile写操作的前面插入一个StoreStore屏障
在每一个volatile写操作的后面插入一个StoreLoad屏障
在每一个volatile读操作的后面插入一个LoadLoad屏障
在每一个volatile读操作的后面插入一个LoadStore屏障
JSR-133还严格限制编译器和处理器对volatile变量与普通变量的重排序确保volatile的写-读 和锁的释放-获取具有相同的内存语义。
双重检查锁定和延迟初始化(单例的写法)
写法一:
public static synchronized Instance getInstance(){
if (instance == null){
instance = new Instance();
}
return instance;
}
}
这种做法的问题是很明显的,每一次读取instance都需要同步,可能会对性能产生较大的影响。
写法二:
public static Instance getInstance(){
if (instance == null){
synchronized(UnsafeLazyInit.classs){
if (instance == null){
instance = new Instance();
}
}
}
return instance;
}
}
这种方案看似解决了上面两种方案都存在的问题,但是也是有问题的。
问题根源
instance = new Instance();
这一条语句在实际执行中,可能会被拆分程三条语句,如下:
memory = allocate(); //1
createInstance(memory); //2
instance = memory; //3
根据重排序规则,后两条语句不存在数据依赖,因此是可以进行重排序的。重排序之后,就意味着,instance域在被赋值了之后,指向的对象可能尚未初始化完成,而instance域是一个静态域,可以被其他线程读取到,那么其他线程就可以读取到尚未初始化完成的instance域。
基于volatile的解决方案
要解决这个办法,只需要禁止语句2和语句3进行重排序即可,因此可以使用volatile来修改instance就能做到了。
private volatile static Instance instance;
因为Volatile语义会禁止编译器将volatile写之前的操作重排序到volatile之后。
写法三:
public class InstanceFactory {
private static class InstanceHolder {
public static Instance instance = new Instance();
}
public static Instance getInstance() {
return InstanceHolder.instance ; //这里将导致InstanceHolder类被初始化
}
}
Java语言规范规定,对于每一个类或者接口C ,都有一个唯一的初始化锁LC与之对应,从C到LC的映射,由JVM实现。
每个线程在读取一个类的信息时,如果此类尚未初始化,则尝试获取LC去初始化,如果获取失败则等待其他线程释放LC。
如果能获取到LC,则要判断类的初始化状态,如果是未初始化,则要进行初始化。如果是正在初始化,
则要等待其他线程初始化完成,如果是已经初始化,则直接使用此类对象。
线程
线程的状态
状态名称 | 说明 |
---|---|
NEW | 初始状态,线程被构建,但是还没有调用start()方法 |
RUNNABLE | 运行状态,Java线程将操作系统中的就绪和运行两种状态笼统称为“运行中” |
BLOCKED | 阻塞状态,表示线程阻塞于锁 |
WATING | 等待状态,表示线程进入等待状态,进入该状态表示当前线程需要等待其他线程做出一些特定动作(通知或中断) |
TIME_WATING | 超时等待状态,该状态不同于WATING,它是可以在指定的时间自行返回的 |
TERMINATED | 终止状态,表示当前线程已经执行完毕 |
Java线程状态变迁
线程之间的通信方式
1)volatile和synchronized关键字
volatile修饰的变量可以被多个线程共用;synchronzed保证线程对变量的可见性和排他性
2)等待/通知机制
从WaitNotify中可以提炼出等待/通知的经典范式,该范式分为两部分:等待方(消费者)和通知方(生产者)。
等待方遵循如下的原则:
a.获取对象的锁
b.如果条件不满足,那么调用对象的wait()方法,被通知后仍要检查条件
c.条件满足则执行对应的逻辑。
对应的伪代码如下
synchronized(对象){
while(条件不满足){
对象.wait();
}
}
通知方遵循如下原则:
a.获取对象的锁
b.改变条件
c.通知所有的等待方线程
对应的伪代码如下:
synchronized(对象){
改变条件;
对象.notifyAll();
}
3)管道输入/输出流
PipeWriter out = new PipeWriter();
PipeReader in = new PipeReader();
out.connect(in);
out、in分别在两个线程中进行读写
4)Thread.join()的使用
如果一个线程A执行了thread.join()语句,其含义:当前线程A等到thread线程终止之后才从thread.join()返回。
5)ThreadLocal的使用
ThreadLocal即本地线程变量,是一个以ThreadLocal对象为key,任意对象为value的存储结构。这个结构被附带在线程上,也就是说一个线程可以根据一个ThreadLocal对象查询到绑定在这个线程上的值。
Lock
Lock接口提供了和Synchronized关键字类似的同步功能,只是在使用时需要显示地获取和释放锁。虽然它缺少了Synchronized隐式获取释放锁的便捷性,但是却拥有了锁获取和释放的可操作性、可中断的获取锁以及超时获取锁等多种synchronized关键字所不具备的同步特性。
Lock的API
方法名称 | 描述 |
---|---|
void lock() | 获取锁,调用该方法获取锁,当获取成功后,从该方法返回,否则阻塞在这个方法中 |
void lockInterruptibly() throws InterruptedException | 可中断地获取锁,在获取过程中可以相应中断 |
boolean tryLock() | 尝试非阻塞获取锁,调用该方法立刻返回,如果能够获取则返回true,否则返回false |
boolean tryLock(long,unit) throws InterruptedException | 带超时时间的获取锁 |
void unlock() | 释放锁 |
Codition newCondition() | 获取等待通知组件,该组件和当前的锁绑定,当前线程只有获得了锁,才能调用该组件的wait()方法,而调用后,当前线程将释放锁 |
Lock接口的实现基本都是通过聚合一个同步器的子类来完成访问控制的,队列同步器中维护了一个FIFO的双向链表来管理线程之间的同步,当前线程调用lock.lock()获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成一个节点并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。
队列同步器(AbstractQueueSynchronizer)
子类通过继承同步器并实现它的抽象方法来管理同步状态,同步器可以支持独占式获取同步状态,也可以支持共享式地获取同步状态,这样就可以实现不同类型的组件(ReentrantLock、ReentrantReadWriteLock 和 CountDownLatch等)
同步器可重写的方法
方法名称 | 描述 |
---|---|
protected boolean tryAcquire(int arg) | 独占式获取同步器状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态 |
protected boolean tryRelease(int arg) | 独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态 |
protected int tryAcquireShare(int arg) | 共享式获取同步状态,返回大于等于0的值表示获取成功,反之获取失败 |
protected boolean tryReleaseShare(int arg) | 共享式释放同步状态 |
protected boolean isHeldExclusively() | 当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占 |
队列同步器内部中维护了一个FIFO的双向链表来管理线程之间的同步,每一个线程被封装成一个节点Node,Node的属性如下:
int waitStatus:CANCAELLED(值为1),等待超时或者被中断的状态;SIGNAL(值为-1),等待信号状态;CONDITION(值为-2),节点线程在等待Condition,如果其他线程对Condition调用了sinal()方法后,该节点获取同步状态;PROPAGATE(值为-3),表示下一次共享式同步状态获取会无条件地被传播下去;INITIAL(值为0),初始状态
Node prev:前驱节点
Node next:后继节点
Node nextWaiter:等待队列的后继节点。如果当前节点是共享的,那么这个字段将是一个SHARED常量。该字段用于处理等待状态为CONDITION的线程同步。
Thread thread:获取同步状态的线程
独占式同步状态获取与释放
通过调用同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感。源码如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
上述代码主要完成了同步状态获取、节点构造、加入同步器以及在同步队列中自旋等待的相关工作。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq方法是在第一次添加尾节点失败时通过自旋死循环的方式来确保节点加到队列尾部。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
acquireQueued方法中,当前线程在“死循环”中尝试获取同步状态,而只有前驱节点是头节点才能尝试获取同步状态。
独占式同步状态获取流程
共享式同步状态获取与释放
共享式获取与独占式获取最主要的区别在于同一时刻能否有过个线程同时获取到同步状态。一个文件的写往往是独占式访问,但是读可以是共享式访问。
通过调用同步器的acquireShared(int arg)方法可以共享式地获取同步状态,该方法代码如下所示:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
在acquireShared(int arg)方法中,同步器调用tryAcquireShared(int arg)方法尝试获取同步状态,tryAcquireShared(int arg)返回大于等于0时,表示能够获取到同步状态。在doAcquireShared(int arg)方法的自旋中,如果当前节点的前驱节点为头节点时,尝试获取同步状态,如果返回值大于等于0,表示获取同步状态成功,并从自旋中退出。setHeadAndPropagate是设置头节点,然后调用releaseShared()方法释放后续处于SIGNAL状态的线程节点。
重入锁(ReentrantLock)
重入锁是基于独占式同步来实现的,在此基础上增加了请求线程是不是获得锁的线程判断,来支持线程在获得到锁之后能够在此获取该锁而不会被锁阻塞。每次获得锁,状态都要累加,如果锁被获取了n此,那么前(n-1)次tryRelease(int release)方法必须返回fasle,只有同步状态完全释放了,才能返回true。
锁的获取存在公平和非公平两种实现。如果一个锁是公平的,那么锁的获取顺序就应该符合请求的绝对时间顺序,即FIFO。所有锁的实现都是基于同步队列实现的额,ReentrantLock也不例外,通过重写tryAcquire(int arg)方法来实现公平和非公平。现在看一下公平锁和非公平锁的区别:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
唯一的区别是公平锁比非公平锁在获取锁是多一个hasQueuedPredecessors()逻辑,即加入了同步队列中当前节点是否有前驱节点的判断,如果该方法返回true,则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放之后才能获取锁。
读写锁
现实应用场景下,对某个资源的读远远大于写,这个时候使用独占锁来同等对待读写线程,性能远没有使用共享锁来的高,在读线程占用资源时让其他读线程也访问资源可以减少线程之间的竞争。
读写状态的设计
在ReentrantLock中,同步状态代表了一个线程获取锁的次数。读写锁同样适用同步状态来维护多个读线程和一个写线程的状态。
如果在一个整型变量上维护多种状态,就一定需要“按位切割使用”这个变量,读写锁将变量分成两个部分,高16位表示读,低16位表示写。
写锁的获取与释放
写锁是一个支持重进入的排他锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁时,读锁已经被获取(读状态不为0)或者该线程不是已经获取写锁的线程,则当前线程进入等待状态。ReentrantReadWriteLock的tryAcquire代码如下:
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// 存在读锁或者当前获取线程不是已经获取写锁的线程 if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
读锁的获取与释放
读锁时一个支持重进入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问(写状态为0)时,读锁总会被成功地获取,而所做的也只是(线程安全的)增加读状态。ReentrantReadWriteLock的tryAcquireShared方法代码如下:
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
LockSupport工具
LockSupport提供的阻塞和唤醒方法如下:
方法名称 | 描述 |
---|---|
void park() | 阻塞当前线程,如果调用unpark(Thread thread)方法或当前线程被中断,才能从park()方法返回 |
void parkNanos(long nanos) | 阻塞当前线程,最长不超过nanos纳秒,返回条件在park()基础上增加了超时返回 |
void parkUntil(long deadline) | 阻塞当前线程,知道deadline时间(从1970年开始到deadline的毫秒数) |
void unpark(Thread thread) | 唤醒处于阻塞的线程thread |
Condition接口
Condition的使用如下:
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
Condition接口的使用类似于普通java对象的监视器方法,普通java对象的监视器方法主要包括wait()、wait(long timeout)、notify、notifyAll()方法,Condition对应的方法为await()、await(long timeout)、signal()、signalAll().
Object的监视器方法与Condition接口的主要区别如下:
对比项 | Object Monitor Methods | Condition |
---|---|---|
前置条件 | 获取对象的锁 | 调用Lock.lock()获取锁 然后调用Lock.newCondition()获取Condition对象 |
等待队列个数 | 一个 | 多个 |
当前线程释放锁并进入等待状态,在等待状态中不响应中断 | 不支持 | 支持 |
相对于同步器的队列是双向队列,condition的等待队列是一个单向队列。每个condition一个队列。
ConditionObject的await()、signal()方法源码如下:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
调用await方法,说明该线程已经获取到了锁,该方法会将当前线程构造成节点加入到condition的等待队列。
signal方法会唤起等待的头线程节点。
java并发容器、框架、工具类
ConcurrentHashMap
jdk1.8之前实现原理是通过锁分段技术提高并发访问率。hashMap的内部是一个Entry数组,如果在这个数组上面上锁,并发常见下线程等待会非常多。通过分段技术,ConcurrentHashMap维护一个两级数组,第一级为最大65535个Segment,每个Segment对象内部再维护一个HashEntry数组。因为Segment最多65535个,也就是hashCode的最后15位决定分到哪个Segment,为了使分到Segment地数据更加分散,ConcurrentHashMap对hashCode再次散列。
jdk1.8之后,ConcurrentHashMap放弃了Segment概念,采用table数组某个元素作为锁,从而实现了对每一行(链表)数据进行加锁,进一步减少并发冲突的概率。将原先table数组+单向链表的数据结构,变更为table数组+单向链表+红黑树(当链表个数超过8时,链表转为红黑树,查询复杂度从O(N)降为O(lnN))的结构。
Fork/Join框架
Fork就是把一个大任务切分成若干个子任务并行执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。
ForkJoin框架的几个核心类和接口
ForkJoinTask类,实现Future接口的抽象类,需要实现compute方法(类似Thread的run方法,完成相应的计算逻辑)可以当Thread来理解使用。调用fork方法就加入到任务池中,调用join方法得到任务类的执行结果。
ForkJoinPool类,执行任务的池子,可以当ThreadPool来理解使用
CountDownLatch
CountDownLatch允许一个或多个线程等待其他线程完成操作。CountDownLatch接受一个正整数作为构造函数,当我们调用countDown方法时,N就会减1。CountDownLatch的await方法会阻塞当前线程,加入到AQS的同步队列,当N变成0时,就会唤起AQS中等待的等待线程。
CyclicBarrier
CyclicBarrier的字面意思是可循环的屏障。它让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。构造函数也接受一个正整数,表示要同步的线程数。
Semaphore
信号量Semaphore是用来控制同时访问特定资源的线程数量,它通过协调各个线程以保证合理的使用公共资源。构造函数也是一个正整数表示最大并发访问的线程数量。通过acquire方法获取信号,接着使用资源执行操作,完成后通过release方法释放信号量
原子操作类
AtomicBoolean
AtomicInteger
AtomicLong
以上提供CAS方法,如addAndGet、compareAndSet等
AtomicIntergerArray
AtomicLongArray
AtomicReferenceArray
以上提供CAS原子的方式更新数组中的值,如addAndGet(int i,int delta)等
AtomicReference:原子更新引用类型
AtomicReferenceFieldUpdater:原子更新引用类型里的字段
AtomicMarkableReference:原子更新带有标记位的引用类型
AtomicIntergerFiledUpdater:原子更新整型的字段
AtomicLongFiledUpdater:原子更新Long类型字段
AtomicStampedReference:原子更新带有版本号的引用类型,可以解决CAS进行原子更新时可能出现的ABA问题。
Java的线程池
线程池创建的构造函数如下
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) ;
参数说明:
corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。maximumPoolSize(线程池最大数量):线程池允许创建的最大线程数量。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如果使用了无界队列这个参数就没什么效果。
keepAliveTime(工作线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。
unit:时间单位
workQueue(任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几种队列:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、PriorityBlockingQueue。
threadFactory:用于设置创建线程的工厂,可以通过线程给每个创建出来的线程设置更有意义的名字。
handler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略丢弃任务。jdk1.5中提供以下4中策略:AbortPolicy(直接跑出异常)、CallerRunsPolicy(只用调用者所在线程运行任务)、DiscardOldestPolicy(丢弃队列里最近的一个任务,并执行当前任务)、DiscardPolicy(不处理,丢弃掉)。
线程池处理的主要流程:
1)线程池判断核心线程池里的线程是否都在执行任务,如果不是,则创建一个新的工作线程来执行任务,如果核心线程里的线程都在执行任务,则进入下个流程。
2)线程池判断工作队列是否已经满,如果工作队列没有慢,则将新提交的任务存储在这个工作队列里面,如果工作队列满了,则进入下个流程。
3)线程池判断线程池的线程(最大线程数)是否都处于工作状态,如果没有,则创建一个新的工作线程来执行,如果已经满了,则将诶饱和策略来处理这个任务。
工作线程:线程池创建时,会将线程封装成工作线程Worker,Worker在执行完任务后,还会循环获取工作队列里的任务来执行。
Executor框架
Executor是用户级的调度框架,成员关系图如下:
其中ThreadPoolExecutor可以使用工厂类Executors来创建,有以下三种类型:
1)FixedThreadPool 可重用固定线程数的线程池。
corePoolSize和maximumPoolSize都被设置成nThreads。keepAliveTime为0,代表多余的空闲线程会被立即终止。队列使用的是链表类型,是无界的。
2)SingleThreadPool 单线程的线程池。corePoolSize和maximumPoolSize都为1。keepAliveTime为0,代表多余的空闲线程会被立即终止。队列使用的是链表类型,是无界的。
3)CachedThreadPool 是大小无界的线程池。corePool被设置为0,即corePool为空,maximumPoolSize被设置为Integer.MAX_VALUE,即maximumPool是无界的。这里把keepAliveTime设置为60S,意味着CachedThreadPool中的空闲线程等待新任务的最长时间为60秒,超过60秒会被终止。
ScheduledThreadPoolExecutor功能和Timer类似,但ScheduledThreadPoolExecutor功能更强大灵活。Timer对应的是单个后台线程,ScheduledThreadPoolExecutor对应的是一个线程池。
ScheduledThreadPoolExecutor中的延迟队列DelayQueue也是一个无界队列。DelayQueue封装了一个PriorityQueue,这个PriorityQueue会对队列中的ScheduledFutureTask进行排序。