Java 编程问题:十一、并发-深入探索2https://developer.aliyun.com/article/1426166
217 组合多个CompletableFuture
实例
在大多数情况下,组合CompletableFuture
实例可以使用以下方法完成:
thenCompose()
thenCombine()
allOf()
anyOf()
通过结合CompletableFuture
实例,我们可以形成复杂的异步解决方案。这样,多个CompletableFuture
实例就可以合并它们的能力来达到一个共同的目标。
通过thenCompose()
的组合
假设在名为CustomerAsyncs
的助手类中有以下两个CompletableFuture
实例:
private static CompletableFuture<String> fetchOrder(String customerId) { return CompletableFuture.supplyAsync(() -> { return "Order of " + customerId; }); } private static CompletableFuture<Integer> computeTotal(String order) { return CompletableFuture.supplyAsync(() -> { return order.length() + new Random().nextInt(1000); }); }
现在,我们要获取某个客户的订单,一旦订单可用,我们就要计算这个订单的总数。这意味着我们需要调用fetchOrder()
,然后调用computeTotal()
。我们可以通过thenApply()
实现:
CompletableFuture<CompletableFuture<Integer>> cfTotal = fetchOrder(customerId).thenApply(o -> computeTotal(o)); int total = cfTotal.get().get();
显然,这不是一个方便的解决方案,因为结果是CompletableFuture>
类型的。为了避免嵌套CompletableFuture
实例,我们可以依赖thenCompose()
如下:
CompletableFuture<Integer> cfTotal = fetchOrder(customerId).thenCompose(o -> computeTotal(o)); int total = cfTotal.get(); // e.g., Total: 734 logger.info(() -> "Total: " + total);
当我们需要从一系列的CompletableFuture
实例中获得一个平坦的结果时,我们可以使用thenCompose()
。这样我们就避免了嵌套的CompletableFuture
实例。
使用thenComposeAsync()
可以获得进一步的并行化。
通过thenCombine()
的合并
thenCompose()
用于链接两个依赖的CompletableFuture
实例,thenCombine()
用于链接两个独立的CompletableFuture
实例。当两个CompletableFuture
实例都完成时,我们可以继续。
假设我们有以下两个CompletableFuture
实例:
private static CompletableFuture<Integer> computeTotal(String order) { return CompletableFuture.supplyAsync(() -> { return order.length() + new Random().nextInt(1000); }); } private static CompletableFuture<String> packProducts(String order) { return CompletableFuture.supplyAsync(() -> { return "Order: " + order + " | Product 1, Product 2, Product 3, ... "; }); }
为了交付客户订单,我们需要计算总金额(用于发出发票),并打包订购的产品。这两个动作可以并行完成。最后,我们把包裹寄了,里面有订购的产品和发票。通过thenCombine()
实现此目的,可如下:
CompletableFuture<String> cfParcel = computeTotal(order) .thenCombine(packProducts(order), (total, products) -> { return "Parcel-[" + products + " Invoice: $" + total + "]"; }); String parcel = cfParcel.get(); // e.g. Delivering: Parcel-[Order: #332 | Product 1, Product 2, // Product 3, ... Invoice: $314] logger.info(() -> "Delivering: " + parcel);
给thenCombine()
的回调函数将在两个CompletableFuture
实例完成后调用。
如果我们只需要在两个CompletableFuture
实例正常完成时做一些事情(这个和另一个),那么我们可以依赖thenAcceptBoth()
。此方法返回一个新的CompletableFuture
,将这两个结果作为所提供操作的参数来执行。这两个结果是此阶段和另一个给定阶段(它们必须正常完成)。下面是一个示例:
CompletableFuture<Void> voidResult = CompletableFuture .supplyAsync(() -> "Pick") .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " me"), (pick, me) -> System.out.println(pick + me));
如果不需要这两个CompletableFuture
实例的结果,则runAfterBoth()
更为可取。
通过allOf()
的组合
假设我们要下载以下发票列表:
List<String> invoices = Arrays.asList("#2334", "#122", "#55");
这可以看作是一堆可以并行完成的独立任务,所以我们可以使用CompletableFuture
来完成,如下所示:
public static CompletableFuture<String> downloadInvoices(String invoice) { return CompletableFuture.supplyAsync(() -> { logger.info(() -> "Downloading invoice: " + invoice); return "Downloaded invoice: " + invoice; }); } CompletableFuture<String> [] cfInvoices = invoices.stream() .map(CustomerAsyncs::downloadInvoices) .toArray(CompletableFuture[]::new);
此时,我们有一个CompletableFuture
实例数组,因此,还有一个异步计算数组。此外,我们希望并行运行它们。这可以通过allOf(CompletableFuture... cfs)
方法实现。结果由一个CompletableFuture
组成,如下所示:
CompletableFuture<Void> cfDownloaded = CompletableFuture.allOf(cfInvoices); cfDownloaded.get();
显然,allOf()
的结果不是很有用。我们能用CompletableFuture
做什么?在这个并行化过程中,当我们需要每次计算的结果时,肯定会遇到很多问题,因此我们需要一个获取结果的解决方案,而不是依赖于CompletableFuture
。
我们可以通过thenApply()
来解决这个问题,如下所示:
List<String> results = cfDownloaded.thenApply(e -> { List<String> downloaded = new ArrayList<>(); for (CompletableFuture<String> cfInvoice: cfInvoices) { downloaded.add(cfInvoice.join()); } return downloaded; }).get();
join()
方法类似于get()
,但是,如果基础CompletableFuture
异常完成,则抛出非受检异常。
由于我们在所有相关CompletableFuture
完成后调用join()
,因此没有阻塞点。
返回的List
包含调用downloadInvoices()
方法得到的结果,如下所示:
Downloaded invoice: #2334 Downloaded invoice: #122 Downloaded invoice: #55
通过anyOf()
的组合
假设我们想为客户组织一次抽奖活动:
List<String> customers = Arrays.asList( "#1", "#4", "#2", "#7", "#6", "#5" );
我们可以通过定义以下琐碎的方法开始解决这个问题:
public static CompletableFuture<String> raffle(String customerId) { return CompletableFuture.supplyAsync(() -> { Thread.sleep(new Random().nextInt(5000)); return customerId; }); }
现在,我们可以创建一个CompletableFuture
实例数组,如下所示:
CompletableFuture<String>[] cfCustomers = customers.stream() .map(CustomerAsyncs::raffle) .toArray(CompletableFuture[]::new);
为了找到抽奖的赢家,我们要并行运行cfCustomers
,第一个完成的CompletableFuture
就是赢家。因为raffle()
方法阻塞随机数秒,所以将随机选择获胜者。我们对其余的CompletableFuture
实例不感兴趣,所以应该在选出获胜者后立即完成。
这是anyOf(CompletableFuture... cfs)
的工作。它返回一个新的CompletableFuture
,当涉及的任何CompletableFuture
实例完成时,这个新的CompletableFuture
就完成了。让我们在工作中看看:
CompletableFuture<Object> cfWinner = CompletableFuture.anyOf(cfCustomers); Object winner = cfWinner.get(); // e.g., Winner: #2 logger.info(() -> "Winner: " + winner);
注意依赖于返回不同类型结果的CompletableFuture
的场景。因为anyOf()
返回CompletableFuture,所以很难知道先完成的
CompletableFuture
类型。
218 优化忙等待
忙等待技术(也称为忙循环或旋转)由检查条件(通常,标志条件)的循环组成。例如,以下循环等待服务启动:
private volatile boolean serviceAvailable; ... while (!serviceAvailable) {}
Java9 介绍了
Thread.onSpinWait()
方法。这是一个热点,它向 JVM 提示以下代码处于自旋循环中:
while (!serviceAvailable) { Thread.onSpinWait(); }
英特尔 SSE2 暂停指令正是出于这个原因提供的。有关详细信息,请参阅英特尔官方文档。也看看这个链接。
如果我们在上下文中添加这个
while
循环,那么我们得到以下类:
public class StartService implements Runnable { private volatile boolean serviceAvailable; @Override public void run() { System.out.println("Wait for service to be available ..."); while (!serviceAvailable) { // Use a spin-wait hint (ask the processor to // optimize the resource) // This should perform better if the underlying // hardware supports the hint Thread.onSpinWait(); } serviceRun(); } public void serviceRun() { System.out.println("Service is running ..."); } public void setServiceAvailable(boolean serviceAvailable) { this.serviceAvailable = serviceAvailable; } }
而且,我们可以很容易地测试它(不要期望看到
onSpinWait()
的效果):
StartService startService = new StartService(); new Thread(startService).start(); Thread.sleep(5000); startService.setServiceAvailable(true);
219 任务取消
取消是一种常用的技术,用于强制停止或完成当前正在运行的任务。取消的任务不会自然完成。取消对已完成的任务没有影响。可以将其视为 GUI 的取消按钮。
Java 没有提供一种抢先停止线程的方法。因此,对于取消任务,通常的做法是依赖于使用标志条件的循环。任务的职责是定期检查这个标志,当它找到设置的标志时,它应该尽快停止。下面的代码就是一个例子:
public class RandomList implements Runnable { private volatile boolean cancelled; private final List<Integer> randoms = new CopyOnWriteArrayList<>(); private final Random rnd = new Random(); @Override public void run() { while (!cancelled) { randoms.add(rnd.nextInt(100)); } } public void cancel() { cancelled = true; } public List<Integer> getRandoms() { return randoms; } }
这里的重点是
canceled
变量。注意,这个变量被声明为volatile
(也称为轻量级同步机制)。作为一个volatile
变量,它不会被线程缓存,对它的操作也不会在内存中重新排序;因此,线程看不到旧值。任何读取volatile
字段的线程都将看到最近写入的值。这正是我们所需要的,以便将取消操作传递给对该操作感兴趣的所有正在运行的线程。下图描述了volatile
和非volatile
的工作原理:
注意,
volatile
变量不适合读-修改-写场景。对于这样的场景,我们将依赖于原子变量(例如,AtomicBoolean
、AtomicInteger
、AtomicReference
等等)。
现在,让我们提供一个简单的代码片段,用于取消在
RandomList
中实现的任务:
RandomList rl = new RandomList(); ExecutorService executor = Executors.newFixedThreadPool(10); for (int i = 0; i < 100; i++) { executor.execute(rl); } Thread.sleep(100); rl.cancel(); System.out.println(rl.getRandoms());
220 ThreadLocal
ThreadLocal
Java 线程共享相同的内存,但有时我们需要为每个线程提供专用内存。Java 提供
ThreadLocal
作为一种方法,分别存储和检索每个线程的值。ThreadLocal
的一个实例可以存储和检索多个线程的值。如果线程A
存储x
值,线程B
在ThreadLocal
的同一实例中存储y
值,那么稍后,线程A
检索x
值,线程B
检索y
值。
Java
ThreadLocal
通常用于以下两种场景:
用于提供线程级别的实例(线程安全和内存效率)
用于提供线程级别的上下文
让我们在下一节中看看每个场景的问题。
线程级别的实例
假设我们有一个使用
StringBuilder
类型的全局变量的单线程应用。为了在多线程应用中转换应用,我们必须处理StringBuilder
,它不是线程安全的。基本上,我们有几种方法,例如同步和StringBuffer
或其他方法。不过,我们也可以使用ThreadLocal
。这里的主要思想是为每个线程提供一个单独的StringBuilder
。使用ThreadLocal
,我们可以做如下操作:
private static final ThreadLocal<StringBuilder> threadLocal = new ThreadLocal<>() { @Override protected StringBuilder initialValue() { return new StringBuilder("ThreadSafe "); } };
此线程局部变量的当前线程的初始值通过
initialValue()
方法设置。在 Java8 中,可以通过withInitial()
重写如下:
private static final ThreadLocal<StringBuilder> threadLocal = ThreadLocal.<StringBuilder> withInitial(() -> { return new StringBuilder("Thread-safe "); });
使用
get()
和set()
对ThreadLocal
进行操作。set()
的每次调用都将给定的值存储在只有当前线程才能访问的内存区域中。稍后,调用get()
将从该区域检索值。另外,一旦工作完成,建议通过调用ThreadLocal
实例上的remove()
或set(null)
方法来避免内存泄漏。
让我们看看
ThreadLocal
在工作中使用Runnable
public class ThreadSafeStringBuilder implements Runnable { private static final Logger logger = Logger.getLogger(ThreadSafeStringBuilder.class.getName()); private static final Random rnd = new Random(); private static final ThreadLocal<StringBuilder> threadLocal = ThreadLocal.<StringBuilder> withInitial(() -> { return new StringBuilder("Thread-safe "); }); @Override public void run() { logger.info(() -> "-> " + Thread.currentThread().getName() + " [" + threadLocal.get() + "]"); Thread.sleep(rnd.nextInt(2000)); // threadLocal.set(new StringBuilder( // Thread.currentThread().getName())); threadLocal.get().append(Thread.currentThread().getName()); logger.info(() -> "-> " + Thread.currentThread().getName() + " [" + threadLocal.get() + "]"); threadLocal.set(null); // threadLocal.remove(); logger.info(() -> "-> " + Thread.currentThread().getName() + " [" + threadLocal.get() + "]"); } }
让我们用几个线程来测试它:
ThreadSafeStringBuilder threadSafe = new ThreadSafeStringBuilder(); for (int i = 0; i < 3; i++) { new Thread(threadSafe, "thread-" + i).start(); }
输出显示每个线程访问自己的
StringBuilder
:
[14:26:39] [INFO] -> thread-1 [Thread-safe ] [14:26:39] [INFO] -> thread-0 [Thread-safe ] [14:26:39] [INFO] -> thread-2 [Thread-safe ] [14:26:40] [INFO] -> thread-0 [Thread-safe thread-0] [14:26:40] [INFO] -> thread-0 [null] [14:26:41] [INFO] -> thread-1 [Thread-safe thread-1] [14:26:41] [INFO] -> thread-1 [null] [14:26:41] [INFO] -> thread-2 [Thread-safe thread-2] [14:26:41] [INFO] -> thread-2 [null]
在上述场景中,也可以使用
ExecutorService
。
下面是为每个线程提供 JDBC
Connection
的另一段代码:
private static final ThreadLocal<Connection> connections = ThreadLocal.<Connection> withInitial(() -> { try { return DriverManager.getConnection("jdbc:mysql://..."); } catch (SQLException ex) { throw new RuntimeException("Connection acquisition failed!", ex); } }); public static Connection getConnection() { return connections.get(); }
线程级别的上下文
假设我们有以下
Order
类:
public class Order { private final int customerId; public Order(int customerId) { this.customerId = customerId; } // getter and toString() omitted for brevity }
我们写下
CustomerOrder
如下:
public class CustomerOrder implements Runnable { private static final Logger logger = Logger.getLogger(CustomerOrder.class.getName()); private static final Random rnd = new Random(); private static final ThreadLocal<Order> customerOrder = new ThreadLocal<>(); private final int customerId; public CustomerOrder(int customerId) { this.customerId = customerId; } @Override public void run() { logger.info(() -> "Given customer id: " + customerId + " | " + customerOrder.get() + " | " + Thread.currentThread().getName()); customerOrder.set(new Order(customerId)); try { Thread.sleep(rnd.nextInt(2000)); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); logger.severe(() -> "Exception: " + ex); } logger.info(() -> "Given customer id: " + customerId + " | " + customerOrder.get() + " | " + Thread.currentThread().getName()); customerOrder.remove(); } }
对于每一个
customerId
,我们都有一个我们控制的专用线程:
CustomerOrder co1 = new CustomerOrder(1); CustomerOrder co2 = new CustomerOrder(2); CustomerOrder co3 = new CustomerOrder(3); new Thread(co1).start(); new Thread(co2).start(); new Thread(co3).start();
因此,每个线程修改
CustomerOrder
的某个实例(每个实例都有一个特定的线程)。
run()
方法取给定customerId
的顺序,并用set()
方法存储在ThreadLocal
变量中。
可能的输出如下:
[14:48:20] [INFO] Given customer id: 3 | null | Thread-2 [14:48:20] [INFO] Given customer id: 2 | null | Thread-1 [14:48:20] [INFO] Given customer id: 1 | null | Thread-0 [14:48:20] [INFO] Given customer id: 2 | Order{customerId=2} | Thread-1 [14:48:21] [INFO] Given customer id: 3 | Order{customerId=3} | Thread-2 [14:48:21] [INFO] Given customer id: 1 | Order{customerId=1} | Thread-0
在前一种情况下,避免使用
ExecutorService
。无法保证(给定的customerId
的)每个Runnable
在每次执行时都会被同一个线程处理。这可能会导致奇怪的结果。
Java 编程问题:十一、并发-深入探索4https://developer.aliyun.com/article/1426168