Java 编程问题:十、并发-线程池、可调用对象和同步器2

简介: Java 编程问题:十、并发-线程池、可调用对象和同步器

Java 编程问题:十、并发-线程池、可调用对象和同步器1https://developer.aliyun.com/article/1426161

201 Java 中的线程池

线程池是可用于执行任务的线程的集合。线程池负责管理其线程的创建、分配和生命周期,并有助于提高性能。现在,我们来谈谈遗嘱执行人。

Executor

java.util.concurrent包中,有一堆专用于执行任务的接口。最简单的一个叫做Executor。这个接口公开了一个名为execute(Runnable command)的方法。下面是使用此方法执行单个任务的示例:

public class SimpleExecutor implements Executor {
  @Override
  public void execute(Runnable r) {
    (new Thread(r)).start();
  }
}
SimpleExecutor se = new SimpleExecutor();
se.execute(() -> {
  System.out.println("Simple task executed via Executor interface");
});

ExecutorService

一个更复杂、更全面的接口提供了许多附加方法,它是ExecutorService。这是Executor的丰富版本。Java 附带了一个成熟的实现ExecutorService,名为ThreadPoolExecutor。这是一个线程池,可以用一组参数实例化,如下所示:

ThreadPoolExecutor(
  int corePoolSize,
  int maximumPoolSize,
  long keepAliveTime,
  TimeUnit unit,
  BlockingQueue<Runnable> workQueue,
  ThreadFactory threadFactory,
  RejectedExecutionHandler handler)

下面是对前面代码中实例化的每个参数的简短描述:

  • corePoolSize:池中要保留的线程数,即使它们是空闲的(除非设置了allowCoreThreadTimeOut
  • maximumPoolSize:允许的最大线程数
  • keepAliveTime:当这个时间过去后,空闲线程将从池中移除(这些是超过corePoolSize的空闲线程)
  • unitkeepAliveTime参数的时间单位
  • workQueue:在Runnable的实例(只有execute()方法提交的Runnable任务)执行之前,用来存放这些实例的队列
  • threadFactory:执行器创建新线程时使用此工厂
  • handler:当ThreadPoolExecutor由于饱和而无法执行Runnable时,即线程边界和队列容量已满(例如,workQueue大小固定,同时设置了maximumPoolSize),它将控制和决策交给这个处理器

为了优化池大小,我们需要收集以下信息:

  • CPU 数量(Runtime.getRuntime().availableProcessors()
  • 目标 CPU 利用率(在范围内,[0, 1]
  • 等待时间(W)
  • 计算时间(C)

以下公式有助于我们确定池的最佳大小:

Number of threads 
  = Number of CPUs * Target CPU utilization * (1 + W/C)

根据经验,对于计算密集型任务(通常是小型任务),最好使用线程数等于处理器数或处理器数 +1(以防止潜在的暂停)来对线程池进行基准测试。对于耗时且阻塞的任务(例如,I/O),更大的池更好,因为线程将无法以高速率进行调度。另外,还要注意与其他池(例如,数据库连接池和套接字连接池)的干扰。

让我们看一个ThreadPoolExecutor的例子:

public class SimpleThreadPoolExecutor implements Runnable {
  private final int taskId;
  public SimpleThreadPoolExecutor(int taskId) {
    this.taskId = taskId;
  }
  @Override
  public void run() {
    Thread.sleep(2000);
    System.out.println("Executing task " + taskId 
      + " via " + Thread.currentThread().getName());
  }
  public static void main(String[] args) {
    BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(5);
    final AtomicInteger counter = new AtomicInteger();
    ThreadFactory threadFactory = (Runnable r) -> {
      System.out.println("Creating a new Cool-Thread-" 
        + counter.incrementAndGet());
      return new Thread(r, "Cool-Thread-" + counter.get());
    };
    RejectedExecutionHandler rejectedHandler
      = (Runnable r, ThreadPoolExecutor executor) -> {
        if (r instanceof SimpleThreadPoolExecutor) {
          SimpleThreadPoolExecutor task=(SimpleThreadPoolExecutor) r;
          System.out.println("Rejecting task " + task.taskId);
        }
    };
    ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 1,
      TimeUnit.SECONDS, queue, threadFactory, rejectedHandler);
    for (int i = 0; i < 50; i++) {
      executor.execute(new SimpleThreadPoolExecutor(i));
    }
    executor.shutdown();
    executor.awaitTermination(
      Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
  }
}

main()方法触发Runnable50 个实例。每个Runnable睡两秒钟,打印一条消息。工作队列限制为Runnable的 5 个实例——核心线程为 10,线程数最多为 20 个,空闲超时为 1 秒。可能的输出如下:

Creating a new Cool-Thread-1
...
Creating a new Cool-Thread-20
Rejecting task 25
...
Rejecting task 49
Executing task 22 via Cool-Thread-18
...
Executing task 12 via Cool-Thread-2

ScheduledExecutorService

ScheduledExecutorService是一个ExecutorService,它可以安排任务在给定的延迟后执行,或者定期执行。这里,我们有schedule()scheduleAtFixedRate()scheduleWithFixedDelay()等方法。schedule()用于一次性任务,scheduleAtFixedRate()scheduleWithFixedDelay()用于周期性任务。

通过执行器的线程池

更进一步,我们将介绍辅助类Executors。此类使用以下方法公开几种类型的线程池:

  • newSingleThreadExecutor():这是一个线程池,只管理一个线程,队列无限,一次只执行一个任务:
ExecutorService executor 
  = Executors.newSingleThreadExecutor();
  • newCachedThreadPool():这是一个线程池,根据需要创建新线程并删除空闲线程(60 秒后);核心池大小为 0,最大池大小为Integer.MAX_VALUE(此线程池在需求增加时扩展,在需求减少时收缩):
ExecutorService executor = Executors.newCachedThreadPool();
  • newFixedThreadPool():这是一个线程数固定、队列无限的线程池,产生无限超时的效果(核心池大小和最大池大小等于指定的大小):
ExecutorService executor = Executors.newFixedThreadPool(5);
  • newWorkStealingThreadPool():这是一个基于工作窃取算法的线程池(它充当 Fork/Join 框架上的一层):
ExecutorService executor = Executors.newWorkStealingPool();
  • newScheduledThreadPool():一个线程池,可以安排命令在给定的延迟后运行,或者定期执行(我们可以指定核心池的大小):
ScheduledExecutorService executor 
  = Executors.newScheduledThreadPool(5);

202 具有单个线程的线程池

为了演示单线程线程池的工作原理,假设我们想编写一个程序,模拟装配线(或输送机),用两个工作器检查和包装灯泡。

通过检查,我们了解到工作器测试灯泡是否亮起。通过包装,我们了解到,工作器将经过验证的灯泡拿到盒子里。这种工艺在几乎所有工厂都很常见。

两名工作器如下:

  • 一种所谓的生产者(或检查者),负责测试每个灯泡,看灯泡是否亮起
  • 一个所谓的消费者(或包装商),负责将每个检查过的灯泡包装到一个盒子里

这种问题完全适合于下图所示的生产者消费者设计模式:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wMW6fHCN-1657345732716)(https://github.com/apachecn/apachecn-java-zh/raw/master/docs/java-coding-prob/img/817c7786-9093-401c-9afd-bf9b39aabd25.png)]

最常见的是,在这种模式中,生产者和消费者通过队列(生产者排队数据,消费者排队数据)进行通信。这个队列称为数据缓冲区。当然,根据流程设计,其他数据结构也可以起到数据缓冲的作用。

现在,让我们看看如果生产者等待消费者可用,我们如何实现这个模式。

稍后,我们将为不等待消费者的生产者实现此模式。

生产者等待消费者出现

装配线启动时,生产者将逐个检查进线灯泡,而消费者将将其打包(每个盒子中有一个灯泡)。此流重复,直到装配线停止。

下图是生产者消费者之间的流程图:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yPBj7z7O-1657345732716)(https://github.com/apachecn/apachecn-java-zh/raw/master/docs/java-coding-prob/img/3cf100e5-7ad8-4548-9a33-1e55a146b655.png)]

我们可以将装配线视为我们工厂的助手,因此它可以实现为助手或工具类(当然,它也可以很容易地切换到非static实现,因此如果对您的情况更有意义,请随意切换):

public final class AssemblyLine {
  private AssemblyLine() {
    throw new AssertionError("There is a single assembly line!");
  }
  ...
}

当然,实现这个场景的方法很多,但是我们对使用 JavaExecutorService感兴趣,更准确地说是Executors.newSingleThreadExecutor()。使用一个工作线程来操作未绑定队列的Executor通过此方法创建。

我们只有两个工作器,所以我们可以使用两个实例Executor(一个Executor将启动生产者,另一个将启动消费者)。因此,生产者将是一个线程,消费者将是另一个线程:

private static ExecutorService producerService;
private static ExecutorService consumerService;

由于生产者和消费者是好朋友,他们决定根据一个简单的场景来工作:

  • 只有消费者不忙时,生产者才会检查灯泡并将其传递给消费者(如果消费者忙,生产者会等待一段时间,直到消费者有空)
  • 生产者在将当前灯泡传递给用户之前不会检查下一个灯泡
  • 消费者将尽快包装每个进入的灯泡

这个场景适用于TransferQueueSynchronousQueue,它执行的过程与前面提到的场景非常相似。让我们使用TransferQueue。这是一个BlockingQueue,其中生产者可以等待消费者接收元素。BlockingQueue实现是线程安全的:

private static final TransferQueue<String> queue 
  = new LinkedTransferQueue<>();

生产者和消费者之间的工作流程是先进先出FIFO)类型:第一个检查的灯泡是第一个打包的灯泡),因此LinkedTransferQueue是一个不错的选择。

一旦装配线启动,生产者将持续检查灯泡,因此我们可以将其作为一个类来实现,如下所示:

private static final int MAX_PROD_TIME_MS = 5 * 1000;
private static final int MAX_CONS_TIME_MS = 7 * 1000;
private static final int TIMEOUT_MS = MAX_CONS_TIME_MS + 1000;
private static final Random rnd = new Random();
private static volatile boolean runningProducer;
...
private static class Producer implements Runnable {
  @Override
  public void run() {
    while (runningProducer) {
      try {
        String bulb = "bulb-" + rnd.nextInt(1000);
        Thread.sleep(rnd.nextInt(MAX_PROD_TIME_MS));
        boolean transfered = queue.tryTransfer(bulb,
          TIMEOUT_MS, TimeUnit.MILLISECONDS);
        if (transfered) {
          logger.info(() -> "Checked: " + bulb);
        }
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        logger.severe(() -> "Exception: " + ex);
        break;
      }
    }
  }
}

因此,生产者通过tryTransfer()方法将检查过的灯泡转移给消费者。如果可以在超时时间过去之前将元素传输到使用者,则此方法将执行此操作。

避免使用transfer()方法,这可能会无限期地堵塞螺纹。

为了模拟生产者检查灯泡所花的时间,相应的线程将在 0 到 5 之间随机休眠几秒(5 秒是检查灯泡所需的最长时间)。如果消费者在此时间之后不可用,则会花费更多时间(在tryTransfer()中),直到消费者可用或超时结束。

另一方面,使用另一个类实现使用者,如下所示:

private static volatile boolean runningConsumer;
...
private static class Consumer implements Runnable {
  @Override
  public void run() {
    while (runningConsumer) {
      try {
        String bulb = queue.poll(
          MAX_PROD_TIME_MS, TimeUnit.MILLISECONDS);
        if (bulb != null) {
          Thread.sleep(rnd.nextInt(MAX_CONS_TIME_MS));
          logger.info(() -> "Packed: " + bulb);
        }
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        logger.severe(() -> "Exception: " + ex);
        break;
      }
    }
  }
}

消费者可以通过queue.take()方法从生产者处取灯泡。此方法检索并删除此队列的头,如有必要,请等待,直到灯泡可用。也可以调用poll()方法,在该方法中检索并移除队列的头,或者如果该队列为空,则返回null。但这两个都不适合我们。如果生产者不在,消费者可能仍然停留在take()方法中。另一方面,如果队列是空的(生产者现在正在检查当前灯泡),poll()方法将很快被一次又一次地调用,导致伪重复。解决这个问题的方法是poll(long timeout, TimeUnit unit)。此方法检索并删除此队列的头,并在指定的等待时间内(如果需要)等待灯泡变为可用。仅当等待时间过后队列为空时,才会返回null

为了模拟耗电元件包装灯泡所需的时间,相应的线程将在 0 到 7 之间随机休眠几秒(7 秒是包装灯泡所需的最长时间)。

启动生产者和消费者是一项非常简单的任务,它是通过一种名为startAssemblyLine()的方法完成的,如下所示:

public static void startAssemblyLine() {
  if (runningProducer || runningConsumer) {
    logger.info("Assembly line is already running ...");
    return;
  }
  logger.info("\n\nStarting assembly line ...");
  logger.info(() -> "Remaining bulbs from previous run: \n"
    + queue + "\n\n");
  runningProducer = true;
  producerService = Executors.newSingleThreadExecutor();
  producerService.execute(producer);
  runningConsumer = true;
  consumerService = Executors.newSingleThreadExecutor();
  consumerService.execute(consumer);
}

停止装配线是一个微妙的过程,可以通过不同的场景来解决。主要是,当装配线停止时,生产者应检查当前灯泡作为最后一个灯泡,消费者必须包装它。生产者可能需要等待消费者包装好当前灯泡,然后才能转移最后一个灯泡;此外,消费者也必须包装好这个灯泡。

为了遵循此场景,我们首先停止生产者,然后停止消费者:

public static void stopAssemblyLine() {
  logger.info("Stopping assembly line ...");
  boolean isProducerDown = shutdownProducer();
  boolean isConsumerDown = shutdownConsumer();
  if (!isProducerDown || !isConsumerDown) {
    logger.severe("Something abnormal happened during
      shutting down the assembling line!");
    System.exit(0);
  }
  logger.info("Assembling line was successfully stopped!");
}
private static boolean shutdownProducer() {
  runningProducer = false;
  return shutdownExecutor(producerService);
}
private static boolean shutdownConsumer() {
  runningConsumer = false;
  return shutdownExecutor(consumerService);
}

最后,我们给生产者和消费者足够的时间来正常停止(不中断线程)。这在shutdownExecutor()方法中发生,如下所示:

private static boolean shutdownExecutor(ExecutorService executor) {
  executor.shutdown();
  try {
    if (!executor.awaitTermination(TIMEOUT_MS * 2,
        TimeUnit.MILLISECONDS)) {
      executor.shutdownNow();
      return executor.awaitTermination(TIMEOUT_MS * 2,
        TimeUnit.MILLISECONDS);
    }
    return true;
  } catch (InterruptedException ex) {
    executor.shutdownNow();
    Thread.currentThread().interrupt();
    logger.severe(() -> "Exception: " + ex);
  }
  return false;
}

我们要做的第一件事是将runningProducer static变量设置为false。这将破坏while(runningProducer),因此这将是最后一次检查灯泡。此外,我们启动生产者的关闭程序。

对于消费者,我们要做的第一件事是将runningConsumer static变量设置为false。这将打破while(runningConsumer),因此这将是最后一个灯泡包装。此外,我们启动耗电元件的关闭程序。

让我们看看装配线的可能执行(运行 10 秒):

AssemblyLine.startAssemblyLine();
Thread.sleep(10 * 1000);
AssemblyLine.stopAssemblyLine();

可能的输出如下:

Starting assembly line ...
...
[2019-04-14 07:39:40] [INFO] Checked: bulb-89
[2019-04-14 07:39:43] [INFO] Packed: bulb-89
...
Stopping assembly line ...
...
[2019-04-14 07:39:53] [INFO] Packed: bulb-322
Assembling line was successfully stopped!

一般来说,如果停产需要很长时间(就好像停产一样),那么生产者和消费者的数量和/或生产和消费时间之间可能存在不平衡率。您可能需要添加或减去生产者或消费者。

生产者不等待消费者出现

如果生产者检查灯泡的速度比消费者包装灯泡的速度快,那么他们很可能会决定采用以下工作流程:

  • 生产者将逐一检查灯泡,并将其推入队列
  • 消费者将从队列中轮询并打包灯泡

由于消费者比生产者慢,队列将容纳已检查但未包装的灯泡(我们可以假设有空队列的可能性很低)。在下图中,我们有生产者、消费者和用于存储已检查但未包装灯泡的队列:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZpLZp4de-1657345732717)(https://github.com/apachecn/apachecn-java-zh/raw/master/docs/java-coding-prob/img/00124b9f-15a5-4afd-993e-ee96b1ec47d1.png)]

为了形成这种情况,我们可以依赖于ConcurrentLinkedQueue(或LinkedBlockingQueue)。这是一个基于链接节点的无限线程安全队列:

private static final Queue<String> queue 
  = new ConcurrentLinkedQueue<>();

为了在队列中推一个灯泡,生产者调用offer()方法:

queue.offer(bulb);

另一方面,消费者使用poll()方法处理队列中的灯泡(因为消费者比生产者慢,所以当poll()返回null时应该是罕见的情况):

String bulb = queue.poll();

让我们第一次启动装配线 10 秒钟。这将输出以下内容:

Starting assembly line ...
...
[2019-04-14 07:44:58] [INFO] Checked: bulb-827
[2019-04-14 07:44:59] [INFO] Checked: bulb-257
[2019-04-14 07:44:59] [INFO] Packed: bulb-827
...
Stopping assembly line ...
...
[2019-04-14 07:45:08] [INFO] Checked: bulb-369
[2019-04-14 07:45:09] [INFO] Packed: bulb-690
...
Assembling line was successfully stopped!

此时,装配线停止,在队列中,我们有以下内容(这些灯泡已检查,但未包装):

[bulb-968, bulb-782, bulb-627, bulb-886, ...]

我们重新启动装配线并检查突出显示的行,这表明消费者从停止的位置恢复其工作:

Starting assembly line ...
[2019-04-14 07:45:12] [INFO ] Packed: bulb-968 [2019-04-14 07:45:12] [INFO ] Checked: bulb-812
[2019-04-14 07:45:12] [INFO ] Checked: bulb-470
[2019-04-14 07:45:14] [INFO ] Packed: bulb-782 [2019-04-14 07:45:15] [INFO ] Checked: bulb-601
[2019-04-14 07:45:16] [INFO ] Packed: bulb-627 ...

Java 编程问题:十、并发-线程池、可调用对象和同步器3https://developer.aliyun.com/article/1426163

相关文章
|
5天前
|
Java 开发者
Java多线程编程中的常见误区与最佳实践####
本文深入剖析了Java多线程编程中开发者常遇到的几个典型误区,如对`start()`与`run()`方法的混淆使用、忽视线程安全问题、错误处理未同步的共享变量等,并针对这些问题提出了具体的解决方案和最佳实践。通过实例代码对比,直观展示了正确与错误的实现方式,旨在帮助读者构建更加健壮、高效的多线程应用程序。 ####
|
4天前
|
安全 Java 开发者
Java 多线程并发控制:深入理解与实战应用
《Java多线程并发控制:深入理解与实战应用》一书详细解析了Java多线程编程的核心概念、并发控制技术及其实战技巧,适合Java开发者深入学习和实践参考。
|
4天前
|
Java 开发者
Java多线程编程的艺术与实践####
本文深入探讨了Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的技术文档,本文以实战为导向,通过生动的实例和详尽的代码解析,引领读者领略多线程编程的魅力,掌握其在提升应用性能、优化资源利用方面的关键作用。无论你是Java初学者还是有一定经验的开发者,本文都将为你打开多线程编程的新视角。 ####
|
3天前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
6天前
|
安全 Java 开发者
Java多线程编程中的常见问题与解决方案
本文深入探讨了Java多线程编程中常见的问题,包括线程安全问题、死锁、竞态条件等,并提供了相应的解决策略。文章首先介绍了多线程的基础知识,随后详细分析了每个问题的产生原因和典型场景,最后提出了实用的解决方案,旨在帮助开发者提高多线程程序的稳定性和性能。
|
13天前
|
安全 Java 测试技术
Java并行流陷阱:为什么指定线程池可能是个坏主意
本文探讨了Java并行流的使用陷阱,尤其是指定线程池的问题。文章分析了并行流的设计思想,指出了指定线程池的弊端,并提供了使用CompletableFuture等替代方案。同时,介绍了Parallel Collector库在处理阻塞任务时的优势和特点。
|
22天前
|
安全 Java
java 中 i++ 到底是否线程安全?
本文通过实例探讨了 `i++` 在多线程环境下的线程安全性问题。首先,使用 100 个线程分别执行 10000 次 `i++` 操作,发现最终结果小于预期的 1000000,证明 `i++` 是线程不安全的。接着,介绍了两种解决方法:使用 `synchronized` 关键字加锁和使用 `AtomicInteger` 类。其中,`AtomicInteger` 通过 `CAS` 操作实现了高效的线程安全。最后,通过分析字节码和源码,解释了 `i++` 为何线程不安全以及 `AtomicInteger` 如何保证线程安全。
java 中 i++ 到底是否线程安全?
|
9天前
|
安全 Java 开发者
深入解读JAVA多线程:wait()、notify()、notifyAll()的奥秘
在Java多线程编程中,`wait()`、`notify()`和`notifyAll()`方法是实现线程间通信和同步的关键机制。这些方法定义在`java.lang.Object`类中,每个Java对象都可以作为线程间通信的媒介。本文将详细解析这三个方法的使用方法和最佳实践,帮助开发者更高效地进行多线程编程。 示例代码展示了如何在同步方法中使用这些方法,确保线程安全和高效的通信。
32 9
|
12天前
|
存储 安全 Java
Java多线程编程的艺术:从基础到实践####
本文深入探讨了Java多线程编程的核心概念、应用场景及其实现方式,旨在帮助开发者理解并掌握多线程编程的基本技能。文章首先概述了多线程的重要性和常见挑战,随后详细介绍了Java中创建和管理线程的两种主要方式:继承Thread类与实现Runnable接口。通过实例代码,本文展示了如何正确启动、运行及同步线程,以及如何处理线程间的通信与协作问题。最后,文章总结了多线程编程的最佳实践,为读者在实际项目中应用多线程技术提供了宝贵的参考。 ####
|
9天前
|
监控 安全 Java
Java中的多线程编程:从入门到实践####
本文将深入浅出地探讨Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的摘要形式,本文将以一个简短的代码示例作为开篇,直接展示多线程的魅力,随后再详细解析其背后的原理与实现方式,旨在帮助读者快速理解并掌握Java多线程编程的基本技能。 ```java // 简单的多线程示例:创建两个线程,分别打印不同的消息 public class SimpleMultithreading { public static void main(String[] args) { Thread thread1 = new Thread(() -> System.out.prin
下一篇
无影云桌面