工具类&容器类
这里要说到一些我们在平时开发中经常使用到的一些类以及他们的实现原理。
工具类&容器类
CountDownLatch
CountDownLatch适用于在多线程的场景需要等待所有子线程全部执行完毕之后再做操作的场景。
假设现在我们有一个业务场景,我们需要调用多个RPC接口去查询数据并且写入excel,最后把所有excel打包压缩发送邮件出去。
public class CountDownLatchTest { public static void main(String[] args) throws Exception{ ExecutorService executorService = Executors.newFixedThreadPool(10); CountDownLatch countDownLatch = new CountDownLatch(2); executorService.submit(()->{ try { Thread.sleep(1000); System.out.println("写excelA完成"); countDownLatch.countDown(); } catch (InterruptedException e) { throw new RuntimeException(e); } }); executorService.submit(()->{ try { Thread.sleep(3000); System.out.println("写excelB完成"); countDownLatch.countDown(); } catch (InterruptedException e) { throw new RuntimeException(e); } }); System.out.println("等待excel写入完成"); countDownLatch.await(); System.out.println("开始打包发送数据.."); executorService.shutdown(); } } //输出 等待excel写入完成 写excelA完成 写excelB完成 开始打包发送数据..
整个过程如下:
初始化一个CountDownLatch实例传参2,因为我们有2个子线程,每次子线程执行完毕之后调用countDown()方法给计数器-1,主线程调用await()方法后会被阻塞,直到最后计数器变为0,await()方法返回,执行完毕。
他和join有个区别,像我们这里用的是ExecutorService创建线程池,是没法使用join的,相比起来,CountDownLatch的使用会显得更加灵活。
CountDownLatch基于AQS实现,用volatile修饰state变量维持倒数状态,多线程共享变量可见。
- CountDownLatch通过构造函数初始化传入参数实际为AQS的state变量赋值,维持计数器倒数状态
- 当主线程调用await()方法时,当前线程会被阻塞,当state不为0时进入AQS阻塞队列等待。
- 其他线程调用countDown()时,通过CAS修改state值-1,当state值为0的时候,唤醒所有调用await()方法阻塞的线程
CyclicBarrier
CyclicBarrier叫做回环屏障,它的作用是让一组线程全部达到一个状态之后再全部同时执行,他和CountDownLatch主要区别在于,CountDownLatch的计数器只能用一次,而CyclicBarrier的计数器状态则是可以一直重用的。
我们可以使用CyclicBarrier一样实现上面的需求。
public class CyclicBarrierTest { public static void main(String[] args) throws Exception{ ExecutorService executorService = Executors.newFixedThreadPool(10); CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> { System.out.println("开始打包发送数据.."); }); executorService.submit(()->{ try { Thread.sleep(1000); System.out.println("写excelA完成"); cyclicBarrier.await(); } catch (Exception e) { throw new RuntimeException(e); } }); executorService.submit(()->{ try { Thread.sleep(3000); System.out.println("写excelB完成"); cyclicBarrier.await(); } catch (Exception e) { throw new RuntimeException(e); } }); System.out.println("等待excel写入完成"); executorService.shutdown(); } } //输出 等待excel写入完成 写excelA完成 写excelB完成 开始打包发送数据..
初始化的时候我们传入2个线程和一个回调方法,线程调用await()之后进入阻塞状态并且计数器-1,这个阻塞点被称作为屏障点或者同步点,只有最后一个线程到达屏障点的时候,所有被屏障拦截的线程才能继续运行,这也是叫做回环屏障的名称原因。
而当计数器为0时,就去执行CyclicBarrier构造函数中的回调方法,回调方法执行完成之后,就会退出屏障点,唤醒其他阻塞中的线程。
CyclicBarrier基于ReentrantLock实现,本质上还是基于AQS实现的,内部维护parties记录总线程数,count用于计数,最开始count=parties,调用await()之后count原子递减,当count为0之后,再次将parties赋值给count,这就是复用的原理。
- 当子线程调用await()方法时,获取独占锁ReentrantLock,同时对count递减,进入阻塞队列,然后释放锁
- 当第一个线程被阻塞同时释放锁之后,其他子线程竞争获取锁,操作同1
- 直到最后count为0,执行CyclicBarrier构造函数中的任务,执行完毕之后子线程继续向下执行,计数重置,开始下一轮循环
Semaphore
Semaphore叫做信号量,和前面两个不同的是,他的计数器是递增的,信号量这玩意儿在限流中就经常使用到。
public class SemaphoreTest { public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(10); Semaphore semaphore = new Semaphore(0); executorService.submit(() -> { try { Thread.sleep(1000); System.out.println("写excelA完成"); semaphore.release(); } catch (InterruptedException e) { throw new RuntimeException(e); } }); executorService.submit(() -> { try { Thread.sleep(3000); System.out.println("写excelB完成"); semaphore.release(); } catch (InterruptedException e) { throw new RuntimeException(e); } }); System.out.println("等待excel写入完成"); semaphore.acquire(2); System.out.println("开始打包发送数据.."); executorService.shutdown(); } } //输出 等待excel写入完成 写excelA完成 写excelB完成 开始打包发送数据..
稍微和前两个有点区别,构造函数接受参数表示可用的许可证的数量,acquire方法表示获取一个许可证,使用完之后release归还许可证。
当子线程调用release()方法时,计数器递增,主线程acquire()传参为2则说明主线程一直阻塞,直到计数器为2才会返回。
Semaphore还还还是基于AQS实现的,同时获取信号量有公平和非公平两种策略,通过构造函数的传参可以修改,默认则是非公平的策略。
- 先说非公平的策略,主线程调用acquire()方法时,用当前信号量值-需要获取的值,如果小于0,说明还没有达到信号量的要求值,则会进入AQS的阻塞队列,大于0则通过CAS设置当前信号量为剩余值,同时返回剩余值。而对于公平策略来说,如果当前有其他线程在等待获取资源,那么自己就会进入AQS阻塞队列排队。
- 子线程调用release()给当前信号量值计数器+1(增加的值数量由传参决定),同时不停的尝试唤醒因为调用acquire()进入阻塞的线程
Exchanger
Exchanger用于两个线程之间交换数据,如果两个线程都到达同步点,这两个线程可以互相交换他们的数据。
举个栗子,A和B两个线程需要交换他们自己写的数据以便核对数据是否一致。
public class ExchangerTest { public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(10); Exchanger<String> exchanger = new Exchanger<>(); executorService.submit(() -> { try { Thread.sleep(1000); System.out.println("写excelA完成"); System.out.println("A获取到数据=" + exchanger.exchange("excelA")); } catch (InterruptedException e) { throw new RuntimeException(e); } }); executorService.submit(() -> { try { Thread.sleep(3000); System.out.println("写excelB完成"); System.out.println("B获取到数据=" + exchanger.exchange("excelB")); } catch (InterruptedException e) { throw new RuntimeException(e); } }); executorService.shutdown(); } } //输出 写excelA完成 写excelB完成 B获取到数据=excelA A获取到数据=excelB
A写完之后exchange会一直阻塞等待,直到另外一个线程也exchange之后,才会继续执行。
ThreadLocalRandom
通常我们都会用 Random 去生成随机数,但是 Random 有点小问题,在多线程并发的情况下为了保证生成的随机性,通过 CAS 的方式保证生成新种子的原子性,但是这样带来了性能的问题,多线程并发去生成随机数,但是只有一个线程能成功,其他的线程会一直自旋,性能不高,所以 ThreadLocalRandom 就是为了解决这个问题而诞生。
//多线程下通过CAS保证新种子生成的原子性 protected int next(int bits) { long oldseed, nextseed; AtomicLong seed = this.seed; do { oldseed = seed.get(); nextseed = (oldseed * multiplier + addend) & mask; } while (!seed.compareAndSet(oldseed, nextseed)); return (int)(nextseed >>> (48 - bits)); }
ThreadLocalRandom 我们从名字就能看出来,肯定使用了 ThreadLocal,作用就是用 ThreadLocal 保存每个种子的变量,防止在高并发下对同一个种子的争夺。
使用也非常简单:
ThreadLocalRandom.current().nextInt(100);
看下源码实现,current 方法获取当前的 ThreadLocalRandom 实例。
public static ThreadLocalRandom current() { if (UNSAFE.getInt(Thread.currentThread(), PROBE) == 0) localInit(); return instance; }
nextInt 方法和 Random 看起来差不多,上面是生成新的种子,下面是固定的基于新种子计算随机数,主要看 nextSeed。
public int nextInt(int bound) { if (bound <= 0) throw new IllegalArgumentException(BadBound); int r = mix32(nextSeed()); //生成新种子 int m = bound - 1; if ((bound & m) == 0) // power of two r &= m; else { // reject over-represented candidates for (int u = r >>> 1; u + m - (r = u % bound) < 0; u = mix32(nextSeed()) >>> 1) ; } return r; }
r = UNSAFE.getLong(t, SEED) + GAMMA 计算出新的种子,然后使用 UNSAFE 的方法放入当前线程中。
final long nextSeed() { Thread t; long r; // read and update per-thread seed UNSAFE.putLong(t = Thread.currentThread(), SEED, r = UNSAFE.getLong(t, SEED) + GAMMA); return r; }
ConcurrentHashMap
这个我们就不说了,说的太多了,之前的文章也写过了,可以参考之前写过的。
CopyOnWriteArrayList&CopyOnWriteArraySet
这是线程安全的 ArrayList ,从名字我们就能看出来,写的时候复制,这叫做写时复制,也就是写的操作是对拷贝的数组的操作。
先看构造函数,有3个,分别是无参,传参为集合和传参数组,其实都差不多,无参构造函数创建一个新的数组,集合则是把集合类的元素拷贝到新的数组,数组也是一样。
public CopyOnWriteArrayList() { setArray(new Object[0]); } public CopyOnWriteArrayList(Collection<? extends E> c) { Object[] elements; if (c.getClass() == CopyOnWriteArrayList.class) elements = ((CopyOnWriteArrayList<?>)c).getArray(); else { elements = c.toArray(); if (c.getClass() != ArrayList.class) elements = Arrays.copyOf(elements, elements.length, Object[].class); } setArray(elements); } public CopyOnWriteArrayList(E[] toCopyIn) { setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class)); }
我们看 add 方法,你一眼就能看出来非常简单的实现,通过 ReentrantLock 加锁,然后拷贝出一个新的数组,数组长度+1,再把新数组赋值,所以这就是名字的由来,写入的时候操作的是数组的拷贝,其他的删除修改就不看了,基本上是一样的。
public boolean add(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1); newElements[len] = e; setArray(newElements); return true; } finally { lock.unlock(); } }
再看看 get 方法,也非常简单,直接获取数组当前索引的值,这里需要注意的是,读数据是没有加锁的,所以会有一致性的问题,它并不能保证读到的一定是最新的数据。
public E get(int index) { return get(getArray(), index); } private E get(Object[] a, int index) { return (E) a[index]; } final Object[] getArray() { return array; }
至于 CopyOnWriteArraySet ,他就是基于 CopyOnWriteArrayList 实现的,这里我们不再赘述。
public CopyOnWriteArraySet() { al = new CopyOnWriteArrayList<E>(); } public boolean add(E e) { return al.addIfAbsent(e); } public boolean addIfAbsent(E e) { Object[] snapshot = getArray(); return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false : addIfAbsent(e, snapshot); }