Java 并发包提供了哪些并发工具类
Java 基础并发工具类
- 提供了比 synchronized更加高级的各种同步结构,包括 CountDownLatch, CyclicBarrier、 Semaphore等,可以实现更加丰富的多线程操作,比如利用 Semaphore作为资源
- 各种线程安全的容器,比如最常见的 ConcurrentHashMap、有序的 ConcunrrentskipListMap,或者通过类似快照机制,实现线程安全的动态数组 Copy onWriteArrayuist等
- 各种并发队列实现,如各种 BlockedQueue实现,比较典型的 ArrayBlockingQueue、 SynchorousQueue或针对特定场景的 Priority BlockingQueue等。
- 强大的 Executor框架,可以创建各种不同类型的线程池,调度任务运行等,绝大部分情况下,不再需要自己从头实现线程池和任务调度器。
多线程编程要注意哪些
- 利用多线程提高程序的扩展能力,以达到业务对吞吐量的要求。
- 协调线程间调度、交互,以完成业务逻辑。
- 线程间传递数据和状态,这同样是实现业务逻辑的需要。
并发包工具需要掌握哪些
- 从总体上,把握住几个主要组成部分
- 理解具体设计、实现和能力。
- 再深入掌握一些比较典型工具类的适用场景、用法甚至是原理,并熟练写岀典型的代码用例
CountDownLatch
允许一个或者多个线程等待操作完成
- CountDownLatch 是不可以重置的,无法重用,但是 CyclicBarrier则没有这个限制,可以重用。
- CountDownLatch 的基本操作时 countDown/await。调用await 线程阻塞等待 countDown 足够的次数,不管是在一个线程还是多个线程里 CountDown,只要次数足够即可。
假设有10个人排队,我们将其分成5个人一批,使用CountDownLatc 来协调。
public class LatchSample { /** * @param args * @throws InterruptedException */ public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(6); for(int i = 0; i < 5 ; i++){ Thread t = new Thread(new FirstBatchWorker(latch)); t.start(); } for(int i = 0 ; i < 5;i++){ Thread t = new Thread(new SecondBatchWorker(latch)); t.start(); } while(latch.getCount() != 1){ Thread.sleep(100L); } System.out.println(" wait gor first batch finish"); latch.countDown(); } } class FirstBatchWorker implements Runnable { private CountDownLatch latch; public FirstBatchWorker(CountDownLatch latch) { this.latch = latch; } /** * @see java.lang.Runnable#run() */ @Override public void run() { System.out.println("First batch executed!"); latch.countDown(); } } class SecondBatchWorker implements Runnable { private CountDownLatch latch; public SecondBatchWorker(CountDownLatch latch) { this.latch = latch; } @Override public void run() { try { latch.await(); System.out.println("Second batch Executed!"); } catch (InterruptedException e) { e.printStackTrace(); } } }
CountDownLatch调度也较为简单,后一批的线程进行 await 等待前一批 countdown 足够多次。局限性是不能重用
CyclicBarrier
允许多个线程瞪大到达某个屏障
- CyclicBarrier的基本操作组合,则就是 await,当所有的伙伴( parties)都调用了 await,才会继续进行任务,并自动进行重置。注意,正常情况下, CyclicBarrier的重置都是自动发生的,如果我们调用 reset 方法,但还有线程在等待,就会导致等待线程被打扰,抛出 BrokenBarrierException异常。CyclicBarrier侧重点是线程,而不是调用事件,它的典型应用场景是用来等待并发线程结束
public class CyclicBarrierSample { /** * @param args */ public static void main(String[] args) { CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() { @Override public void run() { System.out.println("Action.... go again!"); } }); for (int i = 0; i < 5; i++) { Thread t = new Thread(new CyclicWorker(barrier)); t.start(); } } } class CyclicWorker implements Runnable { private CyclicBarrier barrier; public CyclicWorker(CyclicBarrier barrier) { this.barrier = barrier; } /** * @see java.lang.Runnable#run() */ @Override public void run() { try { for (int i = 0; i < 3; i++) { System.out.println("Executed!"); barrier.await(); } } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }
Semaphore
Java 版本信号量的实现,通过允许一定数量的允许(Permit)的方式,来表达限制通用资源访问的目的。租车时,当很多空出租车就位时,为防止过度拥挤,调度员指挥排队等待坐车的队伍一次进来5个人上车,等这5个人坐车出发,再放进去下一批,这和 Semaphore的工作原理类似
线程尝试获得许可,获得许可则进入任务,任务执行完,然后释放许可。这时等待许可的其他线程,可以获得许可进入工作状态,知道全部处理结束。
public class SemaphoreSample { /** * @param args */ public static void main(String[] args) { System.out.println("Action ...Go!"); Semaphore semaphore = new Semaphore(5); for(int i = 0; i < 10;i++){ Thread t = new Thread(new SemaphoreWorker(semaphore)); t.start(); } } } class SemaphoreWorker implements Runnable{ private String name; private Semaphore semaphore; public SemaphoreWorker(Semaphore semaphore){ this.semaphore = semaphore; } @Override public void run() { try { log("is waitting for permit"); semaphore.acquire(); log("acquired a permit"); log("excuted"); } catch (InterruptedException e) { e.printStackTrace(); } finally{ log("releae a permit"); semaphore.release(); } } private void log(String msg){ if(name == null){ name = Thread.currentThread().getName(); } System.out.println(name+", "+ msg); } }
Semapore 类似计数器,实现逻辑基于 Acquire/release,如果 semaphore 的数值初始化成1 相当于互斥锁只有一个资源供竞争。
线程安全的集合
线程安全Map ,List 和 Set。当应用侧重 Map 存放的速度,推荐使用 ConcurrentHashMap, 如果需要使用大量数据进行频繁的修改,推荐使用 ConcurrentSkipListMap
SkipList
CopyOnWriteArrayList
CopyOnWriteArraySet 包装了 CopyOnWriteArrayList 来实现 CopyOnWrite 原理是任何修改操作,add,set remove 都会拷贝原数组,修改后替换原理的数组,通过这种防御方式,实现另类的线程安全。这种数据结构,适合读多写少的操作。
public boolean add(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1); newElements[len] = e; setArray(newElements); return true; } finally { lock.unlock(); } }