JUC-Java多线程Future,CompletableFuture

简介: JUC-Java多线程Future,CompletableFuture

多线程相关概念


1把锁:synchronized


2个并:并发(concurrent)在同一实体上的多个事件,在一台处理器上“同时处理多个任务”,同一时刻,其实是只有一个时间在发生


       并行(parallel)在不同实体上的多个时间,在多台处理器上同时处理多个任务,同一时刻,大家都在做事情,你做你的,我做到我的,但是我们都在做


3个程:进程:在系统中运行的一个应用程序就是一个进程,每一个进程都有自己的内存空间和系统资源


        线程:也被成为轻量级进程,在一个进程中会有1个或多个进程,是大多数操作系统进行时序调度的基本单元


        管程:Monitor(监视器),也就是平时所说的锁


Monitor其实就是一种同步机制,他的义务是保证(同一时间)只有一个线程可以访问被保护的数据和代码。

JVM中同步是基于进入和退出监视器对象(Monitor,管程对象)来实现的,每个对象实例都会有一个Monitor对象

Object o = new Object();
    Thread t1 = new Thread(() -> {
            synchronized (o) {
                System.out.println(1);
            }
        }, "t1");
        t1.start();


用户线程和守护线程


一般情况下不做特别说明配置,默认都是用户线程


用户线程(User Thread): 是系统的工作线程,它会完成这个程序需要完成的业务条件。


守护线程(Daemon Thread):是一种特殊的线程为其它线程服务的,在后台默默地完成一些系统性的服务

守护线程作为一个服务线程,没有服务对象就没有必要继续运行了 ,如果用户线程全部结束了,意味着程序需要完成的业务操作已经结束了,系统可以退出了,所以假如当系统只剩下守护线程的时候,java虚拟机会自动退出。

// 是否是守护线程
t1.isDaemon();
// 设置为守护线程
t1.setDaemon(true);


setDaemon(true)方法必须在start()之前设置


Future接口


Future接口(Future实现类)定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。


比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,主线程就去做其他事情了,忙其他事情或者执行完,过了一会才去获取子任务的执行结果或变更的任务状态。


总结: Future接口可以为主线程开一个分支任务,专门为主线程处理耗时和费力的复杂业务。


FutureTask异步任务


public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask futureTask = new FutureTask(new MyThread());
        Thread t1 = new Thread(futureTask);
        t1.start();
        System.out.println(futureTask.get());
    }
}
class MyThread implements Callable<String>{
    @Override
    public String call() throws Exception {
        System.out.println("come in call----");
        return "hello callable";
    }
}


Future+线程池异步多线程任务配合,能显著提高程序的执行效率。

futureTask.get();
futureTask.isDone();


Future对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果。


CompletableFuture


从jdk1.8开始引入,它是Future的功能增强版,减少阻塞和轮询。可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。

ExecutorService threadPool = Executors.newFixedThreadPool(3);
//      CompletableFuture.runAsync() 无返回值
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            System.out.println("come in ---");
            return "hello";
        },threadPool);
        String s = completableFuture.get();
        System.out.println(s);
        threadPool.shutdown();
//这里用自定义线程池,采用默认线程会当作一个守护线程,main方法执行完后future线程还未处理完时会直接关闭
ExecutorService threadPool = Executors.newFixedThreadPool(3);
        try {
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                System.out.println("come in ----");
                int i = ThreadLocalRandom.current().nextInt(10);
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("---1秒后出结果:" + i);
                return i;
            }, threadPool).whenComplete((v, e) -> {
                if (e == null) {
                    System.out.println("计算完成:" + v);
                }
            }).exceptionally(e -> {
                System.out.println(e.getCause());
                return null;
            });
            System.out.println(java.lang.Thread.currentThread().getName() + "去忙其他任务了");
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            threadPool.shutdown();
        }


CompletableFuture join和get区别


在编译时是否报出检查型异常


CompletableFuture的优点


异步任务结束时,会自动回调某个对象的方法

主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行

异步任务出错时,会自动回调某个对象的方法


ps:和javascript回调不能说相似,只能说一模一样,一通百通


Lambda表达式+Stream流式调用+CHain链式调用+Java8函数式编程


Runnable


无参数,无返回值

1673406836245.jpg

Function


Function<T, R> 接收一个参数,并且有返回值

1673406846654.jpg

Consumer


Consumer接收一个参数,并且没有返回值

1673406854757.jpg

BiConsumer


BiConsumer<T, U>接收两个参数(Bi,英文单词词根,代表两个的意思),没有返回值

1673406863413.jpg

Supplier


Supplier供给型函数接口,没有参数,有一个返回值

1673406871086.jpg

Predicate


一般用于做判断,返回boolean类型

1673406879705.jpg

总结

函数时接口名称 方法名称 参数 返回值
Runnable run 无参数 无返回值
Function apply 1个参数 有返回值
Consumer accept 1个参数 无返回值
Supplier get 没有参数 有返回值
BiConsumer accept 2个参数 无返回值
Predicate test 1个参数 有返回值(boolean)


日常工作中如何进行开发?


功能→性能,先完成功能实现,再考虑性能优化


CompletableFuture用例


假设要从多个电商平台查询一件商品的价格,每次查询耗时设定为1秒


普通查询:查询电商平台1→查询电商平台2→查询电商平台3 …


CompletableFuture: 同时异步执行要查询的电商平台

public class CompletableFutureDemo {
  static List<NetMall> list = Arrays.asList(
            new NetMall("jd"),
            new NetMall("dangdang"),
            new NetMall("taobao"),
            new NetMall("pdd"),
            new NetMall("tmall")
    );
    /**
     * step by step 一家家搜查
     * List<NetMall> ----->map------> List<String>
     * @param list
     * @param productName
     * @return
     */
    public static List<String> getPrice(List<NetMall> list,String productName)
    {
        //《mysql》 in taobao price is 90.43
        return list
                .stream()
                .map(netMall ->
                        String.format(productName + " in %s price is %.2f",
                                netMall.getNetMallName(),
                                netMall.calcPrice(productName)))
                .collect(Collectors.toList());
    }
    /**
     * List<NetMall> ----->List<CompletableFuture<String>>------> List<String>
     * @param list
     * @param productName
     * @return
     */
    public static List<String> getPriceByCompletableFuture(List<NetMall> list,String productName)
    {
        return list.stream().map(netMall ->
                        CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f",
                                netMall.getNetMallName(),
                                netMall.calcPrice(productName))))
                .collect(Collectors.toList())
                .stream()
                .map(s -> s.join())
                .collect(Collectors.toList());
    }
    public static void main(String[] args)
    {
        long startTime = System.currentTimeMillis();
        List<String> list1 = getPrice(list, "mysql");
        for (String element : list1) {
            System.out.println(element);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime - startTime) +" 毫秒");
        System.out.println("--------------------");
        long startTime2 = System.currentTimeMillis();
        List<String> list2 = getPriceByCompletableFuture(list, "mysql");
        for (String element : list2) {
            System.out.println(element);
        }
        long endTime2 = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime2 - startTime2) +" 毫秒");
    }
}
class NetMall
{
    @Getter
    private String netMallName;
    public NetMall(String netMallName)
    {
        this.netMallName = netMallName;
    }
    public double calcPrice(String productName)
    {
        try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
        return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
    }
}


执行效率显著提高

1673406950337.jpg


CompletableFuture常用API


获取结果和触发计算


T get(); 容易造成阻塞,非得拿到结果,否则不往下执行


T get(long timeout, TimeUnit unit); 指定时间后还没拿到结果,直接TimtoutException


T join(); 和get一样,区别在于编译时是否报出检查型异常


T getNow(T valueIfAbsent); 没有计算完成的情况下,返回一个替代结果。立即获取结果不阻塞:计算完,返回计算完成后的结果,没计算完:返回设定的valueIfAbsend值


boolean complete(T value); 是否打断get方法立即返回括号值,计算完:不打断,返回计算后的结果,没计算完:打断返回设定的value值


allOf:多个CompletableFuture任务并发执行,所有CompletableFuture任务完成时,返回一个新的CompletableFuture对象,其返回值为Void,也就是无返回值。该方法的应用之一是在继续程序之前等待一组独立的 CompletableFuture 完成,如:CompletableFuture.allOf(c1, c2, c3).join();


anyOf:多个CompletableFuture任务并发执行,只要有一个CompletableFuture任务完成时,就会返回一个新的CompletableFuture对象,并返回该CompletableFuture执行完成任务的返回值。


对计算结果进行处理


thenApply 计算结果存在依赖关系,将两个线程串行化,由于存在依赖关系(当前步错,不走下一步),当前步骤有异常的话就叫停。


handle 计算结果存在依赖关系,这两个线程串行化。有异常也可以往下一步走,根据带的异常参数可以进一步处理


execptionally类似 try/catch


whenCpmplete和handle类似 try/finally,有异常也会往下执行


对计算结果进行消费


thenAccept 顾名思义,消费型接口。接收任务的处理结果,并消费处理,无返回结果。


对比


API
说明
thenRun thenRun(Runnable runnable) 任务A执行完执行B,并且B不需要A的结果
thenAccept thenAccpet(Consumer action) 任务A执行完执行B,B需要A的结果,但是任务B无返回值
thenApply thenApply(Function fn) 任务A执行完执行B,B需要A的结果,同时任务B有返回值
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join());
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(r -> System.out.println(r)).join());
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(r -> r + "resultB").join());


thenRun和thenRunAsync区别?

没有传入自定义线程池,都用默认线程池ForkJoinPool


如果执行第一个任务的时候,传入一个自定义线程池


调用thenRun方法执行第二个任务时,则第二个任务和第一个任务时使用同一个线程池

调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoinpool线程池

备注:有可能处理的太快,系统优化切换原则,直接使用main线程处理


其他如:thenAccept和thenAccpetAsync,thenApply和thenApplyAsync等,它们之间的区别也是同理


对计算速度选用


applyToEither对比任务执行速度

CompletableFuture<String> playA = CompletableFuture.supplyAsync(() -> {
            System.out.println("A come in");
            try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
            return "playA";
        });
        CompletableFuture<String> playB = CompletableFuture.supplyAsync(() -> {
            System.out.println("B come in");
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            return "playB";
        });
        CompletableFuture<String> result = playA.applyToEither(playB, f -> {
            return f + " is winer";
        });
        System.out.println(Thread.currentThread().getName()+"\t"+"-----: "+result.join());


对计算结果进行合并


两个CompletionStage任务都完成后,最终能把两个任务的结果一起交给thenCombine来处理。先完成的先等着,等待其他分支任务。

   public static void main(String[] args) {
        long start = System.currentTimeMillis();
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(()->{
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 10;
        });
        CompletableFuture<Integer> future2= CompletableFuture.supplyAsync(()->{
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 10;
        });
        CompletableFuture<Integer> future = future1.thenCombine(future2, (x, y) -> {
            System.out.println("-----开始两个结果合并");
            return x + y;
        });
        System.out.println(future.join());
        long end = System.currentTimeMillis();
        System.out.println("耗时:" + (end-start));
    }
相关文章
|
1天前
|
安全 Java 测试技术
Java并行流陷阱:为什么指定线程池可能是个坏主意
本文探讨了Java并行流的使用陷阱,尤其是指定线程池的问题。文章分析了并行流的设计思想,指出了指定线程池的弊端,并提供了使用CompletableFuture等替代方案。同时,介绍了Parallel Collector库在处理阻塞任务时的优势和特点。
|
1天前
|
安全 Java 编译器
Java多线程编程的陷阱与最佳实践####
【10月更文挑战第29天】 本文深入探讨了Java多线程编程中的常见陷阱,如竞态条件、死锁、内存一致性错误等,并通过实例分析揭示了这些陷阱的成因。同时,文章也分享了一系列最佳实践,包括使用volatile关键字、原子类、线程安全集合以及并发框架(如java.util.concurrent包下的工具类),帮助开发者有效避免多线程编程中的问题,提升应用的稳定性和性能。 ####
18 1
|
5天前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####
|
6天前
|
Java 开发者
在Java多线程编程的世界里,Lock接口正逐渐成为高手们的首选,取代了传统的synchronized关键字
在Java多线程编程的世界里,Lock接口正逐渐成为高手们的首选,取代了传统的synchronized关键字
31 4
|
6月前
|
安全 Java
深入理解Java并发编程:线程安全与性能优化
【2月更文挑战第22天】在Java并发编程中,线程安全和性能优化是两个重要的主题。本文将深入探讨这两个主题,包括线程安全的基本概念,如何实现线程安全,以及如何在保证线程安全的同时进行性能优化。
55 0
|
6月前
|
存储 安全 Java
深入理解Java并发编程:线程安全与锁机制
【5月更文挑战第31天】在Java并发编程中,线程安全和锁机制是两个核心概念。本文将深入探讨这两个概念,包括它们的定义、实现方式以及在实际开发中的应用。通过对线程安全和锁机制的深入理解,可以帮助我们更好地解决并发编程中的问题,提高程序的性能和稳定性。
|
3月前
|
存储 安全 Java
解锁Java并发编程奥秘:深入剖析Synchronized关键字的同步机制与实现原理,让多线程安全如磐石般稳固!
【8月更文挑战第4天】Java并发编程中,Synchronized关键字是确保多线程环境下数据一致性与线程安全的基础机制。它可通过修饰实例方法、静态方法或代码块来控制对共享资源的独占访问。Synchronized基于Java对象头中的监视器锁实现,通过MonitorEnter/MonitorExit指令管理锁的获取与释放。示例展示了如何使用Synchronized修饰方法以实现线程间的同步,避免数据竞争。掌握其原理对编写高效安全的多线程程序极为关键。
63 1
|
4月前
|
安全 Java 开发者
Java并发编程中的线程安全问题及解决方案探讨
在Java编程中,特别是在并发编程领域,线程安全问题是开发过程中常见且关键的挑战。本文将深入探讨Java中的线程安全性,分析常见的线程安全问题,并介绍相应的解决方案,帮助开发者更好地理解和应对并发环境下的挑战。【7月更文挑战第3天】
89 0
|
5月前
|
安全 Java 开发者
Java并发编程中的线程安全策略
在现代软件开发中,Java语言的并发编程特性使得多线程应用成为可能。然而,随着线程数量的增加,如何确保数据的一致性和系统的稳定性成为开发者面临的挑战。本文将探讨Java并发编程中实现线程安全的几种策略,包括同步机制、volatile关键字的使用、以及java.util.concurrent包提供的工具类,旨在为Java开发者提供一系列实用的方法来应对并发问题。
44 0
|
6月前
|
安全 Java 容器
Java一分钟之-并发编程:线程安全的集合类
【5月更文挑战第19天】Java提供线程安全集合类以解决并发环境中的数据一致性问题。例如,Vector是线程安全但效率低;可以使用Collections.synchronizedXxx将ArrayList或HashMap同步;ConcurrentHashMap是高效线程安全的映射;CopyOnWriteArrayList和CopyOnWriteArraySet适合读多写少场景;LinkedBlockingQueue是生产者-消费者模型中的线程安全队列。注意,过度同步可能影响性能,应尽量减少共享状态并利用并发工具类。
62 2
下一篇
无影云桌面