多种方式实现 Future 回调返回结果

简介: 多种方式实现 Future 回调返回结果

JDK  实现

public class FutureTest { 
    
    public static void main(String[] args) throws Exception {
        ExecutorService es = Executors.newFixedThreadPool(10);
        Future<Integer> f = es.submit(() ->{
            Thread.sleep(5000);
            // 结果
            return 100;
        });
        Integer result = f.get();
        System.out.println(result);
        
//        也可以轮询等结束
//        while (f.isDone()) {
//            System.out.println(result);
//        }
    }
}

public class FutureTest {

   

   public static void main(String[] args) throws Exception {

       ExecutorService es = Executors.newFixedThreadPool(10);

       Future<Integer> f = es.submit(() ->{

           Thread.sleep(5000);

           // 结果

           return 100;

       });


       Integer result = f.get();

       System.out.println(result);

       

//        也可以轮询等结束

//        while (f.isDone()) {

//            System.out.println(result);

//        }

   }


}

虽然这些方法提供了异步执行任务的能力,但是对于结果的获取却还是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。

阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时的得到计算结果。

Java的一些框架,

Netty,自己扩展了Java的 Future 接口,提供了 addListener 等多个扩展方法。

Google的guava也提供了通用的扩展Future:ListenableFuture 、 SettableFuture 以及辅助类 Futures 等,方便异步编程。

Java 在JDK1.8 这个版本中增加了一个能力更强的Future类:CompletableFuture 。它提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果。下面来看看这几种方式。

Netty-Future

引入Maven依赖:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.50.Final</version>
</dependency>
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.3.7</version>
</dependency>

 

package com.vipsoft;
import cn.hutool.core.date.DateUtil;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
public class FutureTest {
    public static void main(String[] args) throws InterruptedException {
        EventExecutorGroup group = new DefaultEventExecutorGroup(4);
        System.out.println("开始:" + DateUtil.now());
        Future<Integer> f = group.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("开始耗时计算:" + DateUtil.now());
                Thread.sleep(5000);
                System.out.println("结束耗时计算:" + DateUtil.now());
                int a = 0;
                int b = 1;
                int c = b / a;
                return 100;
            }
        });
        //通过监听,待线程结束后,自动触发,避免了主线程 的阻塞和等待
        f.addListener(new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> objectFuture) throws Exception {
                System.out.println("计算结果:" + objectFuture.get());
            }
        });
        System.out.println("结束:" + DateUtil.now());
        // 不让守护线程退出
        new CountDownLatch(1).await();
    }
}

在Listener添加成功之后,会立即检查状态,如果任务已经完成立刻进行回调,通过监听,待线程结束后,自动触发,避免了主线程 的阻塞和等待

Guava-Future

<dependency>
  <groupId>com.google.guava</groupId>
  <artifactId>guava</artifactId>
  <version>29.0-jre</version>
</dependency>
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.3.7</version>
</dependency>

 

package com.vipsoft;
import cn.hutool.core.date.DateUtil;
import com.google.common.util.concurrent.*;
import javax.annotation.Nullable;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FutureTest {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("开始:" + DateUtil.now());
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        ListeningExecutorService service = MoreExecutors.listeningDecorator(executorService);
        ListenableFuture<Integer> future = service.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("开始耗时计算:" + DateUtil.now());
                Thread.sleep(5000);
                System.out.println("结束耗时计算:" + DateUtil.now());
                return 100;
            }
        });
        //增加回调函数,一般用于不在乎执行结果的地方
        future.addListener(new Runnable() {
            @Override
            public void run() {
                System.out.println("调用成功--不关心结果");
            }
        }, executorService);
        //通过addCallback 获得结果
        Futures.addCallback(future, new FutureCallback<Integer>() {
            @Override
            public void onSuccess(@Nullable Integer result) {
                System.out.println("成功,计算结果:" + result);
            }
            @Override
            public void onFailure(Throwable t) {
                System.out.println("失败");
            }
        }, executorService);
        System.out.println("结束:" + DateUtil.now());
        new CountDownLatch(1).await();
    }
}

CompletableFuture 使用

package com.vipsoft;
import cn.hutool.core.date.DateUtil;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
public class FutureTest {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("开始:" + DateUtil.now());
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始耗时计算:" + DateUtil.now());
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("结束耗时计算:" + DateUtil.now());
            return 100;
        });
        //使用 thenCompose 或者 thenComposeAsync 等方法可以实现回调的回调,且写出来的方法易于维护。
        completableFuture = completableFuture.thenCompose(i -> {
            return CompletableFuture.supplyAsync(() -> {
                System.out.println("在回调的回调中执行耗时操作...");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return i + 200;
            });
        });
        completableFuture.whenComplete((result, e) -> {
            System.out.println("回调结果:" + result);
        });
        System.out.println("结束:" + DateUtil.now());
        new CountDownLatch(1).await();
    }
}

JDK1.8 已经提供了一种更为高级的回调方式:CompletableFuture,不需要引入任何第三方的依赖,为Future模式增加回调功能就不需要阻塞等待结果的返回并且不需要消耗无谓的CPU资源去轮询处理状态,JDK8之前使用Netty或者Guava提供的工具类,JDK8之后则可以使用自带的 CompletableFuture 类。Future 有两种模式:将来式和回调式。而回调式会出现回调地狱的问题,由此衍生出了 Promise 模式来解决这个问题。这才是 Future 模式和 Promise 模式的相关性。

目录
相关文章
|
7月前
|
前端开发 JavaScript
Promise 等待多个接口返回数据再执行操作
Promise 等待多个接口返回数据再执行操作
69 0
|
消息中间件
celery--调用异步任务的三种方法和task参数
celery--调用异步任务的三种方法和task参数
|
前端开发
手写promise自定义封装异步任务回调的执行
手写promise自定义封装异步任务回调的执行
|
Java
Java多线程Future与CompletableFuture-异步获取接口返回结果
当调用一些耗时接口时,如果我们一直在原地等待方法返回,整体程序的运行效率会大大降低。可以把调用的过程放到子线程去执行,再通过 Future 去控制子线程的调用过程,最后获取到调用结果,来提高整个程序的运行效率。
2487 0
|
前端开发
手写promise异步状态修改then方法返回来的结果
手写promise异步状态修改then方法返回来的结果
手写promise异步状态修改then方法返回来的结果
|
C# C++
c#调用c++带有回调函数方法的实现
在c++中有个回调函数指针的概念,只需要某个函数在调用定时器函数时传入一个函数指针就能达到目的,但C#中没有函数指针的概念,我们该怎样来实现呢。 其实说到回调函数,大家应该能想到c#中的委托,虽然名字不一样,但在各自的语言范畴都能实现相似的功能。
1571 0
|
Android开发 开发者
【Android 异步操作】FutureTask 分析 ( Future 接口解析 | Runnable 接口解析 | Callable 接口解析 )
【Android 异步操作】FutureTask 分析 ( Future 接口解析 | Runnable 接口解析 | Callable 接口解析 )
214 0
CompletableFuture的applyToEitherAsync:最快返回输出的线程结果作为下一次任务的输入
CompletableFuture的applyToEitherAsync:最快返回输出的线程结果作为下一次任务的输入 applyToEitherAsync和附录1的acceptEitherAsync类似,只是说acceptEitherAsync是对结果的消费,而applyToEitherAsync则是把最快返回的计算输出结果,再利用起来作为下一次线程任务的输入。
1781 0
|
6月前
|
前端开发 JavaScript Java
java实现异步回调返回给前端
综上,Java中实现异步回调并将结果返回给前端是一项涉及后端异步处理和前端交互的综合任务。在实际项目中,开发人员需要根据应用需求和性能预期选择合适的异步模型与工具,并进行适当的配置和优化。
285 3

热门文章

最新文章