Java 编程问题:十、并发-线程池、可调用对象和同步器2

简介: Java 编程问题:十、并发-线程池、可调用对象和同步器

Java 编程问题:十、并发-线程池、可调用对象和同步器1https://developer.aliyun.com/article/1426161

201 Java 中的线程池

线程池是可用于执行任务的线程的集合。线程池负责管理其线程的创建、分配和生命周期,并有助于提高性能。现在,我们来谈谈遗嘱执行人。

Executor

java.util.concurrent包中,有一堆专用于执行任务的接口。最简单的一个叫做Executor。这个接口公开了一个名为execute(Runnable command)的方法。下面是使用此方法执行单个任务的示例:

public class SimpleExecutor implements Executor {
  @Override
  public void execute(Runnable r) {
    (new Thread(r)).start();
  }
}
SimpleExecutor se = new SimpleExecutor();
se.execute(() -> {
  System.out.println("Simple task executed via Executor interface");
});

ExecutorService

一个更复杂、更全面的接口提供了许多附加方法,它是ExecutorService。这是Executor的丰富版本。Java 附带了一个成熟的实现ExecutorService,名为ThreadPoolExecutor。这是一个线程池,可以用一组参数实例化,如下所示:

ThreadPoolExecutor(
  int corePoolSize,
  int maximumPoolSize,
  long keepAliveTime,
  TimeUnit unit,
  BlockingQueue<Runnable> workQueue,
  ThreadFactory threadFactory,
  RejectedExecutionHandler handler)

下面是对前面代码中实例化的每个参数的简短描述:

  • corePoolSize:池中要保留的线程数,即使它们是空闲的(除非设置了allowCoreThreadTimeOut
  • maximumPoolSize:允许的最大线程数
  • keepAliveTime:当这个时间过去后,空闲线程将从池中移除(这些是超过corePoolSize的空闲线程)
  • unitkeepAliveTime参数的时间单位
  • workQueue:在Runnable的实例(只有execute()方法提交的Runnable任务)执行之前,用来存放这些实例的队列
  • threadFactory:执行器创建新线程时使用此工厂
  • handler:当ThreadPoolExecutor由于饱和而无法执行Runnable时,即线程边界和队列容量已满(例如,workQueue大小固定,同时设置了maximumPoolSize),它将控制和决策交给这个处理器

为了优化池大小,我们需要收集以下信息:

  • CPU 数量(Runtime.getRuntime().availableProcessors()
  • 目标 CPU 利用率(在范围内,[0, 1]
  • 等待时间(W)
  • 计算时间(C)

以下公式有助于我们确定池的最佳大小:

Number of threads 
  = Number of CPUs * Target CPU utilization * (1 + W/C)

根据经验,对于计算密集型任务(通常是小型任务),最好使用线程数等于处理器数或处理器数 +1(以防止潜在的暂停)来对线程池进行基准测试。对于耗时且阻塞的任务(例如,I/O),更大的池更好,因为线程将无法以高速率进行调度。另外,还要注意与其他池(例如,数据库连接池和套接字连接池)的干扰。

让我们看一个ThreadPoolExecutor的例子:

public class SimpleThreadPoolExecutor implements Runnable {
  private final int taskId;
  public SimpleThreadPoolExecutor(int taskId) {
    this.taskId = taskId;
  }
  @Override
  public void run() {
    Thread.sleep(2000);
    System.out.println("Executing task " + taskId 
      + " via " + Thread.currentThread().getName());
  }
  public static void main(String[] args) {
    BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(5);
    final AtomicInteger counter = new AtomicInteger();
    ThreadFactory threadFactory = (Runnable r) -> {
      System.out.println("Creating a new Cool-Thread-" 
        + counter.incrementAndGet());
      return new Thread(r, "Cool-Thread-" + counter.get());
    };
    RejectedExecutionHandler rejectedHandler
      = (Runnable r, ThreadPoolExecutor executor) -> {
        if (r instanceof SimpleThreadPoolExecutor) {
          SimpleThreadPoolExecutor task=(SimpleThreadPoolExecutor) r;
          System.out.println("Rejecting task " + task.taskId);
        }
    };
    ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 1,
      TimeUnit.SECONDS, queue, threadFactory, rejectedHandler);
    for (int i = 0; i < 50; i++) {
      executor.execute(new SimpleThreadPoolExecutor(i));
    }
    executor.shutdown();
    executor.awaitTermination(
      Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
  }
}

main()方法触发Runnable50 个实例。每个Runnable睡两秒钟,打印一条消息。工作队列限制为Runnable的 5 个实例——核心线程为 10,线程数最多为 20 个,空闲超时为 1 秒。可能的输出如下:

Creating a new Cool-Thread-1
...
Creating a new Cool-Thread-20
Rejecting task 25
...
Rejecting task 49
Executing task 22 via Cool-Thread-18
...
Executing task 12 via Cool-Thread-2

ScheduledExecutorService

ScheduledExecutorService是一个ExecutorService,它可以安排任务在给定的延迟后执行,或者定期执行。这里,我们有schedule()scheduleAtFixedRate()scheduleWithFixedDelay()等方法。schedule()用于一次性任务,scheduleAtFixedRate()scheduleWithFixedDelay()用于周期性任务。

通过执行器的线程池

更进一步,我们将介绍辅助类Executors。此类使用以下方法公开几种类型的线程池:

  • newSingleThreadExecutor():这是一个线程池,只管理一个线程,队列无限,一次只执行一个任务:
ExecutorService executor 
  = Executors.newSingleThreadExecutor();
  • newCachedThreadPool():这是一个线程池,根据需要创建新线程并删除空闲线程(60 秒后);核心池大小为 0,最大池大小为Integer.MAX_VALUE(此线程池在需求增加时扩展,在需求减少时收缩):
ExecutorService executor = Executors.newCachedThreadPool();
  • newFixedThreadPool():这是一个线程数固定、队列无限的线程池,产生无限超时的效果(核心池大小和最大池大小等于指定的大小):
ExecutorService executor = Executors.newFixedThreadPool(5);
  • newWorkStealingThreadPool():这是一个基于工作窃取算法的线程池(它充当 Fork/Join 框架上的一层):
ExecutorService executor = Executors.newWorkStealingPool();
  • newScheduledThreadPool():一个线程池,可以安排命令在给定的延迟后运行,或者定期执行(我们可以指定核心池的大小):
ScheduledExecutorService executor 
  = Executors.newScheduledThreadPool(5);

202 具有单个线程的线程池

为了演示单线程线程池的工作原理,假设我们想编写一个程序,模拟装配线(或输送机),用两个工作器检查和包装灯泡。

通过检查,我们了解到工作器测试灯泡是否亮起。通过包装,我们了解到,工作器将经过验证的灯泡拿到盒子里。这种工艺在几乎所有工厂都很常见。

两名工作器如下:

  • 一种所谓的生产者(或检查者),负责测试每个灯泡,看灯泡是否亮起
  • 一个所谓的消费者(或包装商),负责将每个检查过的灯泡包装到一个盒子里

这种问题完全适合于下图所示的生产者消费者设计模式:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wMW6fHCN-1657345732716)(https://github.com/apachecn/apachecn-java-zh/raw/master/docs/java-coding-prob/img/817c7786-9093-401c-9afd-bf9b39aabd25.png)]

最常见的是,在这种模式中,生产者和消费者通过队列(生产者排队数据,消费者排队数据)进行通信。这个队列称为数据缓冲区。当然,根据流程设计,其他数据结构也可以起到数据缓冲的作用。

现在,让我们看看如果生产者等待消费者可用,我们如何实现这个模式。

稍后,我们将为不等待消费者的生产者实现此模式。

生产者等待消费者出现

装配线启动时,生产者将逐个检查进线灯泡,而消费者将将其打包(每个盒子中有一个灯泡)。此流重复,直到装配线停止。

下图是生产者消费者之间的流程图:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yPBj7z7O-1657345732716)(https://github.com/apachecn/apachecn-java-zh/raw/master/docs/java-coding-prob/img/3cf100e5-7ad8-4548-9a33-1e55a146b655.png)]

我们可以将装配线视为我们工厂的助手,因此它可以实现为助手或工具类(当然,它也可以很容易地切换到非static实现,因此如果对您的情况更有意义,请随意切换):

public final class AssemblyLine {
  private AssemblyLine() {
    throw new AssertionError("There is a single assembly line!");
  }
  ...
}

当然,实现这个场景的方法很多,但是我们对使用 JavaExecutorService感兴趣,更准确地说是Executors.newSingleThreadExecutor()。使用一个工作线程来操作未绑定队列的Executor通过此方法创建。

我们只有两个工作器,所以我们可以使用两个实例Executor(一个Executor将启动生产者,另一个将启动消费者)。因此,生产者将是一个线程,消费者将是另一个线程:

private static ExecutorService producerService;
private static ExecutorService consumerService;

由于生产者和消费者是好朋友,他们决定根据一个简单的场景来工作:

  • 只有消费者不忙时,生产者才会检查灯泡并将其传递给消费者(如果消费者忙,生产者会等待一段时间,直到消费者有空)
  • 生产者在将当前灯泡传递给用户之前不会检查下一个灯泡
  • 消费者将尽快包装每个进入的灯泡

这个场景适用于TransferQueueSynchronousQueue,它执行的过程与前面提到的场景非常相似。让我们使用TransferQueue。这是一个BlockingQueue,其中生产者可以等待消费者接收元素。BlockingQueue实现是线程安全的:

private static final TransferQueue<String> queue 
  = new LinkedTransferQueue<>();

生产者和消费者之间的工作流程是先进先出FIFO)类型:第一个检查的灯泡是第一个打包的灯泡),因此LinkedTransferQueue是一个不错的选择。

一旦装配线启动,生产者将持续检查灯泡,因此我们可以将其作为一个类来实现,如下所示:

private static final int MAX_PROD_TIME_MS = 5 * 1000;
private static final int MAX_CONS_TIME_MS = 7 * 1000;
private static final int TIMEOUT_MS = MAX_CONS_TIME_MS + 1000;
private static final Random rnd = new Random();
private static volatile boolean runningProducer;
...
private static class Producer implements Runnable {
  @Override
  public void run() {
    while (runningProducer) {
      try {
        String bulb = "bulb-" + rnd.nextInt(1000);
        Thread.sleep(rnd.nextInt(MAX_PROD_TIME_MS));
        boolean transfered = queue.tryTransfer(bulb,
          TIMEOUT_MS, TimeUnit.MILLISECONDS);
        if (transfered) {
          logger.info(() -> "Checked: " + bulb);
        }
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        logger.severe(() -> "Exception: " + ex);
        break;
      }
    }
  }
}

因此,生产者通过tryTransfer()方法将检查过的灯泡转移给消费者。如果可以在超时时间过去之前将元素传输到使用者,则此方法将执行此操作。

避免使用transfer()方法,这可能会无限期地堵塞螺纹。

为了模拟生产者检查灯泡所花的时间,相应的线程将在 0 到 5 之间随机休眠几秒(5 秒是检查灯泡所需的最长时间)。如果消费者在此时间之后不可用,则会花费更多时间(在tryTransfer()中),直到消费者可用或超时结束。

另一方面,使用另一个类实现使用者,如下所示:

private static volatile boolean runningConsumer;
...
private static class Consumer implements Runnable {
  @Override
  public void run() {
    while (runningConsumer) {
      try {
        String bulb = queue.poll(
          MAX_PROD_TIME_MS, TimeUnit.MILLISECONDS);
        if (bulb != null) {
          Thread.sleep(rnd.nextInt(MAX_CONS_TIME_MS));
          logger.info(() -> "Packed: " + bulb);
        }
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        logger.severe(() -> "Exception: " + ex);
        break;
      }
    }
  }
}

消费者可以通过queue.take()方法从生产者处取灯泡。此方法检索并删除此队列的头,如有必要,请等待,直到灯泡可用。也可以调用poll()方法,在该方法中检索并移除队列的头,或者如果该队列为空,则返回null。但这两个都不适合我们。如果生产者不在,消费者可能仍然停留在take()方法中。另一方面,如果队列是空的(生产者现在正在检查当前灯泡),poll()方法将很快被一次又一次地调用,导致伪重复。解决这个问题的方法是poll(long timeout, TimeUnit unit)。此方法检索并删除此队列的头,并在指定的等待时间内(如果需要)等待灯泡变为可用。仅当等待时间过后队列为空时,才会返回null

为了模拟耗电元件包装灯泡所需的时间,相应的线程将在 0 到 7 之间随机休眠几秒(7 秒是包装灯泡所需的最长时间)。

启动生产者和消费者是一项非常简单的任务,它是通过一种名为startAssemblyLine()的方法完成的,如下所示:

public static void startAssemblyLine() {
  if (runningProducer || runningConsumer) {
    logger.info("Assembly line is already running ...");
    return;
  }
  logger.info("\n\nStarting assembly line ...");
  logger.info(() -> "Remaining bulbs from previous run: \n"
    + queue + "\n\n");
  runningProducer = true;
  producerService = Executors.newSingleThreadExecutor();
  producerService.execute(producer);
  runningConsumer = true;
  consumerService = Executors.newSingleThreadExecutor();
  consumerService.execute(consumer);
}

停止装配线是一个微妙的过程,可以通过不同的场景来解决。主要是,当装配线停止时,生产者应检查当前灯泡作为最后一个灯泡,消费者必须包装它。生产者可能需要等待消费者包装好当前灯泡,然后才能转移最后一个灯泡;此外,消费者也必须包装好这个灯泡。

为了遵循此场景,我们首先停止生产者,然后停止消费者:

public static void stopAssemblyLine() {
  logger.info("Stopping assembly line ...");
  boolean isProducerDown = shutdownProducer();
  boolean isConsumerDown = shutdownConsumer();
  if (!isProducerDown || !isConsumerDown) {
    logger.severe("Something abnormal happened during
      shutting down the assembling line!");
    System.exit(0);
  }
  logger.info("Assembling line was successfully stopped!");
}
private static boolean shutdownProducer() {
  runningProducer = false;
  return shutdownExecutor(producerService);
}
private static boolean shutdownConsumer() {
  runningConsumer = false;
  return shutdownExecutor(consumerService);
}

最后,我们给生产者和消费者足够的时间来正常停止(不中断线程)。这在shutdownExecutor()方法中发生,如下所示:

private static boolean shutdownExecutor(ExecutorService executor) {
  executor.shutdown();
  try {
    if (!executor.awaitTermination(TIMEOUT_MS * 2,
        TimeUnit.MILLISECONDS)) {
      executor.shutdownNow();
      return executor.awaitTermination(TIMEOUT_MS * 2,
        TimeUnit.MILLISECONDS);
    }
    return true;
  } catch (InterruptedException ex) {
    executor.shutdownNow();
    Thread.currentThread().interrupt();
    logger.severe(() -> "Exception: " + ex);
  }
  return false;
}

我们要做的第一件事是将runningProducer static变量设置为false。这将破坏while(runningProducer),因此这将是最后一次检查灯泡。此外,我们启动生产者的关闭程序。

对于消费者,我们要做的第一件事是将runningConsumer static变量设置为false。这将打破while(runningConsumer),因此这将是最后一个灯泡包装。此外,我们启动耗电元件的关闭程序。

让我们看看装配线的可能执行(运行 10 秒):

AssemblyLine.startAssemblyLine();
Thread.sleep(10 * 1000);
AssemblyLine.stopAssemblyLine();

可能的输出如下:

Starting assembly line ...
...
[2019-04-14 07:39:40] [INFO] Checked: bulb-89
[2019-04-14 07:39:43] [INFO] Packed: bulb-89
...
Stopping assembly line ...
...
[2019-04-14 07:39:53] [INFO] Packed: bulb-322
Assembling line was successfully stopped!

一般来说,如果停产需要很长时间(就好像停产一样),那么生产者和消费者的数量和/或生产和消费时间之间可能存在不平衡率。您可能需要添加或减去生产者或消费者。

生产者不等待消费者出现

如果生产者检查灯泡的速度比消费者包装灯泡的速度快,那么他们很可能会决定采用以下工作流程:

  • 生产者将逐一检查灯泡,并将其推入队列
  • 消费者将从队列中轮询并打包灯泡

由于消费者比生产者慢,队列将容纳已检查但未包装的灯泡(我们可以假设有空队列的可能性很低)。在下图中,我们有生产者、消费者和用于存储已检查但未包装灯泡的队列:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZpLZp4de-1657345732717)(https://github.com/apachecn/apachecn-java-zh/raw/master/docs/java-coding-prob/img/00124b9f-15a5-4afd-993e-ee96b1ec47d1.png)]

为了形成这种情况,我们可以依赖于ConcurrentLinkedQueue(或LinkedBlockingQueue)。这是一个基于链接节点的无限线程安全队列:

private static final Queue<String> queue 
  = new ConcurrentLinkedQueue<>();

为了在队列中推一个灯泡,生产者调用offer()方法:

queue.offer(bulb);

另一方面,消费者使用poll()方法处理队列中的灯泡(因为消费者比生产者慢,所以当poll()返回null时应该是罕见的情况):

String bulb = queue.poll();

让我们第一次启动装配线 10 秒钟。这将输出以下内容:

Starting assembly line ...
...
[2019-04-14 07:44:58] [INFO] Checked: bulb-827
[2019-04-14 07:44:59] [INFO] Checked: bulb-257
[2019-04-14 07:44:59] [INFO] Packed: bulb-827
...
Stopping assembly line ...
...
[2019-04-14 07:45:08] [INFO] Checked: bulb-369
[2019-04-14 07:45:09] [INFO] Packed: bulb-690
...
Assembling line was successfully stopped!

此时,装配线停止,在队列中,我们有以下内容(这些灯泡已检查,但未包装):

[bulb-968, bulb-782, bulb-627, bulb-886, ...]

我们重新启动装配线并检查突出显示的行,这表明消费者从停止的位置恢复其工作:

Starting assembly line ...
[2019-04-14 07:45:12] [INFO ] Packed: bulb-968 [2019-04-14 07:45:12] [INFO ] Checked: bulb-812
[2019-04-14 07:45:12] [INFO ] Checked: bulb-470
[2019-04-14 07:45:14] [INFO ] Packed: bulb-782 [2019-04-14 07:45:15] [INFO ] Checked: bulb-601
[2019-04-14 07:45:16] [INFO ] Packed: bulb-627 ...

Java 编程问题:十、并发-线程池、可调用对象和同步器3https://developer.aliyun.com/article/1426163

相关文章
|
7天前
|
监控 Java
java异步判断线程池所有任务是否执行完
通过上述步骤,您可以在Java中实现异步判断线程池所有任务是否执行完毕。这种方法使用了 `CompletionService`来监控任务的完成情况,并通过一个独立线程异步检查所有任务的执行状态。这种设计不仅简洁高效,还能确保在大量任务处理时程序的稳定性和可维护性。希望本文能为您的开发工作提供实用的指导和帮助。
46 17
|
18天前
|
Java
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
Java—多线程实现生产消费者
|
3天前
|
缓存 安全 算法
Java 多线程 面试题
Java 多线程 相关基础面试题
|
20天前
|
消息中间件 缓存 安全
Java多线程是什么
Java多线程简介:本文介绍了Java中常见的线程池类型,包括`newCachedThreadPool`(适用于短期异步任务)、`newFixedThreadPool`(适用于固定数量的长期任务)、`newScheduledThreadPool`(支持定时和周期性任务)以及`newSingleThreadExecutor`(保证任务顺序执行)。同时,文章还讲解了Java中的锁机制,如`synchronized`关键字、CAS操作及其实现方式,并详细描述了可重入锁`ReentrantLock`和读写锁`ReadWriteLock`的工作原理与应用场景。
|
8月前
|
数据可视化 Java 测试技术
Java 编程问题:十一、并发-深入探索1
Java 编程问题:十一、并发-深入探索
82 0
|
5月前
|
安全 Java 调度
解锁Java并发编程高阶技能:深入剖析无锁CAS机制、揭秘魔法类Unsafe、精通原子包Atomic,打造高效并发应用
【8月更文挑战第4天】在Java并发编程中,无锁编程以高性能和低延迟应对高并发挑战。核心在于无锁CAS(Compare-And-Swap)机制,它基于硬件支持,确保原子性更新;Unsafe类提供底层内存操作,实现CAS;原子包java.util.concurrent.atomic封装了CAS操作,简化并发编程。通过`AtomicInteger`示例,展现了线程安全的自增操作,突显了这些技术在构建高效并发程序中的关键作用。
80 1
|
2月前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
2月前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####
|
4月前
|
Java API 容器
JAVA并发编程系列(10)Condition条件队列-并发协作者
本文通过一线大厂面试真题,模拟消费者-生产者的场景,通过简洁的代码演示,帮助读者快速理解并复用。文章还详细解释了Condition与Object.wait()、notify()的区别,并探讨了Condition的核心原理及其实现机制。
|
6月前
|
安全 Java 开发者
Java并发编程:理解并发安全与性能优化
在当今软件开发中,Java作为一种广泛使用的编程语言,其并发编程能力显得尤为重要。本文深入探讨了Java中的并发编程,包括如何确保并发安全性以及优化并发程序的性能。通过分析常见的并发问题和解决方案,读者将能够更好地理解如何利用Java的并发工具包来构建可靠和高效的多线程应用程序。 【7月更文挑战第10天】
68 3