Java 编程问题:十、并发-线程池、可调用对象和同步器2

简介: Java 编程问题:十、并发-线程池、可调用对象和同步器

Java 编程问题:十、并发-线程池、可调用对象和同步器1https://developer.aliyun.com/article/1426161

201 Java 中的线程池

线程池是可用于执行任务的线程的集合。线程池负责管理其线程的创建、分配和生命周期,并有助于提高性能。现在,我们来谈谈遗嘱执行人。

Executor

java.util.concurrent包中,有一堆专用于执行任务的接口。最简单的一个叫做Executor。这个接口公开了一个名为execute(Runnable command)的方法。下面是使用此方法执行单个任务的示例:

public class SimpleExecutor implements Executor {
  @Override
  public void execute(Runnable r) {
    (new Thread(r)).start();
  }
}
SimpleExecutor se = new SimpleExecutor();
se.execute(() -> {
  System.out.println("Simple task executed via Executor interface");
});

ExecutorService

一个更复杂、更全面的接口提供了许多附加方法,它是ExecutorService。这是Executor的丰富版本。Java 附带了一个成熟的实现ExecutorService,名为ThreadPoolExecutor。这是一个线程池,可以用一组参数实例化,如下所示:

ThreadPoolExecutor(
  int corePoolSize,
  int maximumPoolSize,
  long keepAliveTime,
  TimeUnit unit,
  BlockingQueue<Runnable> workQueue,
  ThreadFactory threadFactory,
  RejectedExecutionHandler handler)

下面是对前面代码中实例化的每个参数的简短描述:

  • corePoolSize:池中要保留的线程数,即使它们是空闲的(除非设置了allowCoreThreadTimeOut
  • maximumPoolSize:允许的最大线程数
  • keepAliveTime:当这个时间过去后,空闲线程将从池中移除(这些是超过corePoolSize的空闲线程)
  • unitkeepAliveTime参数的时间单位
  • workQueue:在Runnable的实例(只有execute()方法提交的Runnable任务)执行之前,用来存放这些实例的队列
  • threadFactory:执行器创建新线程时使用此工厂
  • handler:当ThreadPoolExecutor由于饱和而无法执行Runnable时,即线程边界和队列容量已满(例如,workQueue大小固定,同时设置了maximumPoolSize),它将控制和决策交给这个处理器

为了优化池大小,我们需要收集以下信息:

  • CPU 数量(Runtime.getRuntime().availableProcessors()
  • 目标 CPU 利用率(在范围内,[0, 1]
  • 等待时间(W)
  • 计算时间(C)

以下公式有助于我们确定池的最佳大小:

Number of threads 
  = Number of CPUs * Target CPU utilization * (1 + W/C)

根据经验,对于计算密集型任务(通常是小型任务),最好使用线程数等于处理器数或处理器数 +1(以防止潜在的暂停)来对线程池进行基准测试。对于耗时且阻塞的任务(例如,I/O),更大的池更好,因为线程将无法以高速率进行调度。另外,还要注意与其他池(例如,数据库连接池和套接字连接池)的干扰。

让我们看一个ThreadPoolExecutor的例子:

public class SimpleThreadPoolExecutor implements Runnable {
  private final int taskId;
  public SimpleThreadPoolExecutor(int taskId) {
    this.taskId = taskId;
  }
  @Override
  public void run() {
    Thread.sleep(2000);
    System.out.println("Executing task " + taskId 
      + " via " + Thread.currentThread().getName());
  }
  public static void main(String[] args) {
    BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(5);
    final AtomicInteger counter = new AtomicInteger();
    ThreadFactory threadFactory = (Runnable r) -> {
      System.out.println("Creating a new Cool-Thread-" 
        + counter.incrementAndGet());
      return new Thread(r, "Cool-Thread-" + counter.get());
    };
    RejectedExecutionHandler rejectedHandler
      = (Runnable r, ThreadPoolExecutor executor) -> {
        if (r instanceof SimpleThreadPoolExecutor) {
          SimpleThreadPoolExecutor task=(SimpleThreadPoolExecutor) r;
          System.out.println("Rejecting task " + task.taskId);
        }
    };
    ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 1,
      TimeUnit.SECONDS, queue, threadFactory, rejectedHandler);
    for (int i = 0; i < 50; i++) {
      executor.execute(new SimpleThreadPoolExecutor(i));
    }
    executor.shutdown();
    executor.awaitTermination(
      Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
  }
}

main()方法触发Runnable50 个实例。每个Runnable睡两秒钟,打印一条消息。工作队列限制为Runnable的 5 个实例——核心线程为 10,线程数最多为 20 个,空闲超时为 1 秒。可能的输出如下:

Creating a new Cool-Thread-1
...
Creating a new Cool-Thread-20
Rejecting task 25
...
Rejecting task 49
Executing task 22 via Cool-Thread-18
...
Executing task 12 via Cool-Thread-2

ScheduledExecutorService

ScheduledExecutorService是一个ExecutorService,它可以安排任务在给定的延迟后执行,或者定期执行。这里,我们有schedule()scheduleAtFixedRate()scheduleWithFixedDelay()等方法。schedule()用于一次性任务,scheduleAtFixedRate()scheduleWithFixedDelay()用于周期性任务。

通过执行器的线程池

更进一步,我们将介绍辅助类Executors。此类使用以下方法公开几种类型的线程池:

  • newSingleThreadExecutor():这是一个线程池,只管理一个线程,队列无限,一次只执行一个任务:
ExecutorService executor 
  = Executors.newSingleThreadExecutor();
  • newCachedThreadPool():这是一个线程池,根据需要创建新线程并删除空闲线程(60 秒后);核心池大小为 0,最大池大小为Integer.MAX_VALUE(此线程池在需求增加时扩展,在需求减少时收缩):
ExecutorService executor = Executors.newCachedThreadPool();
  • newFixedThreadPool():这是一个线程数固定、队列无限的线程池,产生无限超时的效果(核心池大小和最大池大小等于指定的大小):
ExecutorService executor = Executors.newFixedThreadPool(5);
  • newWorkStealingThreadPool():这是一个基于工作窃取算法的线程池(它充当 Fork/Join 框架上的一层):
ExecutorService executor = Executors.newWorkStealingPool();
  • newScheduledThreadPool():一个线程池,可以安排命令在给定的延迟后运行,或者定期执行(我们可以指定核心池的大小):
ScheduledExecutorService executor 
  = Executors.newScheduledThreadPool(5);

202 具有单个线程的线程池

为了演示单线程线程池的工作原理,假设我们想编写一个程序,模拟装配线(或输送机),用两个工作器检查和包装灯泡。

通过检查,我们了解到工作器测试灯泡是否亮起。通过包装,我们了解到,工作器将经过验证的灯泡拿到盒子里。这种工艺在几乎所有工厂都很常见。

两名工作器如下:

  • 一种所谓的生产者(或检查者),负责测试每个灯泡,看灯泡是否亮起
  • 一个所谓的消费者(或包装商),负责将每个检查过的灯泡包装到一个盒子里

这种问题完全适合于下图所示的生产者消费者设计模式:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wMW6fHCN-1657345732716)(https://github.com/apachecn/apachecn-java-zh/raw/master/docs/java-coding-prob/img/817c7786-9093-401c-9afd-bf9b39aabd25.png)]

最常见的是,在这种模式中,生产者和消费者通过队列(生产者排队数据,消费者排队数据)进行通信。这个队列称为数据缓冲区。当然,根据流程设计,其他数据结构也可以起到数据缓冲的作用。

现在,让我们看看如果生产者等待消费者可用,我们如何实现这个模式。

稍后,我们将为不等待消费者的生产者实现此模式。

生产者等待消费者出现

装配线启动时,生产者将逐个检查进线灯泡,而消费者将将其打包(每个盒子中有一个灯泡)。此流重复,直到装配线停止。

下图是生产者消费者之间的流程图:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yPBj7z7O-1657345732716)(https://github.com/apachecn/apachecn-java-zh/raw/master/docs/java-coding-prob/img/3cf100e5-7ad8-4548-9a33-1e55a146b655.png)]

我们可以将装配线视为我们工厂的助手,因此它可以实现为助手或工具类(当然,它也可以很容易地切换到非static实现,因此如果对您的情况更有意义,请随意切换):

public final class AssemblyLine {
  private AssemblyLine() {
    throw new AssertionError("There is a single assembly line!");
  }
  ...
}

当然,实现这个场景的方法很多,但是我们对使用 JavaExecutorService感兴趣,更准确地说是Executors.newSingleThreadExecutor()。使用一个工作线程来操作未绑定队列的Executor通过此方法创建。

我们只有两个工作器,所以我们可以使用两个实例Executor(一个Executor将启动生产者,另一个将启动消费者)。因此,生产者将是一个线程,消费者将是另一个线程:

private static ExecutorService producerService;
private static ExecutorService consumerService;

由于生产者和消费者是好朋友,他们决定根据一个简单的场景来工作:

  • 只有消费者不忙时,生产者才会检查灯泡并将其传递给消费者(如果消费者忙,生产者会等待一段时间,直到消费者有空)
  • 生产者在将当前灯泡传递给用户之前不会检查下一个灯泡
  • 消费者将尽快包装每个进入的灯泡

这个场景适用于TransferQueueSynchronousQueue,它执行的过程与前面提到的场景非常相似。让我们使用TransferQueue。这是一个BlockingQueue,其中生产者可以等待消费者接收元素。BlockingQueue实现是线程安全的:

private static final TransferQueue<String> queue 
  = new LinkedTransferQueue<>();

生产者和消费者之间的工作流程是先进先出FIFO)类型:第一个检查的灯泡是第一个打包的灯泡),因此LinkedTransferQueue是一个不错的选择。

一旦装配线启动,生产者将持续检查灯泡,因此我们可以将其作为一个类来实现,如下所示:

private static final int MAX_PROD_TIME_MS = 5 * 1000;
private static final int MAX_CONS_TIME_MS = 7 * 1000;
private static final int TIMEOUT_MS = MAX_CONS_TIME_MS + 1000;
private static final Random rnd = new Random();
private static volatile boolean runningProducer;
...
private static class Producer implements Runnable {
  @Override
  public void run() {
    while (runningProducer) {
      try {
        String bulb = "bulb-" + rnd.nextInt(1000);
        Thread.sleep(rnd.nextInt(MAX_PROD_TIME_MS));
        boolean transfered = queue.tryTransfer(bulb,
          TIMEOUT_MS, TimeUnit.MILLISECONDS);
        if (transfered) {
          logger.info(() -> "Checked: " + bulb);
        }
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        logger.severe(() -> "Exception: " + ex);
        break;
      }
    }
  }
}

因此,生产者通过tryTransfer()方法将检查过的灯泡转移给消费者。如果可以在超时时间过去之前将元素传输到使用者,则此方法将执行此操作。

避免使用transfer()方法,这可能会无限期地堵塞螺纹。

为了模拟生产者检查灯泡所花的时间,相应的线程将在 0 到 5 之间随机休眠几秒(5 秒是检查灯泡所需的最长时间)。如果消费者在此时间之后不可用,则会花费更多时间(在tryTransfer()中),直到消费者可用或超时结束。

另一方面,使用另一个类实现使用者,如下所示:

private static volatile boolean runningConsumer;
...
private static class Consumer implements Runnable {
  @Override
  public void run() {
    while (runningConsumer) {
      try {
        String bulb = queue.poll(
          MAX_PROD_TIME_MS, TimeUnit.MILLISECONDS);
        if (bulb != null) {
          Thread.sleep(rnd.nextInt(MAX_CONS_TIME_MS));
          logger.info(() -> "Packed: " + bulb);
        }
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        logger.severe(() -> "Exception: " + ex);
        break;
      }
    }
  }
}

消费者可以通过queue.take()方法从生产者处取灯泡。此方法检索并删除此队列的头,如有必要,请等待,直到灯泡可用。也可以调用poll()方法,在该方法中检索并移除队列的头,或者如果该队列为空,则返回null。但这两个都不适合我们。如果生产者不在,消费者可能仍然停留在take()方法中。另一方面,如果队列是空的(生产者现在正在检查当前灯泡),poll()方法将很快被一次又一次地调用,导致伪重复。解决这个问题的方法是poll(long timeout, TimeUnit unit)。此方法检索并删除此队列的头,并在指定的等待时间内(如果需要)等待灯泡变为可用。仅当等待时间过后队列为空时,才会返回null

为了模拟耗电元件包装灯泡所需的时间,相应的线程将在 0 到 7 之间随机休眠几秒(7 秒是包装灯泡所需的最长时间)。

启动生产者和消费者是一项非常简单的任务,它是通过一种名为startAssemblyLine()的方法完成的,如下所示:

public static void startAssemblyLine() {
  if (runningProducer || runningConsumer) {
    logger.info("Assembly line is already running ...");
    return;
  }
  logger.info("\n\nStarting assembly line ...");
  logger.info(() -> "Remaining bulbs from previous run: \n"
    + queue + "\n\n");
  runningProducer = true;
  producerService = Executors.newSingleThreadExecutor();
  producerService.execute(producer);
  runningConsumer = true;
  consumerService = Executors.newSingleThreadExecutor();
  consumerService.execute(consumer);
}

停止装配线是一个微妙的过程,可以通过不同的场景来解决。主要是,当装配线停止时,生产者应检查当前灯泡作为最后一个灯泡,消费者必须包装它。生产者可能需要等待消费者包装好当前灯泡,然后才能转移最后一个灯泡;此外,消费者也必须包装好这个灯泡。

为了遵循此场景,我们首先停止生产者,然后停止消费者:

public static void stopAssemblyLine() {
  logger.info("Stopping assembly line ...");
  boolean isProducerDown = shutdownProducer();
  boolean isConsumerDown = shutdownConsumer();
  if (!isProducerDown || !isConsumerDown) {
    logger.severe("Something abnormal happened during
      shutting down the assembling line!");
    System.exit(0);
  }
  logger.info("Assembling line was successfully stopped!");
}
private static boolean shutdownProducer() {
  runningProducer = false;
  return shutdownExecutor(producerService);
}
private static boolean shutdownConsumer() {
  runningConsumer = false;
  return shutdownExecutor(consumerService);
}

最后,我们给生产者和消费者足够的时间来正常停止(不中断线程)。这在shutdownExecutor()方法中发生,如下所示:

private static boolean shutdownExecutor(ExecutorService executor) {
  executor.shutdown();
  try {
    if (!executor.awaitTermination(TIMEOUT_MS * 2,
        TimeUnit.MILLISECONDS)) {
      executor.shutdownNow();
      return executor.awaitTermination(TIMEOUT_MS * 2,
        TimeUnit.MILLISECONDS);
    }
    return true;
  } catch (InterruptedException ex) {
    executor.shutdownNow();
    Thread.currentThread().interrupt();
    logger.severe(() -> "Exception: " + ex);
  }
  return false;
}

我们要做的第一件事是将runningProducer static变量设置为false。这将破坏while(runningProducer),因此这将是最后一次检查灯泡。此外,我们启动生产者的关闭程序。

对于消费者,我们要做的第一件事是将runningConsumer static变量设置为false。这将打破while(runningConsumer),因此这将是最后一个灯泡包装。此外,我们启动耗电元件的关闭程序。

让我们看看装配线的可能执行(运行 10 秒):

AssemblyLine.startAssemblyLine();
Thread.sleep(10 * 1000);
AssemblyLine.stopAssemblyLine();

可能的输出如下:

Starting assembly line ...
...
[2019-04-14 07:39:40] [INFO] Checked: bulb-89
[2019-04-14 07:39:43] [INFO] Packed: bulb-89
...
Stopping assembly line ...
...
[2019-04-14 07:39:53] [INFO] Packed: bulb-322
Assembling line was successfully stopped!

一般来说,如果停产需要很长时间(就好像停产一样),那么生产者和消费者的数量和/或生产和消费时间之间可能存在不平衡率。您可能需要添加或减去生产者或消费者。

生产者不等待消费者出现

如果生产者检查灯泡的速度比消费者包装灯泡的速度快,那么他们很可能会决定采用以下工作流程:

  • 生产者将逐一检查灯泡,并将其推入队列
  • 消费者将从队列中轮询并打包灯泡

由于消费者比生产者慢,队列将容纳已检查但未包装的灯泡(我们可以假设有空队列的可能性很低)。在下图中,我们有生产者、消费者和用于存储已检查但未包装灯泡的队列:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZpLZp4de-1657345732717)(https://github.com/apachecn/apachecn-java-zh/raw/master/docs/java-coding-prob/img/00124b9f-15a5-4afd-993e-ee96b1ec47d1.png)]

为了形成这种情况,我们可以依赖于ConcurrentLinkedQueue(或LinkedBlockingQueue)。这是一个基于链接节点的无限线程安全队列:

private static final Queue<String> queue 
  = new ConcurrentLinkedQueue<>();

为了在队列中推一个灯泡,生产者调用offer()方法:

queue.offer(bulb);

另一方面,消费者使用poll()方法处理队列中的灯泡(因为消费者比生产者慢,所以当poll()返回null时应该是罕见的情况):

String bulb = queue.poll();

让我们第一次启动装配线 10 秒钟。这将输出以下内容:

Starting assembly line ...
...
[2019-04-14 07:44:58] [INFO] Checked: bulb-827
[2019-04-14 07:44:59] [INFO] Checked: bulb-257
[2019-04-14 07:44:59] [INFO] Packed: bulb-827
...
Stopping assembly line ...
...
[2019-04-14 07:45:08] [INFO] Checked: bulb-369
[2019-04-14 07:45:09] [INFO] Packed: bulb-690
...
Assembling line was successfully stopped!

此时,装配线停止,在队列中,我们有以下内容(这些灯泡已检查,但未包装):

[bulb-968, bulb-782, bulb-627, bulb-886, ...]

我们重新启动装配线并检查突出显示的行,这表明消费者从停止的位置恢复其工作:

Starting assembly line ...
[2019-04-14 07:45:12] [INFO ] Packed: bulb-968 [2019-04-14 07:45:12] [INFO ] Checked: bulb-812
[2019-04-14 07:45:12] [INFO ] Checked: bulb-470
[2019-04-14 07:45:14] [INFO ] Packed: bulb-782 [2019-04-14 07:45:15] [INFO ] Checked: bulb-601
[2019-04-14 07:45:16] [INFO ] Packed: bulb-627 ...

Java 编程问题:十、并发-线程池、可调用对象和同步器3https://developer.aliyun.com/article/1426163

相关文章
|
22小时前
|
Java 数据处理 调度
Java多线程编程入门指南
Java多线程编程入门指南
|
1天前
|
监控 安全 算法
如何有效地处理Java中的多线程
如何有效地处理Java中的多线程
|
2天前
|
Java
解析Java线程池:参数详解与执行流程
解析Java线程池:参数详解与执行流程
6 1
|
2天前
|
Java 调度
Java多线程编程与并发控制策略
Java多线程编程与并发控制策略
|
22小时前
|
安全 Java 数据处理
Android多线程编程实践与优化技巧
Android多线程编程实践与优化技巧
|
23小时前
|
并行计算 安全 Java
多线程编程中的线程安全问题与解决方案*
多线程编程中的线程安全问题与解决方案*
|
1天前
|
安全 Java 开发者
Java并发编程中的线程安全策略
在现代软件开发中,Java语言的并发编程特性使得多线程应用成为可能。然而,随着线程数量的增加,如何确保数据的一致性和系统的稳定性成为开发者面临的挑战。本文将探讨Java并发编程中实现线程安全的几种策略,包括同步机制、volatile关键字的使用、以及java.util.concurrent包提供的工具类,旨在为Java开发者提供一系列实用的方法来应对并发问题。
8 0
|
1天前
|
监控 Java UED
Java并发编程:深入理解线程池的设计与应用
本文旨在通过数据导向和科学严谨的方式,深入探讨Java并发编程中的关键组件——线程池。文章首先概述了线程池的基本概念与重要性,随后详细解读了线程池的核心参数及其对性能的影响,并通过实验数据支持分析结果。此外,文中还将介绍如何根据不同的应用场景选择或设计合适的线程池,以及如何避免常见的并发问题。最后,通过案例研究,展示线程池在实际应用中的优化效果,为开发人员提供实践指导。
7 0
|
2天前
|
存储 缓存 Java
Java并发编程之线程池的使用
Java并发编程之线程池的使用
|
2天前
|
NoSQL Redis
Redis系列学习文章分享---第五篇(Redis实战篇--优惠券秒杀,全局唯一id 添加优惠券 实现秒杀下单 库存超卖问题分析 乐观锁解决超卖 实现一人一单功能 集群下的线程并发安全问题)
Redis系列学习文章分享---第五篇(Redis实战篇--优惠券秒杀,全局唯一id 添加优惠券 实现秒杀下单 库存超卖问题分析 乐观锁解决超卖 实现一人一单功能 集群下的线程并发安全问题)
5 0