JUC系列(七) ForkJion任务拆分与异步回调

简介: ForkJion任务拆分与异步回调 也是业务中的常客了
📣 📣 📣 📢📢📢
☀️☀️你好啊!小伙伴,我是小冷。是一个兴趣驱动自学练习两年半的的Java工程师。
📒 一位十分喜欢将知识分享出来的Java博主⭐️⭐️⭐️,擅长使用Java技术开发web项目和工具
📒 文章内容丰富:覆盖大部分java必学技术栈,前端,计算机基础,容器等方面的文章
📒 如果你也对Java感兴趣,关注小冷吧,一起探索Java技术的生态与进步,一起讨论Java技术的使用与学习
✏️高质量技术专栏专栏链接: 微服务netty单点登录SSMSpringCloudAlibaba
😝公众号😝想全栈的小冷,分享一些技术上的文章,以及解决问题的经验
当前专栏JUC系列

ForkJion

什么是ForkJoin

ForkJoin 下 JDK 1.7 并行执行任务的,数量越大,效率越高

比如 :大数据 Map Reduce(把大任务拆分成小任务)

image-20220304004113183

ForkJoin 特点: 工作窃取

举例子:

PS: 维护的是双端队列 Deuue

A线程执行任务到 第二个

B线程执行完毕,那么B线程回去讲A线程的东西拿来执行,从而提高效率

image-20220304004235249

认识forkjion

ForkJoin 使用两个类来完成以上两件事情:

  • ForkJoinTask:我们要使用 ForkJoin 框架,必须首先创建一个 ForkJoin 任务。它提供在任务中执行 fork() 和 join() 操作的机制,通常情况下我们不需要直接继承 ForkJoinTask 类,而只需要继承它的子类,Fork/Join 框架提供了以下两个子类:

    • RecursiveAction:用于没有返回结果的任务。
    • RecursiveTask :用于有返回结果的任务。
  • ForkJoinPool :ForkJoinTask 需要通过 ForkJoinPool 来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。

image-20220304005022113

代码实例

task 类 里面编写的是我们继承了 递归任务继承的实现方法
public class forkjoinDemo extends RecursiveTask<Long> {
    /* 解决方案 也是有三六九等的,比如案例 求和
     * 最低等 就是直接for循环求和
     * 中等 使用forkjion
     * 高等 stream 并行流
     * */
//开始
    private long start;
    //结束
    private long end;
    //到多少值,才开始分开任务
    private long threshold = 10000L;

    public forkjoinDemo(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        //判断超过阈值的时候 开始使用 fork join
        if (end - start > threshold) {
            long sum = 0L;
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        } else {
            //    求出中间值
            long mid = (start - end) / 2;
            forkjoinDemo task1 = new forkjoinDemo(start, mid);

            //拆分任务,把任务压入线程队列
            task1.fork();
            forkjoinDemo task2 = new forkjoinDemo(mid + 1, end);
            task2.fork();
            return task1.join() + task2.join();
        }
    }
}
测试类

三种方法的速度

public class test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //test1(); 7042;
        //test2(); 969
        //test3(); 179;
    }

    public static void test1() {
        Long sum = 0L;
        long start = System.currentTimeMillis();
        for (long i = 1L; i <= 10_0000_0000; i++) {
            sum += i;
        }
        long end = System.currentTimeMillis();
        System.out.println("sum" + sum + "=> 执行时间" + (end - start));
    }


    public static void test2() throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Long> task = new forkjoinDemo(0L, 10_0000_0000L);
        ForkJoinTask<Long> submit = forkJoinPool.submit(task);
        Long sum = submit.get();
        long end = System.currentTimeMillis();
        System.out.println("sum" + sum + "=> 执行时间" + (end - start));
    }

    public static void test3() {

        long start = System.currentTimeMillis();
        //并行流
        long reduce = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);
        long end = System.currentTimeMillis();
        System.out.println("sum" + reduce + "=> 执行时间" + (end - start));
    }
}

异步回调

什么是future

常见的两种创建线程的方式。一种是直接继承Thread,另外一种就是实现Runnable接口。

这两种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。

从Java 1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。

Future模式的核心思想是能够让主线程将原来需要同步等待的这段时间用来做其他的事情。(因为可以异步获得执行结果,所以不用一直同步等待去获得执行结果)

image-20220304014056949

上图简单描述了不使用Future和使用Future的区别,不使用Future模式,主线程在invoke完一些耗时逻辑之后需要等待,这个耗时逻辑在实际应用中可能是一次RPC调用,可能是一个本地IO操作等。B图表达的是使用Future模式之后,我们主线程在invoke之后可以立即返回,去做其他的事情,回头再来看看刚才提交的invoke有没有结果。

Future接口的局限性

当我们得到包含结果的Future时,我们可以使用get方法等待线程完成并获取返回值,注意我加粗的地方,Future的get() 方法会阻塞主线程。即使我们使用isDone()方法轮询去查看线程执行状态,但是这样也非常浪费cpu资源。

image-20220304014316398

我们需要新的,更强大的拓展,CompletableFuture

在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,结合了Future的优点,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

CompletableFuture被设计在Java中进行异步编程。异步编程意味着在主线程之外创建一个独立的线程,与主线程分隔开,并在上面运行一个非阻塞的任务,然后通知主线程进展,成功或者失败。

通过这种方式,你的主线程不用为了任务的完成而阻塞/等待,你可以用主线程去并行执行其他的任务。 使用这种并行方式,极大地提升了程序的表现。

实例化:

有两种格式,一种是supply开头的方法,一种是run开头的方法

  • supply开头:这种方法,可以返回异步线程执行之后的结果
  • run开头:这种不会返回结果,就只是执行线程任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);

public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

获取结果

同步获取结果

public T    get()
public T    get(long timeout, TimeUnit unit)
public T    getNow(T valueIfAbsent)
public T    join()

简单的例子

CompletableFuture<Integer> future = new CompletableFuture<>();
Integer integer = future.get();

get() 方法同样会阻塞直到任务完成,上面的代码,主线程会一直阻塞,因为这种方式创建的future从未完成。有兴趣的小伙伴可以打个断点看看,状态会一直是not completed

代码使用案例
 public static void main(String[] args) throws ExecutionException, InterruptedException {
        ////没有返回值的异步回调, runAsync
        //CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
        //    System.out.println(Thread.currentThread().getName() + "runAsync=> Void");
        //});
        //System.out.println("1111");
        ////获取执行结果
        //completableFuture.get();
     
     
        //    有返回值的
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "runAsync=>integer");
            int i = 10 / 0;
            return 1024;
        });
        completableFuture.whenComplete((t, u) -> {
            //t是正常的返回结果
            //u是返回报错信息
            System.out.println("t=>" + t);
            System.out.println("u=>" + u);
        }).exceptionally((e) -> {
            System.out.println(e.getMessage());
            return 233;
        }).get();
    }
相关文章
|
前端开发
异步转同步的几种方法
在循环等待中,我们可以使用一个变量来指示异步操作是否已完成。然后,我们可以在循环中检查该变量,如果它指示异步操作已完成,则退出循环。
552 0
|
7天前
|
前端开发 JavaScript
如何使用 Promise 处理异步并发操作?
通过使用 `Promise.all()` 和 `Promise.race()` 方法,可以灵活地处理各种异步并发操作,根据不同的业务需求选择合适的方法来提高代码的性能和效率,同时也使异步代码的逻辑更加清晰和易于维护。
|
3月前
|
Java
异步&线程池 CompletableFuture 异步编排 【下篇】
这篇文章深入探讨了Java中的`CompletableFuture`类,解释了如何创建异步操作、使用计算完成时的回调方法、异常处理、串行化方法、任务组合以及多任务组合的使用方式,并通过代码示例展示了各种场景下的应用。
异步&线程池 CompletableFuture 异步编排 【下篇】
|
5月前
|
Java
java线程之异步回调
java线程之异步回调
|
6月前
|
Python
同步和异步的区别
同步和异步的区别
|
JavaScript 前端开发 UED
同步和异步区别
同步和异步区别
151 0
|
6月前
同步和异步的区别?
同步和异步的区别?
160 0
|
6月前
|
前端开发 JavaScript
同步和异步有什么区别
同步和异步有什么区别
218 0
同步和异步[多线程的异步执行操作]
同步和异步[多线程的异步执行操作]
52 0
3 # 通过回调函数处理异步并发问题
3 # 通过回调函数处理异步并发问题
47 0