Java异步编程CompletableFuture
1. 简介
CompletableFuture是jdk1.8引入的具有任务编排能力的类,它弥补了Future的不足,能让线程按照一定顺序和规则执行,提高程序执行效率.
2. Future的局限性
之前我们处理异步任务可以使用future,虽然future也能实现异步任务,但是存在一定局限性,下面我们通过一个案例来演示
使用的工具类
/** * 工具类 */ public class CommonUtil {
// 模拟任务耗时 public static void dowork(long ms){ try { TimeUnit.MILLISECONDS.sleep(ms); } catch (InterruptedException e) { e.printStackTrace(); } }
// 打印线程信息 public static void printLog(String msg){ Thread thread = Thread.currentThread(); System.out.println(System.currentTimeMillis()+"=="+thread.getId()+"=="+thread.getName()+"=="+msg); } } |
演示demo
package cn.demo;
import cn.demo.util.CommonUtil;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future;
/** * 需求:使用线程池执行3个任务,其中任务3需要获取任务1,任务2的执行结果后再执行 */ public class FutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException { // 创建定长线程池 ExecutorService executorService = Executors.newFixedThreadPool(5); // 执行任务1 Future<String> f1 = executorService.submit(() -> { CommonUtil.printLog("任务1开始"); CommonUtil.dowork(3000); CommonUtil.printLog("任务1结束"); return "a"; }); // 执行任务2 Future<String> f2 = executorService.submit(() -> { CommonUtil.printLog("任务2开始"); CommonUtil.dowork(3000); CommonUtil.printLog("任务2结束"); return "b"; });
// 执行任务3,任务3需要获取任务1,2的执行结果再处理 Future<String> f3 = executorService.submit(() -> { // 获取任务1,2执行结果 String f1Result = f1.get(); String f2Result = f2.get(); CommonUtil.printLog("任务3开始"); String result = f1Result + f2Result; CommonUtil.printLog("任务3结束"); return result; }); // 获取结果 CommonUtil.printLog(f3.get()); executorService.shutdown(); } } |
通过上述案例,我们发现future存在以下问题
1. 无法对future结果进行进一步处理,需要手动get,get会阻塞线程
2. 处理有依赖关系的future比较麻烦
3. 无法对多个future合并结果
4. 异常需自己处理
3. 使用CompletableFuture
3.1.创建异步任务的方式
创建异步任务有2种方式,一种是使用runAsync,一种是supplyAsync,区别在于runAsync没有返回值supplyAsync有.
/** *创建异步任务方式1,无返回值 * @throws Exception */ @Test public void testRunAsync() throws Exception{ CommonUtil.printLog("main start"); CompletableFuture.runAsync(()-> { CommonUtil.printLog("任务开始"); CommonUtil.dowork(3000); CommonUtil.printLog("任务结束"); }); CommonUtil.printLog("=======main continue========"); CommonUtil.dowork(4000); CommonUtil.printLog("main end"); }
/** *创建异步任务方式2,有返回值 * @throws Exception */ @Test public void testSupplyAsync() throws Exception{ CommonUtil.printLog("main start"); CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> { CommonUtil.printLog("任务开始"); CommonUtil.dowork(3000); CommonUtil.printLog("任务结束"); return "123"; }); CommonUtil.printLog("=======main continue========"); CommonUtil.printLog(supplyAsync.get()); CommonUtil.printLog("main end"); } |
3.2.异步任务中的线程池
异步任务中的线程池,默认用的是ForkJoinPool.commonPool
异步任务共享ForkJoinPool.commonPool线程池,主线程结束时,会自动关闭共享线程池
如果使用共享线程池遇到其中有些任务耗时比较长,可能会影响性能,因此建议使用自定义线程池
/** * 异步任务中的线程池,默认用的是ForkJoinPool.commonPool * 异步任务共享ForkJoinPool.commonPool线程池,主线程结束时,会关闭共享线程池 * 如果使用共享线程池遇到其中有些任务耗时比较长,可能会影响性能,建议使用自定义线程池 * @throws Exception */ @Test public void testSupplyByExcutor() throws Exception{ ExecutorService executorService = Executors.newFixedThreadPool(5); CommonUtil.printLog("main start"); CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> { CommonUtil.printLog("任务开始"); CommonUtil.dowork(3000); CommonUtil.printLog("任务结束"); return "123"; },executorService); CommonUtil.printLog("=======main continue========"); CommonUtil.printLog(supplyAsync.get()); CommonUtil.printLog("main end"); } |
3.3.异步任务回调
异步任务回调有3种方式
thenApply 返回回调结果并处理
thenAccept 不想返回任何回调结果,一般在调用链的最后一步
thenRun 异步任务完成后,只想得到通知,不使用上一步的结果
/** * 异步任务回调之thenApply,返回回调结果并处理 * @throws Exception */ @Test public void testThenApply() throws Exception{ CommonUtil.printLog("main start"); CompletableFuture<String[]> cf = CompletableFuture.supplyAsync(() -> { CommonUtil.printLog("任务开始"); CommonUtil.dowork(3000); CommonUtil.printLog("任务结束"); return "123"; }).thenApply(result -> { String[] split = result.split(""); return split; }); CommonUtil.printLog("=======main continue========"); CommonUtil.printLog(Arrays.toString(cf.get())); CommonUtil.printLog("main end"); }
/** * 异步任务回调之thenAccept,不想返回任何回调结果,一般在调用链的最后一步 * @throws Exception */ @Test public void testThenAccept() throws Exception{ CommonUtil.printLog("main start"); CompletableFuture.supplyAsync(() -> { CommonUtil.printLog("任务开始"); CommonUtil.dowork(3000); CommonUtil.printLog("任务结束"); return "123"; }).thenAccept(result->{ CommonUtil.printLog("result:"+result); }); CommonUtil.printLog("=======main continue========"); CommonUtil.dowork(4000); CommonUtil.printLog("main end"); }
/** * 异步任务回调之thenRun,异步任务完成后,只想得到通知,不使用上一步的结果 * @throws Exception */ @Test public void testThenRun() throws Exception{ CommonUtil.printLog("main start"); CompletableFuture.supplyAsync(() -> { CommonUtil.printLog("任务开始"); CommonUtil.dowork(3000); CommonUtil.printLog("任务结束"); return "123"; }).thenRun(()->{ CommonUtil.printLog("任务执行完成"); }); CommonUtil.printLog("=======main continue========"); CommonUtil.dowork(4000); CommonUtil.printLog("main end"); } |
3.4.异步任务回调优化
异步任务回调优化,带有xxxAsync,例如thenApplyAsync,thenAcceptAsync等这表示在单独的线程中执行回调任务,这样可以提高效率
备注:如果supplyAsync是立即返回结果,非耗时操作,不会开启线程池来执行,直接用主线程执行
/** * 异步任务回调优化,带有xxxAsync,例如thenApplyAsync,thenAcceptAsync等 * 这表示在单独的线程中执行回调任务 * 备注:如果supplyAsync是立即返回结果,非耗时操作,不会开启线程池来执行,直接用主线程执行 * @throws Exception */ @Test public void testThenApplyAsync() throws Exception{ ExecutorService executorService = Executors.newFixedThreadPool(5); CommonUtil.printLog("main start"); CompletableFuture<String[]> cf = CompletableFuture.supplyAsync(() -> { CommonUtil.printLog("任务开始"); CommonUtil.dowork(3000); CommonUtil.printLog("任务结束"); return "123"; }).thenApplyAsync(result -> { CommonUtil.printLog("result:"+result); String[] split = result.split(""); return split; },executorService); CommonUtil.printLog("=======main continue========"); CommonUtil.printLog(Arrays.toString(cf.get())); CommonUtil.printLog("main end"); } |
3.5.异步任务编排
编排2个异步任务的方式
thenCompose:连接2个有依赖关系的任务,结果由第2个任务处理
thenCombine:连接2个没有依赖关系的任务,合并2个任务的结果
/** * 异步任务编排2个之thenCompose,目的是连接2个有依赖关系的任务,结果由第2个任务处理 * @throws Exception */ @Test public void testThenCompose() throws Exception{ CommonUtil.printLog("main start"); CompletableFuture<String[]> cf = CompletableFuture.supplyAsync(() -> { CommonUtil.printLog("任务开始"); CommonUtil.dowork(3000); CommonUtil.printLog("任务结束"); return "123"; }).thenCompose(result -> CompletableFuture.supplyAsync(() -> { CommonUtil.printLog("result:" + result); String[] split = result.split(""); return split; })); CommonUtil.printLog("=======main continue========"); CommonUtil.printLog(Arrays.toString(cf.get())); CommonUtil.printLog("main end"); }
/** * 异步任务编排2个之thenCombine,连接2个没有依赖关系的任务,合并2个任务的结果 * @throws Exception */ @Test public void testThenCombine() throws Exception{ CommonUtil.printLog("main start"); // 任务1 CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> { CommonUtil.printLog("任务1开始"); CommonUtil.dowork(3000); CommonUtil.printLog("任务1结束"); return "a"; }); // 任务2 CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> { CommonUtil.printLog("任务2开始"); CommonUtil.dowork(3000); CommonUtil.printLog("任务2结束"); return "b"; }); // 合并2个任务的结果并处理 CompletableFuture<String> cf3 = cf1.thenCombine(cf2, (result1, result2) -> { CommonUtil.printLog("result1:" + result1); CommonUtil.printLog("result2:" + result2); return result1 + result2; }); CommonUtil.printLog("=======main continue========"); // 获取最终结果 CommonUtil.printLog(cf3.get()); CommonUtil.printLog("main end");
} |
编排多个异步任务的方式
Allof:合并多个异步任务,所有任务完成可以做进一步操作
Anyof:合并多个异步任务,任一执行任务执行完就返回结果
/** * 异步任务编排多个之allof,合并多个异步任务,所有任务完成可以做进一步操作 * @throws Exception */ @Test public void testAllof() throws Exception{ CommonUtil.printLog("main start"); // 任务1 CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> { CommonUtil.printLog("任务1开始"); CommonUtil.dowork(3000); CommonUtil.printLog("任务1结束"); return "a"; }); // 任务2 CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> { CommonUtil.printLog("任务2开始"); CommonUtil.dowork(3000); CommonUtil.printLog("任务2结束"); return "b"; }); // 任务3 CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> { CommonUtil.printLog("任务3开始"); CommonUtil.dowork(3000); CommonUtil.printLog("任务3结束"); return "c"; }); // 合并多个任务的结果 CompletableFuture<String> cf = CompletableFuture.allOf(cf1, cf2, cf3).thenApply(v -> { // 这里使用join处理结果,get和join的区别在于,get需要自己处理异常,join的是运行时异常不需要自己处理,更适合流式编程 String result = cf1.join() + cf2.join() + cf3.join(); CommonUtil.printLog("result:" + result); return result; }); CommonUtil.printLog("=======main continue========"); // 获取最终结果 CommonUtil.printLog(cf.get()); CommonUtil.printLog("main end");
}
/** * 异步任务编排多个之anyof,合并多个异步任务,任一执行任务执行完就返回结果 * @throws Exception */ @Test public void testAnyof() throws Exception{ CommonUtil.printLog("main start"); // 任务1 CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> { CommonUtil.printLog("任务1开始"); CommonUtil.dowork(2000); CommonUtil.printLog("任务1结束"); return "任务1"; }); // 任务2 CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> { CommonUtil.printLog("任务2开始"); CommonUtil.dowork(1000); CommonUtil.printLog("任务2结束"); return "任务2"; }); // 任务3 CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> { CommonUtil.printLog("任务3开始"); CommonUtil.dowork(3000); CommonUtil.printLog("任务3结束"); return "任务3"; }); CompletableFuture<Object> cf = CompletableFuture.anyOf(cf1, cf2, cf3).thenApply(result -> { CommonUtil.printLog("result:" + result); return result; }); CommonUtil.printLog("=======main continue========"); CommonUtil.printLog("获取结果:"+cf.join()); CommonUtil.printLog("main end");
} |
3.6.异步任务异常处理
异常处理有2种方式
Exceptionally:当回调链一旦出现异常,都不会再向下执行,直接转入异常处理
Handle:无论是否发生异常都会执行,用于恢复调用链中的一次异常,恢复后继续传递
/** * 异步任务异常处理之exceptionally,当回调链一旦出现异常,都不会再向下执行,直接转入异常处理 * @throws Exception */ @Test public void testExceptionally() throws Exception{ CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> { // int i = 1 / 0; return "result1"; }).thenApply(res -> { // 模拟空指针异常 String str = null; int i = str.length(); return res + " result2"; }).thenApply(res -> { return res + " result3"; }).exceptionally(ex -> { CommonUtil.printLog("出现异常:" + ex.getMessage()); return "unkonwn"; }); CommonUtil.printLog("获取结果:"+cf.join());
}
/** * 异步任务异常处理之handle,无论是否发生异常都会执行,用于恢复调用链中的一次异常,恢复后继续传递 * @throws Exception */ @Test public void testHandle() throws Exception{ CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> { int i = 1 / 0; return "result1"; }).handle((res,ex)->{ // 使用handle处理异常,根据内部if else分支处理逻辑 // 如果没有异常,ex为空,传递正常结果,反之 if(!Objects.isNull(ex)){ CommonUtil.printLog("出现异常:" + ex.getMessage()); return "unknow1"; } return res; }).thenApply(res -> { // 模拟空指针异常 String str = null; int i = str.length(); return res + " result2"; }).handle((res,ex)->{ // 使用handle处理异常,根据内部if else分支处理逻辑 // 如果没有异常,ex为空,传递正常结果,反之 if(!Objects.isNull(ex)){ CommonUtil.printLog("出现异常:" + ex.getMessage()); return "unknow2"; } return res; }).thenApply(res -> { return res + " result3"; }); CommonUtil.printLog("获取结果:"+cf.join()); } |