多线程相关概念
1把锁:synchronized
2个并:并发(concurrent)在同一实体上的多个事件,在一台处理器上“同时处理多个任务”,同一时刻,其实是只有一个时间在发生
并行(parallel)在不同实体上的多个时间,在多台处理器上同时处理多个任务,同一时刻,大家都在做事情,你做你的,我做到我的,但是我们都在做
3个程:进程:在系统中运行的一个应用程序就是一个进程,每一个进程都有自己的内存空间和系统资源
线程:也被成为轻量级进程,在一个进程中会有1个或多个进程,是大多数操作系统进行时序调度的基本单元
管程:Monitor(监视器),也就是平时所说的锁
Monitor其实就是一种同步机制,他的义务是保证(同一时间)只有一个线程可以访问被保护的数据和代码。
JVM中同步是基于进入和退出监视器对象(Monitor,管程对象)来实现的,每个对象实例都会有一个Monitor对象
Object o = new Object(); Thread t1 = new Thread(() -> { synchronized (o) { System.out.println(1); } }, "t1"); t1.start();
用户线程和守护线程
一般情况下不做特别说明配置,默认都是用户线程
用户线程(User Thread): 是系统的工作线程,它会完成这个程序需要完成的业务条件。
守护线程(Daemon Thread):是一种特殊的线程为其它线程服务的,在后台默默地完成一些系统性的服务
守护线程作为一个服务线程,没有服务对象就没有必要继续运行了 ,如果用户线程全部结束了,意味着程序需要完成的业务操作已经结束了,系统可以退出了,所以假如当系统只剩下守护线程的时候,java虚拟机会自动退出。
// 是否是守护线程 t1.isDaemon(); // 设置为守护线程 t1.setDaemon(true);
setDaemon(true)方法必须在start()之前设置
Future接口
Future接口(Future实现类)定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。
比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,主线程就去做其他事情了,忙其他事情或者执行完,过了一会才去获取子任务的执行结果或变更的任务状态。
总结: Future接口可以为主线程开一个分支任务,专门为主线程处理耗时和费力的复杂业务。
FutureTask异步任务
public class CompletableFutureDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask futureTask = new FutureTask(new MyThread()); Thread t1 = new Thread(futureTask); t1.start(); System.out.println(futureTask.get()); } } class MyThread implements Callable<String>{ @Override public String call() throws Exception { System.out.println("come in call----"); return "hello callable"; } }
Future+线程池异步多线程任务配合,能显著提高程序的执行效率。
futureTask.get(); futureTask.isDone();
Future对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果。
CompletableFuture
从jdk1.8开始引入,它是Future的功能增强版,减少阻塞和轮询。可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
ExecutorService threadPool = Executors.newFixedThreadPool(3); // CompletableFuture.runAsync() 无返回值 CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); System.out.println("come in ---"); return "hello"; },threadPool); String s = completableFuture.get(); System.out.println(s); threadPool.shutdown();
//这里用自定义线程池,采用默认线程会当作一个守护线程,main方法执行完后future线程还未处理完时会直接关闭 ExecutorService threadPool = Executors.newFixedThreadPool(3); try { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { System.out.println("come in ----"); int i = ThreadLocalRandom.current().nextInt(10); try { TimeUnit.SECONDS.sleep(1); } catch (Exception e) { e.printStackTrace(); } System.out.println("---1秒后出结果:" + i); return i; }, threadPool).whenComplete((v, e) -> { if (e == null) { System.out.println("计算完成:" + v); } }).exceptionally(e -> { System.out.println(e.getCause()); return null; }); System.out.println(java.lang.Thread.currentThread().getName() + "去忙其他任务了"); }catch (Exception e){ e.printStackTrace(); }finally { threadPool.shutdown(); }
CompletableFuture join和get区别
在编译时是否报出检查型异常
CompletableFuture的优点
异步任务结束时,会自动回调某个对象的方法
主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行
异步任务出错时,会自动回调某个对象的方法
ps:和javascript回调不能说相似,只能说一模一样,一通百通
Lambda表达式+Stream流式调用+CHain链式调用+Java8函数式编程
Runnable
无参数,无返回值
Function
Function<T, R> 接收一个参数,并且有返回值
Consumer
Consumer接收一个参数,并且没有返回值
BiConsumer
BiConsumer<T, U>接收两个参数(Bi,英文单词词根,代表两个的意思),没有返回值
Supplier
Supplier供给型函数接口,没有参数,有一个返回值
Predicate
一般用于做判断,返回boolean类型
总结
函数时接口名称 | 方法名称 | 参数 | 返回值 |
Runnable | run | 无参数 | 无返回值 |
Function | apply | 1个参数 | 有返回值 |
Consumer | accept | 1个参数 | 无返回值 |
Supplier | get | 没有参数 | 有返回值 |
BiConsumer | accept | 2个参数 | 无返回值 |
Predicate | test | 1个参数 | 有返回值(boolean) |
日常工作中如何进行开发?
功能→性能,先完成功能实现,再考虑性能优化
CompletableFuture用例
假设要从多个电商平台查询一件商品的价格,每次查询耗时设定为1秒
普通查询:查询电商平台1→查询电商平台2→查询电商平台3 …
CompletableFuture: 同时异步执行要查询的电商平台
public class CompletableFutureDemo { static List<NetMall> list = Arrays.asList( new NetMall("jd"), new NetMall("dangdang"), new NetMall("taobao"), new NetMall("pdd"), new NetMall("tmall") ); /** * step by step 一家家搜查 * List<NetMall> ----->map------> List<String> * @param list * @param productName * @return */ public static List<String> getPrice(List<NetMall> list,String productName) { //《mysql》 in taobao price is 90.43 return list .stream() .map(netMall -> String.format(productName + " in %s price is %.2f", netMall.getNetMallName(), netMall.calcPrice(productName))) .collect(Collectors.toList()); } /** * List<NetMall> ----->List<CompletableFuture<String>>------> List<String> * @param list * @param productName * @return */ public static List<String> getPriceByCompletableFuture(List<NetMall> list,String productName) { return list.stream().map(netMall -> CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f", netMall.getNetMallName(), netMall.calcPrice(productName)))) .collect(Collectors.toList()) .stream() .map(s -> s.join()) .collect(Collectors.toList()); } public static void main(String[] args) { long startTime = System.currentTimeMillis(); List<String> list1 = getPrice(list, "mysql"); for (String element : list1) { System.out.println(element); } long endTime = System.currentTimeMillis(); System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"); System.out.println("--------------------"); long startTime2 = System.currentTimeMillis(); List<String> list2 = getPriceByCompletableFuture(list, "mysql"); for (String element : list2) { System.out.println(element); } long endTime2 = System.currentTimeMillis(); System.out.println("----costTime: "+(endTime2 - startTime2) +" 毫秒"); } } class NetMall { @Getter private String netMallName; public NetMall(String netMallName) { this.netMallName = netMallName; } public double calcPrice(String productName) { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0); } }
执行效率显著提高
CompletableFuture常用API
获取结果和触发计算
T get(); 容易造成阻塞,非得拿到结果,否则不往下执行
T get(long timeout, TimeUnit unit); 指定时间后还没拿到结果,直接TimtoutException
T join(); 和get一样,区别在于编译时是否报出检查型异常
T getNow(T valueIfAbsent); 没有计算完成的情况下,返回一个替代结果。立即获取结果不阻塞:计算完,返回计算完成后的结果,没计算完:返回设定的valueIfAbsend值
boolean complete(T value); 是否打断get方法立即返回括号值,计算完:不打断,返回计算后的结果,没计算完:打断返回设定的value值
allOf:多个CompletableFuture任务并发执行,所有CompletableFuture任务完成时,返回一个新的CompletableFuture对象,其返回值为Void,也就是无返回值。该方法的应用之一是在继续程序之前等待一组独立的 CompletableFuture 完成,如:CompletableFuture.allOf(c1, c2, c3).join();
anyOf:多个CompletableFuture任务并发执行,只要有一个CompletableFuture任务完成时,就会返回一个新的CompletableFuture对象,并返回该CompletableFuture执行完成任务的返回值。
对计算结果进行处理
thenApply 计算结果存在依赖关系,将两个线程串行化,由于存在依赖关系(当前步错,不走下一步),当前步骤有异常的话就叫停。
handle 计算结果存在依赖关系,这两个线程串行化。有异常也可以往下一步走,根据带的异常参数可以进一步处理
execptionally类似 try/catch
whenCpmplete和handle类似 try/finally,有异常也会往下执行
对计算结果进行消费
thenAccept 顾名思义,消费型接口。接收任务的处理结果,并消费处理,无返回结果。
对比
API |
说明 | |
thenRun | thenRun(Runnable runnable) | 任务A执行完执行B,并且B不需要A的结果 |
thenAccept | thenAccpet(Consumer action) | 任务A执行完执行B,B需要A的结果,但是任务B无返回值 |
thenApply | thenApply(Function fn) | 任务A执行完执行B,B需要A的结果,同时任务B有返回值 |
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join()); System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(r -> System.out.println(r)).join()); System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(r -> r + "resultB").join());
thenRun和thenRunAsync区别?
没有传入自定义线程池,都用默认线程池ForkJoinPool
如果执行第一个任务的时候,传入一个自定义线程池
调用thenRun方法执行第二个任务时,则第二个任务和第一个任务时使用同一个线程池
调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoinpool线程池
备注:有可能处理的太快,系统优化切换原则,直接使用main线程处理
其他如:thenAccept和thenAccpetAsync,thenApply和thenApplyAsync等,它们之间的区别也是同理
对计算速度选用
applyToEither对比任务执行速度
CompletableFuture<String> playA = CompletableFuture.supplyAsync(() -> { System.out.println("A come in"); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } return "playA"; }); CompletableFuture<String> playB = CompletableFuture.supplyAsync(() -> { System.out.println("B come in"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return "playB"; }); CompletableFuture<String> result = playA.applyToEither(playB, f -> { return f + " is winer"; }); System.out.println(Thread.currentThread().getName()+"\t"+"-----: "+result.join());
对计算结果进行合并
两个CompletionStage任务都完成后,最终能把两个任务的结果一起交给thenCombine来处理。先完成的先等着,等待其他分支任务。
public static void main(String[] args) { long start = System.currentTimeMillis(); CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(()->{ try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { throw new RuntimeException(e); } return 10; }); CompletableFuture<Integer> future2= CompletableFuture.supplyAsync(()->{ try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { throw new RuntimeException(e); } return 10; }); CompletableFuture<Integer> future = future1.thenCombine(future2, (x, y) -> { System.out.println("-----开始两个结果合并"); return x + y; }); System.out.println(future.join()); long end = System.currentTimeMillis(); System.out.println("耗时:" + (end-start)); }