Java SDK 并发包全面总结(三)

简介: Java 并发包中的 Lock 和 Condition 主要解决的是线程的互斥和同步问题,这两者的配合使用,相当于 synchronized、wait()、notify() 的使用。

七、ThreadPoolExecutor


1. 线程池的工作原理

由于线程是一种重量级对象,频繁的创建和销毁比较消耗系统资源,因此线程池的优势就显现出来了。线程池可有降低资源消耗,因为不用频繁创建和销毁线程;提高响应速度,需要执行任务时,可直接使用线程池中的线程资源;还能够有效的管理、监控线程池中的线程。

Java 中的线程池的实现是一种很典型的生产者-消费者模式,使用线程的一方是生产者,主要提供需要执行的任务,线程池是消费者,消费生产者提供的任务。

下面这段代码能够帮助理解线程池的实现原理(仅用于帮助理解,实际执行结果有出入):

public class ThreadPool {
    //保存任务的阻塞队列
    private BlockingQueue<Runnable> workQueue;
    //保存工作线程的列表
    private List<WorkThread> threadList = new ArrayList<>();
    //构造方法
    public ThreadPool(int poolSize, BlockingQueue<Runnable> workQueue) {
        this.workQueue = workQueue;
        //根据poolSize的数量创建工作线程,并执行线程
        for (int i = 0; i < poolSize; i++) {
            WorkThread thread = new WorkThread();
            thread.start();
            threadList.add(thread);
        }
    }
    //执行任务的方法,主要是将任务添加到队列中
    public void execute(Runnable task) {
        try {
            workQueue.put(task);
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    //工作线程
    class WorkThread extends Thread{
        @Override
        public void run() {
            //循环取出任务执行
            while (!workQueue.isEmpty()) {
                try {
                    Runnable task = workQueue.take();
                    task.run();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

上面的代码注释很详细了,主要是使用了一个阻塞队列,用来存储生产者的任务。然后在构造器中创建线程,并循环从队列中取出任务执行。

2. Java 中的线程池

Java 中提供了 Executors 这个类来快速创建线程池,简单使用示例如下:

Executors.newSingleThreadExecutor();//创建一个线程的线程池

Executors.newFixedThreadPool(5);//创建固定数量线程

Executors.newCachedThreadPool();//创建可调整数量的线程

Executors.newScheduledThreadPool(5);//创建定时任务线程池

但是在《阿里巴巴Java开发手册》中,明确禁止使用 Executors 创建线程池(甚至也不建议使用 Thread 显式创建线程),主要原因是 Executors 的默认方法都是使用的无界队列,在高负载的情况下,很容易导致 OOM(Out Of Memory)。

所以在 Java 中创建线程池的正确姿势是使用 ThreadPoolExecutor ,其构造函数有七个:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,//可选
                          RejectedExecutionHandler handler//可选
                          ) { ...
  • corePoolSize:线程池中最少的线程数
  • maximumPoolSize:线程池中创建的最大的线程数
  • keepAliveTime:表示线程池中线程的活跃时间,如果线程在这个活跃时间内没有执行任务,并且线程数量超过了 corePoolSize,那么线程池就会回收多余的线程。
  • TimeUnit:上一个参数的时间单位
  • workQueue:保存任务的队列,为了避免 OOM,建议使用有界队列
  • threadFactory:可选参数,不传的话就是默认值。也可以自己传一个实现了 ThreadFactory 接口的类,表示自定义线程,例如给线程指定名字,线程组等。
  • handler:可选参数。定义任务的拒绝策略,表示无空闲线程时,并且队列中的任务满了的,怎么拒绝新的任务。目前的拒绝策略有四种:
  • AbortPolicy:默认的拒绝策略,抛出 RejectedExecutionException 异常
  • CallerRunsPolicy:让提交任务的线程自己去执行这个任务
  • DiscardOldestPolicy:丢弃最老的任务,及最先加入队列中的任务,并添加新的任务
  • DiscardPolicy:直接丢弃任务,并且不会抛出任何异常

调用 ThreadPoolExecutor

线程池创建好了之后,就需要执行任务,ThreadPoolExecutor 提供了两个方法,一是 execute,二是 submit。execute 没有返回值,也就是说无法获取执行结果。使用示例如下:

public static void main(String[] args) {
    BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(5);
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, queue);
    threadPool.execute(() -> {
        System.out.println("In this world");
    });
    threadPool.shutdown();
}

而 submit 方法有一个 Future 接口的返回值,Future 接口有五个方法:

  • cancle:取消任务
  • isCancled:任务是否已取消
  • isDone:任务是否已执行完
  • get:获取任务执行结果
  • get(long timeout, TimeUnit unit):支持超时获取任务执行结果

下面代码展示了取消任务的方法:

public static void main(String[] args) {
    BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(5);
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, queue);
    Future<?> future = threadPool.submit(() -> {
        System.out.println("I am roseduan");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
    future.cancel(false);
    threadPool.shutdown();
}

程序的本意是打印语句然后休眠 5 秒,但由于调用了 cancle 方法 ,因此程序直接结束,不会有任何输出。


八、FutureTask


FutureTask 也是一个支持获取任务执行结果的工具类,FutureTask 实现了 Runnable 和 Future 接口。

所以可以将 FutureTask 作为任务提交给 ThreadPoolExecutor 或者 Thread 执行,并且可以获取执行结果。简单的使用如下:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    //创建任务
    FutureTask<String> task = new FutureTask<>(() -> "Java and " + "Python");
    BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(5);
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, queue);
    threadPool.execute(task);
    //获取执行结果
    System.out.println(task.get());
    threadPool.shutdown();
}
传给 Thread 作为参数的使用示例如下:
public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<Integer> task = new FutureTask<>(() -> 1 + 2);
        Thread thread = new Thread(task);
        thread.start();
        System.out.println(task.get());//输出3
    }


九、CompletableFuture


CompletableFuture 是一个异步编程的工具类,异步化能够最大化并行程序的执行,是多线程性能优化的基础。


1. 创建 CompletableFuture 对象

Completable 有四个静态方法,可以用来创建对象:

runAsync(Runnable runnable);//无返回值

runAsync(Runnable runnable, Executor executor);//无返回值,可指定线程池


supplyAsync(Supplier supplier);//有返回值

supplyAsync(Supplier supplier, Executor executor);//有返回值,可指定线程池

可以看到,四个方法分为了是否有返回值,和是否自定义线程池。如果不自定义线程池,那么 CompletableFuture 会使用公共的线程池,默认创建 CPU 核数的数量的线程池,当有多个任务的时候,还是建议根据每个任务自定义线程池。

一个简单的使用示例如下,其中 task3 会等待两个任务都执行完毕:

public static void main(String[] args) {
    CompletableFuture<Void> task1 = CompletableFuture.runAsync(() -> {
        System.out.println("任务1执行完毕");
    });
    CompletableFuture<Void> task2 = CompletableFuture.runAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("任务2执行完毕");
    });
    CompletableFuture<String> task3 = task1.thenCombine(task2, (__, res) -> "两个任务执行完毕");
    System.out.println(task3.join());


CompletableFuture 实现了 Future 接口,因此可以查看任务执行的情况,并且可以获取返回值。


2. CompletionStage 接口中的方法

CompletableFuture 还实现了 CompletionStage 接口。这个接口描述了任务之间的时序关系,分别有串行、并行、聚合三种关系。需要注意的是,并行本就是其所具有的特性,所以不再探讨了,并且聚合关系又分为了 AND 聚合关系和 OR 聚合关系。下面依次介绍串行、AND 聚合、OR 聚合这三种关系。

首先是串行关系,串行很简单,一个任务执行完后再执行另一个任务,例如下图:

描述串行关系的几个方法是:thenApply、thenAccept、thenRun、thenCompose。

thenApply 既支持接收参数,又能够支持返回值。

thenAccept 支持接收参数,但是不支持返回值。

thenRun 既不能接收参数,也不能有返回值。

CompletionStage 中的大部分方法都有带有 Async 后缀的方法,表示可能会使用其他的线程来执行主体中的内容,后面介绍的方法都类似这样,不再赘述。

简单的使用示例如下:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("任务1执行完毕");
        return "Task1";
    }).thenApply((s) -> "接收到的参数 : " + s);;
    System.out.println(future.get());
}

其次是 AND 汇聚关系,典型的场景便是一个线程等待两个线程都执行完后再执行,例如下图:

描述 AND 聚合关系的有三个方法:thenCombine、thenAcceptBoth、runAfterBoth,其是否接收参数和支持返回值,和上面的三个方法对应。一个简单的使用示例如下:

public static void main(String[] args) {
    CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("任务1执行完毕");
        return "task1";
    });
    CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务2执行完毕");
        return "task2";
    });
    CompletableFuture<String> task3 = task1.thenCombine(task2, (r,s) -> r + " " + s);
    System.out.println(task3.join());
}

任务 1 休眠了 2 秒,任务 3 会等待前面两个任务执行完成之后再执行。

最后是 OR 聚合关系,表示线程等待其中一个线程满足条件之后,就可以继续执行了,不用等待全部的线程。

描述 OR 聚合关系的是 applyToEither、acceptEither、runAfterEither。使用示例和上面的类似,只需要将方法改一下就是了,这里不再赘述了。


3. 处理异常

在异步编程中,CompletionStage 接口还提供了几个可以处理异常的方法,和 try() catch() finally() 类似。

这几个方法分别是 :

  • exceptionally:相当于 catch
  • whenComplete:相当于 finally
  • handle:相当于 finally ,支持返回值

使用示例如下:

public static void main(String[] args) {
    CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
        String str = null;
        return str.length();
        //相当于catch
    }).exceptionally((e) -> {
        System.out.println("发生异常");
        return 0;
    });
    //相当于 finally
    task.whenComplete((s, r) -> {
        System.out.println("执行结束");
    });
    System.out.println(task.join());
}


十、CompletionService


CompletionService 是一个批量执行异步任务的工具类,先来看一个例子:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    StringBuffer sb = new StringBuffer();
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            5, 5,
            10, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(5));
    Future<String> task1 = threadPool.submit(() -> {
        Thread.sleep(2000);
        return "Task1";
    });
    Future<String> task2 = threadPool.submit(() -> "Task2");
    Future<String> task3 = threadPool.submit(() -> "Task3");
    sb.append(task1.get());
    sb.append(task2.get());
    sb.append(task3.get());
}

程序的意思是,依次执行三个任务,并将其结果存储到 StringBuffer 中,由于 task1 休眠了 2 秒,所以 sb 会在这里阻塞。

由于这三个任务之间没有关联,所以等待的消耗完全是没必要的,解决的办法便是利用一个阻塞队列,先执行完的任务将结果保存在队列中,sb 从队列中取出就行了。

CompletionService 实际上就是将线程池和阻塞队列的功能整合了起来,解决了类似上面的问题。CompletionService 的实现类是 ExecutorCompletionService,这个类有两个构造方法:

public ExecutorCompletionService(Executor executor) {}
public ExecutorCompletionService(Executor executor,
    BlockingQueue<Future<V>> completionQueue) {}

如果不传一个阻塞队列,则会使用默认的无界队列。

CompletionService 主要有这几个方法:

submit() 提交任务、take() 从阻塞队列中获取执行结果(如果队列为空,线程阻塞)、poll() 也是从队列中获取执行结果(如果队列为空,则返回 null),另外 poll 还支持超时获取。

使用 CompletionService 改造后的程序示例如下:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    StringBuffer sb = new StringBuffer();
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            5, 5,
            10, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(5));
    CompletionService<String> service = new ExecutorCompletionService<>(threadPool);
    service.submit(() -> {
        Thread.sleep(2000);
        return "Task1";
    });
    service.submit(() -> "Task2");
    service.submit(() -> "Task3");
    System.out.println(sb.append(service.take().get()).toString());
    System.out.println(sb.append(service.take().get()).toString());
    System.out.println(sb.append(service.take().get()).toString());
}


十一、Fork/Join


1. Fork/Join 使用

Fork/Join 是一个处理分治任务的计算框架,所谓分治,即分而治之,将一个任务分解成子任务,求解子任务,然后将子任务的结果合并,就得到了最后的结果。分治思想的应用十分的广泛,例如常见的快速排序、归并排序,还有流行的大数据计算框架 MapReduce,都应用了分治思想。

Java 中,Fork 对应的是 任务分解,Join 则表示 子任务的结果合并。

Fork/Join 主要包含两个主要的实现类:

  • 一是线程池 ForkJoinPool,默认会创建 CPU核数数量的线程
  • 二是ForkJoinTask,这是一个抽象类,主要的方法有 fork() 和 join(),前者表示执行子任务,后者表示阻塞等待子任务的执行结果。ForkJoinTask 还有两个子类:
  • RecursiveTask
  • RecursiveAction

这两个类也是抽象的,我们需要自定义并继承这个类,并覆盖其 compute 方法。其中 RecursiveTask 有返回值,而 RecursiveAction 没有返回值。

下面是一个使用 ForkJoin 的示例,实现了 n 的阶乘,注释写得比较详细。

public class ForkJoinTest {
    public static void main(String[] args) {
        //创建线程池
        ForkJoinPool forkJoinPool = new ForkJoinPool(4);
        //创建任务
        Factorial task = new Factorial(6);
        //invoke 方法执行任务(还可以使用 execute、submit),得到执行的结果
        Integer res = forkJoinPool.invoke(task);
        System.out.println(res);
    }
    static class Factorial extends RecursiveTask<Integer> {
        private final int n;
        Factorial(int n) {
            this.n = n;
        }
        @Override
        protected Integer compute() {
            if (n == 0){
                return 1;
            }
            Factorial f = new Factorial(n - 1);
            //执行子任务
            f.fork();
            //等待子任务结果
            return n * factorial.join();
        }
    }
}


2. ForkJoinPool 原理

和普通的线程池类似,ForkJoinPool 是一个特殊的线程池,并且也采用的是生产者 - 消费者模式。跟普通线程池共享一个队列不同,ForkJoinPool 其中维护了多个双端队列,当一个线程对应的任务队列为空的时候,线程并不会空闲,而是“窃取”其他队列的任务执行。

由于是双端队列,正常执行任务和“窃取任务”可以从两端进行出队,这样避免了数据竞争。

采用“任务窃取”这种模式,也是 ForkJoinPool 比普通线程池更加智能的体现。

相关文章
|
2月前
|
Java API 开发工具
百宝箱开放平台 ✖️ Java SDK
百宝箱提供Java SDK,支持开发者集成其开放能力。需先发布应用,准备Java 8+及Maven环境,通过添加依赖安装SDK,并初始化客户端调用对话型或生成型智能体,实现会话管理、消息查询与文件上传等功能。
1327 0
百宝箱开放平台 ✖️ Java SDK
|
6月前
|
存储 Java API
MinIO Java SDK 7.1.4 升级到 8.5.17 需要注意什么
现在我需要你帮我分析对比这个两个sdk在对外的接口设计上是否有不兼容的变更
526 5
|
Java Apache 开发工具
【Azure 事件中心】 org.slf4j.Logger 收集 Event Hub SDK(Java) 输出日志并以文件形式保存
【Azure 事件中心】 org.slf4j.Logger 收集 Event Hub SDK(Java) 输出日志并以文件形式保存
230 1
|
存储 Java API
【Azure 存储服务】Java Storage SDK 调用 uploadWithResponse 代码示例(询问ChatGTP得代码原型后人力验证)
【Azure 存储服务】Java Storage SDK 调用 uploadWithResponse 代码示例(询问ChatGTP得代码原型后人力验证)
164 0
|
Java 开发工具
通过Java SDK调用阿里云模型服务
在阿里云平台上,可以通过创建应用并使用模型服务完成特定任务,如生成文章内容。本示例展示了一段简化的Java代码,演示了如何调用阿里云模型服务生成关于“春秋战国经济与文化”的简短文章。示例代码通过设置系统角色为历史学家,并提出文章生成需求,最终处理并输出生成的文章内容。在实际部署前,请确保正确配置环境变量中的密钥和ID,并根据需要调整SDK导入语句及类名。更多详情和示例,请参考相关链接。
|
JSON Java API
【Azure API 管理】通过Java APIM SDK创建一个新的API,如何为Reqeust的Representation设置一个内容示例(Sample)?
【Azure API 管理】通过Java APIM SDK创建一个新的API,如何为Reqeust的Representation设置一个内容示例(Sample)?
130 0
|
存储 Java 开发工具
【Azure 存储服务】Java Azure Storage SDK V12使用Endpoint连接Blob Service遇见 The Azure Storage endpoint url is malformed
【Azure 存储服务】Java Azure Storage SDK V12使用Endpoint连接Blob Service遇见 The Azure Storage endpoint url is malformed
192 0
|
机器学习/深度学习 编解码 Java
阿里云视觉智能开放平台(VIAPI)人脸美颜Java SDK使用说明
本文介绍人脸美颜FaceBeauty的语法及示例。
1656 0
阿里云视觉智能开放平台(VIAPI)人脸美颜Java SDK使用说明
|
Java 开发工具 计算机视觉
阿里云智能视觉生产图像处理人像分割Java SDK使用说明
人像分割用于识别输入图像中的人体轮廓,与背景进行分离,返回分割后的前景人像图(4通道),适用于单人、多人、复杂背景、各类人体姿态等场景。本文介绍如何使用阿里云智能视觉生产图像处理人体分割Java SDK,包括SDK的安装方法及SDK代码示例。
2991 1
|
自然语言处理 安全 Java
阿里云智能语音交互实时语音识别Java SDK使用说明
实时语音识别功能提供了对长时间的语音数据流进行识别,适用于会议演讲、视频直播等长时间不间断识别的场景。。本文介绍如何使用阿里云智能语音服务提供的Java SDK,包括SDK的安装方法及SDK代码示例。
3849 0