1 线程池
相关文章防止冗余:
Java由浅入深理解线程池设计和原理:https://blog.csdn.net/ZGL_cyy/article/details/133208026
Java线程池ExecutorService:https://blog.csdn.net/ZGL_cyy/article/details/117843472
Java并发计算判断线程池中的线程是否全部执行完毕:https://blog.csdn.net/ZGL_cyy/article/details/127599554
Java线程池创建方式和应用场景:https://blog.csdn.net/ZGL_cyy/article/details/126443994
2 Fork/Join
2.1 概念
ForkJoinPool是由JDK1.7后提供多线程并行执行任务的框架。可以理解为一种特殊的线程池。 1.任务分割:Fork(分岔),先把大的任务分割成足够小的子任务,如果子任务比较大的话还要对子任务进行继续分割。
2.合并结果:join,分割后的子任务被多个线程执行后,再合并结果,得到最终的完整输出。
2.2 组成
- ForkJoinTask:主要提供fork和join两个方法用于任务拆分与合并;一般用子类 RecursiveAction(无返回值的任务)和RecursiveTask(需要返回值)来实现compute方法。
- ForkJoinPool:调度ForkJoinTask的线程池;
- ForkJoinWorkerThread:Thread的子类,存放于线程池中的工作线程(Worker);
- WorkQueue:任务队列,用于保存任务;
2.3 基本使用
一个典型的例子:计算1-1000的和
package com.oldlu.thread; import java.util.concurrent.*; public class SumTask { private static final Integer MAX = 100; static class SubTask extends RecursiveTask<Integer> { // 子任务开始计算的值 private Integer start; // 子任务结束计算的值 private Integer end; public SubTask(Integer start , Integer end) { this.start = start; this.end = end; } @Override protected Integer compute() { if(end - start < MAX) { //小于边界,开始计算 System.out.println("start = " + start + ";end = " + end); Integer totalValue = 0; for(int index = this.start ; index <= this.end ; index++) { totalValue += index; } return totalValue; }else { //否则,中间劈开继续拆分 SubTask subTask1 = new SubTask(start, (start + end) / 2); subTask1.fork(); SubTask subTask2 = new SubTask((start + end) / 2 + 1 , end); subTask2.fork(); return subTask1.join() + subTask2.join(); } } } public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(); Future<Integer> taskFuture = pool.submit(new SubTask(1,1000)); try { Integer result = taskFuture.get(); System.out.println("result = " + result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(System.out); } } }
2.4 设计思想
- 普通线程池内部有两个重要集合:工作线程集合(普通线程),和任务队列。
- ForkJoinPool也类似,线程集合里放的是特殊线程ForkJoinWorkerThread,任务队列里放的是特殊任务ForkJoinTask
不同之处在于,普通线程池只有一个队列。而ForkJoinPool的工作线程ForkJoinWorkerThread每个线程内都绑定一个双端队列。
在fork的时候,也就是任务拆分,将拆分的task会被当前线程放到自己的队列中。
如果有任务,那么线程优先从自己的队列里取任务执行,以LIFO先进后出方式从队尾获取任务,
当自己队列中执行完后,工作线程会跑到其他队列以work−stealing窃取,窃取方式为FIFO先进先出,减少竞争。
2.5 注意点
使用ForkJoin将相同的计算任务通过多线程执行。但是在使用中需要注意:
- 注意任务切分的粒度,也就是fork的界限。并非越小越好
- 判断要不要使用ForkJoin。任务量不是太大的话,串行可能优于并行。因为多线程会涉及到上下文的切换
3 原子操作
3.1 概念
原子(atom)本意是“不能被进一步分割的最小粒子”,而原子操作(atomic operation)意为"不可被中断的一个或一系列操作" 。
3.2 CAS
CAS(Compare-and-Swap/Exchange),即比较并替换,是一种实现并发常用到的技术。CAS的整体架构如下:
juc中提供了Atomic开头的类,基于cas实现原子性操作,最基本的应用就是计数器
package com.oldlu; import java.util.concurrent.atomic.AtomicInteger; public class AtomicCounter { private static AtomicInteger i = new AtomicInteger(0); public int get(){ return i.get(); } public void inc(){ i.incrementAndGet(); } public static void main(String[] args) throws InterruptedException { final AtomicCounter counter = new AtomicCounter(); for (int i = 0; i < 10; i++) { new Thread(new Runnable() { public void run() { counter.inc(); } }).start(); } Thread.sleep(3000); //可以正确输出10 System.out.println(counter.i.get()); } }
注:AtomicInteger源码。基于unsafe类cas思想实现,性能篇会讲到
CAS虽然很高效的解决了原子操作问题,但是CAS仍然存在三大问题。
1.自旋(循环)时间长开销很大,如果CAS失败,会一直进行尝试。如果CAS长时间一直不成功,可能会给CPU带来很大的开销,注意这里的自旋是在用户态/SDK 层面实现的。
2.只能保证一个共享变量的原子操作,对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁来保证原子性。
3.ABA问题,在使用CAS前要考虑清楚“ABA”问题是否会影响程序并发的正确性,如果需要解决ABA问题,改用传统的互斥同步可能会比CAS更高效。
3.3 atomic
上面展示了AtomicInteger,关于atomic包,还有很多其他类型:
- 基本类型
- AtomicBoolean:以原子更新的方式更新boolean;
- AtomicInteger:以原子更新的方式更新Integer;
- AtomicLong:以原子更新的方式更新Long;
- 引用类型
- AtomicReference : 原子更新引用类型
- AtomicReferenceFieldUpdater :原子更新引用类型的字段
- AtomicMarkableReference : 原子更新带有标志位的引用类型
- 数组
- AtomicIntegerArray:原子更新整型数组里的元素。
- AtomicLongArray:原子更新长整型数组里的元素。
- AtomicReferenceArray:原子更新引用类型数组里的元素。
- 字段
- AtomicIntegerFieldUpdater:原子更新整型的字段的更新器。
- AtomicLongFieldUpdater:原子更新长整型字段的更新器。
- AtomicStampedReference:原子更新带有版本号的引用类型。
3.4 注意!
使用atomic要注意原子性的边界,把握不好会起不到应有的效果,原子性被破坏。
案例:原子性被破坏现象
package com.oldlu; import java.util.concurrent.atomic.AtomicInteger; public class BadAtomic { AtomicInteger i = new AtomicInteger(0); static int j=0; public void badInc(){ int k = i.incrementAndGet(); try { Thread.sleep(new Random().nextInt(100)); } catch (InterruptedException e) { e.printStackTrace(); } j=k; } public static void main(String[] args) throws InterruptedException { BadAtomic atomic = new BadAtomic(); for (int i = 0; i < 10; i++) { new Thread(()->{ atomic.badInc(); }).start(); } Thread.sleep(3000); System.out.println(atomic.j); } }
结果分析:
- 每次都不一样,总之不是10
- i是原子性的,没问题。但是再赋值,变成了两部操作,原子性被打破
- 在badInc上加synchronized,问题解决