Java 编程问题:十、并发-线程池、可调用对象和同步器3

简介: Java 编程问题:十、并发-线程池、可调用对象和同步器

Java 编程问题:十、并发-线程池、可调用对象和同步器2https://developer.aliyun.com/article/1426162

203 具有固定线程数的线程池

这个问题重复了“线程池中具有单个线程”部分的场景。这一次,装配线使用了三个生产者和两个消费者,如下图所示:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WWnz1HDy-1657345732718)(https://github.com/apachecn/apachecn-java-zh/raw/master/docs/java-coding-prob/img/99335ce7-243d-4e71-ac87-c22ada2bd28d.png)]

我们可以依靠Executors.newFixedThreadPool(int nThreads)来模拟固定数量的生产者和消费者。我们为每个生产者(分别为消费者)分配一个线程,因此代码非常简单:

private static final int PRODUCERS = 3;
private static final int CONSUMERS = 2;
private static final Producer producer = new Producer();
private static final Consumer consumer = new Consumer();
private static ExecutorService producerService;
private static ExecutorService consumerService;
...
producerService = Executors.newFixedThreadPool(PRODUCERS);
for (int i = 0; i < PRODUCERS; i++) {
  producerService.execute(producer);
}
consumerService = Executors.newFixedThreadPool(CONSUMERS);
for (int i = 0; i < CONSUMERS; i++) {
  consumerService.execute(consumer);
}

生产者可以在其中添加已检查灯泡的队列可以是LinkedTransferQueueConcurrentLinkedQueue类型,依此类推。

基于LinkedTransferQueueConcurrentLinkedQueue的完整源代码可以在本书附带的代码中找到。

204 带有缓存和调度的线程池

这个问题重复了“线程池中具有单个线程”部分的场景。这一次,我们假设生产者(也可以使用多个生产者)在不超过 1 秒的时间内检查一个灯泡。此外,一个耗电元件(包装器)最多需要 10 秒来包装一个灯泡。生产器和耗电元件的时间可以如下所示:

private static final int MAX_PROD_TIME_MS = 1 * 1000;
private static final int MAX_CONS_TIME_MS = 10 * 1000;

显然,在这种情况下,一个消费者无法面对即将到来的流量。用于存储灯泡的队列将不断增加,直到它们被打包。生产者添加到此队列的速度比消费者可以轮询的速度快得多。因此,需要更多的消费者,如下图所示:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0MvAjJGv-1657345732719)(https://github.com/apachecn/apachecn-java-zh/raw/master/docs/java-coding-prob/img/093d41e1-c37f-4a60-ab5f-d2be96c7762a.png)]

由于只有一个生产者,我们可以依赖Executors.newSingleThreadExecutor()

private static volatile boolean runningProducer;
private static ExecutorService producerService;
private static final Producer producer = new Producer();
...
public static void startAssemblyLine() {
  ...
  runningProducer = true;
  producerService = Executors.newSingleThreadExecutor();
  producerService.execute(producer);
  ...
}

除了extraProdTime变量外,Producer与前面的问题几乎相同:

private static int extraProdTime;
private static final Random rnd = new Random();
...
private static class Producer implements Runnable {
  @Override
  public void run() {
    while (runningProducer) {
      try {
        String bulb = "bulb-" + rnd.nextInt(1000);
        Thread.sleep(rnd.nextInt(MAX_PROD_TIME_MS) + extraProdTime);
        queue.offer(bulb);
        logger.info(() -> "Checked: " + bulb);
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        logger.severe(() -> "Exception: " + ex);
        break;
      }
    }
  }
}

extraProdTime变量最初为 0。当我们放慢生产速度时,需要这样做:

Thread.sleep(rnd.nextInt(MAX_PROD_TIME_MS) + extraProdTime);

在高速运行一段时间后,生产者会感到疲倦,需要更多的时间来检查每个灯泡。如果生产者放慢生产速度,消费者的数量也应该减少。

当生产者高速运转时,我们将需要更多的消费者(包装商)。但是有多少?使用固定数量的消费者(newFixedThreadPool()会带来至少两个缺点:

  • 如果生产者在某个时候放慢速度,一些消费者将继续失业,只会继续留在那里
  • 如果生产者变得更有效率,就需要更多的消费者来面对即将到来的流量

基本上,我们应该能够根据生产者的效率来改变消费者的数量。

对于这类工作,我们有Executors.newCachedThreadPool()。缓存的线程池将重用现有的线程,并根据需要创建新的线程(我们可以添加更多的使用者)。如果线程在 60 秒内未被使用,那么线程将被终止并从缓存中删除(我们可以删除使用者)。

让我们从一个活动消费者开始:

private static volatile boolean runningConsumer;
private static final AtomicInteger 
  nrOfConsumers = new AtomicInteger();
private static final ThreadGroup threadGroup 
  = new ThreadGroup("consumers");
private static final Consumer consumer = new Consumer();
private static ExecutorService consumerService;
...
public static void startAssemblyLine() {
  ...
  runningConsumer = true;
  consumerService = Executors
    .newCachedThreadPool((Runnable r) -> new Thread(threadGroup, r));
  nrOfConsumers.incrementAndGet();
  consumerService.execute(consumer);
  ...
}

因为我们希望能够看到一个时刻有多少线程(使用者)处于活动状态,所以我们通过一个自定义的ThreadFactory将它们添加到ThreadGroup中:

consumerService = Executors
  .newCachedThreadPool((Runnable r) -> new Thread(threadGroup, r));

稍后,我们将能够使用以下代码获取活动消费者的数量:

threadGroup.activeCount();

了解活动消费者的数量是一个很好的指标,可以与灯泡队列的当前大小相结合,以确定是否需要更多消费者。

使用者实现如下所示:

private static class Consumer implements Runnable {
  @Override
  public void run() {
    while (runningConsumer && queue.size() > 0
                           || nrOfConsumers.get() == 1) {
      try {
        String bulb = queue.poll(MAX_PROD_TIME_MS 
           + extraProdTime, TimeUnit.MILLISECONDS);
        if (bulb != null) {
          Thread.sleep(rnd.nextInt(MAX_CONS_TIME_MS));
          logger.info(() -> "Packed: " + bulb + " by consumer: " 
            + Thread.currentThread().getName());
        }
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        logger.severe(() -> "Exception: " + ex);
        break;
      }
    }
    nrOfConsumers.decrementAndGet();
    logger.warning(() -> "### Thread " +
      Thread.currentThread().getName() 
        + " is going back to the pool in 60 seconds for now!");
  }
}

假设装配线正在运行,只要队列不是空的或者他们是剩下的唯一消费者(我们不能有 0 个消费者),消费者就会继续打包灯泡。我们可以解释为,空队列意味着有太多的消费者。因此,当使用者看到队列为空并且他们不是唯一的工作使用者时,他们将变为空闲(60 秒后,他们将自动从缓存的线程池中删除)。

不要混淆nrOfConsumersthreadGroup.activeCount()nrOfConsumers变量存储当前打包灯泡的使用者(线程)的数量,而threadGroup.activeCount()表示所有活动使用者(线程),包括那些当前不工作(空闲)并且正等待从缓存中重用或调度的使用者(线程)。

现在,在一个真实的案例中,一个主管将监控装配线,当他们注意到当前数量的消费者无法面对即将到来的涌入时,他们将调用更多的消费者加入(最多允许 50 个消费者)。此外,当他们注意到一些消费者只是停留在附近,他们会派遣他们到其他工作。下图是此场景的图形表示:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eF6hIUCC-1657345732719)(https://github.com/apachecn/apachecn-java-zh/raw/master/docs/java-coding-prob/img/a7ddcb56-2877-4575-96ed-e3e9b7aef403.png)]

出于测试目的,我们的监管者newSingleThreadScheduledExecutor()将是一个单线程执行器,可以调度给定的命令在指定的延迟后运行。它还可以定期执行命令:

private static final int MAX_NUMBER_OF_CONSUMERS = 50;
private static final int MAX_QUEUE_SIZE_ALLOWED = 5;
private static final int MONITOR_QUEUE_INITIAL_DELAY_MS = 5000;
private static final int MONITOR_QUEUE_RATE_MS = 3000;
private static ScheduledExecutorService monitorService;
...
private static void monitorQueueSize() {
  monitorService = Executors.newSingleThreadScheduledExecutor();
  monitorService.scheduleAtFixedRate(() -> {
    if (queue.size() > MAX_QUEUE_SIZE_ALLOWED 
        && threadGroup.activeCount() < MAX_NUMBER_OF_CONSUMERS) {
      logger.warning("### Adding a new consumer (command) ...");
      nrOfConsumers.incrementAndGet();
      consumerService.execute(consumer);
    }
    logger.warning(() -> "### Bulbs in queue: " + queue.size() 
      + " | Active threads: " + threadGroup.activeCount() 
      + " | Consumers: " + nrOfConsumers.get() 
      + " | Idle: " + (threadGroup.activeCount() 
        - nrOfConsumers.get()));
  }, MONITOR_QUEUE_INITIAL_DELAY_MS, MONITOR_QUEUE_RATE_MS,
        TimeUnit.MILLISECONDS);
}

我们依靠scheduleAtFixedRate()每 3 秒监控一次装配线,初始延迟 5 秒。因此,每三秒,主管检查一次灯泡队列大小。如果排队的灯泡超过 5 个,消费者少于 50 个,主管会要求新的消费者加入装配线。如果队列包含 5 个或更少的灯泡,或者已经有 50 个消费者,则主管不会采取任何行动。

如果我们现在开始装配线,我们可以看到消费者的数量是如何增加的,直到队列大小小于 6。可能的快照如下所示:

Starting assembly line ...
[11:53:20] [INFO] Checked: bulb-488
...
[11:53:24] [WARNING] ### Adding a new consumer (command) ...
[11:53:24] [WARNING] ### Bulbs in queue: 7 
                       | Active threads: 2 
                       | Consumers: 2 
                       | Idle: 0
[11:53:25] [INFO] Checked: bulb-738
...
[11:53:36] [WARNING] ### Bulbs in queue: 23 
                       | Active threads: 6
                       | Consumers: 6
                       | Idle: 0
...

当线程数超过需要时,其中一些线程将变为空闲线程。如果在 60 秒内没有收到作业,则会将其从缓存中删除。如果作业在没有空闲线程时发生,则将创建一个新线程。这个过程不断重复,直到我们注意到装配线上的平衡。过了一段时间,事情开始平静下来,适当数量的消费者会在一个小范围内(小波动)。这是因为生产者输出的速度是随机的,最大值为 1 秒。

一段时间后(例如,20 秒后),让我们将生产者的速度降低 4 秒(这样,灯泡现在最多可以在 5 秒钟内检查):

private static final int SLOW_DOWN_PRODUCER_MS = 20 * 1000;
private static final int EXTRA_TIME_MS = 4 * 1000;

这可以使用另一个newSingleThreadScheduledExecutor()来完成,如下所示:

private static void slowdownProducer() {
  slowdownerService = Executors.newSingleThreadScheduledExecutor();
  slowdownerService.schedule(() -> {
    logger.warning("### Slow down producer ...");
    extraProdTime = EXTRA_TIME_MS;
  }, SLOW_DOWN_PRODUCER_MS, TimeUnit.MILLISECONDS);
}

这只会发生一次,在装配线启动 20 秒后。由于生产者的速度降低了 4 秒,因此不需要有相同数量的消费者来维持最多 5 个灯泡的队列。

输出中显示了这一点,如图所示(请注意,有时只需要一个使用者来处理队列):

...
[11:53:36] [WARNING] ### Bulbs in queue: 23 
                       | Active threads: 6
                       | Consumers: 6
                       | Idle: 0
...
[11:53:39] [WARNING] ### Slow down producer ...
...
[11:53:56] [WARNING] ### Thread Thread-5 is going
                         back to the pool in 60 seconds for now!
[11:53:56] [INFO] Packed: bulb-346 by consumer: Thread-2
...
[11:54:36] [WARNING] ### Bulbs in queue: 1 
                       | Active threads: 12
                       | Consumers: 1
                       | Idle: 11
...
[11:55:48] [WARNING] ### Bulbs in queue: 3 
                       | Active threads: 1
                       | Consumers: 1 
                       | Idle: 0
...
Assembling line was successfully stopped!

在启动装配线后启动主管:

public static void startAssemblyLine() {
  ...
  monitorQueueSize();
  slowdownProducer();
}

完整的应用可以在与本书捆绑的代码中使用。

使用缓存线程池时,请注意为容纳提交的任务而创建的线程数。对于单线程池和固定线程池,我们控制创建的线程数,而缓存池可以决定创建太多的线程。基本上,不可控地创建线程可能会很快耗尽资源。因此,在容易过载的系统中,最好依赖固定线程池。

205 偷工线程池

让我们关注打包过程,它应该通过一个窃取工作的线程池来实现。首先,让我们讨论什么是偷工作线程池,并通过与经典线程池的比较来实现。下图描述了经典线程池的工作原理:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zICytzbE-1657345732720)(https://github.com/apachecn/apachecn-java-zh/raw/master/docs/java-coding-prob/img/db7c18aa-598d-4b12-bf5c-cfee7496c8cb.png)]

因此,线程池依赖于内部入站队列来存储任务。每个线程必须将一个任务出列并执行它。这适用于任务耗时且数量相对较少的情况。另一方面,如果这些任务多而小(它们需要很少的时间来执行),也会有很多争论。这是不好的,即使这是一个无锁队列,问题也没有完全解决。

为了减少争用并提高性能,线程池可以依赖于工作窃取算法和每个线程的队列。在这种情况下,所有任务都有一个中心入站队列,每个线程(工作线程)都有一个额外的队列(称为本地任务队列),如下图所示:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jMlp3yqf-1657345732721)(https://github.com/apachecn/apachecn-java-zh/raw/master/docs/java-coding-prob/img/ec37eb0e-2a8f-4fa8-8dc2-7f302d81a4a8.png)]

因此,每个线程都会将任务从中心队列中出列,并将它们放入自己的队列中。每个线程都有自己的本地任务队列。此外,当一个线程想要处理一个任务时,它只是将一个任务从它自己的本地队列中取出。只要它的本地队列不是空的,线程就将继续处理来自它的任务,而不会影响其他线程(与其他线程没有冲突)。当其本地队列为空时(如上图中的线程 2 的情况),它尝试从属于其他线程的本地队列中窃取(通过工作窃取算法)任务(例如,线程 2线程 3 窃取任务)。如果找不到任何可窃取的内容,它将访问共享的中心入站队列。

每个本地队列实际上是一个 Deque(简称双向队列),因此可以从两端高效访问。线程将其双向队列视为一个栈,这意味着它将只从一端排队(添加新任务)和出列(获取要处理的任务)。另一方面,当一个线程试图从另一个线程的队列中窃取时,它将访问另一端(例如,线程 2 从另一端从线程 3 队列中窃取)。因此,任务从一端处理,从另一端窃取。

如果两个线程试图从同一个本地队列中窃取数据,那么就存在争用,但通常情况下这应该是无关紧要的。

我们刚才描述的是 JDK7 中引入的 Fork/Join 框架,“Fork/Join 框架”部分举例说明。从 JDK8 开始,Executors类通过使用可用处理器的数量作为其目标并行级别的工作窃取线程池进行了丰富。可通过Executors.newWorkStealingPool()Executors.newWorkStealingPool(int parallelism)获取。

让我们看看这个线程池的源代码:

public static ExecutorService newWorkStealingPool() {
  return new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
    ForkJoinPool.defaultForkJoinWorkerThreadFactory,
      null, true);
}

因此,在内部,这个线程池通过以下构造器实例化ForkJoinPool

public ForkJoinPool(int parallelism,
  ForkJoinPool.ForkJoinWorkerThreadFactory factory,
  Thread.UncaughtExceptionHandler handler,
  boolean asyncMode)

我们将并行级别设置为availableProcessors(),返回新线程的默认线程工厂Thread.UncaughtExceptionHandler,作为null传递,asyncMode设置为true。将asyncMode设置为true意味着它授权本地先进先出FIFO)调度模式,用于分叉且从未连接的任务。在依赖工作线程仅处理事件样式异步任务的程序中,此模式可能比默认模式(基于本地栈)更合适。

不过,不要忘记,只有当工作线程在自己的本地队列中调度新任务时,本地任务队列和工作窃取算法才被授权。否则,ForkJoinPool只是一个额外开销的ThreadPoolExecutor

当我们直接使用ForkJoinPool时,我们可以使用ForkJoinTask(通常通过RecursiveTaskRecursiveAction指示任务在执行期间显式地调度新任务。

但是由于newWorkStealingPool()ForkJoinPool的更高抽象级别,我们不能指示任务在执行期间显式地调度新任务。因此,newWorkStealingPool()将根据我们通过的任务在内部决定如何工作。我们可以尝试比较一下newWorkStealingPool()newCachedThreadPool()newFixedThreadPool(),看看它们在两种情况下的表现:

  • 对于大量的小任务
  • 对于少量耗时的任务

在下一节中,我们来看看这两种场景的解决方案。

大量的小任务

由于生产者(检查器)和消费者(打包器)不同时工作,我们可以通过一个简单的for循环(我们对装配线的这部分不太感兴趣)轻松地用 15000000 个灯泡填满一个队列。这在以下代码段中显示:

private static final Random rnd = new Random();
private static final int MAX_PROD_BULBS = 15_000_000;
private static final BlockingQueue<String> queue 
  = new LinkedBlockingQueue<>();
...
private static void simulatingProducers() {
  logger.info("Simulating the job of the producers overnight ...");
  logger.info(() -> "The producers checked " 
    + MAX_PROD_BULBS + " bulbs ...");
  for (int i = 0; i < MAX_PROD_BULBS; i++) {
    queue.offer("bulb-" + rnd.nextInt(1000));
  }
}

此外,让我们创建一个默认的工作线程池:

private static ExecutorService consumerService 
  = Executors.newWorkStealingPool();

为了进行比较,我们还将使用以下线程池:

  • 缓存的线程池:
private static ExecutorService consumerService 
  = Executors.newCachedThreadPool();
  • 使用可用处理器数作为线程数的固定线程池(默认工作线程池使用处理器数作为并行级别):
private static final Consumer consumer = new Consumer();
private static final int PROCESSORS 
  = Runtime.getRuntime().availableProcessors();
private static ExecutorService consumerService 
  = Executors.newFixedThreadPool(PROCESSORS);

让我们开始 15000000 个小任务:

for (int i = 0; i < queueSize; i++) {
  consumerService.execute(consumer);
}

Consumer包装了一个简单的queue.poll()操作,因此它应该运行得非常快,如下面的代码片段所示:

private static class Consumer implements Runnable {
  @Override
  public void run() {
    String bulb = queue.poll();
    if (bulb != null) {
      // nothing
    }
  }
}

下图显示了 10 次运行的收集数据:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-V40gRsBk-1657345732721)(https://github.com/apachecn/apachecn-java-zh/raw/master/docs/java-coding-prob/img/0ac3b907-bb12-4ac9-83a6-290e5d32f8f3.png)]

即使这不是一个专业的基准测试,我们也可以看到工作线程池获得了最好的结果,而缓存线程轮询的结果更差。

少量的耗时任务

与其让一个队列装满 15000000 个灯泡,不如让我们让 15 个队列装满 1000000 个灯泡:

private static final int MAX_PROD_BULBS = 15 _000_000;
private static final int CHUNK_BULBS = 1 _000_000;
private static final Random rnd = new Random();
private static final Queue<BlockingQueue<String>> chunks 
  = new LinkedBlockingQueue<>();
...
private static Queue<BlockingQueue<String>> simulatingProducers() {
  logger.info("Simulating the job of the producers overnight ...");
  logger.info(() -> "The producers checked " 
    + MAX_PROD_BULBS + " bulbs ...");
  int counter = 0;
  while (counter < MAX_PROD_BULBS) {
    BlockingQueue chunk = new LinkedBlockingQueue<>(CHUNK_BULBS);
    for (int i = 0; i < CHUNK_BULBS; i++) {
      chunk.offer("bulb-" + rnd.nextInt(1000));
    }
    chunks.offer(chunk);
    counter += CHUNK_BULBS;
  }
  return chunks;
}

让我们使用以下代码启动 15 个任务:

while (!chunks.isEmpty()) {
  Consumer consumer = new Consumer(chunks.poll());
  consumerService.execute(consumer);
}

每个Consumer循环 1000000 个灯泡,使用此代码:

private static class Consumer implements Runnable {
  private final BlockingQueue<String> bulbs;
  public Consumer(BlockingQueue<String> bulbs) {
    this.bulbs = bulbs;
  }
  @Override
  public void run() {
    while (!bulbs.isEmpty()) {
      String bulb = bulbs.poll();
      if (bulb != null) {}
    }
  }
}

下图显示了 10 次运行的收集数据:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Ckn5reEF-1657345732722)(https://github.com/apachecn/apachecn-java-zh/raw/master/docs/java-coding-prob/img/e862acc6-7256-40fa-b744-df570c929c95.png)]

这一次,工作线程池看起来像一个常规线程池。

206 CallableFuture

这个问题重复了“线程池中具有单个线程”部分的场景。我们需要一个单一的生产者和消费者遵循以下场景:

  1. 一个自动系统向生产者发出一个请求,说,检查这个灯泡,如果没有问题,就把它还给我,否则告诉我这个灯泡出了什么问题
  2. 自动系统等待生产者检查灯泡。
  3. 当自动系统接收到检查过的灯泡时,它会进一步传递给耗电元件(打包机)并重复此过程。
  4. 如果灯泡有缺陷,生产者抛出异常(DefectBulbException),自动系统将检查问题的原因。

该场景如下图所示:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xNGQzJTZ-1657345732722)(https://github.com/apachecn/apachecn-java-zh/raw/master/docs/java-coding-prob/img/39a7ac96-dfd5-4571-a37e-0b912577de3d.png)]

为了形成这个场景,生产者应该能够返回一个结果并抛出一个异常。因为我们的制作人是Runnable,所以这两个都做不到。但是 Java 定义了一个名为Callable的接口。这是一个函数式接口,其方法名为call()。与Runnablerun()方法相比,call()方法可以返回结果,甚至抛出异常V call() throws Exception

这意味着生产者(检查者)可以写为:

private static volatile boolean runningProducer;
private static final int MAX_PROD_TIME_MS = 5 * 1000;
private static final Random rnd = new Random();
...
private static class Producer implements Callable {
  private final String bulb;
  private Producer(String bulb) {
    this.bulb = bulb;
  }
  @Override
  public String call() 
      throws DefectBulbException, InterruptedException {
    if (runningProducer) {
      Thread.sleep(rnd.nextInt(MAX_PROD_TIME_MS));
      if (rnd.nextInt(100) < 5) {
        throw new DefectBulbException("Defect: " + bulb);
      } else {
        logger.info(() -> "Checked: " + bulb);
      }
      return bulb;
    }
    return "";
  }
}

执行者服务可以通过submit()方法向Callable提交任务,但不知道提交任务的结果何时可用。因此,Callable立即返回一个特殊类型,名为Future。异步计算的结果由Future表示,通过Future可以在任务可用时获取任务结果。从概念上讲,我们可以将Future看作 JavaScript Promise,或者是在稍后时间点进行的计算的结果。现在,我们创建一个Producer提交给Callable

String bulb = "bulb-" + rnd.nextInt(1000);
Producer producer = new Producer(bulb);
Future<String> bulbFuture = producerService.submit(producer);
// this line executes immediately

由于Callable会立即返回一个Future,所以我们可以在等待提交任务结果的同时执行其他任务(如果该任务完成,isDone()标志方法返回true):

while (!future.isDone()) {
  System.out.println("Do something else ...");
}

检索Future的结果可以使用阻塞方法Future.get()来完成。此方法将阻塞,直到结果可用或指定的超时已过(如果在超时之前结果不可用,则抛出一个TimeoutException

String checkedBulb = bulbFuture.get(
  MAX_PROD_TIME_MS + 1000, TimeUnit.MILLISECONDS);
// this line executes only after the result is available

一旦得到结果,我们就可以将其传递给Consumer,并向Producer提交另一个任务。只要消费者和生产者都在运行,这个循环就会重复。其代码如下:

private static void automaticSystem() {
  while (runningProducer &amp;&amp; runningConsumer) {
    String bulb = "bulb-" + rnd.nextInt(1000);
    Producer producer = new Producer(bulb);
    Future<String> bulbFuture = producerService.submit(producer);
    ...
    String checkedBulb = bulbFuture.get(
      MAX_PROD_TIME_MS + 1000, TimeUnit.MILLISECONDS);
    Consumer consumer = new Consumer(checkedBulb);
    if (runningConsumer) {
      consumerService.execute(consumer);
    }
  }
  ...
}

Consumer仍然是Runnable,因此不能返回结果或抛出异常:

private static final int MAX_CONS_TIME_MS = 3 * 1000;
...
private static class Consumer implements Runnable {
  private final String bulb;
  private Consumer(String bulb) {
    this.bulb = bulb;
  }
  @Override
  public void run() {
    if (runningConsumer) {
      try {
        Thread.sleep(rnd.nextInt(MAX_CONS_TIME_MS));
        logger.info(() -> "Packed: " + bulb);
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        logger.severe(() -> "Exception: " + ex);
      }
    }
  }
}

最后,我们需要启动自动系统。其代码如下:

public static void startAssemblyLine() {
  ...
  runningProducer = true;
  consumerService = Executors.newSingleThreadExecutor();
  runningConsumer = true;
  producerService = Executors.newSingleThreadExecutor();
  new Thread(() -> {
    automaticSystem();
  }).start();
}

注意,我们不想阻塞主线程,因此我们在一个新线程中启动自动系统。这样主线程就可以控制装配线的启停过程。

让我们运行装配线几分钟来收集一些输出:

Starting assembly line ...
[08:38:41] [INFO ] Checked: bulb-879
...
[08:38:52] [SEVERE ] Exception: DefectBulbException: Defect: bulb-553
[08:38:53] [INFO ] Packed: bulb-305
...

好了,任务完成了!让我们来讨论最后一个话题。

取消Future

Future可以取消。这是使用cancel(boolean mayInterruptIfRunning)方法完成的。如果我们将其作为true传递,则执行该任务的线程被中断,否则,该线程可以完成该任务。如果任务取消成功,则返回true,否则返回false(通常是因为任务已经正常完成)。下面是一个简单的示例,用于在运行任务所需时间超过 1 秒时取消该任务:

long startTime = System.currentTimeMillis();
Future<String> future = executorService.submit(() -> {
  Thread.sleep(3000);
  return "Task completed";
});
while (!future.isDone()) {
  System.out.println("Task is in progress ...");
  Thread.sleep(100);
  long elapsedTime = (System.currentTimeMillis() - startTime);
  if (elapsedTime > 1000) {
    future.cancel(true);
  }
}

如果任务在正常完成前被取消,isCancelled()方法返回true

System.out.println("Task was cancelled: " + future.isCancelled() 
  + "\nTask is done: " + future.isDone());

输出如下:

Task is in progress ...
Task is in progress ...
...
Task was cancelled: true
Task is done: true

以下是一些额外的例子:

  • 使用Callable和 Lambda:
Future<String> future = executorService.submit(() -> {
  return "Hello to you!";
});
  • 获取通过Executors.callable(Runnable task)返回nullCallable
Callable<Object> callable = Executors.callable(() -> {
  System.out.println("Hello to you!");
});
Future<Object> future = executorService.submit(callable);
  • 通过Executors.callable(Runnable task, T result)获取返回结果(TCallable
Callable<String> callable = Executors.callable(() -> {
  System.out.println("Hello to you!");
}, "Hi");
Future<String> future = executorService.submit(callable);

Java 编程问题:十、并发-线程池、可调用对象和同步器4https://developer.aliyun.com/article/1426164

相关文章
|
1天前
|
缓存 并行计算 Java
重温JAVA线程池精髓:Executor、ExecutorService及Executors的源码剖析与应用指南
重温JAVA线程池精髓:Executor、ExecutorService及Executors的源码剖析与应用指南
11 5
|
23小时前
|
Java
java线程池执行任务(一次任务、固定间隔时间任务等)
java线程池执行任务(一次任务、固定间隔时间任务等)
5 1
|
1天前
|
监控 Java 调度
Java并发编程:深入理解线程池
【6月更文挑战第26天】在Java并发编程的世界中,线程池是提升应用性能、优化资源管理的关键组件。本文将深入探讨线程池的内部机制,从核心概念到实际应用,揭示如何有效利用线程池来处理并发任务,同时避免常见的陷阱和错误实践。通过实例分析,我们将了解线程池配置的策略和对性能的影响,以及如何监控和维护线程池的健康状况。
6 1
|
1天前
|
数据采集 Java Unix
10-多线程、多进程和线程池编程(2)
10-多线程、多进程和线程池编程
|
15小时前
|
存储 缓存 Java
老程序员分享:Java并发编程:线程池的使用
老程序员分享:Java并发编程:线程池的使用
|
1天前
|
存储 设计模式 并行计算
CopyOnWriteArrayList:深入理解Java中的线程安全List原理和应用
CopyOnWriteArrayList:深入理解Java中的线程安全List原理和应用
7 0
|
1天前
|
缓存 并行计算 安全
【并发编程系列一】并发编年史:线程的双刃剑——从优势到风险的全面解析
【并发编程系列一】并发编年史:线程的双刃剑——从优势到风险的全面解析
|
1天前
|
Java 测试技术 开发者
Java并发编程:深入理解线程池
本文将带领读者深入了解Java中的线程池,探索其内部机制、使用场景以及如何有效地利用线程池来提高程序的性能和可维护性。我们将通过实例演示如何创建和配置线程池,并讨论常见的并发模式和最佳实践。文章旨在为开发者提供实用的线程池应用知识,帮助他们在面对多线程编程挑战时,能够设计出更加高效和稳定的系统。
|
1月前
|
数据可视化 Java 测试技术
Java 编程问题:十一、并发-深入探索1
Java 编程问题:十一、并发-深入探索
53 0
|
1月前
|
存储 设计模式 安全
Java 编程问题:十、并发-线程池、可调用对象和同步器2
Java 编程问题:十、并发-线程池、可调用对象和同步器
44 0