Java 编程问题:十一、并发-深入探索2

简介: Java 编程问题:十一、并发-深入探索

Java 编程问题:十一、并发-深入探索1https://developer.aliyun.com/article/1426165

附加处理异步任务结果并返回结果的回调

用户问题:取某客户的订单发票,然后计算总金额并签字

依赖阻塞get()对此类问题不是很有用。我们需要的是一个回调方法,当CompletableFuture的结果可用时,该方法将被自动调用。

所以,我们不想等待结果。当发票准备就绪时(这是CompletableFuture的结果),回调方法应该计算总值,然后,另一个回调应该对其签名。这可以通过thenApply()方法实现。

thenApply()方法可用于CompletableFuture结果到达时的处理和转换。它以Function为参数。让我们在工作中看看:

public static void fetchInvoiceTotalSign() {
  CompletableFuture<String> cfFetchInvoice 
      = CompletableFuture.supplyAsync(() -> {
    logger.info(() -> "Fetch invoice by: "
      + Thread.currentThread().getName());
    Thread.sleep(500);
    return "Invoice #3344";
  });
  CompletableFuture<String> cfTotalSign = cfFetchInvoice
    .thenApply(o -> o + " Total: $145")
    .thenApply(o -> o + " Signed");
  String result = cfTotalSign.get();
  logger.info(() -> "Invoice: " + result + "\n");
}

或者,我们可以将其链接如下:

public static void fetchInvoiceTotalSign() {
  CompletableFuture<String> cfTotalSign 
      = CompletableFuture.supplyAsync(() -> {
    logger.info(() -> "Fetch invoice by: "
      + Thread.currentThread().getName());
    Thread.sleep(500);
    return "Invoice #3344";
  }).thenApply(o -> o + " Total: $145")
    .thenApply(o -> o + " Signed");
  String result = cfTotalSign.get();
  logger.info(() -> "Invoice: " + result + "\n");
}

同时检查applyToEither()applyToEitherAsync()。当这个或另一个给定的阶段以正常方式完成时,这两个方法将返回一个新的完成阶段,并将结果作为提供函数的参数执行。

附加处理异步任务结果并返回void的回调

用户问题:取某客户订单打印

通常,不返回结果的回调充当异步管道的终端操作。

这种行为可以通过thenAccept()方法获得。取Consumer返回CompletableFuture。此方法可以对CompletableFuture的结果进行处理和转换,但不返回结果。因此,它可以接受一个订单,它是CompletableFuture的结果,并按下面的代码片段打印出来:

public static void fetchAndPrintOrder() {
  CompletableFuture<String> cfFetchOrder 
      = CompletableFuture.supplyAsync(() -> {
    logger.info(() -> "Fetch order by: "
      + Thread.currentThread().getName());
    Thread.sleep(500);
    return "Order #1024";
  });
  CompletableFuture<Void> cfPrintOrder = cfFetchOrder.thenAccept(
    o -> logger.info(() -> "Printing order " + o +
      " by: " + Thread.currentThread().getName()));
  cfPrintOrder.get();
  logger.info("Order was fetched and printed \n");
}

或者,它可以更紧凑,如下所示:

public static void fetchAndPrintOrder() {
  CompletableFuture<Void> cfFetchAndPrintOrder 
      = CompletableFuture.supplyAsync(() -> {
    logger.info(() -> "Fetch order by: "
      + Thread.currentThread().getName());
    Thread.sleep(500);
    return "Order #1024";
  }).thenAccept(
      o -> logger.info(() -> "Printing order " + o + " by: "
        + Thread.currentThread().getName()));
  cfFetchAndPrintOrder.get();
  logger.info("Order was fetched and printed \n");
}

同时检查acceptEither()acceptEitherAsync()

附加在异步任务之后运行并返回void的回调

用户问题:下订单通知客户

通知客户应在交付订单后完成。这只是一条亲爱的客户,您的订单已经在今天送达之类的短信,所以通知任务不需要知道任何关于订单的信息。这类任务可以通过thenRun()来完成。此方法取Runnable,返回CompletableFuture。让我们在工作中看看:

public static void deliverOrderNotifyCustomer() {
  CompletableFuture<Void> cfDeliverOrder 
      = CompletableFuture.runAsync(() -> {
    logger.info(() -> "Order was delivered by: "
      + Thread.currentThread().getName());
    Thread.sleep(500);
  });
  CompletableFuture<Void> cfNotifyCustomer 
      = cfDeliverOrder.thenRun(() -> logger.info(
        () -> "Dear customer, your order has been delivered today by:"
          + Thread.currentThread().getName()));
  cfNotifyCustomer.get();
  logger.info(() -> "Order was delivered 
                       and customer was notified \n");
}

为了进一步的并行化,thenApply()thenAccept()thenRun()伴随着thenApplyAsync()thenAcceptAsync()thenRunAsync()。其中每一个都可以依赖于全局ForkJoinPool.commonPool()或自定义线程池(Executor。当thenApply/Accept/Run()在与之前执行的CompletableFuture任务相同的线程中执行时(或在主线程中),可以在不同的线程中执行thenApplyAsync/AcceptAsync/RunAsync()(来自ForkJoinPool.commonPool()或自定义线程池(Executor)。

通过exceptionally()处理异步任务的异常

用户问题:计算订单总数。如果出了问题,就抛出IllegalStateException

以下屏幕截图举例说明了异常是如何在异步管道中传播的;在某个点发生异常时,不会执行矩形中的代码:

以下截图显示了thenApply()thenAccept()中的异常:

因此,在supplyAsync()中,如果发生异常,则不会调用以下回调。此外,Future将得到解决,但这一异常除外。相同的规则适用于每个回调。如果第一个thenApply()出现异常,则不调用以下thenApply()thenAccept()

如果我们试图计算订单总数的结果是一个IllegalStateException,那么我们可以依赖exceptionally()回调,这给了我们一个恢复的机会。此方法接受一个Function,并返回一个CompletionStage,因此返回一个CompletableFuture。让我们在工作中看看:

public static void fetchOrderTotalException() {
  CompletableFuture<Integer> cfTotalOrder 
      = CompletableFuture.supplyAsync(() -> {
    logger.info(() -> "Compute total: "
      + Thread.currentThread().getName());
    int surrogate = new Random().nextInt(1000);
    if (surrogate < 500) {
      throw new IllegalStateException(
        "Invoice service is not responding");
    }
    return 1000;
  }).exceptionally(ex -> {
    logger.severe(() -> "Exception: " + ex
      + " Thread: " + Thread.currentThread().getName());
    return 0;
  });
  int result = cfTotalOrder.get();
  logger.info(() -> "Total: " + result + "\n");
}

异常情况下,输出如下:

Compute total: ForkJoinPool.commonPool-worker-3
Exception: java.lang.IllegalStateException: Invoice service
           is not responding Thread: ForkJoinPool.commonPool-worker-3
Total: 0

让我们看看另一个问题。

用户问题:取发票,计算合计,签字。如有问题,则抛出IllegalStateException,停止处理

如果我们用supplyAsync()取发票,用thenApply()计算合计,用另一个thenApply()签字,那么我们可以认为正确的实现如下:

public static void fetchInvoiceTotalSignChainOfException()
throws InterruptedException, ExecutionException {
  CompletableFuture<String> cfFetchInvoice 
      = CompletableFuture.supplyAsync(() -> {
    logger.info(() -> "Fetch invoice by: "
      + Thread.currentThread().getName());
    int surrogate = new Random().nextInt(1000);
    if (surrogate < 500) {
      throw new IllegalStateException(
        "Invoice service is not responding");
    }
    return "Invoice #3344";
  }).exceptionally(ex -> {
    logger.severe(() -> "Exception: " + ex
      + " Thread: " + Thread.currentThread().getName());
    return "[Invoice-Exception]";
  }).thenApply(o -> {
      logger.info(() -> "Compute total by: "
        + Thread.currentThread().getName());
    int surrogate = new Random().nextInt(1000);
    if (surrogate < 500) {
      throw new IllegalStateException(
        "Total service is not responding");
    }
    return o + " Total: $145";
  }).exceptionally(ex -> {
    logger.severe(() -> "Exception: " + ex
      + " Thread: " + Thread.currentThread().getName());
    return "[Total-Exception]";
  }).thenApply(o -> {
    logger.info(() -> "Sign invoice by: "
      + Thread.currentThread().getName());
    int surrogate = new Random().nextInt(1000);
    if (surrogate < 500) {
      throw new IllegalStateException(
        "Signing service is not responding");
    }
    return o + " Signed";
  }).exceptionally(ex -> {
    logger.severe(() -> "Exception: " + ex
      + " Thread: " + Thread.currentThread().getName());
    return "[Sign-Exception]";
  });
  String result = cfFetchInvoice.get();
  logger.info(() -> "Result: " + result + "\n");
}

好吧,这里的问题是,我们可能面临如下输出:

[INFO] Fetch invoice by: ForkJoinPool.commonPool-worker-3
[SEVERE] Exception: java.lang.IllegalStateException: Invoice service
         is not responding Thread: ForkJoinPool.commonPool-worker-3
[INFO] Compute total by: ForkJoinPool.commonPool-worker-3
[INFO] Sign invoice by: ForkJoinPool.commonPool-worker-3
[SEVERE] Exception: java.lang.IllegalStateException: Signing service
         is not responding Thread: ForkJoinPool.commonPool-worker-3
[INFO] Result: [Sign-Exception]

即使发票拿不到,我们也会继续计算总数并签字。显然,这没有道理。如果无法提取发票,或者无法计算总额,则我们希望中止该过程。当我们可以恢复并继续时,这个实现可能是一个很好的选择,但它绝对不适合我们的场景。对于我们的场景,需要以下实现:

public static void fetchInvoiceTotalSignException()
throws InterruptedException, ExecutionException {
  CompletableFuture<String> cfFetchInvoice 
      = CompletableFuture.supplyAsync(() -> {
    logger.info(() -> "Fetch invoice by: "
      + Thread.currentThread().getName());
    int surrogate = new Random().nextInt(1000);
    if (surrogate < 500) {
      throw new IllegalStateException(
        "Invoice service is not responding");
    }
    return "Invoice #3344";
  }).thenApply(o -> {
      logger.info(() -> "Compute total by: "
        + Thread.currentThread().getName());
    int surrogate = new Random().nextInt(1000);
    if (surrogate < 500) {
      throw new IllegalStateException(
        "Total service is not responding");
    }
    return o + " Total: $145";
  }).thenApply(o -> {
      logger.info(() -> "Sign invoice by: "
        + Thread.currentThread().getName());
    int surrogate = new Random().nextInt(1000);
    if (surrogate < 500) {
      throw new IllegalStateException(
        "Signing service is not responding");
    }
    return o + " Signed";
  }).exceptionally(ex -> {
    logger.severe(() -> "Exception: " + ex
      + " Thread: " + Thread.currentThread().getName());
    return "[No-Invoice-Exception]";
  });
  String result = cfFetchInvoice.get();
  logger.info(() -> "Result: " + result + "\n");
}

这一次,在任何隐含的CompletableFuture中发生的异常将停止该过程。以下是可能的输出:

[INFO ] Fetch invoice by: ForkJoinPool.commonPool-worker-3
[SEVERE] Exception: java.lang.IllegalStateException: Invoice service
         is not responding Thread: ForkJoinPool.commonPool-worker-3
[INFO ] Result: [No-Invoice-Exception]

从 JDK12 开始,异常情况可以通过exceptionallyAsync()进一步并行化,它可以使用与引起异常的代码相同的线程或给定线程池(Executor中的线程)。举个例子:

public static void fetchOrderTotalExceptionAsync() {
  ExecutorService executor = Executors.newSingleThreadExecutor();
  CompletableFuture<Integer> totalOrder 
      = CompletableFuture.supplyAsync(() -> {
    logger.info(() -> "Compute total by: "
      + Thread.currentThread().getName());
    int surrogate = new Random().nextInt(1000);
    if (surrogate < 500) {
      throw new IllegalStateException(
        "Computing service is not responding");
    }
    return 1000;
  }).exceptionallyAsync(ex -> {
    logger.severe(() -> "Exception: " + ex 
      + " Thread: " + Thread.currentThread().getName());
    return 0;
  }, executor);
  int result = totalOrder.get();
  logger.info(() -> "Total: " + result + "\n");
  executor.shutdownNow();
}

输出显示导致异常的代码是由名为ForkJoinPool.commonPool-worker-3的线程执行的,而异常代码是由给定线程池中名为pool-1-thread-1的线程执行的:

Compute total by: ForkJoinPool.commonPool-worker-3
Exception: java.lang.IllegalStateException: Computing service is
           not responding Thread: pool-1-thread-1
Total: 0

JDK12 exceptionallyCompose()

用户问题:通过打印服务获取打印机 IP 或回退到备份打印机 IP。或者,一般来说,当**这个阶段异常完成时,应该使用应用于这个阶段异常的所提供函数的结果来合成。

我们有CompletableFuture获取打印服务管理的打印机的 IP。如果服务没有响应,则抛出如下异常:

CompletableFuture<String> cfServicePrinterIp 
    = CompletableFuture.supplyAsync(() -> {
  int surrogate = new Random().nextInt(1000);
  if (surrogate < 500) {
    throw new IllegalStateException(
      "Printing service is not responding");
  }
  return "192.168.1.0";
});

我们还有获取备份打印机 IP 的CompletableFuture

CompletableFuture<String> cfBackupPrinterIp 
    = CompletableFuture.supplyAsync(() -> {
  return "192.192.192.192";
});

现在,如果没有打印服务,那么我们应该依靠备份打印机。这可以通过 JDK12exceptionallyCompose()实现,如下所示:

CompletableFuture<Void> printInvoice 
    = cfServicePrinterIp.exceptionallyCompose(th -> {
  logger.severe(() -> "Exception: " + th
    + " Thread: " + Thread.currentThread().getName());
  return cfBackupPrinterIp;
}).thenAccept((ip) -> logger.info(() -> "Printing at: " + ip));

调用printInvoice.get()可能会显示以下结果之一:

  • 如果打印服务可用:
[INFO] Printing at: 192.168.1.0
  • 如果打印服务不可用:
[SEVERE] Exception: java.util.concurrent.CompletionException ...
[INFO] Printing at: 192.192.192.192

对于进一步的并行化,我们可以依赖于exceptionallyComposeAsync()

通过handle()处理异步任务的异常

用户问题:计算订单总数。如果出现问题,则抛出一个IllegalStateException

有时我们希望执行一个异常代码块,即使没有发生异常。类似于try-catch块的finally子句。这可以使用handle()回调。无论是否发生异常,都会调用此方法,它类似于一个catch+finally。它使用一个函数来计算返回的CompletionStage, BiFunction的值并返回CompletionStageU是函数的返回类型)。

让我们在工作中看看:

public static void fetchOrderTotalHandle() {
  CompletableFuture<Integer> totalOrder 
      = CompletableFuture.supplyAsync(() -> {
    logger.info(() -> "Compute total by: "
      + Thread.currentThread().getName());
    int surrogate = new Random().nextInt(1000);
    if (surrogate < 500) {
      throw new IllegalStateException(
        "Computing service is not responding");
    }
    return 1000;
  }).handle((res, ex) -> {
    if (ex != null) {
      logger.severe(() -> "Exception: " + ex
        + " Thread: " + Thread.currentThread().getName());
      return 0;
    }
    if (res != null) {
      int vat = res * 24 / 100;
      res += vat;
    }
    return res;
  });
  int result = totalOrder.get();
  logger.info(() -> "Total: " + result + "\n");
}

注意,res将是null;否则,如果发生异常,ex将是null

如果我们需要在异常下完成,那么我们可以通过completeExceptionally()继续,如下例所示:

CompletableFuture<Integer> cf = new CompletableFuture<>();
...
cf.completeExceptionally(new RuntimeException("Ops!"));
...
cf.get(); // ExecutionException : RuntimeException

取消执行并抛出CancellationException可以通过cancel()方法完成:

CompletableFuture<Integer> cf = new CompletableFuture<>();
...
// is not important if the argument is set to true or false
cf.cancel(true/false);
...
cf.get(); // CancellationException

显式完成CompletableFuture

CompletableFuture可以使用complete(T value)completeAsync(Supplier supplier)completeAsync(Supplier supplier, Executor executor)显式完成。Tget()返回的值。这里是一个创建CompletableFuture并立即返回它的方法。另一个线程负责执行一些税务计算并用相应的结果完成CompletableFuture

public static CompletableFuture<Integer> taxes() {
  CompletableFuture<Integer> completableFuture 
    = new CompletableFuture<>();
  new Thread(() -> {
    int result = new Random().nextInt(100);
    Thread.sleep(10);
    completableFuture.complete(result);
  }).start();
  return completableFuture;
}

我们称这个方法为:

logger.info("Computing taxes ...");
CompletableFuture<Integer> cfTaxes = CustomerAsyncs.taxes();
while (!cfTaxes.isDone()) {
  logger.info("Still computing ...");
}
int result = cfTaxes.get();
logger.info(() -> "Result: " + result);

可能的输出如下:

[14:09:40] [INFO ] Computing taxes ...
[14:09:40] [INFO ] Still computing ...
[14:09:40] [INFO ] Still computing ...
...
[14:09:40] [INFO ] Still computing ...
[14:09:40] [INFO ] Result: 17

如果我们已经知道了CompletableFuture的结果,那么我们可以调用completedFuture(U value),如下例所示:

CompletableFuture<String> completableFuture 
  = CompletableFuture.completedFuture("How are you?");
String result = completableFuture.get();
logger.info(() -> "Result: " + result); // Result: How are you?

同时检查whenComplete()whenCompleteAsync()的文件。

Java 编程问题:十一、并发-深入探索3https://developer.aliyun.com/article/1426167

相关文章
|
23天前
|
设计模式 安全 Java
Java编程中的单例模式:理解与实践
【10月更文挑战第31天】在Java的世界里,单例模式是一种优雅的解决方案,它确保一个类只有一个实例,并提供一个全局访问点。本文将深入探讨单例模式的实现方式、使用场景及其优缺点,同时提供代码示例以加深理解。无论你是Java新手还是有经验的开发者,掌握单例模式都将是你技能库中的宝贵财富。
33 2
|
13天前
|
Java 开发者
Java多线程编程中的常见误区与最佳实践####
本文深入剖析了Java多线程编程中开发者常遇到的几个典型误区,如对`start()`与`run()`方法的混淆使用、忽视线程安全问题、错误处理未同步的共享变量等,并针对这些问题提出了具体的解决方案和最佳实践。通过实例代码对比,直观展示了正确与错误的实现方式,旨在帮助读者构建更加健壮、高效的多线程应用程序。 ####
|
19天前
|
JSON Java Apache
非常实用的Http应用框架,杜绝Java Http 接口对接繁琐编程
UniHttp 是一个声明式的 HTTP 接口对接框架,帮助开发者快速对接第三方 HTTP 接口。通过 @HttpApi 注解定义接口,使用 @GetHttpInterface 和 @PostHttpInterface 等注解配置请求方法和参数。支持自定义代理逻辑、全局请求参数、错误处理和连接池配置,提高代码的内聚性和可读性。
|
21天前
|
安全 Java 编译器
JDK 10中的局部变量类型推断:Java编程的简化与革新
JDK 10引入的局部变量类型推断通过`var`关键字简化了代码编写,提高了可读性。编译器根据初始化表达式自动推断变量类型,减少了冗长的类型声明。虽然带来了诸多优点,但也有一些限制,如只能用于局部变量声明,并需立即初始化。这一特性使Java更接近动态类型语言,增强了灵活性和易用性。
97 53
|
12天前
|
Java 开发者
Java多线程编程的艺术与实践####
本文深入探讨了Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的技术文档,本文以实战为导向,通过生动的实例和详尽的代码解析,引领读者领略多线程编程的魅力,掌握其在提升应用性能、优化资源利用方面的关键作用。无论你是Java初学者还是有一定经验的开发者,本文都将为你打开多线程编程的新视角。 ####
|
11天前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
14天前
|
安全 Java 开发者
Java多线程编程中的常见问题与解决方案
本文深入探讨了Java多线程编程中常见的问题,包括线程安全问题、死锁、竞态条件等,并提供了相应的解决策略。文章首先介绍了多线程的基础知识,随后详细分析了每个问题的产生原因和典型场景,最后提出了实用的解决方案,旨在帮助开发者提高多线程程序的稳定性和性能。
|
20天前
|
存储 安全 Java
Java多线程编程的艺术:从基础到实践####
本文深入探讨了Java多线程编程的核心概念、应用场景及其实现方式,旨在帮助开发者理解并掌握多线程编程的基本技能。文章首先概述了多线程的重要性和常见挑战,随后详细介绍了Java中创建和管理线程的两种主要方式:继承Thread类与实现Runnable接口。通过实例代码,本文展示了如何正确启动、运行及同步线程,以及如何处理线程间的通信与协作问题。最后,文章总结了多线程编程的最佳实践,为读者在实际项目中应用多线程技术提供了宝贵的参考。 ####
|
17天前
|
监控 安全 Java
Java中的多线程编程:从入门到实践####
本文将深入浅出地探讨Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的摘要形式,本文将以一个简短的代码示例作为开篇,直接展示多线程的魅力,随后再详细解析其背后的原理与实现方式,旨在帮助读者快速理解并掌握Java多线程编程的基本技能。 ```java // 简单的多线程示例:创建两个线程,分别打印不同的消息 public class SimpleMultithreading { public static void main(String[] args) { Thread thread1 = new Thread(() -> System.out.prin
|
19天前
|
存储 缓存 安全
在 Java 编程中,创建临时文件用于存储临时数据或进行临时操作非常常见
在 Java 编程中,创建临时文件用于存储临时数据或进行临时操作非常常见。本文介绍了使用 `File.createTempFile` 方法和自定义创建临时文件的两种方式,详细探讨了它们的使用场景和注意事项,包括数据缓存、文件上传下载和日志记录等。强调了清理临时文件、确保文件名唯一性和合理设置文件权限的重要性。
42 2