多线程并发之线程池Executor与Fork/Join框架

简介: 多线程并发之线程池Executor与Fork/Join框架

【1】常见接口和实现类

① 什么是线程池

首先可以联想一下数据库连接池,Redis中的pool。

线程池提供了一个线程队列,队列中保存着所有等待状态的线程。避免了创建与销毁额外开销,提高了响应的速度。


② 常见的体系结构

常见的线程池体系结构:

 java.util.concurrent.Executor : 负责线程的使用与调度的根接口
    |--ExecutorService 子接口: 线程池的主要接口
    |--ThreadPoolExecutor 线程池的实现类
    |--ScheduledExecutorService 子接口:负责线程的调度
     |--ScheduledThreadPoolExecutor :继承 ThreadPoolExecutor, 实现 ScheduledExecutorService


如下图所示:

线程池可以解决两个不同问题:由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法。每个ThreadPoolExecutor 还维护着一些基本的统计数据,如完成的任务数。


③ 线程池的创建

为了便于跨大量上下文使用,此类提供了很多可调整的参数和扩展钩子(hook)。但是,强烈建议程序员使用较为方便的Executors 工厂方法:


Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)

Executors.newFixedThreadPool(int)(固定大小线程池)

Executors.newSingleThreadExecutor()(单个后台线程)

Executors.newScheduledThreadPool() : 创建固定大小的线程,可以延迟或定时的执行任务。


需要注意的是,它们均为大多数使用场景预定义了设置,也就是说通常情况下可以直接使用无需额外配置。但是实际上还有一个创建线程池的方法那就是手动构造线程池(ThreadPoolExecutor)

如下所示:

ScheduledExecutorService scheduledExecutorService = 
Executors.newScheduledThreadPool(5);
ExecutorService executorService = Executors.newCachedThreadPool();
ExecutorService executorService1 = Executors.newFixedThreadPool(5);
ExecutorService executorService2 = Executors.newSingleThreadExecutor();


Executors.newCachedThreadPool():

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

Executors.newFixedThreadPool(int):

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }


Executors.newSingleThreadExecutor():

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }


Executors.newScheduledThreadPool():

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
public ScheduledThreadPoolExecutor(int corePoolSize) {
     super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
           new DelayedWorkQueue());
 }

【2】线程池使用实例

线程池提交任务的三个方法:

<T> Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);
 <T> Future<T> submit(Runnable task, T result);

实例代码如下:

package com.jane.controller;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
 * Created by Janus on 2018/9/28.
 */
public class TestThreadPool {
    public static void main(String[] args) throws Exception {
        //1.创建线程池
        ExecutorService pool = Executors.newFixedThreadPool(5);
        //参数为Runnable
        ThreadPoolRunnable threadPoolRunnable = new ThreadPoolRunnable();
        // 2.为线程池中的线程分配任务
//        for (int i = 0; i <10 ; i++) {
//            pool.submit(threadPoolRunnable);
//        }
        //参数为Callable
        ThreadPoolCallable threadPoolCallable = new ThreadPoolCallable();
        List<Future<Integer>> futures = new ArrayList<>();
        for (int i = 0; i <10 ; i++) {
            Future<Integer> future = pool.submit(threadPoolCallable);
            futures.add(future);
        }
        for (Future<Integer> future : futures) {
            System.out.println(future.get());
        }
        //3.关闭线程池
        pool.shutdown();
    }
}
class ThreadPoolRunnable implements  Runnable{
    private int i = 0;
    @Override
    public void run() {
        while(i <= 100){
            System.out.println(Thread.currentThread().getName() + " : " + i++);
        }
    }
}
class ThreadPoolCallable implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        int sum =0;
        for(int i=1;i<=100;i++){
            sum+=i;
        }
        System.out.println(Thread.currentThread().getName() + " : " +sum);
        return sum;
    }
}

【3】线程池调度实例

ScheduledExecutorService使用实例代码如下:

package com.jane.controller;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.*;
/**
 * Created by Janus on 2018/9/29.
 */
public class TestScheduledThreadPool {
    public static void main(String[] args){
        System.out.println("项目启动 : "+new Date());
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
        // 延迟两秒执行ScheduledThread
        ScheduledThread scheduledThread = new ScheduledThread();
        scheduledExecutorService.schedule(scheduledThread,2, TimeUnit.SECONDS);
        //项目启动时延迟一秒,然后以2秒为周期执行
//        ScheduledRunnable scheduledRunnable = new ScheduledRunnable();
//        scheduledExecutorService.scheduleAtFixedRate(scheduledRunnable,1,2,TimeUnit.SECONDS);
        // 上一次结束后 延迟一秒,然后以2秒为周期执行
//        scheduledExecutorService.scheduleWithFixedDelay(scheduledRunnable,1,2,TimeUnit.SECONDS);
    }
}
class ScheduledThread implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        int i = new Random().nextInt(100);
        System.out.println(Thread.currentThread().getName()+" : "+i+" , "+new Date());
        return i;
    }
}
class ScheduledRunnable implements  Runnable{
    @Override
    public void run() {
        int i = (int) (Math.random()*1000);
        System.out.println(Thread.currentThread().getName()+" : "+i+" , "+new Date());
    }
}

测试一,单独执行schedule:

 ScheduledThread scheduledThread = new ScheduledThread();
 scheduledExecutorService.schedule(scheduledThread,2, TimeUnit.SECONDS);

效果如下:

项目启动 : Sat Sep 29 10:37:16 CST 2018
pool-1-thread-1 : 69 , Sat Sep 29 10:37:19 CST 2018

在主程序启动2秒后执行,效果正常。


测试二,单独执行scheduleAtFixedRate:

ScheduledRunnable scheduledRunnable = new ScheduledRunnable();
        scheduledExecutorService.scheduleAtFixedRate(scheduledRunnable,1,2,TimeUnit.SECONDS);

效果如下:

项目启动 : Sat Sep 29 10:38:51 CST 2018
pool-1-thread-1 : 475 , Sat Sep 29 10:38:52 CST 2018
pool-1-thread-1 : 444 , Sat Sep 29 10:38:54 CST 2018
pool-1-thread-2 : 37 , Sat Sep 29 10:38:56 CST 2018
pool-1-thread-1 : 619 , Sat Sep 29 10:38:58 CST 2018
pool-1-thread-3 : 296 , Sat Sep 29 10:39:00 CST 2018
pool-1-thread-3 : 304 , Sat Sep 29 10:39:02 CST 2018

主程序启动,一秒延迟后,以2秒为间隔周期执行。


测试三,单独执行scheduleWithFixedDelay:

主程序启动,一秒延迟后,以2秒为间隔周期执行。
测试三,单独执行scheduleWithFixedDelay:
Sch

效果如下:

项目启动 : Sat Sep 29 10:42:41 CST 2018
pool-1-thread-1 : 473 , Sat Sep 29 10:42:42 CST 2018
pool-1-thread-1 : 667 , Sat Sep 29 10:42:44 CST 2018
pool-1-thread-2 : 788 , Sat Sep 29 10:42:46 CST 2018
pool-1-thread-1 : 836 , Sat Sep 29 10:42:48 CST 2018
pool-1-thread-3 : 457 , Sat Sep 29 10:42:50 CST 2018
pool-1-thread-3 : 923 , Sat Sep 29 10:42:52 CST 2018
pool-1-thread-3 : 120 , Sat Sep 29 10:42:54 CST 2018

看起来和测试二一样?


测试四,添加ScheduledRunnable2 ,run中sleep一秒:


添加一个类ScheduledRunnable2 如下:

class ScheduledRunnable2 implements  Runnable{
    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        int i = (int) (Math.random()*1000);
        System.out.println(Thread.currentThread().getName()+" : "+i+" , "+new Date());
    }
}

测试代码如下:

ScheduledRunnable2 scheduledRunnable2 = new ScheduledRunnable2();
scheduledExecutorService.scheduleWithFixedDelay(scheduledRunnable2,1,2,TimeUnit.SECONDS);

效果如下:

项目启动 : Sat Sep 29 10:51:15 CST 2018
pool-1-thread-1 : 521 , Sat Sep 29 10:51:17 CST 2018
pool-1-thread-1 : 334 , Sat Sep 29 10:51:20 CST 2018
pool-1-thread-2 : 388 , Sat Sep 29 10:51:23 CST 2018
pool-1-thread-1 : 528 , Sat Sep 29 10:51:26 CST 2018
pool-1-thread-3 : 862 , Sat Sep 29 10:51:29 CST 2018
pool-1-thread-3 : 312 , Sat Sep 29 10:51:32 CST 2018
pool-1-thread-3 : 534 , Sat Sep 29 10:51:35 CST 2018
pool-1-thread-3 : 452 , Sat Sep 29 10:51:38 CST 2018


启动后,延迟一秒,再sleep一秒,然后打印第一次调用;之后等待上一次结束后以2秒周期间隔再执行。

注意,后面间隔都是三秒,充分说明了第二次调用是在第一次调用结束后(因此每次执行都会sleep 1 秒)!


测试五 ,使用scheduleAtFixedRate调用scheduledRunnable2:

测试代码如下:

ScheduledRunnable2 scheduledRunnable2 = new ScheduledRunnable2();
scheduledExecutorService.scheduleAtFixedRate(scheduledRunnable2,1,2,TimeUnit.SECONDS);

效果如下:

项目启动 : Sat Sep 29 10:57:27 CST 2018
pool-1-thread-1 : 500 , Sat Sep 29 10:57:29 CST 2018
pool-1-thread-1 : 991 , Sat Sep 29 10:57:31 CST 2018
pool-1-thread-2 : 1 , Sat Sep 29 10:57:33 CST 2018
pool-1-thread-1 : 241 , Sat Sep 29 10:57:35 CST 2018
pool-1-thread-3 : 92 , Sat Sep 29 10:57:37 CST 2018
pool-1-thread-3 : 517 , Sat Sep 29 10:57:39 CST 2018

项目启动后,延迟一秒,又sleep 1 秒,然后打印第一次调用;之后以2秒间隔周期进行线程调用!!

由此可见FixedDelay与FixedRate区别:

  • FixedDelay 上一次结束后,等待X秒,执行下一次;
  • FixedRate 两次调用间隔X秒。

【4】Fork/Join 框架

Fork/Join 框架:就是在必要的情况下,将一个大任务,进行拆分(fork)成若干个小任务(拆到不可再拆时),再将一个个的小任务运算的结果进行join 汇总。


示意图如下:

实例代码模拟如下:

package com.jane.controller;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
/**
 * Created by Janus on 2018/9/29.
 */
public class TestForkJoinPool {
    public static void main(String[] args){
  //开始时间
        Instant start = Instant.now();
  //forkJoinTask执行离不开ForkJoinPool
        ForkJoinPool pool = new ForkJoinPool();
       // ForkJoinTask<Long> task = new ForkJoinSumCalculate(0, 1000000000L);
        ForkJoinTask<Long> task = new ForkJoinSumCalculate(0, 50000000000L);
        Long sum = pool.invoke(task);
        System.out.println(sum);
        //使用JDK1.8新特性--日期--结束时间
        Instant end = Instant.now();
        long millis = Duration.between(start, end).toMillis();//842-33044
        System.out.println("耗费时间为 : "+millis);
    }
}
class ForkJoinSumCalculate extends RecursiveTask<Long>{
    private  long start;
    private  long end;
  //拆分临界值
    private static  final long THURSHOLD = 10000L;
    public ForkJoinSumCalculate(long start,long end){
        this.start = start;
        this.end = end;
    }
//The main computation performed by this task.
    @Override
    protected Long compute() {
        long length = end-start;
    //小于临界值,直接求和
        if (length<THURSHOLD){
            long sum = 0L;
            for(long i=start;i<=end;i++){
                sum+=i;
            }
            return sum;
        }else{
          //否则,进行递归拆分
            long middle = (start+end)/2;
            ForkJoinSumCalculate left = new ForkJoinSumCalculate(start, middle);
            left.fork();//进行拆分,同时压入线程队列
            ForkJoinSumCalculate right = new ForkJoinSumCalculate(middle+1, end);
            right.fork();
            return left.join()+right.join();
        }
    }
}

分别使用for循环和java8新特性测试如下:

// for 循环
  @Test
  public void testFor(){
    Instant start = Instant.now();
    long sum = 0L;
//    for(long i = 0;i<=1000000000L;i++){
    for(long i = 0;i<=50000000000L;i++){
      sum+=i;
    }
    System.out.println(sum);
    Instant end = Instant.now();
    long millis = Duration.between(start, end).toMillis();//2526--22117
    System.out.println("耗费时间为 : "+millis);
  }
  // java8新特性--并行流
  @Test
  public void test2(){
    Instant start = Instant.now();
//    long sum = LongStream.rangeClosed(0L, 1000000000L).parallel().reduce(0L, Long::sum);
    long sum = LongStream.rangeClosed(0L, 50000000000L).parallel().reduce(0L, Long::sum);
    System.out.println(sum);
    Instant end = Instant.now();
    long millis = Duration.between(start, end).toMillis();//1699--18909
    System.out.println("耗费时间为 : "+millis);
  }

总结如下:


  • 第一求和数据比较小的时候无需考虑哪种方式;
  • 第二,求和数据比较大的时候java8新特性是优异于for循环的;
  • 第三,求和数据比较大的时候使用ForkJoin需要考虑临界值的设置,否则可能效率不如for循环和java8新特性。

【5】Fork/Join 框架与线程池的区别

① Fork/Join采用“工作窃取”模式(work-stealing)


当执行新的任务时它可以将其拆分分成更小的任务执行,并将小任务加到线程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队列中。


② Fork/Join优势


相对于一般的线程池实现,fork/join框架的优势体现在对其中包含的任务的处理方式上。


在一般的线程池中,如果一个线程正在执行的任务由于某些原因无法继续运行,那么该线程会处于等待状态。而在fork/join框架实现中,如果某个子问题由于等待另外一个子问题的完成而无法继续运行。那么处理该子问题的线程会主动寻找其他尚未运行的子问题来执行.这种方式减少了线程的等待时间,提高了性能。


③ ForkJoinPool继承图

如下图所示ForkJoinPool是AbstractExecutorService子类实现了ExecutorService接口(其是Executor接口子类)。


【6】ThreadPoolExecutor


前面提到的Executors的几个静态方法的实现,其实就用到了ThreadPoolExecutor,只是根据不同的场景传入了不同的参数。完整的ThreadPoolExecutor总共有七个参数:

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

corePoolSize:核心线程数


maximumPoolSize:最大线程数


keepAliveTime:当线程数大于corePoolSize时,这是多余的空闲线程在终止前等待新任务的最长时间


unit:keepAliveTime 存活时间的单位


workQueue:工作任务队列,用于保存任务并将其传递给工作线程的队列。


threadFactory:构造线程池中线程的工厂


handler:由于达到线程边界和队列容量而阻止执行时要使用的处理程序


解释如下:


提交任务的时候,判断当前线程池中的存活线程数量是否小于corePoolSize


如果小于corePoolSize,则不管是否有线程处于空闲状态,都会新建一个线程。


如果线程数量已经达到corePoolSize,则将任务扔进队列workQueue


随着任务越来越多,队列可能已经满了,则需要看当前线程是否已经达到了maximumPoolSize,如果没有达到,则创建新的线程,并用它执行该任务。


最坏情况,任务实在太多了,队列已经满了,而线程数量已经达到maximumPoolSize,还有新的任务来,说白了,就是已经满负荷了,任然还有任务需要执行,这个时候就会handler来处理该任务了。


另外三个参数keepAliveTime、unit 和factory 说明如下:


keepAliveTime。如我们所知,使用线程池的目的就是为了减少线程的创建,因为创建线程本身是比较耗资源的。由于线程本身需要占用资源,有一种情况就是,某个时候线程数量比较多,但是任务没有多少,就会出现有的线程没有活干,所以我们就可以考虑释放掉其资源,但是呢,我们又无法预知未来的任务量,所以我们就准许其空闲一段时间,如果过了这段时间都还是空闲的,那么就会释放掉其资源,这个参数就是用指定这段空闲时间的。默认情况下是有超过corePoolSize个线程时,就会用到该值, 但是也可以指定corePoolSize数量之内的线程空闲时是否释放资源(allowCoreThreadTimeout)。


unit 这个参数很好理解,就是单位,就是前面keepAliveTime这个我们准许空闲的时间的单位


factory .其类型为ThreadFactory,顾名思义,就是一个创建Thread的Factory. 该接口只有一个方法,产生一个Thread。通常情况下,我们都只需要使用默认的factory就可以了,但是为了定位问题方便,我们可以为线程池创建的线程设置一个名字,这样看日志的时候就比较方便了。


【7】RejectedExecutionHandler

采用Executors静态方法时,并没有让传入该参数,默认值如下:

  private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

实际上jdk本身提供了四种策略分别是:

  • AbortPolicy:会抛出异常
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
  • CallerRunnerPolicy:在调用execute的方法中执行被拒绝的任务
public static class CallerRunsPolicy implements RejectedExecutionHandler {
        public CallerRunsPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
  • DiscardOldestPolicy:丢掉队列中最老的任务,然后重试
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        public DiscardOldestPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
  • DiscardPolicy:直接丢掉该任务
 public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }
        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

另外还有ThreadPoolExecutor还提供了一些hook方法,如有需要可以使用

  • beforeExecute() 任务执行之前调用
  • afterExecute() 任务执行之后调用
目录
相关文章
|
17天前
|
存储 Java 数据库连接
java多线程之线程通信
java多线程之线程通信
|
29天前
|
存储 缓存 NoSQL
Redis单线程已经很快了6.0引入多线程
Redis单线程已经很快了6.0引入多线程
31 3
|
30天前
|
存储 算法 Java
【C/C++ 线程池设计思路】 深入探索线程池设计:任务历史记录的高效管理策略
【C/C++ 线程池设计思路】 深入探索线程池设计:任务历史记录的高效管理策略
74 0
|
1天前
|
数据采集 存储 Java
高德地图爬虫实践:Java多线程并发处理策略
高德地图爬虫实践:Java多线程并发处理策略
|
2天前
|
安全 算法 Java
JavaSE&多线程&线程池
JavaSE&多线程&线程池
16 7
|
3天前
|
存储 缓存 NoSQL
为什么Redis使用单线程 性能会优于多线程?
在计算机领域,性能一直都是一个关键的话题。无论是应用开发还是系统优化,我们都需要关注如何在有限的资源下,实现最大程度的性能提升。Redis,作为一款高性能的开源内存数据库,因其出色的单线程性能而备受瞩目。那么,为什么Redis使用单线程性能会优于多线程呢?
15 1
|
8天前
|
安全 Java
深入理解 Java 多线程和并发工具类
【4月更文挑战第19天】本文探讨了Java多线程和并发工具类在实现高性能应用程序中的关键作用。通过继承`Thread`或实现`Runnable`创建线程,利用`Executors`管理线程池,以及使用`Semaphore`、`CountDownLatch`和`CyclicBarrier`进行线程同步。保证线程安全、实现线程协作和性能调优(如设置线程池大小、避免不必要同步)是重要环节。理解并恰当运用这些工具能提升程序效率和可靠性。
|
10天前
|
Java 开发者
Java中多线程并发控制的实现与优化
【4月更文挑战第17天】 在现代软件开发中,多线程编程已成为提升应用性能和响应能力的关键手段。特别是在Java语言中,由于其平台无关性和强大的运行时环境,多线程技术的应用尤为广泛。本文将深入探讨Java多线程的并发控制机制,包括基本的同步方法、死锁问题以及高级并发工具如java.util.concurrent包的使用。通过分析多线程环境下的竞态条件、资源争夺和线程协调问题,我们提出了一系列实现和优化策略,旨在帮助开发者构建更加健壮、高效的多线程应用。
7 0
|
13天前
|
Java API 调度
安卓多线程和并发处理:提高应用效率
【4月更文挑战第13天】本文探讨了安卓应用中多线程和并发处理的优化方法,包括使用Thread、AsyncTask、Loader、IntentService、JobScheduler、WorkManager以及线程池。此外,还介绍了RxJava和Kotlin协程作为异步编程工具。理解并恰当运用这些技术能提升应用效率,避免UI卡顿,确保良好用户体验。随着安卓技术发展,更高级的异步处理工具将助力开发者构建高性能应用。
|
24天前
|
安全 Java 容器
Java并发编程:实现高效、线程安全的多线程应用
综上所述,Java并发编程需要注意线程安全、可见性、性能等方面的问题。合理使用线程池、同步机制、并发容器等工具,可以实现高效且线程安全的多线程应用。
14 1