👨🏻🎓博主介绍:大家好,我是芝士味的椒盐,一名在校大学生,热爱分享知识,很高兴在这里认识大家🌟
🌈擅长领域:Java、大数据、运维、电子
🙏🏻如果本文章各位小伙伴们有帮助的话,🍭关注+👍🏻点赞+🗣评论+📦收藏,相应的有空了我也会回访,互助!!!
🤝另本人水平有限,旨在创作简单易懂的文章,在文章描述时如有错,恳请各位大佬指正,在此感谢!!!
@[TOC]
ForkJoin是什么
什么是 ForkJoin
- ForkJoin 在 JDK 1.7 , 并行执行任务!提高效率。大数据量!
- ForkJoin处理流程:工作窃取
Java API
试验代码:
MyForkJoinTask:
package icu.lookyousmileface.forkjoin; import java.util.concurrent.RecursiveTask; /** * @author starrysky * @title: MyForkJoinTask * @projectName Juc_Pro * @description: ForkJon,必须要继承RecursiceTask * * 求和计算的任务! * * 3000 6000(ForkJoin) 9000(Stream并行流) * * // 如何使用 forkjoin * * // 1、forkjoinPool 通过它来执行 * * // 2、计算任务 forkjoinPool.execute(ForkJoinTask task) * * // 3. 计算类要继承 ForkJoinTask * @date 2021/1/301:11 上午 */ class MyForkJoinTask extends RecursiveTask<Long> { //开始和结束位置数 private Long start; private Long end; //临界值 private Long temp = 10000L; public MyForkJoinTask(Long start, Long end) { this.start = start; this.end = end; } //计算方法 @Override protected Long compute() { //小于临界值就进行计算不拆分 if ((end-start)<temp){ Long sum = 0L; for (Long i = start; i <= end; i++) { sum += i; } return sum; }else { //取中位数 Long mdie = (start+end)/2; MyForkJoinTask task1 = new MyForkJoinTask(start, mdie); //拆分任务,把任务压入线程队列 task1.fork(); MyForkJoinTask task2 = new MyForkJoinTask(mdie + 1, end); //拆分任务,把任务压入线程队列 task2.fork(); //结果汇聚 return task1.join()+task2.join(); } } }
MainTask:
package icu.lookyousmileface.forkjoin; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.stream.LongStream; /** * @author starrysky * @title: MainTask * @projectName Juc_Pro * @description: ForkJoin主任务 * @date 2021/1/301:31 上午 */ public class MainTask { public static void main(String[] args) throws ExecutionException, InterruptedException { /** * 使用ForkJoin,适合大数据量 */ //创建forkjoin池 // ForkJoinPool forkJoinPool = new ForkJoinPool(); // //创建自己的ForkJoin计算程序 // ForkJoinTask forkJoinTask = new MyForkJoinTask(0L, 10_0000_0000L); // //提交计算任务 // ForkJoinTask<Long> submit = forkJoinPool.submit(forkJoinTask); // //获得计算的结果 // Long aLong = submit.get(); // System.out.println(aLong); /** * 使用stream并行流,非常快 */ long result = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum); System.out.println(result); } }
异步回调
试验代码:
package icu.lookyousmileface.completables; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; /** * @author starrysky * @title: CompletableUse * @projectName Juc_Pro * @description: CompletableFuture * * 异步调用: CompletableFuture * * // 异步执行 * * // 成功回调 * * // 失败回调 * @date 2021/1/302:13 上午 */ public class CompletableUse { public static void main(String[] args) throws ExecutionException, InterruptedException { // //没有返回值的异步回调 // CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{ // try { // TimeUnit.SECONDS.sleep(3); // } catch (InterruptedException e) { // e.printStackTrace(); // } // System.out.println(" 异步任务执行成功!"); // }); // System.out.println("main主线程"); // //获取异步执行的结果 // completableFuture.get(); //又返回值的异步回调 CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()+":supplyAsync=>ok"); int sum = 10/0; return 1024; }); //编译 //编译成功 System.out.println(completableFuture.whenComplete((u1,u2)->{ System.out.println("t=>"+u1);//正常的返回结果 System.out.println("u=>"+u2);//错误信息 //编译失败 }).exceptionally((e)->{ e.printStackTrace(); return 2233; //错误的返回结果 }).get()); } }