JUC--线程池

简介: 一、线程池的介绍二、线程池的创建三、特殊线程池3.1.CompletionService异步处理3.2.ThreadPoolExecutor3.3 ForkJoinPool

 


       虽然多线程的技术大大帮助了程序运行的效率,但是在太多的线程的创建与销毁下,系统的开销也将会是非常庞大的。所以为了实现线程的可管理性,并且降低开销。则JUC中提供了线程池的概念,以及相关实现方法

一、线程池的介绍

面试题:线程池的实现

创建一个阻塞队列来容纳任务,在第一次执行任务时创建足够多的线程,并处理任务,之后每个工作线程自动从任务队列中获取线程,直到任务队列中任务为0为止,此时线程处于等待状态,一旦有工作任务加入任务队列中,即刻唤醒工作线程进行处理,实现线程的可复用性。

创建四种线程池的方法

方法名 描述
newCachedThreadPool() 创建一个可以根据需要自动调整线程数量的线程池。
newFixedThreadPool(int nThreads) 创建具有固定数量线程的线程池。
newSingleThreadScheduledExecutor() 创建仅包含单个线程的线程池,可以按照计划安排任务的执行。
newScheduledThreadPool(int corePoolSize) 创建具有固定核心线程数量的线程池,可以按照计划安排任务的执行,并且具有一定数量的可变大小线程池。

四种线程池的作用

    • newCachedThreadPool():创建一个可以根据需要自动调整线程数量的线程池。适用于执行大量短期异步任务的场景,线程池会根据任务的数量自动增加或减少线程的数量。
    • newFixedThreadPool(int nThreads):创建具有固定数量线程的线程池。适用于需要控制并发线程数的场景,线程数量固定不变。
    • newSingleThreadScheduledExecutor():创建仅包含单个线程的线程池,可以按照计划安排任务的执行。适用于需要按顺序执行任务的场景,每个任务都会在前一个任务完成后再执行。
    • newScheduledThreadPool(int corePoolSize):创建具有固定核心线程数量的线程池,可以按照计划安排任务的执行,并且具有一定数量的可变大小线程池。适用于需要根据计划安排任务执行时间的场景,核心线程数固定,但可以根据需要增加额外的线程来处理更多任务。

    创建线程池

    Executor类创建的线程池主要通过两类接口描述:ExecutorService(线程池)和ScheduledExecutorService(调度线程池)

    线程池常用的方法

    方法名 描述
    execute(Runnable command) 按顺序执行给定的命令,提交到线程池中进行执行。
    submit(Runnable task) 提交一个可运行的任务给线程池,并返回一个表示该任务的未决结果的 Future。
    submit(Callable task) 提交一个可调用的任务给线程池,并返回一个表示该任务的未决结果的 Future。
    invokeAll(Collection<? extends Callable<T>> tasks) 在给定的任务列表中的所有任务完成之前,按顺序执行并等待每个任务的完成,并返回一个表示所有任务结果的 Future 列表。
    invokeAny(Collection<? extends Callable<T>> tasks) 执行给定的任务列表,返回其中一个已经成功完成的任务的结果,并取消所有其他任务。
    isShutdown() 如果线程池已经调用了 shutdown()shutdownNow() 方法,返回 true。否则返回 false
    shutdown() 平滑地关闭线程池,停止接受新任务,并尝试将所有未完成的任务继续执行。
    schedule() 安排在给定的延迟后执行任务,并返回一个表示该任务的未决结果的 ScheduledFuture

    二、线程池的创建

    面试题:线程池中的线程是怎么创建的?

    四种线程对象的创建

    1.案例代码:创建缓存线程池(建立了一个线程池,而且线程数量是没有限制的(当然,不能超过Integer的最大值),新增一个任务即有一个线程处理,或者复用之前空闲的线程,或者重亲启动一个线程,但是一旦一个线程在60秒内一直处于等待状态时(也就是一分钟无事可做),则会被终止)

    package Example2135;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    public class javaDemo {
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newCachedThreadPool();
    //        如果线程池中的线程不够用,则会自动新创建线程,线程最多为Integer.MAX_VALUE个
            for (int i=0;i<100;i++){
                executorService.submit(()->{
                    System.out.println(Thread.currentThread().getId()+"--"+Thread.currentThread().getName());
                });
            }
            if (!executorService.isShutdown()){
                executorService.shutdown();
            }
        }
    }

    image.gif

    image.gif编辑

    注意:关于execute与submit方法的关系

      1. 返回值类型:execute() 方法没有返回值,仅用于提交可运行的任务;而 submit() 方法将提交的任务封装成一个 Future 对象,可以通过该对象获取任务执行的结果。
      2. 参数类型:execute() 方法接受一个 Runnable 类型的参数,用于提交不需要返回结果的任务;而 submit() 方法可以接受 Runnable 或者 Callable 类型的参数,用于提交既可以返回结果也可以不返回结果的任务。
      3. 异常处理:execute() 方法无法捕获任务执行过程中的异常,如果任务抛出异常,线程池无法感知;而 submit() 方法可以捕获任务执行过程中的异常,并以 Future 形式返回异常信息。

      综上所述,execute() 更适合用于执行简单的、无需返回结果的任务,而 submit() 则更灵活,可以用来执行有返回结果的任务,并且可以捕获任务的异常。

      2.案例代码:创建固定长度的线程池(在初始化时已经决定了线程的最大数量,若任务添加的能力超出了线程的处理能力,则建立阻塞队列容纳多余的任务)

      package Example2136;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      public class javaDemo {
          public static void main(String[] args) {
      //    创建固定长度为4的线程池
              ExecutorService executorService = Executors.newFixedThreadPool(4);
      //           让线程输出自己的默认名称
              for(int i=0;i<10;i++){
                  executorService.execute(()->{
                      System.out.println(Thread.currentThread().getName());
                  });
              }
              executorService.shutdown();
          }
      }

      image.gif

      image.gif编辑

      3.案例代码:创建单线程(顾名思义就是一个池中只有一个线程在运行,该线程永不超时,而且由于是一个线程,当有多个任务需要处理时,会将它们放置到一个无界阻塞队列中逐个处理)

      package Example2137;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      public class javaDemo {
          public static void main(String[] args) {
      //        创建单线程对象
              ExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
              for (int i=0;i<10;i++){
                  executorService.submit(()->{
                      System.out.println(Thread.currentThread().getName());
                  });
              }
              executorService.shutdown();
          }
      }

      image.gif

      image.gif编辑

      4.案例代码:创建调度线程池

      package Example2139;
      import java.util.Date;
      import java.util.concurrent.Executors;
      import java.util.concurrent.ScheduledExecutorService;
      import java.util.concurrent.TimeUnit;
      public class javaDemo {
          public static void main(String[] args) {
      //        创建调度线程池,并设置内核线程数量为1
              ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
      //        设置调度的任务,并设置3秒后执行,之后每隔2秒执行一次
              scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                  @Override
                  public void run() {
                      Date date = new Date();
                      System.out.println(Thread.currentThread().getName()+"为您播报:当前时间为"+ date);
                  }
              },3,2, TimeUnit.SECONDS);
          }
      }

      image.gif

      image.gif编辑

      如果在线程池中传入了Callable接口实例,那么可以通过Future接口获取返回的结果在ExecutorService接口中提供了invokeAny和invokeAll两个方法可以实现一组Callable实例的执行

      案例代码:执行一组Callable实例

      package Example2140;
      import java.util.HashSet;
      import java.util.List;
      import java.util.Set;
      import java.util.concurrent.*;
      public class javDemo {
          public static void main(String[] args) throws InterruptedException, ExecutionException {
      //        保存多个线程对象
              Set<Callable<String>> allThread = new HashSet<Callable<String>>();
      //        集合中追加线程
              for (int i=0;i<10;i++){
                  final int temp = i;
                  allThread.add(()->{
                     return Thread.currentThread().getName()+temp;
                  });
              }
      //        创建固定长度的线程
              ExecutorService service = Executors.newFixedThreadPool(3);
      //        使用Future接收类型
              List<Future<String>> list = service.invokeAll(allThread);
              for (Future<String> future : list){
                  System.out.println(future.get());
              }
          }
      }

      image.gif

      image.gif编辑


      三、特殊线程池

      3.1.CompletionService异步处理

      CompletionService的主要功能是可以通过异步处理获取到线程池返回的结果,CompletionService可以接收Callable或者Runnable实现的线程任务。并且可以通过ExecutorCompletionService子类实例化接口对象

      什么是异步处理

      异步处理指的是在任务执行期间,不需要等待任务完成就可以继续执行其他操作。通过异步处理,可以提高程序的响应速度和效率。在传统的同步处理中,必须等待一个任务执行完毕后才能执行下一个任务,而异步处理则可以同时执行多个任务,提高了任务的并发性。

      案例:使用CompletionService接口获取异步执行任务结果

      package Example2141;
      import java.util.concurrent.*;
      //线程体
      class ThreadItem implements Callable<String> {
          @Override
          public String call() throws Exception {
              //        获取当前时间戳
              long TimeMillis = System.currentTimeMillis();
              try {
                  System.out.println("[start]"+Thread.currentThread().getName());
                  TimeUnit.SECONDS.sleep(3);
                  System.out.println("[end]"+Thread.currentThread().getName());
              }catch (Exception e){}
              return Thread.currentThread().getName()+":"+ TimeMillis;
          }
      }
      public class javaDemo {
          public static void main(String[] args) throws InterruptedException, ExecutionException {
              ExecutorService executorService = Executors.newCachedThreadPool();
              CompletionService<String> completionService = new ExecutorCompletionService<String>(executorService);
      //        信息生产者
              for (int i=0;i<10;i++){
      //            提交线程
                  completionService.submit(new ThreadItem());
              }
              for (int i=0;i<10;i++){
                  System.out.println("获取数据"+completionService.take().get());
              }
      //        关闭线程池
              executorService.shutdown();
          }
      }

      image.gif

      3.2.ThreadPoolExecutor

      通过Executors类可以进行实现线程池的创建,而通过Executors类的创建都是基于ThreadPoolExecutor类的实现创建。在一些特殊的环境下,开发者也可以直接使用ThreadPoolExecutor类结合阻塞队列与拒绝策略实现属于自己的线程池。

      什么是拒绝策略

      拒绝策略(Rejection Policy)是在线程池中,当提交的任务超过线程池容量且无法处理时,决定如何处理这些任务的一种策略。当线程池中的工作队列(阻塞队列)已满,并且没有空闲的线程可以执行任务时,就会触发拒绝策略。

      四种拒绝策略

      拒绝策略 描述
      AbortPolicy 默认策略,当线程池无法处理新任务时,直接抛出RejectedExecutionException异常,表示拒绝执行该任务。
      CallerRunsPolicy 当线程池无法处理新任务时,将任务返回给调用者进行处理。也就是由调用submit方法的线程来执行该任务。
      DiscardOldestPolicy 当线程池无法处理新任务时,会丢弃最早进入工作队列的任务,并尝试重新提交当前任务。
      DiscardPolicy 当线程池无法处理新任务时,会默默丢弃无法处理的任务,不给予任何提示。该策略可能会导致任务丢失,慎用。

      案例代码:

      package Example2142;
      import java.util.concurrent.*;
      public class javaDemo {
          public static void main(String[] args) {
              BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(2);
      //        通过ThreadPoolExecutor创建线程池,该线程有2个内核线程,最大线程量为2,每个线程存活时间为6秒,使用默认拒绝策略
              ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,2,6L, TimeUnit.SECONDS,queue, Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
              for (int i=0;i<5;i++){
                  threadPoolExecutor.submit(()->{
                      System.out.println("[BEFORE]"+Thread.currentThread().getName());
                      try {
                          TimeUnit.SECONDS.sleep(2);
                      }catch (Exception  e){}
                      System.out.println("[AFTER]" +Thread.currentThread().getName());
                  });
              }
          }
      }

      image.gif

      image.gif编辑

      3.3 ForkJoinPool

      在JDK1.7以后为了充分利用多核CPU的性能优势,可以将一个复杂的业务计算进行拆分,交由多台CPU并行计算,这样用以提高程序的执行性能,所以引入了ForkJoinPool类,该类包含两个操作

      Fork(分解操作):将一个大型业务拆分成多个小的任务放在框架中执行

      Join(合并操作):主任务将等待多个子任务完成后进行合并

      在ForkJoinPool中有两个子类,RecursiveTask(有返回值的任务)、RecursiveAction(无返回值的任务)

      该类常用的方法:

      方法 说明
      fork() 将任务分解为更小的子任务并异步执行。将当前任务放入工作队列中,以供其他工作线程获取并执行。
      join() 等待当前任务的执行结果,并返回结果。如果任务还没有完成,则调用线程会被阻塞,直到任务完成后才会继续执行。
      isCompleteNormally() 判断任务是否已经正常完成。如果任务在完成时没有抛出异常,该方法返回true;如果任务被取消或者发生异常,返回false
      invokeAll(tasks) 执行给定的任务集合,并等待所有任务完成。该方法会将任务集合拆分为更小的子任务,并由线程池中的工作线程异步执行。
      getException() 获取任务执行过程中所发生的异常。如果任务正常完成或者尚未完成,该方法返回null;如果任务被取消或者异常中止,返回引发异常的原因。

      案例代码:

      package Example2143;
      import java.util.concurrent.ExecutionException;
      import java.util.concurrent.ForkJoinPool;
      import java.util.concurrent.Future;
      import java.util.concurrent.RecursiveTask;
      class SumTask extends RecursiveTask<Integer> {
          private int start;
          private int end;
          public SumTask(int start, int end) {
              this.start = start;
              this.end = end;
          }
          @Override
          protected Integer compute() {
              int sum = 0;
              if (this.end - this.start < 100) {
                  for (int x = this.start; x <= this.end; x++) {
                      sum += x;
                  }
              } else {
                  int middle = (start + end) / 2;
                  SumTask leftTask = new SumTask(this.start, middle);
                  SumTask rightTask = new SumTask(middle + 1, this.end);
                  leftTask.fork();
                  rightTask.fork();
                  // 等待子任务完成并将结果相加
                  sum = leftTask.join() + rightTask.join();
              }
              return sum;
          }
      }
      public class javaDemo {
          public static void main(String[] args) throws ExecutionException, InterruptedException {
              SumTask task = new SumTask(0,100);
              ForkJoinPool pool = new ForkJoinPool();
              Future<Integer> future = pool.submit(task);
              System.out.println(future.get());
          }
      }

      image.gif

      image.gif编辑


      目录
      相关文章
      |
      8月前
      |
      存储 Java 数据安全/隐私保护
      【JUC】ThreadLocal 如何实现数据的线程隔离?
      【1月更文挑战第15天】【JUC】ThreadLocal 如何实现数据的线程隔离?ThreadLocal 导致内存泄漏问题?
      |
      8月前
      |
      安全 算法 Java
      剑指JUC原理-19.线程安全集合(上)
      剑指JUC原理-19.线程安全集合
      58 0
      |
      4月前
      |
      存储 缓存 安全
      【Java面试题汇总】多线程、JUC、锁篇(2023版)
      线程和进程的区别、CAS的ABA问题、AQS、哪些地方使用了CAS、怎么保证线程安全、线程同步方式、synchronized的用法及原理、Lock、volatile、线程的六个状态、ThreadLocal、线程通信方式、创建方式、两种创建线程池的方法、线程池设置合适的线程数、线程安全的集合?ConcurrentHashMap、JUC
      |
      3月前
      |
      Java C++
      【多线程】JUC的常见类,Callable接口,ReentranLock,Semaphore,CountDownLatch
      【多线程】JUC的常见类,Callable接口,ReentranLock,Semaphore,CountDownLatch
      43 0
      |
      4月前
      |
      监控 Java 调度
      【Java学习】多线程&JUC万字超详解
      本文详细介绍了多线程的概念和三种实现方式,还有一些常见的成员方法,CPU的调动方式,多线程的生命周期,还有线程安全问题,锁和死锁的概念,以及等待唤醒机制,阻塞队列,多线程的六种状态,线程池等
      266 6
      |
      5月前
      |
      算法 Java
      JUC(1)线程和进程、并发和并行、线程的状态、lock锁、生产者和消费者问题
      该博客文章综合介绍了Java并发编程的基础知识,包括线程与进程的区别、并发与并行的概念、线程的生命周期状态、`sleep`与`wait`方法的差异、`Lock`接口及其实现类与`synchronized`关键字的对比,以及生产者和消费者问题的解决方案和使用`Condition`对象替代`synchronized`关键字的方法。
      JUC(1)线程和进程、并发和并行、线程的状态、lock锁、生产者和消费者问题
      |
      5月前
      |
      设计模式 Java 调度
      JUC线程池: ScheduledThreadPoolExecutor详解
      `ScheduledThreadPoolExecutor`是Java标准库提供的一个强大的定时任务调度工具,它让并发编程中的任务调度变得简单而可靠。这个类的设计兼顾了灵活性与功能性,使其成为实现复杂定时任务逻辑的理想选择。不过,使用时仍需留意任务的执行时间以及系统的实际响应能力,以避免潜在的调度问题影响应用程序的行为。
      99 1
      |
      5月前
      |
      Java API 调度
      JUC线程池: FutureTask详解
      总而言之,FutureTask是Java并发编程中一个非常实用的类,它在异步任务执行及结果处理方面提供了优雅的解决方案。在实现细节方面可以搭配线程池的使用,以及与Callable接口的配合使用,来完成高效的并发任务执行和结果处理。
      52 0
      |
      5月前
      |
      Java 程序员 容器
      【多线程面试题二十四】、 说说你对JUC的了解
      这篇文章介绍了Java并发包java.util.concurrent(简称JUC),它是JSR 166规范的实现,提供了并发编程所需的基础组件,包括原子更新类、锁与条件变量、线程池、阻塞队列、并发容器和同步器等多种工具。
      |
      7月前
      |
      存储 安全 Java
      Java多线程编程--JUC
      Java多线程编程