Java 编程问题:十一、并发-深入探索1https://developer.aliyun.com/article/1426165
附加处理异步任务结果并返回结果的回调
用户问题:取某客户的订单发票,然后计算总金额并签字。
依赖阻塞get()
对此类问题不是很有用。我们需要的是一个回调方法,当CompletableFuture
的结果可用时,该方法将被自动调用。
所以,我们不想等待结果。当发票准备就绪时(这是CompletableFuture
的结果),回调方法应该计算总值,然后,另一个回调应该对其签名。这可以通过thenApply()
方法实现。
thenApply()
方法可用于CompletableFuture
结果到达时的处理和转换。它以Function
为参数。让我们在工作中看看:
public static void fetchInvoiceTotalSign() { CompletableFuture<String> cfFetchInvoice = CompletableFuture.supplyAsync(() -> { logger.info(() -> "Fetch invoice by: " + Thread.currentThread().getName()); Thread.sleep(500); return "Invoice #3344"; }); CompletableFuture<String> cfTotalSign = cfFetchInvoice .thenApply(o -> o + " Total: $145") .thenApply(o -> o + " Signed"); String result = cfTotalSign.get(); logger.info(() -> "Invoice: " + result + "\n"); }
或者,我们可以将其链接如下:
public static void fetchInvoiceTotalSign() { CompletableFuture<String> cfTotalSign = CompletableFuture.supplyAsync(() -> { logger.info(() -> "Fetch invoice by: " + Thread.currentThread().getName()); Thread.sleep(500); return "Invoice #3344"; }).thenApply(o -> o + " Total: $145") .thenApply(o -> o + " Signed"); String result = cfTotalSign.get(); logger.info(() -> "Invoice: " + result + "\n"); }
同时检查applyToEither()
和applyToEitherAsync()
。当这个或另一个给定的阶段以正常方式完成时,这两个方法将返回一个新的完成阶段,并将结果作为提供函数的参数执行。
附加处理异步任务结果并返回void
的回调
用户问题:取某客户订单打印。
通常,不返回结果的回调充当异步管道的终端操作。
这种行为可以通过thenAccept()
方法获得。取Consumer
返回CompletableFuture
。此方法可以对CompletableFuture
的结果进行处理和转换,但不返回结果。因此,它可以接受一个订单,它是CompletableFuture
的结果,并按下面的代码片段打印出来:
public static void fetchAndPrintOrder() { CompletableFuture<String> cfFetchOrder = CompletableFuture.supplyAsync(() -> { logger.info(() -> "Fetch order by: " + Thread.currentThread().getName()); Thread.sleep(500); return "Order #1024"; }); CompletableFuture<Void> cfPrintOrder = cfFetchOrder.thenAccept( o -> logger.info(() -> "Printing order " + o + " by: " + Thread.currentThread().getName())); cfPrintOrder.get(); logger.info("Order was fetched and printed \n"); }
或者,它可以更紧凑,如下所示:
public static void fetchAndPrintOrder() { CompletableFuture<Void> cfFetchAndPrintOrder = CompletableFuture.supplyAsync(() -> { logger.info(() -> "Fetch order by: " + Thread.currentThread().getName()); Thread.sleep(500); return "Order #1024"; }).thenAccept( o -> logger.info(() -> "Printing order " + o + " by: " + Thread.currentThread().getName())); cfFetchAndPrintOrder.get(); logger.info("Order was fetched and printed \n"); }
同时检查acceptEither()
和acceptEitherAsync()
。
附加在异步任务之后运行并返回void
的回调
用户问题:下订单通知客户。
通知客户应在交付订单后完成。这只是一条亲爱的客户,您的订单已经在今天送达之类的短信,所以通知任务不需要知道任何关于订单的信息。这类任务可以通过thenRun()
来完成。此方法取Runnable
,返回CompletableFuture
。让我们在工作中看看:
public static void deliverOrderNotifyCustomer() { CompletableFuture<Void> cfDeliverOrder = CompletableFuture.runAsync(() -> { logger.info(() -> "Order was delivered by: " + Thread.currentThread().getName()); Thread.sleep(500); }); CompletableFuture<Void> cfNotifyCustomer = cfDeliverOrder.thenRun(() -> logger.info( () -> "Dear customer, your order has been delivered today by:" + Thread.currentThread().getName())); cfNotifyCustomer.get(); logger.info(() -> "Order was delivered and customer was notified \n"); }
为了进一步的并行化,thenApply()
、thenAccept()
和thenRun()
伴随着thenApplyAsync()
、thenAcceptAsync()
和thenRunAsync()
。其中每一个都可以依赖于全局ForkJoinPool.commonPool()
或自定义线程池(Executor
。当thenApply
/Accept
/Run()
在与之前执行的CompletableFuture
任务相同的线程中执行时(或在主线程中),可以在不同的线程中执行thenApplyAsync
/AcceptAsync
/RunAsync()
(来自ForkJoinPool.commonPool()
或自定义线程池(Executor
)。
通过exceptionally()
处理异步任务的异常
用户问题:计算订单总数。如果出了问题,就抛出IllegalStateException
。
以下屏幕截图举例说明了异常是如何在异步管道中传播的;在某个点发生异常时,不会执行矩形中的代码:
以下截图显示了thenApply()
和thenAccept()
中的异常:
因此,在supplyAsync()
中,如果发生异常,则不会调用以下回调。此外,Future
将得到解决,但这一异常除外。相同的规则适用于每个回调。如果第一个thenApply()
出现异常,则不调用以下thenApply()
和thenAccept()
。
如果我们试图计算订单总数的结果是一个IllegalStateException
,那么我们可以依赖exceptionally()
回调,这给了我们一个恢复的机会。此方法接受一个Function
,并返回一个CompletionStage
,因此返回一个CompletableFuture
。让我们在工作中看看:
public static void fetchOrderTotalException() { CompletableFuture<Integer> cfTotalOrder = CompletableFuture.supplyAsync(() -> { logger.info(() -> "Compute total: " + Thread.currentThread().getName()); int surrogate = new Random().nextInt(1000); if (surrogate < 500) { throw new IllegalStateException( "Invoice service is not responding"); } return 1000; }).exceptionally(ex -> { logger.severe(() -> "Exception: " + ex + " Thread: " + Thread.currentThread().getName()); return 0; }); int result = cfTotalOrder.get(); logger.info(() -> "Total: " + result + "\n"); }
异常情况下,输出如下:
Compute total: ForkJoinPool.commonPool-worker-3 Exception: java.lang.IllegalStateException: Invoice service is not responding Thread: ForkJoinPool.commonPool-worker-3 Total: 0
让我们看看另一个问题。
用户问题:取发票,计算合计,签字。如有问题,则抛出IllegalStateException
,停止处理。
如果我们用supplyAsync()
取发票,用thenApply()
计算合计,用另一个thenApply()
签字,那么我们可以认为正确的实现如下:
public static void fetchInvoiceTotalSignChainOfException() throws InterruptedException, ExecutionException { CompletableFuture<String> cfFetchInvoice = CompletableFuture.supplyAsync(() -> { logger.info(() -> "Fetch invoice by: " + Thread.currentThread().getName()); int surrogate = new Random().nextInt(1000); if (surrogate < 500) { throw new IllegalStateException( "Invoice service is not responding"); } return "Invoice #3344"; }).exceptionally(ex -> { logger.severe(() -> "Exception: " + ex + " Thread: " + Thread.currentThread().getName()); return "[Invoice-Exception]"; }).thenApply(o -> { logger.info(() -> "Compute total by: " + Thread.currentThread().getName()); int surrogate = new Random().nextInt(1000); if (surrogate < 500) { throw new IllegalStateException( "Total service is not responding"); } return o + " Total: $145"; }).exceptionally(ex -> { logger.severe(() -> "Exception: " + ex + " Thread: " + Thread.currentThread().getName()); return "[Total-Exception]"; }).thenApply(o -> { logger.info(() -> "Sign invoice by: " + Thread.currentThread().getName()); int surrogate = new Random().nextInt(1000); if (surrogate < 500) { throw new IllegalStateException( "Signing service is not responding"); } return o + " Signed"; }).exceptionally(ex -> { logger.severe(() -> "Exception: " + ex + " Thread: " + Thread.currentThread().getName()); return "[Sign-Exception]"; }); String result = cfFetchInvoice.get(); logger.info(() -> "Result: " + result + "\n"); }
好吧,这里的问题是,我们可能面临如下输出:
[INFO] Fetch invoice by: ForkJoinPool.commonPool-worker-3 [SEVERE] Exception: java.lang.IllegalStateException: Invoice service is not responding Thread: ForkJoinPool.commonPool-worker-3 [INFO] Compute total by: ForkJoinPool.commonPool-worker-3 [INFO] Sign invoice by: ForkJoinPool.commonPool-worker-3 [SEVERE] Exception: java.lang.IllegalStateException: Signing service is not responding Thread: ForkJoinPool.commonPool-worker-3 [INFO] Result: [Sign-Exception]
即使发票拿不到,我们也会继续计算总数并签字。显然,这没有道理。如果无法提取发票,或者无法计算总额,则我们希望中止该过程。当我们可以恢复并继续时,这个实现可能是一个很好的选择,但它绝对不适合我们的场景。对于我们的场景,需要以下实现:
public static void fetchInvoiceTotalSignException() throws InterruptedException, ExecutionException { CompletableFuture<String> cfFetchInvoice = CompletableFuture.supplyAsync(() -> { logger.info(() -> "Fetch invoice by: " + Thread.currentThread().getName()); int surrogate = new Random().nextInt(1000); if (surrogate < 500) { throw new IllegalStateException( "Invoice service is not responding"); } return "Invoice #3344"; }).thenApply(o -> { logger.info(() -> "Compute total by: " + Thread.currentThread().getName()); int surrogate = new Random().nextInt(1000); if (surrogate < 500) { throw new IllegalStateException( "Total service is not responding"); } return o + " Total: $145"; }).thenApply(o -> { logger.info(() -> "Sign invoice by: " + Thread.currentThread().getName()); int surrogate = new Random().nextInt(1000); if (surrogate < 500) { throw new IllegalStateException( "Signing service is not responding"); } return o + " Signed"; }).exceptionally(ex -> { logger.severe(() -> "Exception: " + ex + " Thread: " + Thread.currentThread().getName()); return "[No-Invoice-Exception]"; }); String result = cfFetchInvoice.get(); logger.info(() -> "Result: " + result + "\n"); }
这一次,在任何隐含的CompletableFuture
中发生的异常将停止该过程。以下是可能的输出:
[INFO ] Fetch invoice by: ForkJoinPool.commonPool-worker-3 [SEVERE] Exception: java.lang.IllegalStateException: Invoice service is not responding Thread: ForkJoinPool.commonPool-worker-3 [INFO ] Result: [No-Invoice-Exception]
从 JDK12 开始,异常情况可以通过exceptionallyAsync()
进一步并行化,它可以使用与引起异常的代码相同的线程或给定线程池(Executor
中的线程)。举个例子:
public static void fetchOrderTotalExceptionAsync() { ExecutorService executor = Executors.newSingleThreadExecutor(); CompletableFuture<Integer> totalOrder = CompletableFuture.supplyAsync(() -> { logger.info(() -> "Compute total by: " + Thread.currentThread().getName()); int surrogate = new Random().nextInt(1000); if (surrogate < 500) { throw new IllegalStateException( "Computing service is not responding"); } return 1000; }).exceptionallyAsync(ex -> { logger.severe(() -> "Exception: " + ex + " Thread: " + Thread.currentThread().getName()); return 0; }, executor); int result = totalOrder.get(); logger.info(() -> "Total: " + result + "\n"); executor.shutdownNow(); }
输出显示导致异常的代码是由名为ForkJoinPool.commonPool-worker-3
的线程执行的,而异常代码是由给定线程池中名为pool-1-thread-1
的线程执行的:
Compute total by: ForkJoinPool.commonPool-worker-3 Exception: java.lang.IllegalStateException: Computing service is not responding Thread: pool-1-thread-1 Total: 0
JDK12 exceptionallyCompose()
用户问题:通过打印服务获取打印机 IP 或回退到备份打印机 IP。或者,一般来说,当**这个阶段异常完成时,应该使用应用于这个阶段异常的所提供函数的结果来合成。
我们有CompletableFuture
获取打印服务管理的打印机的 IP。如果服务没有响应,则抛出如下异常:
CompletableFuture<String> cfServicePrinterIp = CompletableFuture.supplyAsync(() -> { int surrogate = new Random().nextInt(1000); if (surrogate < 500) { throw new IllegalStateException( "Printing service is not responding"); } return "192.168.1.0"; });
我们还有获取备份打印机 IP 的CompletableFuture
:
CompletableFuture<String> cfBackupPrinterIp = CompletableFuture.supplyAsync(() -> { return "192.192.192.192"; });
现在,如果没有打印服务,那么我们应该依靠备份打印机。这可以通过 JDK12exceptionallyCompose()
实现,如下所示:
CompletableFuture<Void> printInvoice = cfServicePrinterIp.exceptionallyCompose(th -> { logger.severe(() -> "Exception: " + th + " Thread: " + Thread.currentThread().getName()); return cfBackupPrinterIp; }).thenAccept((ip) -> logger.info(() -> "Printing at: " + ip));
调用printInvoice.get()
可能会显示以下结果之一:
- 如果打印服务可用:
[INFO] Printing at: 192.168.1.0
- 如果打印服务不可用:
[SEVERE] Exception: java.util.concurrent.CompletionException ... [INFO] Printing at: 192.192.192.192
对于进一步的并行化,我们可以依赖于exceptionallyComposeAsync()
。
通过handle()
处理异步任务的异常
用户问题:计算订单总数。如果出现问题,则抛出一个IllegalStateException
。
有时我们希望执行一个异常代码块,即使没有发生异常。类似于try
-catch
块的finally
子句。这可以使用handle()
回调。无论是否发生异常,都会调用此方法,它类似于一个catch
+finally
。它使用一个函数来计算返回的CompletionStage, BiFunction
的值并返回CompletionStage
(U
是函数的返回类型)。
让我们在工作中看看:
public static void fetchOrderTotalHandle() { CompletableFuture<Integer> totalOrder = CompletableFuture.supplyAsync(() -> { logger.info(() -> "Compute total by: " + Thread.currentThread().getName()); int surrogate = new Random().nextInt(1000); if (surrogate < 500) { throw new IllegalStateException( "Computing service is not responding"); } return 1000; }).handle((res, ex) -> { if (ex != null) { logger.severe(() -> "Exception: " + ex + " Thread: " + Thread.currentThread().getName()); return 0; } if (res != null) { int vat = res * 24 / 100; res += vat; } return res; }); int result = totalOrder.get(); logger.info(() -> "Total: " + result + "\n"); }
注意,res
将是null
;否则,如果发生异常,ex
将是null
。
如果我们需要在异常下完成,那么我们可以通过completeExceptionally()
继续,如下例所示:
CompletableFuture<Integer> cf = new CompletableFuture<>(); ... cf.completeExceptionally(new RuntimeException("Ops!")); ... cf.get(); // ExecutionException : RuntimeException
取消执行并抛出CancellationException
可以通过cancel()
方法完成:
CompletableFuture<Integer> cf = new CompletableFuture<>(); ... // is not important if the argument is set to true or false cf.cancel(true/false); ... cf.get(); // CancellationException
显式完成CompletableFuture
CompletableFuture
可以使用complete(T value)
、completeAsync(Supplier supplier)
和completeAsync(Supplier supplier, Executor executor)
显式完成。T
是get()
返回的值。这里是一个创建CompletableFuture
并立即返回它的方法。另一个线程负责执行一些税务计算并用相应的结果完成CompletableFuture
:
public static CompletableFuture<Integer> taxes() { CompletableFuture<Integer> completableFuture = new CompletableFuture<>(); new Thread(() -> { int result = new Random().nextInt(100); Thread.sleep(10); completableFuture.complete(result); }).start(); return completableFuture; }
我们称这个方法为:
logger.info("Computing taxes ..."); CompletableFuture<Integer> cfTaxes = CustomerAsyncs.taxes(); while (!cfTaxes.isDone()) { logger.info("Still computing ..."); } int result = cfTaxes.get(); logger.info(() -> "Result: " + result);
可能的输出如下:
[14:09:40] [INFO ] Computing taxes ... [14:09:40] [INFO ] Still computing ... [14:09:40] [INFO ] Still computing ... ... [14:09:40] [INFO ] Still computing ... [14:09:40] [INFO ] Result: 17
如果我们已经知道了CompletableFuture
的结果,那么我们可以调用completedFuture(U value)
,如下例所示:
CompletableFuture<String> completableFuture = CompletableFuture.completedFuture("How are you?"); String result = completableFuture.get(); logger.info(() -> "Result: " + result); // Result: How are you?
同时检查whenComplete()
和whenCompleteAsync()
的文件。
Java 编程问题:十一、并发-深入探索3https://developer.aliyun.com/article/1426167