Java 编程问题:十一、并发-深入探索2

简介: Java 编程问题:十一、并发-深入探索

Java 编程问题:十一、并发-深入探索1https://developer.aliyun.com/article/1426165

附加处理异步任务结果并返回结果的回调

用户问题:取某客户的订单发票,然后计算总金额并签字

依赖阻塞get()对此类问题不是很有用。我们需要的是一个回调方法,当CompletableFuture的结果可用时,该方法将被自动调用。

所以,我们不想等待结果。当发票准备就绪时(这是CompletableFuture的结果),回调方法应该计算总值,然后,另一个回调应该对其签名。这可以通过thenApply()方法实现。

thenApply()方法可用于CompletableFuture结果到达时的处理和转换。它以Function为参数。让我们在工作中看看:

public static void fetchInvoiceTotalSign() {
  CompletableFuture<String> cfFetchInvoice 
      = CompletableFuture.supplyAsync(() -> {
    logger.info(() -> "Fetch invoice by: "
      + Thread.currentThread().getName());
    Thread.sleep(500);
    return "Invoice #3344";
  });
  CompletableFuture<String> cfTotalSign = cfFetchInvoice
    .thenApply(o -> o + " Total: $145")
    .thenApply(o -> o + " Signed");
  String result = cfTotalSign.get();
  logger.info(() -> "Invoice: " + result + "\n");
}

或者,我们可以将其链接如下:

public static void fetchInvoiceTotalSign() {
  CompletableFuture<String> cfTotalSign 
      = CompletableFuture.supplyAsync(() -> {
    logger.info(() -> "Fetch invoice by: "
      + Thread.currentThread().getName());
    Thread.sleep(500);
    return "Invoice #3344";
  }).thenApply(o -> o + " Total: $145")
    .thenApply(o -> o + " Signed");
  String result = cfTotalSign.get();
  logger.info(() -> "Invoice: " + result + "\n");
}

同时检查applyToEither()applyToEitherAsync()。当这个或另一个给定的阶段以正常方式完成时,这两个方法将返回一个新的完成阶段,并将结果作为提供函数的参数执行。

附加处理异步任务结果并返回void的回调

用户问题:取某客户订单打印

通常,不返回结果的回调充当异步管道的终端操作。

这种行为可以通过thenAccept()方法获得。取Consumer返回CompletableFuture。此方法可以对CompletableFuture的结果进行处理和转换,但不返回结果。因此,它可以接受一个订单,它是CompletableFuture的结果,并按下面的代码片段打印出来:

public static void fetchAndPrintOrder() {
  CompletableFuture<String> cfFetchOrder 
      = CompletableFuture.supplyAsync(() -> {
    logger.info(() -> "Fetch order by: "
      + Thread.currentThread().getName());
    Thread.sleep(500);
    return "Order #1024";
  });
  CompletableFuture<Void> cfPrintOrder = cfFetchOrder.thenAccept(
    o -> logger.info(() -> "Printing order " + o +
      " by: " + Thread.currentThread().getName()));
  cfPrintOrder.get();
  logger.info("Order was fetched and printed \n");
}

或者,它可以更紧凑,如下所示:

public static void fetchAndPrintOrder() {
  CompletableFuture<Void> cfFetchAndPrintOrder 
      = CompletableFuture.supplyAsync(() -> {
    logger.info(() -> "Fetch order by: "
      + Thread.currentThread().getName());
    Thread.sleep(500);
    return "Order #1024";
  }).thenAccept(
      o -> logger.info(() -> "Printing order " + o + " by: "
        + Thread.currentThread().getName()));
  cfFetchAndPrintOrder.get();
  logger.info("Order was fetched and printed \n");
}

同时检查acceptEither()acceptEitherAsync()

附加在异步任务之后运行并返回void的回调

用户问题:下订单通知客户

通知客户应在交付订单后完成。这只是一条亲爱的客户,您的订单已经在今天送达之类的短信,所以通知任务不需要知道任何关于订单的信息。这类任务可以通过thenRun()来完成。此方法取Runnable,返回CompletableFuture。让我们在工作中看看:

public static void deliverOrderNotifyCustomer() {
  CompletableFuture<Void> cfDeliverOrder 
      = CompletableFuture.runAsync(() -> {
    logger.info(() -> "Order was delivered by: "
      + Thread.currentThread().getName());
    Thread.sleep(500);
  });
  CompletableFuture<Void> cfNotifyCustomer 
      = cfDeliverOrder.thenRun(() -> logger.info(
        () -> "Dear customer, your order has been delivered today by:"
          + Thread.currentThread().getName()));
  cfNotifyCustomer.get();
  logger.info(() -> "Order was delivered 
                       and customer was notified \n");
}

为了进一步的并行化,thenApply()thenAccept()thenRun()伴随着thenApplyAsync()thenAcceptAsync()thenRunAsync()。其中每一个都可以依赖于全局ForkJoinPool.commonPool()或自定义线程池(Executor。当thenApply/Accept/Run()在与之前执行的CompletableFuture任务相同的线程中执行时(或在主线程中),可以在不同的线程中执行thenApplyAsync/AcceptAsync/RunAsync()(来自ForkJoinPool.commonPool()或自定义线程池(Executor)。

通过exceptionally()处理异步任务的异常

用户问题:计算订单总数。如果出了问题,就抛出IllegalStateException

以下屏幕截图举例说明了异常是如何在异步管道中传播的;在某个点发生异常时,不会执行矩形中的代码:

以下截图显示了thenApply()thenAccept()中的异常:

因此,在supplyAsync()中,如果发生异常,则不会调用以下回调。此外,Future将得到解决,但这一异常除外。相同的规则适用于每个回调。如果第一个thenApply()出现异常,则不调用以下thenApply()thenAccept()

如果我们试图计算订单总数的结果是一个IllegalStateException,那么我们可以依赖exceptionally()回调,这给了我们一个恢复的机会。此方法接受一个Function,并返回一个CompletionStage,因此返回一个CompletableFuture。让我们在工作中看看:

public static void fetchOrderTotalException() {
  CompletableFuture<Integer> cfTotalOrder 
      = CompletableFuture.supplyAsync(() -> {
    logger.info(() -> "Compute total: "
      + Thread.currentThread().getName());
    int surrogate = new Random().nextInt(1000);
    if (surrogate < 500) {
      throw new IllegalStateException(
        "Invoice service is not responding");
    }
    return 1000;
  }).exceptionally(ex -> {
    logger.severe(() -> "Exception: " + ex
      + " Thread: " + Thread.currentThread().getName());
    return 0;
  });
  int result = cfTotalOrder.get();
  logger.info(() -> "Total: " + result + "\n");
}

异常情况下,输出如下:

Compute total: ForkJoinPool.commonPool-worker-3
Exception: java.lang.IllegalStateException: Invoice service
           is not responding Thread: ForkJoinPool.commonPool-worker-3
Total: 0

让我们看看另一个问题。

用户问题:取发票,计算合计,签字。如有问题,则抛出IllegalStateException,停止处理

如果我们用supplyAsync()取发票,用thenApply()计算合计,用另一个thenApply()签字,那么我们可以认为正确的实现如下:

public static void fetchInvoiceTotalSignChainOfException()
throws InterruptedException, ExecutionException {
  CompletableFuture<String> cfFetchInvoice 
      = CompletableFuture.supplyAsync(() -> {
    logger.info(() -> "Fetch invoice by: "
      + Thread.currentThread().getName());
    int surrogate = new Random().nextInt(1000);
    if (surrogate < 500) {
      throw new IllegalStateException(
        "Invoice service is not responding");
    }
    return "Invoice #3344";
  }).exceptionally(ex -> {
    logger.severe(() -> "Exception: " + ex
      + " Thread: " + Thread.currentThread().getName());
    return "[Invoice-Exception]";
  }).thenApply(o -> {
      logger.info(() -> "Compute total by: "
        + Thread.currentThread().getName());
    int surrogate = new Random().nextInt(1000);
    if (surrogate < 500) {
      throw new IllegalStateException(
        "Total service is not responding");
    }
    return o + " Total: $145";
  }).exceptionally(ex -> {
    logger.severe(() -> "Exception: " + ex
      + " Thread: " + Thread.currentThread().getName());
    return "[Total-Exception]";
  }).thenApply(o -> {
    logger.info(() -> "Sign invoice by: "
      + Thread.currentThread().getName());
    int surrogate = new Random().nextInt(1000);
    if (surrogate < 500) {
      throw new IllegalStateException(
        "Signing service is not responding");
    }
    return o + " Signed";
  }).exceptionally(ex -> {
    logger.severe(() -> "Exception: " + ex
      + " Thread: " + Thread.currentThread().getName());
    return "[Sign-Exception]";
  });
  String result = cfFetchInvoice.get();
  logger.info(() -> "Result: " + result + "\n");
}

好吧,这里的问题是,我们可能面临如下输出:

[INFO] Fetch invoice by: ForkJoinPool.commonPool-worker-3
[SEVERE] Exception: java.lang.IllegalStateException: Invoice service
         is not responding Thread: ForkJoinPool.commonPool-worker-3
[INFO] Compute total by: ForkJoinPool.commonPool-worker-3
[INFO] Sign invoice by: ForkJoinPool.commonPool-worker-3
[SEVERE] Exception: java.lang.IllegalStateException: Signing service
         is not responding Thread: ForkJoinPool.commonPool-worker-3
[INFO] Result: [Sign-Exception]

即使发票拿不到,我们也会继续计算总数并签字。显然,这没有道理。如果无法提取发票,或者无法计算总额,则我们希望中止该过程。当我们可以恢复并继续时,这个实现可能是一个很好的选择,但它绝对不适合我们的场景。对于我们的场景,需要以下实现:

public static void fetchInvoiceTotalSignException()
throws InterruptedException, ExecutionException {
  CompletableFuture<String> cfFetchInvoice 
      = CompletableFuture.supplyAsync(() -> {
    logger.info(() -> "Fetch invoice by: "
      + Thread.currentThread().getName());
    int surrogate = new Random().nextInt(1000);
    if (surrogate < 500) {
      throw new IllegalStateException(
        "Invoice service is not responding");
    }
    return "Invoice #3344";
  }).thenApply(o -> {
      logger.info(() -> "Compute total by: "
        + Thread.currentThread().getName());
    int surrogate = new Random().nextInt(1000);
    if (surrogate < 500) {
      throw new IllegalStateException(
        "Total service is not responding");
    }
    return o + " Total: $145";
  }).thenApply(o -> {
      logger.info(() -> "Sign invoice by: "
        + Thread.currentThread().getName());
    int surrogate = new Random().nextInt(1000);
    if (surrogate < 500) {
      throw new IllegalStateException(
        "Signing service is not responding");
    }
    return o + " Signed";
  }).exceptionally(ex -> {
    logger.severe(() -> "Exception: " + ex
      + " Thread: " + Thread.currentThread().getName());
    return "[No-Invoice-Exception]";
  });
  String result = cfFetchInvoice.get();
  logger.info(() -> "Result: " + result + "\n");
}

这一次,在任何隐含的CompletableFuture中发生的异常将停止该过程。以下是可能的输出:

[INFO ] Fetch invoice by: ForkJoinPool.commonPool-worker-3
[SEVERE] Exception: java.lang.IllegalStateException: Invoice service
         is not responding Thread: ForkJoinPool.commonPool-worker-3
[INFO ] Result: [No-Invoice-Exception]

从 JDK12 开始,异常情况可以通过exceptionallyAsync()进一步并行化,它可以使用与引起异常的代码相同的线程或给定线程池(Executor中的线程)。举个例子:

public static void fetchOrderTotalExceptionAsync() {
  ExecutorService executor = Executors.newSingleThreadExecutor();
  CompletableFuture<Integer> totalOrder 
      = CompletableFuture.supplyAsync(() -> {
    logger.info(() -> "Compute total by: "
      + Thread.currentThread().getName());
    int surrogate = new Random().nextInt(1000);
    if (surrogate < 500) {
      throw new IllegalStateException(
        "Computing service is not responding");
    }
    return 1000;
  }).exceptionallyAsync(ex -> {
    logger.severe(() -> "Exception: " + ex 
      + " Thread: " + Thread.currentThread().getName());
    return 0;
  }, executor);
  int result = totalOrder.get();
  logger.info(() -> "Total: " + result + "\n");
  executor.shutdownNow();
}

输出显示导致异常的代码是由名为ForkJoinPool.commonPool-worker-3的线程执行的,而异常代码是由给定线程池中名为pool-1-thread-1的线程执行的:

Compute total by: ForkJoinPool.commonPool-worker-3
Exception: java.lang.IllegalStateException: Computing service is
           not responding Thread: pool-1-thread-1
Total: 0

JDK12 exceptionallyCompose()

用户问题:通过打印服务获取打印机 IP 或回退到备份打印机 IP。或者,一般来说,当**这个阶段异常完成时,应该使用应用于这个阶段异常的所提供函数的结果来合成。

我们有CompletableFuture获取打印服务管理的打印机的 IP。如果服务没有响应,则抛出如下异常:

CompletableFuture<String> cfServicePrinterIp 
    = CompletableFuture.supplyAsync(() -> {
  int surrogate = new Random().nextInt(1000);
  if (surrogate < 500) {
    throw new IllegalStateException(
      "Printing service is not responding");
  }
  return "192.168.1.0";
});

我们还有获取备份打印机 IP 的CompletableFuture

CompletableFuture<String> cfBackupPrinterIp 
    = CompletableFuture.supplyAsync(() -> {
  return "192.192.192.192";
});

现在,如果没有打印服务,那么我们应该依靠备份打印机。这可以通过 JDK12exceptionallyCompose()实现,如下所示:

CompletableFuture<Void> printInvoice 
    = cfServicePrinterIp.exceptionallyCompose(th -> {
  logger.severe(() -> "Exception: " + th
    + " Thread: " + Thread.currentThread().getName());
  return cfBackupPrinterIp;
}).thenAccept((ip) -> logger.info(() -> "Printing at: " + ip));

调用printInvoice.get()可能会显示以下结果之一:

  • 如果打印服务可用:
[INFO] Printing at: 192.168.1.0
  • 如果打印服务不可用:
[SEVERE] Exception: java.util.concurrent.CompletionException ...
[INFO] Printing at: 192.192.192.192

对于进一步的并行化,我们可以依赖于exceptionallyComposeAsync()

通过handle()处理异步任务的异常

用户问题:计算订单总数。如果出现问题,则抛出一个IllegalStateException

有时我们希望执行一个异常代码块,即使没有发生异常。类似于try-catch块的finally子句。这可以使用handle()回调。无论是否发生异常,都会调用此方法,它类似于一个catch+finally。它使用一个函数来计算返回的CompletionStage, BiFunction的值并返回CompletionStageU是函数的返回类型)。

让我们在工作中看看:

public static void fetchOrderTotalHandle() {
  CompletableFuture<Integer> totalOrder 
      = CompletableFuture.supplyAsync(() -> {
    logger.info(() -> "Compute total by: "
      + Thread.currentThread().getName());
    int surrogate = new Random().nextInt(1000);
    if (surrogate < 500) {
      throw new IllegalStateException(
        "Computing service is not responding");
    }
    return 1000;
  }).handle((res, ex) -> {
    if (ex != null) {
      logger.severe(() -> "Exception: " + ex
        + " Thread: " + Thread.currentThread().getName());
      return 0;
    }
    if (res != null) {
      int vat = res * 24 / 100;
      res += vat;
    }
    return res;
  });
  int result = totalOrder.get();
  logger.info(() -> "Total: " + result + "\n");
}

注意,res将是null;否则,如果发生异常,ex将是null

如果我们需要在异常下完成,那么我们可以通过completeExceptionally()继续,如下例所示:

CompletableFuture<Integer> cf = new CompletableFuture<>();
...
cf.completeExceptionally(new RuntimeException("Ops!"));
...
cf.get(); // ExecutionException : RuntimeException

取消执行并抛出CancellationException可以通过cancel()方法完成:

CompletableFuture<Integer> cf = new CompletableFuture<>();
...
// is not important if the argument is set to true or false
cf.cancel(true/false);
...
cf.get(); // CancellationException

显式完成CompletableFuture

CompletableFuture可以使用complete(T value)completeAsync(Supplier supplier)completeAsync(Supplier supplier, Executor executor)显式完成。Tget()返回的值。这里是一个创建CompletableFuture并立即返回它的方法。另一个线程负责执行一些税务计算并用相应的结果完成CompletableFuture

public static CompletableFuture<Integer> taxes() {
  CompletableFuture<Integer> completableFuture 
    = new CompletableFuture<>();
  new Thread(() -> {
    int result = new Random().nextInt(100);
    Thread.sleep(10);
    completableFuture.complete(result);
  }).start();
  return completableFuture;
}

我们称这个方法为:

logger.info("Computing taxes ...");
CompletableFuture<Integer> cfTaxes = CustomerAsyncs.taxes();
while (!cfTaxes.isDone()) {
  logger.info("Still computing ...");
}
int result = cfTaxes.get();
logger.info(() -> "Result: " + result);

可能的输出如下:

[14:09:40] [INFO ] Computing taxes ...
[14:09:40] [INFO ] Still computing ...
[14:09:40] [INFO ] Still computing ...
...
[14:09:40] [INFO ] Still computing ...
[14:09:40] [INFO ] Result: 17

如果我们已经知道了CompletableFuture的结果,那么我们可以调用completedFuture(U value),如下例所示:

CompletableFuture<String> completableFuture 
  = CompletableFuture.completedFuture("How are you?");
String result = completableFuture.get();
logger.info(() -> "Result: " + result); // Result: How are you?

同时检查whenComplete()whenCompleteAsync()的文件。

Java 编程问题:十一、并发-深入探索3https://developer.aliyun.com/article/1426167

相关文章
|
22天前
|
Java 开发者
Java多线程编程中的常见误区与最佳实践####
本文深入剖析了Java多线程编程中开发者常遇到的几个典型误区,如对`start()`与`run()`方法的混淆使用、忽视线程安全问题、错误处理未同步的共享变量等,并针对这些问题提出了具体的解决方案和最佳实践。通过实例代码对比,直观展示了正确与错误的实现方式,旨在帮助读者构建更加健壮、高效的多线程应用程序。 ####
|
24天前
|
安全 Java UED
深入浅出Java多线程编程
【10月更文挑战第40天】在Java的世界中,多线程是提升应用性能和响应能力的关键。本文将通过浅显易懂的方式介绍Java中的多线程编程,从基础概念到高级特性,再到实际应用案例,带你一步步深入了解如何在Java中高效地使用多线程。文章不仅涵盖了理论知识,还提供了实用的代码示例,帮助你在实际开发中更好地应用多线程技术。
41 5
|
10天前
|
Java 程序员
Java编程中的异常处理:从基础到高级
在Java的世界中,异常处理是代码健壮性的守护神。本文将带你从异常的基本概念出发,逐步深入到高级用法,探索如何优雅地处理程序中的错误和异常情况。通过实际案例,我们将一起学习如何编写更可靠、更易于维护的Java代码。准备好了吗?让我们一起踏上这段旅程,解锁Java异常处理的秘密!
|
13天前
|
设计模式 Java 开发者
Java多线程编程的陷阱与解决方案####
本文深入探讨了Java多线程编程中常见的问题及其解决策略。通过分析竞态条件、死锁、活锁等典型场景,并结合代码示例和实用技巧,帮助开发者有效避免这些陷阱,提升并发程序的稳定性和性能。 ####
|
13天前
|
缓存 Java 开发者
Java多线程编程的陷阱与最佳实践####
本文深入探讨了Java多线程编程中常见的陷阱,如竞态条件、死锁和内存一致性错误,并提供了实用的避免策略。通过分析典型错误案例,本文旨在帮助开发者更好地理解和掌握多线程环境下的编程技巧,从而提升并发程序的稳定性和性能。 ####
|
7天前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
7天前
|
Java 调度
Java中的多线程编程与并发控制
本文深入探讨了Java编程语言中多线程编程的基础知识和并发控制机制。文章首先介绍了多线程的基本概念,包括线程的定义、生命周期以及在Java中创建和管理线程的方法。接着,详细讲解了Java提供的同步机制,如synchronized关键字、wait()和notify()方法等,以及如何通过这些机制实现线程间的协调与通信。最后,本文还讨论了一些常见的并发问题,例如死锁、竞态条件等,并提供了相应的解决策略。
24 3
|
12天前
|
开发框架 安全 Java
Java 反射机制:动态编程的强大利器
Java反射机制允许程序在运行时检查类、接口、字段和方法的信息,并能操作对象。它提供了一种动态编程的方式,使得代码更加灵活,能够适应未知的或变化的需求,是开发框架和库的重要工具。
32 2
|
17天前
|
Java 程序员
Java编程中的异常处理:从基础到高级
在Java的世界里,异常是程序运行中不可忽视的“惊喜”。它们可能突如其来,也可能悄无声息地潜伏。掌握异常处理的艺术,意味着你能够优雅地面对程序的不完美,并确保它即使在风雨飘摇中也能继续航行。本文将引导你理解Java异常的本质,探索捕获和处理这些异常的方法,并最终学会如何利用自定义异常为你的代码增添力量。
|
20天前
|
安全 Java 编译器
Java编程中的异常处理:从基础到高级
在Java的世界中,异常处理是代码健壮性的守护神。本文将通过浅显易懂的语言和生动的比喻,带你了解Java异常处理的基本概念、分类以及如何优雅地处理它们。我们将一起探索try-catch-finally的结构,深入理解异常类层次结构,并通过实际案例学习如何创建自定义异常。最后,文章将介绍一些最佳实践,帮助你编写出既安全又高效的异常处理代码。准备好,让我们一起走进Java异常处理的奇妙世界!
60 11