join() 方法
private static void join() throws InterruptedException { Thread t1 = new Thread(new Runnable() { @Override public void run() { LOGGER.info("running"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } }) ; Thread t2 = new Thread(new Runnable() { @Override public void run() { LOGGER.info("running2"); try { Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } } }) ; t1.start(); t2.start(); //等待线程1终止 t1.join(); //等待线程2终止 t2.join(); LOGGER.info("main over"); }
输出结果:
2018-03-16 20:21:30.967 [Thread-1] INFO c.c.actual.ThreadCommunication - running2 2018-03-16 20:21:30.967 [Thread-0] INFO c.c.actual.ThreadCommunication - running 2018-03-16 20:21:34.972 [main] INFO c.c.actual.ThreadCommunication - main over
在 t1.join()
时会一直阻塞到 t1 执行完毕,所以最终主线程会等待 t1 和 t2 线程执行完毕。
其实从源码可以看出,join() 也是利用的等待通知机制:
核心逻辑:
while (isAlive()) { wait(0); }
在 join 线程完成后会调用 notifyAll() 方法,是在 JVM 实现中调用,所以这里看不出来。
volatile 共享内存
因为 Java 是采用共享内存的方式进行线程通信的,所以可以采用以下方式用主线程关闭 A 线程:
public class Volatile implements Runnable{ private static volatile boolean flag = true ; @Override public void run() { while (flag){ System.out.println(Thread.currentThread().getName() + "正在运行。。。"); } System.out.println(Thread.currentThread().getName() +"执行完毕"); } public static void main(String[] args) throws InterruptedException { Volatile aVolatile = new Volatile(); new Thread(aVolatile,"thread A").start(); System.out.println("main 线程正在运行") ; TimeUnit.MILLISECONDS.sleep(100) ; aVolatile.stopThread(); } private void stopThread(){ flag = false ; } }
输出结果:
thread A正在运行。。。 thread A正在运行。。。 thread A正在运行。。。 thread A正在运行。。。 thread A执行完毕
这里的 flag 存放于主内存中,所以主线程和线程 A 都可以看到。
flag 采用 volatile 修饰主要是为了内存可见性,更多内容可以查看这里。
CountDownLatch 并发工具
CountDownLatch 可以实现 join 相同的功能,但是更加的灵活。
private static void countDownLatch() throws Exception{ int thread = 3 ; long start = System.currentTimeMillis(); final CountDownLatch countDown = new CountDownLatch(thread); for (int i= 0 ;i<thread ; i++){ new Thread(new Runnable() { @Override public void run() { LOGGER.info("thread run"); try { Thread.sleep(2000); countDown.countDown(); LOGGER.info("thread end"); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } countDown.await(); long stop = System.currentTimeMillis(); LOGGER.info("main over total time={}",stop-start); }
输出结果:
2018-03-16 20:19:44.126 [Thread-0] INFO c.c.actual.ThreadCommunication - thread run 2018-03-16 20:19:44.126 [Thread-2] INFO c.c.actual.ThreadCommunication - thread run 2018-03-16 20:19:44.126 [Thread-1] INFO c.c.actual.ThreadCommunication - thread run 2018-03-16 20:19:46.136 [Thread-2] INFO c.c.actual.ThreadCommunication - thread end 2018-03-16 20:19:46.136 [Thread-1] INFO c.c.actual.ThreadCommunication - thread end 2018-03-16 20:19:46.136 [Thread-0] INFO c.c.actual.ThreadCommunication - thread end 2018-03-16 20:19:46.136 [main] INFO c.c.actual.ThreadCommunication - main over total time=2012
CountDownLatch 也是基于 AQS(AbstractQueuedSynchronizer) 实现的,更多实现参考 ReentrantLock 实现原理
- 初始化一个 CountDownLatch 时告诉并发的线程,然后在每个线程处理完毕之后调用 countDown() 方法。
- 该方法会将 AQS 内置的一个 state 状态 -1 。
- 最终在主线程调用 await() 方法,它会阻塞直到
state == 0
的时候返回。
CyclicBarrier 并发工具
private static void cyclicBarrier() throws Exception { CyclicBarrier cyclicBarrier = new CyclicBarrier(3) ; new Thread(new Runnable() { @Override public void run() { LOGGER.info("thread run"); try { cyclicBarrier.await() ; } catch (Exception e) { e.printStackTrace(); } LOGGER.info("thread end do something"); } }).start(); new Thread(new Runnable() { @Override public void run() { LOGGER.info("thread run"); try { cyclicBarrier.await() ; } catch (Exception e) { e.printStackTrace(); } LOGGER.info("thread end do something"); } }).start(); new Thread(new Runnable() { @Override public void run() { LOGGER.info("thread run"); try { Thread.sleep(5000); cyclicBarrier.await() ; } catch (Exception e) { e.printStackTrace(); } LOGGER.info("thread end do something"); } }).start(); LOGGER.info("main thread"); }