多线程之 completableFuture

简介: Callable与Runnable的功能大致相似,但是call()函数有返回值. Callable一般是和ExecutorService配合来使用的

先谈谈Future


        Callable与Runnable的功能大致相似,但是call()函数有返回值. Callable一般是和ExecutorService配合来使用的

 

       Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成在Future接口中声明了5个方法

 

  •     cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。
  •     isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
  •    isDone方法表示任务是否已经完成,若任务完成,则返回true;
  •    get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
  •    get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。

 

   也就是说Future提供了三种功能:

   1)判断任务是否完成;

   2)能够中断任务;

   3)能够获取任务执行结果。


       因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了FutureTask。


   来两个demo:


 public static void futureDemo1() throws ExecutionException, InterruptedException {
    ThreadPoolExecutor pool = CommonThreadPool.getPool();
    Future<Integer> f = pool.submit(() -> {
      // 长时间的异步计算
      Thread.sleep(2000);
      // 然后返回结果
      return 100;
    });
    while (!f.isDone()) {
      System.out.println(System.currentTimeMillis() + " 还没结束");
    }
    //结束后,获取结果
    System.out.println(f.get());
  }


Future只实现了异步,而没有实现回调,主线程get时会阻塞,可以轮询以便获取异步调用是否完成。在实际的使用中建议使用Guava ListenableFuture 来实现异步非阻塞,目的就是多任务异步执行,通过回调的方式来获取执行结果而不需轮询任务状态。


 public static void futureDemo2() {
    ListeningExecutorService executorService = MoreExecutors
        .listeningDecorator(CommonThreadPool.getPool());
    IntStream.rangeClosed(1, 10).forEach(i -> {
      ListenableFuture<Integer> listenableFuture = executorService
          .submit(() -> {
            // 长时间的异步计算
            // Thread.sleep(3000);
            // 然后返回结果
            return 100;
          });
      Futures.addCallback(listenableFuture, new FutureCallback<Integer>() {
        @Override
        public void onSuccess(Integer result) {
          System.out.println("get listenable future's result with callback " + result);
        }
        @Override
        public void onFailure(Throwable t) {
          t.printStackTrace();
        }
      }, executorService);
    });
  }


CompletableFuture


       Futrue对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。

在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,提供了非常强大的Future的扩展功能。

       CompletableFuture能够将回调放到与任务不同的线程中执行,也能将回调作为继续执行的同步函数,在与任务相同的线程中执行。它避免了传统回调最大的问题,那就是能够将控制流分离到不同的事件处理器中。

       CompletableFuture弥补了Future模式的缺点。在异步的任务完成后,需要用其结果继续操作时,无需等待。可以直接通过thenAccept、thenApply、thenCompose等方式将前面异步处理的结果交给另外一个异步事件处理线程来处理。


      下面将会一个个的例子来说明CompletableFuture

     

异步执行


/**
   *
   * public static CompletableFuture<Void>   runAsync(Runnable runnable)
   * public static CompletableFuture<Void>   runAsync(Runnable runnable, Executor executor)
   * public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier)
   * public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
   *
   *
   * 以Async结尾并且没有指定Executor的方法会使用ForkJoinPool.commonPool()作为它的线程池执行异步代码。
   *
   * runAsync方法也好理解,它以Runnable函数式接口类型为参数,所以CompletableFuture的计算结果为空。
   *
   * supplyAsync方法以Supplier<U>函数式接口类型为参数,CompletableFuture的计算结果类型为U。
   */
public static void runAsyncExample() throws ExecutionException, InterruptedException {
    CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
      System.out.println("异常执行代码");
    });
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
      //长时间的计算任务
      return "·00";
    });
    System.out.println(future.join());
  }


计算结果完成时的处理


 /**
   *
   * 当CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的Action。主要是下面的方法:
   *
   * whenComplete(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T>
   * whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T>
   * whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) public
   * CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)
   *
   * 不以Async结尾的方法由原来的线程计算,以Async结尾的方法由默认的线程池ForkJoinPool.commonPool()或者指定的线程池executor运行。
   * Java的CompletableFuture类总是遵循这样的原则
   *
   * 如果你希望不管 CompletableFuture 运行正常与否 都执行一段代码,如释放资源,更新状态,记录日志等,但是同时不影响原来的执行结果。
   * 那么你可以使用 whenComplete 方法。exceptionally非常类似于 catch(),而 whenComplete 则非常类似于 finally:
   */
  public static void whenComplete() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
      @Override
      public Integer get() {
        return 2323;
      }
    });
    Future<Integer> f = future.whenComplete((v, e) -> {
      System.out.println(v);
      System.out.println(e);
    });
    System.out.println(f.get());
  }


转换


/**
   * 转换
   * @throws ExecutionException
   * @throws InterruptedException
   */
  public static void thenApply() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
      return 100;
    });
    CompletableFuture<String> f =  future.thenApplyAsync(i -> i * 10).thenApply(i -> i.toString());
    //"1000"
    System.out.println(f.get());
  }


Action


/**
   * 上面的方法是当计算完成的时候,会生成新的计算结果(thenApply, handle),或者返回同样的计算结果whenComplete
   * CompletableFuture还提供了一种处理结果的方法,只对结果执行Action,而不返回新的计算值,因此计算值为Void:
   *
   * public CompletableFuture<Void>   thenAccept(Consumer<? super T> action)
   * public CompletableFuture<Void>   thenAcceptAsync(Consumer<? super T> action)
   * public CompletableFuture<Void>   thenAcceptAsync(Consumer<? super T> action, Executor executor)
   */
  public static void action() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
      return 100;
    });
    CompletableFuture<Void> f =  future.thenAccept(System.out::println);
    System.out.println(f.get());
  }


thenAccept


 /**
   * thenAcceptBoth以及相关方法提供了类似的功能,当两个CompletionStage都正常完成计算的时候,就会执行提供的action,它用来组合另外一个异步的结果。
   * runAfterBoth是当两个CompletionStage都正常完成计算的时候,执行一个Runnable,这个Runnable并不使用计算的结果。
   *
   * public <U> CompletableFuture<Void>   thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
   * public <U> CompletableFuture<Void>   thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
   * public <U> CompletableFuture<Void>   thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)
   * public     CompletableFuture<Void>   runAfterBoth(CompletionStage<?> other,  Runnable action)
   */
  public static void thenAcceptBoth() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
      return 100;
    });
    CompletableFuture<Void> f =  future.thenAcceptBoth(CompletableFuture.completedFuture(10), (x, y) -> System.out.println(x * y));
    System.out.println(f.get());
  }


thenRun


/**
   * 当计算完成的时候会执行一个Runnable,与thenAccept不同,Runnable并不使用CompletableFuture计算的结果。
   *
   * public CompletableFuture<Void>   thenRun(Runnable action)
   * public CompletableFuture<Void>   thenRunAsync(Runnable action)
   * public CompletableFuture<Void>   thenRunAsync(Runnable action, Executor executor)
   */
  public static void  thenRun() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
      return 100;
    });
    CompletableFuture<Void> f =  future.thenRun(() -> System.out.println("finished"));
    System.out.println(f.get());
  }


复合


/**
   * thenCombine用来复合另外一个CompletionStage的结果。它的功能类似
   *
   * A +
   *   |
   *   +------> C
   *   +------^
   * B +
   *
   * 两个CompletionStage是并行执行的,它们之间并没有先后依赖顺序,other并不会等待先前的CompletableFuture执行完毕后再执行。
   *
   * public <U,V> CompletableFuture<V>   thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
   * public <U,V> CompletableFuture<V>   thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
   * public <U,V> CompletableFuture<V>   thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
   *
   * 其实从功能上来讲,它们的功能更类似thenAcceptBoth,只不过thenAcceptBoth是纯消费,它的函数参数没有返回值,而thenCombine的函数参数fn有返回值。
   */
  public static void thenCombine() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
      return 100;
    });
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
      return "abc";
    });
    CompletableFuture<String> f =  future.thenCombine(future2, (x,y) -> y + "-" + x);
    System.out.println(f.get()); //abc-100
  }


组合


/**
   * 组合
   * 这一组方法接受一个Function作为参数,这个Function的输入是当前的CompletableFuture的计算值,返回结果将是一个新的CompletableFuture,
   * 这个新的CompletableFuture会组合原来的CompletableFuture和函数返回的CompletableFuture。因此它的功能类似: A +--> B +---> C
   *
   * thenCompose返回的对象并不是函数fn返回的对象,如果原来的CompletableFuture还没有计算出来,它就会生成一个新的组合后的CompletableFuture。
   */
  public static void thenCompose() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
      return 100;
    });
    CompletableFuture<String> f =  future.thenCompose( i -> {
      return CompletableFuture.supplyAsync(() -> {
        return (i * 10) + "";
      });
    });
    System.out.println(f.get()); //1000
  }


Either


/**
   * Either 语义:表示的是两个CompletableFuture,当其中任意一个CompletableFuture计算完成的时候就会执行。
   *
   * public CompletableFuture<Void>   acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
   * public CompletableFuture<Void>   acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
   * public CompletableFuture<Void>   acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
   * public <U> CompletableFuture<U>   applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
   * public <U> CompletableFuture<U>   applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
   * public <U> CompletableFuture<U>   applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)
   *
   *
   * acceptEither方法是当任意一个CompletionStage完成的时候,action这个消费者就会被执行。这个方法返回CompletableFuture<Void>
   *
   * applyToEither方法是当任意一个CompletionStage完成的时候,fn会被执行,它的返回值会当作新的CompletableFuture<U>的计算结果。
   */
  public static void either() {
    Random random = new Random();
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
      try {
        Thread.sleep(random.nextInt(1000));
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      return "from future1";
    });
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
      try {
        Thread.sleep(random.nextInt(1000));
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      return "from future2";
    });
    CompletableFuture<Void> haha = future1
        .acceptEitherAsync(future2, str -> System.out.println("The future is " + str));
    try {
      System.out.println(haha.get());
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (ExecutionException e) {
      e.printStackTrace();
    }
  }


All 


 /**
   * allOf方法是当所有的CompletableFuture都执行完后执行计算。
   * anyOf接受任意多的CompletableFuture
   *
   * anyOf方法是当任意一个CompletableFuture执行完后就会执行计算,计算的结果相同。
   */
  public static void allOfAndAnyOf() throws ExecutionException, InterruptedException {
    Random rand = new Random();
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
      try {
        Thread.sleep(10000 + rand.nextInt(1000));
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      return 100;
    });
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
      try {
        Thread.sleep(10000 + rand.nextInt(1000));
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      return "abc";
    });
    //CompletableFuture<Void> f =  CompletableFuture.allOf(future1,future2);
    CompletableFuture<Object> f =  CompletableFuture.anyOf(future1,future2);
    System.out.println(f.get());
  }


allOf 如果其中一个失败了如何快速结束所有?


/**
   * allOf 如果其中一个失败了如何快速结束所有?
   *
   * 默认情况下,allOf 会等待所有的任务都完成,即使其中有一个失败了,也不会影响其他任务继续执行。但是大部分情况下,一个任务的失败,往往意味着整个任务的失败,继续执行完剩余的任务意义并不大。
   * 在 谷歌的 Guava 的 allAsList 如果其中某个任务失败整个任务就会取消执行:
   *
   * 一种做法就是对 allOf 数组中的每个 CompletableFuture 的 exceptionally 方法进行捕获处理:如果有异常,那么整个 allOf 就直接抛出那个异常:
   */
  public static void allOfOneFail(){
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
      System.out.println("-- future1 -->");
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
      }
      System.out.println("<-- future1 --");
      return "Hello";
    });
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
      System.out.println("-- future2 -->");
      try {
        Thread.sleep(2000);
      } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
      }
      System.out.println("<-- future2 --");
      throw new RuntimeException("Oops!");
    });
    CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
      System.out.println("-- future3 -->");
      try {
        Thread.sleep(4000);
      } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
      }
      System.out.println("<-- future3 --");
      return "world";
    });
    // CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2, future3);
    // combinedFuture.join();
    CompletableFuture<Void> allWithFailFast = CompletableFuture.allOf(future1, future2, future3);
    Stream.of(future1, future2, future3).forEach(f -> f.exceptionally(e -> {
      allWithFailFast.completeExceptionally(e);
      return null;
    }));
    allWithFailFast.join();
  }


我自己的一个demo


 /**
   * 假设你有一个集合,需要请求N个接口,接口数据全部返回后进行后续操作。
   */
  public static void myDemo(){
    ArrayList<String> strings = Lists.newArrayList("1", "2", "3", "4");
    CompletableFuture[] cfs = strings.stream()
        .map(s -> CompletableFuture.supplyAsync(() -> {
          return s + " $";
        }).thenAccept(s1 -> {
          System.out.println(s1+ " #");
        }).exceptionally(t -> {
          return null;
        })).toArray(CompletableFuture[]::new);
    // 等待future全部执行完
    CompletableFuture.allOf(cfs).join();
  }
相关文章
Java CompletableFuture:allOf等待所有异步线程任务结束(4)
Java CompletableFuture:allOf等待所有线程任务结束(4) private void method() throws ExecutionException, InterruptedExcept...
6262 0
|
1月前
|
消息中间件 缓存 NoSQL
Java多线程实战-CompletableFuture异步编程优化查询接口响应速度
Java多线程实战-CompletableFuture异步编程优化查询接口响应速度
|
8月前
|
存储 Dubbo Java
JUC第二十五讲:JUC线程池-CompletableFuture 实现原理与实践
JUC第二十五讲:JUC线程池-CompletableFuture 实现原理与实践
|
JavaScript 前端开发 Java
JUC-Java多线程Future,CompletableFuture
JUC-Java多线程Future,CompletableFuture
JUC-Java多线程Future,CompletableFuture
|
Java
Java多线程Future与CompletableFuture-异步获取接口返回结果
当调用一些耗时接口时,如果我们一直在原地等待方法返回,整体程序的运行效率会大大降低。可以把调用的过程放到子线程去执行,再通过 Future 去控制子线程的调用过程,最后获取到调用结果,来提高整个程序的运行效率。
2330 0
|
3天前
|
存储 测试技术
【工作实践(多线程)】十个线程任务生成720w测试数据对系统进行性能测试
【工作实践(多线程)】十个线程任务生成720w测试数据对系统进行性能测试
10 0
【工作实践(多线程)】十个线程任务生成720w测试数据对系统进行性能测试
|
4天前
|
数据采集 Java Unix
10-多线程、多进程和线程池编程(2)
10-多线程、多进程和线程池编程
|
4天前
|
安全 Java 调度
10-多线程、多进程和线程池编程(1)
10-多线程、多进程和线程池编程
|
8天前
|
存储 Linux C语言
c++进阶篇——初窥多线程(二) 基于C语言实现的多线程编写
本文介绍了C++中使用C语言的pthread库实现多线程编程。`pthread_create`用于创建新线程,`pthread_self`返回当前线程ID。示例展示了如何创建线程并打印线程ID,强调了线程同步的重要性,如使用`sleep`防止主线程提前结束导致子线程未执行完。`pthread_exit`用于线程退出,`pthread_join`用来等待并回收子线程,`pthread_detach`则分离线程。文中还提到了线程取消功能,通过`pthread_cancel`实现。这些基本操作是理解和使用C/C++多线程的关键。