Java 编程问题:十一、并发-深入探索3

简介: Java 编程问题:十一、并发-深入探索

Java 编程问题:十一、并发-深入探索2https://developer.aliyun.com/article/1426166

217 组合多个CompletableFuture实例

在大多数情况下,组合CompletableFuture实例可以使用以下方法完成:

  • thenCompose()
  • thenCombine()
  • allOf()
  • anyOf()

通过结合CompletableFuture实例,我们可以形成复杂的异步解决方案。这样,多个CompletableFuture实例就可以合并它们的能力来达到一个共同的目标。

通过thenCompose()的组合

假设在名为CustomerAsyncs的助手类中有以下两个CompletableFuture实例:

private static CompletableFuture<String> 
    fetchOrder(String customerId) {
  return CompletableFuture.supplyAsync(() -> {
    return "Order of " + customerId;
  });
}
private static CompletableFuture<Integer> computeTotal(String order) {
  return CompletableFuture.supplyAsync(() -> {
    return order.length() + new Random().nextInt(1000);
  });
}

现在,我们要获取某个客户的订单,一旦订单可用,我们就要计算这个订单的总数。这意味着我们需要调用fetchOrder(),然后调用computeTotal()。我们可以通过thenApply()实现:

CompletableFuture<CompletableFuture<Integer>> cfTotal 
  = fetchOrder(customerId).thenApply(o -> computeTotal(o));
int total = cfTotal.get().get();

显然,这不是一个方便的解决方案,因为结果是CompletableFuture>类型的。为了避免嵌套CompletableFuture实例,我们可以依赖thenCompose()如下:

CompletableFuture<Integer> cfTotal 
  = fetchOrder(customerId).thenCompose(o -> computeTotal(o));
int total = cfTotal.get();
// e.g., Total: 734
logger.info(() -> "Total: " + total);

当我们需要从一系列的CompletableFuture实例中获得一个平坦的结果时,我们可以使用thenCompose()。这样我们就避免了嵌套的CompletableFuture实例。

使用thenComposeAsync()可以获得进一步的并行化。

通过thenCombine()的合并

thenCompose()用于链接两个依赖的CompletableFuture实例,thenCombine()用于链接两个独立的CompletableFuture实例。当两个CompletableFuture实例都完成时,我们可以继续。

假设我们有以下两个CompletableFuture实例:

private static CompletableFuture<Integer> computeTotal(String order) {
  return CompletableFuture.supplyAsync(() -> {
    return order.length() + new Random().nextInt(1000);
  });
}
private static CompletableFuture<String> packProducts(String order) {
  return CompletableFuture.supplyAsync(() -> {
    return "Order: " + order 
      + " | Product 1, Product 2, Product 3, ... ";
  });
}

为了交付客户订单,我们需要计算总金额(用于发出发票),并打包订购的产品。这两个动作可以并行完成。最后,我们把包裹寄了,里面有订购的产品和发票。通过thenCombine()实现此目的,可如下:

CompletableFuture<String> cfParcel = computeTotal(order)
  .thenCombine(packProducts(order), (total, products) -> {
    return "Parcel-[" + products + " Invoice: $" + total + "]";
  });
String parcel = cfParcel.get();
// e.g. Delivering: Parcel-[Order: #332 | Product 1, Product 2,
// Product 3, ... Invoice: $314]
logger.info(() -> "Delivering: " + parcel);

thenCombine()的回调函数将在两个CompletableFuture实例完成后调用。

如果我们只需要在两个CompletableFuture实例正常完成时做一些事情(这个和另一个),那么我们可以依赖thenAcceptBoth()。此方法返回一个新的CompletableFuture,将这两个结果作为所提供操作的参数来执行。这两个结果是此阶段和另一个给定阶段(它们必须正常完成)。下面是一个示例:

CompletableFuture<Void> voidResult = CompletableFuture
  .supplyAsync(() -> "Pick")
  .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " me"),
    (pick, me) -> System.out.println(pick + me));

如果不需要这两个CompletableFuture实例的结果,则runAfterBoth()更为可取。

通过allOf()的组合

假设我们要下载以下发票列表:

List<String> invoices = Arrays.asList("#2334", "#122", "#55");

这可以看作是一堆可以并行完成的独立任务,所以我们可以使用CompletableFuture来完成,如下所示:

public static CompletableFuture<String> 
    downloadInvoices(String invoice) {
  return CompletableFuture.supplyAsync(() -> {
    logger.info(() -> "Downloading invoice: " + invoice);
    return "Downloaded invoice: " + invoice;
  });
}
CompletableFuture<String> [] cfInvoices = invoices.stream()
  .map(CustomerAsyncs::downloadInvoices)
  .toArray(CompletableFuture[]::new);

此时,我们有一个CompletableFuture实例数组,因此,还有一个异步计算数组。此外,我们希望并行运行它们。这可以通过allOf(CompletableFuture... cfs)方法实现。结果由一个CompletableFuture组成,如下所示:

CompletableFuture<Void> cfDownloaded 
  = CompletableFuture.allOf(cfInvoices);
cfDownloaded.get();

显然,allOf()的结果不是很有用。我们能用CompletableFuture做什么?在这个并行化过程中,当我们需要每次计算的结果时,肯定会遇到很多问题,因此我们需要一个获取结果的解决方案,而不是依赖于CompletableFuture

我们可以通过thenApply()来解决这个问题,如下所示:

List<String> results = cfDownloaded.thenApply(e -> {
  List<String> downloaded = new ArrayList<>();
  for (CompletableFuture<String> cfInvoice: cfInvoices) {
    downloaded.add(cfInvoice.join());
  }
  return downloaded;
}).get();

join()方法类似于get(),但是,如果基础CompletableFuture异常完成,则抛出非受检异常。

由于我们在所有相关CompletableFuture完成后调用join(),因此没有阻塞点。

返回的List包含调用downloadInvoices()方法得到的结果,如下所示:

Downloaded invoice: #2334
Downloaded invoice: #122
Downloaded invoice: #55

通过anyOf()的组合

假设我们想为客户组织一次抽奖活动:

List<String> customers = Arrays.asList(
  "#1", "#4", "#2", "#7", "#6", "#5"
);

我们可以通过定义以下琐碎的方法开始解决这个问题:

public static CompletableFuture<String> raffle(String customerId) {
  return CompletableFuture.supplyAsync(() -> {
    Thread.sleep(new Random().nextInt(5000));
    return customerId;
  });
}

现在,我们可以创建一个CompletableFuture实例数组,如下所示:

CompletableFuture<String>[] cfCustomers = customers.stream()
  .map(CustomerAsyncs::raffle)
  .toArray(CompletableFuture[]::new);

为了找到抽奖的赢家,我们要并行运行cfCustomers,第一个完成的CompletableFuture就是赢家。因为raffle()方法阻塞随机数秒,所以将随机选择获胜者。我们对其余的CompletableFuture实例不感兴趣,所以应该在选出获胜者后立即完成。

这是anyOf(CompletableFuture... cfs)的工作。它返回一个新的CompletableFuture,当涉及的任何CompletableFuture实例完成时,这个新的CompletableFuture就完成了。让我们在工作中看看:

CompletableFuture<Object> cfWinner 
  = CompletableFuture.anyOf(cfCustomers);
Object winner = cfWinner.get();
// e.g., Winner: #2
logger.info(() -> "Winner: " + winner);

注意依赖于返回不同类型结果的CompletableFuture的场景。因为anyOf()返回CompletableFuture,所以很难知道先完成的CompletableFuture类型。

218 优化忙等待

忙等待技术(也称为忙循环旋转)由检查条件(通常,标志条件)的循环组成。例如,以下循环等待服务启动:

private volatile boolean serviceAvailable;
...
while (!serviceAvailable) {}

Java9 介绍了Thread.onSpinWait()方法。这是一个热点,它向 JVM 提示以下代码处于自旋循环中:

while (!serviceAvailable) {
  Thread.onSpinWait();
}

英特尔 SSE2 暂停指令正是出于这个原因提供的。有关详细信息,请参阅英特尔官方文档。也看看这个链接

如果我们在上下文中添加这个while循环,那么我们得到以下类:

public class StartService implements Runnable {
  private volatile boolean serviceAvailable;
  @Override
  public void run() {
    System.out.println("Wait for service to be available ...");
    while (!serviceAvailable) {
      // Use a spin-wait hint (ask the processor to
      // optimize the resource)
      // This should perform better if the underlying
      // hardware supports the hint
      Thread.onSpinWait();
    }
    serviceRun();
  }
  public void serviceRun() {
    System.out.println("Service is running ...");
  }
  public void setServiceAvailable(boolean serviceAvailable) {
    this.serviceAvailable = serviceAvailable;
  }
}

而且,我们可以很容易地测试它(不要期望看到onSpinWait()的效果):

StartService startService = new StartService();
new Thread(startService).start();
Thread.sleep(5000);
startService.setServiceAvailable(true);

219 任务取消

取消是一种常用的技术,用于强制停止或完成当前正在运行的任务。取消的任务不会自然完成。取消对已完成的任务没有影响。可以将其视为 GUI 的取消按钮。

Java 没有提供一种抢先停止线程的方法。因此,对于取消任务,通常的做法是依赖于使用标志条件的循环。任务的职责是定期检查这个标志,当它找到设置的标志时,它应该尽快停止。下面的代码就是一个例子:

public class RandomList implements Runnable {
  private volatile boolean cancelled;
  private final List<Integer> randoms = new CopyOnWriteArrayList<>();
  private final Random rnd = new Random();
  @Override
  public void run() {
    while (!cancelled) {
      randoms.add(rnd.nextInt(100));
    }
  }
  public void cancel() {
    cancelled = true;
  }
  public List<Integer> getRandoms() {
    return randoms;
  }
}

这里的重点是canceled变量。注意,这个变量被声明为volatile(也称为轻量级同步机制)。作为一个volatile变量,它不会被线程缓存,对它的操作也不会在内存中重新排序;因此,线程看不到旧值。任何读取volatile字段的线程都将看到最近写入的值。这正是我们所需要的,以便将取消操作传递给对该操作感兴趣的所有正在运行的线程。下图描述了volatile和非volatile的工作原理:

注意,volatile变量不适合读-修改-写场景。对于这样的场景,我们将依赖于原子变量(例如,AtomicBooleanAtomicIntegerAtomicReference等等)。

现在,让我们提供一个简单的代码片段,用于取消在RandomList中实现的任务:

RandomList rl = new RandomList();
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
  executor.execute(rl);
}
Thread.sleep(100);
rl.cancel();
System.out.println(rl.getRandoms());

220 ThreadLocal

Java 线程共享相同的内存,但有时我们需要为每个线程提供专用内存。Java 提供ThreadLocal作为一种方法,分别存储和检索每个线程的值。ThreadLocal的一个实例可以存储和检索多个线程的值。如果线程A存储x值,线程BThreadLocal的同一实例中存储y值,那么稍后,线程A检索x值,线程B检索y值。

JavaThreadLocal通常用于以下两种场景:

  • 用于提供线程级别的实例(线程安全和内存效率)
  • 用于提供线程级别的上下文

让我们在下一节中看看每个场景的问题。

线程级别的实例

假设我们有一个使用StringBuilder类型的全局变量的单线程应用。为了在多线程应用中转换应用,我们必须处理StringBuilder,它不是线程安全的。基本上,我们有几种方法,例如同步和StringBuffer或其他方法。不过,我们也可以使用ThreadLocal。这里的主要思想是为每个线程提供一个单独的StringBuilder。使用ThreadLocal,我们可以做如下操作:

private static final ThreadLocal<StringBuilder> 
    threadLocal = new ThreadLocal<>() {
  @Override
  protected StringBuilder initialValue() {
    return new StringBuilder("ThreadSafe ");
  }
};

此线程局部变量的当前线程的初始值通过initialValue()方法设置。在 Java8 中,可以通过withInitial()重写如下:

private static final ThreadLocal<StringBuilder> threadLocal 
    = ThreadLocal.<StringBuilder> withInitial(() -> {
  return new StringBuilder("Thread-safe ");
});

使用get()set()ThreadLocal进行操作。set()的每次调用都将给定的值存储在只有当前线程才能访问的内存区域中。稍后,调用get()将从该区域检索值。另外,一旦工作完成,建议通过调用ThreadLocal实例上的remove()set(null)方法来避免内存泄漏。

让我们看看ThreadLocal在工作中使用Runnable

public class ThreadSafeStringBuilder implements Runnable {
  private static final Logger logger =
    Logger.getLogger(ThreadSafeStringBuilder.class.getName());
  private static final Random rnd = new Random();
  private static final ThreadLocal<StringBuilder> threadLocal 
      = ThreadLocal.<StringBuilder> withInitial(() -> {
    return new StringBuilder("Thread-safe ");
  });
  @Override
  public void run() {
    logger.info(() -> "-> " + Thread.currentThread().getName() 
      + " [" + threadLocal.get() + "]");
    Thread.sleep(rnd.nextInt(2000));
    // threadLocal.set(new StringBuilder(
    // Thread.currentThread().getName()));
    threadLocal.get().append(Thread.currentThread().getName());
    logger.info(() -> "-> " + Thread.currentThread().getName() 
      + " [" + threadLocal.get() + "]");
    threadLocal.set(null);
    // threadLocal.remove();
    logger.info(() -> "-> " + Thread.currentThread().getName() 
      + " [" + threadLocal.get() + "]");
  }
}

让我们用几个线程来测试它:

ThreadSafeStringBuilder threadSafe = new ThreadSafeStringBuilder();
for (int i = 0; i < 3; i++) {
  new Thread(threadSafe, "thread-" + i).start();
}

输出显示每个线程访问自己的StringBuilder

[14:26:39] [INFO] -> thread-1 [Thread-safe ]
[14:26:39] [INFO] -> thread-0 [Thread-safe ]
[14:26:39] [INFO] -> thread-2 [Thread-safe ]
[14:26:40] [INFO] -> thread-0 [Thread-safe thread-0]
[14:26:40] [INFO] -> thread-0 [null]
[14:26:41] [INFO] -> thread-1 [Thread-safe thread-1]
[14:26:41] [INFO] -> thread-1 [null]
[14:26:41] [INFO] -> thread-2 [Thread-safe thread-2]
[14:26:41] [INFO] -> thread-2 [null]

在上述场景中,也可以使用ExecutorService

下面是为每个线程提供 JDBCConnection的另一段代码:

private static final ThreadLocal<Connection> connections 
    = ThreadLocal.<Connection> withInitial(() -> {
  try {
    return DriverManager.getConnection("jdbc:mysql://...");
  } catch (SQLException ex) {
    throw new RuntimeException("Connection acquisition failed!", ex);
  }
});
public static Connection getConnection() {
  return connections.get();
}

线程级别的上下文

假设我们有以下Order类:

public class Order {
  private final int customerId;
  public Order(int customerId) {
    this.customerId = customerId;
  }
  // getter and toString() omitted for brevity
}

我们写下CustomerOrder如下:

public class CustomerOrder implements Runnable {
  private static final Logger logger
    = Logger.getLogger(CustomerOrder.class.getName());
  private static final Random rnd = new Random();
  private static final ThreadLocal<Order> 
    customerOrder = new ThreadLocal<>();
  private final int customerId;
  public CustomerOrder(int customerId) {
    this.customerId = customerId;
  }
  @Override
  public void run() {
    logger.info(() -> "Given customer id: " + customerId 
      + " | " + customerOrder.get() 
      + " | " + Thread.currentThread().getName());
    customerOrder.set(new Order(customerId));
    try {
      Thread.sleep(rnd.nextInt(2000));
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      logger.severe(() -> "Exception: " + ex);
    }
    logger.info(() -> "Given customer id: " + customerId 
      + " | " + customerOrder.get() 
      + " | " + Thread.currentThread().getName());
    customerOrder.remove();
  }
}

对于每一个customerId,我们都有一个我们控制的专用线程:

CustomerOrder co1 = new CustomerOrder(1);
CustomerOrder co2 = new CustomerOrder(2);
CustomerOrder co3 = new CustomerOrder(3);
new Thread(co1).start();
new Thread(co2).start();
new Thread(co3).start();

因此,每个线程修改CustomerOrder的某个实例(每个实例都有一个特定的线程)。

run()方法取给定customerId的顺序,并用set()方法存储在ThreadLocal变量中。

可能的输出如下:

[14:48:20] [INFO] 
  Given customer id: 3 | null | Thread-2
[14:48:20] [INFO] 
  Given customer id: 2 | null | Thread-1
[14:48:20] [INFO] 
  Given customer id: 1 | null | Thread-0
[14:48:20] [INFO] 
  Given customer id: 2 | Order{customerId=2} | Thread-1
[14:48:21] [INFO] 
  Given customer id: 3 | Order{customerId=3} | Thread-2
[14:48:21] [INFO] 
  Given customer id: 1 | Order{customerId=1} | Thread-0

在前一种情况下,避免使用ExecutorService。无法保证(给定的customerId的)每个Runnable在每次执行时都会被同一个线程处理。这可能会导致奇怪的结果。

Java 编程问题:十一、并发-深入探索4https://developer.aliyun.com/article/1426168

相关文章
|
1月前
|
Java 开发者
Java多线程编程中的常见误区与最佳实践####
本文深入剖析了Java多线程编程中开发者常遇到的几个典型误区,如对`start()`与`run()`方法的混淆使用、忽视线程安全问题、错误处理未同步的共享变量等,并针对这些问题提出了具体的解决方案和最佳实践。通过实例代码对比,直观展示了正确与错误的实现方式,旨在帮助读者构建更加健壮、高效的多线程应用程序。 ####
|
23天前
|
Java 程序员
Java编程中的异常处理:从基础到高级
在Java的世界中,异常处理是代码健壮性的守护神。本文将带你从异常的基本概念出发,逐步深入到高级用法,探索如何优雅地处理程序中的错误和异常情况。通过实际案例,我们将一起学习如何编写更可靠、更易于维护的Java代码。准备好了吗?让我们一起踏上这段旅程,解锁Java异常处理的秘密!
|
3天前
|
存储 缓存 Java
Java 并发编程——volatile 关键字解析
本文介绍了Java线程中的`volatile`关键字及其与`synchronized`锁的区别。`volatile`保证了变量的可见性和一定的有序性,但不能保证原子性。它通过内存屏障实现,避免指令重排序,确保线程间数据一致。相比`synchronized`,`volatile`性能更优,适用于简单状态标记和某些特定场景,如单例模式中的双重检查锁定。文中还解释了Java内存模型的基本概念,包括主内存、工作内存及并发编程中的原子性、可见性和有序性。
Java 并发编程——volatile 关键字解析
|
7天前
|
算法 Java 调度
java并发编程中Monitor里的waitSet和EntryList都是做什么的
在Java并发编程中,Monitor内部包含两个重要队列:等待集(Wait Set)和入口列表(Entry List)。Wait Set用于线程的条件等待和协作,线程调用`wait()`后进入此集合,通过`notify()`或`notifyAll()`唤醒。Entry List则管理锁的竞争,未能获取锁的线程在此排队,等待锁释放后重新竞争。理解两者区别有助于设计高效的多线程程序。 - **Wait Set**:线程调用`wait()`后进入,等待条件满足被唤醒,需重新竞争锁。 - **Entry List**:多个线程竞争锁时,未获锁的线程在此排队,等待锁释放后获取锁继续执行。
35 12
|
3天前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
41 2
|
26天前
|
设计模式 Java 开发者
Java多线程编程的陷阱与解决方案####
本文深入探讨了Java多线程编程中常见的问题及其解决策略。通过分析竞态条件、死锁、活锁等典型场景,并结合代码示例和实用技巧,帮助开发者有效避免这些陷阱,提升并发程序的稳定性和性能。 ####
|
26天前
|
缓存 Java 开发者
Java多线程编程的陷阱与最佳实践####
本文深入探讨了Java多线程编程中常见的陷阱,如竞态条件、死锁和内存一致性错误,并提供了实用的避免策略。通过分析典型错误案例,本文旨在帮助开发者更好地理解和掌握多线程环境下的编程技巧,从而提升并发程序的稳定性和性能。 ####
|
20天前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
20天前
|
Java 调度
Java中的多线程编程与并发控制
本文深入探讨了Java编程语言中多线程编程的基础知识和并发控制机制。文章首先介绍了多线程的基本概念,包括线程的定义、生命周期以及在Java中创建和管理线程的方法。接着,详细讲解了Java提供的同步机制,如synchronized关键字、wait()和notify()方法等,以及如何通过这些机制实现线程间的协调与通信。最后,本文还讨论了一些常见的并发问题,例如死锁、竞态条件等,并提供了相应的解决策略。
43 3
|
1月前
|
Java 程序员
Java编程中的异常处理:从基础到高级
在Java的世界里,异常是程序运行中不可忽视的“惊喜”。它们可能突如其来,也可能悄无声息地潜伏。掌握异常处理的艺术,意味着你能够优雅地面对程序的不完美,并确保它即使在风雨飘摇中也能继续航行。本文将引导你理解Java异常的本质,探索捕获和处理这些异常的方法,并最终学会如何利用自定义异常为你的代码增添力量。