Java并发编程学习系列五:函数式接口、Stream流等

简介: Java并发编程学习系列五:函数式接口、Stream流等

四大函数接口


什么是函数式接口?


有且只有一个抽象方法的接口被称为函数式接口,函数式接口适用于函数式编程的场景,Lambda 就是 Java 中函数式编程的体现,可以使用Lambda表达式创建一个函数式接口的对象,一定要确保接口中有且只有一个抽象方法,这样Lambda才能顺利的进行推导。


函数式接口里除了抽象方法之外,还允许包含默认方法和静态方法。


@FunctionalInterface注解


与@Override 注解的作用类似,Java 8中专门为函数式接口引入了一个新的注解:@FunctionalInterface 。该注解用于编译级错误检查,加上该注解,当你写的接口不符合函数式接口定义的时候,编译器会报错。 。但是这个注解不是必须的,只要符合函数式接口的定义,那么这个接口就是函数式接口。


java.util.function 包下定义了内置核心四大函数式接口,可以使用 lambda 表达式。


1.jpg


关于这四个接口的介绍如下图所示:


2.jpg


Function


函数型接口,有一个输入,有一个输出。


3.jpg


public static void main(String[] args) {
//        Function function = new Function<String, Integer>() {
//            @Override
//            public Integer apply(String s) {
//                return s.length();
//            }
//        };
        //使用lambda表达式
        Function<String, Integer> function = s -> {
            return s.length();
        };
        System.out.println(function.apply("xxx"));
    }
复制代码


Predicate


断定型接口,有一个输入参数,返回只有布尔值。


4.jpg


public static void main(String[] args) {
        //判断字符串是否为空,空返回true
//        Predicate predicate = new Predicate<String>() {
//            @Override
//            public boolean test(String s) {
//                return s.isEmpty();
//            }
//        };
        Predicate<String> predicate = str ->{return str.isEmpty();};
        System.out.println(predicate.test("ff"));
    }
复制代码


Consumer


消费型接口,有一个输入参数,没有返回值。


5.jpg


public static void main(String[] args) {
//        Consumer<String> consumer = new Consumer<String>() {
//            @Override
//            public void accept(String s) {
//                System.out.println(s);
//            }
//        };
        Consumer<String> consumer = Str ->{System.out.println(Str);};
        consumer.accept("fjdskf");
    }
复制代码


Supplier


供给型接口,没有输入参数,只有返回参数。


6.jpg


public static void main(String[] args) {
//        Supplier<String> supplier = new Supplier<String>() {
//            @Override
//            public String get() {
//                return "hresh";
//            }
//        };
        Supplier<String> supplier = () -> {
            return "hresh";
        };
        System.out.println(supplier.get());
    }
复制代码


Stream流式计算


官网文档定义如下:


7.jpg


关于流的方法可以去官网看详细介绍。


流(Stream)到底是什么呢?


是数据渠道,用于操作数据源(集合、数组等)所生成的元素序列。


集合存储数据,流讲的是计算!


特点:


  • Stream 自己不会存储元素。
  • Stream 不会改变源对象,相反,他们会返回一个持有结果的新Stream。
  • Stream 操作是延迟执行的。这意味着他们会等到需要结果的时候才执行。

8.jpg


案例测试


1、新建一个实体类 User


@Data
@AllArgsConstructor
public class User {
    private int id;
    private String name;
    private int age;
}
复制代码


2、流式计算


/**
 * 题目:请按照给出数据,找出同时满足以下条件的用户
 * 也即以下条件:
 * 1、全部满足偶数ID
 * 2、年龄大于24
 * 3、用户名转为大写
 * 4、用户名字母倒排序
 * 5、只输出一个用户名字 limit
 */
public class Test {
    public static void main(String[] args) {
        User u1 = new User(1,"a",22);
        User u2 = new User(2,"b",23);
        User u3 = new User(3,"c",24);
        User u4 = new User(4,"d",25);
        User u5 = new User(6,"e",26);
        List<User> list = Arrays.asList(u1,u2,u3,u4,u5);
        list.stream().filter(u->{return u.getAge()>23;})
                .filter(u->{return u.getId() %2 ==0;})
                .map(u->{return u.getName().toUpperCase();})
                .sorted((uu1,uu2)->{return uu2.compareTo(uu1);})
                .limit(1)
                .forEach(System.out::println);
        List<Integer> list2 = null;
        list2 = list.stream().map(u -> {return u.getAge()+2;}).collect(Collectors.toList());
        list2.forEach(System.out::println);
    }
}
复制代码


使用流式计算,代码看起来更加简洁,效率相应也会有所提升。                  

分支合并


什么是ForkJoin


从 JDK1.7开始,Java 提供 Fork/Join 框架用于并行执行任务。ForkJoin 的框架的基本思想是分而治之。什么是分而治之?分而治之就是将一个复杂的计算,按照设定的阈值进行分解成多个计算,然后将各个计算结果进行汇总。相应的 ForkJoin 将复杂的计算当做一个任务。而分解的多个计算则是当做一个子任务。


主要有两步:

  1. 任务切分;
  2. 结果合并


9.jpg


它的模型大致是这样的:线程池中的每个线程都有自己的工作队列(PS:这一点和 ThreadPoolExecutor 不同,ThreadPoolExecutor 是所有线程共用一个工作队列,所有线程都从这个工作队列中取任务),当自己队列中的任务都完成以后,会从其它线程的工作队列中偷一个任务执行,这样可以充分利用资源。


工作窃取


另外,forkjoin 有一个工作窃取的概念。简单理解,就是一个工作线程下会维护一个包含多个子任务的双端队列。而对于每个工作线程来说,会从头部到尾部依次执行任务。这时,总会有一些线程执行的速度较快,很快就把所有任务执行完了。空闲下来的线程不会闲置下来,而是随机选择一个其他的线程从队列的尾巴上“偷走”一个任务。这个过程会一直继续下去,知道所有的任务都执行完毕。


工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。工作窃取的运行流程图如下:


10.jpg


工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。


核心类


ForkJoinPool


11.jpg


在官方文档中有如下定义:


12.jpg


ForkJoinPool 执行任务的线程池,继承了 AbstractExecutorService 类,该线程池是通过DefaultForkJoinWorkerThreadFactory 或者 InnoCuousForkJoinWorkerThreadFactory 线程工厂产生的工作线程 。


ForkJoinPool 主要通过 executeinvokesubmit 这三个方法来处理任务  ForkJoinTask 。查看方法详细介绍可知:execute  方法异步执行给定任务,无返回值;invoke 方法执行给定的任务,在完成后返回其结果,结果类型与 ForkJoinTask 中的 V 类型一致;submit 方法执行任务 ForkJoinTask 并返回一个结果任务 ForkJoinTask


查看上述三个方法,实质上都执行的是 externalPush 方法,在该方法中有个任务队列 WorkQueue,它是 ForkJoinPool 的内部类, WorkQueue 中有执行任务的线程(ForkJoinWorkerThreadowner),还有这个线程需要处理的任务(ForkJoinTask<?>[] array),新提交的任务就是加到 array 中。


ForkJoinWorkerThread


执行任务的工作线程,即 ForkJoinPool 线程池里面的线程,每个线程都维护者一个双端队列 WorkQueue,用于存放内部任务。


ForkJoinTask


13.jpg


ForkJoinTask 代表运行在 ForkJoinPool 中的任务。主要方法:


  • fork()    在当前线程运行的线程池中安排一个异步执行。简单的理解就是再创建一个子任务。
  • join()    当任务完成的时候返回计算结果。
  • invoke()    开始执行任务,如果必要,等待计算完成。

子类: Recursive:递归

  • RecursiveAction    一个递归无结果的 ForkJoinTask(没有返回值)
  • RecursiveTask    一个递归有结果的 ForkJoinTask(有返回值)


代码测试


RecursiveTask 实现类


public class ForkJoinDemo extends RecursiveTask<Long> {
    private Long start; //起始值
    private Long end;   //结束值
    public static final Long temp = 10000L;//临界值
    public ForkJoinDemo(Long start, Long end) {
        this.start = start;
        this.end = end;
    }
    @Override
    protected Long compute() {
        Long length = end - start;
        //判断是否拆分完毕
        if(length <= temp){
            Long sum = 0L;
            //如果拆分完毕就相加
            for (Long i = start; i <= end; i++) {
                sum+=i;
            }
            return sum;
        }else{
            Long middle = (start+end)/2;
            ForkJoinDemo task1 = new ForkJoinDemo(start,middle);
            task1.fork();//拆分,并压入线程队列
            ForkJoinDemo task2 = new ForkJoinDemo(middle+1,end);
            task2.fork();
            //合并结果
            return task1.join()+task2.join();
        }
    }
}
复制代码


测试代码


public class ForkJoinTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Long start = 0L;
        Long end = 1000000000L;//10亿
        work1(start,end);   //5687
//        work2(start,end);   //4360
//        work3(start,end);   //195
    }
    //普通线程计算
    public static void work1(Long start,Long end){
        long startTime = System.currentTimeMillis();
        Long sum=0L;
        for (Long i = start;  i<= end; i++) {
            sum+=i;
        }
        long endTime = System.currentTimeMillis();
        System.out.println("result="+sum+",time="+(endTime-startTime));
    }
    //ForkJoin实现
    public static void work2(Long start,Long end) throws ExecutionException, InterruptedException {
        long startTime = System.currentTimeMillis();
        Long sum=0L;
        ForkJoinPool pool = new ForkJoinPool();//实现ForkJoin 就必须有ForkJoinPool的支持
        ForkJoinTask task = new ForkJoinDemo(start,end);
//        ForkJoinTask result = pool.submit(task);
//        sum = (Long) task.get();
        sum = (Long) pool.invoke(task);
        long endTime = System.currentTimeMillis();
        System.out.println("result="+sum+",time="+(endTime-startTime));
    }
    //并行流进行大数值运算
    public static void work3(Long start,Long end) {
        long startTime = System.currentTimeMillis();
        Long sum=0L;
        sum = LongStream.rangeClosed(start,end).parallel().reduce(0,Long::sum);
        long endTime = System.currentTimeMillis();
        System.out.println("result="+sum+",time="+(endTime-startTime));
    }
}
复制代码


异步回调

前言


我们前面讲并发编程一直都着重于多线程同步调用,除了同步线程,还存在异步线程。在此之前我们来回顾一下同步和异步的定义。


同步:就是当任务A依赖于任务B的执行时,必须等待任务B执行完毕之后任务A才继续执行,此过程任务A被阻塞。任务要么都成功,要么都失败!想一想我们打电话的情景即可! 异步:任务A调用任务B,任务A不需要等到任务B执行完毕,任务B只是返回一个虚拟的结果给任务A,使得任务A能够继续做其他事情,等到任务B执行完成之后再通知任务A(回调)或者是任务A主动去请求任务B要结果。


Future 模式的核心思想是能够让主线程将原来需要同步等待的这段时间用来做其他的事情。(因为可以异步获得执行结果,所以不用一直同步等待去获得执行结果)


14.jpg


上图简单描述了普通模式和使用Future的区别,普通模式下,客户端访问服务端,等待结果返回非常耗时,此时客户端只能等待无法去做其他任务。而 Future 模式下,客户端向服务端发送完请求之后,先得到一个虚拟结果,真实的结果在未来某个时刻完成之后返回给客户端,而客户端在此期间可以去做其他任务。


Future的优点:比更底层的 Thread 更易用。要使用 Future,通常只需要将耗时的操作封装在一个 Callable 对象中,再将它提交给 ExecutorService


ExecutorService executor = Executors.newFixedThreadPool(4); 
// 定义任务:
Callable<String> task = new Task();
// 提交任务并获得Future:
Future<String> future = executor.submit(task);
// 从Future获取异步执行返回的结果:
String result = future.get(); // 可能阻塞
复制代码


当我们提交一个Callable任务后,我们会同时获得一个Future对象,然后,我们在主线程某个时刻调用Future对象的get()方法,就可以获得异步执行的结果。在调用get()时,如果异步任务已经完成,我们就直接获得结果。如果异步任务还没有完成,那么get()会阻塞,直到任务完成后才返回结果。


一个Future接口表示一个未来可能会返回的结果,它定义的方法有:

  • get():获取结果(可能会等待)
  • get(long timeout, TimeUnit unit):获取结果,但只等待指定的时间;
  • cancel(boolean mayInterruptIfRunning):取消当前任务;
  • isDone():判断任务是否已完成。


使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待。

从Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。


代码测试


CompletableFuture可以指定异步处理流程:

  • runAsync()返回无结果的CompletableFuture
  • supplyAsync()返回无结果的CompletableFuture
  • whenComplete()处理正常和异常结果;
  • thenAccept()处理正常结果;
  • exceptional()处理异常结果;
  • thenApplyAsync()用于串行化另一个CompletableFuture
  • anyOf()allOf()用于并行化多个CompletableFuture


CompletableFuture.runAsync()


返回一个CompletableFuture,它需要一个实现了Runnable接口的对象 ,无返回值(此处说的无返回值指的是 CompletableFuture)。


public static void main(String[] args) throws ExecutionException, InterruptedException {
        //没有返回值的异步回调
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName());
        });
        System.out.println("主线程优先执行");
        completableFuture.get();
    }
复制代码


执行结果为:


主线程优先执行
ForkJoinPool.commonPool-worker-1
复制代码


CompletableFuture.supplyAsync()

返回一个CompletableFuture,它需要一个实现了Supplier接口的对象 ,有返回值。


public class CompletableFutureTest {
    public static void main(String[] args) throws InterruptedException {
        //创建一个CompletableFuture
        CompletableFuture<Double> cfture = CompletableFuture.supplyAsync(CompletableFutureTest::fetchPrice);//lambda语法简化方法调用
//        cfture.thenAccept(result ->{// 如果执行成功
//            System.out.println(result);
//        }).exceptionally(e ->{// 如果执行异常
//            e.printStackTrace();
//            return null;
//        });
        cfture.whenComplete((r1,r2) ->{
            System.out.println("执行结果为:"+r1); //输出执行成功的结果
            System.out.println("异常信息:"+r2); //输出异常信息
        }).exceptionally(e ->{// 如果执行异常
            e.printStackTrace();
            return null;
        });
        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭
        TimeUnit.SECONDS.sleep(2);
        System.out.println("主线程执行完毕");
    }
    static Double fetchPrice() {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
        }
        if (Math.random() < 0.3) {
            throw new RuntimeException("fetch price failed!");
        }
        return 5 + Math.random() * 20;
    }
}
复制代码


无异常时结果为:


执行结果为:6.110276836465158
异常信息:null
主线程执行完毕
复制代码


抛出异常结果为:


15.jpg


相比FutureCompletableFuture更强大的功能是,多个CompletableFuture可以串行执行。


public class CompletableFutureTest {
    public static void main(String[] args) throws Exception {
        // 第一个任务:
        CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> {
            return queryCode("中国石油");
        });
        cfQuery.thenAccept((result) -> {
            System.out.println("query result: " + result);
        });
        // cfQuery成功后继续执行下一个任务:
        CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> {
            return fetchPrice(code);
        });
        // cfFetch成功后打印结果:
        cfFetch.thenAccept((result) -> {
            System.out.println("price: " + result);
        });
        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        TimeUnit.SECONDS.sleep(2);
    }
    static String queryCode(String name) {
        try {
            TimeUnit.MILLISECONDS.sleep(200);
        } catch (InterruptedException e) {
        }
        return name;
    }
    static Double fetchPrice(String code) {
        try {
            TimeUnit.MILLISECONDS.sleep(600);
        } catch (InterruptedException e) {
        }
        return 5 + Math.random() * 20;
    }
}
复制代码


除了串行执行外,多个CompletableFuture还可以并行执行。例如,我们考虑这样的场景:


同时从新浪和网易查询证券代码,只要任意一个返回结果,就进行下一步查询价格,查询价格也同时从新浪和网易查询,只要任意一个返回结果,就完成操作。


public class CompletableFutureTest {
    public static void main(String[] args) throws Exception {
        // 两个CompletableFuture执行异步查询:
        CompletableFuture<String> cfQueryFromBing = CompletableFuture.supplyAsync(() -> {
            return queryName("hresh", "https://cn.bing.com/");
        });
        CompletableFuture<String> cfQueryFromBaidu = CompletableFuture.supplyAsync(() -> {
            return queryName("hresh2", "https://cn.baidu.com/");
        });
        // 用anyOf合并为一个新的CompletableFuture:
        CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromBing, cfQueryFromBaidu);
        // 并行执行结果可能是两个CompletableFuture中任意一个的返回结果
        cfQuery.thenAccept((result) -> {
            System.out.println("name: " + result);
        });
        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        Thread.sleep(200);
    }
    static String queryName(String name, String url) {
        System.out.println("query name from " + url + "...");
        try {
            Thread.sleep((long) (Math.random() * 100));
        } catch (InterruptedException e) {
        }
        return name;
    }
}


目录
相关文章
|
11天前
|
Java 开发者
Java多线程编程中的常见误区与最佳实践####
本文深入剖析了Java多线程编程中开发者常遇到的几个典型误区,如对`start()`与`run()`方法的混淆使用、忽视线程安全问题、错误处理未同步的共享变量等,并针对这些问题提出了具体的解决方案和最佳实践。通过实例代码对比,直观展示了正确与错误的实现方式,旨在帮助读者构建更加健壮、高效的多线程应用程序。 ####
|
16天前
|
JSON Java Apache
非常实用的Http应用框架,杜绝Java Http 接口对接繁琐编程
UniHttp 是一个声明式的 HTTP 接口对接框架,帮助开发者快速对接第三方 HTTP 接口。通过 @HttpApi 注解定义接口,使用 @GetHttpInterface 和 @PostHttpInterface 等注解配置请求方法和参数。支持自定义代理逻辑、全局请求参数、错误处理和连接池配置,提高代码的内聚性和可读性。
|
10天前
|
Java 开发者
Java多线程编程的艺术与实践####
本文深入探讨了Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的技术文档,本文以实战为导向,通过生动的实例和详尽的代码解析,引领读者领略多线程编程的魅力,掌握其在提升应用性能、优化资源利用方面的关键作用。无论你是Java初学者还是有一定经验的开发者,本文都将为你打开多线程编程的新视角。 ####
|
9天前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
7天前
|
Java
在Java中,接口之间可以继承吗?
接口继承是一种重要的机制,它允许一个接口从另一个或多个接口继承方法和常量。
30 1
|
12天前
|
安全 Java 开发者
Java多线程编程中的常见问题与解决方案
本文深入探讨了Java多线程编程中常见的问题,包括线程安全问题、死锁、竞态条件等,并提供了相应的解决策略。文章首先介绍了多线程的基础知识,随后详细分析了每个问题的产生原因和典型场景,最后提出了实用的解决方案,旨在帮助开发者提高多线程程序的稳定性和性能。
|
14天前
|
监控 安全 Java
Java中的多线程编程:从入门到实践####
本文将深入浅出地探讨Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的摘要形式,本文将以一个简短的代码示例作为开篇,直接展示多线程的魅力,随后再详细解析其背后的原理与实现方式,旨在帮助读者快速理解并掌握Java多线程编程的基本技能。 ```java // 简单的多线程示例:创建两个线程,分别打印不同的消息 public class SimpleMultithreading { public static void main(String[] args) { Thread thread1 = new Thread(() -> System.out.prin
|
17天前
|
存储 缓存 安全
在 Java 编程中,创建临时文件用于存储临时数据或进行临时操作非常常见
在 Java 编程中,创建临时文件用于存储临时数据或进行临时操作非常常见。本文介绍了使用 `File.createTempFile` 方法和自定义创建临时文件的两种方式,详细探讨了它们的使用场景和注意事项,包括数据缓存、文件上传下载和日志记录等。强调了清理临时文件、确保文件名唯一性和合理设置文件权限的重要性。
40 2
|
2天前
|
Java API 数据库
Java 反射机制:动态编程的 “魔法钥匙”
Java反射机制是允许程序在运行时访问类、方法和字段信息的强大工具,被誉为动态编程的“魔法钥匙”。通过反射,开发者可以创建更加灵活、可扩展的应用程序。
|
Java
Java接口和抽象类
Java接口和抽象类
90 0