Java 编程问题:十一、并发-深入探索1

简介: Java 编程问题:十一、并发-深入探索

本章包括涉及 Java 并发的 13 个问题,涉及 Fork/Join 框架、CompletableFutureReentrantLockReentrantReadWriteLockStampedLock、原子变量、任务取消、可中断方法、线程局部、死锁等方面。对于任何开发人员来说,并发性都是必需的主题之一,在工作面试中不能被忽视。这就是为什么这一章和最后一章如此重要。读完本章,您将对并发性有相当的了解,这是每个 Java 开发人员都需要的。

问题

使用以下问题来测试您的并发编程能力。我强烈建议您在使用解决方案和下载示例程序之前,先尝试一下每个问题:

  1. 可中断方法:编写一个程序,举例说明处理可中断方法的最佳方法。
  2. Fork/Join 框架:编写一个依赖 Fork/Join 框架对列表元素求和的程序。编写一个依赖 Fork/Join 框架的程序来计算给定位置的斐波那契数(例如,F(12) = 144)。另外,编写一个程序来举例说明CountedCompleter的用法。
  3. Fork/Join 和compareAndSetForkJoinTaskTag():编写一个程序,将 Fork/Join 框架应用到一组相互依存的任务,只需执行一次(例如任务 D 依赖于任务 C任务 B,但任务 C 依赖于任务 B 也一样,因此任务 B 只能执行一次,不能执行两次。
  4. CompletableFuture:通过CompletableFuture写几个代码片段来举例说明异步代码。
  5. 组合多个CompletableFuture对象:写几段代码举例说明组合多个CompletableFuture对象的不同解决方案。
  6. 优化忙等待:写一个概念证明来举例说明通过onSpinWait()优化忙等待技术。
  7. 任务取消:写一个概念证明,举例说明如何使用volatile变量来保存进程的取消状态。
  8. ThreadLocal:写一个概念证明,举例说明ThreadLocal的用法。
  9. 原子变量:使用多线程应用(Runnable编写一个从 1 到 1000000 的整数计数程序。
  10. ReentrantLock:编写一个程序,使用ReentrantLock将整数从 1 递增到 1000000。
  11. ReentrantReadWriteLock:通过ReentrantReadWriteLock编写模拟读写过程编排的程序。
  12. StampedLock:通过StampedLock编写模拟读写过程编排的程序。
  13. 死锁(哲学家就餐):编写一个程序,揭示并解决著名餐饮哲学家问题中可能出现的死锁(循环等待致命拥抱)。

以下各节介绍上述问题的解决方案。记住,通常没有一个正确的方法来解决一个特定的问题。另外,请记住,这里显示的解释仅包括解决问题所需的最有趣和最重要的细节。下载示例解决方案以查看更多详细信息,并在这个页面中试用程序。

213 可中断方法

所谓可中断方法,是指可以抛出InterruptedException的阻塞方法,例如Thread.sleep()BlockingQueue.take()BlockingQueue.poll(long timeout, TimeUnit unit)等。阻塞线程通常处于阻塞等待定时等待状态,如果被中断,则该方法尝试尽快抛出InterruptedException

因为InterruptedException是一个检查过的异常,所以我们必须捕获它和/或抛出它。换句话说,如果我们的方法调用了抛出InterruptedException的方法,那么我们必须准备好处理这个异常。如果我们可以抛出它(将异常传播给调用方),那么它就不再是我们的工作了。打电话的人必须进一步处理。所以,当我们必须抓住它的时候,让我们把注意力集中在这个案子上。当我们的代码在Runnable内运行时,就会出现这种情况,因为它不能抛出异常。

让我们从一个简单的例子开始。试图通过poll(long timeout, TimeUnit unit)BlockingQueue获取元素可以写为:

try {
  queue.poll(3000, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
  ...
  logger.info(() -> "Thread is interrupted? "
    + Thread.currentThread().isInterrupted());
}

尝试轮询队列中的元素可能会导致InterruptedException。有一个 3000 毫秒的窗口可以中断线程。在中断的情况下(例如,Thread.interrupt()),我们可能会认为调用catch块中的Thread.currentThread().isInterrupted()将返回true。毕竟,我们处在一个InterruptedException catch街区,所以相信这一点是有道理的。实际上,它会返回false,答案在poll(long timeout, TimeUnit unit)方法的源代码中,如下所示:

1: public E poll(long timeout, TimeUnit unit) 
       throws InterruptedException {
2:   E e = xfer(null, false, TIMED, unit.toNanos(timeout));
3:   if (e != null || !Thread.interrupted())
4:     return e;
5:   throw new InterruptedException();
6: }

更准确地说,答案在第 3 行。如果线程被中断,那么Thread.interrupted()将返回true,并将导致第 5 行(throw new InterruptedException()。但是除了测试之外,如果当前线程被中断,Thread.interrupted()清除线程的中断状态。请查看以下连续调用中断线程:

Thread.currentThread().isInterrupted(); // true
Thread.interrupted() // true
Thread.currentThread().isInterrupted(); // false
Thread.interrupted() // false

注意,Thread.currentThread().isInterrupted()测试这个线程是否被中断,而不影响中断状态。

现在,让我们回到我们的案子。所以,我们知道线程在捕捉到InterruptedException后就中断了,但是中断状态被Thread.interrupted()清除了。这也意味着我们代码的调用者不会意识到中断。

我们有责任成为好公民,通过调用interrupt()方法恢复中断。这样,我们代码的调用者就可以看到发出了中断,并相应地采取行动。正确的代码如下:

try {
  queue.poll(3000, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
  ...
  Thread.currentThread().interrupt(); // restore interrupt
}

根据经验,在捕捉到InterruptedException之后,不要忘记通过调用Thread.currentThread().interrupt()来恢复中断。

让我们来解决一个突出显示忘记恢复中断的问题。假设一个Runnable只要当前线程没有中断就可以运行(例如,while (!Thread.currentThread().isInterrupted()) { ... }

在每次迭代中,如果当前线程中断状态为false,那么我们尝试从BlockingQueue中获取一个元素。

实现代码如下:

Thread thread = new Thread(() -> {
  // some dummy queue
  TransferQueue<String> queue = new LinkedTransferQueue<>();
  while (!Thread.currentThread().isInterrupted()) {
    try {
      logger.info(() -> "For 3 seconds the thread " 
        + Thread.currentThread().getName() 
        + " will try to poll an element from queue ...");
      queue.poll(3000, TimeUnit.MILLISECONDS);
    } catch (InterruptedException ex) {
      logger.severe(() -> "InterruptedException! The thread "
        + Thread.currentThread().getName() + " was interrupted!");
      Thread.currentThread().interrupt();
    }
  }
  logger.info(() -> "The execution was stopped!");
});

作为调用者(另一个线程),我们启动上面的线程,睡眠 1.5 秒,只是给这个线程时间进入poll()方法,然后我们中断它。如下代码所示:

thread.start();
Thread.sleep(1500);
thread.interrupt();

这将导致InterruptedException

记录异常并恢复中断。

下一步,while计算Thread.currentThread().isInterrupted()false并退出。

因此,输出如下:

[18:02:43] [INFO] For 3 seconds the thread Thread-0
                  will try to poll an element from queue ...
[18:02:44] [SEVERE] InterruptedException!
                    The thread Thread-0 was interrupted!
[18:02:45] [INFO] The execution was stopped!

现在,让我们对恢复中断的行进行注释:

...
} catch (InterruptedException ex) {
  logger.severe(() -> "InterruptedException! The thread " 
    + Thread.currentThread().getName() + " was interrupted!");
  // notice that the below line is commented
  // Thread.currentThread().interrupt();
}
...

这一次,while块将永远运行,因为它的保护条件总是被求值为true

代码不能作用于中断,因此输出如下:

[18:05:47] [INFO] For 3 seconds the thread Thread-0
                  will try to poll an element from queue ...
[18:05:48] [SEVERE] InterruptedException!
                    The thread Thread-0 was interrupted!
[18:05:48] [INFO] For 3 seconds the thread Thread-0
                  will try to poll an element from queue ...
...

根据经验,当我们可以接受中断(而不是恢复中断)时,唯一可以接受的情况是我们可以控制整个调用栈(例如,extend Thread)。

否则,捕获的InterruptedException也应该包含Thread.currentThread().interrupt()

214 Fork/Join 框架

我们已经在“工作线程池”一节中介绍了 Fork/Join 框架。

Fork/Join 框架主要用于处理一个大任务(通常,通过大,我们可以理解大量的数据)并递归地将其拆分为可以并行执行的小任务(子任务)。最后,在完成所有子任务后,它们的结果将合并(合并)为一个结果。

下图是 Fork/Join 流的可视化表示:

在 API 方面,可以通过java.util.concurrent.ForkJoinPool创建叉/连接。

JDK8 之前,推荐的方法依赖于public static变量,如下所示:

public static ForkJoinPool forkJoinPool = new ForkJoinPool();

从 JDK8 开始,我们可以按如下方式进行:

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();

这两种方法都避免了在单个 JVM 上有太多池线程这一令人不快的情况,这是由创建它们自己的池的并行操作造成的。

对于自定义的ForkJoinPool,依赖于此类的构造器。JDK9 添加了迄今为止最全面的一个(详细信息见文档)。

AForkJoinPool对象操作任务。ForkJoinPool中执行的任务的基本类型为ForkJoinTask。更确切地说,执行以下任务:

  • RecursiveAction对于void任务
  • RecursiveTask对于返回值的任务
  • CountedCompleter对于需要记住挂起任务计数的任务

这三种类型的任务都有一个名为compute()的抽象方法,在这个方法中任务的逻辑是成形的。

ForkJoinPool提交任务可以通过以下方式完成:

  • execute()submit()
  • invoke()派生任务并等待结果
  • invokeAll()用于分叉一堆任务(例如,集合)
  • fork()用于安排在池中异步执行此任务,join()用于在完成时返回计算结果

让我们从一个通过RecursiveTask解决的问题开始。

通过RecursiveTask计算总和

为了演示框架的分叉行为,我们假设我们有一个数字列表,并且我们要计算这些数字的总和。为此,我们使用createSubtasks()方法递归地拆分(派生)这个列表,只要它大于指定的THRESHOLD。每个任务都被添加到List中。最后通过invokeAll(Collection tasks)方式将该列表提交给ForkJoinPool。这是使用以下代码完成的:

public class SumRecursiveTask extends RecursiveTask<Integer> {
  private static final Logger logger 
    = Logger.getLogger(SumRecursiveTask.class.getName());
  private static final int THRESHOLD = 10;
  private final List<Integer> worklist;
  public SumRecursiveTask(List<Integer> worklist) {
    this.worklist = worklist;
  }
  @Override
  protected Integer compute() {
    if (worklist.size() <= THRESHOLD) {
      return partialSum(worklist);
    }
    return ForkJoinTask.invokeAll(createSubtasks())
      .stream()
      .mapToInt(ForkJoinTask::join)
      .sum();
  }
  private List<SumRecursiveTask> createSubtasks() {
    List<SumRecursiveTask> subtasks = new ArrayList<>();
    int size = worklist.size();
    List<Integer> worklistLeft 
      = worklist.subList(0, (size + 1) / 2);
    List<Integer> worklistRight 
      = worklist.subList((size + 1) / 2, size);
    subtasks.add(new SumRecursiveTask(worklistLeft));
    subtasks.add(new SumRecursiveTask(worklistRight));
    return subtasks;
  }
  private Integer partialSum(List<Integer> worklist) {
    int sum = worklist.stream()
      .mapToInt(e -> e)
      .sum();
    logger.info(() -> "Partial sum: " + worklist + " = "
      + sum + "\tThread: " + Thread.currentThread().getName());
    return sum;
  }
}

为了测试它,我们需要一个列表和ForkJoinPool如下:

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
Random rnd = new Random();
List<Integer> list = new ArrayList<>();
for (int i = 0; i < 200; i++) {
  list.add(1 + rnd.nextInt(10));
}
SumRecursiveTask sumRecursiveTask = new SumRecursiveTask(list);
Integer sumAll = forkJoinPool.invoke(sumRecursiveTask);
logger.info(() -> "Final sum: " + sumAll);

可能的输出如下:

...
[15:17:06] Partial sum: [1, 3, 6, 6, 2, 5, 9] = 32
ForkJoinPool.commonPool-worker-9
...
[15:17:06] Partial sum: [1, 9, 9, 8, 9, 5] = 41
ForkJoinPool.commonPool-worker-7
[15:17:06] Final sum: 1084

用递归运算计算斐波那契函数

斐波那契数通常表示为F(n),是一个遵循以下公式的序列:

F(0) = 0, 
F(1) = 1, 
..., 
F(n) = F(n-1) + F(n-2), n > 1

斐波那契数的快照是:

0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, ...

通过RecursiveAction实现斐波那契数可以如下完成:

public class FibonacciRecursiveAction extends RecursiveAction {
  private static final Logger logger =
    Logger.getLogger(FibonacciRecursiveAction.class.getName());
  private static final long THRESHOLD = 5;
  private long nr;
  public FibonacciRecursiveAction(long nr) {
    this.nr = nr;
  }
  @Override
  protected void compute() {
    final long n = nr;
    if (n <= THRESHOLD) {
      nr = fibonacci(n);
    } else {
      nr = ForkJoinTask.invokeAll(createSubtasks(n))
        .stream()
        .mapToLong(x -> x.fibonacciNumber())
        .sum();
    }
  }
  private List<FibonacciRecursiveAction> createSubtasks(long n) {
    List<FibonacciRecursiveAction> subtasks = new ArrayList<>();
    FibonacciRecursiveAction fibonacciMinusOne
      = new FibonacciRecursiveAction(n - 1);
    FibonacciRecursiveAction fibonacciMinusTwo
      = new FibonacciRecursiveAction(n - 2);
    subtasks.add(fibonacciMinusOne);
    subtasks.add(fibonacciMinusTwo);
    return subtasks;
  }
  private long fibonacci(long n) {
    logger.info(() -> "Number: " + n 
      + " Thread: " + Thread.currentThread().getName());
    if (n <= 1) {
      return n;
    }
    return fibonacci(n - 1) + fibonacci(n - 2);
  }
  public long fibonacciNumber() {
    return nr;
  }
}

为了测试它,我们需要以下ForkJoinPool对象:

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
FibonacciRecursiveAction fibonacciRecursiveAction
  = new FibonacciRecursiveAction(12);
forkJoinPool.invoke(fibonacciRecursiveAction);
logger.info(() -> "Fibonacci: "
  + fibonacciRecursiveAction.fibonacciNumber());

F(12)的输出如下:

[15:40:46] Number: 5 Thread: ForkJoinPool.commonPool-worker-3
[15:40:46] Number: 5 Thread: ForkJoinPool.commonPool-worker-13
[15:40:46] Number: 4 Thread: ForkJoinPool.commonPool-worker-3
[15:40:46] Number: 4 Thread: ForkJoinPool.commonPool-worker-9
...
[15:40:49] Number: 0 Thread: ForkJoinPool.commonPool-worker-7
[15:40:49] Fibonacci: 144

使用CountedCompleter

CountedCompleter是 JDK8 中增加的ForkJoinTask类型。

CountedCompleter的任务是记住挂起的任务计数(不能少,不能多)。我们可以通过setPendingCount()设置挂起计数,也可以通过addToPendingCount(int delta)用显式的delta递增。通常,我们在分叉之前调用这些方法(例如,如果我们分叉两次,则根据具体情况调用addToPendingCount(2)setPendingCount(2))。

compute()方法中,我们通过tryComplete()propagateCompletion()减少挂起计数。当调用挂起计数为零的tryComplete()方法或调用无条件complete()方法时,调用onCompletion()方法。propagateCompletion()方法与tryComplete()类似,但不调用onCompletion()

CountedCompleter可以选择返回计算值。为此,我们必须重写getRawResult()方法来返回一个值。

下面的代码通过CountedCompleter对列表的所有值进行汇总:

public class SumCountedCompleter extends CountedCompleter<Long> {
  private static final Logger logger 
    = Logger.getLogger(SumCountedCompleter.class.getName());
  private static final int THRESHOLD = 10;
  private static final LongAdder sumAll = new LongAdder();
  private final List<Integer> worklist;
  public SumCountedCompleter(
    CountedCompleter<Long> c, List<Integer> worklist) {
    super(c);
    this.worklist = worklist;
  }
  @Override
  public void compute() {
    if (worklist.size() <= THRESHOLD) {
      partialSum(worklist);
    } else {
      int size = worklist.size();
      List<Integer> worklistLeft 
        = worklist.subList(0, (size + 1) / 2);
      List<Integer> worklistRight 
        = worklist.subList((size + 1) / 2, size);
      addToPendingCount(2);
      SumCountedCompleter leftTask
        = new SumCountedCompleter(this, worklistLeft);
      SumCountedCompleter rightTask
        = new SumCountedCompleter(this, worklistRight);
      leftTask.fork();
      rightTask.fork();
    }
    tryComplete();
  }
  @Override
  public void onCompletion(CountedCompleter<?> caller) {
    logger.info(() -> "Thread complete: " 
      + Thread.currentThread().getName());
  }
  @Override
  public Long getRawResult() {
    return sumAll.sum();
  }
  private Integer partialSum(List<Integer> worklist) {
    int sum = worklist.stream()
      .mapToInt(e -> e)
      .sum();
    sumAll.add(sum);
    logger.info(() -> "Partial sum: " + worklist + " = "
      + sum + "\tThread: " + Thread.currentThread().getName());
    return sum;
  }
}

现在,让我们看看一个潜在的调用和输出:

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
Random rnd = new Random();
List<Integer> list = new ArrayList<>();
for (int i = 0; i < 200; i++) {
  list.add(1 + rnd.nextInt(10));
}
SumCountedCompleter sumCountedCompleter
  = new SumCountedCompleter(null, list);
forkJoinPool.invoke(sumCountedCompleter);
logger.info(() -> "Done! Result: "
  + sumCountedCompleter.getRawResult());

输出如下:

[11:11:07] Partial sum: [7, 7, 8, 5, 6, 10] = 43
  ForkJoinPool.commonPool-worker-7
[11:11:07] Partial sum: [9, 1, 1, 6, 1, 2] = 20
  ForkJoinPool.commonPool-worker-3
...
[11:11:07] Thread complete: ForkJoinPool.commonPool-worker-15
[11:11:07] Done! Result: 1159

215 Fork/Join 框架和compareAndSetForkJoinTaskTag()

现在,我们已经熟悉了 Fork/Join 框架,让我们看看另一个问题。这次让我们假设我们有一组相互依赖的对象。下图可以被视为一个用例:

以下是前面图表的说明:

  • TaskD有三个依赖项:TaskATaskBTaskC
  • TaskC有两个依赖项:TaskATaskB
  • TaskB有一个依赖关系:TaskA
  • TaskA没有依赖关系。

在代码行中,我们将对其进行如下塑造:

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
Task taskA = new Task("Task-A", new Adder(1));
Task taskB = new Task("Task-B", new Adder(2), taskA);
Task taskC = new Task("Task-C", new Adder(3), taskA, taskB);
Task taskD = new Task("Task-D", new Adder(4), taskA, taskB, taskC);
forkJoinPool.invoke(taskD);

Adder是一个简单的Callable,每个任务只能执行一次(因此,对于TaskDTaskCTaskBTaskA执行一次)。Adder由以下代码启动:

private static class Adder implements Callable {
  private static final AtomicInteger result = new AtomicInteger();
  private Integer nr;
  public Adder(Integer nr) {
    this.nr = nr;
  }
  @Override
  public Integer call() {
    logger.info(() -> "Adding number: " + nr
      + " by thread:" + Thread.currentThread().getName());
    return result.addAndGet(nr);
  }
}

我们已经知道如何将 Fork/Join 框架用于具有非循环和/或不可重复(或者我们不关心它们是否重复)完成依赖关系的任务。但是如果我们用这种方式实现,那么每个任务都会多次调用Callable。例如,TaskA作为其他三个任务的依赖项出现,因此Callable将被调用三次。我们只想要一次。

JDK8 中添加的一个非常方便的特性ForkJoinPool是用short值进行原子标记:

  • short getForkJoinTaskTag():返回该任务的标签。
  • short setForkJoinTaskTag(short newValue):自动设置此任务的标记值,并返回旧值。
  • boolean compareAndSetForkJoinTaskTag(short expect, short update):如果当前值等于expect并且更改为update,则返回true

换句话说,compareAndSetForkJoinTaskTag()允许我们将任务标记为VISITED。一旦标记为VISITED,则不执行。让我们在以下代码行中看到它:

public class Task<Integer> extends RecursiveTask<Integer> {
  private static final Logger logger 
    = Logger.getLogger(Task.class.getName());
  private static final short UNVISITED = 0;
  private static final short VISITED = 1;
  private Set<Task<Integer>> dependencies = new HashSet<>();
  private final String name;
  private final Callable<Integer> callable;
  public Task(String name, Callable<Integer> callable,
      Task<Integer> ...dependencies) {
    this.name = name;
    this.callable = callable;
    this.dependencies = Set.of(dependencies);
  }
  @Override
  protected Integer compute() {
    dependencies.stream()
      .filter((task) -> (task.updateTaskAsVisited()))
      .forEachOrdered((task) -> {
        logger.info(() -> "Tagged: " + task + "("
          + task.getForkJoinTaskTag() + ")");
        task.fork();
      });
    for (Task task: dependencies) {
      task.join();
    }
    try {
      return callable.call();
    } catch (Exception ex) {
      logger.severe(() -> "Exception: " + ex);
    }
    return null;
  }
  public boolean updateTaskAsVisited() {
    return compareAndSetForkJoinTaskTag(UNVISITED, VISITED);
  }
  @Override
  public String toString() {
    return name + " | dependencies=" + dependencies + "}";
  }
}

可能的输出如下:

[10:30:53] [INFO] Tagged: Task-B(1)
[10:30:53] [INFO] Tagged: Task-C(1)
[10:30:53] [INFO] Tagged: Task-A(1)
[10:30:53] [INFO] Adding number: 1 
                   by thread:ForkJoinPool.commonPool-worker-3
[10:30:53] [INFO] Adding number: 2 
                   by thread:ForkJoinPool.commonPool-worker-3
[10:30:53] [INFO] Adding number: 3 
                   by thread:ForkJoinPool.commonPool-worker-5
[10:30:53] [INFO] Adding number: 4 
                   by thread:main
[10:30:53] [INFO] Result: 10

216 CompletableFuture

JDK8 通过用CompletableFuture增强Future,在异步编程领域迈出了重要的一步。Future的主要限制是:

  • 它不能显式地完成。
  • 它不支持对结果执行操作的回调。
  • 它们不能链接或组合以获得复杂的异步管道。
  • 它不提供异常处理。

CompletableFuture没有这些限制。一个简单但无用的CompletableFuture可以写如下:

CompletableFuture<Integer> completableFuture 
  = new CompletableFuture<>();

通过阻断get()方法可以得到结果:

completableFuture.get();

除此之外,让我们看几个在电子商务平台上下文中运行异步任务的示例。我们将这些示例添加到名为CustomerAsyncs的助手类中。

运行异步任务并返回void

用户问题:打印某个客户订单

因为打印是一个不需要返回结果的过程,所以这是一个针对runAsync()的作业。此方法可以异步运行任务,并且不返回结果。换句话说,它接受一个Runnable对象并返回CompletableFuture,如下代码所示:

public static void printOrder() {
  CompletableFuture<Void> cfPrintOrder 
      = CompletableFuture.runAsync(new Runnable() {
    @Override
    public void run() {
      logger.info(() -> "Order is printed by: "
        + Thread.currentThread().getName());
      Thread.sleep(500);
    }
  });
  cfPrintOrder.get(); // block until the order is printed
  logger.info("Customer order was printed ...\n");
}

或者,我们可以用 Lambda 来写:

public static void printOrder() {
  CompletableFuture<Void> cfPrintOrder 
      = CompletableFuture.runAsync(() -> {
    logger.info(() -> "Order is printed by: "
      + Thread.currentThread().getName());
    Thread.sleep(500);
  });
  cfPrintOrder.get(); // block until the order is printed
  logger.info("Customer order was printed ...\n");
}

运行异步任务并返回结果

用户问题:获取某客户的订单汇总

这一次,异步任务必须返回一个结果,因此runAsync()没有用处。这是supplyAsync()的工作。取Supplier返回CompletableFutureT是通过get()方法从该供应器处获得的结果类型。在代码行中,我们可以如下解决此问题:

public static void fetchOrderSummary() {
  CompletableFuture<String> cfOrderSummary 
      = CompletableFuture.supplyAsync(() -> {
    logger.info(() -> "Fetch order summary by: "
      + Thread.currentThread().getName());
    Thread.sleep(500);
    return "Order Summary #93443";
  });
  // wait for summary to be available, this is blocking
  String summary = cfOrderSummary.get();
  logger.info(() -> "Order summary: " + summary + "\n");
}

运行异步任务并通过显式线程池返回结果

用户问题:取某客户的订单摘要

默认情况下,与前面的示例一样,异步任务在从全局ForkJoinPool.commonPool()获取的线程中执行。通过简单地记录Thread.currentThread().getName(),我们可以看到ForkJoinPool.commonPool-worker-3

但是我们也可以使用显式的Executor自定义线程池。所有能够运行异步任务的CompletableFuture方法都提供了一种采用Executor的风格。

下面是使用单线程池的示例:

public static void fetchOrderSummaryExecutor() {
  ExecutorService executor = Executors.newSingleThreadExecutor();
  CompletableFuture<String> cfOrderSummary 
      = CompletableFuture.supplyAsync(() -> {
    logger.info(() -> "Fetch order summary by: "
      + Thread.currentThread().getName());
    Thread.sleep(500);
    return "Order Summary #91022";
  }, executor);
  // wait for summary to be available, this is blocking
  String summary = cfOrderSummary.get();
  logger.info(() -> "Order summary: " + summary + "\n");
  executor.shutdownNow();
}

Java 编程问题:十一、并发-深入探索2https://developer.aliyun.com/article/1426166

相关文章
|
4天前
|
数据采集 安全 Java
Java并发编程学习12-任务取消(上)
【5月更文挑战第6天】本篇介绍了取消策略、线程中断、中断策略 和 响应中断的内容
30 4
Java并发编程学习12-任务取消(上)
|
1天前
|
Java
深入理解Java并发编程:线程池的应用与优化
【5月更文挑战第18天】本文将深入探讨Java并发编程中的重要概念——线程池。我们将了解线程池的基本概念,应用场景,以及如何优化线程池的性能。通过实例分析,我们将看到线程池如何提高系统性能,减少资源消耗,并提高系统的响应速度。
11 5
|
1天前
|
消息中间件 安全 Java
理解Java中的多线程编程
【5月更文挑战第18天】本文介绍了Java中的多线程编程,包括线程和多线程的基本概念。Java通过继承Thread类或实现Runnable接口来创建线程,此外还支持使用线程池(如ExecutorService和Executors)进行更高效的管理。多线程编程需要注意线程安全、性能优化和线程间通信,以避免数据竞争、死锁等问题,并确保程序高效运行。
|
1天前
|
安全 Java 容器
深入理解Java并发编程:线程安全与性能优化
【5月更文挑战第18天】随着多核处理器的普及,并发编程变得越来越重要。Java提供了丰富的并发编程工具,如synchronized关键字、显式锁Lock、原子类、并发容器等。本文将深入探讨Java并发编程的核心概念,包括线程安全、死锁、资源竞争等,并分享一些性能优化的技巧。
|
1天前
|
安全 Java 开发者
Java中的多线程编程:理解与实践
【5月更文挑战第18天】在现代软件开发中,多线程编程是提高程序性能和响应速度的重要手段。Java作为一种广泛使用的编程语言,其内置的多线程支持使得开发者能够轻松地实现并行处理。本文将深入探讨Java多线程的基本概念、实现方式以及常见的并发问题,并通过实例代码演示如何高效地使用多线程技术。通过阅读本文,读者将对Java多线程编程有一个全面的认识,并能够在实际开发中灵活运用。
|
1天前
|
算法 Java 程序员
Java中的线程同步与并发控制
【5月更文挑战第18天】随着计算机技术的不断发展,多核处理器的普及使得多线程编程成为提高程序性能的关键。在Java中,线程是实现并发的一种重要手段。然而,线程的并发执行可能导致数据不一致、死锁等问题。本文将深入探讨Java中线程同步的方法和技巧,以及如何避免常见的并发问题,从而提高程序的性能和稳定性。
|
1天前
|
安全 Java 容器
Java一分钟之-并发编程:并发容器(ConcurrentHashMap, CopyOnWriteArrayList)
【5月更文挑战第18天】本文探讨了Java并发编程中的`ConcurrentHashMap`和`CopyOnWriteArrayList`,两者为多线程数据共享提供高效、线程安全的解决方案。`ConcurrentHashMap`采用分段锁策略,而`CopyOnWriteArrayList`适合读多写少的场景。注意,`ConcurrentHashMap`的`forEach`需避免手动同步,且并发修改时可能导致`ConcurrentModificationException`。`CopyOnWriteArrayList`在写操作时会复制数组。理解和正确使用这些特性是优化并发性能的关键。
7 1
|
1天前
|
安全 Java 容器
Java一分钟之-高级集合框架:并发集合(Collections.synchronizedXXX)
【5月更文挑战第18天】Java集合框架的`Collections.synchronizedXXX`方法可将普通集合转为线程安全,但使用时需注意常见问题和易错点。错误的同步范围(仅同步单个操作而非迭代)可能导致并发修改异常;错误地同步整个集合类可能引起死锁;并发遍历和修改集合需使用`Iterator`避免`ConcurrentModificationException`。示例代码展示了正确使用同步集合的方法。在复杂并发场景下,推荐使用`java.util.concurrent`包中的并发集合以提高性能。
9 3
|
1天前
|
Java 编译器
Java并发编程中的锁优化策略
【5月更文挑战第18天】在Java并发编程中,锁是一种常用的同步机制,用于保护共享资源的访问。然而,不当的锁使用可能导致性能问题和死锁风险。本文将探讨Java中锁的优化策略,包括锁粗化、锁消除、锁分离和读写锁等技术,以提高并发程序的性能和可靠性。
|
2天前
|
Java 编译器
Java 并发编程中的锁优化策略
【5月更文挑战第17天】在 Java 并发编程中,锁是一种常见的同步机制,用于保护共享资源的访问。然而,不当使用锁可能导致性能问题和死锁风险。本文将探讨 Java 中的锁优化策略,包括锁粗化、锁消除、锁降级以及读写锁等技术,以提高并发程序的性能和可靠性。