面试官都爱问的并发知识

简介: 面试官都爱问的并发知识

CompletableFuture和CompletionService可以让我们更好的编写并发代码,更专注于编写我们自己的业务逻辑,今天让我们一起来学习一下吧

一.CompletionService

为什么需要completionService呢,假如有这样一个需求,并发请求3个接口,得到某一个接口的结果后,就可以执行下一步业务,不需要等待另外两个请求的返回。

方案一:使用Future+线程池 + 阻塞队列实现

需要把Future.get通过线程池放到LinkedBlockingQueue阻塞队列里,还是有点复杂的。

public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();
        for(int i=0;i<3;i++) {
            int finalI = i;
            Future<String> future = executorService.submit(() -> {
                //請求接口1
                try {
                    Thread.sleep(new Random().nextInt(10));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "接口"+ finalI;
            });
            executorService.execute(()-> {
                try {
                    blockingQueue.put(future.get());
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            });
        }
//优先获取先返回的結果,做后续逻辑
 System.out.println(blockingQueue.take());

方案二:通过completionService实现

使用completionService,可以降低代码的复杂度,使得批量执行任务管理更加简单,底层其实也是线程池+LinkedBlockingQueue实现

public static void main(String[] args) throws InterruptedException {  
ExecutorService executorService = Executors.newFixedThreadPool(10);
        CompletionService<String> completionService =  new ExecutorCompletionService(executorService);
        for (int i = 0; i < 3; i++) {
            int finalI = i;
            completionService.submit(() -> {
                //請求接口1
                try {
                    Thread.sleep(new Random().nextInt(10));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "接口" + finalI;
            });
        }
       //优先获取先返回的結果,做后续逻辑
        try {
            System.out.println(completionService.take().get());
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
}

怎么创建completionService,通过ExecutorCompletionService创建

//传入线程池,不传入队列,默认使用LinkedBlockingQueue无界队列,存放future返回的结果
ExecutorCompletionService(Executor executor);
//传入线程池+自定义队列
ExecutorCompletionService(Executor executor, BlockingQueue> completionQueue)。

怎么使用completionService,通过submit方法提交任务,通过take或者poll方法获取结果

Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() 
  throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) 
  throws InterruptedException;

二.CompletableFuture

有这么一个需求,泡茶例子:有三个线程,线程1负责烧开水,线程2洗茶具,线程3负责泡茶(需要等待线程1和线程2完成),下面代码是用CompletableFuture实现,是不是比较简单,基本只需要写业务代码就可以了,他们怎么合作协调的完全不用关。

//任务1:烧开水
        CompletableFuture<Boolean> f1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("T1:烧开水..." + ":" + Thread.currentThread().getName());
            sleep(15000);
            return true;
        });
        //任务2:洗茶具
        CompletableFuture<Boolean> f2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("T2:洗茶具..." + ":" + Thread.currentThread().getName());
            sleep(1000);
            return true;
        });
        //任务3:任务1和任务2完成后执行:泡茶
        CompletableFuture<String> f3 = f1.thenCombine(f2, (t1, t2) -> {
            if (t1 && t2) {
                System.out.println("T3:泡茶..." + ":" + Thread.currentThread().getName());
            }
            return "上茶";
        });
        //等待任务3执行结果
        System.out.println(f3.join());

让我们一起来学习一下怎么使用CompletableFuture吧

怎么创建CompletableFuture对象

分成两个方法run和supply,run方法没有返回值,supply方法可以通过get获取返回值,不传入线程池使用默认线程池ForkJoinPool,所有CompletableFuture都会公用同一个线程池,避免有些任务是比较慢的操作,影响到其他任务,所以建议不同的业务使用不同的线程池。

//使用默认线程池
static CompletableFuture<Void> runAsync(Runnable runnable)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
//可以指定线程池  
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

CompletableFuture的任务组合功能

CompletableFuture实现了CompletionStage接口,提供了组合任务的功能,可以讲多个任务串行化执行,多个任务结果聚合等功能

1. 描述串行关系

带Async的方法会异步执行fn,action,consumer这些函数,相反不带Async则是同步执行

Apply:可以有入参和返回值  ;Accept可以支持入参;Run:不支持入参和返回值

CompletionStage thenApply(fn);
CompletionStage thenApplyAsync(fn);
CompletionStage thenAccept(consumer);
CompletionStage thenAcceptAsync(consumer);
CompletionStage thenRun(action);
CompletionStage thenRunAsync(action);
CompletionStage thenCompose(fn);
CompletionStage thenComposeAsync(fn);

举个例子:下面的代码是先返回我是java,再拼接上小面,最后转成大小字母返回。

CompletableFuture f0 = CompletableFuture.supplyAsync( () -> "我是java") //①
.thenApply(s -> s + " 小面") //②
.thenApply(String::toUpperCase);//③
System.out.println(f0.join());

2. 描述 AND 汇聚关系

同样的方法带Async的是异步执行,否则是同步执行

thenCombine:得到两个任务的结果,作为fn函数的输入值,有返回值

AcceptBoth:得到两个任务的结果,作为consumer函数的输入值,没有返回值

runAfterBoth:得到两个任务的结果,没有入参和返回值

CompletionStage<R> thenCombine(other, fn);
CompletionStage<R> thenCombineAsync(other, fn);
CompletionStage<Void> thenAcceptBoth(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer);
CompletionStage<Void> runAfterBoth(other, action);
CompletionStage<Void> runAfterBothAsync(other, action);

例子可以看上面泡茶的例子使用了thenCombine

3. 描述 OR 汇聚关系

同样的方法带Async的是异步执行,否则是同步执行

applyToEither:得到任意一个任务的结果,作为fn函数的输入值,有返回值

acceptEither:得到任意一个任务的结果,作为consumer函数的输入值,没有返回值

runAfterEither:得到任意一个任务的结果,没有入参和返回值

CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsync(other, fn);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other, action);
CompletionStage runAfterEitherAsync(other, action);

下面例子演示了怎么使用applyToEither

CompletableFuture<String> f1 =
                CompletableFuture.supplyAsync(()->{
                    int t = new Random().nextInt(10);
                    sleep(t);
                    return "小面1摸鱼了:"+ String.valueOf(t) + "ms";
                });
CompletableFuture<String> f2 =
                CompletableFuture.supplyAsync(()->{
                    int t = new Random().nextInt(10);
                    sleep(t);
                    return "小面2摸鱼了:"+ String.valueOf(t) + "ms";
});
CompletableFuture<String> f3 =
                f1.applyToEither(f2,s -> s);
System.out.println(f3.join());

4.异常处理

当我们使用CompletableFuture编写异步代码时,出现运行时错误时,我们怎么知道呢?

CompletionStage接口也是很好的支持了异常处理的功能

exceptionally:出现异常就会调用

whenComplete和handle:类似于try finally 的finally方法,无论是否报错都会执行

CompletionStage exceptionally(fn);
CompletionStage<R> whenComplete(consumer);
CompletionStage<R> whenCompleteAsync(consumer);
CompletionStage<R> handle(fn);
CompletionStage<R> handleAsync(fn);

下面的例子:supplyAsync发生了一次,exceptionally就会执行,最后执行whenComplete方法

CompletableFuture<Integer> f0 = CompletableFuture.supplyAsync(() -> (7 / 0))
                .thenApply(r -> r * 10)
                .exceptionally(e -> 0)
                .whenComplete((a, t) -> System.out.println(a));
        System.out.println(f0.join());

三.小结

我们来总结一下,我们学习了CompletionService 和CompletableFuture的创建和常用的方法,以及在并发编程中的运用,你学会了吗?

相关文章
|
5天前
|
缓存 安全 算法
Java面试题:如何通过JVM参数调整GC行为以优化应用性能?如何使用synchronized和volatile关键字解决并发问题?如何使用ConcurrentHashMap实现线程安全的缓存?
Java面试题:如何通过JVM参数调整GC行为以优化应用性能?如何使用synchronized和volatile关键字解决并发问题?如何使用ConcurrentHashMap实现线程安全的缓存?
8 0
|
5天前
|
设计模式 安全 Java
Java面试题:设计模式如单例模式、工厂模式、观察者模式等在多线程环境下线程安全问题,Java内存模型定义了线程如何与内存交互,包括原子性、可见性、有序性,并发框架提供了更高层次的并发任务处理能力
Java面试题:设计模式如单例模式、工厂模式、观察者模式等在多线程环境下线程安全问题,Java内存模型定义了线程如何与内存交互,包括原子性、可见性、有序性,并发框架提供了更高层次的并发任务处理能力
17 1
|
5天前
|
安全 算法 Java
Java面试题:如何使用并发集合,例如ConcurrentHashMap?
Java面试题:如何使用并发集合,例如ConcurrentHashMap?
15 1
|
5天前
|
设计模式 安全 Java
Java面试题:请解释Java中的线程池以及为什么要使用线程池?请解释Java中的内存模型以及如何避免内存泄漏?请解释Java中的并发工具包以及如何实现一个简单的线程安全队列?
Java面试题:请解释Java中的线程池以及为什么要使用线程池?请解释Java中的内存模型以及如何避免内存泄漏?请解释Java中的并发工具包以及如何实现一个简单的线程安全队列?
10 1
|
5天前
|
设计模式 缓存 安全
Java面试题:工厂模式与内存泄漏防范?线程安全与volatile关键字的适用性?并发集合与线程池管理问题
Java面试题:工厂模式与内存泄漏防范?线程安全与volatile关键字的适用性?并发集合与线程池管理问题
11 1
|
5天前
|
设计模式 存储 缓存
Java面试题:结合设计模式与并发工具包实现高效缓存;多线程与内存管理优化实践;并发框架与设计模式在复杂系统中的应用
Java面试题:结合设计模式与并发工具包实现高效缓存;多线程与内存管理优化实践;并发框架与设计模式在复杂系统中的应用
8 0
|
5天前
|
设计模式 安全 NoSQL
Java面试题:设计一个线程安全的单例模式,并解释其内存占用和垃圾回收机制;使用生产者消费者模式实现一个并发安全的队列;设计一个支持高并发的分布式锁
Java面试题:设计一个线程安全的单例模式,并解释其内存占用和垃圾回收机制;使用生产者消费者模式实现一个并发安全的队列;设计一个支持高并发的分布式锁
9 0
|
5天前
|
设计模式 并行计算 安全
Java面试题:如何使用设计模式优化多线程环境下的资源管理?Java内存模型与并发工具类的协同工作,描述ForkJoinPool的工作机制,并解释其在并行计算中的优势。如何根据任务特性调整线程池参数
Java面试题:如何使用设计模式优化多线程环境下的资源管理?Java内存模型与并发工具类的协同工作,描述ForkJoinPool的工作机制,并解释其在并行计算中的优势。如何根据任务特性调整线程池参数
10 0
|
5天前
|
设计模式 存储 缓存
Java面试题:结合单例模式与Java内存模型,设计一个线程安全的单例类?使用内存屏障与Java并发工具类,实现一个高效的并发缓存系统?结合观察者模式与Java并发框架,设计一个可扩展的事件处理系统
Java面试题:结合单例模式与Java内存模型,设计一个线程安全的单例类?使用内存屏障与Java并发工具类,实现一个高效的并发缓存系统?结合观察者模式与Java并发框架,设计一个可扩展的事件处理系统
10 0
|
5天前
|
设计模式 缓存 安全
Java面试题:详解单例模式与内存泄漏?内存模型与volatile关键字的实操?并发工具包与并发框架的应用实例
Java面试题:详解单例模式与内存泄漏?内存模型与volatile关键字的实操?并发工具包与并发框架的应用实例
7 0