Java 编程问题:十、并发-线程池、可调用对象和同步器2

简介: Java 编程问题:十、并发-线程池、可调用对象和同步器

Java 编程问题:十、并发-线程池、可调用对象和同步器1https://developer.aliyun.com/article/1426161

201 Java 中的线程池

线程池是可用于执行任务的线程的集合。线程池负责管理其线程的创建、分配和生命周期,并有助于提高性能。现在,我们来谈谈遗嘱执行人。

Executor

java.util.concurrent包中,有一堆专用于执行任务的接口。最简单的一个叫做Executor。这个接口公开了一个名为execute(Runnable command)的方法。下面是使用此方法执行单个任务的示例:

public class SimpleExecutor implements Executor {
  @Override
  public void execute(Runnable r) {
    (new Thread(r)).start();
  }
}
SimpleExecutor se = new SimpleExecutor();
se.execute(() -> {
  System.out.println("Simple task executed via Executor interface");
});

ExecutorService

一个更复杂、更全面的接口提供了许多附加方法,它是ExecutorService。这是Executor的丰富版本。Java 附带了一个成熟的实现ExecutorService,名为ThreadPoolExecutor。这是一个线程池,可以用一组参数实例化,如下所示:

ThreadPoolExecutor(
  int corePoolSize,
  int maximumPoolSize,
  long keepAliveTime,
  TimeUnit unit,
  BlockingQueue<Runnable> workQueue,
  ThreadFactory threadFactory,
  RejectedExecutionHandler handler)

下面是对前面代码中实例化的每个参数的简短描述:

  • corePoolSize:池中要保留的线程数,即使它们是空闲的(除非设置了allowCoreThreadTimeOut
  • maximumPoolSize:允许的最大线程数
  • keepAliveTime:当这个时间过去后,空闲线程将从池中移除(这些是超过corePoolSize的空闲线程)
  • unitkeepAliveTime参数的时间单位
  • workQueue:在Runnable的实例(只有execute()方法提交的Runnable任务)执行之前,用来存放这些实例的队列
  • threadFactory:执行器创建新线程时使用此工厂
  • handler:当ThreadPoolExecutor由于饱和而无法执行Runnable时,即线程边界和队列容量已满(例如,workQueue大小固定,同时设置了maximumPoolSize),它将控制和决策交给这个处理器

为了优化池大小,我们需要收集以下信息:

  • CPU 数量(Runtime.getRuntime().availableProcessors()
  • 目标 CPU 利用率(在范围内,[0, 1]
  • 等待时间(W)
  • 计算时间(C)

以下公式有助于我们确定池的最佳大小:

Number of threads 
  = Number of CPUs * Target CPU utilization * (1 + W/C)

根据经验,对于计算密集型任务(通常是小型任务),最好使用线程数等于处理器数或处理器数 +1(以防止潜在的暂停)来对线程池进行基准测试。对于耗时且阻塞的任务(例如,I/O),更大的池更好,因为线程将无法以高速率进行调度。另外,还要注意与其他池(例如,数据库连接池和套接字连接池)的干扰。

让我们看一个ThreadPoolExecutor的例子:

public class SimpleThreadPoolExecutor implements Runnable {
  private final int taskId;
  public SimpleThreadPoolExecutor(int taskId) {
    this.taskId = taskId;
  }
  @Override
  public void run() {
    Thread.sleep(2000);
    System.out.println("Executing task " + taskId 
      + " via " + Thread.currentThread().getName());
  }
  public static void main(String[] args) {
    BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(5);
    final AtomicInteger counter = new AtomicInteger();
    ThreadFactory threadFactory = (Runnable r) -> {
      System.out.println("Creating a new Cool-Thread-" 
        + counter.incrementAndGet());
      return new Thread(r, "Cool-Thread-" + counter.get());
    };
    RejectedExecutionHandler rejectedHandler
      = (Runnable r, ThreadPoolExecutor executor) -> {
        if (r instanceof SimpleThreadPoolExecutor) {
          SimpleThreadPoolExecutor task=(SimpleThreadPoolExecutor) r;
          System.out.println("Rejecting task " + task.taskId);
        }
    };
    ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 1,
      TimeUnit.SECONDS, queue, threadFactory, rejectedHandler);
    for (int i = 0; i < 50; i++) {
      executor.execute(new SimpleThreadPoolExecutor(i));
    }
    executor.shutdown();
    executor.awaitTermination(
      Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
  }
}

main()方法触发Runnable50 个实例。每个Runnable睡两秒钟,打印一条消息。工作队列限制为Runnable的 5 个实例——核心线程为 10,线程数最多为 20 个,空闲超时为 1 秒。可能的输出如下:

Creating a new Cool-Thread-1
...
Creating a new Cool-Thread-20
Rejecting task 25
...
Rejecting task 49
Executing task 22 via Cool-Thread-18
...
Executing task 12 via Cool-Thread-2

ScheduledExecutorService

ScheduledExecutorService是一个ExecutorService,它可以安排任务在给定的延迟后执行,或者定期执行。这里,我们有schedule()scheduleAtFixedRate()scheduleWithFixedDelay()等方法。schedule()用于一次性任务,scheduleAtFixedRate()scheduleWithFixedDelay()用于周期性任务。

通过执行器的线程池

更进一步,我们将介绍辅助类Executors。此类使用以下方法公开几种类型的线程池:

  • newSingleThreadExecutor():这是一个线程池,只管理一个线程,队列无限,一次只执行一个任务:
ExecutorService executor 
  = Executors.newSingleThreadExecutor();
  • newCachedThreadPool():这是一个线程池,根据需要创建新线程并删除空闲线程(60 秒后);核心池大小为 0,最大池大小为Integer.MAX_VALUE(此线程池在需求增加时扩展,在需求减少时收缩):
ExecutorService executor = Executors.newCachedThreadPool();
  • newFixedThreadPool():这是一个线程数固定、队列无限的线程池,产生无限超时的效果(核心池大小和最大池大小等于指定的大小):
ExecutorService executor = Executors.newFixedThreadPool(5);
  • newWorkStealingThreadPool():这是一个基于工作窃取算法的线程池(它充当 Fork/Join 框架上的一层):
ExecutorService executor = Executors.newWorkStealingPool();
  • newScheduledThreadPool():一个线程池,可以安排命令在给定的延迟后运行,或者定期执行(我们可以指定核心池的大小):
ScheduledExecutorService executor 
  = Executors.newScheduledThreadPool(5);

202 具有单个线程的线程池

为了演示单线程线程池的工作原理,假设我们想编写一个程序,模拟装配线(或输送机),用两个工作器检查和包装灯泡。

通过检查,我们了解到工作器测试灯泡是否亮起。通过包装,我们了解到,工作器将经过验证的灯泡拿到盒子里。这种工艺在几乎所有工厂都很常见。

两名工作器如下:

  • 一种所谓的生产者(或检查者),负责测试每个灯泡,看灯泡是否亮起
  • 一个所谓的消费者(或包装商),负责将每个检查过的灯泡包装到一个盒子里

这种问题完全适合于下图所示的生产者消费者设计模式:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wMW6fHCN-1657345732716)(https://github.com/apachecn/apachecn-java-zh/raw/master/docs/java-coding-prob/img/817c7786-9093-401c-9afd-bf9b39aabd25.png)]

最常见的是,在这种模式中,生产者和消费者通过队列(生产者排队数据,消费者排队数据)进行通信。这个队列称为数据缓冲区。当然,根据流程设计,其他数据结构也可以起到数据缓冲的作用。

现在,让我们看看如果生产者等待消费者可用,我们如何实现这个模式。

稍后,我们将为不等待消费者的生产者实现此模式。

生产者等待消费者出现

装配线启动时,生产者将逐个检查进线灯泡,而消费者将将其打包(每个盒子中有一个灯泡)。此流重复,直到装配线停止。

下图是生产者消费者之间的流程图:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yPBj7z7O-1657345732716)(https://github.com/apachecn/apachecn-java-zh/raw/master/docs/java-coding-prob/img/3cf100e5-7ad8-4548-9a33-1e55a146b655.png)]

我们可以将装配线视为我们工厂的助手,因此它可以实现为助手或工具类(当然,它也可以很容易地切换到非static实现,因此如果对您的情况更有意义,请随意切换):

public final class AssemblyLine {
  private AssemblyLine() {
    throw new AssertionError("There is a single assembly line!");
  }
  ...
}

当然,实现这个场景的方法很多,但是我们对使用 JavaExecutorService感兴趣,更准确地说是Executors.newSingleThreadExecutor()。使用一个工作线程来操作未绑定队列的Executor通过此方法创建。

我们只有两个工作器,所以我们可以使用两个实例Executor(一个Executor将启动生产者,另一个将启动消费者)。因此,生产者将是一个线程,消费者将是另一个线程:

private static ExecutorService producerService;
private static ExecutorService consumerService;

由于生产者和消费者是好朋友,他们决定根据一个简单的场景来工作:

  • 只有消费者不忙时,生产者才会检查灯泡并将其传递给消费者(如果消费者忙,生产者会等待一段时间,直到消费者有空)
  • 生产者在将当前灯泡传递给用户之前不会检查下一个灯泡
  • 消费者将尽快包装每个进入的灯泡

这个场景适用于TransferQueueSynchronousQueue,它执行的过程与前面提到的场景非常相似。让我们使用TransferQueue。这是一个BlockingQueue,其中生产者可以等待消费者接收元素。BlockingQueue实现是线程安全的:

private static final TransferQueue<String> queue 
  = new LinkedTransferQueue<>();

生产者和消费者之间的工作流程是先进先出FIFO)类型:第一个检查的灯泡是第一个打包的灯泡),因此LinkedTransferQueue是一个不错的选择。

一旦装配线启动,生产者将持续检查灯泡,因此我们可以将其作为一个类来实现,如下所示:

private static final int MAX_PROD_TIME_MS = 5 * 1000;
private static final int MAX_CONS_TIME_MS = 7 * 1000;
private static final int TIMEOUT_MS = MAX_CONS_TIME_MS + 1000;
private static final Random rnd = new Random();
private static volatile boolean runningProducer;
...
private static class Producer implements Runnable {
  @Override
  public void run() {
    while (runningProducer) {
      try {
        String bulb = "bulb-" + rnd.nextInt(1000);
        Thread.sleep(rnd.nextInt(MAX_PROD_TIME_MS));
        boolean transfered = queue.tryTransfer(bulb,
          TIMEOUT_MS, TimeUnit.MILLISECONDS);
        if (transfered) {
          logger.info(() -> "Checked: " + bulb);
        }
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        logger.severe(() -> "Exception: " + ex);
        break;
      }
    }
  }
}

因此,生产者通过tryTransfer()方法将检查过的灯泡转移给消费者。如果可以在超时时间过去之前将元素传输到使用者,则此方法将执行此操作。

避免使用transfer()方法,这可能会无限期地堵塞螺纹。

为了模拟生产者检查灯泡所花的时间,相应的线程将在 0 到 5 之间随机休眠几秒(5 秒是检查灯泡所需的最长时间)。如果消费者在此时间之后不可用,则会花费更多时间(在tryTransfer()中),直到消费者可用或超时结束。

另一方面,使用另一个类实现使用者,如下所示:

private static volatile boolean runningConsumer;
...
private static class Consumer implements Runnable {
  @Override
  public void run() {
    while (runningConsumer) {
      try {
        String bulb = queue.poll(
          MAX_PROD_TIME_MS, TimeUnit.MILLISECONDS);
        if (bulb != null) {
          Thread.sleep(rnd.nextInt(MAX_CONS_TIME_MS));
          logger.info(() -> "Packed: " + bulb);
        }
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        logger.severe(() -> "Exception: " + ex);
        break;
      }
    }
  }
}

消费者可以通过queue.take()方法从生产者处取灯泡。此方法检索并删除此队列的头,如有必要,请等待,直到灯泡可用。也可以调用poll()方法,在该方法中检索并移除队列的头,或者如果该队列为空,则返回null。但这两个都不适合我们。如果生产者不在,消费者可能仍然停留在take()方法中。另一方面,如果队列是空的(生产者现在正在检查当前灯泡),poll()方法将很快被一次又一次地调用,导致伪重复。解决这个问题的方法是poll(long timeout, TimeUnit unit)。此方法检索并删除此队列的头,并在指定的等待时间内(如果需要)等待灯泡变为可用。仅当等待时间过后队列为空时,才会返回null

为了模拟耗电元件包装灯泡所需的时间,相应的线程将在 0 到 7 之间随机休眠几秒(7 秒是包装灯泡所需的最长时间)。

启动生产者和消费者是一项非常简单的任务,它是通过一种名为startAssemblyLine()的方法完成的,如下所示:

public static void startAssemblyLine() {
  if (runningProducer || runningConsumer) {
    logger.info("Assembly line is already running ...");
    return;
  }
  logger.info("\n\nStarting assembly line ...");
  logger.info(() -> "Remaining bulbs from previous run: \n"
    + queue + "\n\n");
  runningProducer = true;
  producerService = Executors.newSingleThreadExecutor();
  producerService.execute(producer);
  runningConsumer = true;
  consumerService = Executors.newSingleThreadExecutor();
  consumerService.execute(consumer);
}

停止装配线是一个微妙的过程,可以通过不同的场景来解决。主要是,当装配线停止时,生产者应检查当前灯泡作为最后一个灯泡,消费者必须包装它。生产者可能需要等待消费者包装好当前灯泡,然后才能转移最后一个灯泡;此外,消费者也必须包装好这个灯泡。

为了遵循此场景,我们首先停止生产者,然后停止消费者:

public static void stopAssemblyLine() {
  logger.info("Stopping assembly line ...");
  boolean isProducerDown = shutdownProducer();
  boolean isConsumerDown = shutdownConsumer();
  if (!isProducerDown || !isConsumerDown) {
    logger.severe("Something abnormal happened during
      shutting down the assembling line!");
    System.exit(0);
  }
  logger.info("Assembling line was successfully stopped!");
}
private static boolean shutdownProducer() {
  runningProducer = false;
  return shutdownExecutor(producerService);
}
private static boolean shutdownConsumer() {
  runningConsumer = false;
  return shutdownExecutor(consumerService);
}

最后,我们给生产者和消费者足够的时间来正常停止(不中断线程)。这在shutdownExecutor()方法中发生,如下所示:

private static boolean shutdownExecutor(ExecutorService executor) {
  executor.shutdown();
  try {
    if (!executor.awaitTermination(TIMEOUT_MS * 2,
        TimeUnit.MILLISECONDS)) {
      executor.shutdownNow();
      return executor.awaitTermination(TIMEOUT_MS * 2,
        TimeUnit.MILLISECONDS);
    }
    return true;
  } catch (InterruptedException ex) {
    executor.shutdownNow();
    Thread.currentThread().interrupt();
    logger.severe(() -> "Exception: " + ex);
  }
  return false;
}

我们要做的第一件事是将runningProducer static变量设置为false。这将破坏while(runningProducer),因此这将是最后一次检查灯泡。此外,我们启动生产者的关闭程序。

对于消费者,我们要做的第一件事是将runningConsumer static变量设置为false。这将打破while(runningConsumer),因此这将是最后一个灯泡包装。此外,我们启动耗电元件的关闭程序。

让我们看看装配线的可能执行(运行 10 秒):

AssemblyLine.startAssemblyLine();
Thread.sleep(10 * 1000);
AssemblyLine.stopAssemblyLine();

可能的输出如下:

Starting assembly line ...
...
[2019-04-14 07:39:40] [INFO] Checked: bulb-89
[2019-04-14 07:39:43] [INFO] Packed: bulb-89
...
Stopping assembly line ...
...
[2019-04-14 07:39:53] [INFO] Packed: bulb-322
Assembling line was successfully stopped!

一般来说,如果停产需要很长时间(就好像停产一样),那么生产者和消费者的数量和/或生产和消费时间之间可能存在不平衡率。您可能需要添加或减去生产者或消费者。

生产者不等待消费者出现

如果生产者检查灯泡的速度比消费者包装灯泡的速度快,那么他们很可能会决定采用以下工作流程:

  • 生产者将逐一检查灯泡,并将其推入队列
  • 消费者将从队列中轮询并打包灯泡

由于消费者比生产者慢,队列将容纳已检查但未包装的灯泡(我们可以假设有空队列的可能性很低)。在下图中,我们有生产者、消费者和用于存储已检查但未包装灯泡的队列:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZpLZp4de-1657345732717)(https://github.com/apachecn/apachecn-java-zh/raw/master/docs/java-coding-prob/img/00124b9f-15a5-4afd-993e-ee96b1ec47d1.png)]

为了形成这种情况,我们可以依赖于ConcurrentLinkedQueue(或LinkedBlockingQueue)。这是一个基于链接节点的无限线程安全队列:

private static final Queue<String> queue 
  = new ConcurrentLinkedQueue<>();

为了在队列中推一个灯泡,生产者调用offer()方法:

queue.offer(bulb);

另一方面,消费者使用poll()方法处理队列中的灯泡(因为消费者比生产者慢,所以当poll()返回null时应该是罕见的情况):

String bulb = queue.poll();

让我们第一次启动装配线 10 秒钟。这将输出以下内容:

Starting assembly line ...
...
[2019-04-14 07:44:58] [INFO] Checked: bulb-827
[2019-04-14 07:44:59] [INFO] Checked: bulb-257
[2019-04-14 07:44:59] [INFO] Packed: bulb-827
...
Stopping assembly line ...
...
[2019-04-14 07:45:08] [INFO] Checked: bulb-369
[2019-04-14 07:45:09] [INFO] Packed: bulb-690
...
Assembling line was successfully stopped!

此时,装配线停止,在队列中,我们有以下内容(这些灯泡已检查,但未包装):

[bulb-968, bulb-782, bulb-627, bulb-886, ...]

我们重新启动装配线并检查突出显示的行,这表明消费者从停止的位置恢复其工作:

Starting assembly line ...
[2019-04-14 07:45:12] [INFO ] Packed: bulb-968 [2019-04-14 07:45:12] [INFO ] Checked: bulb-812
[2019-04-14 07:45:12] [INFO ] Checked: bulb-470
[2019-04-14 07:45:14] [INFO ] Packed: bulb-782 [2019-04-14 07:45:15] [INFO ] Checked: bulb-601
[2019-04-14 07:45:16] [INFO ] Packed: bulb-627 ...

Java 编程问题:十、并发-线程池、可调用对象和同步器3https://developer.aliyun.com/article/1426163

相关文章
|
6天前
|
安全 Java 开发者
深入解读JAVA多线程:wait()、notify()、notifyAll()的奥秘
在Java多线程编程中,`wait()`、`notify()`和`notifyAll()`方法是实现线程间通信和同步的关键机制。这些方法定义在`java.lang.Object`类中,每个Java对象都可以作为线程间通信的媒介。本文将详细解析这三个方法的使用方法和最佳实践,帮助开发者更高效地进行多线程编程。 示例代码展示了如何在同步方法中使用这些方法,确保线程安全和高效的通信。
25 9
|
5天前
|
监控 安全 Java
Java中的多线程编程:从入门到实践####
本文将深入浅出地探讨Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的摘要形式,本文将以一个简短的代码示例作为开篇,直接展示多线程的魅力,随后再详细解析其背后的原理与实现方式,旨在帮助读者快速理解并掌握Java多线程编程的基本技能。 ```java // 简单的多线程示例:创建两个线程,分别打印不同的消息 public class SimpleMultithreading { public static void main(String[] args) { Thread thread1 = new Thread(() -> System.out.prin
|
7天前
|
安全 Java
Java多线程集合类
本文介绍了Java中线程安全的问题及解决方案。通过示例代码展示了使用`CopyOnWriteArrayList`、`CopyOnWriteArraySet`和`ConcurrentHashMap`来解决多线程环境下集合操作的线程安全问题。这些类通过不同的机制确保了线程安全,提高了并发性能。
|
8天前
|
Java
java小知识—进程和线程
进程 进程是程序的一次执行过程,是系统运行的基本单位,因此进程是动态的。系统运行一个程序即是一个进程从创建,运行到消亡的过程。简单来说,一个进程就是一个执行中的程序,它在计算机中一个指令接着一个指令地执行着,同时,每个进程还占有某些系统资源如CPU时间,内存空间,文件,文件,输入输出设备的使用权等等。换句话说,当程序在执行时,将会被操作系统载入内存中。 线程 线程,与进程相似,但线程是一个比进程更小的执行单位。一个进程在其执行的过程中产生多个线程。与进程不同的是同类的多个线程共享同一块内存空间和一组系统资源,所以系统在产生一个线程,或是在各个线程之间做切换工作时,负担要比
20 1
|
1月前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
43 1
C++ 多线程之初识多线程
|
24天前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
18 3
|
24天前
|
Java 开发者
在Java多线程编程中,选择合适的线程创建方法至关重要
【10月更文挑战第20天】在Java多线程编程中,选择合适的线程创建方法至关重要。本文通过案例分析,探讨了继承Thread类和实现Runnable接口两种方法的优缺点及适用场景,帮助开发者做出明智的选择。
16 2
|
24天前
|
Java
Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口
【10月更文挑战第20天】《JAVA多线程深度解析:线程的创建之路》介绍了Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口。文章详细讲解了每种方式的实现方法、优缺点及适用场景,帮助读者更好地理解和掌握多线程编程技术,为复杂任务的高效处理奠定基础。
28 2
|
24天前
|
Java 开发者
Java多线程初学者指南:介绍通过继承Thread类与实现Runnable接口两种方式创建线程的方法及其优缺点
【10月更文挑战第20天】Java多线程初学者指南:介绍通过继承Thread类与实现Runnable接口两种方式创建线程的方法及其优缺点,重点解析为何实现Runnable接口更具灵活性、资源共享及易于管理的优势。
28 1
|
24天前
|
安全 Java 开发者
Java多线程中的`wait()`、`notify()`和`notifyAll()`方法,探讨了它们在实现线程间通信和同步中的关键作用
本文深入解析了Java多线程中的`wait()`、`notify()`和`notifyAll()`方法,探讨了它们在实现线程间通信和同步中的关键作用。通过示例代码展示了如何正确使用这些方法,并分享了最佳实践,帮助开发者避免常见陷阱,提高多线程程序的稳定性和效率。
34 1