锁消除
锁消除是指虚拟机即时编译器在运行时,对一些代码上要求同步,但是被检测到不可能存在共享数据竞争的锁进行消除。
锁消除的主要判定依据来源于逃逸分析的数据支持,如果判断在一段代码中,堆上的所有数据都不会逃逸出去从而被其他线程访问到,那就可以把它们当做栈上数据对待,认为它们是线程私有的,同步加锁自然就无须进行。
public String concatString(String s1,String s2){ StringBuffer sb=new StringBuffer(); sb.append(s1); sb.append(s2); return sb.toString(); }
每个StringBuffer.append
方法中都有一个同步块,锁就是sb对象。虚拟机观察变量sb,很快就会发现它的动态作用域被限制在concatString
方法内部。sb的所有引用永远不会逃逸到concatString
方法之外,其他线程无法访问到它
代码中concatString方法中的局部对象sb,就只在该方法内的作用域有效,不同线程同时调用concatString方法时,都会创建不同的sb对象,因此此时的append操作若是使用同步操作,就是白白浪费的系统资源因此,虽然这里有锁,但是可以被安全地消除掉,在即时编译之后,这段代码就会忽略掉所有的同步而直接执行了。
锁粗化
原则上,我们在编写代码的时候,总是推荐将同步块的作用范围限制得尽量小——只在共享数据的实际作用域中才进行同步,这样是为了使得需要同步的操作数量尽可能变小,如果存在锁竞争,那等待锁的线程也能尽快拿到锁。
大部分情况下,上面的原则都是正确的,但是如果一系列的连续操作都对同一个对象反复加锁和解锁,甚至加锁操作是出现在循环体中的,那即使没有线程竞争,频繁地进行互斥同步操作也会导致不必要的性能损耗。
for(int i=0;i<size;i++){ synchronized(lock){ }
如果虚拟机探测到有这样一串零碎的操作都对同一个对象加锁,将会把加锁同步的范围扩展(粗化)到整个操作序列的外部
synchronized(lock){ for(int i=0;i<size;i++){ } }
上述代码中,扩展到for循环之外加锁,这样只需要加锁一次就可以了。
锁升级
因为Synchronized太重了,所以在虚拟机层面上进行了优化,偏向锁/轻量级锁/重量级锁这三种锁是指锁的状态,Java 5通过引入锁升级的机制来实现高效Synchronized。这三种锁的状态是通过对象监视器在对象头中的字段来表明的。
- 偏向锁是指一段同步代码一直被一个线程所访问,那么该线程会自动获取锁。降低获取锁的代价。
- 轻量级锁是指当锁是偏向锁的时候,被另一个线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,提高性能。
- 重量级锁是指当锁为轻量级锁的时候,另一个线程虽然是自旋,但自旋不会一直持续下去,当自旋一定次数的时候,还没有获取到锁,就会进入阻塞,该锁膨胀为重量级锁。重量级锁会让其他申请的线程进入阻塞,性能降低。
其实在BlogJava并发机制的底层实现详细介绍过,这里不再赘述,这里给出简单的状态图:
线程本地存储(Thread Local Storage)
ThreadLocal提供了线程的局部变量,每个线程都可以通过set()和get()来对这个局部变量进行操作,但不会和其他线程的局部变量进行冲突,实现了线程的数据隔离。简要言之:往ThreadLocal中填充的变量属于当前线程,该变量对其他线程而言是隔离的,举个例子
public class MyThreadLocal { // 采用匿名内部类的方式来重写initialValue方法 private static final ThreadLocal<Object> threadLocal = new ThreadLocal<Object>() { /** * ThreadLocal没有被当前线程赋值时或当前线程刚调用remove方法后调用get方法,返回此方法值 */ @Override protected Object initialValue() { System.out.println("调用get方法时,当前线程共享变量没有设置,调用initialValue获取默认值!"); return null; } }; // 操纵int类型的任务线程 public static class MyIntegerTask implements Runnable { private String name; MyIntegerTask(String name) { this.name = name; } public void run() { for (int i = 0; i < 5; i++) { // ThreadLocal.get方法获取线程变量 if (null == MyThreadLocal.threadLocal.get()) { // ThreadLocal.et方法设置线程变量 MyThreadLocal.threadLocal.set(0); System.out.println("线程" + name + ": 0"); } else { int num = (Integer) MyThreadLocal.threadLocal.get(); MyThreadLocal.threadLocal.set(num + 1); System.out.println("线程" + name + ": " + MyThreadLocal.threadLocal.get()); if (i == 3) { MyThreadLocal.threadLocal.remove(); } } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } // 操纵string类型的任务线程 public static class MyStringTask implements Runnable { private String name; MyStringTask(String name) { this.name = name; } public void run() { for (int i = 0; i < 5; i++) { if (null == MyThreadLocal.threadLocal.get()) { MyThreadLocal.threadLocal.set("a"); System.out.println("线程" + name + ": a"); } else { String str = (String) MyThreadLocal.threadLocal.get(); MyThreadLocal.threadLocal.set(str + "a"); System.out.println("线程" + name + ": " + MyThreadLocal.threadLocal.get()); } try { Thread.sleep(800); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void main(String[] args) { new Thread(new MyIntegerTask("IntegerTask1")).start(); new Thread(new MyStringTask("StringTask1")).start(); } }
运行结果为:
调用get方法时,当前线程共享变量没有设置,调用initialValue获取默认值! 线程IntegerTask1: 0 调用get方法时,当前线程共享变量没有设置,调用initialValue获取默认值! 线程StringTask1: a 线程StringTask1: aa 线程IntegerTask1: 1 线程StringTask1: aaa 线程IntegerTask1: 2 线程StringTask1: aaaa 线程IntegerTask1: 3 线程StringTask1: aaaaa 调用get方法时,当前线程共享变量没有设置,调用initialValue获取默认值! 线程IntegerTask1: 0
对于多线程资源共享的问题,同步机制采用了以时间换空间的方式,而ThreadLocal采用了以空间换时间的方式。前者仅提供一份变量,让不同的线程排队访问,而后者为每一个线程都提供了一份变量,因此可以同时访问而互不影响
可以通过java.lang.ThreadLocal
类来实现线程本地存储的功能。
- 每一个线程的Thread对象中都有一个
ThreadLocalMap
对象, ThreadLocalMap
对象存储了一组以ThreadLocal.threadLocalHashCode
为键,以本地线程变量为值的K-V值对,
ThreadLocal对象就是当前线程的ThreadLocalMap的访问入口,每一个ThreadLocal对象都包含了一个独一无二的threadLocalHashCode值,使用这个值就可以在线程K-V值对中找回对应的本地线程变量。
ThreadLocal 内存泄露问题
ThreadLocalMap 中使⽤的 key 为 ThreadLocal 的弱引⽤,⽽ value 是强引⽤。所以,如果ThreadLocal 没有被外部强引⽤的情况下,在垃圾回收的时候,key 会被清理掉,⽽ value 不会被清理掉。这样⼀来, ThreadLocalMap 中就会出现key为null的Entry。假如我们不做任何措施的话,value 永远⽆法被GC 回收,这个时候就可能会产⽣内存泄露。ThreadLocalMap实现中已经考虑了这种情况,在调⽤ set() 、 get() 、 remove() ⽅法的时候,会清理掉 key 为 null 的记录。使⽤完ThreadLocal ⽅法后 最好⼿动调⽤ remove() ⽅法
JUC并发包概述
本部分回答以下几个问题,如果能回答正确,则证明本部分掌握好了。
- JUC并发包下有哪些内容,作用分别是什么
接下来我们看这部分的内容。JUC并发包提供了一切并发底层原理和实现机制的封装,并且做了大幅度的扩展
JDK并发工具类是JDK1.5引入的一大重要的功能,集中在Java.util.concurrent包下。java.util.concurrent包主要包含了原子类、并发锁、并发集合和队列、线程池、并发工具类
- 原子类,提供了一系列原子操作
- 并发锁,提供了一系列并发锁,主要关注ReentrantLock
- 并发集合和队列,提供了一系列并发集合,主要关注ConcurrentHashMap
- 线程池,提供了线程池进行操作,主要关注线程池参数和常用线程池
- 并发工具类,提供了一些并发工具,都大致了解下即可
接下来会分布进行介绍
JUC并发包下原子类
本部分回答以下几个问题,如果能回答正确,则证明本部分掌握好了。
- 原子类有哪些,简单介绍下AtomicInteger
- CAS操作什么原理?产生的ABA问题是什么?如何解决
接下来我们看这部分的内容。
原子类
atomic 是指一个操作是不可中断的。即使是在多个线程一起执行的时候,一个操作一旦开始,就不会被其他线程干扰,所以,所谓原子类说简单点就是具有原子操作特征的类,原子操作类提供了一些修改数据的方法,这些方法都是原子操作的,在多线程情况下可以确保被修改数据的正确性,我们在前边的Java并发机制底层实现中了解到,通过CAS操作可以实现原子操作,整体分类如下
共有如下几种分类
AtomicInteger:int 类型原子类 AtomicLong:long 类型原子类 AtomicBoolean :boolean类型原子类 AtomicIntegerArray:整形数组原子操作类 AtomicLongArray:长整形数组原子操作类 AtomicReferenceArray :引用类型数组原子操作类 AtomicReference:引用类型原子类 AtomicStampedRerence:原子更新引用类型里的字段原子类 //可以用时间戳解决ABA问题 AtomicMarkableReference :原子更新带有标记位的引用类型 AtomicIntegerFieldUpdater:原子更新整形字段的值 AtomicLongFieldUpdater:原子更新长整形字段的值 AtomicReferenceFieldUpdater :原子更新应用类型字段的值
由于使用上大多类似 ,这里仅以AtomicInteger的常用方法为例进行说明::
public final int get() //获取当前的值 public final int getAndSet(int newValue)//获取当前的值,并设置新的值 public final int getAndIncrement()//获取当前的值,并自增 public final int getAndDecrement() //获取当前的值,并自减 public final int getAndAdd(int delta) //获取当前的值,并加上预期的值 //如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update) boolean compareAndSet(int expect, int update) //最终设置为newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。 public final void lazySet(int newValue)
CAS操作
AtomicInteger 类主要利⽤ CAS (compare and swap) + volatile 和 native ⽅法来保证原⼦操作,从⽽避免 synchronized 的⾼开销,执⾏效率⼤为提升。CAS的原理是拿期望的值和原本的⼀个值作⽐较,如果相同则更新成新的值。UnSafe 类的objectFieldOffset() ⽅法是⼀个本地⽅法,这个⽅法是⽤来拿到“原来的值”的内存地址,返回值是valueOffset。另外 value 是⼀个volatile变量,在内存中可⻅,因此 JVM 可以保证任何时刻任何线程总能拿到该变量的最新值。我们从getAndAdd其实现的源码中可以看出:
private static final Unsafe unsafe = Unsafe.getUnsafe(); //value属性在AtomicInteger中的偏移量,通过这个偏移量可以快速定位到value字段,这个是实现AtomicInteger的关键 private static final long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } private volatile int value; //使用volatile修饰,可以确保value在多线程中的可见性。
可以通过一个方法的源码来看其调用方式:
public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); } public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); //可以确保从主内存中获取变量最新的值 } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); //CAS自旋等待,多线程情况下安全 return var5; } public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); }
getAndAddInt操作相当于线程安全的count++操作
synchronize(lock){ count++; }
synchronize的方式会导致占时无法获取锁的线程处于阻塞状态,性能比较低。CAS的性能比synchronize要快很多
ABA问题
如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了。ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加1,那么A→B→A就会变成1A→2B→3A。从Java 1.5开始,JDK的Atomic包里提供了一个类AtomicStampedReference来解决ABA问题。这个类的compareAndSet方法的作用是首先检查当前引用是否等于预期引用,并且检查当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值
普通情况下ABA问题没有危害,不过可以看一种特殊场景,场景是用链表来实现一个栈,初始化向栈中压入B、A两个元素,栈顶head指向A元素。head(A)->B
Thread thread1 = new Thread( ->{ oldValue = head; sleep(3秒); //thread2切换执行 compareAndSet(oldValue, B); } ); Thread thread2 = new Thread( ->{ // 弹出A newHead = head.next; head.next = null; //即A.next = null; head = newHead; // 弹出B newHead = head.next; head.next = null; // 即B.next = null; head = newHead; // 此时head为null // 压入C head = C; // 压入D D.next = head; head = D; // 压入A A.next = D; head = A; } ); thread1.start(); thread2.start();
- 线程1试图将栈顶换成B,但它获取栈顶的oldValue(head,也就是A)后,被线程2中断了。
- 线程2依次将A、B弹出,然后压入C、D、A。
head(A)->D->C
- 然后换线程1继续运行,线程1执行compareAndSet发现head指向的元素确实与oldValue一致,都是A,所以就将head指向B了。
head(B)
但是,线程2在弹出B的时候,将B的next置为null了,因此在线程1将head指向B后,栈中只剩元素B。但按预期来说,栈中应该放的是B → A → D → C
AtomicStampedRerence
可以解决ABA问题,他内部不仅维护了对象的值,还维护了一个时间戳(我们这里把他称为时间戳,实际上它可以使用任何一个整形来表示状态值),当AtomicStampedRerence
对应的数值被修改时,除了更新数据本身外,还必须要更新时间戳。当AtomicStampedRerence
设置对象值时,对象值及时间戳都必须满足期望值,写入才会成功。因此,即使对象值被反复读写,写回原值,只要时间戳发生变量,就能防止不恰当的写入
JUC并发包下的锁
本部分回答以下几个问题,如果能回答正确,则证明本部分掌握好了。
- JUC下锁的继承关系模式
- AQS框架是什么?解决什么问题
接下来我们看这部分的内容。
JUC下的锁结构
根据锁出现在Java中的时间,Java中的锁,可以分为同步锁和JUC包中的锁,同步锁指的是通过synchronized关键字来进行同步,实现对竞争资源的互斥访问的锁,同步锁的原理是:
- 对于每一个对象,有且仅有一个同步锁,不同的线程能共同访问该同步锁。
- 在同一个时间点,该同步锁能且只能被一个线程获取到。
这样,获取到同步锁的线程就能进行CPU调度,从而在CPU上执行;而没有获取到同步锁的线程,必须进行等待,直到获取到同步锁之后才能继续运行
相比同步锁,JUC包中的锁的功能更加强大,它为锁提供了一个框架,该框架允许更灵活地使用锁。
AQS框架
AbstractQueuedSynchronizer就是被称之为AQS的类,它是一个非常有用的超类,可用来定义锁以及依赖于排队阻塞线程的其他同步器,ReentrantLock,ReentrantReadWriteLock,CountDownLatch,CyclicBarrier和Semaphore等这些类都是基于AQS类实现的
AQS核⼼思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的⼯作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占⽤,那么就需要⼀套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是⽤CLH队列锁实现的,即将暂时获取不到锁的线程加⼊到队列中
AQS使⽤⼀个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队⼯作。AQS使⽤CAS对该同步状态进⾏原⼦操作实现对其值的修改
/** * The synchronization state. */ private volatile int state; /** * Returns the current value of synchronization state. * This operation has memory semantics of a {@code volatile} read. * @return current state value */ protected final int getState() { return state; } /** * Sets the value of synchronization state. * This operation has memory semantics of a {@code volatile} write. * @param newState the new state value */ protected final void setState(int newState) { state = newState; } /** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * This operation has memory semantics of a {@code volatile} read * and write. * * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that the actual * value was not equal to the expected value. */ protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
AQS 对资源的共享⽅式
AQS定义两种资源共享⽅式
- Exclusive(独占):只有⼀个线程能执⾏,如ReentrantLock。⼜可分为公平锁和⾮公平锁:公平锁:按照线程在队列中的排队顺序,先到者先拿到锁,⾮公平锁:当线程要获取锁时,⽆视队列顺序直接去抢锁,谁抢到就是谁的
- Share(共享):多个线程可同时执⾏,如Semaphore、CountDownLatch、 CyclicBarrier等
- ReentrantReadWriteLock 可以看成是组合式,因为ReentrantReadWriteLock也就是读写锁允许多个线程同时对某⼀资源进⾏读。
不同的⾃定义同步器争⽤共享资源的⽅式也不同。⾃定义同步器在实现时只需要实现共享资源 state的获取与释放⽅式即可,⾄于具体线程等待队列的维护(如获取资源失败⼊队/唤醒出队等),AQS已经在顶层实现好了。
AQS底层使用了模板方法模式
同步器的设计是基于模板⽅法模式的,如果需要⾃定义同步器⼀般的⽅式是这样(模板⽅法模式很经典的⼀个应⽤):
- 使⽤者继承AbstractQueuedSynchronizer并重写指定的⽅法。(这些重写⽅法很简单,⽆⾮是对于共享资源state的获取和释放)
- 将AQS组合在⾃定义同步组件的实现中,并调⽤其模板⽅法,⽽这些模板⽅法会调⽤使⽤者重写的⽅法。这和我们以往通过实现接⼝的⽅式有很⼤区别,这是模板⽅法模式很经典的⼀个运⽤。
AQS使⽤了模板⽅法模式,⾃定义同步器时需要重写下⾯⼏个AQS提供的模板⽅法:
isHeldExclusively()//该线程是否正在独占资源。只有⽤到condition才需要去实现它。 tryAcquire(int)//独占⽅式。尝试获取资源,成功则返回true,失败则返回false。 tryRelease(int)//独占⽅式。尝试释放资源,成功则返回true,失败则返回false。 tryAcquireShared(int)//共享⽅式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可⽤资源;正数表示成功,且有剩余资源。 tryReleaseShared(int)//共享⽅式。尝试释放资源,成功则返回true,失败则返回false。
这些⽅法的实现必须是内部线程安全的,并且通常应该简短⽽不是阻塞。AQS类中的其他⽅法都是final ,所以⽆法被其他类使⽤,只有这⼏个⽅法可以被其他类使⽤,例如ReentrantLock实现一个非公平锁
/** * Creates an instance of {@code ReentrantLock}. * This is equivalent to using {@code ReentrantLock(false)}. */ public ReentrantLock() { sync = new NonfairSync(); }
非公平锁的底层实现调用并重写了tryAcquire
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
JUC并发包下的工具类
本部分回答以下几个问题,如果能回答正确,则证明本部分掌握好了。
- JUC下有哪些并发工具类,它们的作用分别是什么
接下来我们看这部分的内容。
JUC并发包下有四个并发工具类,闭锁CountDownlatch、栅栏CyclicBarrier、信号量Semaphore、交换器Exchanger。
- CountDownlatch通常用于主线程等待其他任务线程执行完毕的场景,类似于Join。它的特点在于它可以让主线程一直等待其它几个线程执行完的时候再执行,例如使用三个线程来打印三个List,三个线程任务都完成得时候才允许主线程继续输出Print Task Finish!
- 通过
CountDownLatch countDownLatch = new CountDownLatch(3);
设置主线程的等待个数 - 每个线程执行完一次后等待数减一,
countDownLatch.countDown(); //锁减去1
- CyclicBarrier主要阻塞当前线程,等待其他线程(大家无论谁先跑到A点,必须要等其他线程也到达了A点,大家才能继续)。相当于大家都干完了自己手头的活然后在一个临界点等待,集齐后一起发力往下执行。可以看的出,CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
设置三个线程到达栅栏后统一执行- 每个线程在执行代码里显示的用
cyclicBarrier.await(); //栅栏唤醒,拦住执行线程
,然后等待大家来齐后一起继续执行
- 信号量Semaphore可以用来控制同时访问特定资源的线程数量(比如100个线程只能有10个线程可以获得MySQL连接)。Semaphore也叫信号量,可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。多个线程在到达Semaphore后获取令牌,其中几个被拿到继续执行,执行完后,释放令牌,其它的线程一起争抢。
- 通过
Semaphore semaphore = new Semaphore(5);
设置信号量的个数,在限制被解除前,一次只能有5个线程能活动,即使开启了20个线程。**信号量维护了一个信号量许可集。 - 每个线程执行时遇到
semaphore.acquire();
表示开启信号量限制,线程可以通过调用acquire()来获取信号量的许可;当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。 - 线程可以通过
semaphore.release();
来释放它所持有的信号量许可
- 交换器Exchanger很少用,只适用于两个线程在同步点交换数据的场景,设置一个同步点,在这个同步点多个线程间两两之间线程可以交换彼此的数据
- 通过方法
Exchanger<String> exchanger = new Exchanger<>();
设置一个同步点 - 在同步点设置自己要交换的信息,
String girl = exchanger.exchange("hi girl");
- 打印的时候打印的就是对方线程给出的变量,例如
System.out.println("girl said: " + girl);
打印出的是girl said: hi boy
其实重点关注下CyclicBarrier和CountDownlatch即可,这两个工具比较常用一些。