Java 编程问题:十、并发-线程池、可调用对象和同步器1https://developer.aliyun.com/article/1426161
201 Java 中的线程池
线程池是可用于执行任务的线程的集合。线程池负责管理其线程的创建、分配和生命周期,并有助于提高性能。现在,我们来谈谈遗嘱执行人。
Executor
在java.util.concurrent
包中,有一堆专用于执行任务的接口。最简单的一个叫做Executor
。这个接口公开了一个名为execute(Runnable command)
的方法。下面是使用此方法执行单个任务的示例:
public class SimpleExecutor implements Executor { @Override public void execute(Runnable r) { (new Thread(r)).start(); } } SimpleExecutor se = new SimpleExecutor(); se.execute(() -> { System.out.println("Simple task executed via Executor interface"); });
ExecutorService
一个更复杂、更全面的接口提供了许多附加方法,它是ExecutorService
。这是Executor
的丰富版本。Java 附带了一个成熟的实现ExecutorService
,名为ThreadPoolExecutor
。这是一个线程池,可以用一组参数实例化,如下所示:
ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
下面是对前面代码中实例化的每个参数的简短描述:
corePoolSize
:池中要保留的线程数,即使它们是空闲的(除非设置了allowCoreThreadTimeOut
)maximumPoolSize
:允许的最大线程数keepAliveTime
:当这个时间过去后,空闲线程将从池中移除(这些是超过corePoolSize
的空闲线程)unit
:keepAliveTime
参数的时间单位workQueue
:在Runnable
的实例(只有execute()
方法提交的Runnable
任务)执行之前,用来存放这些实例的队列threadFactory
:执行器创建新线程时使用此工厂handler
:当ThreadPoolExecutor
由于饱和而无法执行Runnable
时,即线程边界和队列容量已满(例如,workQueue
大小固定,同时设置了maximumPoolSize
),它将控制和决策交给这个处理器
为了优化池大小,我们需要收集以下信息:
- CPU 数量(
Runtime.getRuntime().availableProcessors()
) - 目标 CPU 利用率(在范围内,
[0, 1]
) - 等待时间(W)
- 计算时间(C)
以下公式有助于我们确定池的最佳大小:
Number of threads = Number of CPUs * Target CPU utilization * (1 + W/C)
根据经验,对于计算密集型任务(通常是小型任务),最好使用线程数等于处理器数或处理器数 +1(以防止潜在的暂停)来对线程池进行基准测试。对于耗时且阻塞的任务(例如,I/O),更大的池更好,因为线程将无法以高速率进行调度。另外,还要注意与其他池(例如,数据库连接池和套接字连接池)的干扰。
让我们看一个ThreadPoolExecutor
的例子:
public class SimpleThreadPoolExecutor implements Runnable { private final int taskId; public SimpleThreadPoolExecutor(int taskId) { this.taskId = taskId; } @Override public void run() { Thread.sleep(2000); System.out.println("Executing task " + taskId + " via " + Thread.currentThread().getName()); } public static void main(String[] args) { BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(5); final AtomicInteger counter = new AtomicInteger(); ThreadFactory threadFactory = (Runnable r) -> { System.out.println("Creating a new Cool-Thread-" + counter.incrementAndGet()); return new Thread(r, "Cool-Thread-" + counter.get()); }; RejectedExecutionHandler rejectedHandler = (Runnable r, ThreadPoolExecutor executor) -> { if (r instanceof SimpleThreadPoolExecutor) { SimpleThreadPoolExecutor task=(SimpleThreadPoolExecutor) r; System.out.println("Rejecting task " + task.taskId); } }; ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 1, TimeUnit.SECONDS, queue, threadFactory, rejectedHandler); for (int i = 0; i < 50; i++) { executor.execute(new SimpleThreadPoolExecutor(i)); } executor.shutdown(); executor.awaitTermination( Integer.MAX_VALUE, TimeUnit.MILLISECONDS); } }
main()
方法触发Runnable
50 个实例。每个Runnable
睡两秒钟,打印一条消息。工作队列限制为Runnable
的 5 个实例——核心线程为 10,线程数最多为 20 个,空闲超时为 1 秒。可能的输出如下:
Creating a new Cool-Thread-1 ... Creating a new Cool-Thread-20 Rejecting task 25 ... Rejecting task 49 Executing task 22 via Cool-Thread-18 ... Executing task 12 via Cool-Thread-2
ScheduledExecutorService
ScheduledExecutorService
是一个ExecutorService
,它可以安排任务在给定的延迟后执行,或者定期执行。这里,我们有schedule()
、scheduleAtFixedRate()
和scheduleWithFixedDelay()
等方法。schedule()
用于一次性任务,scheduleAtFixedRate()
和scheduleWithFixedDelay()
用于周期性任务。
通过执行器的线程池
更进一步,我们将介绍辅助类Executors
。此类使用以下方法公开几种类型的线程池:
newSingleThreadExecutor()
:这是一个线程池,只管理一个线程,队列无限,一次只执行一个任务:
ExecutorService executor = Executors.newSingleThreadExecutor();
newCachedThreadPool()
:这是一个线程池,根据需要创建新线程并删除空闲线程(60 秒后);核心池大小为 0,最大池大小为Integer.MAX_VALUE
(此线程池在需求增加时扩展,在需求减少时收缩):
ExecutorService executor = Executors.newCachedThreadPool();
newFixedThreadPool()
:这是一个线程数固定、队列无限的线程池,产生无限超时的效果(核心池大小和最大池大小等于指定的大小):
ExecutorService executor = Executors.newFixedThreadPool(5);
newWorkStealingThreadPool()
:这是一个基于工作窃取算法的线程池(它充当 Fork/Join 框架上的一层):
ExecutorService executor = Executors.newWorkStealingPool();
newScheduledThreadPool()
:一个线程池,可以安排命令在给定的延迟后运行,或者定期执行(我们可以指定核心池的大小):
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
202 具有单个线程的线程池
为了演示单线程线程池的工作原理,假设我们想编写一个程序,模拟装配线(或输送机),用两个工作器检查和包装灯泡。
通过检查,我们了解到工作器测试灯泡是否亮起。通过包装,我们了解到,工作器将经过验证的灯泡拿到盒子里。这种工艺在几乎所有工厂都很常见。
两名工作器如下:
- 一种所谓的生产者(或检查者),负责测试每个灯泡,看灯泡是否亮起
- 一个所谓的消费者(或包装商),负责将每个检查过的灯泡包装到一个盒子里
这种问题完全适合于下图所示的生产者消费者设计模式:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wMW6fHCN-1657345732716)(https://github.com/apachecn/apachecn-java-zh/raw/master/docs/java-coding-prob/img/817c7786-9093-401c-9afd-bf9b39aabd25.png)]
最常见的是,在这种模式中,生产者和消费者通过队列(生产者排队数据,消费者排队数据)进行通信。这个队列称为数据缓冲区。当然,根据流程设计,其他数据结构也可以起到数据缓冲的作用。
现在,让我们看看如果生产者等待消费者可用,我们如何实现这个模式。
稍后,我们将为不等待消费者的生产者实现此模式。
生产者等待消费者出现
装配线启动时,生产者将逐个检查进线灯泡,而消费者将将其打包(每个盒子中有一个灯泡)。此流重复,直到装配线停止。
下图是生产者和消费者之间的流程图:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yPBj7z7O-1657345732716)(https://github.com/apachecn/apachecn-java-zh/raw/master/docs/java-coding-prob/img/3cf100e5-7ad8-4548-9a33-1e55a146b655.png)]
我们可以将装配线视为我们工厂的助手,因此它可以实现为助手或工具类(当然,它也可以很容易地切换到非static
实现,因此如果对您的情况更有意义,请随意切换):
public final class AssemblyLine { private AssemblyLine() { throw new AssertionError("There is a single assembly line!"); } ... }
当然,实现这个场景的方法很多,但是我们对使用 JavaExecutorService
感兴趣,更准确地说是Executors.newSingleThreadExecutor()
。使用一个工作线程来操作未绑定队列的Executor
通过此方法创建。
我们只有两个工作器,所以我们可以使用两个实例Executor
(一个Executor
将启动生产者,另一个将启动消费者)。因此,生产者将是一个线程,消费者将是另一个线程:
private static ExecutorService producerService; private static ExecutorService consumerService;
由于生产者和消费者是好朋友,他们决定根据一个简单的场景来工作:
- 只有消费者不忙时,生产者才会检查灯泡并将其传递给消费者(如果消费者忙,生产者会等待一段时间,直到消费者有空)
- 生产者在将当前灯泡传递给用户之前不会检查下一个灯泡
- 消费者将尽快包装每个进入的灯泡
这个场景适用于TransferQueue
或SynchronousQueue
,它执行的过程与前面提到的场景非常相似。让我们使用TransferQueue
。这是一个BlockingQueue
,其中生产者可以等待消费者接收元素。BlockingQueue
实现是线程安全的:
private static final TransferQueue<String> queue = new LinkedTransferQueue<>();
生产者和消费者之间的工作流程是先进先出(FIFO)类型:第一个检查的灯泡是第一个打包的灯泡),因此LinkedTransferQueue
是一个不错的选择。
一旦装配线启动,生产者将持续检查灯泡,因此我们可以将其作为一个类来实现,如下所示:
private static final int MAX_PROD_TIME_MS = 5 * 1000; private static final int MAX_CONS_TIME_MS = 7 * 1000; private static final int TIMEOUT_MS = MAX_CONS_TIME_MS + 1000; private static final Random rnd = new Random(); private static volatile boolean runningProducer; ... private static class Producer implements Runnable { @Override public void run() { while (runningProducer) { try { String bulb = "bulb-" + rnd.nextInt(1000); Thread.sleep(rnd.nextInt(MAX_PROD_TIME_MS)); boolean transfered = queue.tryTransfer(bulb, TIMEOUT_MS, TimeUnit.MILLISECONDS); if (transfered) { logger.info(() -> "Checked: " + bulb); } } catch (InterruptedException ex) { Thread.currentThread().interrupt(); logger.severe(() -> "Exception: " + ex); break; } } } }
因此,生产者通过tryTransfer()
方法将检查过的灯泡转移给消费者。如果可以在超时时间过去之前将元素传输到使用者,则此方法将执行此操作。
避免使用transfer()
方法,这可能会无限期地堵塞螺纹。
为了模拟生产者检查灯泡所花的时间,相应的线程将在 0 到 5 之间随机休眠几秒(5 秒是检查灯泡所需的最长时间)。如果消费者在此时间之后不可用,则会花费更多时间(在tryTransfer()
中),直到消费者可用或超时结束。
另一方面,使用另一个类实现使用者,如下所示:
private static volatile boolean runningConsumer; ... private static class Consumer implements Runnable { @Override public void run() { while (runningConsumer) { try { String bulb = queue.poll( MAX_PROD_TIME_MS, TimeUnit.MILLISECONDS); if (bulb != null) { Thread.sleep(rnd.nextInt(MAX_CONS_TIME_MS)); logger.info(() -> "Packed: " + bulb); } } catch (InterruptedException ex) { Thread.currentThread().interrupt(); logger.severe(() -> "Exception: " + ex); break; } } } }
消费者可以通过queue.take()
方法从生产者处取灯泡。此方法检索并删除此队列的头,如有必要,请等待,直到灯泡可用。也可以调用poll()
方法,在该方法中检索并移除队列的头,或者如果该队列为空,则返回null
。但这两个都不适合我们。如果生产者不在,消费者可能仍然停留在take()
方法中。另一方面,如果队列是空的(生产者现在正在检查当前灯泡),poll()
方法将很快被一次又一次地调用,导致伪重复。解决这个问题的方法是poll(long timeout, TimeUnit unit)
。此方法检索并删除此队列的头,并在指定的等待时间内(如果需要)等待灯泡变为可用。仅当等待时间过后队列为空时,才会返回null
。
为了模拟耗电元件包装灯泡所需的时间,相应的线程将在 0 到 7 之间随机休眠几秒(7 秒是包装灯泡所需的最长时间)。
启动生产者和消费者是一项非常简单的任务,它是通过一种名为startAssemblyLine()
的方法完成的,如下所示:
public static void startAssemblyLine() { if (runningProducer || runningConsumer) { logger.info("Assembly line is already running ..."); return; } logger.info("\n\nStarting assembly line ..."); logger.info(() -> "Remaining bulbs from previous run: \n" + queue + "\n\n"); runningProducer = true; producerService = Executors.newSingleThreadExecutor(); producerService.execute(producer); runningConsumer = true; consumerService = Executors.newSingleThreadExecutor(); consumerService.execute(consumer); }
停止装配线是一个微妙的过程,可以通过不同的场景来解决。主要是,当装配线停止时,生产者应检查当前灯泡作为最后一个灯泡,消费者必须包装它。生产者可能需要等待消费者包装好当前灯泡,然后才能转移最后一个灯泡;此外,消费者也必须包装好这个灯泡。
为了遵循此场景,我们首先停止生产者,然后停止消费者:
public static void stopAssemblyLine() { logger.info("Stopping assembly line ..."); boolean isProducerDown = shutdownProducer(); boolean isConsumerDown = shutdownConsumer(); if (!isProducerDown || !isConsumerDown) { logger.severe("Something abnormal happened during shutting down the assembling line!"); System.exit(0); } logger.info("Assembling line was successfully stopped!"); } private static boolean shutdownProducer() { runningProducer = false; return shutdownExecutor(producerService); } private static boolean shutdownConsumer() { runningConsumer = false; return shutdownExecutor(consumerService); }
最后,我们给生产者和消费者足够的时间来正常停止(不中断线程)。这在shutdownExecutor()
方法中发生,如下所示:
private static boolean shutdownExecutor(ExecutorService executor) { executor.shutdown(); try { if (!executor.awaitTermination(TIMEOUT_MS * 2, TimeUnit.MILLISECONDS)) { executor.shutdownNow(); return executor.awaitTermination(TIMEOUT_MS * 2, TimeUnit.MILLISECONDS); } return true; } catch (InterruptedException ex) { executor.shutdownNow(); Thread.currentThread().interrupt(); logger.severe(() -> "Exception: " + ex); } return false; }
我们要做的第一件事是将runningProducer static
变量设置为false
。这将破坏while(runningProducer)
,因此这将是最后一次检查灯泡。此外,我们启动生产者的关闭程序。
对于消费者,我们要做的第一件事是将runningConsumer static
变量设置为false
。这将打破while(runningConsumer)
,因此这将是最后一个灯泡包装。此外,我们启动耗电元件的关闭程序。
让我们看看装配线的可能执行(运行 10 秒):
AssemblyLine.startAssemblyLine(); Thread.sleep(10 * 1000); AssemblyLine.stopAssemblyLine();
可能的输出如下:
Starting assembly line ... ... [2019-04-14 07:39:40] [INFO] Checked: bulb-89 [2019-04-14 07:39:43] [INFO] Packed: bulb-89 ... Stopping assembly line ... ... [2019-04-14 07:39:53] [INFO] Packed: bulb-322 Assembling line was successfully stopped!
一般来说,如果停产需要很长时间(就好像停产一样),那么生产者和消费者的数量和/或生产和消费时间之间可能存在不平衡率。您可能需要添加或减去生产者或消费者。
生产者不等待消费者出现
如果生产者检查灯泡的速度比消费者包装灯泡的速度快,那么他们很可能会决定采用以下工作流程:
- 生产者将逐一检查灯泡,并将其推入队列
- 消费者将从队列中轮询并打包灯泡
由于消费者比生产者慢,队列将容纳已检查但未包装的灯泡(我们可以假设有空队列的可能性很低)。在下图中,我们有生产者、消费者和用于存储已检查但未包装灯泡的队列:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZpLZp4de-1657345732717)(https://github.com/apachecn/apachecn-java-zh/raw/master/docs/java-coding-prob/img/00124b9f-15a5-4afd-993e-ee96b1ec47d1.png)]
为了形成这种情况,我们可以依赖于ConcurrentLinkedQueue
(或LinkedBlockingQueue
)。这是一个基于链接节点的无限线程安全队列:
private static final Queue<String> queue = new ConcurrentLinkedQueue<>();
为了在队列中推一个灯泡,生产者调用offer()
方法:
queue.offer(bulb);
另一方面,消费者使用poll()
方法处理队列中的灯泡(因为消费者比生产者慢,所以当poll()
返回null
时应该是罕见的情况):
String bulb = queue.poll();
让我们第一次启动装配线 10 秒钟。这将输出以下内容:
Starting assembly line ... ... [2019-04-14 07:44:58] [INFO] Checked: bulb-827 [2019-04-14 07:44:59] [INFO] Checked: bulb-257 [2019-04-14 07:44:59] [INFO] Packed: bulb-827 ... Stopping assembly line ... ... [2019-04-14 07:45:08] [INFO] Checked: bulb-369 [2019-04-14 07:45:09] [INFO] Packed: bulb-690 ... Assembling line was successfully stopped!
此时,装配线停止,在队列中,我们有以下内容(这些灯泡已检查,但未包装):
[bulb-968, bulb-782, bulb-627, bulb-886, ...]
我们重新启动装配线并检查突出显示的行,这表明消费者从停止的位置恢复其工作:
Starting assembly line ... [2019-04-14 07:45:12] [INFO ] Packed: bulb-968 [2019-04-14 07:45:12] [INFO ] Checked: bulb-812 [2019-04-14 07:45:12] [INFO ] Checked: bulb-470 [2019-04-14 07:45:14] [INFO ] Packed: bulb-782 [2019-04-14 07:45:15] [INFO ] Checked: bulb-601 [2019-04-14 07:45:16] [INFO ] Packed: bulb-627 ...
Java 编程问题:十、并发-线程池、可调用对象和同步器3https://developer.aliyun.com/article/1426163