Java 编程问题:十一、并发-深入探索1

简介: Java 编程问题:十一、并发-深入探索

本章包括涉及 Java 并发的 13 个问题,涉及 Fork/Join 框架、CompletableFutureReentrantLockReentrantReadWriteLockStampedLock、原子变量、任务取消、可中断方法、线程局部、死锁等方面。对于任何开发人员来说,并发性都是必需的主题之一,在工作面试中不能被忽视。这就是为什么这一章和最后一章如此重要。读完本章,您将对并发性有相当的了解,这是每个 Java 开发人员都需要的。

问题

使用以下问题来测试您的并发编程能力。我强烈建议您在使用解决方案和下载示例程序之前,先尝试一下每个问题:

  1. 可中断方法:编写一个程序,举例说明处理可中断方法的最佳方法。
  2. Fork/Join 框架:编写一个依赖 Fork/Join 框架对列表元素求和的程序。编写一个依赖 Fork/Join 框架的程序来计算给定位置的斐波那契数(例如,F(12) = 144)。另外,编写一个程序来举例说明CountedCompleter的用法。
  3. Fork/Join 和compareAndSetForkJoinTaskTag():编写一个程序,将 Fork/Join 框架应用到一组相互依存的任务,只需执行一次(例如任务 D 依赖于任务 C任务 B,但任务 C 依赖于任务 B 也一样,因此任务 B 只能执行一次,不能执行两次。
  4. CompletableFuture:通过CompletableFuture写几个代码片段来举例说明异步代码。
  5. 组合多个CompletableFuture对象:写几段代码举例说明组合多个CompletableFuture对象的不同解决方案。
  6. 优化忙等待:写一个概念证明来举例说明通过onSpinWait()优化忙等待技术。
  7. 任务取消:写一个概念证明,举例说明如何使用volatile变量来保存进程的取消状态。
  8. ThreadLocal:写一个概念证明,举例说明ThreadLocal的用法。
  9. 原子变量:使用多线程应用(Runnable编写一个从 1 到 1000000 的整数计数程序。
  10. ReentrantLock:编写一个程序,使用ReentrantLock将整数从 1 递增到 1000000。
  11. ReentrantReadWriteLock:通过ReentrantReadWriteLock编写模拟读写过程编排的程序。
  12. StampedLock:通过StampedLock编写模拟读写过程编排的程序。
  13. 死锁(哲学家就餐):编写一个程序,揭示并解决著名餐饮哲学家问题中可能出现的死锁(循环等待致命拥抱)。

以下各节介绍上述问题的解决方案。记住,通常没有一个正确的方法来解决一个特定的问题。另外,请记住,这里显示的解释仅包括解决问题所需的最有趣和最重要的细节。下载示例解决方案以查看更多详细信息,并在这个页面中试用程序。

213 可中断方法

所谓可中断方法,是指可以抛出InterruptedException的阻塞方法,例如Thread.sleep()BlockingQueue.take()BlockingQueue.poll(long timeout, TimeUnit unit)等。阻塞线程通常处于阻塞等待定时等待状态,如果被中断,则该方法尝试尽快抛出InterruptedException

因为InterruptedException是一个检查过的异常,所以我们必须捕获它和/或抛出它。换句话说,如果我们的方法调用了抛出InterruptedException的方法,那么我们必须准备好处理这个异常。如果我们可以抛出它(将异常传播给调用方),那么它就不再是我们的工作了。打电话的人必须进一步处理。所以,当我们必须抓住它的时候,让我们把注意力集中在这个案子上。当我们的代码在Runnable内运行时,就会出现这种情况,因为它不能抛出异常。

让我们从一个简单的例子开始。试图通过poll(long timeout, TimeUnit unit)BlockingQueue获取元素可以写为:

try {
  queue.poll(3000, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
  ...
  logger.info(() -> "Thread is interrupted? "
    + Thread.currentThread().isInterrupted());
}

尝试轮询队列中的元素可能会导致InterruptedException。有一个 3000 毫秒的窗口可以中断线程。在中断的情况下(例如,Thread.interrupt()),我们可能会认为调用catch块中的Thread.currentThread().isInterrupted()将返回true。毕竟,我们处在一个InterruptedException catch街区,所以相信这一点是有道理的。实际上,它会返回false,答案在poll(long timeout, TimeUnit unit)方法的源代码中,如下所示:

1: public E poll(long timeout, TimeUnit unit) 
       throws InterruptedException {
2:   E e = xfer(null, false, TIMED, unit.toNanos(timeout));
3:   if (e != null || !Thread.interrupted())
4:     return e;
5:   throw new InterruptedException();
6: }

更准确地说,答案在第 3 行。如果线程被中断,那么Thread.interrupted()将返回true,并将导致第 5 行(throw new InterruptedException()。但是除了测试之外,如果当前线程被中断,Thread.interrupted()清除线程的中断状态。请查看以下连续调用中断线程:

Thread.currentThread().isInterrupted(); // true
Thread.interrupted() // true
Thread.currentThread().isInterrupted(); // false
Thread.interrupted() // false

注意,Thread.currentThread().isInterrupted()测试这个线程是否被中断,而不影响中断状态。

现在,让我们回到我们的案子。所以,我们知道线程在捕捉到InterruptedException后就中断了,但是中断状态被Thread.interrupted()清除了。这也意味着我们代码的调用者不会意识到中断。

我们有责任成为好公民,通过调用interrupt()方法恢复中断。这样,我们代码的调用者就可以看到发出了中断,并相应地采取行动。正确的代码如下:

try {
  queue.poll(3000, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
  ...
  Thread.currentThread().interrupt(); // restore interrupt
}

根据经验,在捕捉到InterruptedException之后,不要忘记通过调用Thread.currentThread().interrupt()来恢复中断。

让我们来解决一个突出显示忘记恢复中断的问题。假设一个Runnable只要当前线程没有中断就可以运行(例如,while (!Thread.currentThread().isInterrupted()) { ... }

在每次迭代中,如果当前线程中断状态为false,那么我们尝试从BlockingQueue中获取一个元素。

实现代码如下:

Thread thread = new Thread(() -> {
  // some dummy queue
  TransferQueue<String> queue = new LinkedTransferQueue<>();
  while (!Thread.currentThread().isInterrupted()) {
    try {
      logger.info(() -> "For 3 seconds the thread " 
        + Thread.currentThread().getName() 
        + " will try to poll an element from queue ...");
      queue.poll(3000, TimeUnit.MILLISECONDS);
    } catch (InterruptedException ex) {
      logger.severe(() -> "InterruptedException! The thread "
        + Thread.currentThread().getName() + " was interrupted!");
      Thread.currentThread().interrupt();
    }
  }
  logger.info(() -> "The execution was stopped!");
});

作为调用者(另一个线程),我们启动上面的线程,睡眠 1.5 秒,只是给这个线程时间进入poll()方法,然后我们中断它。如下代码所示:

thread.start();
Thread.sleep(1500);
thread.interrupt();

这将导致InterruptedException

记录异常并恢复中断。

下一步,while计算Thread.currentThread().isInterrupted()false并退出。

因此,输出如下:

[18:02:43] [INFO] For 3 seconds the thread Thread-0
                  will try to poll an element from queue ...
[18:02:44] [SEVERE] InterruptedException!
                    The thread Thread-0 was interrupted!
[18:02:45] [INFO] The execution was stopped!

现在,让我们对恢复中断的行进行注释:

...
} catch (InterruptedException ex) {
  logger.severe(() -> "InterruptedException! The thread " 
    + Thread.currentThread().getName() + " was interrupted!");
  // notice that the below line is commented
  // Thread.currentThread().interrupt();
}
...

这一次,while块将永远运行,因为它的保护条件总是被求值为true

代码不能作用于中断,因此输出如下:

[18:05:47] [INFO] For 3 seconds the thread Thread-0
                  will try to poll an element from queue ...
[18:05:48] [SEVERE] InterruptedException!
                    The thread Thread-0 was interrupted!
[18:05:48] [INFO] For 3 seconds the thread Thread-0
                  will try to poll an element from queue ...
...

根据经验,当我们可以接受中断(而不是恢复中断)时,唯一可以接受的情况是我们可以控制整个调用栈(例如,extend Thread)。

否则,捕获的InterruptedException也应该包含Thread.currentThread().interrupt()

214 Fork/Join 框架

我们已经在“工作线程池”一节中介绍了 Fork/Join 框架。

Fork/Join 框架主要用于处理一个大任务(通常,通过大,我们可以理解大量的数据)并递归地将其拆分为可以并行执行的小任务(子任务)。最后,在完成所有子任务后,它们的结果将合并(合并)为一个结果。

下图是 Fork/Join 流的可视化表示:

在 API 方面,可以通过java.util.concurrent.ForkJoinPool创建叉/连接。

JDK8 之前,推荐的方法依赖于public static变量,如下所示:

public static ForkJoinPool forkJoinPool = new ForkJoinPool();

从 JDK8 开始,我们可以按如下方式进行:

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();

这两种方法都避免了在单个 JVM 上有太多池线程这一令人不快的情况,这是由创建它们自己的池的并行操作造成的。

对于自定义的ForkJoinPool,依赖于此类的构造器。JDK9 添加了迄今为止最全面的一个(详细信息见文档)。

AForkJoinPool对象操作任务。ForkJoinPool中执行的任务的基本类型为ForkJoinTask。更确切地说,执行以下任务:

  • RecursiveAction对于void任务
  • RecursiveTask对于返回值的任务
  • CountedCompleter对于需要记住挂起任务计数的任务

这三种类型的任务都有一个名为compute()的抽象方法,在这个方法中任务的逻辑是成形的。

ForkJoinPool提交任务可以通过以下方式完成:

  • execute()submit()
  • invoke()派生任务并等待结果
  • invokeAll()用于分叉一堆任务(例如,集合)
  • fork()用于安排在池中异步执行此任务,join()用于在完成时返回计算结果

让我们从一个通过RecursiveTask解决的问题开始。

通过RecursiveTask计算总和

为了演示框架的分叉行为,我们假设我们有一个数字列表,并且我们要计算这些数字的总和。为此,我们使用createSubtasks()方法递归地拆分(派生)这个列表,只要它大于指定的THRESHOLD。每个任务都被添加到List中。最后通过invokeAll(Collection tasks)方式将该列表提交给ForkJoinPool。这是使用以下代码完成的:

public class SumRecursiveTask extends RecursiveTask<Integer> {
  private static final Logger logger 
    = Logger.getLogger(SumRecursiveTask.class.getName());
  private static final int THRESHOLD = 10;
  private final List<Integer> worklist;
  public SumRecursiveTask(List<Integer> worklist) {
    this.worklist = worklist;
  }
  @Override
  protected Integer compute() {
    if (worklist.size() <= THRESHOLD) {
      return partialSum(worklist);
    }
    return ForkJoinTask.invokeAll(createSubtasks())
      .stream()
      .mapToInt(ForkJoinTask::join)
      .sum();
  }
  private List<SumRecursiveTask> createSubtasks() {
    List<SumRecursiveTask> subtasks = new ArrayList<>();
    int size = worklist.size();
    List<Integer> worklistLeft 
      = worklist.subList(0, (size + 1) / 2);
    List<Integer> worklistRight 
      = worklist.subList((size + 1) / 2, size);
    subtasks.add(new SumRecursiveTask(worklistLeft));
    subtasks.add(new SumRecursiveTask(worklistRight));
    return subtasks;
  }
  private Integer partialSum(List<Integer> worklist) {
    int sum = worklist.stream()
      .mapToInt(e -> e)
      .sum();
    logger.info(() -> "Partial sum: " + worklist + " = "
      + sum + "\tThread: " + Thread.currentThread().getName());
    return sum;
  }
}

为了测试它,我们需要一个列表和ForkJoinPool如下:

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
Random rnd = new Random();
List<Integer> list = new ArrayList<>();
for (int i = 0; i < 200; i++) {
  list.add(1 + rnd.nextInt(10));
}
SumRecursiveTask sumRecursiveTask = new SumRecursiveTask(list);
Integer sumAll = forkJoinPool.invoke(sumRecursiveTask);
logger.info(() -> "Final sum: " + sumAll);

可能的输出如下:

...
[15:17:06] Partial sum: [1, 3, 6, 6, 2, 5, 9] = 32
ForkJoinPool.commonPool-worker-9
...
[15:17:06] Partial sum: [1, 9, 9, 8, 9, 5] = 41
ForkJoinPool.commonPool-worker-7
[15:17:06] Final sum: 1084

用递归运算计算斐波那契函数

斐波那契数通常表示为F(n),是一个遵循以下公式的序列:

F(0) = 0, 
F(1) = 1, 
..., 
F(n) = F(n-1) + F(n-2), n > 1

斐波那契数的快照是:

0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, ...

通过RecursiveAction实现斐波那契数可以如下完成:

public class FibonacciRecursiveAction extends RecursiveAction {
  private static final Logger logger =
    Logger.getLogger(FibonacciRecursiveAction.class.getName());
  private static final long THRESHOLD = 5;
  private long nr;
  public FibonacciRecursiveAction(long nr) {
    this.nr = nr;
  }
  @Override
  protected void compute() {
    final long n = nr;
    if (n <= THRESHOLD) {
      nr = fibonacci(n);
    } else {
      nr = ForkJoinTask.invokeAll(createSubtasks(n))
        .stream()
        .mapToLong(x -> x.fibonacciNumber())
        .sum();
    }
  }
  private List<FibonacciRecursiveAction> createSubtasks(long n) {
    List<FibonacciRecursiveAction> subtasks = new ArrayList<>();
    FibonacciRecursiveAction fibonacciMinusOne
      = new FibonacciRecursiveAction(n - 1);
    FibonacciRecursiveAction fibonacciMinusTwo
      = new FibonacciRecursiveAction(n - 2);
    subtasks.add(fibonacciMinusOne);
    subtasks.add(fibonacciMinusTwo);
    return subtasks;
  }
  private long fibonacci(long n) {
    logger.info(() -> "Number: " + n 
      + " Thread: " + Thread.currentThread().getName());
    if (n <= 1) {
      return n;
    }
    return fibonacci(n - 1) + fibonacci(n - 2);
  }
  public long fibonacciNumber() {
    return nr;
  }
}

为了测试它,我们需要以下ForkJoinPool对象:

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
FibonacciRecursiveAction fibonacciRecursiveAction
  = new FibonacciRecursiveAction(12);
forkJoinPool.invoke(fibonacciRecursiveAction);
logger.info(() -> "Fibonacci: "
  + fibonacciRecursiveAction.fibonacciNumber());

F(12)的输出如下:

[15:40:46] Number: 5 Thread: ForkJoinPool.commonPool-worker-3
[15:40:46] Number: 5 Thread: ForkJoinPool.commonPool-worker-13
[15:40:46] Number: 4 Thread: ForkJoinPool.commonPool-worker-3
[15:40:46] Number: 4 Thread: ForkJoinPool.commonPool-worker-9
...
[15:40:49] Number: 0 Thread: ForkJoinPool.commonPool-worker-7
[15:40:49] Fibonacci: 144

使用CountedCompleter

CountedCompleter是 JDK8 中增加的ForkJoinTask类型。

CountedCompleter的任务是记住挂起的任务计数(不能少,不能多)。我们可以通过setPendingCount()设置挂起计数,也可以通过addToPendingCount(int delta)用显式的delta递增。通常,我们在分叉之前调用这些方法(例如,如果我们分叉两次,则根据具体情况调用addToPendingCount(2)setPendingCount(2))。

compute()方法中,我们通过tryComplete()propagateCompletion()减少挂起计数。当调用挂起计数为零的tryComplete()方法或调用无条件complete()方法时,调用onCompletion()方法。propagateCompletion()方法与tryComplete()类似,但不调用onCompletion()

CountedCompleter可以选择返回计算值。为此,我们必须重写getRawResult()方法来返回一个值。

下面的代码通过CountedCompleter对列表的所有值进行汇总:

public class SumCountedCompleter extends CountedCompleter<Long> {
  private static final Logger logger 
    = Logger.getLogger(SumCountedCompleter.class.getName());
  private static final int THRESHOLD = 10;
  private static final LongAdder sumAll = new LongAdder();
  private final List<Integer> worklist;
  public SumCountedCompleter(
    CountedCompleter<Long> c, List<Integer> worklist) {
    super(c);
    this.worklist = worklist;
  }
  @Override
  public void compute() {
    if (worklist.size() <= THRESHOLD) {
      partialSum(worklist);
    } else {
      int size = worklist.size();
      List<Integer> worklistLeft 
        = worklist.subList(0, (size + 1) / 2);
      List<Integer> worklistRight 
        = worklist.subList((size + 1) / 2, size);
      addToPendingCount(2);
      SumCountedCompleter leftTask
        = new SumCountedCompleter(this, worklistLeft);
      SumCountedCompleter rightTask
        = new SumCountedCompleter(this, worklistRight);
      leftTask.fork();
      rightTask.fork();
    }
    tryComplete();
  }
  @Override
  public void onCompletion(CountedCompleter<?> caller) {
    logger.info(() -> "Thread complete: " 
      + Thread.currentThread().getName());
  }
  @Override
  public Long getRawResult() {
    return sumAll.sum();
  }
  private Integer partialSum(List<Integer> worklist) {
    int sum = worklist.stream()
      .mapToInt(e -> e)
      .sum();
    sumAll.add(sum);
    logger.info(() -> "Partial sum: " + worklist + " = "
      + sum + "\tThread: " + Thread.currentThread().getName());
    return sum;
  }
}

现在,让我们看看一个潜在的调用和输出:

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
Random rnd = new Random();
List<Integer> list = new ArrayList<>();
for (int i = 0; i < 200; i++) {
  list.add(1 + rnd.nextInt(10));
}
SumCountedCompleter sumCountedCompleter
  = new SumCountedCompleter(null, list);
forkJoinPool.invoke(sumCountedCompleter);
logger.info(() -> "Done! Result: "
  + sumCountedCompleter.getRawResult());

输出如下:

[11:11:07] Partial sum: [7, 7, 8, 5, 6, 10] = 43
  ForkJoinPool.commonPool-worker-7
[11:11:07] Partial sum: [9, 1, 1, 6, 1, 2] = 20
  ForkJoinPool.commonPool-worker-3
...
[11:11:07] Thread complete: ForkJoinPool.commonPool-worker-15
[11:11:07] Done! Result: 1159

215 Fork/Join 框架和compareAndSetForkJoinTaskTag()

现在,我们已经熟悉了 Fork/Join 框架,让我们看看另一个问题。这次让我们假设我们有一组相互依赖的对象。下图可以被视为一个用例:

以下是前面图表的说明:

  • TaskD有三个依赖项:TaskATaskBTaskC
  • TaskC有两个依赖项:TaskATaskB
  • TaskB有一个依赖关系:TaskA
  • TaskA没有依赖关系。

在代码行中,我们将对其进行如下塑造:

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
Task taskA = new Task("Task-A", new Adder(1));
Task taskB = new Task("Task-B", new Adder(2), taskA);
Task taskC = new Task("Task-C", new Adder(3), taskA, taskB);
Task taskD = new Task("Task-D", new Adder(4), taskA, taskB, taskC);
forkJoinPool.invoke(taskD);

Adder是一个简单的Callable,每个任务只能执行一次(因此,对于TaskDTaskCTaskBTaskA执行一次)。Adder由以下代码启动:

private static class Adder implements Callable {
  private static final AtomicInteger result = new AtomicInteger();
  private Integer nr;
  public Adder(Integer nr) {
    this.nr = nr;
  }
  @Override
  public Integer call() {
    logger.info(() -> "Adding number: " + nr
      + " by thread:" + Thread.currentThread().getName());
    return result.addAndGet(nr);
  }
}

我们已经知道如何将 Fork/Join 框架用于具有非循环和/或不可重复(或者我们不关心它们是否重复)完成依赖关系的任务。但是如果我们用这种方式实现,那么每个任务都会多次调用Callable。例如,TaskA作为其他三个任务的依赖项出现,因此Callable将被调用三次。我们只想要一次。

JDK8 中添加的一个非常方便的特性ForkJoinPool是用short值进行原子标记:

  • short getForkJoinTaskTag():返回该任务的标签。
  • short setForkJoinTaskTag(short newValue):自动设置此任务的标记值,并返回旧值。
  • boolean compareAndSetForkJoinTaskTag(short expect, short update):如果当前值等于expect并且更改为update,则返回true

换句话说,compareAndSetForkJoinTaskTag()允许我们将任务标记为VISITED。一旦标记为VISITED,则不执行。让我们在以下代码行中看到它:

public class Task<Integer> extends RecursiveTask<Integer> {
  private static final Logger logger 
    = Logger.getLogger(Task.class.getName());
  private static final short UNVISITED = 0;
  private static final short VISITED = 1;
  private Set<Task<Integer>> dependencies = new HashSet<>();
  private final String name;
  private final Callable<Integer> callable;
  public Task(String name, Callable<Integer> callable,
      Task<Integer> ...dependencies) {
    this.name = name;
    this.callable = callable;
    this.dependencies = Set.of(dependencies);
  }
  @Override
  protected Integer compute() {
    dependencies.stream()
      .filter((task) -> (task.updateTaskAsVisited()))
      .forEachOrdered((task) -> {
        logger.info(() -> "Tagged: " + task + "("
          + task.getForkJoinTaskTag() + ")");
        task.fork();
      });
    for (Task task: dependencies) {
      task.join();
    }
    try {
      return callable.call();
    } catch (Exception ex) {
      logger.severe(() -> "Exception: " + ex);
    }
    return null;
  }
  public boolean updateTaskAsVisited() {
    return compareAndSetForkJoinTaskTag(UNVISITED, VISITED);
  }
  @Override
  public String toString() {
    return name + " | dependencies=" + dependencies + "}";
  }
}

可能的输出如下:

[10:30:53] [INFO] Tagged: Task-B(1)
[10:30:53] [INFO] Tagged: Task-C(1)
[10:30:53] [INFO] Tagged: Task-A(1)
[10:30:53] [INFO] Adding number: 1 
                   by thread:ForkJoinPool.commonPool-worker-3
[10:30:53] [INFO] Adding number: 2 
                   by thread:ForkJoinPool.commonPool-worker-3
[10:30:53] [INFO] Adding number: 3 
                   by thread:ForkJoinPool.commonPool-worker-5
[10:30:53] [INFO] Adding number: 4 
                   by thread:main
[10:30:53] [INFO] Result: 10

216 CompletableFuture

JDK8 通过用CompletableFuture增强Future,在异步编程领域迈出了重要的一步。Future的主要限制是:

  • 它不能显式地完成。
  • 它不支持对结果执行操作的回调。
  • 它们不能链接或组合以获得复杂的异步管道。
  • 它不提供异常处理。

CompletableFuture没有这些限制。一个简单但无用的CompletableFuture可以写如下:

CompletableFuture<Integer> completableFuture 
  = new CompletableFuture<>();

通过阻断get()方法可以得到结果:

completableFuture.get();

除此之外,让我们看几个在电子商务平台上下文中运行异步任务的示例。我们将这些示例添加到名为CustomerAsyncs的助手类中。

运行异步任务并返回void

用户问题:打印某个客户订单

因为打印是一个不需要返回结果的过程,所以这是一个针对runAsync()的作业。此方法可以异步运行任务,并且不返回结果。换句话说,它接受一个Runnable对象并返回CompletableFuture,如下代码所示:

public static void printOrder() {
  CompletableFuture<Void> cfPrintOrder 
      = CompletableFuture.runAsync(new Runnable() {
    @Override
    public void run() {
      logger.info(() -> "Order is printed by: "
        + Thread.currentThread().getName());
      Thread.sleep(500);
    }
  });
  cfPrintOrder.get(); // block until the order is printed
  logger.info("Customer order was printed ...\n");
}

或者,我们可以用 Lambda 来写:

public static void printOrder() {
  CompletableFuture<Void> cfPrintOrder 
      = CompletableFuture.runAsync(() -> {
    logger.info(() -> "Order is printed by: "
      + Thread.currentThread().getName());
    Thread.sleep(500);
  });
  cfPrintOrder.get(); // block until the order is printed
  logger.info("Customer order was printed ...\n");
}

运行异步任务并返回结果

用户问题:获取某客户的订单汇总

这一次,异步任务必须返回一个结果,因此runAsync()没有用处。这是supplyAsync()的工作。取Supplier返回CompletableFutureT是通过get()方法从该供应器处获得的结果类型。在代码行中,我们可以如下解决此问题:

public static void fetchOrderSummary() {
  CompletableFuture<String> cfOrderSummary 
      = CompletableFuture.supplyAsync(() -> {
    logger.info(() -> "Fetch order summary by: "
      + Thread.currentThread().getName());
    Thread.sleep(500);
    return "Order Summary #93443";
  });
  // wait for summary to be available, this is blocking
  String summary = cfOrderSummary.get();
  logger.info(() -> "Order summary: " + summary + "\n");
}

运行异步任务并通过显式线程池返回结果

用户问题:取某客户的订单摘要

默认情况下,与前面的示例一样,异步任务在从全局ForkJoinPool.commonPool()获取的线程中执行。通过简单地记录Thread.currentThread().getName(),我们可以看到ForkJoinPool.commonPool-worker-3

但是我们也可以使用显式的Executor自定义线程池。所有能够运行异步任务的CompletableFuture方法都提供了一种采用Executor的风格。

下面是使用单线程池的示例:

public static void fetchOrderSummaryExecutor() {
  ExecutorService executor = Executors.newSingleThreadExecutor();
  CompletableFuture<String> cfOrderSummary 
      = CompletableFuture.supplyAsync(() -> {
    logger.info(() -> "Fetch order summary by: "
      + Thread.currentThread().getName());
    Thread.sleep(500);
    return "Order Summary #91022";
  }, executor);
  // wait for summary to be available, this is blocking
  String summary = cfOrderSummary.get();
  logger.info(() -> "Order summary: " + summary + "\n");
  executor.shutdownNow();
}

Java 编程问题:十一、并发-深入探索2https://developer.aliyun.com/article/1426166

相关文章
|
1月前
|
Java 开发者
Java多线程编程中的常见误区与最佳实践####
本文深入剖析了Java多线程编程中开发者常遇到的几个典型误区,如对`start()`与`run()`方法的混淆使用、忽视线程安全问题、错误处理未同步的共享变量等,并针对这些问题提出了具体的解决方案和最佳实践。通过实例代码对比,直观展示了正确与错误的实现方式,旨在帮助读者构建更加健壮、高效的多线程应用程序。 ####
|
18天前
|
Java 程序员
Java编程中的异常处理:从基础到高级
在Java的世界中,异常处理是代码健壮性的守护神。本文将带你从异常的基本概念出发,逐步深入到高级用法,探索如何优雅地处理程序中的错误和异常情况。通过实际案例,我们将一起学习如何编写更可靠、更易于维护的Java代码。准备好了吗?让我们一起踏上这段旅程,解锁Java异常处理的秘密!
|
2天前
|
算法 Java 调度
java并发编程中Monitor里的waitSet和EntryList都是做什么的
在Java并发编程中,Monitor内部包含两个重要队列:等待集(Wait Set)和入口列表(Entry List)。Wait Set用于线程的条件等待和协作,线程调用`wait()`后进入此集合,通过`notify()`或`notifyAll()`唤醒。Entry List则管理锁的竞争,未能获取锁的线程在此排队,等待锁释放后重新竞争。理解两者区别有助于设计高效的多线程程序。 - **Wait Set**:线程调用`wait()`后进入,等待条件满足被唤醒,需重新竞争锁。 - **Entry List**:多个线程竞争锁时,未获锁的线程在此排队,等待锁释放后获取锁继续执行。
25 12
|
22天前
|
设计模式 Java 开发者
Java多线程编程的陷阱与解决方案####
本文深入探讨了Java多线程编程中常见的问题及其解决策略。通过分析竞态条件、死锁、活锁等典型场景,并结合代码示例和实用技巧,帮助开发者有效避免这些陷阱,提升并发程序的稳定性和性能。 ####
|
22天前
|
缓存 Java 开发者
Java多线程编程的陷阱与最佳实践####
本文深入探讨了Java多线程编程中常见的陷阱,如竞态条件、死锁和内存一致性错误,并提供了实用的避免策略。通过分析典型错误案例,本文旨在帮助开发者更好地理解和掌握多线程环境下的编程技巧,从而提升并发程序的稳定性和性能。 ####
|
15天前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
15天前
|
Java 调度
Java中的多线程编程与并发控制
本文深入探讨了Java编程语言中多线程编程的基础知识和并发控制机制。文章首先介绍了多线程的基本概念,包括线程的定义、生命周期以及在Java中创建和管理线程的方法。接着,详细讲解了Java提供的同步机制,如synchronized关键字、wait()和notify()方法等,以及如何通过这些机制实现线程间的协调与通信。最后,本文还讨论了一些常见的并发问题,例如死锁、竞态条件等,并提供了相应的解决策略。
40 3
|
26天前
|
Java 程序员
Java编程中的异常处理:从基础到高级
在Java的世界里,异常是程序运行中不可忽视的“惊喜”。它们可能突如其来,也可能悄无声息地潜伏。掌握异常处理的艺术,意味着你能够优雅地面对程序的不完美,并确保它即使在风雨飘摇中也能继续航行。本文将引导你理解Java异常的本质,探索捕获和处理这些异常的方法,并最终学会如何利用自定义异常为你的代码增添力量。
|
20天前
|
开发框架 安全 Java
Java 反射机制:动态编程的强大利器
Java反射机制允许程序在运行时检查类、接口、字段和方法的信息,并能操作对象。它提供了一种动态编程的方式,使得代码更加灵活,能够适应未知的或变化的需求,是开发框架和库的重要工具。
36 2
|
29天前
|
安全 Java 编译器
Java编程中的异常处理:从基础到高级
在Java的世界中,异常处理是代码健壮性的守护神。本文将通过浅显易懂的语言和生动的比喻,带你了解Java异常处理的基本概念、分类以及如何优雅地处理它们。我们将一起探索try-catch-finally的结构,深入理解异常类层次结构,并通过实际案例学习如何创建自定义异常。最后,文章将介绍一些最佳实践,帮助你编写出既安全又高效的异常处理代码。准备好,让我们一起走进Java异常处理的奇妙世界!
66 11
下一篇
DataWorks