Java 编程问题:十、并发-线程池、可调用对象和同步器3https://developer.aliyun.com/article/1426163
207 调用多个可调用任务
由于生产者(检查器)不与消费者(打包器)同时工作,我们可以通过一个for
来模拟他们的工作,这个for
在一个队列中添加 100 个选中的灯泡:
private static final BlockingQueue<String> queue = new LinkedBlockingQueue<>(); ... private static void simulatingProducers() { for (int i = 0; i < MAX_PROD_BULBS; i++) { queue.offer("bulb-" + rnd.nextInt(1000)); } }
现在,消费者必须将每个灯泡打包并退回。这意味着Consumer
是Callable
:
private static class Consumer implements Callable { @Override public String call() throws InterruptedException { String bulb = queue.poll(); Thread.sleep(100); if (bulb != null) { logger.info(() -> "Packed: " + bulb + " by consumer: " + Thread.currentThread().getName()); return bulb; } return ""; } }
但是请记住,我们应该提交所有的任务并等待它们全部完成。这可以通过ExecutorService.invokeAll()
方法实现。此方法接受任务集合(Collection>)
,并返回Future
(List>
的实例列表作为参数。对Future.get()
的任何调用都将被阻止,直到Future
的所有实例都完成。
因此,首先我们创建一个包含 100 个任务的列表:
private static final Consumer consumer = new Consumer(); ... List<Callable<String>> tasks = new ArrayList<>(); for (int i = 0; i < queue.size(); i++) { tasks.add(consumer); }
进一步,我们执行所有这些任务并得到Future
的列表:
private static ExecutorService consumerService = Executors.newWorkStealingPool(); ... List<Future<String>> futures = consumerService.invokeAll(tasks);
最后,我们处理(在本例中,显示)结果:
for (Future<String> future: futures) { String bulb = future.get(); logger.info(() -> "Future done: " + bulb); }
请注意,对future.get()
语句的第一次调用将阻塞直到所有的Future
实例都完成。这将导致以下输出:
[12:06:41] [INFO] Packed: bulb-595 by consumer: ForkJoinPool-1-worker-9 ... [12:06:42] [INFO] Packed: bulb-478 by consumer: ForkJoinPool-1-worker-15 [12:06:43] [INFO] Future done: bulb-595 ...
有时,我们需要提交几个任务,然后等待其中任何一个任务完成。这可以通过ExecutorService.invokeAny()
实现。与invokeAll()
完全一样,此方法获取一组任务(Collection>
作为参数)。但它返回最快任务的结果(不是一个Future
),并取消所有其他尚未完成的任务,例如:
String bulb = consumerService.invokeAny(tasks);
如果您不想等待所有Future
完成,请按以下步骤进行:
int queueSize = queue.size(); List<Future<String>> futures = new ArrayList<>(); for (int i = 0; i < queueSize; i++) { futures.add(consumerService.submit(consumer)); } for (Future<String> future: futures) { String bulb = future.get(); logger.info(() -> "Future done: " + bulb); }
在所有任务完成之前,这不会阻塞。请看以下输出示例:
[12:08:56] [INFO ] Packed: bulb-894 by consumer: ForkJoinPool-1-worker-7 [12:08:56] [INFO ] Future done: bulb-894 [12:08:56] [INFO ] Packed: bulb-953 by consumer: ForkJoinPool-1-worker-5 ...
208 锁存器
锁存器是一个 Java 同步器,它允许一个或多个线程等待其他线程中的一组事件完成。它从给定的计数器开始(通常表示应该等待的事件数),完成的每个事件负责递减计数器。当计数器达到零时,所有等待的线程都可以通过。这是锁存器的终端状态。锁存器不能重置或重用,因此等待的事件只能发生一次。下图分四个步骤显示了具有三个线程的锁存器的工作原理:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1mDTW4FG-1657345732723)(https://github.com/apachecn/apachecn-java-zh/raw/master/docs/java-coding-prob/img/cb9e4c29-f8bd-4812-a42d-6008818744aa.png)]
在 API 术语中,锁存器是使用java.util.concurrent.CountDownLatch
实现的。
初始计数器在CountDownLatch
构造器中设置为整数。例如,计数器等于3
的CountDownLatch
可以定义为:
CountDownLatch latch = new CountDownLatch(3);
所有调用await()
方法的线程都将被阻塞,直到计数器达到零。因此,一个线程要被阻塞直到锁存器达到终端状态,它将调用await()
。每个完成的事件都可以调用countDown()
方法。此方法用一个值递减计数器。在计数器变为零之前,调用await()
的线程仍然被阻塞。
锁存器可用于各种各样的问题。现在,让我们集中讨论应该模拟启动服务器过程的问题。服务器在其内部服务启动后被视为已启动。服务可以同时启动并且相互独立。启动服务器需要一段时间,需要我们启动该服务器的所有底层服务。因此,完成并验证服务器启动的线程应该等到其他线程中的所有服务器服务(事件)都已启动。如果我们假设我们有三个服务,我们可以编写一个ServerService
类,如下所示:
public class ServerInstance implements Runnable { private static final Logger logger = Logger.getLogger(ServerInstance.class.getName()); private final CountDownLatch latch = new CountDownLatch(3); @Override public void run() { logger.info("The server is getting ready to start "); logger.info("Starting services ...\n"); long starting = System.currentTimeMillis(); Thread service1 = new Thread( new ServerService(latch, "HTTP Listeners")); Thread service2 = new Thread( new ServerService(latch, "JMX")); Thread service3 = new Thread( new ServerService(latch, "Connectors")); service1.start(); service2.start(); service3.start(); try { latch.await(); logger.info(() -> "Server has successfully started in " + (System.currentTimeMillis() - starting) / 1000 + " seconds"); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); // log ex } } }
首先,我们定义一个计数器为 3 的CountDownLatch
。其次,我们在三个不同的线程中启动服务。最后,我们通过await()
阻塞这个线程。现在,下面的类通过随机睡眠模拟服务的启动过程:
public class ServerService implements Runnable { private static final Logger logger = Logger.getLogger(ServerService.class.getName()); private final String serviceName; private final CountDownLatch latch; private final Random rnd = new Random(); public ServerService(CountDownLatch latch, String serviceName) { this.latch = latch; this.serviceName = serviceName; } @Override public void run() { int startingIn = rnd.nextInt(10) * 1000; try { logger.info(() -> "Starting service '" + serviceName + "' ..."); Thread.sleep(startingIn); logger.info(() -> "Service '" + serviceName + "' has successfully started in " + startingIn / 1000 + " seconds"); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); // log ex } finally { latch.countDown(); logger.info(() -> "Service '" + serviceName + "' running ..."); } } }
每个启动成功(或失败)的服务将通过countDown()
减少锁存。一旦计数器达到零,服务器就被认为已启动。我们称之为:
Thread server = new Thread(new ServerInstance()); server.start();
以下是可能的输出:
[08:49:17] [INFO] The server is getting ready to start [08:49:17] [INFO] Starting services ... [08:49:17] [INFO] Starting service 'JMX' ... [08:49:17] [INFO] Starting service 'Connectors' ... [08:49:17] [INFO] Starting service 'HTTP Listeners' ... [08:49:22] [INFO] Service 'HTTP Listeners' started in 5 seconds [08:49:22] [INFO] Service 'HTTP Listeners' running ... [08:49:25] [INFO] Service 'JMX' started in 8 seconds [08:49:25] [INFO] Service 'JMX' running ... [08:49:26] [INFO] Service 'Connectors' started in 9 seconds [08:49:26] [INFO] Service 'Connectors' running ... [08:49:26] [INFO] Server has successfully started in 9 seconds
为了避免不确定的等待,CountDownLatch
类具有接受超时的await()
风格await(long timeout, TimeUnit unit)
。如果在计数为零之前等待时间已过,则此方法返回false
。
209 屏障
屏障是一种 Java 同步器,它允许一组线程(称为方到达共同的屏障点。基本上,一组线程在屏障处等待彼此相遇。就像一帮朋友决定一个会议点,当他们都明白这一点时,他们会走得更远。他们不会离开会议地点,直到他们所有人都到了,或者直到他们觉得他们已经等了太久。
对于依赖于可划分为子任务的任务的问题,此同步器工作得很好。每个子任务在不同的线程中运行,并等待其余的线程。当所有线程完成时,它们将结果合并为一个结果。
下图显示了具有三个线程的屏障流的示例:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-dGnvAoDP-1657345732723)(https://github.com/apachecn/apachecn-java-zh/raw/master/docs/java-coding-prob/img/b1611792-f8cc-4ce7-b50c-e4ed47548ca2.png)]
在 API 术语中,屏障是使用java.util.concurrent.CyclicBarrier
实现的。
一个CyclicBarrier
可以通过两个构造器来构造:
- 其中一个允许我们指定参与方的数量(这是一个整数)
- 另一个允许我们添加一个动作,该动作应该在各方都到达障碍后发生(这是一个
Runnable
)
此操作在参与方中的所有线程到达时发生,但在释放任何线程之前发生。
当线程准备在屏障处等待时,它只调用await()
方法。此方法可以无限期地等待或直到指定的超时(如果指定的超时已过或线程被中断,则用一个TimeoutException
释放此线程;屏障被认为已损坏,屏障处所有等待的线程都用一个BrokenBarrierException
释放)。我们可以通过getParties()
方法找出需要多少方跳过此障碍,以及目前有多少方通过getNumberWaiting()
方法在障碍处等待。
await()
方法返回一个整数,表示当前线程的到达索引,其中索引getParties()
-1 或 0 分别表示第一个或最后一个到达的线程。
假设我们要启动一个服务器。服务器在其内部服务启动后被视为已启动。服务可以同时启动(这很耗时),但它们是相互依赖的,因此,一旦准备好启动,就必须一次启动所有服务。
因此,每个服务都可以准备在单独的线程中启动。一旦准备好启动,线程将在屏障处等待其余的服务。当他们都准备好出发时,他们就越过障碍开始奔跑。让我们考虑三种服务,CyclicBarrier
可以定义如下:
Runnable barrierAction = () -> logger.info("Services are ready to start ..."); CyclicBarrier barrier = new CyclicBarrier(3, barrierAction);
让我们通过三个线程来准备服务:
public class ServerInstance implements Runnable { private static final Logger logger = Logger.getLogger(ServerInstance.class.getName()); private final Runnable barrierAction = () -> logger.info("Services are ready to start ..."); private final CyclicBarrier barrier = new CyclicBarrier(3, barrierAction); @Override public void run() { logger.info("The server is getting ready to start "); logger.info("Starting services ...\n"); long starting = System.currentTimeMillis(); Thread service1 = new Thread( new ServerService(barrier, "HTTP Listeners")); Thread service2 = new Thread( new ServerService(barrier, "JMX")); Thread service3 = new Thread( new ServerService(barrier, "Connectors")); service1.start(); service2.start(); service3.start(); try { service1.join(); service2.join(); service3.join(); logger.info(() -> "Server has successfully started in " + (System.currentTimeMillis() - starting) / 1000 + " seconds"); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); logger.severe(() -> "Exception: " + ex); } } }
ServerService
负责准备每一项服务启动,并通过await()
将其阻塞在屏障上:
public class ServerService implements Runnable { private static final Logger logger = Logger.getLogger(ServerService.class.getName()); private final String serviceName; private final CyclicBarrier barrier; private final Random rnd = new Random(); public ServerService(CyclicBarrier barrier, String serviceName) { this.barrier = barrier; this.serviceName = serviceName; } @Override public void run() { int startingIn = rnd.nextInt(10) * 1000; try { logger.info(() -> "Preparing service '" + serviceName + "' ..."); Thread.sleep(startingIn); logger.info(() -> "Service '" + serviceName + "' was prepared in " + startingIn / 1000 + " seconds (waiting for remaining services)"); barrier.await(); logger.info(() -> "The service '" + serviceName + "' is running ..."); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); logger.severe(() -> "Exception: " + ex); } catch (BrokenBarrierException ex) { logger.severe(() -> "Exception ... barrier is broken! " + ex); } } }
现在,让我们运行它:
Thread server = new Thread(new ServerInstance()); server.start();
下面是一个可能的输出(请注意线程是如何被释放以跨越屏障的):
[10:38:34] [INFO] The server is getting ready to start [10:38:34] [INFO] Starting services ... [10:38:34] [INFO] Preparing service 'Connectors' ... [10:38:34] [INFO] Preparing service 'JMX' ... [10:38:34] [INFO] Preparing service 'HTTP Listeners' ... [10:38:35] [INFO] Service 'HTTP Listeners' was prepared in 1 seconds (waiting for remaining services) [10:38:36] [INFO] Service 'JMX' was prepared in 2 seconds (waiting for remaining services) [10:38:38] [INFO] Service 'Connectors' was prepared in 4 seconds (waiting for remaining services) [10:38:38] [INFO] Services are ready to start ... [10:38:38] [INFO] The service 'Connectors' is running ... [10:38:38] [INFO] The service 'HTTP Listeners' is running ... [10:38:38] [INFO] The service 'JMX' is running ... [10:38:38] [INFO] Server has successfully started in 4 seconds
CyclicBarrier
是循环的,因为它可以重置和重用。为此,请在释放所有等待屏障的线程后调用reset()
方法,否则会抛出BrokenBarrierException
。
处于已损坏状态的屏障将导致isBroken()
标志方法返回true
。
210 交换器
交换器是一个 Java 同步器,它允许两个线程在一个交换点或同步点交换对象。
主要是这种同步器起到了屏障作用。两个线程在一个屏障处互相等待。他们交换一个对象,并在两个到达时继续他们通常的任务。
下图分四个步骤描述了交换器的流量:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wRyq9xpi-1657345732724)(https://github.com/apachecn/apachecn-java-zh/raw/master/docs/java-coding-prob/img/d122d5a4-65d1-4b48-b209-3f9aed333c51.png)]
在 API 术语中,这个同步器是由java.util.concurrent.Exchanger
公开的。
一个Exchanger
可以通过一个空构造器创建,并公开了两个exchange()
方法:
- 只得到它将提供的对象的人
- 获得超时的线程(在另一个线程进入交换之前,如果经过指定的等待时间,将抛出一个
TimeoutException
)。
还记得我们的灯泡装配线吗?好吧,假设生产者(检查者)将检查过的灯泡添加到篮子中(例如,List
。当篮子满了,生产者将其与消费者(包装机)交换为空篮子(例如,另一个List
。只要装配线正在运行,该过程就会重复。
下图表示此流程:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BI4kjOVM-1657345732724)(https://github.com/apachecn/apachecn-java-zh/raw/master/docs/java-coding-prob/img/4e0f5ce1-b9f9-40cf-bdb3-4d79e3aa6c2a.png)]
所以,首先我们需要Exchanger
:
private static final int BASKET_CAPACITY = 5; ... private static final Exchanger<List<String>> exchanger = new Exchanger<>();
生产者装满篮子,在交换点等待消费者:
private static final int MAX_PROD_TIME_MS = 2 * 1000; private static final Random rnd = new Random(); private static volatile boolean runningProducer; ... private static class Producer implements Runnable { private List<String> basket = new ArrayList<>(BASKET_CAPACITY); @Override public void run() { while (runningProducer) { try { for (int i = 0; i < BASKET_CAPACITY; i++) { String bulb = "bulb-" + rnd.nextInt(1000); Thread.sleep(rnd.nextInt(MAX_PROD_TIME_MS)); basket.add(bulb); logger.info(() -> "Checked and added in the basket: " + bulb); } logger.info("Producer: Waiting to exchange baskets ..."); basket = exchanger.exchange(basket); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); logger.severe(() -> "Exception: " + ex); break; } } } }
另一方面,消费者在交换点等待从生产者那里收到装满灯泡的篮子,然后给出一个空的篮子作为交换。此外,当生产者再次装满篮子时,消费者从收到的篮子中包装灯泡。完成后,他们将再次前往兑换点等待另一个满满的篮子。因此,Consumer
可以写成:
private static final int MAX_CONS_TIME_MS = 5 * 1000; private static final Random rnd = new Random(); private static volatile boolean runningConsumer; ... private static class Consumer implements Runnable { private List<String> basket = new ArrayList<>(BASKET_CAPACITY); @Override public void run() { while (runningConsumer) { try { logger.info("Consumer: Waiting to exchange baskets ..."); basket = exchanger.exchange(basket); logger.info(() -> "Consumer: Received the following bulbs: " + basket); for (String bulb: basket) { if (bulb != null) { Thread.sleep(rnd.nextInt(MAX_CONS_TIME_MS)); logger.info(() -> "Packed from basket: " + bulb); } } basket.clear(); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); logger.severe(() -> "Exception: " + ex); break; } } } }
为了简洁起见,代码的其余部分被省略了。
现在,让我们看看可能的输出:
Starting assembly line ... [13:23:13] [INFO] Consumer: Waiting to exchange baskets ... [13:23:15] [INFO] Checked and added in the basket: bulb-606 ... [13:23:18] [INFO] Producer: Waiting to exchange baskets ... [13:23:18] [INFO] Consumer: Received the following bulbs: [bulb-606, bulb-251, bulb-102, bulb-454, bulb-280] [13:23:19] [INFO] Checked and added in the basket: bulb-16 ... [13:23:21] [INFO] Packed from basket: bulb-606 ...
211 信号量
信号量是一个 Java 同步器,它允许我们控制在任何时候可以访问资源的线程数。从概念上讲,这个同步器管理一组许可(例如,类似于令牌)。需要访问资源的线程必须从同步器获得许可。在线程使用资源完成其工作之后,它必须通过将许可返回给信号量来释放它,以便另一个线程可以获取它。线程可以立即获取许可证(如果许可证是空闲的),可以等待一定的时间,或者可以等待直到许可证变为空闲。此外,一个线程一次可以获取和释放多个许可证,一个线程即使没有获取许可证也可以释放许可证。这将向信号量添加一个许可证;因此信号量可以从一个许可证数开始,然后从另一个许可证数结束。
在 API 术语中,这个同步器用java.util.concurrent.Semaphore
表示。
创建一个Semaphore
就像调用它的两个构造器中的一个一样简单:
public Semaphore(int permits)
public Semaphore(int permits, boolean fair)
一个公平的Semaphore
保证 FIFO 在争议中授予许可。
可使用acquire()
方法获得许可证。该过程可以用以下项目符号表示:
- 如果没有参数,这个方法将从这个信号量获取一个许可,阻塞直到一个可用,或者线程被中断
- 要获得多个许可证,请使用
acquire(int permits)
- 要尝试获取许可证并立即返回标志值,请使用
tryAcquire()
或tryAcquire(int permits)
- 要在给定的等待时间内等待一个线程变为可用(并且当前线程未被中断),请使用
tryAcquire(int permits, long timeout, TimeUnit unit)
- 为了从这个信号机获得许可,可以通过
acquireUninterruptibly()
和acquireUninterruptibly(int permits)
获得阻塞直到一个可用 - 要发布许可证,请使用
release()
现在,在我们的场景中,理发店有三个座位,并以先进先出的方式为顾客服务。一位顾客试了五秒钟才坐下。最后,它释放了获得的座位。查看以下代码以了解如何获取和释放座椅:
public class Barbershop { private static final Logger logger = Logger.getLogger(Barbershop.class.getName()); private final Semaphore seats; public Barbershop(int seatsCount) { this.seats = new Semaphore(seatsCount, true); } public boolean acquireSeat(int customerId) { logger.info(() -> "Customer #" + customerId + " is trying to get a seat"); try { boolean acquired = seats.tryAcquire( 5 * 1000, TimeUnit.MILLISECONDS); if (!acquired) { logger.info(() -> "Customer #" + customerId + " has left the barbershop"); return false; } logger.info(() -> "Customer #" + customerId + " got a seat"); return true; } catch (InterruptedException ex) { Thread.currentThread().interrupt(); logger.severe(() -> "Exception: " + ex); } return false; } public void releaseSeat(int customerId) { logger.info(() -> "Customer #" + customerId + " has released a seat"); seats.release(); } }
如果在这五秒钟内没有座位被释放,那么这个人就离开理发店。另一方面,成功入座的顾客由理发师服务(这将需要 0 到 10 之间的随机秒数)。最后,客户松开座椅。在代码行中,可以按以下方式编写:
public class BarbershopCustomer implements Runnable { private static final Logger logger = Logger.getLogger(BarbershopCustomer.class.getName()); private static final Random rnd = new Random(); private final Barbershop barbershop; private final int customerId; public BarbershopCustomer(Barbershop barbershop, int customerId) { this.barbershop = barbershop; this.customerId = customerId; } @Override public void run() { boolean acquired = barbershop.acquireSeat(customerId); if (acquired) { try { Thread.sleep(rnd.nextInt(10 * 1000)); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); logger.severe(() -> "Exception: " + ex); } finally { barbershop.releaseSeat(customerId); } } else { Thread.currentThread().interrupt(); } } }
让我们带 10 位顾客来我们的理发店:
Barbershop bs = new Barbershop(3); for (int i = 1; i <= 10; i++) { BarbershopCustomer bc = new BarbershopCustomer(bs, i); new Thread(bc).start(); }
以下是可能输出的快照:
[16:36:17] [INFO] Customer #10 is trying to get a seat [16:36:17] [INFO] Customer #5 is trying to get a seat [16:36:17] [INFO] Customer #7 is trying to get a seat [16:36:17] [INFO] Customer #5 got a seat [16:36:17] [INFO] Customer #10 got a seat [16:36:19] [INFO] Customer #10 has released a seat ...
许可证不是在线程级别获取的。
这意味着T1
线程可以从Semaphore
获得许可,而T2
线程可以释放它。 当然,开发人员负责管理过程。
212 移相器
移相器是一种灵活的 Java 同步器,结合了CyclicBarrier
和CountDownLatch
在以下上下文中的功能:
- 一个移相器由一个或多个相位组成,这些相位充当动态数量的参与方(线程)的屏障。
- 在移相器寿命期间,可以动态修改同步方(线程)的数量。我们可以注册/注销当事人。
- 当前注册方必须在当前阶段(障碍)中等待,然后才能进入下一个执行步骤(下一阶段)-如
CyclicBarrier
的情况。 - 移相器的每个相位可以通过从 0 开始的相关数字/索引来识别。第一阶段为 0,下一阶段为 1,下一阶段为 2,等至
Integer.MAX_VALUE
。 - 一个移相器的任何一个阶段都可以有三种类型的参与方:注册、到达(这些是在当前阶段/关卡等待的注册方)和未到达(这些是在前往当前阶段的途中的注册方)。
- 缔约方的动态计数器有三种类型:登记缔约方计数器、到达缔约方计数器和未完结缔约方计数器。当所有参与方到达当前阶段(注册参与方的数量等于到达参与方的数量)时,阶段器将进入下一阶段。
- 或者,我们可以在进入下一个阶段之前(当所有各方到达阶段/关卡时)执行一个操作(代码片段)。
- 移相器具有终止状态。注册方的计数不受终止的影响,但是在终止之后,所有同步方法立即返回,而不必等待进入另一个阶段。同样,在终止时尝试注册也没有效果。
在下图中,我们可以看到一个移相器,在相位 0 中有四个注册方,在相位 1 中有三个注册方。我们还将进一步讨论一些 API 风格:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7xgjpOnH-1657345732725)(https://github.com/apachecn/apachecn-java-zh/raw/master/docs/java-coding-prob/img/fbead787-4a73-4958-aeec-ecfd5784f44e.png)]
通常,通过参与方,我们理解线程(一方=一个线程),但是移相器不执行参与方和特定线程之间的关联。一个移相器只是统计和管理注册方和注销方的数量。
在 API 术语中,这个同步器用java.util.concurrent.Phaser
表示。
一个Phaser
可以由零个参与方、一个通过空构造器的显式参与方数或一个采用整数参数Phaser(int parties)
的构造器创建。Phaser
还可以通过Phaser(Phaser parent)
或Phaser(Phaser parent, int parties)
指定父级。通常由一方启动Phaser
,称为控制器或控制方。通常,这个聚会在Phaser
寿命期内寿命最长。
一方可以通过register()
方式随时注册(在上图中,在 0 期和 1 期之间,我们注册T5
和T6
)。我们也可以通过bulkRegister(int parties)
注册一大批当事人。注册方可以通过arriveAndDeregister()
取消注册,无需等待其他方。此方法允许一方到达当前屏障(Phaser
)并取消注册,而无需等待其他方到达(在上图中,T4
、T3
、T2
方逐一取消注册)。每个注销方减少一个注册方的数量。
为了达到当前阶段(障碍),等待其他方到达,需要调用arriveAndAwaitAdvance()
方法。这种方法将阻止所有登记方到达当前阶段。一旦最后一注册方到达本阶段,各方将进入Phaser
的下一阶段。
或者,当所有注册方到达当前阶段时,我们可以通过覆盖onAdvance()
方法onAdvance(int phase, int registeredParties)
来运行特定操作。如果要触发Phaser
的终止,则此方法返回一个boolean
值,即true
。另外,我们可以通过forceTermination()
强制终止,也可以通过isTerminated()
的标志方法进行测试。覆盖onAdvance()
方法需要我们扩展Phaser
类(通常通过匿名类)。
现在,我们应该有足够的细节来解决我们的问题。因此,我们必须在Phaser
的三个阶段中模拟服务器的启动过程。服务器被认为是在其五个内部服务启动之后启动并运行的。在第一阶段,我们需要同时启动三个服务。在第二阶段,我们需要同时启动另外两个服务(只有在前三个服务已经运行的情况下才能启动)。在第三阶段,服务器执行最后一次签入,并被视为已启动并正在运行。
因此,管理服务器启动进程的线程(参与方)可以被视为控制其余线程(参与方)的线程。这意味着我们可以创建Phaser
并通过Phaser
构造器注册这个控制线程(或,控制器):
public class ServerInstance implements Runnable { private static final Logger logger = Logger.getLogger(ServerInstance.class.getName()); private final Phaser phaser = new Phaser(1) { @Override protected boolean onAdvance(int phase, int registeredParties) { logger.warning(() -> "Phase:" + phase + " Registered parties: " + registeredParties); return registeredParties == 0; } }; ... }
使用匿名类,我们创建这个Phaser
对象并覆盖其onAdvance()
方法来定义一个有两个主要目的的操作:
- 打印当前阶段的快速状态和注册方的数量
- 如果没有注册方,触发
Phaser
终止
当所有当前注册方到达当前屏障(当前阶段)时,将为每个阶段调用此方法。
管理服务器服务的线程需要启动这些服务并从Phaser
注销它们自己。因此,每个服务在一个单独的线程中启动,该线程将在其作业结束时通过arriveAndDeregister()
取消注册。为此,我们可以使用以下Runnable
:
public class ServerService implements Runnable { private static final Logger logger = Logger.getLogger(ServerService.class.getName()); private final String serviceName; private final Phaser phaser; private final Random rnd = new Random(); public ServerService(Phaser phaser, String serviceName) { this.phaser = phaser; this.serviceName = serviceName; this.phaser.register(); } @Override public void run() { int startingIn = rnd.nextInt(10) * 1000; try { logger.info(() -> "Starting service '" + serviceName + "' ..."); Thread.sleep(startingIn); logger.info(() -> "Service '" + serviceName + "' was started in " + startingIn / 1000 + " seconds (waiting for remaining services)"); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); logger.severe(() -> "Exception: " + ex); } finally { phaser.arriveAndDeregister(); } } }
现在,控制线程可以触发service1
、service2
和service3
的启动进程。此过程按以下方法成形:
private void startFirstThreeServices() { Thread service1 = new Thread( new ServerService(phaser, "HTTP Listeners")); Thread service2 = new Thread( new ServerService(phaser, "JMX")); Thread service3 = new Thread( new ServerService(phaser, "Connectors")); service1.start(); service2.start(); service3.start(); phaser.arriveAndAwaitAdvance(); // phase 0 }
注意,在这个方法的末尾,我们调用了phaser.arriveAndAwaitAdvance()
。这是等待其他注册方到达的控制方。其余注册方(service1
、service2
、service3
逐一注销,直至Phaser
中只剩下控制方。此时,是时候进入下一阶段了。所以,控制方是唯一进入下一阶段的。
与此实现类似,控制线程可以触发service4
和service5
的启动进程。此过程按以下方法成形:
private void startNextTwoServices() { Thread service4 = new Thread( new ServerService(phaser, "Virtual Hosts")); Thread service5 = new Thread( new ServerService(phaser, "Ports")); service4.start(); service5.start(); phaser.arriveAndAwaitAdvance(); // phase 1 }
最后,在这五个服务启动之后,控制线程执行最后一个检查,该检查在下面的方法中作为虚拟的Thread.sleep()
实现。注意,在这个操作结束时,启动服务器的控制线程从Phaser
注销了自己。当发生这种情况时,意味着不再有注册方,并且由于从onAdvance()
方法返回true
而终止Phaser
:
private void finalCheckIn() { try { logger.info("Finalizing process (should take 2 seconds) ..."); Thread.sleep(2000); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); logger.severe(() -> "Exception: " + ex); } finally { phaser.arriveAndDeregister(); // phase 2 } }
控制线程的任务是按正确的顺序调用前面的三个方法。代码的其余部分由一些日志组成;因此为了简洁起见,跳过了它。本书附带了这个问题的完整源代码。
在任何时候,我们都可以通过getRegisteredParties()
查询到注册方的数量,通过getArrivedParties()
查询到到达方的数量,通过getUnarrivedParties()
查询到未到达方的数量。您可能还需要检查arrive()
、awaitAdvance(int phase)
和awaitAdvanceInterruptibly(int phase)
方法。
总结
本章概述了 Java 并发的主要坐标,应该为下一章做好准备。我们讨论了线程生命周期、对象级和类级锁定、线程池以及Callable
和Future
等几个基本问题
下载本章中的应用以查看结果并查看一些其他详细信息。