《Java 并发编程》线程池

简介: 《Java 并发编程》线程池

🚀1. 自定义线程池2e718f6de6d94987ae2f2182c528d9c8.png

  • 阻塞队列中维护了由主线程(或者其他线程)所产生的的任务
  • 主线程类似于生产者,产生任务并放入阻塞队列中
  • 线程池类似于消费者,得到阻塞队列中已有的任务并执行

自定义线程池

/**
 * 自定义线程池
 */
class ThreadPool {
   /**
    * 自定义阻塞队列
    */
   private BlockingQueue<Runnable> blockingQueue;
   /**
    * 核心线程数
    */
   private int coreSize;
   private HashSet<Worker> workers = new HashSet<>();
   /**
    * 用于指定线程最大存活时间
    */
   private TimeUnit timeUnit;
   private long timeout;
   /**
    * 工作线程类
    * 内部封装了Thread类,并且添加了一些属性
    */
   private class Worker extends Thread {
      Runnable task;
      public Worker(Runnable task) {
         System.out.println("初始化任务");
         this.task = task;
      }
      @Override
      public void run() {
         // 如果有任务就执行
         // 如果阻塞队列中有任务,就继续执行
         while (task != null || (task = blockingQueue.take()) != null) {
            try {
               System.out.println("执行任务");
               task.run();
            } catch (Exception e) {
               e.printStackTrace();
            } finally {
               // 任务执行完毕,设为空
               System.out.println("任务执行完毕");
               task = null;
            }
         }
         // 移除任务
         synchronized (workers) {
            System.out.println("移除任务");
            workers.remove(this);
         }
      }
   }
   public ThreadPool(int coreSize, TimeUnit timeUnit, long timeout, int capacity) {
      this.coreSize = coreSize;
      this.timeUnit = timeUnit;
      blockingQueue = new BlockingQueue<>(capacity);
      this.timeout = timeout;
   }
   public void execute(Runnable task) {
      synchronized (workers) {
         // 创建任务
         // 池中还有空余线程时,可以运行任务
         // 否则阻塞
         if (workers.size() < coreSize) {
            Worker worker = new Worker(task);
            workers.add(worker);
            worker.start();
         } else {
            System.out.println("线程池中线程已用完,请稍等");
            blockingQueue.put(task);
         }
      }
   }
}

自定义阻塞队列

/**
 * 阻塞队列
 * 用于存放主线程或其他线程产生的任务
 */
class BlockingQueue<T> {
   /**
    * 阻塞队列
    */
   private  Deque<T> blockingQueue;
   /**
    * 阻塞队列容量
    */
   private int capacity;
   /**
    * 锁
    */
   private ReentrantLock lock;
   /**
    * 条件队列
    */
   private Condition fullQueue;
   private Condition emptyQueue;
   public BlockingQueue(int capacity) {
      blockingQueue = new ArrayDeque<>(capacity);
      lock = new ReentrantLock();
      fullQueue = lock.newCondition();
      emptyQueue = lock.newCondition();
      this.capacity = capacity;
   }
   /**
    * 获取任务的方法
    */
   public T take() {
      // 加锁
      lock.lock();
      try {
         // 如果阻塞队列为空(没有任务),就一直等待
         while (blockingQueue.isEmpty()) {
            try {
               emptyQueue.await();
            } catch (InterruptedException e) {
               e.printStackTrace();
            }
         }
         // 获取任务并唤醒生产者线程
         T task = blockingQueue.removeFirst();
         fullQueue.signalAll();
         return task;
      } finally {
         lock.unlock();
      }
   }
   public T takeNanos(long timeout, TimeUnit unit) {
      // 转换等待时间
      lock.lock();
      try {
         long nanos = unit.toNanos(timeout);
         while (blockingQueue.isEmpty()) {
            try {
               // awaitNanos会返回剩下的等待时间
               nanos = emptyQueue.awaitNanos(nanos);
               if (nanos < 0) {
                  return null;
               }
            } catch (InterruptedException e) {
               e.printStackTrace();
            }
         }
         T task = blockingQueue.removeFirst();
         fullQueue.signalAll();
         return task;
      } finally {
         lock.unlock();
      }
   }
   /**
    * 放入任务的方法
    * @param task 放入阻塞队列的任务
    */
   public void put(T task) {
      lock.lock();
      try {
         while (blockingQueue.size() == capacity) {
            try {
               System.out.println("阻塞队列已满");
               fullQueue.await();
            } catch (InterruptedException e) {
               e.printStackTrace();
            }
         }
         blockingQueue.add(task);
         // 唤醒等待的消费者
         emptyQueue.signalAll();
      } finally {
         lock.unlock();
      }
   }
   public int getSize() {
      lock.lock();
      try {
         return blockingQueue.size();
      } finally {
         lock.unlock();
      }
   }
}

调用

public class Test {
   public static void main(String[] args) {
      ThreadPool threadPool = new ThreadPool(2,  TimeUnit.SECONDS, 1, 4);
      for (int i = 0; i < 10; i++) {
         threadPool.execute(()->{
            try {
               TimeUnit.SECONDS.sleep(10000);
            } catch (InterruptedException e) {
               e.printStackTrace();
            }
            System.out.println("任务正在执行!");
         });
      }
   }
}

🚀2. ThreadPoolExecutor

ThreadPoolExecutor 的继承关系图如下图所示2e718f6de6d94987ae2f2182c528d9c8.png

🚁2.1 线程池状态

ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量

// 线程池状态
// runState is stored in the high-order bits
// RUNNING 高3位为111
private static final int RUNNING    = -1 << COUNT_BITS;
// SHUTDOWN 高3位为000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 高3位 001
private static final int STOP       =  1 << COUNT_BITS;
// 高3位 010
private static final int TIDYING    =  2 << COUNT_BITS;
// 高3位 011
private static final int TERMINATED =  3 << COUNT_BITS;
状态名称 高3位的值 描述
RUNNING 111 接收新任务,同时处理任务队列中的任务
SHUTDOWN 000 不接受新任务,但是处理任务队列中的任务
STOP 001 中断正在执行的任务,同时抛弃阻塞队列中的任务
TIDYING 010 任务执行完毕,活动线程为0时,即将进入终结阶段
TERMINATED 011 终结状态

线程池状态和线程池中线程的数量由一个原子整型变量 ctl 保存,可以通过一次 CAS 同时更改两个属性的值。

// 原子整数,前3位保存了线程池的状态,剩余位保存的是线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 并不是所有平台的int都是32位。
// 去掉前三位保存线程状态的位数,剩下的用于保存线程数量
// 高3位为0,剩余位数全为1
private static final int COUNT_BITS = Integer.SIZE - 3;
// 2^COUNT_BITS次方,表示可以保存的最大线程数
// CAPACITY 的高3位为 0
private static final int CAPACITY   = (1 << COUNT_BITS) - 1

获取线程池状态、线程数量以及合并两个值的操作

// Packing and unpacking ctl
// 获取运行状态
// 该操作会让除高3位以外的数全部变为0
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 获取运行线程数
// 该操作会让高3位为0
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 计算ctl新值
private static int ctlOf(int rs, int wc) { return rs | wc; }

线程属性

// 工作线程,内部封装了Thread
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
    ...
}
// 阻塞队列,用于存放来不及被核心线程执行的任务
private final BlockingQueue<Runnable> workQueue;
// 锁
private final ReentrantLock mainLock = new ReentrantLock();
//  用于存放核心线程的容器,只有当持有锁时才能够获取其中的元素(核心线程)
private final HashSet<Worker> workers = new HashSet<Worker>();

🚁2.2 构造方法

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

✨参数解释


corePoolSize:核心线程数

maxiumPoolSize:最大线程数,maximumPoolSize - corePoolSize = 救急线程数

keepAliveTime:救急线程空闲时的最大生存时间

unit:时间单位(针对救急线程)

workQueue:阻塞队列(存放任务)

有界阻塞队列 ArrayBlockingQueue

无界阻塞队列 LinkedBlockingQueue

最多只有一个同步元素的 SynchronousQueue

优先队列 PriorityBlockingQueue

threadFactory:线程工厂(可以为线程创建时起名字)

handler:拒绝策略

工作方式2e718f6de6d94987ae2f2182c528d9c8.png

线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务

当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入 workQueue 队列排队,直到有空闲的线程

如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maxiumPoolSize-corePoolSize 数目的线程来救急

如果线程达到 maxiumPoolSize 仍然有新任务这时会执行拒绝策略,

当高峰过去后,超过 corePoolSize 的救急线程如果一段时间没有任务做,需要结束来节省资源,这个时间由 keepAliveTime 和 unit 来控制2e718f6de6d94987ae2f2182c528d9c8.png

对于拒绝策略 JDK 提供了 4 种实现:


AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略

CallerRunsPolicy 让调用者执行任务

DiscardPolicy 放弃本次任务

DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之

一些著名框架,也提供了具体的实现:


Dubbo 的实现,在抛出 RejectedExecution 异常之前会记录日志,并 dump 线程栈信息,方便定位问题

Netty 的实现,是创建一个新线程来执行任务

ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略

PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略

使用

public class Test {
    static AtomicInteger threadId = new AtomicInteger(0);
    public static void main(String[] args) {
        //手动创建线程池
        //创建有界阻塞队列
        ArrayBlockingQueue<Runnable> runnable = new ArrayBlockingQueue<>(10);
        //创建线程工厂
        ThreadFactory threadFactory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "working_thread"+threadId.getAndIncrement());
                return thread;
            }
        };
        //手动创建线程池
        //拒绝策略采用默认策略
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 7, 10, TimeUnit.SECONDS, runnable, threadFactory);
        for (int i = 0; i < 20; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread());
                    try {
                        Thread.sleep(100000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}

🚁2.3 newFiexedThreadPool

内部调用的构造方法

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L,
            TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}

特点:

  • 核心线程数=最大线程数(没有救急线程被创建),因此也无需超时时间
  • 阻塞队列是无界的,可以放任意数量的任务

适用于任务量已知,相对耗时的任务。

使用如下:

public class TestFixedThreadPool {
    public static void main(String[] args) {
        ThreadFactory factory = new ThreadFactory() {
            AtomicInteger atomicInteger = new AtomicInteger(1);
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "myThread_"+atomicInteger.getAndIncrement());
            }
        };
        //创建核心线程数量为2的线程池
        //通过 ThreadFactory 可以给线程添加名字
        ExecutorService executorService = Executors.newFixedThreadPool(2,factory);
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName());
                System.out.println("this is fixedThreadPool");
            }
        };
        executorService.execute(runnable);
    }
}

🚁2.4 newCachedThreadPool

内部调用的构造方法

public static ExecutorService newCachedThreadPool() {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                  60L, TimeUnit.SECONDS,
                  new SynchronousQueue<Runnable>());
  )
}

特点:


核心线程数是 0,最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着,全部都是救急线程(60s 后可以回收),救急线程可以无限创建。

阻塞队列采用了 SynchronousQueue,实现特点是,它没有容量,没有线程来取是放不进去的,只有当线程取任务时,才会将任务放入该阻塞队列中。

🚁2.5 newSingleThreadExecutor

内部构造方法

public static ExecutorService newSingleThreadExecutor() {
  return new FinalizableDelegatedExecutorService(
  new ThreadPoolExecutor(1,1,0L, TimeUnit.MILLSECONDS, 
  new LinkedBlockingQueue<Runnable>()));
}

✨内部调用了 new ThreadPoolExecutor 的构造方法,传入的 corePoolSize 和 maximumPoolSize 都为1。然后将该对象传给了 FinalizableDelegatedExecutorService。该类修饰了 ThreadPoolExecutor,让外部无法调用 ThreadPoolExecutor 内部的某些方法来修改所创建的线程池的大小。


注意点:


SingleThread 和自己创建一个线程来运行多个任务的区别:


当线程正在执行的任务发生错误时,如果是自己创建的线程,该任务和剩余的任务就无法再继续运行下去。而 SingleThread 会创建一个新线程,继续执行任务队列中剩余的任务

SingleThread 和 newFixedThreadPool(1) 的区别:


newFixedThreadPool(1) 传值为 1,可以将 FixedThreadPool 强转为 ThreadPoolExecutor,然后通过 setCorePoolSize 改变核心线程数,而 SingleThread 无法修改核心线程数

// 强转为ThreadPoolExecutor
ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
// 改变核心线程数
threadPool.setCorePoolSize(2);

🚀3. 提交任务

execute() 方法,传入一个 Runnable 对象,执行其中的 run 方法

//执行任务
void execute(Runnable command);

submit() 方法,传入一个 Callable 对象,用 Future 来捕获返回值

使用

// 通过submit执行Callable中的call方法
// 通过Future来捕获返回值
Future<String> future = threadPool.submit(new Callable<String>() {
   @Override
   public String call() throws Exception {
      return "hello submit";
   }
});
// 查看捕获的返回值
System.out.println(future.get());

🚀4. 关闭线程池

shutdown()

/*
线程池状态变为 SHUTDOWN
  -不会接收新任务
  -但已提交任务会执行完
  -此方法不会阻塞调用线程的执行
*/
void shutdown();
public void shutdown() {
       final ReentrantLock mainlock = this.mainLock;
       mainlock.lock();
       try {
           checkShutdownAccess();
           //修改线程池状态
           advanceRunState(SHUTDOWN);
           //仅会打断空闲线程
           interruptIdleWorkers();
           onShutdown();//扩展点 ScheduleThreadPoolExecutor
       } finally {
           mainlock.unlock();
       }
       //尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等)
       tryTerminate();
}

shutdownNow

/*
线程池状态变为 STOP
  -不会接收新任务
  -会将队列中的任务返回
  -并用 interrupt 的方式中断正在执行的任务
*/
List<Runnable> shutdownNow();
public List<Runnable> shutdownNow() {
      List<Runnable> tasks;
      final ReentrantLock mainLock = this.mainLock();
      mainLock.lock();
      try {
          checkShutdownAccess();
          //修改线程池状态
          advanceRunState(STOP);
          //打断所有线程
          interruptWorkers();
          //获取队列中剩余任务
          tasks = drainQueue();
      } finally {
          mainLock.unlock();
      }
      //尝试终结
      tryTerminate();
      return tasks;
}

其他方法

//不在 RUNNING 状态的线程池,此方法返回 true
boolean isShutdown();
//线程池状态是否是 TERMINATED
boolean isTerminated();
//调用shutdown后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池TERMIANTED后做这些事情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

🚀5. 任务调度线程池

✨在【任务调度线程池】功能加入之前,可以使用 java.util.Timer 来实现定时功能,Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。

public static void main(String[] args) {
        Timer timer = new Timer();
        TimerTask task1 = new TimerTask() {
            @Override
            public void run() {
                System.out.println("task 1");
                sleep(2);
            }
        };
        TimerTask task2 = new TimerTask() {
            @Override
            public void run() {
                System.out.println("task 2");
            }
        };
        //使用timer添加两个任务,希望它们都在1s后执行
        //但由于timer内只有一个线程来顺序执行队列中的任务,因此【任务1】的延时,影响了【任务2】的执行
        timer.schedule(task1, 1000);
        timer.schedule(task2, 1000);
}

使用 ScheduleExecutorService 改写

ScheduleExecutorService executor = Executors.newScheduledThreadPool(2);
//添加两个任务,希望它们都在1s后执行
executor.schedule(()->{
  System.out.println("任务1,执行时间"+new Date());
  try {
    Thread.sleep(2000);
  } catch(InterruptedException e) {
  }
}, 1000, TimeUnit.MILLISECONDS);
executor.schedule(()->{
  System.out.println("任务2,执行时间:"+new Date());
}, 1000, TimeUnit.MILLISECONDS);
相关文章
|
10天前
|
安全 Java 测试技术
Java并行流陷阱:为什么指定线程池可能是个坏主意
本文探讨了Java并行流的使用陷阱,尤其是指定线程池的问题。文章分析了并行流的设计思想,指出了指定线程池的弊端,并提供了使用CompletableFuture等替代方案。同时,介绍了Parallel Collector库在处理阻塞任务时的优势和特点。
|
19天前
|
安全 Java
java 中 i++ 到底是否线程安全?
本文通过实例探讨了 `i++` 在多线程环境下的线程安全性问题。首先,使用 100 个线程分别执行 10000 次 `i++` 操作,发现最终结果小于预期的 1000000,证明 `i++` 是线程不安全的。接着,介绍了两种解决方法:使用 `synchronized` 关键字加锁和使用 `AtomicInteger` 类。其中,`AtomicInteger` 通过 `CAS` 操作实现了高效的线程安全。最后,通过分析字节码和源码,解释了 `i++` 为何线程不安全以及 `AtomicInteger` 如何保证线程安全。
java 中 i++ 到底是否线程安全?
|
6天前
|
安全 Java 开发者
深入解读JAVA多线程:wait()、notify()、notifyAll()的奥秘
在Java多线程编程中,`wait()`、`notify()`和`notifyAll()`方法是实现线程间通信和同步的关键机制。这些方法定义在`java.lang.Object`类中,每个Java对象都可以作为线程间通信的媒介。本文将详细解析这三个方法的使用方法和最佳实践,帮助开发者更高效地进行多线程编程。 示例代码展示了如何在同步方法中使用这些方法,确保线程安全和高效的通信。
25 9
|
9天前
|
存储 安全 Java
Java多线程编程的艺术:从基础到实践####
本文深入探讨了Java多线程编程的核心概念、应用场景及其实现方式,旨在帮助开发者理解并掌握多线程编程的基本技能。文章首先概述了多线程的重要性和常见挑战,随后详细介绍了Java中创建和管理线程的两种主要方式:继承Thread类与实现Runnable接口。通过实例代码,本文展示了如何正确启动、运行及同步线程,以及如何处理线程间的通信与协作问题。最后,文章总结了多线程编程的最佳实践,为读者在实际项目中应用多线程技术提供了宝贵的参考。 ####
|
6天前
|
监控 安全 Java
Java中的多线程编程:从入门到实践####
本文将深入浅出地探讨Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的摘要形式,本文将以一个简短的代码示例作为开篇,直接展示多线程的魅力,随后再详细解析其背后的原理与实现方式,旨在帮助读者快速理解并掌握Java多线程编程的基本技能。 ```java // 简单的多线程示例:创建两个线程,分别打印不同的消息 public class SimpleMultithreading { public static void main(String[] args) { Thread thread1 = new Thread(() -> System.out.prin
|
9天前
|
Java
JAVA多线程通信:为何wait()与notify()如此重要?
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是实现线程间通信的核心机制。它们通过基于锁的方式,使线程在条件不满足时进入休眠状态,并在条件满足时被唤醒,从而确保数据一致性和同步。相比其他通信方式,如忙等待,这些方法更高效灵活。 示例代码展示了如何在生产者-消费者模型中使用这些方法实现线程间的协调和同步。
24 3
|
8天前
|
安全 Java
Java多线程集合类
本文介绍了Java中线程安全的问题及解决方案。通过示例代码展示了使用`CopyOnWriteArrayList`、`CopyOnWriteArraySet`和`ConcurrentHashMap`来解决多线程环境下集合操作的线程安全问题。这些类通过不同的机制确保了线程安全,提高了并发性能。
|
9天前
|
Java
java小知识—进程和线程
进程 进程是程序的一次执行过程,是系统运行的基本单位,因此进程是动态的。系统运行一个程序即是一个进程从创建,运行到消亡的过程。简单来说,一个进程就是一个执行中的程序,它在计算机中一个指令接着一个指令地执行着,同时,每个进程还占有某些系统资源如CPU时间,内存空间,文件,文件,输入输出设备的使用权等等。换句话说,当程序在执行时,将会被操作系统载入内存中。 线程 线程,与进程相似,但线程是一个比进程更小的执行单位。一个进程在其执行的过程中产生多个线程。与进程不同的是同类的多个线程共享同一块内存空间和一组系统资源,所以系统在产生一个线程,或是在各个线程之间做切换工作时,负担要比
20 1
|
9天前
|
Java UED
Java中的多线程编程基础与实践
【10月更文挑战第35天】在Java的世界中,多线程是提升应用性能和响应性的利器。本文将深入浅出地介绍如何在Java中创建和管理线程,以及如何利用同步机制确保数据一致性。我们将从简单的“Hello, World!”线程示例出发,逐步探索线程池的高效使用,并讨论常见的多线程问题。无论你是Java新手还是希望深化理解,这篇文章都将为你打开多线程的大门。
|
9天前
|
安全 Java 编译器
Java多线程编程的陷阱与最佳实践####
【10月更文挑战第29天】 本文深入探讨了Java多线程编程中的常见陷阱,如竞态条件、死锁、内存一致性错误等,并通过实例分析揭示了这些陷阱的成因。同时,文章也分享了一系列最佳实践,包括使用volatile关键字、原子类、线程安全集合以及并发框架(如java.util.concurrent包下的工具类),帮助开发者有效避免多线程编程中的问题,提升应用的稳定性和性能。 ####
37 1