Java J.U.C 学习笔记-使用篇(一)
一直想写一篇关于多线程的文章,但是迟迟没有动笔,主要是感觉自己掌握的还是不够牢固,怕写出的东西误人子弟,不过看了看自己博客的访问量,感觉这个担心是多余的了。小丑竟是我自己.jpg
Java 多线程如果根据使用场景分的话可以分为两类类。
- 线程间的竞争。多个线程同时竞争一个资源。这个时候需要想办法保证,资源在同一时刻只能由一个线程操作。
- 线程间的通信。多个线程间需要通过信息传递。协同完成一件事情,这个时候要想办法处理线程间如何通信,如果交互。
下面我会针对这两种情况分别说明。
1. 线程间的竞争
线程间的竞争是指,多个线程间同时需要访问一个资源,而这个资源如果不是线程安全的,那么就会出现不可预知的问题。比如下面这段常见的代码
static int count = 0;
public static void func1() throws InterruptedException {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
count++;
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
count++;
}
});
t1.start();
t2.start();
//等待两个线程执行结束
Thread.sleep(1000);
System.out.println(count);
}
你会发现每次执行的结果可能都不一样,但是都会比 20000 小,这是因为 count++
并不是原子操作,t1 读到 count = A,时,t2 读取到的可能也是 count = A,这个时候同时执行 count++
就会少加一次,某个线程的结果被另外一个线程覆盖掉。导致最终的结果小于 20000。
这个问题就是我们上文说的那样,多个线程同时竞争一个资源 count 的值。这个时候我们需要想办法保证 count 在同一个时刻只有一个线程访问。
1.1 synchronized 关键字
第一种办法是使用synchronized关键字,synchronize 关键字可以对代码进行自动加锁和解锁操作,保证同时只有一个线程操作。例如:
static int count = 0;
public static void func1() throws InterruptedException {
Object lock = new Object();
Thread t1 = new Thread(() -> {
synchronized (lock) {
for (int i = 0; i < 10000; i++) {
count++;
}
}
});
Thread t2 = new Thread(() -> {
synchronized (lock) {
for (int i = 0; i < 10000; i++) {
count++;
}
}
});
t1.start();
t2.start();
//等待两个线程执行结束
Thread.sleep(1000);
System.out.println(count);
}
在 JDK1.6 之前,synchronize 关键字还是一个重量级的锁,底层是通过阻塞和唤醒机制实现的,所以需要操作系统切换CPU的执行状态,非常耗时。在 JDK1.6之后引入了偏向锁和轻量级锁后,synchronize 关键字的效率大大提高。
1.2 原子类AtomicInteger
synchronize 关键字是在操作上进行加锁控制,而原子类是在资源上进行控制,原子类内部可以保证只有一个线程访问该资源。例如:
static AtomicInteger atomicCount = new AtomicInteger(0);
public static void func2() throws InterruptedException {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
atomicCount.getAndAdd(1);
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
atomicCount.addAndGet(1);
}
});
t1.start();
t2.start();
//等待两个线程执行结束
Thread.sleep(1000);
System.out.println(atomicCount.get());
}
原子类的更新操作是使用的 CAS 指令。所以从性能上来说是要比 synchronize 高的。而且原子类也不止支持常见的基本类型,还支持数组(AtomicIntegerArray,AtomicLongArray,BooleanArray)和引用类型(AtomicReference,AtomicMarkedReference,AtomicStampedReference)。
1.3 可重入锁 ReentrantLock
ReentrantLock 支持加锁和解锁两个操作,可以在适当的时候对操作进行加锁。作用的内容还是操作行为,不是资源。比如:
public static ReentrantLock reentrantLock = new ReentrantLock(true);
public static void func4() throws InterruptedException {
Thread t1 = new Thread(() -> {
reentrantLock.lock();
for (int i = 0; i < 10000; i++) {
count++;
}
reentrantLock.unlock();
});
Thread t2 = new Thread(() -> {
reentrantLock.lock();
for (int i = 0; i < 10000; i++) {
count++;
}
reentrantLock.unlock();
});
t1.start();
t2.start();
//等待两个线程执行结束
Thread.sleep(1000);
System.out.println(count);
}
使用方式跟 synchronize 关键字很类似,区别是 synchronize 是自动的加锁和解锁,而 reentrantLock 是手动加锁操作的。构造参数支持公平和非公平锁。
1.4 读写锁 ReentrantReadWriteLock
ReentrantReadWriteLock 内部有两个锁分别是 ReadLock(读锁) 和 WriteLock(写锁)。读锁可以被多个线程共享。写锁是只能有一个线程持有,而且读写锁是互斥的,就是说,如果有线程持有写锁,那么读锁是获取不到的(线程等待),同样如果有线程持有写锁,读锁也是获取不到的。例如:
static ReentrantReadWriteLock rrwLock = new ReentrantReadWriteLock();
public static void func7(){
Thread t1 = new Thread(() -> {
rrwLock.readLock().lock();
System.out.println("t1 read lock");
for (int i = 0; i < 10000; i++) {
count++;
}
rrwLock.readLock().unlock();
System.out.println("t1 read unlock");
});
Thread t2 = new Thread(() -> {
rrwLock.writeLock().lock();
System.out.println("t2 write lock");
for (int i = 0; i < 10000; i++) {
count++;
}
rrwLock.writeLock().unlock();
System.out.println("t2 write unlock");
});
Thread t3 = new Thread(() -> {
rrwLock.readLock().lock();
System.out.println("t3 read lock");
for (int i = 0; i < 10000; i++) {
count++;
}
rrwLock.readLock().unlock();
System.out.println("t3 read unlock");
});
t1.start();
t2.start();
t3.start();
}
public static void main(String[] args) throws Exception {
func7();
Thread.sleep(1000);
System.out.println(count);
}
以上面的代码为例,程序启动了 3 个线程,其中 t1、t3 是获取的读锁,可以同时执行,t2 获取的是写锁,不能与 t1、t3 同时执行,所以你看到的执行顺序可能是下面几种情况。
t1 read lock
t3 read lock
t1 read unlock
t3 read unlock
t2 write lock
t2 write unlock
24523
t1、t3 同时执行,最后执行 t2,从这个执行结果我们也能够看到,count的值小于 30000 所以 t1、t2 是同时执行的。
t1 read lock
t1 read unlock
t2 write lock
t2 write unlock
t3 read lock
t3 read unlock
30000
t1、t2、t3 顺序执行
也有可能是其他情况,但是,不管怎样执行,t2 线程一定是在无锁的情况下执行的。要么 t1、t3 线程还没有开始,要么就是已经执行结束了。
2. 线程间的通信
线程间的通信主要是指线程间的等待-通知机制。A线程等待B线程执行,B线程执行到某个环节后,通知A线程继续执行。
2.1 LockSupport
当调用LockSupport.park()
时,会阻塞方法,表示当前线程将进入等待状态。当调用LockSupport.unpark(Thread thread)
时表示需要唤醒参数指定的线程。例如:
public static void func3(Thread thread) throws InterruptedException {
Thread t1 = new Thread(() -> {
System.out.println("t1,run");
LockSupport.park("t1");
for (int i = 0; i < 10000; i++) {
count++;
}
System.out.println("t1 park " + LockSupport.getBlocker(thread).toString());
LockSupport.unpark(thread);
System.out.println("t1,run end");
});
Thread t2 = new Thread(() -> {
System.out.println("t2,run");
LockSupport.park("t2");
for (int i = 0; i < 10000; i++) {
count++;
}
System.out.println("t2 park " + LockSupport.getBlocker(t1).toString());
LockSupport.unpark(t1);
System.out.println("t2,run end");
});
Thread t3 = new Thread(() -> {
System.out.println("t3,run");
for (int i = 0; i < 10000; i++) {
count++;
}
System.out.println("t3 park " + LockSupport.getBlocker(t2).toString());
LockSupport.unpark(t2);
System.out.println("t3,run end");
});
t1.start();
t2.start();
t3.start();
}
public static void main(String[] args) throws Exception {
func3(Thread.currentThread());
LockSupport.park("main");
System.out.println(count);
System.out.println("main,run end");
}
main 方法先执行,调用 func3 后进入等待状态,func3 开启了 3 个线程,t1 和 t2 如果先抢到执行权则直接 park 进入等待状态,t3 先执行,然后通知 t2,t2 执行结束后通知 t1,最后 t1 通知主线程。至于 park 函数的参数,主要是用于打印日志,接入监控等后期排查问题使用的。LockSupport 还有另外一个方法parkNanos(Object blocker, long nanos) 支持等待时长配置,等待 nanos 秒之后,继续执行。
LockSupport 的 park 函数和 unpark 函数与 Java 中的 wait 和 notify 函数非常类似。与 wait-notify 方法的区别是 wait 方法必须先调用,然后再调用 notify,反过来的的话将一直无法唤醒等待的线程。而 park 和 unpark 则没有这个问题。但是,如果在调用 unpark 时,对应的线程还没有启动,则有可能出现线程无法唤醒的问题。比如,我们将启动顺序做如下修改:
t3.start();
Thread.sleep(100);
t1.start();
t2.start();
这个时候则会出现线程 t1、t2 一直等待的情况。
2.2 ReentrantLock 的 Condition
ReentrantLock 的 Condition 对象也支持线程间等待通知机制。
static ReentrantLock reentrantLock = new ReentrantLock(true);
static Condition c1 = reentrantLock.newCondition();
static Condition c2 = reentrantLock.newCondition();
public static void func5() throws Exception {
Thread t1 = new Thread(() -> {
reentrantLock.lock();
System.out.println("t1,run");
try {
c1.await();
} catch (InterruptedException e) {
System.out.println("error");
}
for (int i = 0; i < 10000; i++) {
count++;
}
reentrantLock.unlock();
System.out.println("t1,run end");
});
Thread t2 = new Thread(() -> {
reentrantLock.lock();
System.out.println("t2,run");
try {
c2.await();
} catch (InterruptedException e) {
System.out.println("error");
}
for (int i = 0; i < 10000; i++) {
count++;
}
c1.signal();
reentrantLock.unlock();
System.out.println("t2,run end");
});
Thread t3 = new Thread(() -> {
reentrantLock.lock();
System.out.println("t3,run");
for (int i = 0; i < 10000; i++) {
count++;
}
c2.signal();
reentrantLock.unlock();
System.out.println("t3,run end");
});
t1.start();
t2.start();
t3.start();
}
func5 创建了三个线程,其中 t1、t2 线程如果先抢到执行权的话会进入等待状态,t3 先执行。然后通知 t2,t2 再通知 t1。需要注意的是,await 和 signal 方法必须在 lock 和 unlock 之间调用。因为 Condition 对象是强依赖 ReentrantLock 的。await 和 signal 底层也是 LockSupport 实现的,所以 await 和 signal 的调用顺序是不会造成死锁的,唯一需要注意的也是,调用 signal 时要保证线程是存活的。
2.3 同步计数器 CountDownLatch
CountDownLatch 是一个计数器工具类,构造方法必须传入一个大于 0 的整型值。而每调用一次 countDown() 方法 这个值就会减一。CountDownLatch 的 await() 方法会一直等待到这个值为 0 后才会继续执行。CountDownLatch 工具有点类似于 join 方法。主要功能就是等待 N 个线程全部执行完之后,某个线程才会继续执行。代码如下:
static CountDownLatch downLatch = new CountDownLatch(3);
public static void func8(){
Thread t1 = new Thread(() -> {
System.out.println("t1 run");
downLatch.countDown();
System.out.println("t1 run end");
});
Thread t2 = new Thread(() -> {
System.out.println("t2 run");
downLatch.countDown();
System.out.println("t2 run end");
});
Thread t3 = new Thread(() -> {
System.out.println("t3 run");
downLatch.countDown();
System.out.println("t3 run end");
});
t1.start();
t2.start();
t3.start();
}
public static void main(String[] args) throws Exception {
func8();
downLatch.await();
System.out.println("main end");
}
三个线程同时执行,每一个线程执行一次 countDown() 初始值就会减一,直到为 0 ,主线程继续执行。
2.4 栅栏 CyclicBarrier
与计数器的区别是,CyclicBarrier 的作用是,等待 N 个线程同时执行到某个节点(栅栏)后,N 个线程同时开始执行。举个简单的例子,王者荣耀在进入游戏时,10个用户线程需要等待服务器的资源全部准备完成后才同时进入到游戏页面。这个时候就可以使用 CyclicBarrier。而且 CyclicBarrier 可以重用。例如:
static CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() {
@Override
public void run() {
System.out.println("cyclicBarrier run ");
}
});
public static void func9(){
Thread t1 = new Thread(() -> {
for (int i = 0; i < 10; i++) {
System.out.println("t1 run"+i);
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("t1 run end"+i);
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 10; i++) {
System.out.println("t2 run"+i);
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("t2 run end"+i);
}
});
Thread t3 = new Thread(() -> {
for (int i = 0; i < 10; i++) {
System.out.println("t3 run"+i);
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("t3 run end"+i);
}
});
t1.start();
t2.start();
t3.start();
}
public static void main(String[] args) throws Exception {
func9();
}
代码开启了 3 个线程,每一个线程都会循环 10 次。方法的执行结果一定是每个线程的第 i 次循环依次执行的,如果某个线程先执行到第 i 次循环,那么就会等待另外两个线程。直到另外两个线程也循环到 i,才会继续执行。CyclicBarrier 的构造方法 第一个参数是需要等待的线程数量。N 个线程同时开始运行,先到终点的线程需要等他其他线程到达终点后才能继续同时跑第二圈。与 CountDownLatch 相比的另外一个区别是 CyclicBarrier 支持循环使用,栅栏破坏后,还能继续循环使用。而且可以使用 reset()
可以手动重置。也可以调用getNumberWaiting()
方法获取当前在等待的线程数量。
2.5 信号量 Semaphore
Semaphore称为计数信号量,它允许n个任务同时访问某个资源,可以将信号量看做是在向外分发使用资源的许可证,只有成功获取许可证,才能使用资源。
public static Semaphore semaphore = new Semaphore(1);
public static void func10(){
Thread t1 = new Thread(() -> {
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("t1 run");
for (int i = 0; i < 10000; i++) {
count++;
}
System.out.println("t1 run end");
semaphore.release();
});
Thread t2 = new Thread(() -> {
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("t2 run");
for (int i = 0; i < 10000; i++) {
count++;
}
System.out.println("t2 run end");
semaphore.release();
});
Thread t3 = new Thread(() -> {
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("t3 run");
for (int i = 0; i < 10000; i++) {
count++;
}
System.out.println("t3 run end");
semaphore.release();
});
t1.start();
t2.start();
t3.start();
}
信号量的初始化函数需要传入可用的令牌数量,每次调用acquire()
方法来获取一个令牌,如果当前没有可用到的令牌就会一直等待下去,直到有可用的令牌或者线程被中断。 调用release()
方法会释放一个令牌。这里需要注意的是,release 方法会增加令牌数量,每调用一次就会增加一个令牌,甚至在某些情况下回超过初始化设置的值。
2.6 阶段器 Phaser
Phaser 是 JDK1.7 新增的一个工具类。功能类似于 CountDownLatch 和 CyclicBarrier 组合。协调多个线程到达某个时间点后同时开始执行(类似于CountDownLatch)。这个过程可以重复执行(类似于CyclicBarrier)。不同的是,Phaser支持在运行期间增加或者减少参与的线程数量。
CyclicBarrier 有栅栏的概念,而在 Phaser 中栅栏被称作阶段(phase) 当所有参与者都达到当前阶段后,阶段或自动递增。同时所有线程同时开始执行。
public static Phaser phaser = new Phaser(3){
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("phase:" + phase + ",registeredParties:" + registeredParties);
return super.onAdvance(phase, registeredParties);
}
};
public static void func11(){
Thread t1 = new Thread(() -> {
System.out.println("t1 run");
for (int i = 0; i < 10000; i++) {
phaser.arriveAndAwaitAdvance();
count++;
System.out.println("t1:" + i);
}
System.out.println("t1 run end");
phaser.arriveAndDeregister();
});
Thread t2 = new Thread(() -> {
System.out.println("t2 run");
for (int i = 0; i < 10000; i++) {
phaser.arriveAndAwaitAdvance();
count++;
System.out.println("t2:" + i);
}
System.out.println("t2 run end");
phaser.arriveAndDeregister();
});
Thread t3 = new Thread(() -> {
System.out.println("t3 run");
for (int i = 0; i < 10000; i++) {
phaser.arriveAndAwaitAdvance();
count++;
System.out.println("t3:" + i);
}
System.out.println("t3 run end");
phaser.arriveAndDeregister();
});
t1.start();
t2.start();
t3.start();
}
初始化时可以指定参与线程的个数。执行期间也可以通过 register()
方法增加线程个数。而 arriveAndAwaitAdvance()
方法是等待方法,所有线程执行到该方法后,才会继续执行。arriveAndDeregister()
方法的作用是减少参与线程的个数。构造方法支持重写 onAdvance()
方法,该方法每次所有线程到达某一个阶段时执行一次。类似于 CyclicBarrier 的 Runnable 参数。
PS:代码里的 phaser 只能保证每个线程的第 i 次循环执行后才会同时执行下一次,但是并不能保证 count 的值是正确的,因为每次循环都是 3 个线程在同时进行 +1 操作,最终的值也一定是不准确的。
3. 线程间的数据交互
上面的线程间的通信其实更偏向于线程间的协调。协调多个线程同时处理一个任务。而在某些场景下我们需要知道其他线程的执行结果。这个时候就需要在多线程之间进行数据的交互了。
3.1 Fork/Join 框架
fork/join 框架是 JDK1.7 新增的工具类,它是分治算法的并行实现。ForkJoinTask 常用的子类有三个:
- RecursiveTask :执行任务并返回结果
- RecursiveAction:无返回值
- CountedCompleter:任务执行结束后,会触发一个自定义函数。并且可以获取任务执行的结果。
public static void func12(){
List<RecursiveTask<Integer>> tasks = new ArrayList<>();
for (int i = 0; i < 3; i++) {
int finalI = i;
RecursiveTask<Integer> recursiveTask = new RecursiveTask<Integer>() {
@Override
protected Integer compute() {
if (finalI % 2 == 0) {
throw new NullPointerException("sssssss");
}
System.out.println(finalI + " run");
int total = 0;
for (int i = 0; i < 10000; i++) {
total++;
}
return total;
}
};
tasks.add(recursiveTask);
recursiveTask.fork();
}
for (RecursiveTask<Integer> task : tasks) {
try {
System.out.println("join:"+task.join());
} catch (Exception e) {
System.out.println(e);
}
System.out.println("getRawResult:" + task.getRawResult());
}
}
public static void func13(){
List<RecursiveAction> tasks = new ArrayList<>();
for (int i = 0; i < 3; i++) {
int finalI = i;
RecursiveAction recursiveTask = new RecursiveAction() {
@Override
protected void compute() {
System.out.println(finalI + " run");
}
};
recursiveTask.fork();
}
}
public static void func14() {
List<CountedCompleter<Integer>> tasks = new ArrayList<>();
for (int i = 0; i < 3; i++) {
int finalI = i;
CountedCompleter<Integer> countedCompleter = new CountedCompleter<Integer>() {
int total = 0;
@Override
public void compute() {
System.out.println(finalI + " run");
for (int i = 0; i < 10000; i++) {
total++;
}
tryComplete();
}
@Override
public void onCompletion(CountedCompleter<?> caller) {
System.out.println("onCompletion run!" + finalI + ",total:" + total);
}
@Override
public Integer getRawResult() {
return total;
}
};
tasks.add(countedCompleter);
countedCompleter.fork();
}
for (CountedCompleter<Integer> task : tasks) {
task.join();
System.out.println(task.getRawResult());
}
System.out.println("func end!");
}
fork 函数将启动线程开始执行。join 将等待线程执行结束,并返回结果。如果线程内部有异常抛出,则可以在 join 方法处捕获到,而 getRawResult 函数则是获取执行结果,并不会等待线程执行结束,如果直接调用 getRawResult 则有可能获取到的是执行到一半的结果。所以在调用 getRawResult 之前一定要先调用 join 方法。
对于 CountedCompleter 类来说,compute 执行结束后,会自动执行 onCompletion 方法。可以将一些任务执行完成的逻辑放到这个函数里。同样也是可以通过 join 和 getRawResult 方法获取执行结果,不过需要自己实现 getRawResult 方法的逻辑。
PS: compute 方法内,需要手动调用 tryComplete 函数,不然 join 将会一直等待,程序无法正常结束!
3.2 交换工具类 Exchanger
Exchanger 主要是用来在两个线程间进行数据交换的工具类。它可以在某个时间点等待另外一个线程进行交换数据的动作。
public static void func15(){
Exchanger<Integer> exchanger = new Exchanger<>();
Thread t1 = new Thread(() -> {
System.out.println("t1 run");
int num = 0;
for (int i = 0; i < 10000; i++) {
num++;
}
try {
count = exchanger.exchange(num) + num;
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("t1 run end");
});
Thread t2 = new Thread(() -> {
System.out.println("t2 run");
int num = 0;
for (int i = 0; i < 10000; i++) {
num++;
}
try {
count = exchanger.exchange(num) + num;
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("t2 run end");
});
t1.start();
t2.start();
}
exchange
方法会同步等待另外一个线程进行数据交换。需要注意的是,交换数据一定是两两交换,所以使用 Exchanger 的线程一定要是双数,不然就会有一个线程一直在等待交换,无法结束。
3.3 FutureTask
FutureTask 可以通过 get 方法来获取线程的执行结果。是开发中比较常用的一个工具类。基本上可以支持大部分场景。
public static void func16() throws ExecutionException, InterruptedException {
List<FutureTask<Integer>> tasks = new ArrayList<>();
for (int i = 0; i < 3; i++) {
FutureTask<Integer> task = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int num = 0;
for (int j = 0; j < 100; j++) {
num++;
}
return num;
}
});
tasks.add(task);
task.run();
}
for (FutureTask<Integer> task : tasks) {
System.out.println(task.get());
}
}
tast.get()
方法会等待线程的执行结果。而且可以从该方法中捕获线程内部抛出的异常。
4 总结
以上就是 JDK 提供的一些多线程的工具类,其实Java多线程并没有想象中的那么难,无非就是线程竞争要加锁,常用的就是synchronized和ReentrantLock。线程通信的话就是等待-通知机制,常用的就是 CountDownLatch和CyclicBarrier,如果要获取线程的执行结果就直接FutureTask。