Java多线程进阶
锁的类型
乐观锁vs悲观锁
悲观锁:
总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会阻塞直到它拿到锁。
乐观锁:
假设数据一般情况下不会产生并发冲突,所以在数据进行提交更新的时候,才会正式对数据是否产生并发冲突进行检测,如果发现并发冲突了,则让返回用户错误的信息,让用户决定如何去做。
Synchronized 初始使用乐观锁策略. 当发现锁竞争比较频繁的时候, 就会自动切换成悲观锁策略
乐观锁的一个重要功能就是要检测出数据是否发生访问冲突. 我们可以引入一个 “版本号” 来解决访问冲突以及ABA问题
读写锁
读写锁(readers-writer lock),看英文可以顾名思义,在执行加锁操作时需要额外表明读写意图,复数读者之间并不互斥,而写者则要求与任何人互斥。
读写锁就是把读操作和写操作区分对待:
- Java 标准库提供了 ReentrantReadWriteLock 类, 实现了读写锁
- ReentrantReadWriteLock.ReadLock 类表示一个读锁. 这个对象提供了 lock / unlock 方法进行加锁解锁
- ReentrantReadWriteLock.WriteLock 类表示一个写锁. 这个对象也提供了 lock / unlock 方法进行加锁解锁
读写锁特别适合于 “频繁读, 不频繁写” 的场景中. (这样的场景其实也是非常广泛存在的)
重量级锁vs轻量级锁
锁的核心特性 “原子性”, 这样的机制追根溯源是 CPU 这样的硬件设备提供的
CPU 提供了 “原子操作指令”
操作系统基于 CPU 的原子指令, 实现了 mutex 互斥锁
JVM 基于操作系统提供的互斥锁, 实现了 synchronized 和 ReentrantLock 等关键字和类
注意:synchronized 并不仅仅是对 mutex 进行封装, 在 synchronized 内部还做了很多其他的工作
重量级锁: 加锁机制重度依赖了 OS 提供了 mutex,大量的内核态用户态切换,很容易引发线程的调度
轻量级锁: 加锁机制尽可能不使用 mutex, 而是尽量在用户态代码完成. 实在搞不定了, 再使用 mutex,少量的内核态用户态切换,不太容易引发线程调度
synchronized 开始是一个轻量级锁. 如果锁冲突比较严重, 就会变成重量级锁
自旋锁
自旋锁:获取锁失败, 立即再尝试获取锁, 无限循环, 直到获取到锁为止
自旋锁是一种典型的 轻量级锁 的实现方式
优点: 没有放弃 CPU, 不涉及线程阻塞和调度, 一旦锁被释放, 就能第一时间获取到锁
缺点: 如果锁被其他线程持有的时间比较久, 那么就会持续的消耗 CPU 资源. (而挂起等待的时候是不消耗 CPU 的)
synchronized 中的轻量级锁策略大概率就是通过自旋锁的方式实现的
公平锁vs非公平锁
公平锁: 遵守 “先来后到”. B 比 C 先来的. 当 A 释放锁的之后, B 就能先于 C 获取到锁
非公平锁: 不遵守 “先来后到”. B 和 C 都有可能获取到锁
操作系统内部的线程调度就可以视为是随机的. 如果不做任何额外的限制, 锁就是非公平锁
如果要想实现公平锁, 就需要依赖额外的数据结构, 来记录线程们的先后顺序
公平锁和非公平锁没有好坏之分, 关键还是看适用场景
synchronized 是非公平锁
可重入锁vs不可重入锁
可重入锁的字面意思是“可以重新进入的锁”,即允许同一个线程多次获取同一把锁
Java里只要以Reentrant开头命名的锁都是可重入锁,而且JDK提供的所有现成的Lock实现类,包括synchronized关键字锁都是可重入的。
而 Linux 系统提供的 mutex 是不可重入锁
CAS
CAS介绍和原理
CAS: 全称Compare and swap,字面意思:”比较并交换“
一个 CAS 涉及到以下操作:
- 比较 A 与 V 是否相等(比较)
- 如果比较相等,将 B 写入 V(交换)
- 返回操作是否成功
当多个线程同时对某个资源进行CAS操作,只能有一个线程操作成功,但是并不会阻塞其他线程,其他线程只会收到操作失败的信号。
CAS 可以视为是一种乐观锁. (或者可以理解成 CAS 是乐观锁的一种实现方式)
针对不同的操作系统,JVM 用到了不同的 CAS 实现原理:
- java 的 CAS 利用的的是 unsafe 这个类提供的 CAS 操作
- unsafe 的 CAS 依赖了的是 jvm 针对不同的操作系统实现的 Atomic::cmpxchg
- Atomic::cmpxchg 的实现使用了汇编的 CAS 操作,并使用 cpu 硬件提供的 lock 机制保证其原子性
CAS应用
实现原子类:
标准库中提供了 java.util.concurrent.atomic 包, 里面的类都是基于这种方式来实现的
典型的就是 AtomicInteger 类. 其中的 getAndIncrement 相当于 i++ 操作
AtomicInteger atomicInteger = new AtomicInteger(0); // 相当于 i++ atomicInteger.getAndIncrement();
伪代码实现:
class AtomicInteger { private int value; public int getAndIncrement() { int oldValue = value; while ( CAS(value, oldValue, oldValue+1) != true) { oldValue = value; } return oldValue; } }
自旋锁
基于 CAS 实现更灵活的锁, 获取到更多的控制权
自旋锁伪代码:
public class SpinLock { private Thread owner = null; public void lock(){ // 通过 CAS 看当前锁是否被某个线程持有. // 如果这个锁已经被别的线程持有, 那么就自旋等待. // 如果这个锁没有被别的线程持有, 那么就把 owner 设为当前尝试加锁的线程. while(!CAS(this.owner, null, Thread.currentThread())){ } } public void unlock (){ this.owner = null; } }
CAS 的 ABA 问题
问题描述:
两个线程 t1 和 t2. 有一个共享变量 num, 初始值为 A
线程 t1 想使用 CAS 把 num 值改成 Z,需要读取数据和修改两个操作
在 t1 执行这两个操作之间, t2 线程可能把 num 的值从 A 改成了 B, 又从 B 改成了 A
大部分的情况下, t2 线程这样的一个反复横跳改动, 对于 t1 是否修改 num 是没有影响的. 但是不排除一些特殊情况
解决方案:
给要修改的值, 引入版本号. 在 CAS 比较数据当前值和旧值的同时, 也要比较版本号是否符合预期,CAS 操作在读取旧值的同时, 也要读取版本号
真正修改的时候,如果当前版本号和读到的版本号相同, 则修改数据, 并把版本号 + 1;如果当前版本号高于读到的版本号. 就操作失败(认为数据已经被修改过了).
在 Java 标准库中提供了 AtomicStampedReference 类. 这个类可以对某个类进行包装, 在内部就提供了版本管理功能
Synchronized 原理
Synchronized 具有以下特性(只考虑 JDK 1.8):
- 开始时是乐观锁, 如果锁冲突频繁, 就转换为悲观锁
- 开始是轻量级锁实现, 如果锁被持有的时间较长, 就转换成重量级锁
- 实现轻量级锁的时候大概率用到的自旋锁策略
- 是一种不公平锁
- 是一种可重入锁
- 不是读写锁
JVM 将 synchronized 锁分为 无锁、偏向锁、轻量级锁、重量级锁 状态。会根据情况,进行依次升级
偏向锁:
- 第一个尝试加锁的线程, 优先进入偏向锁状态
- 偏向锁不是真的 “加锁”, 只是给对象头中做一个 “偏向锁的标记”, 记录这个锁属于哪个线程
- 如果后续没有其他线程来竞争该锁, 那么就不用进行其他同步操作了(避免了加锁解锁的开销)
- 如果后续有其他线程来竞争该锁(刚才已经在锁对象中记录了当前锁属于哪个线程了, 很容易识别当前申请锁的线程是不是之前记录的线程), 那就取消原来的偏向锁状态, 进入一般的轻量级锁状态
轻量级锁:
- 随着其他线程进入竞争, 偏向锁状态被消除, 进入轻量级锁状态(自适应的自旋锁),此处的轻量级锁就是通过 CAS 来实现
- 通过 CAS 检查并更新一块内存 (比如 null => 该线程引用)
- 如果更新成功, 则认为加锁成功;如果更新失败, 则认为锁被占用, 继续自旋式的等待(并不放弃 CPU)
重量级锁:
- 如果竞争进一步激烈, 自旋不能快速获取到锁状态, 就会膨胀为重量级锁,此处的重量级锁就是指用到内核提供的 mutex
- 执行加锁操作, 先进入内核态,在内核态判定当前锁是否已经被占用
- 如果该锁没有占用, 则加锁成功, 并切换回用户态;如果该锁被占用, 则加锁失败. 此时线程进入锁的等待队列, 挂起. 等待被操作系统唤醒
其他的优化操作:
锁消除:
编译器+JVM 判断锁是否可消除. 如果可以, 就直接消除
有些应用程序的代码中, 用到了 synchronized, 但其实没有在多线程环境下, 那么这些加锁解锁操作是没有必要的, 白白浪费了一些资源开销
锁粗化:
一段逻辑中如果出现多次加锁解锁, 编译器 + JVM 会自动进行锁的粗化
实际开发过程中, 使用细粒度锁, 是期望释放锁的时候其他线程能使用锁,但是实际上可能并没有其他线程来抢占这个锁,这种情况 JVM 就会自动把锁粗化, 避免频繁申请释放锁
Callable 接口
Callable 是一个 interface,相当于把线程封装了一个 “返回值”,方便程序得到多线程的方式计算结果
Callable<Integer> callable = new Callable<Integer>() { @Override public Integer call() throws Exception { int sum = 0; for (int i = 1; i <= 1000; i++) { sum += i; } return sum; } }; FutureTask<Integer> futureTask = new FutureTask<>(callable); Thread t = new Thread(futureTask); t.start(); int result = futureTask.get(); System.out.println(result);
Callable 和 Runnable 相对, 都是描述一个 “任务”:Callable 描述的是带有返回值的任务,Runnable 描述的是不带返回值的任务
Callable 通常需要搭配 FutureTask 来使用:FutureTask 用来保存 Callable 的返回结果,Callable 往往是在另一个线程中执行的, 啥时候执行完并不确定,FutureTask 就可以负责这个等待结果出来的工作
JUC常见类
ReentrantLock
可重入互斥锁,和 synchronized 定位类似, 都是用来实现互斥效果, 保证线程安全
ReentrantLock 的用法:
- lock(): 加锁, 如果获取不到锁就死等
- trylock(超时时间): 加锁, 如果获取不到锁, 等待一定的时间之后就放弃加锁
- unlock(): 解锁
ReentrantLock 和 synchronized 的区别:
- synchronized 是一个关键字, 是 JVM 内部实现的(大概率是基于 C++ 实现);ReentrantLock 是标准库的一个类, 在 JVM 外实现的(基于 Java 实现)
- synchronized 使用时不需要手动释放锁;ReentrantLock 使用时需要手动释放. 使用起来更灵活,但是也容易遗漏 unlock
- synchronized 在申请锁失败时, 会死等;ReentrantLock 可以通过 trylock 的方式等待一段时间就放弃
- synchronized 是非公平锁;ReentrantLock 默认是非公平锁. 可以通过构造方法传入一个 true 开启公平锁模式
// ReentrantLock 的构造方法 public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
- 更强大的唤醒机制:synchronized 是通过 Object 的 wait / notify 实现等待-唤醒,每次唤醒的是一个随机等待的线程;ReentrantLock 搭配 Condition 类实现等待-唤醒, 可以更精确控制唤醒某个指定的线程
如何选择使用:
- 锁竞争不激烈的时候, 使用 synchronized, 效率更高, 自动释放更方便
- 锁竞争激烈的时候, 使用 ReentrantLock, 搭配 trylock 更灵活控制加锁的行为, 而不是死等
- 如果需要使用公平锁, 使用 ReentrantLock;如果需要精准唤醒指定线程, 使用 ReentrantLock
原子类
原子类内部用的是 CAS 实现,所以性能要比加锁实现 i++ 高很多。
原子类有以下几个:
AtomicBoolean AtomicInteger AtomicIntegerArray AtomicLong AtomicReference AtomicStampedReference
AtomicInteger常见方法有:
addAndGet(int delta); i += delta; decrementAndGet(); --i; getAndDecrement(); i--; incrementAndGet(); ++i; getAndIncrement(); i++;
线程池
虽然创建销毁线程比创建销毁进程更轻量, 但是在频繁创建销毁线程的时候还是会比较低效
线程池就是为了解决这个问题. 如果某个线程不再使用了, 并不是真正把线程释放, 而是放到一个 "池子"中, 下次如果需要用到线程就直接从池子中取, 不必通过系统来创建了
ExecutorService 和 Executors:
- ExecutorService 表示一个线程池实例
- Executors 是一个工厂类, 能够创建出几种不同风格的线程池
- ExecutorService 的 submit 方法能够向线程池中提交若干个任务
ExecutorService pool = Executors.newFixedThreadPool(10); pool.submit(new Runnable() { @Override public void run() { System.out.println("hello"); } });
Executors 创建线程池的几种方式:
- newFixedThreadPool: 创建固定线程数的线程池
- newCachedThreadPool: 创建线程数目动态增长的线程池
- newSingleThreadExecutor: 创建只包含单个线程的线程池
- newScheduledThreadPool: 设定 延迟时间后执行命令,或者定期执行命令. 是进阶版的 Timer
Executors 本质上是 ThreadPoolExecutor 类的封装
ThreadPoolExecutor:
ThreadPoolExecutor 提供了更多的可选参数, 可以进一步细化线程池行为的设定
理解 ThreadPoolExecutor 构造方法的参数:
把创建一个线程池想象成开个公司. 每个员工相当于一个线程
- corePoolSize: 正式员工的数量. (正式员工, 一旦录用, 永不辞退)
- maximumPoolSize: 正式员工 + 临时工的数目. (临时工: 一段时间不干活, 就被辞退)
- keepAliveTime: 临时工允许的空闲时间
- unit: keepaliveTime 的时间单位, 是秒, 分钟, 还是其他值
- workQueue: 传递任务的阻塞队列
- threadFactory: 创建线程的工厂, 参与具体的创建线程工作
- RejectedExecutionHandler: 拒绝策略, 如果任务量超出公司的负荷了接下来怎么处理.
- AbortPolicy(): 超过负荷, 直接抛出异常
- CallerRunsPolicy(): 调用者负责处理
- DiscardOldestPolicy(): 丢弃队列中最老的任务
- DiscardPolicy(): 丢弃新来的任务
ExecutorService pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); for(int i=0;i<3;i++) { pool.submit(new Runnable() { @Override void run() { System.out.println("hello"); } }); }
信号量 Semaphore
信号量, 用来表示 “可用资源的个数”. 本质上就是一个计数器
Semaphore 的 PV 操作中的加减计数器操作都是原子的, 可以在多线程环境下直接使用
Semaphore semaphore = new Semaphore(4); Runnable runnable = new Runnable() { @Override public void run() { try { System.out.println("申请资源"); semaphore.acquire(); System.out.println("我获取到资源了"); Thread.sleep(1000); System.out.println("我释放资源了"); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } }; for (int i = 0; i < 20; i++) { Thread t = new Thread(runnable); t.start(); }
创建 Semaphore 示例, 初始化为 4, 表示有 4 个可用资源
acquire 方法表示申请资源(P操作), release 方法表示释放资源(V操作)
创建 20 个线程, 每个线程都尝试申请资源, sleep 1秒之后, 释放资源. 观察程序的执行效果
CountDownLatch
同时等待 N 个任务执行结束
public class Demo { public static void main(String[] args) throws Exception { CountDownLatch latch = new CountDownLatch(10); Runnable r = new Runable() { @Override public void run() { try { Thread.sleep(Math.random() * 10000); latch.countDown(); } catch (Exception e) { e.printStackTrace(); } } }; for (int i = 0; i < 10; i++) { new Thread(r).start(); } // 必须等到 10 人全部回来 latch.await(); System.out.println("比赛结束"); } }
构造 CountDownLatch 实例, 初始化 10 表示有 10 个任务需要完成
每个任务执行完毕, 都调用 latch.countDown() . 在 CountDownLatch 内部的计数器同时自减
主线程中使用 latch.await(); 阻塞等待所有任务执行完毕. 相当于计数器为 0 了
线程安全的集合类
原来的集合类, 大部分都不是线程安全的:Vector, Stack, HashTable是线程安全的(不建议用), 其他的集合类不是线程安全的
多线程环境使用 ArrayList:
自己使用同步机制 (synchronized 或者 ReentrantLock)
Collections.synchronizedList(new ArrayList):synchronizedList 是标准库提供的一个基于 synchronized 进行线程同步的 List,synchronizedList 的关键操作上都带有 synchronized
使用 CopyOnWriteArrayList:CopyOnWrite容器即写时复制的容器
当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。
所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。
优点:在读多写少的场景下, 性能很高, 不需要加锁竞争
缺点:占用内存较多;新写的数据不能被第一时间读取到.
多线程环境使用队列:
ArrayBlockingQueue:基于数组实现的阻塞队列
LinkedBlockingQueue:基于链表实现的阻塞队列
PriorityBlockingQueue:基于堆实现的带优先级的阻塞队列
TransferQueue:最多只包含一个元素的阻塞队列
多线程环境使用哈希表:
HashMap 本身不是线程安全的
在多线程环境下使用哈希表可以使用:Hashtable;ConcurrentHashMap
Hashtable:只是简单的把关键方法(put,get)加上了 synchronized 关键字
相当于直接针对 Hashtable 对象本身加锁
如果多线程访问同一个 Hashtable 就会直接造成锁冲突,size 属性也是通过 synchronized 来控制同步, 也是比较慢的,一旦触发扩容, 就由该线程完成整个扩容过程. 这个过程会涉及到大量的元素拷贝, 效率会非常低
ConcurrentHashMap:
相比于 Hashtable 做出了一系列的改进和优化
读操作没有加锁(但是使用了 volatile 保证从内存读取结果), 只对写操作进行加锁. 加锁的方式仍然是是用 synchronized, 但是不是锁整个对象, 而是 “锁桶” (用每个链表的头结点作为锁对象), 大大降低了锁冲突的概率
充分利用 CAS 特性. 比如 size 属性通过 CAS 来更新. 避免出现重量级锁的情况.
优化了扩容方式: 化整为零
- 发现需要扩容的线程, 只需要创建一个新的数组, 同时只搬几个元素过去
- 扩容期间, 新老数组同时存在
- 后续每个来操作 ConcurrentHashMap 的线程, 都会参与搬家的过程. 每个操作负责搬运一小部分元素
- 搬完最后一个元素再把老数组删掉
- 这个期间, 插入只往新数组加
- 这个期间, 查找需要同时查新数组和老数组