1、并发
分工:如何高效地拆解任务并分配给线程
同步:线程之间如何协作
互斥:保证同一时刻只允许一个线程访问共享资源
Fork/Join 框架就是一种分工模式,CountDownLatch 就是一种典型的同步方式,而可重入锁则是一种互斥手段。
2、可见性、原子性、有序性
(1)可见性:缓存导致
(2)原子性:线程切换
count+=1
(3)有序性:编译优化
3、java内存模型
(1)可见性:缓存导致-----按需禁用缓存
(2)有序性:编译优化-----按需禁用
volatile int x=0;(该变量的读写,不使用cpu缓存,直接使用内存读取或者写入)
(3)原子性:同一时刻,只有一个线程执行,互斥。
synchronized
4、死锁
死锁发生的条件:
(1)互斥,共享资源x和y只能被一个线程占用
(2)占有且等待,线程 T1 已经取得共享资源 X,在等待共享资源 Y 的时候,不释放共享资源 X;
破坏占用且等待条件:一次性申请所有资源
(3)不可抢占,其他线程不能强行抢占线程 T1 占有的资源;
破坏不可强占条件
(4)循环等待,线程 T1 等待线程 T2 占有的资源,线程 T2 等待线程 T1 占有的资源,就是循环等待。
破坏循环等待条件:
wait和sleep区别
1:wait释放资源,sleep不释放资源
2:wait需要被唤醒,sleep不需要
3:wait需要获取到监视器,否则抛异常,sleep不需要
4:wait是object顶级父类的方法,sleep则是Thread的方法
5.CountDownLatch和CyclicBarrier:如何让多线程步调一致?(主线程等待子线程结束)
Thread t1 = new Thread(() -> { try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } }); t1.start(); Thread t2 = new Thread(() -> { try { Thread.sleep(2000); System.out.println(Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } }); t2.start(); //实现等待 t1.join(); t2.join(); System.out.println("=============");
线程池
Executor executor = Executors.newFixedThreadPool(2); CountDownLatch latch = new CountDownLatch(2); executor.execute(()->{ try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName()); latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }); executor.execute(()->{ try { Thread.sleep(2000); System.out.println(Thread.currentThread().getName()); latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }); latch.await(); System.out.println("=============");
CountDownLatch 主要用来解决一个线程等待多个线程的场景。(CountDownLatch 的计数器是不能循环利用的,也就是说一旦计数器减到 0,再有线程调用 await(),该线程会直接通过。)
CyclicBarrier ---------- A线程执行,B线程执行,A、B其中一个线程等到AB执行完成再执行(不是主线程,且是异步的)
参考:https://www.cnblogs.com/dolphin0520/p/3920397.html
6.并发容器
List、Map、Set、Queue
非线程安全:ArrayList、HashMap
7.原子类
8.线程池、Executor
ThreadPoolExecutor
线程池实际上是生产者 - 消费者模式
ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
https://blog.csdn.net/weixin_36586120/article/details/109274771
线程池大小设置:
cpu(计算型)
io型
https://stefan.blog.csdn.net/article/details/109234348
1.当创建的线程数小于核心线程数corePoolSize时,提交任务会继续创建新线程执行任务。
2.当创建的线程数大于等于corePoolSize时,此时再提交任务将被添加到工作队列workQueue中。
3.当工作队列workQueue已满,此时再提交任务会创建新线程,触发第二个阈值的判断maximumPoolSize。
4.当创建的线程数大于等于最大线程数maximumPoolSize时,此时再提交任务将触发拒绝策略RejectedExecutionHandler
9.Future
ExecutorService executor = Executors.newFixedThreadPool(1); // 创建 Result 对象 r Result r = new Result(); r.setAAA(a); // 提交任务 Future<Result> future = executor.submit(new Task(r), r); Result fr = future.get(); // 下面等式成立 fr === r; fr.getAAA() === a; fr.getXXX() === x class Task implements Runnable{ Result r; // 通过构造函数传入 result Task(Result r){ this.r = r; } void run() { // 可以操作 result a = r.getAAA(); r.setXXX(x); } }
// 创建 FutureTask FutureTask<Integer> futureTask = new FutureTask<>(()-> 1+2); // 创建线程池 ExecutorService es = Executors.newCachedThreadPool(); // 提交 FutureTask es.submit(futureTask); // 获取计算结果 Integer result = futureTask.get();
// 创建 FutureTask FutureTask<Integer> futureTask = new FutureTask<>(()-> 1+2); // 创建并启动线程 Thread T1 = new Thread(futureTask); T1.start(); // 获取计算结果 Integer result = futureTask.get();
// 创建任务 T2 的 FutureTask FutureTask<String> ft2 = new FutureTask<>(new T2Task()); // 创建任务 T1 的 FutureTask FutureTask<String> ft1 = new FutureTask<>(new T1Task(ft2)); // 线程 T1 执行任务 ft1 Thread T1 = new Thread(ft1); T1.start(); // 线程 T2 执行任务 ft2 Thread T2 = new Thread(ft2); T2.start(); // 等待线程 T1 执行结果 System.out.println(ft1.get()); // T1Task 需要执行的任务: // 洗水壶、烧开水、泡茶 class T1Task implements Callable<String>{ FutureTask<String> ft2; // T1 任务需要 T2 任务的 FutureTask T1Task(FutureTask<String> ft2){ this.ft2 = ft2; } @Override String call() throws Exception { System.out.println("T1: 洗水壶..."); TimeUnit.SECONDS.sleep(1); System.out.println("T1: 烧开水..."); TimeUnit.SECONDS.sleep(15); // 获取 T2 线程的茶叶 String tf = ft2.get(); System.out.println("T1: 拿到茶叶:"+tf); System.out.println("T1: 泡茶..."); return " 上茶:" + tf; } } // T2Task 需要执行的任务: // 洗茶壶、洗茶杯、拿茶叶 class T2Task implements Callable<String> { @Override String call() throws Exception { System.out.println("T2: 洗茶壶..."); TimeUnit.SECONDS.sleep(1); System.out.println("T2: 洗茶杯..."); TimeUnit.SECONDS.sleep(2); System.out.println("T2: 拿茶叶..."); TimeUnit.SECONDS.sleep(1); return " 龙井 "; } } // 一次执行结果: T1: 洗水壶... T2: 洗茶壶... T1: 烧开水... T2: 洗茶杯... T2: 拿茶叶... T1: 拿到茶叶: 龙井 T1: 泡茶... 上茶: 龙井
10.CompletableFuture
// 任务 1:洗水壶 -> 烧开水 CompletableFuture<Void> f1 = CompletableFuture.runAsync(()->{ System.out.println("T1: 洗水壶..."); sleep(1, TimeUnit.SECONDS); System.out.println("T1: 烧开水..."); sleep(15, TimeUnit.SECONDS); }); // 任务 2:洗茶壶 -> 洗茶杯 -> 拿茶叶 CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{ System.out.println("T2: 洗茶壶..."); sleep(1, TimeUnit.SECONDS); System.out.println("T2: 洗茶杯..."); sleep(2, TimeUnit.SECONDS); System.out.println("T2: 拿茶叶..."); sleep(1, TimeUnit.SECONDS); return " 龙井 "; }); // 任务 3:任务 1 和任务 2 完成后执行:泡茶 CompletableFuture<String> f3 = f1.thenCombine(f2, (__, tf)->{ System.out.println("T1: 拿到茶叶:" + tf); System.out.println("T1: 泡茶..."); return " 上茶:" + tf; }); // 等待任务 3 执行结果 System.out.println(f3.join()); void sleep(int t, TimeUnit u) { try { u.sleep(t); }catch(InterruptedException e){} } // 一次执行结果: T1: 洗水壶... T2: 洗茶壶... T1: 烧开水... T2: 洗茶杯... T2: 拿茶叶... T1: 拿到茶叶: 龙井 T1: 泡茶... 上茶: 龙井
11.CompletionService
参考: http://blog.csdn.net/lmj623565791/article/details/27250059 https://www.cnblogs.com/hrhguanli/p/3998865.html
普通情况下,我们使用Runnable作为主要的任务表示形式,可是Runnable是一种有非常大局限的抽象,run方法中仅仅能记录日志,打印,或者把数据汇总入某个容器(一方面内存消耗大,还有一方面须要控制同步,效率非常大的限制),总之不能返回运行的结果;比方同一时候1000个任务去网络上抓取数据,然后将抓取到的数据进行处理(处理方式不定),我认为最好的方式就是提供回调接口,把处理的方式最为回调传进去;可是如今我们有了更好的方式实现:CompletionService + Callable
Callable的call方法能够返回运行的结果;
CompletionService将Executor(线程池)和BlockingQueue(堵塞队列)结合在一起,同一时候使用Callable作为任务的基本单元,整个过程就是生产者不断把Callable任务放入堵塞对了,Executor作为消费者不断把任务取出来运行,并返回结果;
优势:
a、堵塞队列防止了内存中排队等待的任务过多,造成内存溢出(毕竟一般生产者速度比較快,比方爬虫准备好网址和规则,就去运行了,运行起来(消费者)还是比較慢的)
b、CompletionService能够实现,哪个任务先运行完毕就返回,而不是按顺序返回,这样能够极大的提升效率;
package com.zhy.concurrency.completionService; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; /** * 将Executor和BlockingQueue功能融合在一起,能够将Callable的任务提交给它来运行, 然后使用take()方法获得已经完毕的结果 * * @author zhy * */ public class CompletionServiceDemo { public static void main(String[] args) throws InterruptedException, ExecutionException { /** * 内部维护11个线程的线程池 */ ExecutorService exec = Executors.newFixedThreadPool(11); /** * 容量为10的堵塞队列 */ final BlockingQueue<Future<Integer>> queue = new LinkedBlockingDeque<Future<Integer>>( 10); //实例化CompletionService final CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>( exec, queue); /** * 模拟瞬间产生10个任务,且每一个任务运行时间不一致 */ for (int i = 0; i < 10; i++) { completionService.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { int ran = new Random().nextInt(1000); Thread.sleep(ran); System.out.println(Thread.currentThread().getName() + " 歇息了 " + ran); return ran; } }); } /** * 马上输出结果 */ for (int i = 0; i < 10; i++) { try { //谁最先运行完毕,直接返回 Future<Integer> f = completionService.take(); System.out.println(f.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } exec.shutdown(); } } 输出结果: pool-1-thread-4 歇息了 52 52 pool-1-thread-1 歇息了 59 59 pool-1-thread-10 歇息了 215 215 pool-1-thread-9 歇息了 352 352 pool-1-thread-5 歇息了 389 389 pool-1-thread-3 歇息了 589 589 pool-1-thread-2 歇息了 794 794 pool-1-thread-7 歇息了 805 805 pool-1-thread-6 歇息了 909 909 pool-1-thread-8 歇息了 987 987
2.ExecutorService.invokeAll
ExecutorService的invokeAll方法也能批量运行任务,并批量返回结果,可是呢,有个我认为非常致命的缺点,必须等待全部的任务运行完毕后统一返回,一方面内存持有的时间长;还有一方面响应性也有一定的影响,毕竟大家都喜欢看看刷刷的运行结果输出,而不是苦苦的等待;
package com.zhy.concurrency.executors; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class TestInvokeAll { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService exec = Executors.newFixedThreadPool(10); List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>(); Callable<Integer> task = null; for (int i = 0; i < 10; i++) { task = new Callable<Integer>() { @Override public Integer call() throws Exception { int ran = new Random().nextInt(1000); Thread.sleep(ran); System.out.println(Thread.currentThread().getName()+" 歇息了 " + ran ); return ran; } }; tasks.add(task); } long s = System.currentTimeMillis(); List<Future<Integer>> results = exec.invokeAll(tasks); System.out.println("运行任务消耗了 :" + (System.currentTimeMillis() - s) +"毫秒"); for (int i = 0; i < results.size(); i++) { try { System.out.println(results.get(i).get()); } catch (Exception e) { e.printStackTrace(); } } exec.shutdown(); } } 运行结果: pool-1-thread-10 歇息了 1 pool-1-thread-5 歇息了 59 pool-1-thread-6 歇息了 128 pool-1-thread-1 歇息了 146 pool-1-thread-3 歇息了 158 pool-1-thread-7 歇息了 387 pool-1-thread-9 歇息了 486 pool-1-thread-8 歇息了 606 pool-1-thread-4 歇息了 707 pool-1-thread-2 歇息了 817 运行任务消耗了 :819毫秒 146 817 158 707 59 128 387 606 486 1
12.Fork/Join 单机版本的MapReduce
分治任务
static void main(String[] args){ // 创建分治任务线程池 ForkJoinPool fjp = new ForkJoinPool(4); // 创建分治任务 Fibonacci fib = new Fibonacci(30); // 启动分治任务 Integer result = fjp.invoke(fib); // 输出结果 System.out.println(result); } // 递归任务 static class Fibonacci extends RecursiveTask<Integer>{ final int n; Fibonacci(int n){this.n = n;} protected Integer compute(){ if (n <= 1) return n; Fibonacci f1 = new Fibonacci(n - 1); // 创建子任务 f1.fork(); Fibonacci f2 = new Fibonacci(n - 2); // 等待子任务结果,并合并结果 return f2.compute() + f1.join(); } }
13.ThreadLocal
https://baijiahao.baidu.com/s?id=1609916413785756020&wfr=spider&for=pc