写在前面
整个周末,都是在咳嗽中度过。夏日的感冒,感觉要死的节奏。没法思考,没法学习,是最尴尬的事情,喝了药就想睡觉。
那么,只能把存货,上一下。
Fork / Join框架是使用并发分治法解决问题的框架。引入它们是为了补充现有的并发API。在介绍它们之前,现有的Executor Service实现是运行异步任务的流行选择,但是当任务同质且独立时,它们会发挥最佳作用。运行依赖的任务并使用这些实现来组合其结果并不容易。随着Fork / Join框架的引入,试图解决这一缺陷。
解决非阻塞任务
简单粗暴地直接跳入代码。在代码中创建一个任务,该任务将返回列表的所有元素的总和。以下步骤以伪代码表示我们的算法:
01.查找列表的中间索引
02.在中间划分列表
03.递归创建一个新任务,该任务将计算左侧部分的总和
04.递归创建一个新任务,该任务将计算正确部分的总和
05.将左总和,中间元素和右总和的结果相加
@Slf4j public class ListSummer extends RecursiveTask<Integer> { private final List<Integer> listToSum; ListSummer(List<Integer> listToSum) { this.listToSum = listToSum; } @Override protected Integer compute() { if (listToSum.isEmpty()) { log.info("空集合"); return 0; } int middleIndex = listToSum.size() / 2; log.info("集合 {}, 中部索引: {}", listToSum, middleIndex); List<Integer> leftSublist = listToSum.subList(0, middleIndex); List<Integer> rightSublist = listToSum.subList(middleIndex + 1, listToSum.size()); ListSummer leftSummer = new ListSummer(leftSublist); ListSummer rightSummer = new ListSummer(rightSublist); leftSummer.fork(); rightSummer.fork(); Integer leftSum = leftSummer.join(); Integer rightSum = rightSummer.join(); int total = leftSum + listToSum.get(middleIndex) + rightSum; log.info("左部分和 {}, 右部分和 {}, 总和 {}", leftSum, rightSum, total); return total; } }
首先,我们通过继承ForkJoinTask的子类RecursiveTask。这是能够在执行并发任务并返回结果时的子类。当任务不返回结果而仅执行效果时,继承递归操作RecursiveAction的子类。对于我们解决的大多数实际任务,这两个子类就足够了。
其次,RecursiveTask和RecursiveAction都定义了一种抽象计算方法compute。这是进行计算执行代码的地方。
第三,在计算方法内部,检查通过构造函数传递的列表的大小。如果为空,则已经知道总和的结果为零,然后立即返回。否则,将列表分为两个子列表,并创建List Summer类型的两个实例。然后,在这两个实例上调用fork()(在ForkJoinTask中定义)方法
leftSummer.fork(); rightSummer.fork();
导致将这些任务安排为异步执行的原因,稍后将在本文中解释用于此目的的确切机制。
之后,调用join()方法(也在ForkJoinTask中定义)等待这两个部分的结果
Integer leftSum = leftSummer.join(); Integer rightSum = rightSummer.join();
然后将其与列表的中间元素相加以获得最终结果。
代码内添加了许多日志消息,以使更易于理解。
注:当我们处理包含数千个条目的列表时,进行详细的日志记录(尤其是记录整个列表),对于解决方案来说其实很不优雅。
差不多了,现在,为测试运行创建一个测试类
public class ListSummerTest { @Test public void shouldSumEmptyList() { ListSummer summer = new ListSummer(List.of()); ForkJoinPool forkJoinPool = new ForkJoinPool(); forkJoinPool.submit(summer); int result = summer.join(); assertThat(result).isZero(); } @Test public void shouldSumListWithOneElement() { ListSummer summer = new ListSummer(List.of(5)); ForkJoinPool forkJoinPool = new ForkJoinPool(); forkJoinPool.submit(summer); int result = summer.join(); assertThat(result).isEqualTo(5); } @Test public void shouldSumListWithMultipleElements() { ListSummer summer = new ListSummer(List.of( 1, 2, 3, 4, 5, 6, 7, 8, 9 )); ForkJoinPool forkJoinPool = new ForkJoinPool(); forkJoinPool.submit(summer); int result = summer.join(); assertThat(result).isEqualTo(45); } }
测试类中,创建一个 ForkJoinPool的实例。
ForkJoinPool是用于运行Fork Join任务的独特Executor Service实现。它采用一种称为工作窃取算法的特殊算法。与其他执行器服务实现相反,在其他实现器服务实现中,只有一个队列包含要执行的所有任务,在工作窃取实现中,每个工作线程都获得其工作队列。每个线程都从其队列开始执行任务。具体的原理,可以查看源代码进行查看。
当我们检测到Fork Join Task可以分解为多个较小的子任务时,便将它们分解为较小的任务,然后在这些任务上调用fork()方法。这种调用导致子任务被推入执行线程的队列中。在执行期间,当一个线程用尽队列/没有要执行的任务时,它可以从另一线程的队列中“窃取”任务(因此称为“工作窃取”)。与使用任何其他Executor Service实现相比,这种窃取行为可以带来更高的吞吐量。
之前,当我们在代码中执行leftSummer和rightSummer对象的fork()时,它们被推入执行线程的工作队列中,之后它们被池中的其他活动线程“窃取”(依此类推)。
解决阻塞任务
刚才解决的问题本质上是非阻塞的。如果想解决一个阻塞操作的问题,那么为了获得更好的吞吐量,则需要改变策略。
用另一个代码实例来描述一下。假设要创建一个非常简单的网页爬虫。该爬虫将接收HTTP链接列表,执行GET请求以获取响应体,然后计算响应长度。
这是代码–
@Slf4j public class ResponseLengthCalculator extends RecursiveTask<Map<String, Integer>> { private final List<String> links; ResponseLengthCalculator(List<String> links) { this.links = links; } @Override protected Map<String, Integer> compute() { if (links.isEmpty()) { log.info("没获取更多链接"); return Collections.emptyMap(); } int middle = links.size() / 2; log.info("索引: {}", links, middle); ResponseLengthCalculator leftPartition = new ResponseLengthCalculator(links.subList(0, middle)); ResponseLengthCalculator rightPartition = new ResponseLengthCalculator(links.subList(middle + 1, links.size())); log.info("执行左区域加入"); leftPartition.fork(); log.info("左区域被加入,现在执行右区域加入"); rightPartition.fork(); log.info("右区域被加入"); String middleLink = links.get(middle); HttpRequester httpRequester = new HttpRequester(middleLink); String response; try { log.info("执行回调{}", middleLink); ForkJoinPool.managedBlock(httpRequester); response = httpRequester.response; } catch (InterruptedException ex) { log.error("发生异常", ex); response = ""; } Map<String, Integer> responseMap = new HashMap<>(links.size()); Map<String, Integer> leftLinks = leftPartition.join(); responseMap.putAll(leftLinks); responseMap.put(middleLink, response.length()); Map<String, Integer> rightLinks = rightPartition.join(); responseMap.putAll(rightLinks); log.info("左部链接集合{}, 响应长度 {}, 右部链接集合{}", leftLinks, response.length(), rightLinks); return responseMap; } private static class HttpRequester implements ForkJoinPool.ManagedBlocker { private final String link; private String response; private HttpRequester(String link) { this.link = link; } @Override public boolean block() { HttpGet headRequest = new HttpGet(link); CloseableHttpClient client = HttpClientBuilder .create() .disableRedirectHandling() .build(); try { log.info("正在执行请求{}", link); CloseableHttpResponse response = client.execute(headRequest); log.info("链接url{}已经被请求", link); this.response = EntityUtils.toString(response.getEntity()); } catch (IOException e) { log.error("当从链接url{}中获取响应报错:{}", link, e.getMessage()); this.response = ""; } return true; } @Override public boolean isReleasable() { return false; } } }
上述代码创建了一个ForkJoinPool.Managed Blocker的实现类HttpRequester,在其中放置阻塞的HTTP调用。该接口定义了两个方法-block()和is Releasable()。block()方法是我们进行阻塞调用的地方。在完成阻塞操作之后,我们返回true,指示不再需要进一步的阻塞。我们从is Releasable()实现中返回false,以向fork-join工作线程指示block()方法实现实际上可能在阻塞。is Releasable()实现将在调用block()方法之前先由fork-join工作线程调用。最后,我们通过调用Fork Join Pool.managed Block()静态方法将Http Requester实例提交到池中。之后,我们的阻止任务将开始执行。当它阻止HTTP请求时,Fork Join Pool.managed Block()方法还将安排在必要时激活备用线程,以确保足够的并行性。
测试代码如下所示:
public class ResponseLengthCalculatorTest { @Test public void shouldReturnEmptyMapForEmptyList() { ResponseLengthCalculator responseLengthCalculator = new ResponseLengthCalculator(Collections.emptyList()); ForkJoinPool pool = new ForkJoinPool(); pool.submit(responseLengthCalculator); Map<String, Integer> result = responseLengthCalculator.join(); assertThat(result).isEmpty(); } @Test public void shouldHandle200Ok() { ResponseLengthCalculator responseLengthCalculator = new ResponseLengthCalculator(List.of( "http://huclele.ml/200" )); ForkJoinPool pool = new ForkJoinPool(); pool.submit(responseLengthCalculator); Map<String, Integer> result = responseLengthCalculator.join(); assertThat(result) .hasSize(1) .containsKeys("http://huclele.ml/200") .containsValue(0); } @Test public void shouldFetchResponseForDifferentResponseStatus() { ResponseLengthCalculator responseLengthCalculator = new ResponseLengthCalculator(List.of( "http://huclele.ml/200", "http://huclele.ml/302", "http://huclele.ml/404", "http://huclele.ml/502" )); ForkJoinPool pool = new ForkJoinPool(); pool.submit(responseLengthCalculator); Map<String, Integer> result = responseLengthCalculator.join(); assertThat(result) .hasSize(4); } }
总结
今天,聊了Fork/Join的用法,以及两个实例,还有更多玩法,可以学习。有什么问题,留言给我o~