Java 内Fork/Join框架的尝试

简介: Java 内Fork/Join框架的尝试

写在前面

整个周末,都是在咳嗽中度过。夏日的感冒,感觉要死的节奏。没法思考,没法学习,是最尴尬的事情,喝了药就想睡觉。

那么,只能把存货,上一下。

Fork / Join框架是使用并发分治法解决问题的框架。引入它们是为了补充现有的并发API。在介绍它们之前,现有的Executor Service实现是运行异步任务的流行选择,但是当任务同质且独立时,它们会发挥最佳作用。运行依赖的任务并使用这些实现来组合其结果并不容易。随着Fork / Join框架的引入,试图解决这一缺陷。


解决非阻塞任务

简单粗暴地直接跳入代码。在代码中创建一个任务,该任务将返回列表的所有元素的总和。以下步骤以伪代码表示我们的算法:

01.查找列表的中间索引

02.在中间划分列表

03.递归创建一个新任务,该任务将计算左侧部分的总和

04.递归创建一个新任务,该任务将计算正确部分的总和

05.将左总和,中间元素和右总和的结果相加

@Slf4j
public class ListSummer extends RecursiveTask<Integer> {
 private final List<Integer> listToSum;
 ListSummer(List<Integer> listToSum) {
   this.listToSum = listToSum;
}
 @Override
 protected Integer compute() {
   if (listToSum.isEmpty()) {
     log.info("空集合");
     return 0;
  }
   int middleIndex = listToSum.size() / 2;
   log.info("集合 {}, 中部索引: {}", listToSum, middleIndex);
   List<Integer> leftSublist = listToSum.subList(0, middleIndex);
   List<Integer> rightSublist = listToSum.subList(middleIndex + 1, listToSum.size());
   ListSummer leftSummer = new ListSummer(leftSublist);
   ListSummer rightSummer = new ListSummer(rightSublist);
   leftSummer.fork();
   rightSummer.fork();
   Integer leftSum = leftSummer.join();
   Integer rightSum = rightSummer.join();
   int total = leftSum + listToSum.get(middleIndex) + rightSum;
   log.info("左部分和 {}, 右部分和 {}, 总和 {}", leftSum, rightSum, total);
   return total;
}
}

首先,我们通过继承ForkJoinTask的子类RecursiveTask。这是能够在执行并发任务并返回结果时的子类。当任务不返回结果而仅执行效果时,继承递归操作RecursiveAction的子类。对于我们解决的大多数实际任务,这两个子类就足够了。

其次,RecursiveTask和RecursiveAction都定义了一种抽象计算方法compute。这是进行计算执行代码的地方。

第三,在计算方法内部,检查通过构造函数传递的列表的大小。如果为空,则已经知道总和的结果为零,然后立即返回。否则,将列表分为两个子列表,并创建List Summer类型的两个实例。然后,在这两个实例上调用fork()(在ForkJoinTask中定义)方法

leftSummer.fork();
rightSummer.fork();

导致将这些任务安排为异步执行的原因,稍后将在本文中解释用于此目的的确切机制。

之后,调用join()方法(也在ForkJoinTask中定义)等待这两个部分的结果

Integer leftSum = leftSummer.join();
Integer rightSum = rightSummer.join();

然后将其与列表的中间元素相加以获得最终结果。

代码内添加了许多日志消息,以使更易于理解。

注:当我们处理包含数千个条目的列表时,进行详细的日志记录(尤其是记录整个列表),对于解决方案来说其实很不优雅。

差不多了,现在,为测试运行创建一个测试类

public class ListSummerTest {
 @Test
 public void shouldSumEmptyList() {
   ListSummer summer = new ListSummer(List.of());
   ForkJoinPool forkJoinPool = new ForkJoinPool();
   forkJoinPool.submit(summer);
   int result = summer.join();
   assertThat(result).isZero();
}
 @Test
 public void shouldSumListWithOneElement() {
   ListSummer summer = new ListSummer(List.of(5));
   ForkJoinPool forkJoinPool = new ForkJoinPool();
   forkJoinPool.submit(summer);
   int result = summer.join();
   assertThat(result).isEqualTo(5);
}
 @Test
 public void shouldSumListWithMultipleElements() {
   ListSummer summer = new ListSummer(List.of(
       1, 2, 3, 4, 5, 6, 7, 8, 9
  ));
   ForkJoinPool forkJoinPool = new ForkJoinPool();
   forkJoinPool.submit(summer);
   int result = summer.join();
   assertThat(result).isEqualTo(45);
}
}

测试类中,创建一个 ForkJoinPool的实例。

ForkJoinPool是用于运行Fork Join任务的独特Executor Service实现。它采用一种称为工作窃取算法的特殊算法。与其他执行器服务实现相反,在其他实现器服务实现中,只有一个队列包含要执行的所有任务,在工作窃取实现中,每个工作线程都获得其工作队列。每个线程都从其队列开始执行任务。具体的原理,可以查看源代码进行查看。

当我们检测到Fork Join Task可以分解为多个较小的子任务时,便将它们分解为较小的任务,然后在这些任务上调用fork()方法。这种调用导致子任务被推入执行线程的队列中。在执行期间,当一个线程用尽队列/没有要执行的任务时,它可以从另一线程的队列中“窃取”任务(因此称为“工作窃取”)。与使用任何其他Executor Service实现相比,这种窃取行为可以带来更高的吞吐量。

之前,当我们在代码中执行leftSummer和rightSummer对象的fork()时,它们被推入执行线程的工作队列中,之后它们被池中的其他活动线程“窃取”(依此类推)。


解决阻塞任务

刚才解决的问题本质上是非阻塞的。如果想解决一个阻塞操作的问题,那么为了获得更好的吞吐量,则需要改变策略。

用另一个代码实例来描述一下。假设要创建一个非常简单的网页爬虫。该爬虫将接收HTTP链接列表,执行GET请求以获取响应体,然后计算响应长度。

这是代码–

@Slf4j
public class ResponseLengthCalculator extends RecursiveTask<Map<String, Integer>> {
 private final List<String> links;
 ResponseLengthCalculator(List<String> links) {
   this.links = links;
}
 @Override
 protected Map<String, Integer> compute() {
   if (links.isEmpty()) {
     log.info("没获取更多链接");
     return Collections.emptyMap();
  }
   int middle = links.size() / 2;
   log.info("索引: {}", links, middle);
   ResponseLengthCalculator leftPartition = new ResponseLengthCalculator(links.subList(0, middle));
   ResponseLengthCalculator rightPartition = new ResponseLengthCalculator(links.subList(middle + 1, links.size()));
   log.info("执行左区域加入");
   leftPartition.fork();
   log.info("左区域被加入,现在执行右区域加入");
   rightPartition.fork();
   log.info("右区域被加入");
   String middleLink = links.get(middle);
   HttpRequester httpRequester = new HttpRequester(middleLink);
   String response;
   try {
     log.info("执行回调{}", middleLink);
     ForkJoinPool.managedBlock(httpRequester);
     response = httpRequester.response;
  } catch (InterruptedException ex) {
     log.error("发生异常", ex);
     response = "";
  }
   Map<String, Integer> responseMap = new HashMap<>(links.size());
   Map<String, Integer> leftLinks = leftPartition.join();
   responseMap.putAll(leftLinks);
   responseMap.put(middleLink, response.length());
   Map<String, Integer> rightLinks = rightPartition.join();
   responseMap.putAll(rightLinks);
   log.info("左部链接集合{}, 响应长度 {}, 右部链接集合{}", leftLinks, response.length(), rightLinks);
   return responseMap;
}
 private static class HttpRequester implements ForkJoinPool.ManagedBlocker {
   private final String link;
   private String response;
   private HttpRequester(String link) {
     this.link = link;
  }
   @Override
   public boolean block() {
     HttpGet headRequest = new HttpGet(link);
     CloseableHttpClient client = HttpClientBuilder
        .create()
        .disableRedirectHandling()
        .build();
     try {
       log.info("正在执行请求{}", link);
       CloseableHttpResponse response = client.execute(headRequest);
       log.info("链接url{}已经被请求", link);
       this.response = EntityUtils.toString(response.getEntity());
    } catch (IOException e) {
       log.error("当从链接url{}中获取响应报错:{}", link, e.getMessage());
       this.response = "";
    }
     return true;
  }
   @Override
   public boolean isReleasable() {
     return false;
  }
}
}

上述代码创建了一个ForkJoinPool.Managed Blocker的实现类HttpRequester,在其中放置阻塞的HTTP调用。该接口定义了两个方法-block()和is Releasable()。block()方法是我们进行阻塞调用的地方。在完成阻塞操作之后,我们返回true,指示不再需要进一步的阻塞。我们从is Releasable()实现中返回false,以向fork-join工作线程指示block()方法实现实际上可能在阻塞。is Releasable()实现将在调用block()方法之前先由fork-join工作线程调用。最后,我们通过调用Fork Join Pool.managed Block()静态方法将Http Requester实例提交到池中。之后,我们的阻止任务将开始执行。当它阻止HTTP请求时,Fork Join Pool.managed Block()方法还将安排在必要时激活备用线程,以确保足够的并行性。

测试代码如下所示:

public class ResponseLengthCalculatorTest {
 @Test
 public void shouldReturnEmptyMapForEmptyList() {
   ResponseLengthCalculator responseLengthCalculator = new ResponseLengthCalculator(Collections.emptyList());
   ForkJoinPool pool = new ForkJoinPool();
   pool.submit(responseLengthCalculator);
   Map<String, Integer> result = responseLengthCalculator.join();
   assertThat(result).isEmpty();
}
 @Test
 public void shouldHandle200Ok() {
   ResponseLengthCalculator responseLengthCalculator = new ResponseLengthCalculator(List.of(
       "http://huclele.ml/200"
  ));
   ForkJoinPool pool = new ForkJoinPool();
   pool.submit(responseLengthCalculator);
   Map<String, Integer> result = responseLengthCalculator.join();
   assertThat(result)
      .hasSize(1)
      .containsKeys("http://huclele.ml/200")
      .containsValue(0);
}
 @Test
 public void shouldFetchResponseForDifferentResponseStatus() {
   ResponseLengthCalculator responseLengthCalculator = new ResponseLengthCalculator(List.of(
       "http://huclele.ml/200",
       "http://huclele.ml/302",
       "http://huclele.ml/404",
       "http://huclele.ml/502"
  ));
   ForkJoinPool pool = new ForkJoinPool();
   pool.submit(responseLengthCalculator);
   Map<String, Integer> result = responseLengthCalculator.join();
   assertThat(result)
      .hasSize(4);
}
}


总结

今天,聊了Fork/Join的用法,以及两个实例,还有更多玩法,可以学习。有什么问题,留言给我o~

目录
相关文章
|
7天前
|
存储 缓存 Java
java语言后台管理ruoyi后台管理框架-登录提示“无效的会话,或者会话已过期,请重新登录。”-扩展知识数据库中密码加密的方法-问题如何解决-以及如何重置若依后台管理框架admin密码-优雅草卓伊凡
java语言后台管理ruoyi后台管理框架-登录提示“无效的会话,或者会话已过期,请重新登录。”-扩展知识数据库中密码加密的方法-问题如何解决-以及如何重置若依后台管理框架admin密码-优雅草卓伊凡
30 3
java语言后台管理ruoyi后台管理框架-登录提示“无效的会话,或者会话已过期,请重新登录。”-扩展知识数据库中密码加密的方法-问题如何解决-以及如何重置若依后台管理框架admin密码-优雅草卓伊凡
|
1月前
|
并行计算 算法 Java
Java中的Fork/Join框架详解
Fork/Join框架是Java并行计算的强大工具,尤其适用于需要将任务分解为子任务的场景。通过正确使用Fork/Join框架,可以显著提升应用程序的性能和响应速度。在实际应用中,应结合具体需求选择合适的任务拆分策略,以最大化并行计算的效率。
51 23
|
2月前
|
存储 安全 Java
Java 集合框架中的老炮与新秀:HashTable 和 HashMap 谁更胜一筹?
嗨,大家好,我是技术伙伴小米。今天通过讲故事的方式,详细介绍 Java 中 HashMap 和 HashTable 的区别。从版本、线程安全、null 值支持、性能及迭代器行为等方面对比,帮助你轻松应对面试中的经典问题。HashMap 更高效灵活,适合单线程或需手动处理线程安全的场景;HashTable 较古老,线程安全但性能不佳。现代项目推荐使用 ConcurrentHashMap。关注我的公众号“软件求生”,获取更多技术干货!
53 3
|
3月前
|
存储 缓存 安全
Java 集合框架优化:从基础到高级应用
《Java集合框架优化:从基础到高级应用》深入解析Java集合框架的核心原理与优化技巧,涵盖列表、集合、映射等常用数据结构,结合实际案例,指导开发者高效使用和优化Java集合。
72 4
|
3月前
|
Java 数据库
在Java中使用Seata框架实现分布式事务的详细步骤
通过以上步骤,利用 Seata 框架可以实现较为简单的分布式事务处理。在实际应用中,还需要根据具体业务需求进行更详细的配置和处理。同时,要注意处理各种异常情况,以确保分布式事务的正确执行。
|
3月前
|
消息中间件 Java Kafka
在Java中实现分布式事务的常用框架和方法
总之,选择合适的分布式事务框架和方法需要综合考虑业务需求、性能、复杂度等因素。不同的框架和方法都有其特点和适用场景,需要根据具体情况进行评估和选择。同时,随着技术的不断发展,分布式事务的解决方案也在不断更新和完善,以更好地满足业务的需求。你还可以进一步深入研究和了解这些框架和方法,以便在实际应用中更好地实现分布式事务管理。
|
3月前
|
开发框架 Java 关系型数据库
Java哪个框架适合开发API接口?
在快速发展的软件开发领域,API接口连接了不同的系统和服务。Java作为成熟的编程语言,其生态系统中出现了许多API开发框架。Magic-API因其独特优势和强大功能,成为Java开发者优选的API开发框架。本文将从核心优势、实际应用价值及未来展望等方面,深入探讨Magic-API为何值得选择。
133 2
|
3月前
|
Java 数据库连接 API
Spring 框架的介绍(Java EE 学习笔记02)
Spring是一个由Rod Johnson开发的轻量级Java SE/EE一站式开源框架,旨在解决Java EE应用中的多种问题。它采用非侵入式设计,通过IoC和AOP技术简化了Java应用的开发流程,降低了组件间的耦合度,支持事务管理和多种框架的无缝集成,极大提升了开发效率和代码质量。Spring 5引入了响应式编程等新特性,进一步增强了框架的功能性和灵活性。
77 0
|
7月前
|
SQL Java 数据库连接
Java面试题:简述ORM框架(如Hibernate、MyBatis)的工作原理及其优缺点。
Java面试题:简述ORM框架(如Hibernate、MyBatis)的工作原理及其优缺点。
108 0
|
7月前
|
存储 安全 Java
Java面试题:请解释Java中的泛型集合框架?以及泛型的经典应用案例
Java面试题:请解释Java中的泛型集合框架?以及泛型的经典应用案例
79 0

热门文章

最新文章