本章包括涉及 Java 并发的 13 个问题,涉及 Fork/Join 框架、CompletableFuture
、ReentrantLock
、ReentrantReadWriteLock
、StampedLock
、原子变量、任务取消、可中断方法、线程局部、死锁等方面。对于任何开发人员来说,并发性都是必需的主题之一,在工作面试中不能被忽视。这就是为什么这一章和最后一章如此重要。读完本章,您将对并发性有相当的了解,这是每个 Java 开发人员都需要的。
问题
使用以下问题来测试您的并发编程能力。我强烈建议您在使用解决方案和下载示例程序之前,先尝试一下每个问题:
- 可中断方法:编写一个程序,举例说明处理可中断方法的最佳方法。
- Fork/Join 框架:编写一个依赖 Fork/Join 框架对列表元素求和的程序。编写一个依赖 Fork/Join 框架的程序来计算给定位置的斐波那契数(例如,
F(12) = 144
)。另外,编写一个程序来举例说明CountedCompleter
的用法。 - Fork/Join 和
compareAndSetForkJoinTaskTag()
:编写一个程序,将 Fork/Join 框架应用到一组相互依存的任务,只需执行一次(例如任务 D 依赖于任务 C 和任务 B,但任务 C 依赖于任务 B 也一样,因此任务 B 只能执行一次,不能执行两次。 CompletableFuture
:通过CompletableFuture
写几个代码片段来举例说明异步代码。- 组合多个
CompletableFuture
对象:写几段代码举例说明组合多个CompletableFuture
对象的不同解决方案。 - 优化忙等待:写一个概念证明来举例说明通过
onSpinWait()
优化忙等待技术。 - 任务取消:写一个概念证明,举例说明如何使用
volatile
变量来保存进程的取消状态。 ThreadLocal
:写一个概念证明,举例说明ThreadLocal
的用法。- 原子变量:使用多线程应用(
Runnable
编写一个从 1 到 1000000 的整数计数程序。 ReentrantLock
:编写一个程序,使用ReentrantLock
将整数从 1 递增到 1000000。ReentrantReadWriteLock
:通过ReentrantReadWriteLock
编写模拟读写过程编排的程序。StampedLock
:通过StampedLock
编写模拟读写过程编排的程序。- 死锁(哲学家就餐):编写一个程序,揭示并解决著名餐饮哲学家问题中可能出现的死锁(循环等待或致命拥抱)。
以下各节介绍上述问题的解决方案。记住,通常没有一个正确的方法来解决一个特定的问题。另外,请记住,这里显示的解释仅包括解决问题所需的最有趣和最重要的细节。下载示例解决方案以查看更多详细信息,并在这个页面中试用程序。
213 可中断方法
所谓可中断方法,是指可以抛出InterruptedException
的阻塞方法,例如Thread.sleep()
、BlockingQueue.take()
、BlockingQueue.poll(long timeout, TimeUnit unit)
等。阻塞线程通常处于阻塞、等待或定时等待状态,如果被中断,则该方法尝试尽快抛出InterruptedException
。
因为InterruptedException
是一个检查过的异常,所以我们必须捕获它和/或抛出它。换句话说,如果我们的方法调用了抛出InterruptedException
的方法,那么我们必须准备好处理这个异常。如果我们可以抛出它(将异常传播给调用方),那么它就不再是我们的工作了。打电话的人必须进一步处理。所以,当我们必须抓住它的时候,让我们把注意力集中在这个案子上。当我们的代码在Runnable
内运行时,就会出现这种情况,因为它不能抛出异常。
让我们从一个简单的例子开始。试图通过poll(long timeout, TimeUnit unit)
从BlockingQueue
获取元素可以写为:
try { queue.poll(3000, TimeUnit.MILLISECONDS); } catch (InterruptedException ex) { ... logger.info(() -> "Thread is interrupted? " + Thread.currentThread().isInterrupted()); }
尝试轮询队列中的元素可能会导致InterruptedException
。有一个 3000 毫秒的窗口可以中断线程。在中断的情况下(例如,Thread.interrupt()
),我们可能会认为调用catch
块中的Thread.currentThread().isInterrupted()
将返回true
。毕竟,我们处在一个InterruptedException catch
街区,所以相信这一点是有道理的。实际上,它会返回false
,答案在poll(long timeout, TimeUnit unit)
方法的源代码中,如下所示:
1: public E poll(long timeout, TimeUnit unit) throws InterruptedException { 2: E e = xfer(null, false, TIMED, unit.toNanos(timeout)); 3: if (e != null || !Thread.interrupted()) 4: return e; 5: throw new InterruptedException(); 6: }
更准确地说,答案在第 3 行。如果线程被中断,那么Thread.interrupted()
将返回true
,并将导致第 5 行(throw new InterruptedException()
。但是除了测试之外,如果当前线程被中断,Thread.interrupted()
清除线程的中断状态。请查看以下连续调用中断线程:
Thread.currentThread().isInterrupted(); // true Thread.interrupted() // true Thread.currentThread().isInterrupted(); // false Thread.interrupted() // false
注意,Thread.currentThread().isInterrupted()
测试这个线程是否被中断,而不影响中断状态。
现在,让我们回到我们的案子。所以,我们知道线程在捕捉到InterruptedException
后就中断了,但是中断状态被Thread.interrupted()
清除了。这也意味着我们代码的调用者不会意识到中断。
我们有责任成为好公民,通过调用interrupt()
方法恢复中断。这样,我们代码的调用者就可以看到发出了中断,并相应地采取行动。正确的代码如下:
try { queue.poll(3000, TimeUnit.MILLISECONDS); } catch (InterruptedException ex) { ... Thread.currentThread().interrupt(); // restore interrupt }
根据经验,在捕捉到InterruptedException
之后,不要忘记通过调用Thread.currentThread().interrupt()
来恢复中断。
让我们来解决一个突出显示忘记恢复中断的问题。假设一个Runnable
只要当前线程没有中断就可以运行(例如,while (!Thread.currentThread().isInterrupted()) { ... }
。
在每次迭代中,如果当前线程中断状态为false
,那么我们尝试从BlockingQueue
中获取一个元素。
实现代码如下:
Thread thread = new Thread(() -> { // some dummy queue TransferQueue<String> queue = new LinkedTransferQueue<>(); while (!Thread.currentThread().isInterrupted()) { try { logger.info(() -> "For 3 seconds the thread " + Thread.currentThread().getName() + " will try to poll an element from queue ..."); queue.poll(3000, TimeUnit.MILLISECONDS); } catch (InterruptedException ex) { logger.severe(() -> "InterruptedException! The thread " + Thread.currentThread().getName() + " was interrupted!"); Thread.currentThread().interrupt(); } } logger.info(() -> "The execution was stopped!"); });
作为调用者(另一个线程),我们启动上面的线程,睡眠 1.5 秒,只是给这个线程时间进入poll()
方法,然后我们中断它。如下代码所示:
thread.start(); Thread.sleep(1500); thread.interrupt();
这将导致InterruptedException
。
记录异常并恢复中断。
下一步,while
计算Thread.currentThread().isInterrupted()
到false
并退出。
因此,输出如下:
[18:02:43] [INFO] For 3 seconds the thread Thread-0 will try to poll an element from queue ... [18:02:44] [SEVERE] InterruptedException! The thread Thread-0 was interrupted! [18:02:45] [INFO] The execution was stopped!
现在,让我们对恢复中断的行进行注释:
... } catch (InterruptedException ex) { logger.severe(() -> "InterruptedException! The thread " + Thread.currentThread().getName() + " was interrupted!"); // notice that the below line is commented // Thread.currentThread().interrupt(); } ...
这一次,while
块将永远运行,因为它的保护条件总是被求值为true
。
代码不能作用于中断,因此输出如下:
[18:05:47] [INFO] For 3 seconds the thread Thread-0 will try to poll an element from queue ... [18:05:48] [SEVERE] InterruptedException! The thread Thread-0 was interrupted! [18:05:48] [INFO] For 3 seconds the thread Thread-0 will try to poll an element from queue ... ...
根据经验,当我们可以接受中断(而不是恢复中断)时,唯一可以接受的情况是我们可以控制整个调用栈(例如,extend Thread
)。
否则,捕获的InterruptedException
也应该包含Thread.currentThread().interrupt()
。
214 Fork/Join 框架
我们已经在“工作线程池”一节中介绍了 Fork/Join 框架。
Fork/Join 框架主要用于处理一个大任务(通常,通过大,我们可以理解大量的数据)并递归地将其拆分为可以并行执行的小任务(子任务)。最后,在完成所有子任务后,它们的结果将合并(合并)为一个结果。
下图是 Fork/Join 流的可视化表示:
在 API 方面,可以通过java.util.concurrent.ForkJoinPool
创建叉/连接。
JDK8 之前,推荐的方法依赖于public static
变量,如下所示:
public static ForkJoinPool forkJoinPool = new ForkJoinPool();
从 JDK8 开始,我们可以按如下方式进行:
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
这两种方法都避免了在单个 JVM 上有太多池线程这一令人不快的情况,这是由创建它们自己的池的并行操作造成的。
对于自定义的ForkJoinPool
,依赖于此类的构造器。JDK9 添加了迄今为止最全面的一个(详细信息见文档)。
AForkJoinPool
对象操作任务。ForkJoinPool
中执行的任务的基本类型为ForkJoinTask
。更确切地说,执行以下任务:
RecursiveAction
对于void
任务RecursiveTask
对于返回值的任务CountedCompleter
对于需要记住挂起任务计数的任务
这三种类型的任务都有一个名为compute()
的抽象方法,在这个方法中任务的逻辑是成形的。
向ForkJoinPool
提交任务可以通过以下方式完成:
execute()
和submit()
invoke()
派生任务并等待结果invokeAll()
用于分叉一堆任务(例如,集合)fork()
用于安排在池中异步执行此任务,join()
用于在完成时返回计算结果
让我们从一个通过RecursiveTask
解决的问题开始。
通过RecursiveTask
计算总和
为了演示框架的分叉行为,我们假设我们有一个数字列表,并且我们要计算这些数字的总和。为此,我们使用createSubtasks()
方法递归地拆分(派生)这个列表,只要它大于指定的THRESHOLD
。每个任务都被添加到List
中。最后通过invokeAll(Collection tasks)
方式将该列表提交给ForkJoinPool
。这是使用以下代码完成的:
public class SumRecursiveTask extends RecursiveTask<Integer> { private static final Logger logger = Logger.getLogger(SumRecursiveTask.class.getName()); private static final int THRESHOLD = 10; private final List<Integer> worklist; public SumRecursiveTask(List<Integer> worklist) { this.worklist = worklist; } @Override protected Integer compute() { if (worklist.size() <= THRESHOLD) { return partialSum(worklist); } return ForkJoinTask.invokeAll(createSubtasks()) .stream() .mapToInt(ForkJoinTask::join) .sum(); } private List<SumRecursiveTask> createSubtasks() { List<SumRecursiveTask> subtasks = new ArrayList<>(); int size = worklist.size(); List<Integer> worklistLeft = worklist.subList(0, (size + 1) / 2); List<Integer> worklistRight = worklist.subList((size + 1) / 2, size); subtasks.add(new SumRecursiveTask(worklistLeft)); subtasks.add(new SumRecursiveTask(worklistRight)); return subtasks; } private Integer partialSum(List<Integer> worklist) { int sum = worklist.stream() .mapToInt(e -> e) .sum(); logger.info(() -> "Partial sum: " + worklist + " = " + sum + "\tThread: " + Thread.currentThread().getName()); return sum; } }
为了测试它,我们需要一个列表和ForkJoinPool
如下:
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool(); Random rnd = new Random(); List<Integer> list = new ArrayList<>(); for (int i = 0; i < 200; i++) { list.add(1 + rnd.nextInt(10)); } SumRecursiveTask sumRecursiveTask = new SumRecursiveTask(list); Integer sumAll = forkJoinPool.invoke(sumRecursiveTask); logger.info(() -> "Final sum: " + sumAll);
可能的输出如下:
... [15:17:06] Partial sum: [1, 3, 6, 6, 2, 5, 9] = 32 ForkJoinPool.commonPool-worker-9 ... [15:17:06] Partial sum: [1, 9, 9, 8, 9, 5] = 41 ForkJoinPool.commonPool-worker-7 [15:17:06] Final sum: 1084
用递归运算计算斐波那契函数
斐波那契数通常表示为F(n)
,是一个遵循以下公式的序列:
F(0) = 0, F(1) = 1, ..., F(n) = F(n-1) + F(n-2), n > 1
斐波那契数的快照是:
0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, ...
通过RecursiveAction
实现斐波那契数可以如下完成:
public class FibonacciRecursiveAction extends RecursiveAction { private static final Logger logger = Logger.getLogger(FibonacciRecursiveAction.class.getName()); private static final long THRESHOLD = 5; private long nr; public FibonacciRecursiveAction(long nr) { this.nr = nr; } @Override protected void compute() { final long n = nr; if (n <= THRESHOLD) { nr = fibonacci(n); } else { nr = ForkJoinTask.invokeAll(createSubtasks(n)) .stream() .mapToLong(x -> x.fibonacciNumber()) .sum(); } } private List<FibonacciRecursiveAction> createSubtasks(long n) { List<FibonacciRecursiveAction> subtasks = new ArrayList<>(); FibonacciRecursiveAction fibonacciMinusOne = new FibonacciRecursiveAction(n - 1); FibonacciRecursiveAction fibonacciMinusTwo = new FibonacciRecursiveAction(n - 2); subtasks.add(fibonacciMinusOne); subtasks.add(fibonacciMinusTwo); return subtasks; } private long fibonacci(long n) { logger.info(() -> "Number: " + n + " Thread: " + Thread.currentThread().getName()); if (n <= 1) { return n; } return fibonacci(n - 1) + fibonacci(n - 2); } public long fibonacciNumber() { return nr; } }
为了测试它,我们需要以下ForkJoinPool
对象:
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool(); FibonacciRecursiveAction fibonacciRecursiveAction = new FibonacciRecursiveAction(12); forkJoinPool.invoke(fibonacciRecursiveAction); logger.info(() -> "Fibonacci: " + fibonacciRecursiveAction.fibonacciNumber());
F(12)
的输出如下:
[15:40:46] Number: 5 Thread: ForkJoinPool.commonPool-worker-3 [15:40:46] Number: 5 Thread: ForkJoinPool.commonPool-worker-13 [15:40:46] Number: 4 Thread: ForkJoinPool.commonPool-worker-3 [15:40:46] Number: 4 Thread: ForkJoinPool.commonPool-worker-9 ... [15:40:49] Number: 0 Thread: ForkJoinPool.commonPool-worker-7 [15:40:49] Fibonacci: 144
使用CountedCompleter
CountedCompleter
是 JDK8 中增加的ForkJoinTask
类型。
CountedCompleter
的任务是记住挂起的任务计数(不能少,不能多)。我们可以通过setPendingCount()
设置挂起计数,也可以通过addToPendingCount(int delta)
用显式的delta
递增。通常,我们在分叉之前调用这些方法(例如,如果我们分叉两次,则根据具体情况调用addToPendingCount(2)
或setPendingCount(2)
)。
在compute()
方法中,我们通过tryComplete()
或propagateCompletion()
减少挂起计数。当调用挂起计数为零的tryComplete()
方法或调用无条件complete()
方法时,调用onCompletion()
方法。propagateCompletion()
方法与tryComplete()
类似,但不调用onCompletion()
。
CountedCompleter
可以选择返回计算值。为此,我们必须重写getRawResult()
方法来返回一个值。
下面的代码通过CountedCompleter
对列表的所有值进行汇总:
public class SumCountedCompleter extends CountedCompleter<Long> { private static final Logger logger = Logger.getLogger(SumCountedCompleter.class.getName()); private static final int THRESHOLD = 10; private static final LongAdder sumAll = new LongAdder(); private final List<Integer> worklist; public SumCountedCompleter( CountedCompleter<Long> c, List<Integer> worklist) { super(c); this.worklist = worklist; } @Override public void compute() { if (worklist.size() <= THRESHOLD) { partialSum(worklist); } else { int size = worklist.size(); List<Integer> worklistLeft = worklist.subList(0, (size + 1) / 2); List<Integer> worklistRight = worklist.subList((size + 1) / 2, size); addToPendingCount(2); SumCountedCompleter leftTask = new SumCountedCompleter(this, worklistLeft); SumCountedCompleter rightTask = new SumCountedCompleter(this, worklistRight); leftTask.fork(); rightTask.fork(); } tryComplete(); } @Override public void onCompletion(CountedCompleter<?> caller) { logger.info(() -> "Thread complete: " + Thread.currentThread().getName()); } @Override public Long getRawResult() { return sumAll.sum(); } private Integer partialSum(List<Integer> worklist) { int sum = worklist.stream() .mapToInt(e -> e) .sum(); sumAll.add(sum); logger.info(() -> "Partial sum: " + worklist + " = " + sum + "\tThread: " + Thread.currentThread().getName()); return sum; } }
现在,让我们看看一个潜在的调用和输出:
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool(); Random rnd = new Random(); List<Integer> list = new ArrayList<>(); for (int i = 0; i < 200; i++) { list.add(1 + rnd.nextInt(10)); } SumCountedCompleter sumCountedCompleter = new SumCountedCompleter(null, list); forkJoinPool.invoke(sumCountedCompleter); logger.info(() -> "Done! Result: " + sumCountedCompleter.getRawResult());
输出如下:
[11:11:07] Partial sum: [7, 7, 8, 5, 6, 10] = 43 ForkJoinPool.commonPool-worker-7 [11:11:07] Partial sum: [9, 1, 1, 6, 1, 2] = 20 ForkJoinPool.commonPool-worker-3 ... [11:11:07] Thread complete: ForkJoinPool.commonPool-worker-15 [11:11:07] Done! Result: 1159
215 Fork/Join 框架和compareAndSetForkJoinTaskTag()
现在,我们已经熟悉了 Fork/Join 框架,让我们看看另一个问题。这次让我们假设我们有一组相互依赖的对象。下图可以被视为一个用例:
以下是前面图表的说明:
TaskD
有三个依赖项:TaskA
、TaskB
、TaskC
。TaskC
有两个依赖项:TaskA
和TaskB
。TaskB
有一个依赖关系:TaskA
。TaskA
没有依赖关系。
在代码行中,我们将对其进行如下塑造:
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool(); Task taskA = new Task("Task-A", new Adder(1)); Task taskB = new Task("Task-B", new Adder(2), taskA); Task taskC = new Task("Task-C", new Adder(3), taskA, taskB); Task taskD = new Task("Task-D", new Adder(4), taskA, taskB, taskC); forkJoinPool.invoke(taskD);
Adder
是一个简单的Callable
,每个任务只能执行一次(因此,对于TaskD
、TaskC
、TaskB
、TaskA
执行一次)。Adder
由以下代码启动:
private static class Adder implements Callable { private static final AtomicInteger result = new AtomicInteger(); private Integer nr; public Adder(Integer nr) { this.nr = nr; } @Override public Integer call() { logger.info(() -> "Adding number: " + nr + " by thread:" + Thread.currentThread().getName()); return result.addAndGet(nr); } }
我们已经知道如何将 Fork/Join 框架用于具有非循环和/或不可重复(或者我们不关心它们是否重复)完成依赖关系的任务。但是如果我们用这种方式实现,那么每个任务都会多次调用Callable
。例如,TaskA
作为其他三个任务的依赖项出现,因此Callable
将被调用三次。我们只想要一次。
JDK8 中添加的一个非常方便的特性ForkJoinPool
是用short
值进行原子标记:
short getForkJoinTaskTag()
:返回该任务的标签。short setForkJoinTaskTag(short newValue)
:自动设置此任务的标记值,并返回旧值。boolean compareAndSetForkJoinTaskTag(short expect, short update)
:如果当前值等于expect
并且更改为update
,则返回true
。
换句话说,compareAndSetForkJoinTaskTag()
允许我们将任务标记为VISITED
。一旦标记为VISITED
,则不执行。让我们在以下代码行中看到它:
public class Task<Integer> extends RecursiveTask<Integer> { private static final Logger logger = Logger.getLogger(Task.class.getName()); private static final short UNVISITED = 0; private static final short VISITED = 1; private Set<Task<Integer>> dependencies = new HashSet<>(); private final String name; private final Callable<Integer> callable; public Task(String name, Callable<Integer> callable, Task<Integer> ...dependencies) { this.name = name; this.callable = callable; this.dependencies = Set.of(dependencies); } @Override protected Integer compute() { dependencies.stream() .filter((task) -> (task.updateTaskAsVisited())) .forEachOrdered((task) -> { logger.info(() -> "Tagged: " + task + "(" + task.getForkJoinTaskTag() + ")"); task.fork(); }); for (Task task: dependencies) { task.join(); } try { return callable.call(); } catch (Exception ex) { logger.severe(() -> "Exception: " + ex); } return null; } public boolean updateTaskAsVisited() { return compareAndSetForkJoinTaskTag(UNVISITED, VISITED); } @Override public String toString() { return name + " | dependencies=" + dependencies + "}"; } }
可能的输出如下:
[10:30:53] [INFO] Tagged: Task-B(1) [10:30:53] [INFO] Tagged: Task-C(1) [10:30:53] [INFO] Tagged: Task-A(1) [10:30:53] [INFO] Adding number: 1 by thread:ForkJoinPool.commonPool-worker-3 [10:30:53] [INFO] Adding number: 2 by thread:ForkJoinPool.commonPool-worker-3 [10:30:53] [INFO] Adding number: 3 by thread:ForkJoinPool.commonPool-worker-5 [10:30:53] [INFO] Adding number: 4 by thread:main [10:30:53] [INFO] Result: 10
216 CompletableFuture
JDK8 通过用CompletableFuture
增强Future
,在异步编程领域迈出了重要的一步。Future
的主要限制是:
- 它不能显式地完成。
- 它不支持对结果执行操作的回调。
- 它们不能链接或组合以获得复杂的异步管道。
- 它不提供异常处理。
CompletableFuture
没有这些限制。一个简单但无用的CompletableFuture
可以写如下:
CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
通过阻断get()
方法可以得到结果:
completableFuture.get();
除此之外,让我们看几个在电子商务平台上下文中运行异步任务的示例。我们将这些示例添加到名为CustomerAsyncs
的助手类中。
运行异步任务并返回void
用户问题:打印某个客户订单。
因为打印是一个不需要返回结果的过程,所以这是一个针对runAsync()
的作业。此方法可以异步运行任务,并且不返回结果。换句话说,它接受一个Runnable
对象并返回CompletableFuture
,如下代码所示:
public static void printOrder() { CompletableFuture<Void> cfPrintOrder = CompletableFuture.runAsync(new Runnable() { @Override public void run() { logger.info(() -> "Order is printed by: " + Thread.currentThread().getName()); Thread.sleep(500); } }); cfPrintOrder.get(); // block until the order is printed logger.info("Customer order was printed ...\n"); }
或者,我们可以用 Lambda 来写:
public static void printOrder() { CompletableFuture<Void> cfPrintOrder = CompletableFuture.runAsync(() -> { logger.info(() -> "Order is printed by: " + Thread.currentThread().getName()); Thread.sleep(500); }); cfPrintOrder.get(); // block until the order is printed logger.info("Customer order was printed ...\n"); }
运行异步任务并返回结果
用户问题:获取某客户的订单汇总。
这一次,异步任务必须返回一个结果,因此runAsync()
没有用处。这是supplyAsync()
的工作。取Supplier
返回CompletableFuture
。T
是通过get()
方法从该供应器处获得的结果类型。在代码行中,我们可以如下解决此问题:
public static void fetchOrderSummary() { CompletableFuture<String> cfOrderSummary = CompletableFuture.supplyAsync(() -> { logger.info(() -> "Fetch order summary by: " + Thread.currentThread().getName()); Thread.sleep(500); return "Order Summary #93443"; }); // wait for summary to be available, this is blocking String summary = cfOrderSummary.get(); logger.info(() -> "Order summary: " + summary + "\n"); }
运行异步任务并通过显式线程池返回结果
用户问题:取某客户的订单摘要。
默认情况下,与前面的示例一样,异步任务在从全局ForkJoinPool.commonPool()
获取的线程中执行。通过简单地记录Thread.currentThread().getName()
,我们可以看到ForkJoinPool.commonPool-worker-3
。
但是我们也可以使用显式的Executor
自定义线程池。所有能够运行异步任务的CompletableFuture
方法都提供了一种采用Executor
的风格。
下面是使用单线程池的示例:
public static void fetchOrderSummaryExecutor() { ExecutorService executor = Executors.newSingleThreadExecutor(); CompletableFuture<String> cfOrderSummary = CompletableFuture.supplyAsync(() -> { logger.info(() -> "Fetch order summary by: " + Thread.currentThread().getName()); Thread.sleep(500); return "Order Summary #91022"; }, executor); // wait for summary to be available, this is blocking String summary = cfOrderSummary.get(); logger.info(() -> "Order summary: " + summary + "\n"); executor.shutdownNow(); }
Java 编程问题:十一、并发-深入探索2https://developer.aliyun.com/article/1426166