Java 编程问题:十一、并发-深入探索3

简介: Java 编程问题:十一、并发-深入探索

Java 编程问题:十一、并发-深入探索2https://developer.aliyun.com/article/1426166

217 组合多个CompletableFuture实例

在大多数情况下,组合CompletableFuture实例可以使用以下方法完成:

  • thenCompose()
  • thenCombine()
  • allOf()
  • anyOf()

通过结合CompletableFuture实例,我们可以形成复杂的异步解决方案。这样,多个CompletableFuture实例就可以合并它们的能力来达到一个共同的目标。

通过thenCompose()的组合

假设在名为CustomerAsyncs的助手类中有以下两个CompletableFuture实例:

private static CompletableFuture<String> 
    fetchOrder(String customerId) {
  return CompletableFuture.supplyAsync(() -> {
    return "Order of " + customerId;
  });
}
private static CompletableFuture<Integer> computeTotal(String order) {
  return CompletableFuture.supplyAsync(() -> {
    return order.length() + new Random().nextInt(1000);
  });
}

现在,我们要获取某个客户的订单,一旦订单可用,我们就要计算这个订单的总数。这意味着我们需要调用fetchOrder(),然后调用computeTotal()。我们可以通过thenApply()实现:

CompletableFuture<CompletableFuture<Integer>> cfTotal 
  = fetchOrder(customerId).thenApply(o -> computeTotal(o));
int total = cfTotal.get().get();

显然,这不是一个方便的解决方案,因为结果是CompletableFuture>类型的。为了避免嵌套CompletableFuture实例,我们可以依赖thenCompose()如下:

CompletableFuture<Integer> cfTotal 
  = fetchOrder(customerId).thenCompose(o -> computeTotal(o));
int total = cfTotal.get();
// e.g., Total: 734
logger.info(() -> "Total: " + total);

当我们需要从一系列的CompletableFuture实例中获得一个平坦的结果时,我们可以使用thenCompose()。这样我们就避免了嵌套的CompletableFuture实例。

使用thenComposeAsync()可以获得进一步的并行化。

通过thenCombine()的合并

thenCompose()用于链接两个依赖的CompletableFuture实例,thenCombine()用于链接两个独立的CompletableFuture实例。当两个CompletableFuture实例都完成时,我们可以继续。

假设我们有以下两个CompletableFuture实例:

private static CompletableFuture<Integer> computeTotal(String order) {
  return CompletableFuture.supplyAsync(() -> {
    return order.length() + new Random().nextInt(1000);
  });
}
private static CompletableFuture<String> packProducts(String order) {
  return CompletableFuture.supplyAsync(() -> {
    return "Order: " + order 
      + " | Product 1, Product 2, Product 3, ... ";
  });
}

为了交付客户订单,我们需要计算总金额(用于发出发票),并打包订购的产品。这两个动作可以并行完成。最后,我们把包裹寄了,里面有订购的产品和发票。通过thenCombine()实现此目的,可如下:

CompletableFuture<String> cfParcel = computeTotal(order)
  .thenCombine(packProducts(order), (total, products) -> {
    return "Parcel-[" + products + " Invoice: $" + total + "]";
  });
String parcel = cfParcel.get();
// e.g. Delivering: Parcel-[Order: #332 | Product 1, Product 2,
// Product 3, ... Invoice: $314]
logger.info(() -> "Delivering: " + parcel);

thenCombine()的回调函数将在两个CompletableFuture实例完成后调用。

如果我们只需要在两个CompletableFuture实例正常完成时做一些事情(这个和另一个),那么我们可以依赖thenAcceptBoth()。此方法返回一个新的CompletableFuture,将这两个结果作为所提供操作的参数来执行。这两个结果是此阶段和另一个给定阶段(它们必须正常完成)。下面是一个示例:

CompletableFuture<Void> voidResult = CompletableFuture
  .supplyAsync(() -> "Pick")
  .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " me"),
    (pick, me) -> System.out.println(pick + me));

如果不需要这两个CompletableFuture实例的结果,则runAfterBoth()更为可取。

通过allOf()的组合

假设我们要下载以下发票列表:

List<String> invoices = Arrays.asList("#2334", "#122", "#55");

这可以看作是一堆可以并行完成的独立任务,所以我们可以使用CompletableFuture来完成,如下所示:

public static CompletableFuture<String> 
    downloadInvoices(String invoice) {
  return CompletableFuture.supplyAsync(() -> {
    logger.info(() -> "Downloading invoice: " + invoice);
    return "Downloaded invoice: " + invoice;
  });
}
CompletableFuture<String> [] cfInvoices = invoices.stream()
  .map(CustomerAsyncs::downloadInvoices)
  .toArray(CompletableFuture[]::new);

此时,我们有一个CompletableFuture实例数组,因此,还有一个异步计算数组。此外,我们希望并行运行它们。这可以通过allOf(CompletableFuture... cfs)方法实现。结果由一个CompletableFuture组成,如下所示:

CompletableFuture<Void> cfDownloaded 
  = CompletableFuture.allOf(cfInvoices);
cfDownloaded.get();

显然,allOf()的结果不是很有用。我们能用CompletableFuture做什么?在这个并行化过程中,当我们需要每次计算的结果时,肯定会遇到很多问题,因此我们需要一个获取结果的解决方案,而不是依赖于CompletableFuture

我们可以通过thenApply()来解决这个问题,如下所示:

List<String> results = cfDownloaded.thenApply(e -> {
  List<String> downloaded = new ArrayList<>();
  for (CompletableFuture<String> cfInvoice: cfInvoices) {
    downloaded.add(cfInvoice.join());
  }
  return downloaded;
}).get();

join()方法类似于get(),但是,如果基础CompletableFuture异常完成,则抛出非受检异常。

由于我们在所有相关CompletableFuture完成后调用join(),因此没有阻塞点。

返回的List包含调用downloadInvoices()方法得到的结果,如下所示:

Downloaded invoice: #2334
Downloaded invoice: #122
Downloaded invoice: #55

通过anyOf()的组合

假设我们想为客户组织一次抽奖活动:

List<String> customers = Arrays.asList(
  "#1", "#4", "#2", "#7", "#6", "#5"
);

我们可以通过定义以下琐碎的方法开始解决这个问题:

public static CompletableFuture<String> raffle(String customerId) {
  return CompletableFuture.supplyAsync(() -> {
    Thread.sleep(new Random().nextInt(5000));
    return customerId;
  });
}

现在,我们可以创建一个CompletableFuture实例数组,如下所示:

CompletableFuture<String>[] cfCustomers = customers.stream()
  .map(CustomerAsyncs::raffle)
  .toArray(CompletableFuture[]::new);

为了找到抽奖的赢家,我们要并行运行cfCustomers,第一个完成的CompletableFuture就是赢家。因为raffle()方法阻塞随机数秒,所以将随机选择获胜者。我们对其余的CompletableFuture实例不感兴趣,所以应该在选出获胜者后立即完成。

这是anyOf(CompletableFuture... cfs)的工作。它返回一个新的CompletableFuture,当涉及的任何CompletableFuture实例完成时,这个新的CompletableFuture就完成了。让我们在工作中看看:

CompletableFuture<Object> cfWinner 
  = CompletableFuture.anyOf(cfCustomers);
Object winner = cfWinner.get();
// e.g., Winner: #2
logger.info(() -> "Winner: " + winner);

注意依赖于返回不同类型结果的CompletableFuture的场景。因为anyOf()返回CompletableFuture,所以很难知道先完成的CompletableFuture类型。

218 优化忙等待

忙等待技术(也称为忙循环旋转)由检查条件(通常,标志条件)的循环组成。例如,以下循环等待服务启动:

private volatile boolean serviceAvailable;
...
while (!serviceAvailable) {}

Java9 介绍了Thread.onSpinWait()方法。这是一个热点,它向 JVM 提示以下代码处于自旋循环中:

while (!serviceAvailable) {
  Thread.onSpinWait();
}

英特尔 SSE2 暂停指令正是出于这个原因提供的。有关详细信息,请参阅英特尔官方文档。也看看这个链接

如果我们在上下文中添加这个while循环,那么我们得到以下类:

public class StartService implements Runnable {
  private volatile boolean serviceAvailable;
  @Override
  public void run() {
    System.out.println("Wait for service to be available ...");
    while (!serviceAvailable) {
      // Use a spin-wait hint (ask the processor to
      // optimize the resource)
      // This should perform better if the underlying
      // hardware supports the hint
      Thread.onSpinWait();
    }
    serviceRun();
  }
  public void serviceRun() {
    System.out.println("Service is running ...");
  }
  public void setServiceAvailable(boolean serviceAvailable) {
    this.serviceAvailable = serviceAvailable;
  }
}

而且,我们可以很容易地测试它(不要期望看到onSpinWait()的效果):

StartService startService = new StartService();
new Thread(startService).start();
Thread.sleep(5000);
startService.setServiceAvailable(true);

219 任务取消

取消是一种常用的技术,用于强制停止或完成当前正在运行的任务。取消的任务不会自然完成。取消对已完成的任务没有影响。可以将其视为 GUI 的取消按钮。

Java 没有提供一种抢先停止线程的方法。因此,对于取消任务,通常的做法是依赖于使用标志条件的循环。任务的职责是定期检查这个标志,当它找到设置的标志时,它应该尽快停止。下面的代码就是一个例子:

public class RandomList implements Runnable {
  private volatile boolean cancelled;
  private final List<Integer> randoms = new CopyOnWriteArrayList<>();
  private final Random rnd = new Random();
  @Override
  public void run() {
    while (!cancelled) {
      randoms.add(rnd.nextInt(100));
    }
  }
  public void cancel() {
    cancelled = true;
  }
  public List<Integer> getRandoms() {
    return randoms;
  }
}

这里的重点是canceled变量。注意,这个变量被声明为volatile(也称为轻量级同步机制)。作为一个volatile变量,它不会被线程缓存,对它的操作也不会在内存中重新排序;因此,线程看不到旧值。任何读取volatile字段的线程都将看到最近写入的值。这正是我们所需要的,以便将取消操作传递给对该操作感兴趣的所有正在运行的线程。下图描述了volatile和非volatile的工作原理:

注意,volatile变量不适合读-修改-写场景。对于这样的场景,我们将依赖于原子变量(例如,AtomicBooleanAtomicIntegerAtomicReference等等)。

现在,让我们提供一个简单的代码片段,用于取消在RandomList中实现的任务:

RandomList rl = new RandomList();
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
  executor.execute(rl);
}
Thread.sleep(100);
rl.cancel();
System.out.println(rl.getRandoms());

220 ThreadLocal

Java 线程共享相同的内存,但有时我们需要为每个线程提供专用内存。Java 提供ThreadLocal作为一种方法,分别存储和检索每个线程的值。ThreadLocal的一个实例可以存储和检索多个线程的值。如果线程A存储x值,线程BThreadLocal的同一实例中存储y值,那么稍后,线程A检索x值,线程B检索y值。

JavaThreadLocal通常用于以下两种场景:

  • 用于提供线程级别的实例(线程安全和内存效率)
  • 用于提供线程级别的上下文

让我们在下一节中看看每个场景的问题。

线程级别的实例

假设我们有一个使用StringBuilder类型的全局变量的单线程应用。为了在多线程应用中转换应用,我们必须处理StringBuilder,它不是线程安全的。基本上,我们有几种方法,例如同步和StringBuffer或其他方法。不过,我们也可以使用ThreadLocal。这里的主要思想是为每个线程提供一个单独的StringBuilder。使用ThreadLocal,我们可以做如下操作:

private static final ThreadLocal<StringBuilder> 
    threadLocal = new ThreadLocal<>() {
  @Override
  protected StringBuilder initialValue() {
    return new StringBuilder("ThreadSafe ");
  }
};

此线程局部变量的当前线程的初始值通过initialValue()方法设置。在 Java8 中,可以通过withInitial()重写如下:

private static final ThreadLocal<StringBuilder> threadLocal 
    = ThreadLocal.<StringBuilder> withInitial(() -> {
  return new StringBuilder("Thread-safe ");
});

使用get()set()ThreadLocal进行操作。set()的每次调用都将给定的值存储在只有当前线程才能访问的内存区域中。稍后,调用get()将从该区域检索值。另外,一旦工作完成,建议通过调用ThreadLocal实例上的remove()set(null)方法来避免内存泄漏。

让我们看看ThreadLocal在工作中使用Runnable

public class ThreadSafeStringBuilder implements Runnable {
  private static final Logger logger =
    Logger.getLogger(ThreadSafeStringBuilder.class.getName());
  private static final Random rnd = new Random();
  private static final ThreadLocal<StringBuilder> threadLocal 
      = ThreadLocal.<StringBuilder> withInitial(() -> {
    return new StringBuilder("Thread-safe ");
  });
  @Override
  public void run() {
    logger.info(() -> "-> " + Thread.currentThread().getName() 
      + " [" + threadLocal.get() + "]");
    Thread.sleep(rnd.nextInt(2000));
    // threadLocal.set(new StringBuilder(
    // Thread.currentThread().getName()));
    threadLocal.get().append(Thread.currentThread().getName());
    logger.info(() -> "-> " + Thread.currentThread().getName() 
      + " [" + threadLocal.get() + "]");
    threadLocal.set(null);
    // threadLocal.remove();
    logger.info(() -> "-> " + Thread.currentThread().getName() 
      + " [" + threadLocal.get() + "]");
  }
}

让我们用几个线程来测试它:

ThreadSafeStringBuilder threadSafe = new ThreadSafeStringBuilder();
for (int i = 0; i < 3; i++) {
  new Thread(threadSafe, "thread-" + i).start();
}

输出显示每个线程访问自己的StringBuilder

[14:26:39] [INFO] -> thread-1 [Thread-safe ]
[14:26:39] [INFO] -> thread-0 [Thread-safe ]
[14:26:39] [INFO] -> thread-2 [Thread-safe ]
[14:26:40] [INFO] -> thread-0 [Thread-safe thread-0]
[14:26:40] [INFO] -> thread-0 [null]
[14:26:41] [INFO] -> thread-1 [Thread-safe thread-1]
[14:26:41] [INFO] -> thread-1 [null]
[14:26:41] [INFO] -> thread-2 [Thread-safe thread-2]
[14:26:41] [INFO] -> thread-2 [null]

在上述场景中,也可以使用ExecutorService

下面是为每个线程提供 JDBCConnection的另一段代码:

private static final ThreadLocal<Connection> connections 
    = ThreadLocal.<Connection> withInitial(() -> {
  try {
    return DriverManager.getConnection("jdbc:mysql://...");
  } catch (SQLException ex) {
    throw new RuntimeException("Connection acquisition failed!", ex);
  }
});
public static Connection getConnection() {
  return connections.get();
}

线程级别的上下文

假设我们有以下Order类:

public class Order {
  private final int customerId;
  public Order(int customerId) {
    this.customerId = customerId;
  }
  // getter and toString() omitted for brevity
}

我们写下CustomerOrder如下:

public class CustomerOrder implements Runnable {
  private static final Logger logger
    = Logger.getLogger(CustomerOrder.class.getName());
  private static final Random rnd = new Random();
  private static final ThreadLocal<Order> 
    customerOrder = new ThreadLocal<>();
  private final int customerId;
  public CustomerOrder(int customerId) {
    this.customerId = customerId;
  }
  @Override
  public void run() {
    logger.info(() -> "Given customer id: " + customerId 
      + " | " + customerOrder.get() 
      + " | " + Thread.currentThread().getName());
    customerOrder.set(new Order(customerId));
    try {
      Thread.sleep(rnd.nextInt(2000));
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      logger.severe(() -> "Exception: " + ex);
    }
    logger.info(() -> "Given customer id: " + customerId 
      + " | " + customerOrder.get() 
      + " | " + Thread.currentThread().getName());
    customerOrder.remove();
  }
}

对于每一个customerId,我们都有一个我们控制的专用线程:

CustomerOrder co1 = new CustomerOrder(1);
CustomerOrder co2 = new CustomerOrder(2);
CustomerOrder co3 = new CustomerOrder(3);
new Thread(co1).start();
new Thread(co2).start();
new Thread(co3).start();

因此,每个线程修改CustomerOrder的某个实例(每个实例都有一个特定的线程)。

run()方法取给定customerId的顺序,并用set()方法存储在ThreadLocal变量中。

可能的输出如下:

[14:48:20] [INFO] 
  Given customer id: 3 | null | Thread-2
[14:48:20] [INFO] 
  Given customer id: 2 | null | Thread-1
[14:48:20] [INFO] 
  Given customer id: 1 | null | Thread-0
[14:48:20] [INFO] 
  Given customer id: 2 | Order{customerId=2} | Thread-1
[14:48:21] [INFO] 
  Given customer id: 3 | Order{customerId=3} | Thread-2
[14:48:21] [INFO] 
  Given customer id: 1 | Order{customerId=1} | Thread-0

在前一种情况下,避免使用ExecutorService。无法保证(给定的customerId的)每个Runnable在每次执行时都会被同一个线程处理。这可能会导致奇怪的结果。

Java 编程问题:十一、并发-深入探索4https://developer.aliyun.com/article/1426168

相关文章
|
2月前
|
Java API 微服务
为什么虚拟线程将改变Java并发编程?
为什么虚拟线程将改变Java并发编程?
234 83
|
19天前
|
安全 Java 数据库连接
2025 年最新 Java 学习路线图含实操指南助你高效入门 Java 编程掌握核心技能
2025年最新Java学习路线图,涵盖基础环境搭建、核心特性(如密封类、虚拟线程)、模块化开发、响应式编程、主流框架(Spring Boot 3、Spring Security 6)、数据库操作(JPA + Hibernate 6)及微服务实战,助你掌握企业级开发技能。
172 3
|
2月前
|
Java API 调度
从阻塞到畅通:Java虚拟线程开启并发新纪元
从阻塞到畅通:Java虚拟线程开启并发新纪元
274 83
|
2月前
|
存储 Java 调度
Java虚拟线程:轻量级并发的革命性突破
Java虚拟线程:轻量级并发的革命性突破
219 83
|
1月前
|
Java
Java编程:理解while循环的使用
总结而言, 使用 while 迴圈可以有效解决需要多次重复操作直至特定條件被触发才停止執行任务场景下问题; 它简单、灵活、易于实现各种逻辑控制需求但同时也要注意防止因邏各错误导致無限迁璇発生及及時處理可能発生异常以确保程序稳定运作。
154 0
|
3月前
|
Java 物联网 数据处理
Java Solon v3.2.0 史上最强性能优化版本发布 并发能力提升 700% 内存占用节省 50%
Java Solon v3.2.0 是一款性能卓越的后端开发框架,新版本并发性能提升700%,内存占用节省50%。本文将从核心特性(如事件驱动模型与内存优化)、技术方案示例(Web应用搭建与数据库集成)到实际应用案例(电商平台与物联网平台)全面解析其优势与使用方法。通过简单代码示例和真实场景展示,帮助开发者快速掌握并应用于项目中,大幅提升系统性能与资源利用率。
99 6
Java Solon v3.2.0 史上最强性能优化版本发布 并发能力提升 700% 内存占用节省 50%
|
1月前
|
安全 Cloud Native Java
Java:历久弥新的企业级编程基石
Java:历久弥新的企业级编程基石
|
1月前
|
移动开发 Cloud Native Java
Java:历久弥新的企业级编程基石
Java:历久弥新的企业级编程基石