Java并发JUC(java.util.concurrent)ForkJoin/异步回调

简介: Java并发JUC(java.util.concurrent)ForkJoin/异步回调

在这里插入图片描述

👨🏻‍🎓博主介绍:大家好,我是芝士味的椒盐,一名在校大学生,热爱分享知识,很高兴在这里认识大家🌟
🌈擅长领域:Java、大数据、运维、电子
🙏🏻如果本文章各位小伙伴们有帮助的话,🍭关注+👍🏻点赞+🗣评论+📦收藏,相应的有空了我也会回访,互助!!!
🤝另本人水平有限,旨在创作简单易懂的文章,在文章描述时如有错,恳请各位大佬指正,在此感谢!!!

@[TOC]

ForkJoin是什么

  • 什么是 ForkJoin

    • ForkJoin 在 JDK 1.7 , 并行执行任务!提高效率。大数据量!

    在这里插入图片描述

  • ForkJoin处理流程:工作窃取
    在这里插入图片描述

Java API

在这里插入图片描述
在这里插入图片描述

  • 试验代码:

    • MyForkJoinTask:

      package icu.lookyousmileface.forkjoin;
      
      import java.util.concurrent.RecursiveTask;
      
      /**
       * @author starrysky
       * @title: MyForkJoinTask
       * @projectName Juc_Pro
       * @description: ForkJon,必须要继承RecursiceTask
       *  * 求和计算的任务!
       *  * 3000   6000(ForkJoin)  9000(Stream并行流)
       *  * // 如何使用 forkjoin
       *  * // 1、forkjoinPool 通过它来执行
       *  * // 2、计算任务 forkjoinPool.execute(ForkJoinTask task)
       *  * // 3. 计算类要继承 ForkJoinTask
       * @date 2021/1/301:11 上午
       */
      class MyForkJoinTask extends RecursiveTask<Long> {
          //开始和结束位置数
          private Long start;
          private Long end;
          //临界值
          private Long temp = 10000L;
      
          public MyForkJoinTask(Long start, Long end) {
              this.start = start;
              this.end = end;
          }
          //计算方法
          @Override
          protected Long compute() {
              //小于临界值就进行计算不拆分
              if ((end-start)<temp){
                  Long sum = 0L;
                  for (Long i = start; i <= end; i++) {
                      sum += i;
                  }
                  return sum;
              }else {
                  //取中位数
                  Long mdie = (start+end)/2;
                  MyForkJoinTask task1 = new MyForkJoinTask(start, mdie);
                  //拆分任务,把任务压入线程队列
                  task1.fork();
                  MyForkJoinTask task2 = new MyForkJoinTask(mdie + 1, end);
                  //拆分任务,把任务压入线程队列
                  task2.fork();
                  //结果汇聚
                  return task1.join()+task2.join();
              }
          }
      }
    • MainTask:

      package icu.lookyousmileface.forkjoin;
      
      import java.util.concurrent.ExecutionException;
      import java.util.concurrent.ForkJoinPool;
      import java.util.concurrent.ForkJoinTask;
      import java.util.stream.LongStream;
      
      /**
       * @author starrysky
       * @title: MainTask
       * @projectName Juc_Pro
       * @description: ForkJoin主任务
       * @date 2021/1/301:31 上午
       */
      public class MainTask {
          public static void main(String[] args) throws ExecutionException, InterruptedException {
              /**
               * 使用ForkJoin,适合大数据量
               */
              //创建forkjoin池
      //        ForkJoinPool forkJoinPool = new ForkJoinPool();
      //        //创建自己的ForkJoin计算程序
      //        ForkJoinTask forkJoinTask = new MyForkJoinTask(0L, 10_0000_0000L);
      //        //提交计算任务
      //        ForkJoinTask<Long> submit = forkJoinPool.submit(forkJoinTask);
      //        //获得计算的结果
      //        Long aLong = submit.get();
      //        System.out.println(aLong);
      
              /**
               * 使用stream并行流,非常快
               */
              long result = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);
              System.out.println(result);
      
          }
      }

异步回调

在这里插入图片描述

  • 试验代码:

    package icu.lookyousmileface.completables;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author starrysky
     * @title: CompletableUse
     * @projectName Juc_Pro
     * @description: CompletableFuture
     *  * 异步调用: CompletableFuture
     *  * // 异步执行
     *  * // 成功回调
     *  * // 失败回调
     * @date 2021/1/302:13 上午
     */
    public class CompletableUse {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    //        //没有返回值的异步回调
    //        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{
    //            try {
    //                TimeUnit.SECONDS.sleep(3);
    //            } catch (InterruptedException e) {
    //                e.printStackTrace();
    //            }
    //            System.out.println(" 异步任务执行成功!");
    //        });
    //        System.out.println("main主线程");
    //        //获取异步执行的结果
    //        completableFuture.get();
    
            //又返回值的异步回调
            CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName()+":supplyAsync=>ok");
                int sum = 10/0;
                return 1024;
            });
            //编译
            //编译成功
            System.out.println(completableFuture.whenComplete((u1,u2)->{
                System.out.println("t=>"+u1);//正常的返回结果
                System.out.println("u=>"+u2);//错误信息
                //编译失败
            }).exceptionally((e)->{
                e.printStackTrace();
                return 2233; //错误的返回结果
            }).get());
        }
    }
相关文章
|
5月前
|
缓存 安全 Java
JUC系列《深入浅出Java并发容器:CopyOnWriteArrayList全解析》
CopyOnWriteArrayList是Java中基于“写时复制”实现的线程安全List,读操作无锁、性能高,适合读多写少场景,如配置管理、事件监听器等,但频繁写入时因复制开销大需谨慎使用。
|
5月前
|
设计模式 算法 安全
JUC系列之《深入理解AQS:Java并发锁的基石与灵魂 》
本文深入解析Java并发核心组件AQS(AbstractQueuedSynchronizer),从其设计动机、核心思想到源码实现,系统阐述了AQS如何通过state状态、CLH队列和模板方法模式构建通用同步框架,并结合独占与共享模式分析典型应用,最后通过自定义锁的实战案例,帮助读者掌握其原理与最佳实践。
|
前端开发 Java C++
JUC系列之《CompletableFuture:Java异步编程的终极武器》
本文深入解析Java 8引入的CompletableFuture,对比传统Future的局限,详解其非阻塞回调、链式编排、多任务组合及异常处理等核心功能,结合实战示例展示异步编程的最佳实践,助你构建高效、响应式的Java应用。
|
10月前
|
存储 缓存 Java
【高薪程序员必看】万字长文拆解Java并发编程!(5):深入理解JMM:Java内存模型的三大特性与volatile底层原理
JMM,Java Memory Model,Java内存模型,定义了主内存,工作内存,确保Java在不同平台上的正确运行主内存Main Memory:所有线程共享的内存区域,所有的变量都存储在主存中工作内存Working Memory:每个线程拥有自己的工作内存,用于保存变量的副本.线程执行过程中先将主内存中的变量读到工作内存中,对变量进行操作之后再将变量写入主内存,jvm概念说明主内存所有线程共享的内存区域,存储原始变量(堆内存中的对象实例和静态变量)工作内存。
299 0
|
11月前
|
消息中间件 算法 安全
JUC并发—1.Java集合包底层源码剖析
本文主要对JDK中的集合包源码进行了剖析。
|
监控 Java
java异步判断线程池所有任务是否执行完
通过上述步骤,您可以在Java中实现异步判断线程池所有任务是否执行完毕。这种方法使用了 `CompletionService`来监控任务的完成情况,并通过一个独立线程异步检查所有任务的执行状态。这种设计不仅简洁高效,还能确保在大量任务处理时程序的稳定性和可维护性。希望本文能为您的开发工作提供实用的指导和帮助。
470 17
|
JavaScript Java 中间件
Java CompletableFuture 异步超时实现探索
本文探讨了在JDK 8中`CompletableFuture`缺乏超时中断任务能力的问题,提出了一种异步超时实现方案,通过自定义工具类模拟JDK 9中`orTimeout`方法的功能,解决了任务超时无法精确控制的问题,适用于多线程并行执行优化场景。
423 0
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
JAVA并发编程系列(13)Future、FutureTask异步小王子
本文详细解析了Future及其相关类FutureTask的工作原理与应用场景。首先介绍了Future的基本概念和接口方法,强调其异步计算特性。接着通过FutureTask实现了一个模拟外卖订单处理的示例,展示了如何并发查询外卖信息并汇总结果。最后深入分析了FutureTask的源码,包括其内部状态转换机制及关键方法的实现原理。通过本文,读者可以全面理解Future在并发编程中的作用及其实现细节。
|
安全 Java API
JAVA并发编程JUC包之CAS原理
在JDK 1.5之后,Java API引入了`java.util.concurrent`包(简称JUC包),提供了多种并发工具类,如原子类`AtomicXX`、线程池`Executors`、信号量`Semaphore`、阻塞队列等。这些工具类简化了并发编程的复杂度。原子类`Atomic`尤其重要,它提供了线程安全的变量更新方法,支持整型、长整型、布尔型、数组及对象属性的原子修改。结合`volatile`关键字,可以实现多线程环境下共享变量的安全修改。